Create a way to detect channel close to be able to recreate the channel #40

Closed
icastillejogomez opened this issue Aug 9, 2022 · 7 comments

Comments

@icastillejogomez

I ran into an issue when I acked some messages that came from a queue that was marked as non-ack, so calling basicAck or basicNack threw an error.

That issue is solved and my system now runs as expected, but I was sad not to find a way to detect the channel close event so that I can recreate/reconnect that channel.

I have:

// Connect and create a channel to RabbitMQ
this.client = new AMQPClient(connectionUrl)
this.connection = await this.client.connect()
this.channel = await this.connection.channel()

this.client.onerror = (error) => {
    console.error(error)
    this.tryReconnect()
}

// Handle disconnection
this.connection.onerror = () => {
    console.error(
        `[RabbitMqClient] ${new Date().toISOString()} Upps! We have a connection error... Trying to reconnect with exponential backoff..`
    )
    this.tryReconnect()
}

But this only works when the RabbitMQ server goes down, not when the channel crashes.
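The `tryReconnect()` helper called above is not shown in the thread. A minimal sketch of what an exponential-backoff retry loop could look like follows. The names `backoffDelay` and `reconnectWithBackoff`, the default delays, and the injectable `sleep` option are all assumptions made for illustration and are not part of amqp-client.js.

```javascript
// Hypothetical helper: delay before retry number `attempt` (0-based).
// Doubles on every attempt and is capped at `maxMs`.
function backoffDelay(attempt, baseMs = 500, maxMs = 30_000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Hypothetical helper: call `connect` until it resolves or the attempts run out.
// `connect` is any async function supplied by the application.
async function reconnectWithBackoff(
  connect,
  {
    maxAttempts = 8,
    baseMs = 500,
    maxMs = 30_000,
    sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)),
  } = {}
) {
  let lastError;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await connect();
    } catch (error) {
      lastError = error;
      // Wait before the next attempt, but not after the final failure.
      if (attempt < maxAttempts - 1) {
        await sleep(backoffDelay(attempt, baseMs, maxMs));
      }
    }
  }
  throw lastError;
}
```

In the class above, `tryReconnect()` could pass a function that repeats `this.client.connect()` and `this.connection.channel()`, so that a fresh channel is created on every successful reconnect.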

@carlhoerberg
Member

carlhoerberg commented Aug 9, 2022 via email

@icastillejogomez
Author

When do you estimate this feature could be released? Thanks for the quick response, @carlhoerberg.

@cressie176

Hi @carlhoerberg,

Event handlers have caused one of the most difficult-to-resolve issues with amqplib. Specifically, because emitting events is synchronous, if the handler throws an error, it can interfere with the amqplib internals and be swallowed or manifest as something completely different. I'm not suggesting you avoid event handlers, just that you give some thought to how you avoid the same pitfalls amqplib has.

One idea I've had is to emit events from a utility function that wraps the actual emission in a try/catch. If an error is thrown, it could emit an error event asynchronously. Still not sure though.
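A rough sketch of that idea, assuming callback-style handlers such as `onerror`: the helper name `safeEmit` and its `onHandlerError` parameter are hypothetical and do not exist in amqplib or amqp-client.js. It only covers handlers that throw synchronously. A handler that returns a rejected promise would need separate treatment.

```javascript
// Hypothetical helper: call a user-supplied handler so that an exception it
// throws cannot unwind into the library code that triggered the event.
function safeEmit(
  handler,
  payload,
  onHandlerError = (err) => console.error("Event handler threw", err)
) {
  if (typeof handler !== "function") return;
  try {
    handler(payload);
  } catch (err) {
    // Report the failure in a microtask, i.e. after the current synchronous
    // code has finished, instead of throwing into the caller.
    queueMicrotask(() => onHandlerError(err));
  }
}
```

With this shape the library's internal state transitions finish before the handler's failure is reported, which is the property the comment above is asking for.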

@ngbrown
Contributor

ngbrown commented Sep 22, 2022

For this functionality in my application, I patched into the package (with patch-package) a lastHeartbeat field that is accessible by the consuming application. It is updated with performance.now() at every received message and on connect.

I haven't yet tested it with 2.1.0, but looking at the diff of changes, it seems that onerror is mainly called if the browser detects the socket connection close, not if it missed the heartbeat timeout. So they might be complementary to each other.

amqp-client does know the requested heartbeat interval so could perform this check too, but exposing lastHeartbeat and doing the rest in the app resulted in the simplest patch.

@icastillejogomez
Author

Could you please share a demo example of that? Thanks for the tip 🙏

@ngbrown
Contributor

ngbrown commented Sep 22, 2022

This is the patch against @cloudamqp/amqp-client@2.0.3. I apply it with patch-package.

View `@cloudamqp+amqp-client+2.0.3.patch`

diff --git a/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-base-client.js b/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-base-client.js
index dea7443..166de60 100644
--- a/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-base-client.js
+++ b/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-base-client.js
@@ -26,6 +26,7 @@ export class AMQPBaseClient {
         if (heartbeat < 0)
             throw new Error("heartbeat must be positive");
         this.heartbeat = heartbeat;
+        this.lastHeartbeat = undefined;
     }
     channel(id) {
         if (this.closed)
@@ -67,6 +68,7 @@ export class AMQPBaseClient {
         if (this.closed)
             return this.rejectClosed();
         this.closed = true;
+        this.lastHeartbeat = undefined;
         let j = 0;
         const frame = new AMQPView(new ArrayBuffer(512));
         frame.setUint8(j, 1);
@@ -619,6 +621,7 @@ export class AMQPBaseClient {
                 }
                 case 8: {
                     const heartbeat = new Uint8Array([8, 0, 0, 0, 0, 0, 0, 206]);
+                    this.lastHeartbeat = performance.now();
                     this.send(heartbeat).catch(err => console.warn("Error while sending heartbeat", err));
                     break;
                 }
diff --git a/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-websocket-client.js b/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-websocket-client.js
index 0d90507..55e3c4e 100644
--- a/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-websocket-client.js
+++ b/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-websocket-client.js
@@ -13,6 +13,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
         this.socket = socket;
         socket.binaryType = "arraybuffer";
         socket.onmessage = this.handleMessage.bind(this);
+        this.lastHeartbeat = performance.now();
         return new Promise((resolve, reject) => {
             this.connectPromise = [resolve, reject];
             socket.onclose = reject;
@@ -22,7 +23,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
     }
     send(bytes) {
         return new Promise((resolve, reject) => {
-            if (this.socket) {
+            if (this.socket && this.socket.readyState === this.socket.OPEN) {
                 try {
                     this.socket.send(bytes);
                     resolve();
@@ -80,6 +81,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
                 this.frameSize = this.framePos = 0;
             }
         }
+        this.lastHeartbeat = performance.now();
     }
     static platform() {
         if (typeof (window) !== 'undefined')
diff --git a/node_modules/@cloudamqp/amqp-client/src/amqp-base-client.ts b/node_modules/@cloudamqp/amqp-client/src/amqp-base-client.ts
index f009883..82059bc 100644
--- a/node_modules/@cloudamqp/amqp-client/src/amqp-base-client.ts
+++ b/node_modules/@cloudamqp/amqp-client/src/amqp-base-client.ts
@@ -23,6 +23,7 @@ export abstract class AMQPBaseClient {
   channelMax = 0
   frameMax: number
   heartbeat: number
+  lastHeartbeat: number | undefined
   /**
    * @param name - name of the connection, set in client properties
    * @param platform - used in client properties
@@ -43,6 +44,7 @@ export abstract class AMQPBaseClient {
     this.frameMax = frameMax
     if (heartbeat < 0) throw new Error("heartbeat must be positive")
     this.heartbeat = heartbeat
+    this.lastHeartbeat = undefined
   }
 
   /**
diff --git a/node_modules/@cloudamqp/amqp-client/src/amqp-websocket-client.ts b/node_modules/@cloudamqp/amqp-client/src/amqp-websocket-client.ts
index 04331d9..552ca3f 100644
--- a/node_modules/@cloudamqp/amqp-client/src/amqp-websocket-client.ts
+++ b/node_modules/@cloudamqp/amqp-client/src/amqp-websocket-client.ts
@@ -28,6 +28,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
     this.socket = socket
     socket.binaryType = "arraybuffer"
     socket.onmessage = this.handleMessage.bind(this)
+    this.lastHeartbeat = performance.now()
     return new Promise((resolve, reject) => {
       this.connectPromise = [resolve, reject]
       socket.onclose = reject
@@ -42,7 +43,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
    */
   override send(bytes: Uint8Array): Promise<void> {
     return new Promise((resolve, reject) => {
-      if (this.socket) {
+      if (this.socket && this.socket.readyState === this.socket.OPEN) {
         try {
           this.socket.send(bytes)
           resolve()
@@ -108,6 +109,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
         this.frameSize = this.framePos = 0
       }
     }
+    this.lastHeartbeat = performance.now()
   }
 
   static platform(): string {
diff --git a/node_modules/@cloudamqp/amqp-client/types/amqp-base-client.d.ts b/node_modules/@cloudamqp/amqp-client/types/amqp-base-client.d.ts
index b67d0f5..9867249 100644
--- a/node_modules/@cloudamqp/amqp-client/types/amqp-base-client.d.ts
+++ b/node_modules/@cloudamqp/amqp-client/types/amqp-base-client.d.ts
@@ -18,6 +18,7 @@ export declare abstract class AMQPBaseClient {
     channelMax: number;
     frameMax: number;
     heartbeat: number;
+    lastHeartbeat: number | undefined;
     /**
      * @param name - name of the connection, set in client properties
      * @param platform - used in client properties

Then in my application I do something like this:

const AMQP_HEARTBEAT_SEC = 10;

amqpRef.current.client = new AMQPWebSocketClient(
  amqpUrl,
  "/",
  amqpToken.username,
  amqpToken.password,
  `app (${VERSION})`,
  4096,
  AMQP_HEARTBEAT_SEC
);

and

window.clearInterval(amqpRefCurrent.heartbeatTimerId ?? undefined);
amqpRefCurrent.heartbeatTimerId = window.setInterval(async () => {
  if (conn.closed) {
    logger.warn("Connection closed at heartbeat.");
    window.clearInterval(amqpRefCurrent.heartbeatTimerId ?? undefined);
    setAmqpState(AmqpState.Disconnected);
  } else if (
    typeof conn.lastHeartbeat === "number" &&
    performance.now() - conn.lastHeartbeat >
      2 * AMQP_HEARTBEAT_SEC * 1000
  ) {
    logger.warn("Heartbeat timed out.");
    try {
      await conn.close("Heartbeat timed out.");
    } catch (e) {
      console.error("conn.close", e);
    }
    window.clearInterval(amqpRefCurrent.heartbeatTimerId ?? undefined);
    setAmqpState(AmqpState.Disconnected);
  }
}, (AMQP_HEARTBEAT_SEC / 2) * 1000);

So with a requested heartbeat of 10 seconds, the application checks every 5 seconds whether more than 20 seconds have passed since the last received message. If so, the interval code closes the connection and sets the state of the connection in the application to disconnected, which starts a re-connect cycle (coded elsewhere).

@ngbrown
Contributor

ngbrown commented Sep 22, 2022

Node.js has different methods to perform the window.setInterval() and performance.now() functionality, so I would implement it differently in this library to make it cross-platform compatible. If the timer stays in the calling application, maybe just expose a method called timeSinceLastRxMessage(). If this library includes the timer functionality, then just exposing the error through the new onerror callback would be enough.
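As an illustration of the first option, here is a small sketch of a tracker that exposes `timeSinceLastRxMessage()` without depending on `window`. The class name `RxTracker`, its other methods, and the injectable clock are hypothetical and not part of amqp-client.js. The default clock assumes a runtime with a global `performance` object, which holds for browsers and recent Node.js versions.

```javascript
// Hypothetical tracker for the time of the last received frame.
class RxTracker {
  // `now` returns a timestamp in milliseconds; injectable for testing.
  constructor(now = () => globalThis.performance.now()) {
    this.now = now;
    this.lastRx = undefined;
  }

  // Call on connect and whenever a frame is received.
  markReceived() {
    this.lastRx = this.now();
  }

  // Call when the connection is closed.
  reset() {
    this.lastRx = undefined;
  }

  // Milliseconds since the last received frame, or undefined if none yet.
  timeSinceLastRxMessage() {
    return this.lastRx === undefined ? undefined : this.now() - this.lastRx;
  }

  // Same threshold as the application code above: more than two heartbeat
  // intervals without any received frame counts as a timeout.
  isTimedOut(heartbeatSec) {
    const elapsed = this.timeSinceLastRxMessage();
    return elapsed !== undefined && elapsed > 2 * heartbeatSec * 1000;
  }
}
```

The calling application would still own the timer, for example a plain `setInterval` that checks `isTimedOut(AMQP_HEARTBEAT_SEC)` and triggers its own reconnect logic.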
