Skip to content

Commit

Permalink
fix: requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Elin Angelow committed Jun 18, 2024
1 parent 2410fe3 commit 93cbebd
Showing 1 changed file with 72 additions and 54 deletions.
126 changes: 72 additions & 54 deletions lib/wss.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const connectionPool = ({
);
}, 10000);
return {
add: ({id, socket}) => {
add({id, socket}) {
pool.set(id, {id, timer: Date.now(), socket});
return pool.get(id);
},
Expand All @@ -71,7 +71,7 @@ const connectionPool = ({
);
return pool.get(id);
},
remove: (id) => {
remove(id) {
pool.delete(id);
},
get(id) {
Expand All @@ -90,28 +90,76 @@ const connectionPool = ({
};
const requestPool = ({
log,
timeOut = 1000 * 60 * 10
timeout = 30000
}) => {
const pool = new Map();
const int = setInterval(() => {
pool.forEach(({timer, id}) => {
// timeout catch
});
log(
'info',
`Requests count:${pool.size}: `,
`Requests count: ${pool.size}`,
[...pool.keys()],
[...pool.values()].map(({timer}) => new Date(timer))
);
}, 10000);
return {
add() {},
reject() {},
resolve() {},
add(connectionId, message) {
const id = requestCounter();
const wait = new Promise((resolve, reject) => {
const timeOut = setTimeout(
() => {
log(
'warn',
'Wss.request.timeout',
JSON.stringify(message)
);
pool.delete(id);
reject(new Error('ResponseTimeout'));
},
timeout
);
pool.set(id, {
id,
connectionId,
resolve: (...args) => {
clearTimeout(timeOut);
pool.delete(id);
resolve(...args);
},
reject: (...args) => {
clearTimeout(timeOut);
pool.delete(id);
reject(...args);
}
});
});
pool.set(id, {...pool.get(id), wait});
return pool.get(id);
},
resolve(id, packet) {
const r = pool.get(id);
r.resolve(packet);
},
reject(id, error) {
const r = pool.get(id);
r.reject(error);
},
get(id) {
if (!id) {
return pool;
}
return pool.get(id)
},
remove(id) {
const nrq = pool.get(id);
if (nrq?.timeOut) {
clearTimeout(nrq.timeOut);
pool.delete(id);
}
},
stop() {
clearInterval(int);
pool.forEach(({timer, id, socket}) => {
// reject request
pool.forEach(({reject}) => {
reject('StopPrematureReject');
});
}
};
Expand All @@ -125,9 +173,9 @@ const Wss = ({
super();
this.wssServer = undefined;
this.dumper = config?.dumpComm && dumpingComm(config?.dumpComm);
this.requests = new Map();
this.requests_ = requestPool({
log: (...a) => this.log(...a)
this.requests = requestPool({
log: (...a) => this.log(...a),
timeout: config?.request?.timeout
});
this.connections = connectionPool({
log: (...a) => this.log(...a),
Expand Down Expand Up @@ -176,7 +224,7 @@ const Wss = ({
}
// request, it will wait for response, and send data back
// here we are creating response matcher also
const rq = this.wsRequest(connection.id, message);
const rq = this.requests.add(connection.id, message);
try {
const encoded = this.encode({
...message.params,
Expand All @@ -188,11 +236,7 @@ const Wss = ({
const response = await rq.wait;
return response;
} catch (e) {
const nrq = this.requests.get(rq.id);
if (nrq?.timeOut) {
clearTimeout(nrq.timeOut);
this.requests.delete(id);
}
this.requests.remove(rq.id);
throw e;
}
} catch (e) {
Expand Down Expand Up @@ -220,35 +264,6 @@ const Wss = ({
});
return super.init && await super.init();
}
wsRequest(connectionId, message) {
const id = requestCounter();
const wait = new Promise((resolve, reject) => {
const timeOut = setTimeout(
() => {
this.log('info', JSON.stringify(message));
this.requests.delete(id);
reject(new Error('ResponseTimeout'));
},
config?.request?.timeout || 30000
);
this.requests.set(id, {
id,
connectionId,
resolve: (...args) => {
clearTimeout(timeOut);
this.requests.delete(id);
resolve(...args);
},
reject: (...args) => {
clearTimeout(timeOut);
this.requests.delete(id);
reject(...args);
}
});
});
this.requests.set(id, {...this.requests.get(id), wait});
return this.requests.get(id);
}
bindEvents(wsClient, {connectionId}) {
// maybe reject all requests made on this connection
wsClient.on('close', async() => {
Expand All @@ -263,9 +278,12 @@ const Wss = ({
connectionId
}});
} catch (e) {}
for (const [k, v] of this.requests) {
for (const [k, v] of this.requests.get()) {
if(v.connectionId === connectionId) {
v.reject(new Error('ForceClosingConnection'));
this.requests.reject(
v.id,
new Error('ForceClosingConnection')
);
}
}
wsClient.terminate();
Expand All @@ -290,7 +308,7 @@ const Wss = ({
}
// match response if request to device with exact id was send
if (decoded.id && this.requests.get(decoded.id)) {
return this.requests.get(decoded.id).resolve(decoded);
return this.requests.resolve(decoded.id, decoded);
}
// nowhere to send message
if (!config?.sink) {
Expand Down Expand Up @@ -391,7 +409,7 @@ const Wss = ({
}
async stop () {
this.connections.stop();
this.requests_.stop();
this.requests.stop();
this.wssServer?.close();
return super.stop && await super.stop();
}
Expand Down

0 comments on commit 93cbebd

Please sign in to comment.