Skip to content

Commit

Permalink
Merge pull request #191 from patrickbkr/http11-retry
Browse files Browse the repository at this point in the history
Fix a connection reuse race
  • Loading branch information
patrickbkr authored Nov 24, 2024
2 parents 809847d + 4e811a1 commit ac03da8
Showing 1 changed file with 51 additions and 15 deletions.
66 changes: 51 additions & 15 deletions lib/Cro/HTTP/Client.pm6
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class Cro::HTTP::Client::Policy::Timeout does Cro::Policy::Timeout[%(
#| for multiple requests, as well as allowing configuration of common properties of
#| many requests at construction time.
class Cro::HTTP::Client {
my class PipelineClosedBeforeHeaders is Exception { }
my class Pipeline {
has Bool $.secure;
has Str $.host;
Expand All @@ -127,6 +128,7 @@ class Cro::HTTP::Client {
has Tap $!tap;
has $!next-response-vow;
has Bool $.dead = False;
has Lock::Async $!lock .= new;

submethod BUILD(:$!secure!, :$!host!, :$!port!, :$!in!, :$out!) {
$!tap = supply {
Expand All @@ -136,18 +138,22 @@ class Cro::HTTP::Client {
$vow.keep($_);
LAST {
$!dead = True;
if $!next-response-vow {
$!next-response-vow.break:
'Connection unexpectedly closed before response headers received';
$!next-response-vow = Nil;
$!lock.protect: {
if $!next-response-vow {
$!next-response-vow.break:
PipelineClosedBeforeHeaders.new;
$!next-response-vow = Nil;
}
}
}
QUIT {
default {
$!dead = True;
if $!next-response-vow {
$!next-response-vow.break($_);
$!next-response-vow = Nil;
$!lock.protect: {
if $!next-response-vow {
$!next-response-vow.break($_);
$!next-response-vow = Nil;
}
}
}
}
Expand All @@ -156,8 +162,21 @@ class Cro::HTTP::Client {
}

method send-request($request --> Promise) {
my $next-response-promise = Promise.new;
$!next-response-vow = $next-response-promise.vow;
my $next-response-promise;
my $broken = False;
$!lock.protect: {
if $!dead {
# Without https://github.com/MoarVM/MoarVM/pull/1782 merged,
# we can't put the below return here.
$broken = True;
}
else {
$next-response-promise = Promise.new;
$!next-response-vow = $next-response-promise.vow;
}
}
return Promise.broken(PipelineClosedBeforeHeaders.new) if $broken;

$!in.emit($request);
return $next-response-promise;
}
Expand Down Expand Up @@ -217,13 +236,22 @@ class Cro::HTTP::Client {

method send-request(Cro::HTTP::Request $request --> Promise) {
my $p = Promise.new;
my $broken = False;
$!lock.protect: {
my $stream-id = $!next-stream-id;
$!next-stream-id += 2;
$request.http2-stream-id = $stream-id;
$request.http-version = '2.0';
%!outstanding-stream-responses{$stream-id} = $p.vow;
if $!dead {
# Without https://github.com/MoarVM/MoarVM/pull/1782 merged,
# we can't put the below return here.
$broken = True;
}
else {
my $stream-id = $!next-stream-id;
$!next-stream-id += 2;
$request.http2-stream-id = $stream-id;
$request.http-version = '2.0';
%!outstanding-stream-responses{$stream-id} = $p.vow;
}
}
return Promise.broken(PipelineClosedBeforeHeaders.new) if $broken;
$!in.emit($request);
$p
}
Expand Down Expand Up @@ -617,7 +645,6 @@ class Cro::HTTP::Client {

# Send the request.
whenever $pipeline.send-request($request-object) {
$headers-kept = True;
QUIT {
$request-log.end;
when GoAwayRetry {
Expand All @@ -628,7 +655,16 @@ class Cro::HTTP::Client {
.goaway-exception.rethrow;
}
}
when PipelineClosedBeforeHeaders {
if $goaway-retries > 0 && !$headers-kept {
$retry-supplier.emit: True;
}
else {
.rethrow;
}
}
}
$headers-kept = True;

# Consider adding the connection back into the cache to use it
# again.
Expand Down

0 comments on commit ac03da8

Please sign in to comment.