Skip to content

Commit

Permalink
Reduce the chances of multiple Modbus APUs per TCP frame (#564)
Browse files Browse the repository at this point in the history
* Wait for previous message to be drained before sending next

* Enable NoDelay flag on TCP socket

* Fix tests

* Clarify write promise flow and handle `Socket.write` throwing

* Reset the writeComplete Promise when a new connection is established

* Fix lint warnings in netMock.js

* Account for the extra promise step in the tests
  • Loading branch information
Abestanis authored Jan 1, 2025
1 parent cbd4379 commit bd82480
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 18 deletions.
38 changes: 30 additions & 8 deletions ports/tcpport.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TcpPort extends EventEmitter {
/** @type {net.Socket?} - Optional custom socket */
this._externalSocket = null;

if(typeof ip === "object") {
if (typeof ip === "object") {
options = ip;
ip = undefined;
}
Expand All @@ -62,7 +62,7 @@ class TcpPort extends EventEmitter {
}

/** @type {net.TcpSocketConnectOpts} - Options for net.connect(). */
this.connectOptions = {
this.connectOptions = {
// Default options
...{
host: ip || options.ip,
Expand All @@ -72,8 +72,8 @@ class TcpPort extends EventEmitter {
...options
};

if(options.socket) {
if(options.socket instanceof net.Socket) {
if (options.socket) {
if (options.socket instanceof net.Socket) {
this._externalSocket = options.socket;
this.openFlag = this._externalSocket.readyState === "opening" || this._externalSocket.readyState === "open";
} else {
Expand All @@ -92,6 +92,7 @@ class TcpPort extends EventEmitter {

// init a socket
this._client = this._externalSocket || new net.Socket(this.socketOpts);
this._writeCompleted = Promise.resolve();

if (options.timeout) this._client.setTimeout(options.timeout);

Expand Down Expand Up @@ -131,7 +132,9 @@ class TcpPort extends EventEmitter {

this._client.on("connect", function() {
self.openFlag = true;
self._writeCompleted = Promise.resolve();
modbusSerialDebug("TCP port: signal connect");
self._client.setNoDelay();
handleCallback();
});

Expand Down Expand Up @@ -174,10 +177,10 @@ class TcpPort extends EventEmitter {
* @param {(err?: Error) => void} callback
*/
open(callback) {
if(this._externalSocket === null) {
if (this._externalSocket === null) {
this.callback = callback;
this._client.connect(this.connectOptions);
} else if(this.openFlag) {
} else if (this.openFlag) {
modbusSerialDebug("TCP port: external socket is opened");
callback(); // go ahead to setup existing socket
} else {
Expand Down Expand Up @@ -214,7 +217,7 @@ class TcpPort extends EventEmitter {
* @param {Buffer} data
*/
write(data) {
if(data.length < MIN_DATA_LENGTH) {
if (data.length < MIN_DATA_LENGTH) {
modbusSerialDebug("expected length of data is to small - minimum is " + MIN_DATA_LENGTH);
return;
}
Expand All @@ -240,7 +243,26 @@ class TcpPort extends EventEmitter {
});

// send buffer to slave
this._client.write(buffer);
const previousWritePromise = this._writeCompleted;
const newWritePromise = new Promise((resolveNewWrite, rejectNewWrite) => {
// Wait for the completion of any write that happened before.
previousWritePromise.finally(() => {
try {
// The previous write succeeded, write the new buffer.
if (this._client.write(buffer)) {
// Mark this write as complete.
resolveNewWrite();
} else {
// Wait for one `drain` event to mark this write as complete.
this._client.once("drain", resolveNewWrite);
}
} catch (error) {
rejectNewWrite(error);
}
});
});
// Overwrite `_writeCompleted` so that the next call to `TcpPort.write` will have to wait on our write to complete.
this._writeCompleted = newWritePromise;

// set next transaction id
this._transactionIdWrite = (this._transactionIdWrite + 1) % MAX_TRANSACTIONS;
Expand Down
5 changes: 5 additions & 0 deletions test/mocks/netMock.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@ class Socket extends EventEmitter {
}
}

setNoDelay() {
return this;
}

end() {
this.emit("close", false);
}

write(data) {
this._data = data;
return true;
}

receive(buffer) {
Expand Down
25 changes: 15 additions & 10 deletions test/ports/tcpport.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,11 @@ describe("Modbus TCP port methods", function() {
});
port.open(function() {
port.write(Buffer.from("1103006B00037687", "hex"));

if (port._client._data.equals(Buffer.from("0001000000061103006B0003", "hex"))) {
port._client.receive(Buffer.from("000100000006110366778899", "hex"));
}
port._writeCompleted.then(function() {
if (port._client._data.equals(Buffer.from("0001000000061103006B0003", "hex"))) {
port._client.receive(Buffer.from("000100000006110366778899", "hex"));
}
});
});
});

Expand All @@ -136,18 +137,22 @@ describe("Modbus TCP port methods", function() {
});
port.open(function() {
port.write(Buffer.from("1103006B00037687", "hex"));

if (port._client._data.equals(Buffer.from("0002000000061103006B0003", "hex"))) {
port._client.receive(Buffer.from("000200000003118304", "hex"));
}
port._writeCompleted.then(function() {
if (port._client._data.equals(Buffer.from("0002000000061103006B0003", "hex"))) {
port._client.receive(Buffer.from("000200000003118304", "hex"));
}
});
});
});
});

describe("#write", function() {
it("should write a valid TCP message to the port", function() {
it("should write a valid TCP message to the port", function(done) {
port.write(Buffer.from("1103006B00037687", "hex"));
expect(port._client._data.toString("hex")).to.equal("0003000000061103006b0003");
port._writeCompleted.then(function() {
expect(port._client._data.toString("hex")).to.equal("0003000000061103006b0003");
done();
});
});
});

Expand Down

0 comments on commit bd82480

Please sign in to comment.