Skip to content

Commit

Permalink
Flexible conn (#111)
Browse files Browse the repository at this point in the history
* tests(conn): simplify test connections to enable different transports to use the tests

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart authored Nov 4, 2024
1 parent c48451e commit 86ef437
Show file tree
Hide file tree
Showing 37 changed files with 693 additions and 802 deletions.
19 changes: 8 additions & 11 deletions core/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
server!: ServerImpl;
features: Features;
connectPromise: Promise<void> | null;
raceTimer?: Timeout<void>;

constructor(options: ConnectionOptions, publisher: Publisher) {
this._closed = false;
Expand Down Expand Up @@ -512,11 +513,10 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {

async dial(srv: Server): Promise<void> {
const pong = this.prepare();
let timer;
try {
timer = timeout(this.options.timeout || 20000);
this.raceTimer = timeout(this.options.timeout || 20000);
const cp = this.transport.connect(srv, this.options);
await Promise.race([cp, timer]);
await Promise.race([cp, this.raceTimer]);
(async () => {
try {
for await (const b of this.transport) {
Expand All @@ -531,10 +531,8 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
}

try {
await Promise.race([timer, pong]);
if (timer) {
timer.cancel();
}
await Promise.race([this.raceTimer, pong]);
this.raceTimer?.cancel();
this.connected = true;
this.connectError = undefined;
this.sendSubscriptions();
Expand All @@ -544,9 +542,7 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
this.flushPending();
this.heartbeats.start();
} catch (err) {
if (timer) {
timer.cancel();
}
this.raceTimer?.cancel();
await this.transport.close(err as Error);
throw err;
}
Expand Down Expand Up @@ -1007,7 +1003,8 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
});
this._closed = true;
await this.transport.close(err);
await this.closed.resolve(err);
this.raceTimer?.cancel();
this.closed.resolve(err);
}

close(): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion core/src/ws_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ export class WsTransport implements Transport {
this.socket.onclose = (evt: CloseEvent) => {
this.socketClosed = true;
let reason: Error | undefined;
if (!evt.wasClean) {
if (!evt.wasClean && evt.reason !== "") {
reason = new Error(evt.reason);
}
this._closed(reason);
Expand Down
Loading

0 comments on commit 86ef437

Please sign in to comment.