Skip to content

Commit

Permalink
streamline httpstore for sync and lwt
Browse files Browse the repository at this point in the history
  • Loading branch information
zoj613 committed Nov 30, 2024
1 parent 1260055 commit 9833879
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 244 deletions.
118 changes: 1 addition & 117 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ end

module ZipStore = Zarr.Zip.Make(Deferred)
module MemoryStore = Zarr.Memory.Make(Deferred)
module HttpStore = Zarr.Http.Make(Deferred)(Ezcurl_lwt)

module FilesystemStore = struct
module IO = struct
Expand Down Expand Up @@ -337,120 +338,3 @@ module AmazonS3Store = struct

include Zarr.Storage.Make(IO)
end

module HttpStore = struct
exception Not_implemented
exception Request_failed of int * string
open Deferred.Syntax

let raise_error (code, s) =
raise (Request_failed (Curl.int_of_curlCode code, s))

let fold_result = Result.fold ~error:raise_error ~ok:Fun.id

module IO = struct
module Deferred = Deferred

type t =
{tries : int
;base_url : string
;client : Ezcurl_lwt.t
;config : Ezcurl_lwt.Config.t}

let get t key =
let tries = t.tries and client = t.client and config = t.config in
let url = t.base_url ^ key in
let+ res = Ezcurl_lwt.get ~tries ~client ~config ~url () in
match fold_result res with
| {code; body; _} when code = 200 -> body
| {code; body; _} -> raise (Request_failed (code, body))

let size t key = Lwt.catch
(fun () ->
let+ data = get t key in
String.length data)
(function
| Request_failed (404, _) -> Deferred.return 0
| exn -> raise exn)
(*let size t key =
let tries = t.tries and client = t.client and config = t.config in
let url = t.base_url ^ key in
let type' = if String.ends_with ~suffix:".json" key then "json" else "octet-stream" in
let headers = [("Content-Type", "application/" ^ type')] in
let res = Ezcurl.http ~headers ~tries ~client ~config ~url ~meth:HEAD () in
match fold_result res with
| {code; _} when code = 404 -> 0
| {headers; _} ->
match List.assoc_opt "content-length" headers with
| (Some "0" | None) ->
begin try print_endline "empty content-length header"; String.length (get t key) with
| Request_failed (404, _) -> 0 end
| Some l -> int_of_string l *)

let is_member t key =
let+ s = size t key in
if s = 0 then false else true

let get_partial_values t key ranges =
let tries = t.tries and client = t.client and config = t.config and url = t.base_url ^ key in
let fetch range = Ezcurl_lwt.get ~range ~tries ~client ~config ~url () in
let end_index ofs l = Printf.sprintf "%d-%d" ofs (ofs + l - 1) in
let read_range acc (ofs, len) =
let none = Printf.sprintf "%d-" ofs in
let range = Option.fold ~none ~some:(end_index ofs) len in
let+ res = fetch range in
let response = fold_result res in
response.body :: acc
in
Deferred.fold_left read_range [] (List.rev ranges)

let set t key data =
let tries = t.tries and client = t.client and config = t.config
and url = t.base_url ^ key and content = `String data in
let type' = if String.ends_with ~suffix:".json" key then "json" else "octet-stream" in
let headers =
[("Content-Length", string_of_int (String.length data))
;("Content-Type", "application/" ^ type')] in
let+ res = Ezcurl_lwt.post ~params:[] ~headers ~tries ~client ~config ~url ~content () in
match fold_result res with
| {code; _} when code = 200 || code = 201 -> ()
| {code; body; _} -> raise (Request_failed (code, body))

let set_partial_values t key ?(append=false) rsv =
let* size = size t key in
let* ov = match size with
| 0 -> Deferred.return String.empty
| _ -> get t key
in
let f = if append || ov = String.empty then
fun acc (_, v) -> acc ^ v else
fun acc (rs, v) ->
let s = Bytes.unsafe_of_string acc in
Bytes.blit_string v 0 s rs String.(length v);
Bytes.unsafe_to_string s
in
set t key (List.fold_left f ov rsv)

(* make reshaping arrays possible *)
let erase t key =
let tries = t.tries and client = t.client and config = t.config in
let url = t.base_url ^ key in
let+ res = Ezcurl_lwt.http ~tries ~client ~config ~url ~meth:DELETE () in
match fold_result res with
| {code; _} when code = 200 -> ()
| {code; body; _} -> raise (Request_failed (code, body))

let erase_prefix _ = raise Not_implemented
let list _ = raise Not_implemented
let list_dir _ = raise Not_implemented
let rename _ = raise Not_implemented
end

let with_open ?(redirects=5) ?(tries=3) ?(timeout=5) url f =
let config = Ezcurl_lwt.Config.(default |> max_redirects redirects |> follow_location true) in
let perform client = f IO.{tries; client; config; base_url = url ^ "/"} in
let set_opts client = Curl.set_connecttimeout client timeout in
Ezcurl_lwt.with_client ~set_opts perform

include Zarr.Storage.Make(IO)
end
16 changes: 3 additions & 13 deletions zarr-lwt/src/storage.mli
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ module MemoryStore : Zarr.Memory.S with module Deferred = Deferred
(** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *)
module ZipStore : Zarr.Zip.S with module Deferred = Deferred

(** An Lwt-aware Http storage backend for a Zarr v3 hierarchy. *)
module HttpStore : Zarr.Http.S with module Deferred = Deferred

(** An Lwt-aware local filesystem storage backend for a Zarr V3 hierarchy. *)
module FilesystemStore : sig
include Zarr.Storage.STORE with module Deferred = Deferred
Expand All @@ -21,19 +24,6 @@ module FilesystemStore : sig
@raise Failure if [dir] is not a Zarr store path. *)
end

module HttpStore : sig
exception Not_implemented
exception Request_failed of int * string
include Zarr.Storage.STORE with module Deferred = Deferred
val with_open :
?redirects:int ->
?tries:int ->
?timeout:int ->
string ->
(t -> 'a Lwt.t) ->
'a Lwt.t
end

(** An Lwt-aware Amazon S3 bucket storage backend for a Zarr V3 hierarchy. *)
module AmazonS3Store : sig
exception Request_failed of Aws_s3_lwt.S3.error
Expand Down
108 changes: 1 addition & 107 deletions zarr-sync/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ end

module ZipStore = Zarr.Zip.Make(Deferred)
module MemoryStore = Zarr.Memory.Make(Deferred)
module HttpStore = Zarr.Http.Make(Deferred)(Ezcurl)

module FilesystemStore = struct
module IO = struct
Expand Down Expand Up @@ -116,110 +117,3 @@ module FilesystemStore = struct

include Zarr.Storage.Make(IO)
end

module HttpStore = struct
exception Not_implemented
exception Request_failed of int * string

let raise_error (code, s) = raise (Request_failed (Curl.int_of_curlCode code, s))

let fold_result = Result.fold ~error:raise_error ~ok:Fun.id

module IO = struct
module Deferred = Deferred

type t =
{tries : int
;base_url : string
;client : Ezcurl_core.t
;config : Ezcurl_core.Config.t}

let get t key =
let tries = t.tries and client = t.client and config = t.config in
let url = t.base_url ^ key in
let res = Ezcurl.get ~tries ~client ~config ~url () in
match fold_result res with
| {code; body; _} when code = 200 -> body
| {code; body; _} -> raise (Request_failed (code, body))

let size t key = try String.length (get t key) with
| Request_failed (404, _) -> 0
(*let size t key =
let tries = t.tries and client = t.client and config = t.config in
let url = t.base_url ^ key in
let type' = if String.ends_with ~suffix:".json" key then "json" else "octet-stream" in
let headers = [("Content-Type", "application/" ^ type')] in
let res = Ezcurl.http ~headers ~tries ~client ~config ~url ~meth:HEAD () in
match fold_result res with
| {code; _} when code = 404 -> 0
| {headers; _} ->
match List.assoc_opt "content-length" headers with
| (Some "0" | None) ->
begin try print_endline "empty content-length header"; String.length (get t key) with
| Request_failed (404, _) -> 0 end
| Some l -> int_of_string l *)

let is_member t key = if (size t key) = 0 then false else true

let get_partial_values t key ranges =
let tries = t.tries and client = t.client and config = t.config and url = t.base_url ^ key in
let fetch range = Ezcurl.get ~range ~tries ~client ~config ~url () in
let end_index ofs l = Printf.sprintf "%d-%d" ofs (ofs + l - 1) in
let read_range (ofs, len) =
let none = Printf.sprintf "%d-" ofs in
let range = Option.fold ~none ~some:(end_index ofs) len in
let response = fold_result (fetch range) in
response.body
in
List.map read_range ranges

let set t key data =
let tries = t.tries and client = t.client and config = t.config
and url = t.base_url ^ key and content = `String data in
let type' = if String.ends_with ~suffix:".json" key then "json" else "octet-stream" in
let headers =
[("Content-Length", string_of_int (String.length data))
;("Content-Type", "application/" ^ type')] in
let res = Ezcurl.post ~params:[] ~headers ~tries ~client ~config ~url ~content () in
match fold_result res with
| {code; _} when code = 200 || code = 201 -> ()
| {code; body; _} -> raise (Request_failed (code, body))

let set_partial_values t key ?(append=false) rsv =
let size = size t key in
let ov = match size with
| 0 -> String.empty
| _ -> get t key
in
let f = if append || ov = String.empty then
fun acc (_, v) -> acc ^ v else
fun acc (rs, v) ->
let s = Bytes.unsafe_of_string acc in
Bytes.blit_string v 0 s rs String.(length v);
Bytes.unsafe_to_string s
in
set t key (List.fold_left f ov rsv)

(* make reshaping arrays possible *)
let erase t key =
let tries = t.tries and client = t.client and config = t.config in
let url = t.base_url ^ key in
let res = Ezcurl.http ~tries ~client ~config ~url ~meth:DELETE () in
match fold_result res with
| {code; _} when code = 200 -> ()
| {code; body; _} -> raise (Request_failed (code, body))

let erase_prefix _ = raise Not_implemented
let list _ = raise Not_implemented
let list_dir _ = raise Not_implemented
let rename _ = raise Not_implemented
end

let with_open ?(redirects=5) ?(tries=3) ?(timeout=5) url f =
let config = Ezcurl_core.Config.(default |> max_redirects redirects |> follow_location true) in
let perform client = f IO.{tries; client; config; base_url = url ^ "/"} in
let set_opts client = Curl.set_connecttimeout client timeout in
Ezcurl_core.with_client ~set_opts perform

include Zarr.Storage.Make(IO)
end
10 changes: 3 additions & 7 deletions zarr-sync/src/storage.mli
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ module MemoryStore : Zarr.Memory.S with module Deferred = Deferred
(** A blocking I/O Zip file storage backend for a Zarr v3 hierarchy. *)
module ZipStore : Zarr.Zip.S with module Deferred = Deferred

(** A blocking I/O Http storage backend for a Zarr v3 hierarchy. *)
module HttpStore : Zarr.Http.S with module Deferred = Deferred

(** A blocking I/O local filesystem storage backend for a Zarr v3 hierarchy. *)
module FilesystemStore : sig
include Zarr.Storage.STORE with module Deferred = Deferred
Expand All @@ -20,10 +23,3 @@ module FilesystemStore : sig
@raise Failure if [dir] is not a Zarr store path. *)
end

module HttpStore : sig
exception Not_implemented
exception Request_failed of int * string
include Zarr.Storage.STORE with module Deferred = Deferred
val with_open : ?redirects:int -> ?tries:int -> ?timeout:int -> string -> (t -> 'a) -> 'a
end
1 change: 1 addition & 0 deletions zarr/src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
bytesrw.zstd
bytesrw.zlib
zipc
ezcurl
stdint
checkseum)
(ocamlopt_flags
Expand Down
Loading

0 comments on commit 9833879

Please sign in to comment.