diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..3fd4a62 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "scache"] + path = scache + url = https://github.com/moyodiallo/scache.git + branch = master diff --git a/bin/main.ml b/bin/main.ml index 2d19373..9d759cc 100644 --- a/bin/main.ml +++ b/bin/main.ml @@ -75,24 +75,25 @@ let start_server ~service vat_config = let+ vat = Capnp_rpc_unix.serve vat_config ~restore in Capnp_rpc_unix.Vat.sturdy_uri vat service_id -let main_service () solver cap_file vat_config = - let uri = start_server ~service:(Solver_service.Service.v solver) vat_config in +let main_service () solver cacheable cap_file vat_config = + let uri = start_server ~service:(Solver_service.Service.v ~cacheable solver) vat_config in Capnp_rpc_unix.Cap_file.save_uri uri cap_file |> or_die; Fmt.pr "Wrote solver service's address to %S@." cap_file; Fiber.await_cancel () -let main_service_pipe () solver = +let main_service_pipe () solver cacheable = let socket = Lwt_unix.stdin in (* Run locally reading from socket *) - export (Solver_service.Service.v solver) ~on:socket + export (Solver_service.Service.v ~cacheable solver) ~on:socket -let main_cluster () solver name capacity register_addr = +let main_cluster () solver cacheable name capacity register_addr = let vat = Capnp_rpc_unix.client_only_vat () in let sr = Capnp_rpc_unix.Vat.import_exn vat register_addr in let `Cancelled = Solver_worker.run solver sr ~name ~capacity + ~cacheable in () @@ -128,6 +129,11 @@ let capacity = @@ Arg.info ~doc:"The number of cluster jobs that can run in parallel" ~docv:"N" [ "capacity" ] +let cacheable = + Arg.value + @@ Arg.flag + @@ Arg.info ~doc:"Activate the cache system" [ "activate-cache"; "cache" ] + let cap_file = Arg.required @@ Arg.opt Arg.(some string) None @@ -168,6 +174,7 @@ let () = const main_service $ setup_log $ solver + $ cacheable $ cap_file $ Capnp_rpc_unix.Vat_config.cmd ) @@ -179,6 +186,7 @@ let () = const main_service_pipe $ setup_log $ solver + $ cacheable ) in let run_agent = @@ -188,6 +196,7 @@ let () = const main_cluster $ setup_log $ solver + $ cacheable $ worker_name $ capacity $ register_addr diff --git a/scache b/scache new file mode 160000 index 0000000..265c821 --- /dev/null +++ b/scache @@ -0,0 +1 @@ +Subproject commit 265c8217f251edd59ed0a8074adcb4c42c5e57cf diff --git a/service/dune b/service/dune index bd4a7e8..2b0f0d4 100644 --- a/service/dune +++ b/service/dune @@ -7,5 +7,8 @@ solver-service-api opam-0install capnp-rpc-net + opam-file-format git-unix - ocaml-version)) + scache + ocaml-version) + (preprocess (pps ppx_deriving_yojson))) diff --git a/service/service.ml b/service/service.ml index 2732d8c..85393a7 100644 --- a/service/service.ml +++ b/service/service.ml @@ -1,6 +1,6 @@ module Worker = Solver_service_api.Worker -let v t = +let v ?cacheable t = let open Capnp_rpc_lwt in let module X = Solver_service_api.Raw.Service.Solver in X.local @@ -27,7 +27,7 @@ let v t = (Capnp_rpc.Error.exn "Bad JSON in request: %s" msg)) | Ok request -> Lwt_eio.run_eio @@ fun () -> - let selections = Solver.solve t ~log request in + let selections = Solver.solve ?cacheable t ~log request in let json = Yojson.Safe.to_string (Worker.Solve_response.to_yojson selections) diff --git a/service/service.mli b/service/service.mli index bb2d4c7..7cc6f87 100644 --- a/service/service.mli +++ b/service/service.mli @@ -1,2 +1,2 @@ -val v : Solver.t -> Solver_service_api.Solver.t +val v : ?cacheable:bool -> Solver.t -> Solver_service_api.Solver.t (** [capnp_service t] is a Cap'n Proto service that handles requests using [t]. *) diff --git a/service/solve_cache.ml b/service/solve_cache.ml new file mode 100644 index 0000000..48a64e9 --- /dev/null +++ b/service/solve_cache.ml @@ -0,0 +1,364 @@ +open Eio.Std + +module Worker = Solver_service_api.Worker +module Log_data = Solver_service_api.Solver.Log +module Cache = Scache.Cache +module Set = Set.Make(Worker.Selection) + +type t = { + cache_dir : string; + process_mgr : [`Generic] Eio.Process.mgr_ty r; +} + +module Solve_cache = struct + + type t = { + request: Worker.Solve_request.t; + (** The request *) + solve_response : Worker.Solve_response.t; + (** Response of the solved request *) + last_opam_repository_commits: (string * string) list; + (** Pair of repo URL and commit hash, for each last opam-repository used during the request *) + }[@@deriving yojson] + + let marshal t = t |> to_yojson |> Yojson.Safe.to_string + let unmarshal t = t |> Yojson.Safe.from_string |> of_yojson |> Result.get_ok + +end + +(* module Log = struct *) +(* (* let src = Logs.Src.create "solver-worker" ~doc:"solver worker agent" *) *) +(* let src = Logs.Src.create "solver-scache" ~doc:"solver cache system" *) +(* include (val Logs.src_log src : Logs.LOG) *) +(* end *) + +let mutex = Lazy.from_fun (fun () -> Eio.Mutex.create ()) + +let git_command ?cwd args = + "git" :: + match cwd with + | Some dir -> "-C" :: dir :: args + | None -> args + +let find_command ?cwd args = + "find" :: + match cwd with + | Some dir -> dir :: args + | None -> args + +let grep_command ?cwd args = let _ = cwd in "grep" :: args + +let run_git ?cwd t args = + Eio.Process.run t.process_mgr (git_command ?cwd args) + +let take_all_opt r = + if Eio.Buf_read.at_end_of_input r then None + else Some (Eio.Buf_read.take_all r) + +let run_take_all ?cwd ?stdin t args command = + Eio.Process.parse_out ?stdin t.process_mgr take_all_opt (command ?cwd args) + +let lines_opt r = + if Eio.Buf_read.at_end_of_input r then None + else (Eio.Buf_read.map List.of_seq Eio.Buf_read.lines) r |> Option.some + +let run_lines ?cwd ?stdin t args command = + Eio.Process.parse_out ?stdin t.process_mgr lines_opt (command ?cwd args) + +let run_git_lines ?cwd ?stdin t args = run_lines ?cwd ?stdin t args git_command + +let run_git_take_all ?cwd ?stdin t args = run_take_all ?cwd ?stdin t args git_command + +let run_grep_lines ?cwd ?stdin t args = run_lines ?cwd ?stdin t args grep_command + +let run_find_take_all ?cwd ?stdin t args = run_take_all ?cwd ?stdin t args find_command + +module Git_clone = struct + + let replace_special = + String.map @@ function + | 'A'..'Z' + | 'a'..'z' + | '0'..'9' + | '-' as c -> c + | _ -> '_' +let rec mkdir_p path = + try Unix.mkdir path 0o700 with + | Unix.Unix_error (EEXIST, _, _) -> () + | Unix.Unix_error (ENOENT, _, _) -> + let parent = Filename.dirname path in + mkdir_p parent; + Unix.mkdir path 0o700 + + let repo_url_to_clone_path t repo_url = + let solve_dir = "solve" in + let uri = Uri.of_string repo_url in + let sane_host = + match Uri.host uri with + | Some host -> replace_special host + | None -> "no_host" + in + let sane_path = + Uri.( + path uri + |> pct_decode + |> Filename.chop_extension + |> replace_special) + in + Fpath.(v t.cache_dir / solve_dir / sane_host / sane_path) + + let clone t repo_url = + let clone_path = repo_url_to_clone_path t repo_url in + let clone_parent = Fpath.parent clone_path |> Fpath.to_string in + let clone_path_str = Fpath.to_string clone_path in + match Unix.lstat clone_path_str with + | Unix.{ st_kind = S_DIR; _ } -> () + | _ -> Fmt.failwith "%S is not a directory!" clone_path_str + | exception Unix.Unix_error (Unix.ENOENT, _, _) -> + mkdir_p clone_parent; + try + run_git t ["clone"; repo_url; clone_path_str; "--branch"; "master"] + with Eio.Io _ as ex -> + let bt = Printexc.get_raw_backtrace () in + Eio.Exn.reraise_with_context ex bt "cloning %S" repo_url + + let pull t repo_url = + try + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git t ~cwd:clone_path ["pull"; "origin"] + with Eio.Io _ as ex -> + let bt = Printexc.get_raw_backtrace () in + Eio.Exn.reraise_with_context ex bt "pulling %S" repo_url + + let all_commits_rev t repo_url = + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git_lines t ~cwd:clone_path + @@ "log" + :: "--reverse" + :: [ "--format=format:%H" ] + |> Option.value ~default:[] + + let _log t repo_url = + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git_take_all t ~cwd:clone_path + @@ ["log"] + |> Option.value ~default:"Nothing" + + let _branch t repo_url = + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git_take_all t ~cwd:clone_path + @@ ["branch"] + |> Option.value ~default:"Nothing" + + let _reflog t repo_url = + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git_take_all t ~cwd:clone_path + @@ ["reflog"] + |> Option.value ~default:"Nothing" + + let find t repo_url = + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_find_take_all t ~cwd:clone_path + @@ [] + |> Option.value ~default:"Nothing" + + let diff t ~repo_url ~new_commit ~old_commit = + let clone_path = repo_url_to_clone_path t repo_url |> Fpath.to_string in + run_git_take_all t ~cwd:clone_path + @@ "diff" + :: old_commit + :: new_commit + :: "--" + :: [ "packages" ] + |> function + | None -> [] + | Some diff -> + try + run_grep_lines ~stdin:(Eio.Flow.string_source diff) t ["^... ./packages/.*/opam"] |> Option.value ~default:[] + |> List.map (fun path -> + Astring.String.cuts ~rev:true ~sep:"/" path + |> function + | _::_::package::_ -> package + | _ -> Fmt.failwith "Pkgs diff between %s and %s of %s@." repo_url new_commit old_commit) + with _ -> [] (* grep could exits with status 1 *) +end + +let cache = Cache.start ~name:"solve" + +let digest_request request = + request + |> Worker.Solve_request.to_yojson + |> Yojson.Safe.to_string + |> Digest.string + |> Digest.to_hex + +let _digest_solve_response ~solve_response = + solve_response + |> Worker.Solve_response.to_yojson + |> Yojson.Safe.to_string + |> Digest.string + +let get_solve ~cache ~digest : Solve_cache.t option = + match Cache.get cache ~key:digest with + | None -> None + | Some r -> Solve_cache.unmarshal r |> Option.some + +let set_solve ~cache ~solve_cache ~digest = + solve_cache + |> Solve_cache.marshal + |> fun response -> Cache.set cache ~key:digest ~value:response + +let remove_commits opam_repository_commits = + opam_repository_commits |> List.map (fun (url,_) -> (url,"")) + +let sort_by_url opam_repository_commits = + opam_repository_commits + |> List.sort (fun (url1,_) (url2,_) -> String.compare url1 url2) + +let _digest_opam_commits opam_repository_commits = + opam_repository_commits + |> List.fold_left (fun acc (_,commit) -> acc^commit) "" + |> Digest.string + +let is_same_solution ~solve_response_cache ~solve_response = + match solve_response_cache, solve_response with + | Error _, _ -> false + | _, Error _ -> false + | Ok selections_cache, Ok selections -> + Set.equal (Set.of_list selections_cache) (Set.of_list selections) + +let yojson_of_list l = l |> [%to_yojson: string list] +let yojosn_to_list l = l |> [%of_yojson: string list] + + +(* opam-repository comit with their rank *) +let opam_commits = Lazy.from_fun (fun () -> Hashtbl.create 10) + +let update_commits t repo_url commit = + let opam_commits = Lazy.force opam_commits in + let mutex = Lazy.force mutex in + match Hashtbl.find_opt opam_commits commit with + | Some _ -> () + | None -> ( + Eio.Mutex.use_rw mutex ~protect:true (fun () -> + Git_clone.clone t repo_url; + Git_clone.pull t repo_url; + Git_clone.all_commits_rev t repo_url) + |> List.iteri (fun rank commit -> Hashtbl.replace opam_commits commit rank)) + +let changed_packages t ~new_opam_repo ~old_opam_repo = + let opam_commits = Lazy.force opam_commits in + try + List.combine new_opam_repo old_opam_repo + |> List.map (fun ((repo_url,new_commit), (_,old_commit)) -> + let key = ("diff"^new_commit^"-"^old_commit) in + match Cache.get cache ~key with + | Some pkgs -> Yojson.Safe.from_string pkgs |> yojosn_to_list |> Result.get_ok + | None -> ( + update_commits t repo_url new_commit; + update_commits t repo_url old_commit; + (* Fmt.pr "DIFF new=%s old=%s :%a@." new_commit old_commit *) + (* Fmt.(list string) (Git_clone.diff t ~repo_url ~new_commit ~old_commit); *) + (* Fmt.pr "ALL COMMITS = %a@." Fmt.(list string) (Git_clone.all_commits t repo_url); *) + (* Fmt.pr "FIND = %s@." (Git_clone.find t repo_url); *) + (* Fmt.pr "BRANCH = %s@." (Git_clone.branch t repo_url); *) + (* Fmt.pr "REFLOG = %s@." (Git_clone.reflog t repo_url); *) + (* Fmt.pr "LOG %s@." (Git_clone.log t repo_url); *) + match Hashtbl.find_opt opam_commits new_commit, Hashtbl.find_opt opam_commits old_commit with + | Some rank_new, Some rank_old -> + (* With the rank, we make sure the new_commit is newer in the opam-repo git history *) + if rank_new > rank_old then + let pkgs = Git_clone.diff t ~repo_url ~new_commit ~old_commit in + Cache.set cache ~key ~value:(Yojson.Safe.to_string (yojson_of_list pkgs)); + pkgs + else [] + | None, _ -> + Fmt.failwith "The repo %s has not the commit %s@." repo_url new_commit + | _, None -> + Fmt.failwith "The repo %s has not the commit %s@." repo_url old_commit)) + |> List.flatten + |> Option.some + with Failure er -> + Fmt.epr "%s" er; None + +let get_names = OpamFormula.fold_left (fun a (name, _) -> name :: a) [] + +let deps opam_pkgs = + opam_pkgs + |> List.map (fun (_, content) -> + OpamFile.OPAM.read_from_string content |> OpamFile.OPAM.depends |> get_names) + +let is_invalidated t ~request ~solve_cache = + let { + Worker.Solve_request.opam_repository_commits; + root_pkgs; + pinned_pkgs; _ } = request + in + let request_pkgs = + List.concat_map (fun pkgs -> deps pkgs) [root_pkgs; pinned_pkgs] + |> List.flatten + |> OpamPackage.Name.Set.of_list + in + let old_opam_repo = + solve_cache.Solve_cache.last_opam_repository_commits + |> List.sort (fun (url1,_) (url2,_) -> String.compare url1 url2) + in + let new_opam_repo = + opam_repository_commits + |> List.sort (fun (url1,_) (url2,_) -> String.compare url1 url2) + in + match changed_packages t ~old_opam_repo ~new_opam_repo with + | None -> true (* Invalidate when a commit does not exist *) + | Some pkgs -> + pkgs + |> List.find_opt (fun pkg -> + OpamPackage.Name.Set.mem (OpamPackage.Name.of_string pkg) request_pkgs) + |> Option.is_some + +(** TODO describe solve funciton *) +let solve t solve log (request: Worker.Solve_request.t) = + let request = + { request with opam_repository_commits = sort_by_url request.opam_repository_commits } + in + let solve () = solve ~log request in + match get_solve ~cache ~digest:(digest_request request) with + | Some solve_cache when Result.is_ok solve_cache.solve_response -> ( + (* Log.info (fun f -> f "Solve from cache with exact opam-commits"); *) + Log_data.info log "From cache@."; + solve_cache.solve_response) + | _ -> ( + let req = + { request with opam_repository_commits = remove_commits request.opam_repository_commits } + in + match get_solve ~cache ~digest:(digest_request req) with + | Some solve_cache when not (Result.is_error solve_cache.solve_response || is_invalidated t ~request ~solve_cache) -> + (* Log.info (fun f -> f "Solve from cache (the old solve wasn't invalidated"); *) + Log_data.info log "From cache@."; + let solve_cache = + { solve_cache with last_opam_repository_commits = request.opam_repository_commits } + in + set_solve ~cache ~solve_cache ~digest:(digest_request req); + set_solve ~cache ~solve_cache ~digest:(digest_request request); + solve_cache.solve_response + | Some solve_cache when Result.is_ok solve_cache.solve_response -> + (* Log.info (fun f -> f "Invalidated solve from cache"); *) + let solve_response = solve () in + let solve_cache = + if is_same_solution ~solve_response_cache:solve_cache.solve_response ~solve_response then + { solve_cache with last_opam_repository_commits = request.opam_repository_commits } + else + {request; solve_response; last_opam_repository_commits = request.opam_repository_commits } + in + set_solve ~cache ~solve_cache ~digest:(digest_request req); + set_solve ~cache ~solve_cache ~digest:(digest_request request); + solve_cache.solve_response + | _ -> + (* Log.info (fun f -> f "Solve not from cache"); *) + let solve_response = solve () in + let solve_cache = + { Solve_cache.request; solve_response; last_opam_repository_commits = request.opam_repository_commits } + in + set_solve ~cache ~digest:(digest_request req) ~solve_cache; + set_solve ~cache ~digest:(digest_request request) ~solve_cache; + solve_response + ) diff --git a/service/solver.ml b/service/solver.ml index 160d9da..41cc502 100644 --- a/service/solver.ml +++ b/service/solver.ml @@ -8,6 +8,8 @@ let (let*!) = Result.bind type t = { pool : (Domain_worker.request, Domain_worker.reply) Pool.t; stores : Stores.t; + cache_dir : string; + process_mgr : [`Generic] Eio.Process.mgr_ty r; } let ocaml = OpamPackage.Name.of_string "ocaml" @@ -30,7 +32,7 @@ let env vars v = if List.mem v OpamPackageVar.predefined_depends_variables then None else Domain_worker.env vars (OpamVariable.Full.to_string v) -let solve_for_platform ?cancelled t ~log ~opam_repository_commits ~packages ~root_pkgs ~pinned_pkgs ~pins ~vars id = +let solve_for_platform ?cancelled t ~cacheable ~log ~opam_repository_commits ~packages ~root_pkgs ~pinned_pkgs ~pins ~vars id = let ocaml_version = OpamPackage.Version.of_string vars.Worker.Vars.ocaml_version in let root_pkgs = root_pkgs @@ -55,18 +57,25 @@ let solve_for_platform ?cancelled t ~log ~opam_repository_commits ~packages ~roo Error (`No_solution e) | Ok packages -> Log.info log "%s: found solution in %.2f s" id time; - let repo_packages = - packages - |> List.filter_map (fun (pkg : OpamPackage.t) -> - if OpamPackage.Name.Set.mem pkg.name pins then None - else Some pkg) - in - (* Hack: ocaml-ci sometimes also installs odoc, but doesn't tell us about it. + let commits = + if cacheable then + (* The cache system handle a sort of oldest_commit*) + opam_repository_commits + else + let repo_packages = + packages + |> List.filter_map (fun (pkg : OpamPackage.t) -> + if OpamPackage.Name.Set.mem pkg.name pins then None + else Some pkg) + in + (* Hack: ocaml-ci sometimes also installs odoc, but doesn't tell us about it. Make sure we have at least odoc 2.1.1 available, otherwise it won't work on OCaml 5.0. *) - let repo_packages = - OpamPackage.of_string "odoc.2.1.1" :: repo_packages + let repo_packages = + OpamPackage.of_string "odoc.2.1.1" :: repo_packages + in + let commits = Stores.oldest_commits_with t.stores repo_packages ~from:opam_repository_commits in + commits in - let commits = Stores.oldest_commits_with t.stores repo_packages ~from:opam_repository_commits in let compat_pkgs = let to_string (name, (version,_)) = OpamPackage.to_string (OpamPackage.create name version) in List.map to_string root_pkgs @@ -96,7 +105,7 @@ let rec parse_opams = function Ok (x :: xs) (* Handle a request by distributing it among the worker processes and then aggregating their responses. *) -let solve ?cancelled t ~log request = +let solve ?cancelled ~cacheable t ~log request = let { Worker.Solve_request.opam_repository_commits; platforms; @@ -124,6 +133,7 @@ let solve ?cancelled t ~log request = let result = solve_for_platform t id ?cancelled + ~cacheable ~log ~opam_repository_commits ~packages @@ -161,10 +171,20 @@ let solve ?cancelled t ~log request = | [] -> Ok results | errors -> Fmt.error_msg "@[%a@]" Fmt.(list ~sep:cut string) errors +let solve ?cacheable ?cancelled t ~log request = + match cacheable with + | Some true -> + let solve = solve t ?cancelled ~cacheable:true in + let cache = { Solve_cache.cache_dir = t.cache_dir; process_mgr = t.process_mgr } in + Solve_cache.solve cache solve log request + | _ -> solve ?cancelled ~cacheable:false t ~log request + let create ~sw ~domain_mgr ~process_mgr ~cache_dir ~n_workers = let stores = Stores.create ~process_mgr ~cache_dir in let pool = Pool.create ~sw ~domain_mgr ~n_workers Domain_worker.solve in { stores; pool; + cache_dir; + process_mgr = (process_mgr :> [`Generic] Eio.Process.mgr_ty r); } diff --git a/service/solver.mli b/service/solver.mli index 7e3c19a..973356f 100644 --- a/service/solver.mli +++ b/service/solver.mli @@ -17,6 +17,7 @@ val create : @param n_workers Maximum number of worker domains. *) val solve : + ?cacheable:bool -> ?cancelled:unit Eio.Promise.t -> t -> log:Solver_service_api.Solver.Log.t -> diff --git a/test/dune b/test/dune index f6453d5..658aa25 100644 --- a/test/dune +++ b/test/dune @@ -1,4 +1,4 @@ (tests (names test) (package solver-service) - (libraries solver-service eio_main lwt_eio)) + (libraries solver-service scache eio_main lwt_eio)) diff --git a/test/test.expected b/test/test.expected index d0e8843..de7de79 100644 --- a/test/test.expected +++ b/test/test.expected @@ -246,3 +246,83 @@ results: packages: [foo.dev; ocaml-base-compiler.5.0] commits: [(opam-repo.git, 1bc28b8e8d98db6e524822c6f28bddebbc3504a3)] lower_bound: false] + +# Solve_cache + +## Select foo.1.0 ## + +commits: [(opam-repo.git, [ocaml-base-compiler.5.0; foo.1.0])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; foo.1.0; ocaml-base-compiler.5.0] + commits: [(opam-repo.git, bf39e9df31e82a307e3daee9409f62d1a15acfe7)] + lower_bound: false] + +## Foo 1.1 now available ## + +commits: [(opam-repo.git, [foo.1.1; ocaml-base-compiler.5.0; foo.1.0])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +Already up to date. +Need to update opam-repo.git to get new commit 6b20fc580721bea55bdb0bf54780aaa98d162e0e +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; foo.1.1; ocaml-base-compiler.5.0] + commits: [(opam-repo.git, 6b20fc580721bea55bdb0bf54780aaa98d162e0e)] + lower_bound: false] + +## Foo 1.1 (hit the cache, the commit won't change) ## + +commits: [(opam-repo.git, + [foo.1.1; foo.1.1; ocaml-base-compiler.5.0; foo.1.0])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +Updating 6b20fc5..8b14733 +Fast-forward +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; foo.1.1; ocaml-base-compiler.5.0] + commits: [(opam-repo.git, 6b20fc580721bea55bdb0bf54780aaa98d162e0e)] + lower_bound: false] + +## Oof 1.0 now available (hit the cache, the commit won't change) ## + +commits: [(opam-repo.git, + [oof.1.0; oof.1.0; foo.1.1; ocaml-base-compiler.5.0; foo.1.0])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +Updating 8b14733..fbfff59 +Fast-forward + packages/oof/oof.1.0/opam | 4 ++++ + 1 file changed, 4 insertions(+) + create mode 100644 packages/oof/oof.1.0/opam +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; foo.1.1; ocaml-base-compiler.5.0] + commits: [(opam-repo.git, 6b20fc580721bea55bdb0bf54780aaa98d162e0e)] + lower_bound: false] + +## Oof 1.1 now available (invalidate the cache foo 1.1 will be removed on the next commit, the newest commit for the result) ## + +commits: [(opam-repo.git, [oof.1.1; ocaml-base-compiler.5.0; foo.1.0])] +root_pkgs: [app.dev] +platforms: [debian-12-ocaml-5] +Updating fbfff59..df311eb +Fast-forward + packages/oof/oof.1.0/opam | 4 ---- + packages/{foo/foo.1.1 => oof/oof.1.1}/opam | 0 + 2 files changed, 4 deletions(-) + delete mode 100644 packages/oof/oof.1.0/opam + rename packages/{foo/foo.1.1 => oof/oof.1.1}/opam (100%) +results: + [debian-12-ocaml-5: + compat_pkgs: [app.dev] + packages: [app.dev; foo.1.1; ocaml-base-compiler.5.0] + commits: [(opam-repo.git, 6b20fc580721bea55bdb0bf54780aaa98d162e0e)] + lower_bound: false] diff --git a/test/test.ml b/test/test.ml index cb37273..b00e70b 100644 --- a/test/test.ml +++ b/test/test.ml @@ -231,6 +231,44 @@ let test_available t = "mac", { debian_12_ocaml_5 with os = "macos" }; ] +let test_solve_cache t = + let solve = solve_cache in + let opam_repo = Opam_repo.create "opam-repo.git" in + let root_pkgs = ["app.dev", {| depends: [ "foo" ] |}] in + let opam_packages = [ + "ocaml-base-compiler.5.0", ""; + "foo.1.0", {| depends: [ "ocaml-base-compiler" ] |}; + ] + in + let first_opam_packages = opam_packages in + let recent_commits = + solve t "Select foo.1.0" ~platforms ~root_pkgs ~previous_commits:[opam_repo,[]] + ~commits:[opam_repo, opam_packages] + in + let opam_packages = ("foo.1.1","") :: opam_packages in + let recent_commits = + solve t "Foo 1.1 now available" ~previous_commits:recent_commits ~platforms ~root_pkgs ~commits:[ + opam_repo, opam_packages + ] + in + let recent_commits = + solve t "Foo 1.1 (hit the cache, the commit won't change)" ~previous_commits:recent_commits ~platforms ~root_pkgs ~commits:[ + opam_repo, ("foo.1.1","")::opam_packages + ] + in + let opam_packages = ("oof.1.0","") :: opam_packages in + let recent_commits = + solve t "Oof 1.0 now available (hit the cache, the commit won't change)" ~previous_commits:recent_commits ~platforms ~root_pkgs ~commits:[ + opam_repo, ("oof.1.0","") :: opam_packages + ] + in + solve t + "Oof 1.1 now available (invalidate the cache foo 1.1 will be removed on the next commit, the newest commit for the result)" + ~previous_commits:recent_commits ~platforms ~root_pkgs ~commits:[ + opam_repo, ("oof.1.1","") :: first_opam_packages + ] |> ignore; + () + let () = Eio_main.run @@ fun env -> let domain_mgr = env#domain_mgr in @@ -250,6 +288,7 @@ let () = "Pinned", test_pinned; "Cancel", test_cancel; "Available", test_available; + "Solve_cache", test_solve_cache; ] |> List.iter (fun (name, fn) -> Fmt.pr "@.# %s@." name; diff --git a/test/utils.ml b/test/utils.ml index 557879b..a206883 100644 --- a/test/utils.ml +++ b/test/utils.ml @@ -6,13 +6,18 @@ let add_opam_header s = synopsis: "Test package" |} ^ s +module S = Git_unix.Store + module Opam_repo : sig - type t + type t = { + name : string; + store : S.t; + } val create : string -> t (** [create path] opens a Git repository at [path], creating it if needed. *) - val commit : t -> (string * string) list -> string * string + val commit : ?parents:S.Value.Commit.hash list -> t -> (string * string) list -> string * string (** [commit t files] creates a commit with the given opam package files and returns the [(repo_url, hash]) pair for it. *) @@ -20,7 +25,6 @@ module Opam_repo : sig end = struct (* A fake opam-repository upstream for testing. *) - module S = Git_unix.Store type t = { name : string; @@ -85,7 +89,7 @@ end = struct in write_tree t (List.map write_version versions) - let commit t pkgs = + let commit ?parents t pkgs = let pkgs = group_packages pkgs |> OpamPackage.Name.Map.to_seq @@ -95,10 +99,11 @@ end = struct ) in let tree = write_tree t ["packages", `Dir (write_tree t pkgs)] in - let commit = S.Value.Commit.make ~tree ~author:test_user ~committer:test_user (Some "Commit") in + let commit = S.Value.Commit.make ?parents ~tree ~author:test_user ~committer:test_user (Some "Commit") in let commit = write t (S.Value.commit commit) in set_branch t Git.Reference.master commit; (t.name, S.Hash.to_hex commit) + end let stderr_log = @@ -153,3 +158,39 @@ let solve ?cancelled ?(pinned_pkgs=[]) t label ~commits ~root_pkgs ~platforms = in let response = Solver_service.Solver.solve ?cancelled t ~log:stderr_log req in Fmt.pr "@[results:@,%a@]@." pp_response response + +let solve_cache ?cancelled ?(pinned_pkgs=[]) + t label ~previous_commits ~commits ~root_pkgs ~platforms = + Fmt.pr "@.## %s ##@.@.commits: %a@.root_pkgs: %a@.platforms: %a@." + label + pp_commits commits + pp_packages root_pkgs + pp_platforms platforms; + if pinned_pkgs <> [] then Fmt.pr "pinned: %a@." pp_packages pinned_pkgs; + let root_pkgs = List.map (fun (pkg, opam) -> pkg, add_opam_header opam) root_pkgs in + let pinned_pkgs = List.map (fun (pkg, opam) -> pkg, add_opam_header opam) pinned_pkgs in + let opam_repository_commits = + commits + |> List.map (fun (repo, packages) -> + let (_, parents) = + List.find (fun (repo_prev,_) -> String.equal repo_prev.Opam_repo.name repo.Opam_repo.name) previous_commits + in + let parents = List.map S.Hash.of_hex parents in + Opam_repo.commit ~parents repo packages) + in + let req = { Solver_service_api.Worker.Solve_request. + opam_repository_commits; + root_pkgs; + pinned_pkgs; + platforms; + } + in + let response = Solver_service.Solver.solve ~cacheable:true ?cancelled t ~log:stderr_log req in + Fmt.pr "@[results:@,%a@]@." pp_response response; + let recent_commits = + List.map (fun (repo_prev, repo_previous_commits) -> + let (_, commit) = + List.find (fun (repo,_) -> String.equal repo_prev.Opam_repo.name repo) opam_repository_commits + in repo_prev, commit::repo_previous_commits) previous_commits + in + recent_commits diff --git a/worker/custom.ml b/worker/custom.ml index 3b3cbb8..c93399b 100644 --- a/worker/custom.ml +++ b/worker/custom.ml @@ -28,14 +28,14 @@ let cluster_worker_log log = Capnp_rpc_lwt.Service.return_empty () end -let solve ~cancelled ~solver ~log c = +let solve ~cacheable ~cancelled ~solver ~log c = let selections = let*! request = solve_of_custom c in let log = cluster_worker_log log in Lwt_eio.run_lwt @@ fun () -> Capnp_rpc_lwt.Capability.with_ref log @@ fun log -> Lwt_eio.run_eio @@ fun () -> - Solver_service.Solver.solve ~cancelled solver ~log request + Solver_service.Solver.solve ~cacheable ~cancelled solver ~log request in begin match selections with | Ok sels -> diff --git a/worker/custom.mli b/worker/custom.mli index 3082d65..9a5abc6 100644 --- a/worker/custom.mli +++ b/worker/custom.mli @@ -2,6 +2,7 @@ custom job specification. *) val solve : + cacheable:bool -> cancelled:unit Eio.Promise.t -> solver:Solver_service.Solver.t -> log:Log_data.t -> diff --git a/worker/solver_worker.ml b/worker/solver_worker.ml index bd594f9..d3b0403 100644 --- a/worker/solver_worker.ml +++ b/worker/solver_worker.ml @@ -34,6 +34,7 @@ type t = { name : string; registration_service : Cluster_api.Raw.Client.Registration.t Sturdy_ref.t; capacity : int; + cacheable : bool; mutable in_use : int; (* Number of active builds *) cond : unit Lwt_condition.t; (* Fires when a build finishes (or switch turned off) *) @@ -50,7 +51,7 @@ let metrics = function | `Host -> failwith "No host metrics from solver service" -let build ~cancelled ~log t descr = +let build ~cacheable ~cancelled ~log t descr = let module R = Cluster_api.Raw.Reader.JobDescr in match Cluster_api.Submission.get_action descr with | Custom_build c -> @@ -58,7 +59,7 @@ let build ~cancelled ~log t descr = f "Got request to build a job of kind \"%s\"" (Cluster_api.Custom.kind c)); (* Oddly, the protocol has us report cancellation and errors as "successful" jobs with the error inside! *) - let output = Custom.solve ~cancelled ~solver:t.solver ~log c in + let output = Custom.solve ~cacheable ~cancelled ~solver:t.solver ~log c in Log_data.write log "Job succeeded\n"; (Ok output, "ok") | Obuilder_build _ | Docker_build _ -> @@ -95,7 +96,7 @@ let loop t queue = Lwt_eio.run_eio @@ fun () -> Log_data.info log "Building on %s" t.name; let t0 = Unix.gettimeofday () in - match build ~cancelled ~log t request with + match build ~cacheable:t.cacheable ~cancelled ~log t request with | (outcome, metric_label) -> let t1 = Unix.gettimeofday () in Prometheus.Summary.observe @@ -127,7 +128,7 @@ let loop t queue = let self_update () = failwith "TODO: Self-update" -let run ~name ~capacity solver registration_service = +let run ~cacheable ~name ~capacity solver registration_service = Lwt_eio.run_lwt @@ fun () -> let t = { solver; @@ -136,6 +137,7 @@ let run ~name ~capacity solver registration_service = cond = Lwt_condition.create (); capacity; in_use = 0; + cacheable; } in let rec reconnect () = let connect_time = Unix.gettimeofday () in diff --git a/worker/solver_worker.mli b/worker/solver_worker.mli index 564ff13..3bf159c 100644 --- a/worker/solver_worker.mli +++ b/worker/solver_worker.mli @@ -1,6 +1,7 @@ open Capnp_rpc_lwt val run : + cacheable: bool -> name:string -> capacity:int -> Solver_service.Solver.t ->