From 20c6ccdd444f7c0d9087ac7abb414e2d47ed2ba1 Mon Sep 17 00:00:00 2001 From: Sebastian Scholz Date: Mon, 2 Sep 2024 20:43:23 +0200 Subject: [PATCH] Wait for previous message to be drained before sending next --- ports/tcpport.js | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/ports/tcpport.js b/ports/tcpport.js index 5e4133b..4f7baaf 100644 --- a/ports/tcpport.js +++ b/ports/tcpport.js @@ -48,7 +48,7 @@ class TcpPort extends EventEmitter { /** @type {net.Socket?} - Optional custom socket */ this._externalSocket = null; - if(typeof ip === "object") { + if (typeof ip === "object") { options = ip; ip = undefined; } @@ -62,7 +62,7 @@ class TcpPort extends EventEmitter { } /** @type {net.TcpSocketConnectOpts} - Options for net.connect(). */ - this.connectOptions = { + this.connectOptions = { // Default options ...{ host: ip || options.ip, @@ -72,8 +72,8 @@ class TcpPort extends EventEmitter { ...options }; - if(options.socket) { - if(options.socket instanceof net.Socket) { + if (options.socket) { + if (options.socket instanceof net.Socket) { this._externalSocket = options.socket; this.openFlag = this._externalSocket.readyState === "opening" || this._externalSocket.readyState === "open"; } else { @@ -92,6 +92,7 @@ class TcpPort extends EventEmitter { // init a socket this._client = this._externalSocket || new net.Socket(this.socketOpts); + this._writeCompleted = Promise.resolve(); if (options.timeout) this._client.setTimeout(options.timeout); @@ -174,10 +175,10 @@ class TcpPort extends EventEmitter { * @param {(err?: Error) => void} callback */ open(callback) { - if(this._externalSocket === null) { + if (this._externalSocket === null) { this.callback = callback; this._client.connect(this.connectOptions); - } else if(this.openFlag) { + } else if (this.openFlag) { modbusSerialDebug("TCP port: external socket is opened"); callback(); // go ahead to setup existing socket } else { @@ -214,7 +215,7 @@ class TcpPort extends EventEmitter { * @param {Buffer} data */ write(data) { - if(data.length < MIN_DATA_LENGTH) { + if (data.length < MIN_DATA_LENGTH) { modbusSerialDebug("expected length of data is to small - minimum is " + MIN_DATA_LENGTH); return; } @@ -240,7 +241,15 @@ class TcpPort extends EventEmitter { }); // send buffer to slave - this._client.write(buffer); + this._writeCompleted = new Promise((resolve, _) => { + this._writeCompleted.finally(() => { + if (this._client.write(buffer)) { + resolve(); + } else { + this._client.once("drain", resolve); + } + }); + }); // set next transaction id this._transactionIdWrite = (this._transactionIdWrite + 1) % MAX_TRANSACTIONS;