diff --git a/bench/bench_copy.ml b/bench/bench_copy.ml index 0249bf6e8..34d918aeb 100644 --- a/bench/bench_copy.ml +++ b/bench/bench_copy.ml @@ -6,6 +6,22 @@ let chunk_size = 1 lsl 16 let n_chunks = 10000 let n_bytes = n_chunks * chunk_size +let rec await pid = + match Unix.waitpid [] pid with + | (_pid, status) -> status + | exception Unix.Unix_error (Unix.EINTR, _, _) -> await pid + +let run prog args = + await (Unix.create_process prog (Array.of_list (prog :: args)) Unix.stdin Unix.stdout Unix.stderr) + +let check_status = function + | Unix.WEXITED 0 -> () + | _ -> assert false + +let generate_file name = + run "fallocate" [ "-l"; string_of_int n_bytes; name ] + |> check_status + let run_client sock = Fiber.both (fun () -> @@ -35,8 +51,26 @@ let time name service = traceln "%s: %.2f MB/s" name (bytes_per_second /. 1024. /. 1024.); Metric.create name (`Float bytes_per_second) "bytes/s" (name ^ " Flow.copy") -let run _env = +let time_fs fs name service = + let ( / ) = Eio.Path.(/) in + let fname_in = "cptest.in" in + let fname_out ="cptest.out" in + if not (Sys.file_exists fname_in) then generate_file fname_in; + Eio.Path.with_open_in (fs / fname_in) @@ fun inflow -> + Eio.Path.with_open_out ~create:(`Exclusive 0o644) (fs / fname_out) @@ fun outflow -> + let t0 = Unix.gettimeofday () in + service inflow outflow; + let t1 = Unix.gettimeofday () in + let time = t1 -. t0 in + let bytes_per_second = float n_bytes /. time in + traceln "%s: %.2f MB/s" name (bytes_per_second /. 1024. /. 1024.); + at_exit (fun () -> try Sys.remove fname_in with _ -> ()); + Sys.remove fname_out; + Metric.create name (`Float bytes_per_second) "bytes/s" (name ^ " Flow.copy") + +let run env = [ + time_fs env#fs "default_fs" (fun inflow outflow -> Eio.Flow.copy inflow outflow); time "default" (fun sock -> Eio.Flow.copy sock sock); time "buf_read" (fun sock -> let r = Eio.Buf_read.of_flow sock ~initial_size:(64 * 1024) ~max_size:(64 * 1024) |> Eio.Buf_read.as_flow in diff --git a/lib_eio_linux/flow.ml b/lib_eio_linux/flow.ml index a280527ef..4b82cdc65 100644 --- a/lib_eio_linux/flow.ml +++ b/lib_eio_linux/flow.ml @@ -1,5 +1,32 @@ open Eio.Std +(* When copying between files of finite size (e.g. regular files and + block devices), we want to batch up read and write requests to + submit at one go *) +let batch_copy src dst = + let read_then_write_chunk infd outfd file_offset = + let buf = Low_level.alloc_fixed_or_wait () in + let len = Uring.Region.length buf in + Low_level.read_exactly ~file_offset infd buf len; + Low_level.write ~file_offset outfd buf len; + Low_level.free_fixed buf + in + let copy_file infd outfd insize block_size = + let module Int63 = Optint.Int63 in + Switch.run @@ fun sw -> + let rec copy_block file_offset = + let remaining = Int63.(sub insize file_offset) in + if remaining <> Int63.zero then ( + let len = Int63.to_int (min (Int63.of_int block_size) remaining) in + Fiber.fork ~sw (fun () -> read_then_write_chunk infd outfd file_offset); + copy_block Int63.(add file_offset (of_int len)) + ) + in + copy_block Int63.zero + in + let insize = (Low_level.fstat src).size in + copy_file src dst insize 4096 + (* When copying between a source with an FD and a sink with an FD, we can share the chunk and avoid copying. *) let fast_copy src dst = @@ -13,14 +40,17 @@ let fast_copy src dst = done with End_of_file -> () in - Low_level.with_chunk ~fallback @@ fun chunk -> - let chunk_size = Uring.Region.length chunk in - try - while true do - let got = Low_level.read_upto src chunk chunk_size in - Low_level.write dst chunk got - done - with End_of_file -> () + match (Low_level.fstat src).kind with + | `Block_device | `Regular_file -> batch_copy src dst + | _ -> + Low_level.with_chunk ~fallback @@ fun chunk -> + let chunk_size = Uring.Region.length chunk in + try + while true do + let got = Low_level.read_upto src chunk chunk_size in + Low_level.write dst chunk got + done + with End_of_file -> () (* Try a fast copy using splice. If the FDs don't support that, switch to copying. *) let _fast_copy_try_splice src dst = @@ -35,7 +65,7 @@ let _fast_copy_try_splice src dst = (* XXX workaround for issue #319, PR #327 *) let fast_copy_try_splice src dst = fast_copy src dst - + let[@tail_mod_cons] rec list_take n = function | [] -> [] | x :: xs ->