Skip to content

Commit

Permalink
Merge pull request #295 from talex5/graceful-disconnect
Browse files Browse the repository at this point in the history
Only disconnect socket when sending is done
  • Loading branch information
talex5 authored Nov 26, 2024
2 parents ae9c3e4 + e9c59b5 commit 2e01348
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 22 deletions.
1 change: 0 additions & 1 deletion capnp-rpc-net/capTP_capnp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ module Make (Network : S.NETWORK) = struct
if not t.disconnecting then (
t.disconnecting <- true;
send_abort t ex;
Endpoint.disconnect t.endpoint;
Conn.disconnect t.conn ex
)

Expand Down
18 changes: 10 additions & 8 deletions capnp-rpc-net/endpoint.ml
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,16 @@ let shutdown_send t =
Write.close t.writer

let rec run_writer ~tags t =
let bufs = Write.await_batch t.writer in
match Eio.Flow.single_write t.flow bufs with
| n -> Write.shift t.writer n; run_writer ~tags t
| exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) ->
Log.info (fun f -> f ~tags "Send failed: %a" Eio.Exn.pp ex)
| exception ex ->
Eio.Fiber.check ();
Log.warn (fun f -> f ~tags "Error sending messages: %a (will shutdown connection)" Fmt.exn ex)
match Write.await_batch t.writer with
| exception End_of_file -> () (* Due to [shutdown_send] closing it. *)
| bufs ->
match Eio.Flow.single_write t.flow bufs with
| n -> Write.shift t.writer n; run_writer ~tags t
| exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) ->
Log.info (fun f -> f ~tags "Send failed: %a" Eio.Exn.pp ex)
| exception ex ->
Eio.Fiber.check ();
Log.warn (fun f -> f ~tags "Error sending messages: %a (will shutdown connection)" Fmt.exn ex)

let run_writer ~tags t =
let cleanup () =
Expand Down
2 changes: 1 addition & 1 deletion capnp-rpc-net/vat.ml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ module Make (Network : S.NETWORK) = struct
let my_id = Auth.Secret_key.digest ~hash (Lazy.force t.secret_key) in
let keep_new = (my_id > peer_id) = (mode = `Connect) in
if keep_new then (
let reason = Capnp_rpc.Exception.v "Closing duplicate connection" in
let reason = Capnp_rpc.Exception.v "Invalidated by newer connection" in
CapTP.disconnect existing reason;
run_connection_tls t endpoint r
) else (
Expand Down
22 changes: 12 additions & 10 deletions capnp-rpc/proto/capTP.ml
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,6 @@ module Make (EP : Message_types.ENDPOINT) = struct
else `SenderPromise id

let bootstrap t remote_promise =
check_connected t;
Questions.alloc t.questions (Question.v ~params_for_release:[] ~remote_promise)

(* This is for level 0 implementations, which don't understand about releasing caps. *)
Expand Down Expand Up @@ -1086,15 +1085,18 @@ module Make (EP : Message_types.ENDPOINT) = struct
t.queue_send (`Disembargo_request request)

let bootstrap t object_id =
let result = make_remote_promise t in
let question = Send.bootstrap t (result :> Core_types.struct_resolver) in
result#set_question question;
let qid = Question.id question in
Log.debug (fun f -> f ~tags:(with_qid qid t) "Sending: bootstrap");
t.queue_send (`Bootstrap (qid, object_id));
let service = result#cap Wire.Path.root in
dec_ref result;
service
match t.disconnected with
| Some ex -> Core_types.broken_cap ex
| None ->
let result = make_remote_promise t in
let question = Send.bootstrap t (result :> Core_types.struct_resolver) in
result#set_question question;
let qid = Question.id question in
Log.debug (fun f -> f ~tags:(with_qid qid t) "Sending: bootstrap");
t.queue_send (`Bootstrap (qid, object_id));
let service = result#cap Wire.Path.root in
dec_ref result;
service

module Switchable = struct
class type handler = object
Expand Down
6 changes: 4 additions & 2 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -598,13 +598,15 @@ let test_crossed_calls ~net =
let c_got, s_got =
match c_got, s_got with
| Ok x, Ok y -> (x, y)
| Ok x, Error _ ->
| Ok x, Error (`Capnp e) ->
(* Server got an error. Try client again. *)
Logs.info (fun f -> f ~tags:Test_utils.server_tags "%a" Capnp_rpc.Error.pp e);
let to_client = Sturdy_ref.connect_exn sr_to_client in
Capability.with_ref to_client @@ fun to_client ->
Echo.ping to_client "ping" |> fun s_got -> (x, s_got)
| Error _, Ok y ->
| Error (`Capnp e), Ok y ->
(* Client got an error. Try server again. *)
Logs.info (fun f -> f ~tags:Test_utils.client_tags "%a" Capnp_rpc.Error.pp e);
let to_server = Sturdy_ref.connect_exn sr_to_server in
Capability.with_ref to_server @@ fun to_server ->
Echo.ping to_server "ping" |> fun c_got -> (c_got, y)
Expand Down

0 comments on commit 2e01348

Please sign in to comment.