-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmongodb.js
176 lines (165 loc) · 5.69 KB
/
mongodb.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
const { MongoClient } = require('mongodb');
/**
 * Default settings for MongoDB operations.
 *
 * These values are merged with (and overridden by) the options passed to
 * `MongoDb#start()`.
 * @type {Object}
 */
const settings = {
  housekeepingMillis: 30000, // Interval for housekeeping tasks in milliseconds
  responseMillis: 10000, // Maximum time to wait for the server to start responding
  deadlineMillis: 25000, // Maximum time to wait for a full response
  defaultTtlMillis: 60000, // Default time-to-live for data points in milliseconds
  // Maximum number of points written per insertMany call. The handler reads
  // `options.batchSize`; without a default here it would be undefined and the
  // size-based flush trigger / batch cap would silently never apply.
  batchSize: 1000,
};
/**
 * Buffers data points in memory and writes them to a MongoDB collection in batches.
 *
 * Points are stored in a Map keyed by a hash-derived `uid`, written with
 * `insertMany`, and dropped from the buffer once written or once their
 * `expiry` time has passed (see `housekeeping`).
 */
class MongoDb {
  /**
   * Constructs the MongoDB utility object. No connection is opened here;
   * call `start()` (or `connect()`) for that.
   * @param {Object} app - The application interface for logging (`debug` and `error` are used).
   * @param {string} dbUri - The MongoDB connection URI.
   * @param {string} database - The database name to connect to.
   * @param {string} collection - The collection to use within the database.
   */
  constructor(app, dbUri, database, collection) {
    this.app = app;
    this.dbUri = dbUri;
    this.database = database;
    this.collection = collection;
    this.dbClient = new MongoClient(dbUri, { useNewUrlParser: true, useUnifiedTopology: true });
    this.buffer = new Map();
    this.flushExpiry = new Date();
    this.flushMillis = 0;
    this.ttlMillis = settings.defaultTtlMillis;
    this.flushing = false;
    this.isConnected = false;
    // Start from the defaults so send()/flush() never dereference an
    // undefined `options` when they are used before start().
    this.options = { ...settings };
    this.timer = undefined;
  }

  /**
   * Connects to the MongoDB server with retries on failure.
   * @param {number} [retryCount=5] - Maximum number of connection attempts.
   * @param {number} [retryDelay=1000] - Initial delay between retries in milliseconds; doubles after each failed attempt.
   * @throws {Error} Throws an error if all connection attempts fail.
   */
  async connect(retryCount = 5, retryDelay = 1000) {
    for (let attempt = 0; attempt < retryCount; attempt++) {
      try {
        await this.dbClient.connect();
        this.isConnected = true;
        this.app.debug(`Successfully connected to MongoDB on attempt ${attempt + 1}`);
        return;
      } catch (err) {
        this.app.error(`Failed to connect to MongoDB on attempt ${attempt + 1}: ${err.message}`);
        // No point sleeping after the final attempt.
        if (attempt < retryCount - 1) {
          const delay = retryDelay * Math.pow(2, attempt);
          this.app.debug(`Retrying connection in ${delay}ms`);
          await new Promise(resolve => setTimeout(resolve, delay));
        }
      }
    }
    this.app.error('All MongoDB connection attempts failed');
    throw new Error('Unable to connect to MongoDB after multiple retries');
  }

  /**
   * Initializes and starts the MongoDB handler: merges options over the
   * defaults, connects, and schedules periodic housekeeping.
   * @param {Object} [options] - Custom configuration options (same keys as the defaults, plus `batchSize`).
   * @throws {Error} If the connection cannot be established.
   */
  async start(options) {
    this.options = { ...settings, ...options };
    // Honour an explicitly supplied TTL; otherwise keep whatever ttlMillis
    // already holds so a value assigned by the caller is not clobbered.
    if (options && options.defaultTtlMillis !== undefined) {
      this.ttlMillis = options.defaultTtlMillis;
    }
    await this.connect();
    // Guard against a repeated start() leaking the previous interval.
    clearInterval(this.timer);
    this.timer = setInterval(() => this.housekeeping(), this.options.housekeepingMillis);
    this.app.debug('MongoDB handler has started');
  }

  /**
   * Stops the MongoDB handler: cancels housekeeping, writes out whatever is
   * still buffered, and closes the client.
   */
  async stop() {
    clearInterval(this.timer);
    this.timer = undefined;
    // A single flush() writes at most one batch, so keep flushing while it
    // makes progress; bail out as soon as a flush fails or is skipped.
    let pending = this.buffer.size;
    while (pending > 0) {
      await this.flush();
      if (this.buffer.size >= pending) break;
      pending = this.buffer.size;
    }
    try {
      await this.dbClient.close();
    } finally {
      // Even if close() throws, the client must not be treated as usable.
      this.isConnected = false;
    }
    this.app.debug('MongoDB connection closed');
  }

  /**
   * Processes and queues a single data point for MongoDB insertion, flushing
   * when the batch limit is reached or the flush interval has elapsed.
   * Errors while preparing/buffering/flushing are logged, not thrown.
   * @param {Object} point - The data point to process and send.
   * @throws {Error} If the handler is disconnected and reconnecting fails.
   */
  async send(point) {
    if (!this.isConnected) {
      this.app.error('Attempting to reconnect to MongoDB');
      await this.connect();
    }
    try {
      const enrichedPoint = this.preparePoint(point);
      this.buffer.set(enrichedPoint.uid, enrichedPoint);
      // flushExpiry is a Date; the relational comparison coerces it to its epoch milliseconds.
      if (this.buffer.size >= this.batchLimit() || Date.now() > this.flushExpiry) {
        await this.flush();
      }
    } catch (err) {
      this.app.error(`Error sending data: ${err}`);
    }
  }

  /**
   * Resolves the effective batch size.
   * @returns {number} `options.batchSize` when it is a positive integer, otherwise `Infinity` (no limit).
   */
  batchLimit() {
    const { batchSize } = this.options;
    return Number.isInteger(batchSize) && batchSize > 0 ? batchSize : Infinity;
  }

  /**
   * Prepares a data point by adding necessary metadata (`time`, `expiry`, `uid`).
   * Works on a shallow copy so the caller's object is left untouched.
   * @param {Object} point - The raw data point.
   * @returns {Object} The enriched copy of the data point.
   * @throws {TypeError} If `point` is not an object.
   */
  preparePoint(point) {
    if (point === null || typeof point !== 'object') {
      throw new TypeError('Data point must be an object');
    }
    const enriched = { ...point };
    enriched.time = enriched.time ? new Date(enriched.time) : new Date();
    enriched.expiry = new Date(Date.now() + this.ttlMillis);
    // The uid is derived from the point including time and expiry, so it is
    // computed only after those fields are set.
    enriched.uid = this.generateUID(JSON.stringify(enriched));
    return enriched;
  }

  /**
   * Generates an identifier for a data point based on its JSON string representation.
   * This is a 32-bit rolling hash (hash * 31 + charCode), so distinct inputs can collide.
   * @param {string} json - The JSON string of the data point.
   * @returns {number} The hash-based identifier (signed 32-bit integer).
   */
  generateUID(json) {
    let hash = 0;
    for (let i = 0; i < json.length; i++) {
      const character = json.charCodeAt(i);
      hash = (hash << 5) - hash + character;
      hash |= 0; // Convert to 32bit integer
    }
    return hash;
  }

  /**
   * Flushes up to one batch of buffered points to MongoDB.
   * Does nothing when a flush is already running, when disconnected, or when
   * the buffer is empty. Insert failures are logged and the points stay
   * buffered for a later attempt.
   */
  async flush() {
    if (this.flushing || !this.isConnected) return;
    const batch = Array.from(this.buffer.values()).slice(0, this.batchLimit());
    // The driver rejects an empty document array, so skip the round trip.
    if (batch.length === 0) return;
    this.flushing = true;
    try {
      const coll = this.dbClient.db(this.database).collection(this.collection);
      await coll.insertMany(batch);
      for (const point of batch) {
        // Only remove the exact object that was written: the entry may have
        // been replaced by a newer point with the same uid during the await.
        if (this.buffer.get(point.uid) === point) {
          this.buffer.delete(point.uid);
        }
      }
      this.app.debug(`Flushed ${batch.length} points.`);
    } catch (err) {
      this.app.error(`Flush failed: ${err}`);
    } finally {
      this.flushExpiry = new Date(Date.now() + this.flushMillis);
      this.flushing = false;
    }
  }

  /**
   * Performs periodic cleanup and flushing tasks: drops buffered points whose
   * expiry has passed, then triggers a flush if the flush interval has elapsed.
   */
  housekeeping() {
    const now = new Date();
    // Deleting the current entry while iterating a Map is safe.
    for (const [key, point] of this.buffer.entries()) {
      if (point.expiry < now) {
        this.buffer.delete(key);
      }
    }
    if (now > this.flushExpiry) {
      // Fire-and-forget: housekeeping runs from a timer, so rejections are logged here.
      this.flush().catch(err => this.app.error(`Housekeeping flush failed: ${err}`));
    }
    this.app.debug('Housekeeping completed');
  }
}
// Public API of this module: the buffered MongoDB handler class.
module.exports = { MongoDb: MongoDb };