Skip to content

Commit

Permalink
Merge pull request #2205 from art-w/migrate-lower
Browse files Browse the repository at this point in the history
  • Loading branch information
metanivek authored Mar 3, 2023
2 parents 101dccf + 8d409de commit ce09bb7
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 79 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
@metanivek)
- Add a `behaviour` function to the GC to check wether the GC will archive or
delete data. (#2190, @Firobe)
- Add a migration on `open_rw` to move the data to the `lower_root` if
the configuration was enabled (#2205, @art-w)

### Changed

Expand Down
3 changes: 0 additions & 3 deletions src/irmin-pack/unix/control_file_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ module Payload = struct
start_offset : int63;
end_offset : int63;
mapping_end_poff : int63;
data_end_poff : int63;
checksum : int63;
}
[@@deriving irmin]
Expand All @@ -275,8 +274,6 @@ module Payload = struct
- [end_offset] is the global offset for the end of the volume's data.
Used for routing reads.
- [mapping_end_poff] is the end offset for the mapping file. Used when
writing.
- [data_end_poff] is the end offset for the data file. Used for
writing. *)
end

Expand Down
2 changes: 2 additions & 0 deletions src/irmin-pack/unix/errors.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type base_error =
| `Pending_flush
| `Rw_not_allowed
| `Migration_needed
| `Migration_to_lower_not_allowed
| `Corrupted_control_file
| `Sys_error of string
| `V3_store_from_the_future
Expand All @@ -67,6 +68,7 @@ type base_error =
| `Gc_forbidden_on_32bit_platforms
| `Invalid_prefix_read of string
| `Invalid_sparse_read of [ `After | `Before | `Hole ] * int63
| `Invalid_volume_read of [ `Empty | `Closed ] * int63
| `Inconsistent_store
| `Split_forbidden_during_batch
| `Split_disallowed
Expand Down
113 changes: 89 additions & 24 deletions src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ struct

(* Constructors *********************************************************** *)

module Layout = Irmin_pack.Layout.V4
module Layout = Irmin_pack.Layout.V5

let open_prefix ~root ~generation =
let open Result_syntax in
Expand Down Expand Up @@ -497,6 +497,79 @@ struct

(* Open rw **************************************************************** *)

let dead_header_size_of_status = function
| Payload.From_v1_v2_post_upgrade _ -> legacy_io_header_size
| No_gc_yet | Gced _ | Used_non_minimal_indexing_strategy -> 0
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
failwith "invalid status: T1..T15"

let can_migrate_to_lower (payload : Payload.t) =
match payload.status with
| No_gc_yet | Used_non_minimal_indexing_strategy | From_v1_v2_post_upgrade _
->
payload.chunk_num = 1 && payload.volume_num = 0
| Gced _ -> false
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
failwith "invalid status: T1..T15"

let migrate_to_lower ~root ~lower_root ~control (payload : Payload.t) =
let open Result_syntax in
(* Step 1. Create a lower by moving the suffix file. *)
let suffix_file =
Layout.suffix_chunk ~root ~chunk_idx:payload.chunk_start_idx
in
let dead_header_size = dead_header_size_of_status payload.status in
let end_offset = payload.appendable_chunk_poff in
let* () =
Lower.create_from ~src:suffix_file ~dead_header_size ~size:end_offset
lower_root
in
(* Step 2. Create a new empty suffix for the upper. *)
let chunk_start_idx = payload.chunk_start_idx + 1 in
let* () =
Suffix.create_rw ~root ~overwrite:false ~auto_flush_threshold:1_000_000
~auto_flush_procedure:`Internal ~start_idx:chunk_start_idx
>>= Suffix.close
in
(* Step 3. Create a new empty prefix for the upper. *)
let generation = 1 in
let* () =
let mapping = Layout.mapping ~generation ~root in
let data = Layout.prefix ~root ~generation in
Sparse.Ao.create ~mapping ~data >>= Sparse.Ao.close
in
(* Step 4. Update the upper control file. *)
let payload =
{
payload with
chunk_start_idx;
appendable_chunk_poff = Int63.zero;
volume_num = 1;
status =
Gced
{
suffix_start_offset = end_offset;
generation;
latest_gc_target_offset = end_offset;
suffix_dead_bytes = Int63.zero;
};
}
in
let* () = Control.set_payload control payload in
Ok payload

let load_payload ~config ~root ~lower_root ~control =
let payload = Control.payload control in
match lower_root with
| Some lower_root when payload.volume_num = 0 ->
if Irmin_pack.Conf.no_migrate config then Error `Migration_needed
else if not (can_migrate_to_lower payload) then
Error `Migration_to_lower_not_allowed
else migrate_to_lower ~root ~lower_root ~control payload
| _ -> Ok payload

let open_rw_with_control_file config =
let open Result_syntax in
let root = Irmin_pack.Conf.root config in
Expand All @@ -505,17 +578,17 @@ struct
let path = Layout.control ~root in
Control.open_ ~readonly:false ~path
in
let Payload.
{
status;
appendable_chunk_poff;
chunk_start_idx = start_idx;
chunk_num;
dict_end_poff;
volume_num;
_;
} =
Control.payload control
let* Payload.
{
status;
appendable_chunk_poff;
chunk_start_idx = start_idx;
chunk_num;
dict_end_poff;
volume_num;
_;
} =
load_payload ~config ~root ~lower_root ~control
in
let* dead_header_size =
match status with
Expand All @@ -540,10 +613,9 @@ struct
let make_lower () =
match lower_root with
| None -> Ok None
| Some path ->
let+ l = Lower.v ~readonly:false ~volume_num path in
(* TODO: if [volume_num] is 0, we need to migrate
(or return error if [no_migrate = true]) *)
| Some lower_root ->
assert (volume_num > 0);
let+ l = Lower.v ~readonly:false ~volume_num lower_root in
Some l
in
finish_constructing_rw config control ~make_dict ~make_suffix ~make_index
Expand Down Expand Up @@ -664,14 +736,7 @@ struct
} =
Control.payload control
in
let* dead_header_size =
match status with
| From_v1_v2_post_upgrade _ -> Ok legacy_io_header_size
| No_gc_yet | Gced _ | Used_non_minimal_indexing_strategy -> Ok 0
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
Error `V3_store_from_the_future
in
let dead_header_size = dead_header_size_of_status status in
let generation = generation status in
(* 2. Open the other files *)
let* suffix =
Expand Down
6 changes: 5 additions & 1 deletion src/irmin-pack/unix/file_manager_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ module type S = sig
| `Invalid_layout
| `Io_misc of Io.misc_error
| `Migration_needed
| `Migration_to_lower_not_allowed
| `No_such_file_or_directory of string
| `Not_a_directory of string
| `Not_a_file
Expand All @@ -128,7 +129,10 @@ module type S = sig
| `Index_failure of string
| `Sys_error of string
| `Inconsistent_store
| `Volume_missing of string ]
| `Volume_missing of string
| `Multiple_empty_volumes
| `Invalid_parent_directory
| `Pending_flush ]

val open_rw : Irmin.Backend.Conf.t -> (t, [> open_rw_error ]) result
(** Create a rw instance of [t] by opening existing files.
Expand Down
2 changes: 2 additions & 0 deletions src/irmin-pack/unix/io_errors.ml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ module Make (Io : Io.S) : S with module Io = Io = struct
| `Pending_flush
| `Rw_not_allowed
| `Migration_needed
| `Migration_to_lower_not_allowed
| `Corrupted_control_file
| `Sys_error of string
| `V3_store_from_the_future
Expand All @@ -68,6 +69,7 @@ module Make (Io : Io.S) : S with module Io = Io = struct
| `Gc_forbidden_on_32bit_platforms
| `Invalid_prefix_read of string
| `Invalid_sparse_read of [ `After | `Before | `Hole ] * int63
| `Invalid_volume_read of [ `Empty | `Closed ] * int63
| `Inconsistent_store
| `Closed
| `Ro_not_allowed
Expand Down
65 changes: 39 additions & 26 deletions src/irmin-pack/unix/lower.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

open Import
include Lower_intf
module Layout = Irmin_pack.Layout.V5.Volume
module Payload = Control_file.Payload.Volume.Latest

module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
module Io = Io
Expand All @@ -27,7 +29,7 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
| Empty of { path : string }
| Nonempty of {
path : string;
control : Control_file.Payload.Volume.Latest.t;
control : Payload.t;
mutable sparse : Sparse.t option;
}

Expand All @@ -41,7 +43,7 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let v volume_path =
let open Result_syntax in
let* control =
let path = Irmin_pack.Layout.V5.Volume.control ~root:volume_path in
let path = Layout.control ~root:volume_path in
match Io.classify_path path with
| `File ->
let+ payload = Control.read_payload ~path in
Expand All @@ -66,20 +68,39 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let* () = Io.mkdir volume_path in
(* 2. Make empty mapping *)
let* () =
Irmin_pack.Layout.V5.Volume.(
Io.create ~path:(mapping ~root:volume_path) ~overwrite:true)
Io.create ~path:(Layout.mapping ~root:volume_path) ~overwrite:true
>>= Io.close
in
(* 3. Make empty data *)
let* () =
Irmin_pack.Layout.V5.Volume.(
Io.create ~path:(data ~root:volume_path) ~overwrite:true)
Io.create ~path:(Layout.data ~root:volume_path) ~overwrite:true
>>= Io.close
in
(* TODO: handle failure to create all artifacts, either here or in a cleanup
when the store starts. *)
v volume_path

let create_from ~src ~dead_header_size ~size lower_root =
let open Result_syntax in
let root = Layout.directory ~root:lower_root ~idx:0 in
let data = Layout.data ~root in
let mapping = Layout.mapping ~root in
let* () = Io.mkdir root in
let* () = Io.move_file ~src ~dst:data in
let* mapping_end_poff =
Sparse.Wo.create_from_data ~mapping ~dead_header_size ~size ~data
in
let payload =
{
Payload.start_offset = Int63.zero;
end_offset = size;
mapping_end_poff;
checksum = Int63.zero;
}
in
let control = Layout.control ~root in
Control.create_rw ~path:control ~overwrite:false payload >>= Control.close

let path = function Empty { path } -> path | Nonempty { path; _ } -> path

let control = function
Expand All @@ -101,8 +122,8 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
| Some _ -> Ok () (* Sparse file is already open *)
| None ->
let open Result_syntax in
let mapping = Irmin_pack.Layout.V5.Volume.mapping ~root in
let data = Irmin_pack.Layout.V5.Volume.data ~root in
let mapping = Layout.mapping ~root in
let data = Layout.data ~root in
let+ sparse = Sparse.open_ro ~mapping ~data in
t.sparse <- Some sparse)

Expand All @@ -121,10 +142,10 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let eq a b = identifier_eq ~id:(identifier b) a

let read_range_exn ~off ~min_len ~max_len b = function
| Empty _ -> ()
| Empty _ -> Errs.raise_error (`Invalid_volume_read (`Empty, off))
| Nonempty { sparse; _ } -> (
match sparse with
| None -> Errs.raise_error `Closed
| None -> Errs.raise_error (`Invalid_volume_read (`Closed, off))
| Some s -> Sparse.read_range_exn s ~off ~min_len ~max_len b)

let archive_seq ~upper_root ~generation ~is_first ~to_archive ~off t =
Expand Down Expand Up @@ -167,24 +188,14 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let* () = Sparse.Ao.flush ao in
let* () = Sparse.Ao.close ao in
(* Prepare new control file *)
let start_offset, old_data_end_poff =
let start_offset =
match t with
| Empty _ -> (off, Int63.zero)
| Nonempty { control; _ } -> (control.start_offset, control.data_end_poff)
in
let data_len =
Seq.fold_left (fun len str -> len + String.length str) 0 to_archive
|> Int63.of_int
| Empty _ -> off
| Nonempty { control; _ } -> control.start_offset
in
let new_control =
Control_file.Payload.Volume.V5.
{
start_offset;
end_offset;
mapping_end_poff;
data_end_poff = Int63.add old_data_end_poff data_len;
checksum = Int63.zero;
}
{ start_offset; end_offset; mapping_end_poff; checksum = Int63.zero }
in
(* Write into temporary file on disk *)
let tmp_control_path =
Expand Down Expand Up @@ -235,7 +246,7 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let* volumes =
let root = t.root in
let volume i =
let path = Irmin_pack.Layout.V5.Volume.directory ~root ~idx:i in
let path = Layout.directory ~root ~idx:i in
match Io.classify_path path with
| `File | `Other | `No_such_file_or_directory ->
raise (LoadVolumeError (`Volume_missing path))
Expand Down Expand Up @@ -277,7 +288,7 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
in
let volume_path =
let next_idx = volume_num t in
Irmin_pack.Layout.V5.Volume.directory ~root:t.root ~idx:next_idx
Layout.directory ~root:t.root ~idx:next_idx
in
let* vol = Volume.create_empty volume_path in
t.volumes <- Array.append t.volumes [| vol |];
Expand Down Expand Up @@ -339,4 +350,6 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct

let read_exn ~off ~len ?volume t b =
read_range_exn ~off ~min_len:len ~max_len:len ?volume t b

let create_from = Volume.create_from
end
13 changes: 13 additions & 0 deletions src/irmin-pack/unix/lower_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,19 @@ module type S = sig
volume_identifier
(** Same as [read_exn] but will read at least [min_len] bytes and at most
[max_len]. *)

type create_error :=
[ open_error | close_error | add_error | `Sys_error of string ]

val create_from :
src:string ->
dead_header_size:int ->
size:Int63.t ->
string ->
(unit, [> create_error ]) result
(** [create_from ~src ~dead_header_size ~size lower_root] initializes the
first lower volume in the directory [lower_root] by moving the suffix file
[src] with end offset [size]. *)
end

module type Sigs = sig
Expand Down
Loading

0 comments on commit ce09bb7

Please sign in to comment.