diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..3fd4a62 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "scache"] + path = scache + url = https://github.com/moyodiallo/scache.git + branch = master diff --git a/Dockerfile b/Dockerfile index 8b793ce..8c7938d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,11 @@ FROM ocaml/opam:debian-12-ocaml-5.1 AS build RUN sudo apt-get update && sudo apt-get install libev-dev capnproto m4 pkg-config libsqlite3-dev libgmp-dev libzstd-dev -y --no-install-recommends -RUN cd ~/opam-repository && git fetch -q origin master && git reset --hard b61304c6db353e679a36720d8b914b029d6fbc0c && opam update +RUN cd ~/opam-repository && git fetch -q origin master && git reset --hard bc52affc41b55ff00c0d3ac9a376538d79695aaf && opam update RUN sudo ln -sf /usr/bin/opam-2.1 /usr/bin/opam +COPY --chown=opam scache/scache.opam /src/scache/ COPY --chown=opam solver-service.opam solver-service-api.opam /src/ WORKDIR /src +RUN opam pin add -yn scache.dev "./scache" RUN opam install -y --deps-only . ADD --chown=opam . . RUN opam exec -- dune build @install diff --git a/bin/main.ml b/bin/main.ml index 2d19373..9d759cc 100644 --- a/bin/main.ml +++ b/bin/main.ml @@ -75,24 +75,25 @@ let start_server ~service vat_config = let+ vat = Capnp_rpc_unix.serve vat_config ~restore in Capnp_rpc_unix.Vat.sturdy_uri vat service_id -let main_service () solver cap_file vat_config = - let uri = start_server ~service:(Solver_service.Service.v solver) vat_config in +let main_service () solver cacheable cap_file vat_config = + let uri = start_server ~service:(Solver_service.Service.v ~cacheable solver) vat_config in Capnp_rpc_unix.Cap_file.save_uri uri cap_file |> or_die; Fmt.pr "Wrote solver service's address to %S@." 
cap_file; Fiber.await_cancel () -let main_service_pipe () solver = +let main_service_pipe () solver cacheable = let socket = Lwt_unix.stdin in (* Run locally reading from socket *) - export (Solver_service.Service.v solver) ~on:socket + export (Solver_service.Service.v ~cacheable solver) ~on:socket -let main_cluster () solver name capacity register_addr = +let main_cluster () solver cacheable name capacity register_addr = let vat = Capnp_rpc_unix.client_only_vat () in let sr = Capnp_rpc_unix.Vat.import_exn vat register_addr in let `Cancelled = Solver_worker.run solver sr ~name ~capacity + ~cacheable in () @@ -128,6 +129,11 @@ let capacity = @@ Arg.info ~doc:"The number of cluster jobs that can run in parallel" ~docv:"N" [ "capacity" ] +let cacheable = + Arg.value + @@ Arg.flag + @@ Arg.info ~doc:"Activate the cache system" [ "activate-cache"; "cache" ] + let cap_file = Arg.required @@ Arg.opt Arg.(some string) None @@ -168,6 +174,7 @@ let () = const main_service $ setup_log $ solver + $ cacheable $ cap_file $ Capnp_rpc_unix.Vat_config.cmd ) @@ -179,6 +186,7 @@ let () = const main_service_pipe $ setup_log $ solver + $ cacheable ) in let run_agent = @@ -188,6 +196,7 @@ let () = const main_cluster $ setup_log $ solver + $ cacheable $ worker_name $ capacity $ register_addr diff --git a/dune b/dune index 1e8abdc..ca70495 100644 --- a/dune +++ b/dune @@ -1 +1,3 @@ -(dirs :standard \ var cache) +(dirs :standard \ var) + +(vendored_dirs scache) diff --git a/dune-project b/dune-project index 2711d3d..3bbfbff 100644 --- a/dune-project +++ b/dune-project @@ -1,48 +1,97 @@ (lang dune 3.7) + (name solver-service) -(formatting (enabled_for dune)) +(formatting + (enabled_for dune)) + (generate_opam_files true) -(source (github ocurrent/solver-service)) -(authors "Thomas Leonard " "Patrick Ferris ") -(maintainers "alpha@tarides.com" "Tim McGilchrist ") + +(source + (github ocurrent/solver-service)) + +(authors + "Thomas Leonard " + "Patrick Ferris ") + +(maintainers alpha@tarides.com "Tim McGilchrist ") (package (name solver-service) (synopsis "Choose package versions to test") (depends - (ocaml (>= 5.0.0)) - ; Examples dependencies - (current_web (and (>= 0.6.4) :with-test)) - (current_git (and (>= 0.6.4) :with-test)) - (current_github (and (>= 0.6.4) :with-test)) - (current_ocluster (and (>= 0.2.1) :with-test)) - (ppx_deriving_yojson (>= 3.6.1)) - (ppx_deriving (>= 5.1)) - (yojson (>= 2.1.0)) - (lwt (>= 5.6.1)) - (eio (>= 0.12)) - (eio_main (>= 0.12)) - (lwt_eio (>= 0.5)) - (logs (>= 0.7.0)) - (fmt (>= 0.9.0)) - (ocaml-version (>= 3.6.1)) - (solver-service-api (= :version)) - (dune-build-info (>= 3.8.0)) - (opam-0install (>= 0.4.3)) - (git-unix (>= 3.12.0)) - (ocluster-api (>= 0.2.1)) - (prometheus-app (>= 1.2)) - (capnp-rpc-net (>= 1.2.3)) - (capnp-rpc-unix (>= 1.2.3))) - (conflicts (carton (< 0.4.2)))) + (opam-file-format + (>= 2.1.6)) + (ocaml + (>= 5.0.0)) + (current_web + (and + (>= 0.6.4) + :with-test)) + (current_git + (and + (>= 0.6.4) + :with-test)) + (current_github + (and + (>= 0.6.4) + :with-test)) + (current_ocluster + (and + (>= 0.2.1) + :with-test)) + (ppx_deriving_yojson + (>= 3.6.1)) + (ppx_deriving + (>= 5.1)) + (yojson + (>= 2.1.0)) + (lwt + (>= 5.6.1)) + (eio + (>= 0.12)) + (eio_main + (>= 0.12)) + (lwt_eio + (>= 0.5)) + (logs + (>= 0.7.0)) + (fmt + (>= 0.9.0)) + sqlite3 + (ocaml-version + (>= 3.6.1)) + (solver-service-api + (= :version)) + (dune-build-info + (>= 3.8.0)) + (opam-0install + (>= 0.4.3)) + (git-unix + (>= 3.12.0)) + (ocluster-api + (>= 0.2.1)) + (prometheus-app + (>= 
1.2)) + (capnp-rpc-net + (>= 1.2.3)) + (capnp-rpc-unix + (>= 1.2.3))) + (conflicts + (carton + (< 0.4.2)))) (package (name solver-service-api) (synopsis "Cap'n Proto API for the solver service") (depends - (ocaml (>= 4.14.1)) - (capnp (>= 3.5.0)) - (capnp-rpc-lwt (>= 1.2.3)) - (ppx_deriving_yojson (>= 3.6.1)) - (ppx_deriving (>= 5.1)))) + (ocaml + (>= 4.14.1)) + (capnp + (>= 3.5.0)) + (capnp-rpc-lwt + (>= 1.2.3)) + (ppx_deriving_yojson + (>= 3.6.1)) + (ppx_deriving + (>= 5.1)))) \ No newline at end of file diff --git a/scache b/scache new file mode 160000 index 0000000..265c821 --- /dev/null +++ b/scache @@ -0,0 +1 @@ +Subproject commit 265c8217f251edd59ed0a8074adcb4c42c5e57cf diff --git a/service/dune b/service/dune index bd4a7e8..82e85b6 100644 --- a/service/dune +++ b/service/dune @@ -7,5 +7,9 @@ solver-service-api opam-0install capnp-rpc-net + opam-file-format git-unix - ocaml-version)) + scache + ocaml-version) + (preprocess + (pps ppx_deriving_yojson))) diff --git a/service/git_clone.ml b/service/git_clone.ml new file mode 100644 index 0000000..ddb32fe --- /dev/null +++ b/service/git_clone.ml @@ -0,0 +1,181 @@ +open Eio.Std + +module Store = Git_unix.Store + +module type CacheType = sig + type cache + val dir: cache -> string + val process_mgr : cache -> [`Generic] Eio.Process.mgr_ty r +end + +module Make (Cache : CacheType) = struct + + type t = Cache.cache + + let git_command ?cwd args = + "git" :: + match cwd with + | Some dir -> "-C" :: dir :: args + | None -> args + + let [@warning "-27"] grep_command ?cwd args = "grep" :: args + + let [@warning "-27"] rm_command ?cwd args = "rm" :: args + + let run_git ?cwd t args = + Eio.Process.run (Cache.process_mgr t) (git_command ?cwd args) + + let run_rm ?cwd t args = + Eio.Process.run (Cache.process_mgr t) (rm_command ?cwd args) + + let line_opt r = + if Eio.Buf_read.at_end_of_input r then None + else Some (Eio.Buf_read.line r) + + let run_git_line ?cwd t args = + Eio.Process.parse_out (Cache.process_mgr t) line_opt (git_command ?cwd args) + + let take_all_opt r = + if Eio.Buf_read.at_end_of_input r then None + else Some (Eio.Buf_read.take_all r) + + let run_take_all ?cwd ?stdin t args command = + Eio.Process.parse_out ?stdin (Cache.process_mgr t) take_all_opt (command ?cwd args) + + let lines_opt r = + if Eio.Buf_read.at_end_of_input r then None + else (Eio.Buf_read.map List.of_seq Eio.Buf_read.lines) r |> Option.some + + let run_lines ?cwd ?stdin t args command = + Eio.Process.parse_out ?stdin (Cache.process_mgr t) lines_opt (command ?cwd args) + + let run_git_lines ?cwd ?stdin t args = run_lines ?cwd ?stdin t args git_command + + let run_git_take_all ?cwd ?stdin t args = run_take_all ?cwd ?stdin t args git_command + + let run_grep_lines ?cwd ?stdin t args = run_lines ?cwd ?stdin t args grep_command + + let replace_special = + String.map @@ function + | 'A'..'Z' + | 'a'..'z' + | '0'..'9' + | '-' as c -> c + | _ -> '_' + let rec mkdir_p path = + try Unix.mkdir path 0o700 with + | Unix.Unix_error (EEXIST, _, _) -> () + | Unix.Unix_error (ENOENT, _, _) -> + let parent = Filename.dirname path in + mkdir_p parent; + Unix.mkdir path 0o700 + + let repo_url_to_clone_path t repo_url = + let cache_dir = Cache.dir t in + let uri = Uri.of_string repo_url in + let sane_host = + match Uri.host uri with + | Some host -> replace_special host + | None -> "no_host" + in + let sane_path = + Uri.( + path uri + |> pct_decode + |> Filename.chop_extension + |> replace_special) + in + Fpath.(v cache_dir / sane_host / sane_path) + + let remove t repo_url = + 
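+    (* Delete the local clone of [repo_url] from the cache directory, if any.
+       Callers typically re-clone afterwards, e.g. when a pull has failed. *)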
let clone_path = repo_url_to_clone_path t repo_url in + let clone_path_str = Fpath.to_string clone_path in + match Unix.lstat clone_path_str with + | Unix.{ st_kind = S_DIR; _ } -> ( + try + run_rm t ["-fr"; clone_path_str] + with Eio.Io _ as ex -> + let bt = Printexc.get_raw_backtrace () in + Eio.Exn.reraise_with_context ex bt "removing %S" clone_path_str) + | _ -> () + + let clone ~bare t repo_url = + let clone_path = repo_url_to_clone_path t repo_url in + let clone_parent = Fpath.parent clone_path |> Fpath.to_string in + let clone_path_str = Fpath.to_string clone_path in + match Unix.lstat clone_path_str with + | Unix.{ st_kind = S_DIR; _ } -> () + | _ -> Fmt.failwith "%S is not a directory!" clone_path_str + | exception Unix.Unix_error (Unix.ENOENT, _, _) -> + mkdir_p clone_parent; + try + if bare then + run_git t ["clone"; "--bare"; repo_url; clone_path_str] + else + run_git t ["clone"; repo_url; clone_path_str] + with Eio.Io _ as ex -> + let bt = Printexc.get_raw_backtrace () in + Eio.Exn.reraise_with_context ex bt "cloning %S" repo_url + + let clone_bare t repo_url = clone ~bare:true t repo_url + + let clone t repo_url = clone ~bare:false t repo_url + + let fetch t repo_url = + try + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git t ~cwd:clone_path ["fetch"; "origin"] + with Eio.Io _ as ex -> + let bt = Printexc.get_raw_backtrace () in + Eio.Exn.reraise_with_context ex bt "fetching %S" repo_url + + let pull t repo_url = + try + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git t ~cwd:clone_path ["pull"; "origin"] + with Eio.Io _ as ex -> + let bt = Printexc.get_raw_backtrace () in + Eio.Exn.reraise_with_context ex bt "pulling %S" repo_url + + let all_commits_rev t repo_url = + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git_lines t ~cwd:clone_path + @@ "log" + :: "--reverse" + :: [ "--format=format:%H" ] + |> Option.value ~default:[] + + let diff_pkgs t ~repo_url ~new_commit ~old_commit = + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git_take_all t ~cwd:clone_path + @@ "diff" + :: old_commit + :: new_commit + :: "--" + :: [ "packages" ] + |> function + | None -> [] + | Some diff -> + try + run_grep_lines ~stdin:(Eio.Flow.string_source diff) t ["^... 
./packages/.*/opam"] + |> Option.value ~default:[] + |> List.filter_map (fun path -> Astring.String.cut ~sep:"/" path |> Option.map snd) + with _ -> [] (* grep could exits with status 1 if there's no match *) + + let oldest_commit_with t ~repo_url ~from paths = + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git_line t ~cwd:clone_path + @@ "log" + :: "-n" :: "1" + :: "--format=format:%H" + :: from + :: "--" + :: paths + + let open_store t repo_url = + let path = repo_url_to_clone_path t repo_url in + match Lwt_eio.run_lwt (fun () -> Git_unix.Store.v ~dotgit:path path) with + | Ok x -> x + | Error e -> + Fmt.failwith "Failed to open %a: %a" Fpath.pp path Store.pp_error e +end diff --git a/service/service.ml b/service/service.ml index 2732d8c..85393a7 100644 --- a/service/service.ml +++ b/service/service.ml @@ -1,6 +1,6 @@ module Worker = Solver_service_api.Worker -let v t = +let v ?cacheable t = let open Capnp_rpc_lwt in let module X = Solver_service_api.Raw.Service.Solver in X.local @@ -27,7 +27,7 @@ let v t = (Capnp_rpc.Error.exn "Bad JSON in request: %s" msg)) | Ok request -> Lwt_eio.run_eio @@ fun () -> - let selections = Solver.solve t ~log request in + let selections = Solver.solve ?cacheable t ~log request in let json = Yojson.Safe.to_string (Worker.Solve_response.to_yojson selections) diff --git a/service/service.mli b/service/service.mli index bb2d4c7..7cc6f87 100644 --- a/service/service.mli +++ b/service/service.mli @@ -1,2 +1,2 @@ -val v : Solver.t -> Solver_service_api.Solver.t +val v : ?cacheable:bool -> Solver.t -> Solver_service_api.Solver.t (** [capnp_service t] is a Cap'n Proto service that handles requests using [t]. *) diff --git a/service/solve_cache.ml b/service/solve_cache.ml new file mode 100644 index 0000000..526fbd3 --- /dev/null +++ b/service/solve_cache.ml @@ -0,0 +1,254 @@ +open Eio.Std + +module Worker = Solver_service_api.Worker +module Log_data = Solver_service_api.Solver.Log +module Cache = Scache.Cache +module Selections = Set.Make(Worker.Selection) + +exception Invalidated + +type t = { + cache_dir : string; + process_mgr : [`Generic] Eio.Process.mgr_ty r; +} + +let create ~cache_dir ~proc_mgr = + { + cache_dir = Fpath.(v cache_dir / "solve") |> Fpath.to_string; + process_mgr = proc_mgr + } + +module Git_clone = Git_clone.Make ( struct + type cache = t + let dir t = t.cache_dir + let process_mgr t = t.process_mgr +end) + +module Solve_cache = struct + + type t = { + request: Worker.Solve_request.t; + (** The request *) + solve_response : Worker.Solve_response.t; + (** Response of the solved request *) + last_opam_repository_commits: (string * string) list; + (** Pair of repo URL and commit hash, for each last opam-repository used during the request *) + }[@@deriving yojson] + + let marshal t = t |> to_yojson |> Yojson.Safe.to_string + let unmarshal t = t |> Yojson.Safe.from_string |> of_yojson |> Result.get_ok + +end + +let mutex = Lazy.from_fun (fun () -> Eio.Mutex.create ()) + +let cache = Cache.start ~name:"solve" + +let digest_request request = + request + |> Worker.Solve_request.to_yojson + |> Yojson.Safe.to_string + |> Digest.string + |> Digest.to_hex + +let get_solve ~cache ~digest : Solve_cache.t option = + match Cache.get cache ~key:digest with + | None -> None + | Some r -> Solve_cache.unmarshal r |> Option.some + +let set_solve ~cache ~solve_cache ~digest = + solve_cache + |> Solve_cache.marshal + |> fun response -> Cache.set cache ~key:digest ~value:response + +let remove_commits opam_repository_commits = + 
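+  (* Illustration only: e.g.
+     remove_commits [("https://github.com/ocaml/opam-repository.git", "abc123")]
+     returns [("https://github.com/ocaml/opam-repository.git", "")]. *)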
  opam_repository_commits |> List.map (fun (url,_) -> (url,""))
+
+(* Sorting matters because [digest_request] hashes the serialised request,
+   which is order-sensitive. *)
+let sort_by_url opam_repository_commits =
+  opam_repository_commits
+  |> List.sort (fun (url1,_) (url2,_) -> String.compare url1 url2)
+
+let is_same_solution ~solve_response_cache ~solve_response =
+  match solve_response_cache, solve_response with
+  | Error _, _ -> false
+  | _, Error _ -> false
+  | Ok selections_cache, Ok selections ->
+    let selections_cache =
+      List.map (fun sel -> { sel with Worker.Selection.commits = []}) selections_cache
+    in
+    let selections =
+      List.map (fun sel -> { sel with Worker.Selection.commits = []}) selections
+    in
+    Selections.equal (Selections.of_list selections_cache) (Selections.of_list selections)
+
+let yojson_of_list l = l |> [%to_yojson: string list]
+let yojson_to_list l = l |> [%of_yojson: string list]
+
+(* opam-repository commits with their rank in the commit history *)
+let opam_commits = Lazy.from_fun (fun () -> Hashtbl.create 10)
+
+let update_commit t repo_url commit =
+  let opam_commits = Lazy.force opam_commits in
+  let mutex = Lazy.force mutex in
+  let get_repo t repo_url =
+    Git_clone.clone t repo_url;
+    Git_clone.pull t repo_url;
+    Git_clone.all_commits_rev t repo_url
+  in
+  match Hashtbl.find_opt opam_commits commit with
+  | Some _ -> ()
+  | None ->
+    Eio.Mutex.use_rw mutex ~protect:true (fun () ->
+      try get_repo t repo_url
+      with _ -> (
+        (* the pull can fail (e.g. conflicting commits), so remove the clone and start again *)
+        Git_clone.remove t repo_url;
+        get_repo t repo_url))
+    |> List.iteri (fun rank commit -> Hashtbl.replace opam_commits commit rank)
+
+let changed_packages t ~new_opam_repo ~old_opam_repo =
+  if new_opam_repo = old_opam_repo then
+    Some []
+  else
+    let opam_commits = Lazy.force opam_commits in
+    try
+      (* new_opam_repo and old_opam_repo need to be sorted by URL *)
+      List.combine new_opam_repo old_opam_repo
+      |> List.map (fun ((repo_url,new_commit), (_,old_commit)) ->
+        let key = ("diff"^new_commit^"-"^old_commit) in
+        match Cache.get cache ~key with
+        | Some pkgs -> Yojson.Safe.from_string pkgs |> yojson_to_list |> Result.get_ok
+        | None -> (
+          update_commit t repo_url new_commit;
+          update_commit t repo_url old_commit;
+          match Hashtbl.find_opt opam_commits new_commit, Hashtbl.find_opt opam_commits old_commit with
+          | Some new_rank, Some old_rank when new_rank < old_rank ->
+            (* The new commit is expected to be newer in the commit history; if it
+               is older, this is probably a request pinned to specific opam-repository
+               commits, so invalidate the cache entry. *)
+            raise Invalidated
+          | Some _, Some _ ->
+            let pkgs_filename = Git_clone.diff_pkgs t ~repo_url ~new_commit ~old_commit in
+            Cache.set cache ~key ~value:(Yojson.Safe.to_string (yojson_of_list pkgs_filename));
+            pkgs_filename
+          | None, _ ->
+            Fmt.epr "The repo %s does not have commit %s@." repo_url new_commit; raise Invalidated
+          | _, None ->
+            Fmt.epr "The repo %s does not have commit %s@." repo_url old_commit; raise Invalidated))
+      |> List.flatten
+      |> Option.some
+    with Invalidated -> None
+
+let get_names = OpamFormula.fold_left (fun a (name, _) -> name :: a) []
+
+let deps_of_opam_file opam_pkgs =
+  opam_pkgs
+  |> List.map (fun (_, content) ->
+    OpamFile.OPAM.read_from_string content |> OpamFile.OPAM.depends |> get_names)
+
+let is_invalidated t ~request ~solve_cache =
+  let {
+    Worker.Solve_request.opam_repository_commits;
+    root_pkgs;
+    pinned_pkgs; _ } = request
+  in
+  let request_pkgs () =
+    List.concat_map (fun pkgs_name -> deps_of_opam_file pkgs_name) [root_pkgs; pinned_pkgs]
+    |> List.flatten
+    |> OpamPackage.Name.Set.of_list
+  in
+  let response_pkgs () =
+    solve_cache.Solve_cache.solve_response
+    |> Result.get_ok
+    |> List.map (fun selection -> selection.Worker.Selection.packages)
+    |> List.concat
+    |> List.map (fun pkg_version ->
+      pkg_version
+      |> Astring.String.cut ~sep:"."
+      |> Option.get
+      |> fun (name,version) ->
+      OpamPackage.create (OpamPackage.Name.of_string name) (OpamPackage.Version.of_string version))
+    |> OpamPackage.Set.of_list
+  in
+  let old_opam_repo =
+    solve_cache.Solve_cache.last_opam_repository_commits
+    |> List.sort (fun (url1,_) (url2,_) -> String.compare url1 url2)
+  in
+  let new_opam_repo =
+    opam_repository_commits
+    |> List.sort (fun (url1,_) (url2,_) -> String.compare url1 url2)
+  in
+  match changed_packages t ~old_opam_repo ~new_opam_repo with
+  | None -> true (* Invalidate when a commit does not exist *)
+  | Some pkgs ->
+    pkgs
+    |> List.find_opt (fun pkg ->
+      let request_pkgs = request_pkgs () in
+      let response_pkgs = response_pkgs () in
+      OpamFilename.raw pkg
+      |> OpamPackage.of_filename
+      |> Option.get
+      |> fun opam_pkg ->
+      OpamPackage.Name.Set.mem (OpamPackage.name opam_pkg) request_pkgs || OpamPackage.Set.mem opam_pkg response_pkgs)
+    |> Option.is_some
+
+(**
+   The cache is consulted in two stages:
+   - with the opam repository URLs and their commits: [(url, commit) list];
+   - with the opam repository URLs only: [(url, _) list].
+
+   When an entry is only found via the URL-only key, we check whether it has to
+   be invalidated, because the opam repository commits may have moved on since
+   the cached solve.
+
+   Invalidation looks at whether any package mentioned in the request is touched
+   between the two commits: the commit recorded with the cached response and the
+   commit of the new request. The cached response also contains the transitive
+   dependencies, so we make sure none of those are touched either.
+
+   When the new solve gives the same solution as the cached one, the oldest
+   commit used for the previous solve is kept.
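+
+   As an illustration only (assuming a request [r]), the two lookup keys are
+   roughly:
+   {[
+     let full_key = digest_request r   (* URLs and exact commits *)
+     let url_key  =                    (* URLs only, commits blanked *)
+       digest_request
+         { r with opam_repository_commits = remove_commits r.opam_repository_commits }
+   ]}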
+*)
+let solve t ~solve log (request: Worker.Solve_request.t) =
+  let request =
+    { request with opam_repository_commits = sort_by_url request.opam_repository_commits }
+  in
+  let solve () = solve ~log request in
+  match get_solve ~cache ~digest:(digest_request request) with
+  | Some solve_cache when Result.is_ok solve_cache.solve_response -> (
+    Log_data.info log "From cache@.";
+    solve_cache.solve_response)
+  | _ -> (
+    let req =
+      { request with opam_repository_commits = remove_commits request.opam_repository_commits }
+    in
+    match get_solve ~cache ~digest:(digest_request req) with
+    | Some solve_cache when not (Result.is_error solve_cache.solve_response || is_invalidated t ~request ~solve_cache) ->
+      Log_data.info log "From cache@.";
+      let solve_cache =
+        { solve_cache with last_opam_repository_commits = request.opam_repository_commits }
+      in
+      set_solve ~cache ~solve_cache ~digest:(digest_request req);
+      set_solve ~cache ~solve_cache ~digest:(digest_request request);
+      solve_cache.solve_response
+    | Some solve_cache when Result.is_ok solve_cache.solve_response ->
+      let solve_response = solve () in
+      let solve_cache =
+        if is_same_solution ~solve_response_cache:solve_cache.solve_response ~solve_response then
+          { solve_cache with last_opam_repository_commits = request.opam_repository_commits }
+        else
+          {request; solve_response; last_opam_repository_commits = request.opam_repository_commits }
+      in
+      set_solve ~cache ~solve_cache ~digest:(digest_request req);
+      set_solve ~cache ~solve_cache ~digest:(digest_request request);
+      solve_cache.solve_response
+    | _ ->
+      let solve_response = solve () in
+      let solve_cache =
+        { Solve_cache.request; solve_response; last_opam_repository_commits = request.opam_repository_commits }
+      in
+      set_solve ~cache ~digest:(digest_request req) ~solve_cache;
+      set_solve ~cache ~digest:(digest_request request) ~solve_cache;
+      solve_response
+    )
diff --git a/service/solve_cache.mli b/service/solve_cache.mli
new file mode 100644
index 0000000..1794d0d
--- /dev/null
+++ b/service/solve_cache.mli
@@ -0,0 +1,18 @@
+open Eio.Std
+module Worker = Solver_service_api.Worker
+
+(** A cache of solver responses, stored in a Sqlite3 database. *)
+
+type t
+
+val create : cache_dir: string -> proc_mgr: [`Generic] Eio.Process.mgr_ty r -> t
+
+val solve :
+  t ->
+  solve:(log:Solver_service_api.Solver.Log.t -> Worker.Solve_request.t -> Worker.Solve_response.t) ->
+  Solver_service_api.Solver.Log.t ->
+  Worker.Solve_request.t ->
+  Worker.Solve_response.t
+(** [solve t ~solve log request] first tries the cache; on a miss it uses [t] and [solve] to solve [request].
+
+    @param log Diagnostics about failed solves (and other logging) goes here. *)
diff --git a/service/solver.ml b/service/solver.ml
index 160d9da..db05ff5 100644
--- a/service/solver.ml
+++ b/service/solver.ml
@@ -8,6 +8,8 @@ let (let*!)
= Result.bind type t = { pool : (Domain_worker.request, Domain_worker.reply) Pool.t; stores : Stores.t; + cache_dir : string; + process_mgr : [`Generic] Eio.Process.mgr_ty r; } let ocaml = OpamPackage.Name.of_string "ocaml" @@ -30,7 +32,7 @@ let env vars v = if List.mem v OpamPackageVar.predefined_depends_variables then None else Domain_worker.env vars (OpamVariable.Full.to_string v) -let solve_for_platform ?cancelled t ~log ~opam_repository_commits ~packages ~root_pkgs ~pinned_pkgs ~pins ~vars id = +let solve_for_platform ?cancelled t ~cacheable ~log ~opam_repository_commits ~packages ~root_pkgs ~pinned_pkgs ~pins ~vars id = let ocaml_version = OpamPackage.Version.of_string vars.Worker.Vars.ocaml_version in let root_pkgs = root_pkgs @@ -55,18 +57,25 @@ let solve_for_platform ?cancelled t ~log ~opam_repository_commits ~packages ~roo Error (`No_solution e) | Ok packages -> Log.info log "%s: found solution in %.2f s" id time; - let repo_packages = - packages - |> List.filter_map (fun (pkg : OpamPackage.t) -> - if OpamPackage.Name.Set.mem pkg.name pins then None - else Some pkg) - in - (* Hack: ocaml-ci sometimes also installs odoc, but doesn't tell us about it. + let commits = + if cacheable then + (* The cache system handle a sort of oldest_commit*) + opam_repository_commits + else + let repo_packages = + packages + |> List.filter_map (fun (pkg : OpamPackage.t) -> + if OpamPackage.Name.Set.mem pkg.name pins then None + else Some pkg) + in + (* Hack: ocaml-ci sometimes also installs odoc, but doesn't tell us about it. Make sure we have at least odoc 2.1.1 available, otherwise it won't work on OCaml 5.0. *) - let repo_packages = - OpamPackage.of_string "odoc.2.1.1" :: repo_packages + let repo_packages = + OpamPackage.of_string "odoc.2.1.1" :: repo_packages + in + let commits = Stores.oldest_commits_with t.stores repo_packages ~from:opam_repository_commits in + commits in - let commits = Stores.oldest_commits_with t.stores repo_packages ~from:opam_repository_commits in let compat_pkgs = let to_string (name, (version,_)) = OpamPackage.to_string (OpamPackage.create name version) in List.map to_string root_pkgs @@ -96,7 +105,7 @@ let rec parse_opams = function Ok (x :: xs) (* Handle a request by distributing it among the worker processes and then aggregating their responses. 
*) -let solve ?cancelled t ~log request = +let solve ?cancelled ~cacheable t ~log request = let { Worker.Solve_request.opam_repository_commits; platforms; @@ -124,6 +133,7 @@ let solve ?cancelled t ~log request = let result = solve_for_platform t id ?cancelled + ~cacheable ~log ~opam_repository_commits ~packages @@ -161,10 +171,20 @@ let solve ?cancelled t ~log request = | [] -> Ok results | errors -> Fmt.error_msg "@[%a@]" Fmt.(list ~sep:cut string) errors +let solve ?cacheable ?cancelled t ~log request = + match cacheable with + | Some true -> + let solve = solve t ?cancelled ~cacheable:true in + let cache = Solve_cache.create ~cache_dir:t.cache_dir ~proc_mgr:t.process_mgr in + Solve_cache.solve cache ~solve log request + | _ -> solve ?cancelled ~cacheable:false t ~log request + let create ~sw ~domain_mgr ~process_mgr ~cache_dir ~n_workers = let stores = Stores.create ~process_mgr ~cache_dir in let pool = Pool.create ~sw ~domain_mgr ~n_workers Domain_worker.solve in { stores; pool; + cache_dir; + process_mgr = (process_mgr :> [`Generic] Eio.Process.mgr_ty r); } diff --git a/service/solver.mli b/service/solver.mli index 7e3c19a..973356f 100644 --- a/service/solver.mli +++ b/service/solver.mli @@ -17,6 +17,7 @@ val create : @param n_workers Maximum number of worker domains. *) val solve : + ?cacheable:bool -> ?cancelled:unit Eio.Promise.t -> t -> log:Solver_service_api.Solver.Log.t -> diff --git a/service/stores.ml b/service/stores.ml index f546db5..6ace9b2 100644 --- a/service/stores.ml +++ b/service/stores.ml @@ -16,110 +16,31 @@ type t = { mutable index_cache : (commit list * Packages.t Promise.or_exn) option; } -let git_command ?cwd args = - "git" :: - match cwd with - | Some dir -> "-C" :: dir :: args - | None -> args - -let run_git ?cwd t args = - Eio.Process.run t.process_mgr (git_command ?cwd args) - -let line_opt r = - if Eio.Buf_read.at_end_of_input r then None - else Some (Eio.Buf_read.line r) - -let run_git_line ?cwd t args = - Eio.Process.parse_out t.process_mgr line_opt (git_command ?cwd args) - module Git_clone = struct - let replace_special = - String.map @@ function - | 'A'..'Z' - | 'a'..'z' - | '0'..'9' - | '-' as c -> c - | _ -> '_' - - let rec mkdir_p path = - try Unix.mkdir path 0o700 with - | Unix.Unix_error (EEXIST, _, _) -> () - | Unix.Unix_error (ENOENT, _, _) -> - let parent = Filename.dirname path in - mkdir_p parent; - Unix.mkdir path 0o700 - - let repo_url_to_clone_path t repo_url = - let uri = Uri.of_string repo_url in - let sane_host = - match Uri.host uri with - | Some host -> replace_special host - | None -> "no_host" - in - let sane_path = - Uri.( - path uri - |> pct_decode - |> Filename.chop_extension - |> replace_special) - in - Fpath.(v t.cache_dir / sane_host / sane_path) - - let clone t repo_url = - let clone_path = repo_url_to_clone_path t repo_url in - let clone_parent = Fpath.parent clone_path |> Fpath.to_string in - let clone_path_str = Fpath.to_string clone_path in - match Unix.lstat clone_path_str with - | Unix.{ st_kind = S_DIR; _ } -> () - | _ -> Fmt.failwith "%S is not a directory!" 
clone_path_str - | exception Unix.Unix_error (Unix.ENOENT, _, _) -> - mkdir_p clone_parent; - try - run_git t ["clone"; "--bare"; repo_url; clone_path_str] - with Eio.Io _ as ex -> - let bt = Printexc.get_raw_backtrace () in - Eio.Exn.reraise_with_context ex bt "cloning %S" repo_url - - let open_store t repo_url = - let path = repo_url_to_clone_path t repo_url in - match Lwt_eio.run_lwt (fun () -> Git_unix.Store.v ~dotgit:path path) with - | Ok x -> x - | Error e -> - Fmt.failwith "Failed to open %a: %a" Fpath.pp path Store.pp_error e - - let oldest_commit_with t ~repo_url ~from paths = - let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in - run_git_line t ~cwd:clone_path - @@ "log" - :: "-n" :: "1" - :: "--format=format:%H" - :: from - :: "--" - :: paths - - let oldest_commits_with t ~from pkgs = - let paths = - pkgs - |> List.map (fun pkg -> - let name = OpamPackage.name_to_string pkg in - let version = OpamPackage.version_to_string pkg in - Printf.sprintf "packages/%s/%s.%s" name name version) - in - from - |> Fiber.List.filter_map (fun (repo_url, hash) -> - oldest_commit_with t ~repo_url ~from:hash paths - |> Option.map (fun commit -> (repo_url, commit)) - ) - let fetch t repo_url = - try - let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in - run_git t ~cwd:clone_path ["fetch"; "origin"] - with Eio.Io _ as ex -> - let bt = Printexc.get_raw_backtrace () in - Eio.Exn.reraise_with_context ex bt "fetching %S" repo_url + include ( Git_clone.Make ( struct + type cache = t + let dir t = t.cache_dir + let process_mgr t = t.process_mgr + end)) + + let clone = clone_bare end +let oldest_commits_with t ~from pkgs = + let paths = + pkgs + |> List.map (fun pkg -> + let name = OpamPackage.name_to_string pkg in + let version = OpamPackage.version_to_string pkg in + Printf.sprintf "packages/%s/%s.%s" name name version) + in + from + |> Fiber.List.filter_map (fun (repo_url, hash) -> + Git_clone.oldest_commit_with t ~repo_url ~from:hash paths + |> Option.map (fun commit -> (repo_url, commit)) + ) + let oldest_commit = Eio.Semaphore.make 180 (* we are using at most 360 pipes at the same time and that's enough to keep the current * performance and prevent some jobs to fail because of file descriptors exceed the limit.*) @@ -191,7 +112,7 @@ let create ~process_mgr ~cache_dir = let oldest_commits_with t ~from repo_packages = Eio.Semaphore.acquire oldest_commit; Fun.protect ~finally:(fun () -> Eio.Semaphore.release oldest_commit) @@ fun () -> - Git_clone.oldest_commits_with t repo_packages ~from + oldest_commits_with t repo_packages ~from (* We could do this in parallel, except that there might be duplicate repos in the list. 
*) let rec fetch_commits t = function diff --git a/solver-service.opam b/solver-service.opam index a0feb66..a5cc92f 100644 --- a/solver-service.opam +++ b/solver-service.opam @@ -9,6 +9,7 @@ homepage: "https://github.com/ocurrent/solver-service" bug-reports: "https://github.com/ocurrent/solver-service/issues" depends: [ "dune" {>= "3.7"} + "opam-file-format" {>= "2.1.6"} "ocaml" {>= "5.0.0"} "current_web" {>= "0.6.4" & with-test} "current_git" {>= "0.6.4" & with-test} @@ -23,6 +24,7 @@ depends: [ "lwt_eio" {>= "0.5"} "logs" {>= "0.7.0"} "fmt" {>= "0.9.0"} + "sqlite3" "ocaml-version" {>= "3.6.1"} "solver-service-api" {= version} "dune-build-info" {>= "3.8.0"} diff --git a/test/dune b/test/dune index f6453d5..658aa25 100644 --- a/test/dune +++ b/test/dune @@ -1,4 +1,4 @@ (tests (names test) (package solver-service) - (libraries solver-service eio_main lwt_eio)) + (libraries solver-service scache eio_main lwt_eio)) diff --git a/test/test.expected b/test/test.expected index d0e8843..dec48b2 100644 --- a/test/test.expected +++ b/test/test.expected @@ -246,3 +246,114 @@ results: packages: [foo.dev; ocaml-base-compiler.5.0] commits: [(opam-repo.git, 1bc28b8e8d98db6e524822c6f28bddebbc3504a3)] lower_bound: false] + +# Solve_cache + +## Select foo.1.0 ## + +commits: [(opam-repo.git, + [ocaml-base-compiler.5.0; bar.1.0; baz.1.0; foo.1.0; foobar.0.1])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +Need to update opam-repo.git to get new commit 4733cec979c5946f667f86e0df0ac20741277d68 +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; bar.1.0; baz.1.0; foo.1.0; ocaml-base-compiler.5.0] + commits: [(opam-repo.git, 4733cec979c5946f667f86e0df0ac20741277d68)] + lower_bound: false] + +## Foo 1.1 now available (A direct dependency, the result will contain the new commit) ## + +commits: [(opam-repo.git, + [foo.1.1; ocaml-base-compiler.5.0; bar.1.0; baz.1.0; foo.1.0; + foobar.0.1])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +Already up to date. 
+Need to update opam-repo.git to get new commit e82813b4ed60505d8511d132e77ca062776b0b10 +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; bar.1.0; baz.1.0; foo.1.1; ocaml-base-compiler.5.0] + commits: [(opam-repo.git, e82813b4ed60505d8511d132e77ca062776b0b10)] + lower_bound: false] + +## Foo 1.1 again (hit the cache, the commit won't change) ## + +commits: [(opam-repo.git, + [foo.1.1; foo.1.1; ocaml-base-compiler.5.0; bar.1.0; baz.1.0; + foo.1.0; foobar.0.1])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +Updating e82813b..07bc05a +Fast-forward +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; bar.1.0; baz.1.0; foo.1.1; ocaml-base-compiler.5.0] + commits: [(opam-repo.git, e82813b4ed60505d8511d132e77ca062776b0b10)] + lower_bound: false] + +## Baz 1.0 is a transitive dep of Foo, the cache will be invalidated(the result will contain the new commit) ## + +commits: [(opam-repo.git, + [baz.1.0; foo.1.1; foo.1.1; ocaml-base-compiler.5.0; bar.1.0; + foo.1.0; foobar.0.1])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +Updating 07bc05a..86fca69 +Fast-forward + packages/baz/baz.1.0/opam | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) +Need to update opam-repo.git to get new commit 86fca698addda8dd554cffc2ea09bf616c351332 +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; bar.1.0; baz.1.0; foo.1.1; foobar.0.1; + ocaml-base-compiler.5.0] + commits: [(opam-repo.git, 86fca698addda8dd554cffc2ea09bf616c351332)] + lower_bound: false] + +## Oof 1.0 now available (hit the cache, the commit won't change in the result) ## + +commits: [(opam-repo.git, + [oof.1.0; baz.1.0; foo.1.1; foo.1.1; ocaml-base-compiler.5.0; + bar.1.0; foo.1.0; foobar.0.1])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +Updating 86fca69..0eb9328 +Fast-forward + packages/oof/oof.1.0/opam | 4 ++++ + 1 file changed, 4 insertions(+) + create mode 100644 packages/oof/oof.1.0/opam +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; bar.1.0; baz.1.0; foo.1.1; foobar.0.1; + ocaml-base-compiler.5.0] + commits: [(opam-repo.git, 86fca698addda8dd554cffc2ea09bf616c351332)] + lower_bound: false] + +## Oof 1.1 now available (will invalidate the cache because foo 1.1 will be removed, the result will contain the new commit) ## + +commits: [(opam-repo.git, + [oof.1.1; ocaml-base-compiler.5.0; bar.1.0; baz.1.0; foo.1.0; + foobar.0.1])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +Updating 0eb9328..44d1c6f +Fast-forward + packages/baz/baz.1.0/opam | 2 +- + packages/foo/foo.1.1/opam | 4 ---- + packages/oof/{oof.1.0 => oof.1.1}/opam | 0 + 3 files changed, 1 insertion(+), 5 deletions(-) + delete mode 100644 packages/foo/foo.1.1/opam + rename packages/oof/{oof.1.0 => oof.1.1}/opam (100%) +Need to update opam-repo.git to get new commit 44d1c6f9bf942a6ea6719e70e0f1340cfb6b3c9e +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; bar.1.0; baz.1.0; foo.1.0; ocaml-base-compiler.5.0] + commits: [(opam-repo.git, 44d1c6f9bf942a6ea6719e70e0f1340cfb6b3c9e)] + lower_bound: false] diff --git a/test/test.ml b/test/test.ml index cb37273..0498b56 100644 --- a/test/test.ml +++ b/test/test.ml @@ -231,6 +231,59 @@ let test_available t = "mac", { debian_12_ocaml_5 with os = "macos" }; ] +let test_solve_cache t = + let solve = solve_cache in + let opam_repo = Opam_repo.create "opam-repo.git" in + let root_pkgs = ["app.dev", {| depends: [ "foo" ] |}] in + let depends = {| depends: [ 
"ocaml-base-compiler" "bar" ] |} in + let opam_packages = [ + "ocaml-base-compiler.5.0", ""; + "bar.1.0", {| depends: [ "baz" ] |}; + "baz.1.0", ""; + "foo.1.0",depends; + "foobar.0.1", ""; + ] + in + let first_opam_packages = opam_packages in + let recent_commits = + solve t "Select foo.1.0" ~platforms ~root_pkgs ~previous_commits:[opam_repo,[]] + ~commits:[opam_repo, opam_packages] + in + let opam_packages = ("foo.1.1",depends) :: opam_packages in + let recent_commits = + solve t "Foo 1.1 now available (A direct dependency, the result will contain the new commit)" ~previous_commits:recent_commits ~platforms ~root_pkgs ~commits:[ + opam_repo, opam_packages + ] + in + let opam_packages = ("foo.1.1",depends)::opam_packages in + let recent_commits = + solve t "Foo 1.1 again (hit the cache, the commit won't change)" ~previous_commits:recent_commits ~platforms ~root_pkgs ~commits:[ + opam_repo, opam_packages + ] + in + let opam_packages = + ("baz.1.0",{|depends: [ "foobar" ]|}) + ::(List.remove_assoc "baz.1.0" opam_packages) + in + let recent_commits = + solve t "Baz 1.0 is a transitive dep of Foo, the cache will be invalidated(the result will contain the new commit)" ~previous_commits:recent_commits ~platforms ~root_pkgs ~commits:[ + opam_repo, opam_packages + ] + in + + let opam_packages = ("oof.1.0","") :: opam_packages in + let recent_commits = + solve t "Oof 1.0 now available (hit the cache, the commit won't change in the result)" ~previous_commits:recent_commits ~platforms ~root_pkgs ~commits:[ + opam_repo, opam_packages + ] + in + solve t + "Oof 1.1 now available (will invalidate the cache because foo 1.1 will be removed, the result will contain the new commit)" + ~previous_commits:recent_commits ~platforms ~root_pkgs ~commits:[ + opam_repo, ("oof.1.1","") :: first_opam_packages + ] |> ignore; + () + let () = Eio_main.run @@ fun env -> let domain_mgr = env#domain_mgr in @@ -250,6 +303,7 @@ let () = "Pinned", test_pinned; "Cancel", test_cancel; "Available", test_available; + "Solve_cache", test_solve_cache; ] |> List.iter (fun (name, fn) -> Fmt.pr "@.# %s@." name; diff --git a/test/utils.ml b/test/utils.ml index 557879b..a206883 100644 --- a/test/utils.ml +++ b/test/utils.ml @@ -6,13 +6,18 @@ let add_opam_header s = synopsis: "Test package" |} ^ s +module S = Git_unix.Store + module Opam_repo : sig - type t + type t = { + name : string; + store : S.t; + } val create : string -> t (** [create path] opens a Git repository at [path], creating it if needed. *) - val commit : t -> (string * string) list -> string * string + val commit : ?parents:S.Value.Commit.hash list -> t -> (string * string) list -> string * string (** [commit t files] creates a commit with the given opam package files and returns the [(repo_url, hash]) pair for it. *) @@ -20,7 +25,6 @@ module Opam_repo : sig end = struct (* A fake opam-repository upstream for testing. 
*) - module S = Git_unix.Store type t = { name : string; @@ -85,7 +89,7 @@ end = struct in write_tree t (List.map write_version versions) - let commit t pkgs = + let commit ?parents t pkgs = let pkgs = group_packages pkgs |> OpamPackage.Name.Map.to_seq @@ -95,10 +99,11 @@ end = struct ) in let tree = write_tree t ["packages", `Dir (write_tree t pkgs)] in - let commit = S.Value.Commit.make ~tree ~author:test_user ~committer:test_user (Some "Commit") in + let commit = S.Value.Commit.make ?parents ~tree ~author:test_user ~committer:test_user (Some "Commit") in let commit = write t (S.Value.commit commit) in set_branch t Git.Reference.master commit; (t.name, S.Hash.to_hex commit) + end let stderr_log = @@ -153,3 +158,39 @@ let solve ?cancelled ?(pinned_pkgs=[]) t label ~commits ~root_pkgs ~platforms = in let response = Solver_service.Solver.solve ?cancelled t ~log:stderr_log req in Fmt.pr "@[results:@,%a@]@." pp_response response + +let solve_cache ?cancelled ?(pinned_pkgs=[]) + t label ~previous_commits ~commits ~root_pkgs ~platforms = + Fmt.pr "@.## %s ##@.@.commits: %a@.root_pkgs: %a@.platforms: %a@." + label + pp_commits commits + pp_packages root_pkgs + pp_platforms platforms; + if pinned_pkgs <> [] then Fmt.pr "pinned: %a@." pp_packages pinned_pkgs; + let root_pkgs = List.map (fun (pkg, opam) -> pkg, add_opam_header opam) root_pkgs in + let pinned_pkgs = List.map (fun (pkg, opam) -> pkg, add_opam_header opam) pinned_pkgs in + let opam_repository_commits = + commits + |> List.map (fun (repo, packages) -> + let (_, parents) = + List.find (fun (repo_prev,_) -> String.equal repo_prev.Opam_repo.name repo.Opam_repo.name) previous_commits + in + let parents = List.map S.Hash.of_hex parents in + Opam_repo.commit ~parents repo packages) + in + let req = { Solver_service_api.Worker.Solve_request. + opam_repository_commits; + root_pkgs; + pinned_pkgs; + platforms; + } + in + let response = Solver_service.Solver.solve ~cacheable:true ?cancelled t ~log:stderr_log req in + Fmt.pr "@[results:@,%a@]@." pp_response response; + let recent_commits = + List.map (fun (repo_prev, repo_previous_commits) -> + let (_, commit) = + List.find (fun (repo,_) -> String.equal repo_prev.Opam_repo.name repo) opam_repository_commits + in repo_prev, commit::repo_previous_commits) previous_commits + in + recent_commits diff --git a/worker/custom.ml b/worker/custom.ml index 3b3cbb8..c93399b 100644 --- a/worker/custom.ml +++ b/worker/custom.ml @@ -28,14 +28,14 @@ let cluster_worker_log log = Capnp_rpc_lwt.Service.return_empty () end -let solve ~cancelled ~solver ~log c = +let solve ~cacheable ~cancelled ~solver ~log c = let selections = let*! request = solve_of_custom c in let log = cluster_worker_log log in Lwt_eio.run_lwt @@ fun () -> Capnp_rpc_lwt.Capability.with_ref log @@ fun log -> Lwt_eio.run_eio @@ fun () -> - Solver_service.Solver.solve ~cancelled solver ~log request + Solver_service.Solver.solve ~cacheable ~cancelled solver ~log request in begin match selections with | Ok sels -> diff --git a/worker/custom.mli b/worker/custom.mli index 3082d65..9a5abc6 100644 --- a/worker/custom.mli +++ b/worker/custom.mli @@ -2,6 +2,7 @@ custom job specification. 
*) val solve : + cacheable:bool -> cancelled:unit Eio.Promise.t -> solver:Solver_service.Solver.t -> log:Log_data.t -> diff --git a/worker/solver_worker.ml b/worker/solver_worker.ml index bd594f9..d3b0403 100644 --- a/worker/solver_worker.ml +++ b/worker/solver_worker.ml @@ -34,6 +34,7 @@ type t = { name : string; registration_service : Cluster_api.Raw.Client.Registration.t Sturdy_ref.t; capacity : int; + cacheable : bool; mutable in_use : int; (* Number of active builds *) cond : unit Lwt_condition.t; (* Fires when a build finishes (or switch turned off) *) @@ -50,7 +51,7 @@ let metrics = function | `Host -> failwith "No host metrics from solver service" -let build ~cancelled ~log t descr = +let build ~cacheable ~cancelled ~log t descr = let module R = Cluster_api.Raw.Reader.JobDescr in match Cluster_api.Submission.get_action descr with | Custom_build c -> @@ -58,7 +59,7 @@ let build ~cancelled ~log t descr = f "Got request to build a job of kind \"%s\"" (Cluster_api.Custom.kind c)); (* Oddly, the protocol has us report cancellation and errors as "successful" jobs with the error inside! *) - let output = Custom.solve ~cancelled ~solver:t.solver ~log c in + let output = Custom.solve ~cacheable ~cancelled ~solver:t.solver ~log c in Log_data.write log "Job succeeded\n"; (Ok output, "ok") | Obuilder_build _ | Docker_build _ -> @@ -95,7 +96,7 @@ let loop t queue = Lwt_eio.run_eio @@ fun () -> Log_data.info log "Building on %s" t.name; let t0 = Unix.gettimeofday () in - match build ~cancelled ~log t request with + match build ~cacheable:t.cacheable ~cancelled ~log t request with | (outcome, metric_label) -> let t1 = Unix.gettimeofday () in Prometheus.Summary.observe @@ -127,7 +128,7 @@ let loop t queue = let self_update () = failwith "TODO: Self-update" -let run ~name ~capacity solver registration_service = +let run ~cacheable ~name ~capacity solver registration_service = Lwt_eio.run_lwt @@ fun () -> let t = { solver; @@ -136,6 +137,7 @@ let run ~name ~capacity solver registration_service = cond = Lwt_condition.create (); capacity; in_use = 0; + cacheable; } in let rec reconnect () = let connect_time = Unix.gettimeofday () in diff --git a/worker/solver_worker.mli b/worker/solver_worker.mli index 564ff13..3bf159c 100644 --- a/worker/solver_worker.mli +++ b/worker/solver_worker.mli @@ -1,6 +1,7 @@ open Capnp_rpc_lwt val run : + cacheable: bool -> name:string -> capacity:int -> Solver_service.Solver.t ->