Skip to content

Commit

Permalink
Add support for storage transformers.
Browse files Browse the repository at this point in the history
  • Loading branch information
zoj613 committed Jul 9, 2024
1 parent aa31327 commit 510222b
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 8 deletions.
95 changes: 95 additions & 0 deletions lib/extensions.ml
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,98 @@ module Datatype = struct
| `String "nativeint" -> Ok Nativeint
| _ -> Error ("Unsupported metadata data_type")
end

type tf_error =
[ `Store_read of string
| `Store_write of string
| `Node_invariant of string
| `Json_decode of string
| `Bytes_encode_error of string
| `Bytes_decode_error of string
| `Sharding_shape_mismatch of int array * int array * string
| `Invalid_transpose_order of int array * string
| `Gzip of Ezgzip.error
| `Grid of grid_info ]

type range = ByteRange of int * int option

module type STORAGE_TRANSFORMER = sig
type t
val get : t -> string -> (string, [> tf_error]) result
val set : t -> string -> string -> unit
val erase : t -> string -> unit
end

module StorageTransformers = struct
type transformer =
| Identity
type t = transformer list

let default = [Identity]

let deserialize x =
match
Util.get_name x,
Yojson.Safe.Util.(member "configuration" x)
with
| "identity", `Null -> Ok Identity
| _ ->
Error "Unsupported storage transformer name or configuration."

let of_yojson x =
let open Util.Result_syntax in
List.fold_right
(fun x acc ->
acc >>= fun l ->
deserialize x >>| fun s ->
s :: l) (Yojson.Safe.Util.to_list x) (Ok [])

let to_yojson x =
`List
(List.fold_right
(fun x acc ->
match x with
| Identity -> acc) x [])

let get
(type a)
(module M : STORAGE_TRANSFORMER with type t = a)
(store : a)
(transformers : t)
(key : string)
=
let open Util.Result_syntax in
M.get store key >>| fun raw ->
snd @@
List.fold_right
(fun x (k, v) ->
match x with
| Identity -> (k, v)) transformers (key, raw)

let set
(type a)
(module M : STORAGE_TRANSFORMER with type t = a)
(store : a)
(transformers : t)
(key : string)
(value : string)
=
let k', v' =
List.fold_left
(fun (k, v) -> function
| Identity -> (k, v)) (key, value) transformers
in
M.set store k' v'

let erase
(type a)
(module M : STORAGE_TRANSFORMER with type t = a)
(store : a)
(transformers : t)
(key : string)
=
M.erase store @@
List.fold_left
(fun k -> function
| Identity -> k) key transformers
end
50 changes: 50 additions & 0 deletions lib/extensions.mli
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,53 @@ module Datatype : sig
val of_yojson : Yojson.Safe.t -> (t, string) result
val to_yojson : t -> Yojson.Safe.t
end

type tf_error =
[ `Store_read of string
| `Store_write of string
| `Node_invariant of string
| `Json_decode of string
| `Bytes_encode_error of string
| `Bytes_decode_error of string
| `Sharding_shape_mismatch of int array * int array * string
| `Invalid_transpose_order of int array * string
| `Gzip of Ezgzip.error
| `Grid of grid_info ]

type range = ByteRange of int * int option

module type STORAGE_TRANSFORMER = sig
type t
val get : t -> string -> (string, [> tf_error]) result
val set : t -> string -> string -> unit
val erase : t -> string -> unit
end

module StorageTransformers : sig
type transformer =
| Identity
type t = transformer list

val default : t
val of_yojson : Yojson.Safe.t -> (t, string) result
val to_yojson : t -> Yojson.Safe.t
val get :
(module STORAGE_TRANSFORMER with type t = 'a) ->
'a ->
t ->
string ->
(string, [> tf_error ]) result
val set :
(module STORAGE_TRANSFORMER with type t = 'a) ->
'a ->
t ->
string ->
string ->
unit
val erase :
(module STORAGE_TRANSFORMER with type t = 'a) ->
'a ->
t ->
string ->
unit
end
19 changes: 15 additions & 4 deletions lib/metadata.ml
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,14 @@ module ArrayMetadata = struct
;chunk_key_encoding : ChunkKeyEncoding.t
;attributes : Yojson.Safe.t
;dimension_names : string option list
;storage_transformers : Yojson.Safe.t Util.ExtPoint.t list}
;storage_transformers : StorageTransformers.t}

let create
?(sep=`Slash)
?(codecs=Codecs.Chain.default)
?(dimension_names=[])
?(attributes=`Null)
?(storage_transformers=[StorageTransformers.Identity])
~shape
kind
fv
Expand All @@ -134,7 +135,7 @@ module ArrayMetadata = struct
;dimension_names
;zarr_format = 3
;node_type = "array"
;storage_transformers = []
;storage_transformers
;fill_value = FillValue.of_kind kind fv
;data_type = Datatype.of_kind kind
;chunk_key_encoding = ChunkKeyEncoding.create sep}
Expand Down Expand Up @@ -168,6 +169,12 @@ module ArrayMetadata = struct
| None -> `Null) xs
in
l @ [("dimension_names", `List xs')]
in
let l =
match t.storage_transformers with
| [StorageTransformers.Identity] -> l
| xs ->
l @ [("storage_transformers", StorageTransformers.to_yojson xs)]
in `Assoc l

let of_yojson x =
Expand Down Expand Up @@ -256,8 +263,10 @@ module ArrayMetadata = struct
>>= fun dimension_names ->

(match member "storage_transformers" x with
| `Null -> Ok []
| _ -> Error "storage_transformers field is not yet supported.")
| `Null | `List [] ->
Ok StorageTransformers.default
| _ ->
Error "storage_transformers field is not yet supported.")
>>| fun storage_transformers ->

{zarr_format; shape; node_type; data_type; codecs; fill_value; chunk_grid
Expand Down Expand Up @@ -286,6 +295,8 @@ module ArrayMetadata = struct

let attributes t = t.attributes

let storage_transformers t = t.storage_transformers

let chunk_shape t =
RegularGrid.chunk_shape t.chunk_grid

Expand Down
5 changes: 5 additions & 0 deletions lib/metadata.mli
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ module ArrayMetadata : sig
?codecs:Codecs.Chain.t ->
?dimension_names:string option list ->
?attributes:Yojson.Safe.t ->
?storage_transformers:Extensions.StorageTransformers.t ->
shape:int array ->
('a, 'b) Bigarray.kind ->
'a ->
Expand Down Expand Up @@ -76,6 +77,10 @@ module ArrayMetadata : sig
(** [attributes t] Returns a Yojson type containing user attributes assigned
to the zarr array represented by [t]. *)

val storage_transformers : t -> Extensions.StorageTransformers.t
(** [storage_transformers t] Returns the storage transformers to be applied
to the keys and values of this store. *)

val dimension_names : t -> string option list
(** [dimension_name t] returns a list of dimension names. If none are
defined then an empty list is returned. *)
Expand Down
12 changes: 8 additions & 4 deletions lib/storage/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module ArraySet = Util.ArraySet
module Arraytbl = Util.Arraytbl
module AM = Metadata.ArrayMetadata
module GM = Metadata.GroupMetadata
module ST = Extensions.StorageTransformers

module Make (M : STORE) : S with type t = M.t = struct
include M
Expand Down Expand Up @@ -154,11 +155,12 @@ module Make (M : STORE) : S with type t = M.t = struct
in
let codecs = AM.codecs meta in
let prefix = ArrayNode.to_key node ^ "/" in
let tf = AM.storage_transformers meta in
let cindices = ArraySet.of_seq @@ Arraytbl.to_seq_keys tbl in
ArraySet.fold (fun idx acc ->
acc >>= fun () ->
let chunkkey = prefix ^ AM.chunk_key meta idx in
(match get t chunkkey with
(match ST.get (module M) t tf chunkkey with
| Ok b ->
Codecs.Chain.decode codecs repr b
| Error _ ->
Expand All @@ -173,7 +175,7 @@ module Make (M : STORE) : S with type t = M.t = struct
List.iter
(fun (c, v) -> Ndarray.set arr c v) @@ Arraytbl.find_all tbl idx;
Codecs.Chain.encode codecs arr >>| fun encoded ->
set t chunkkey encoded) cindices (Ok ())
ST.set (module M) t tf chunkkey encoded) cindices (Ok ())

let get_array
: type a b.
Expand Down Expand Up @@ -206,6 +208,7 @@ module Make (M : STORE) : S with type t = M.t = struct
let tbl = Arraytbl.create @@ Array.length pair in
let prefix = ArrayNode.to_key node ^ "/" in
let chain = AM.codecs meta in
let tf = AM.storage_transformers meta in
let repr =
{kind
;shape = AM.chunk_shape meta
Expand All @@ -217,7 +220,7 @@ module Make (M : STORE) : S with type t = M.t = struct
| Some arr ->
Ok (Ndarray.get arr coord :: l)
| None ->
(match get t @@ prefix ^ AM.chunk_key meta idx with
(match ST.get (module M) t tf @@ prefix ^ AM.chunk_key meta idx with
| Ok b ->
Codecs.Chain.decode chain repr b
| Error _ ->
Expand All @@ -243,8 +246,9 @@ module Make (M : STORE) : S with type t = M.t = struct
ArraySet.of_list @@ AM.chunk_indices meta @@ AM.shape meta in
let s' =
ArraySet.of_list @@ AM.chunk_indices meta shape in
let tf = AM.storage_transformers meta in
ArraySet.iter
(fun v -> erase t @@ pre ^ AM.chunk_key meta v)
(fun v -> ST.erase (module M) t tf @@ pre ^ AM.chunk_key meta v)
ArraySet.(diff s s');
Ok (set t mkey @@ AM.encode @@ AM.update_shape meta shape)
end
Expand Down

0 comments on commit 510222b

Please sign in to comment.