diff --git a/.gitignore b/.gitignore index daec33a..2431652 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ node_modules npm-debug.log +.nyc_output diff --git a/LICENSE b/LICENSE index 7f7dc04..a65fcc3 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -node-graylog (C) 2011 Egor Egorov +Copyright (c) 2017 Wizcorp Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to diff --git a/Makefile b/Makefile index bec14ef..d64bd36 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,9 @@ install: test: npm test +bench: + node ./bench.js + # target: lint, Lints every JavaScript file in the project that are staged to be comitted. lint: ./scripts/lint-staged.sh diff --git a/README.md b/README.md index cbbb558..feb411b 100644 --- a/README.md +++ b/README.md @@ -1,100 +1,105 @@ -# node-graylog2 -[![NPM version](http://img.shields.io/npm/v/graylog2.svg?style=flat-square)](https://www.npmjs.org/package/graylog2) [![NPM license](http://img.shields.io/npm/l/graylog2.svg?style=flat-square)](https://www.npmjs.org/package/graylog2) +# graylog2 -Graylog2 client library for Node.js, based on node-graylog. This -has been heavily modified to the point where there is not much left -of the original; however, this library should still be compatible -with the old one, except for configuration and the GLOBAL function setup -(some optional arguments in logging calls are not supported; they will be -logged as additional data). +[![NPM version](http://img.shields.io/npm/v/graylog2.svg?style=flat-square)](https://www.npmjs.org/package/graylog2) +[![NPM license](http://img.shields.io/npm/l/graylog2.svg?style=flat-square)](https://www.npmjs.org/package/graylog2) -** New: ** Chunked [GELF](https://github.com/Graylog2/graylog2-docs/wiki/GELF) -is now supported. +Graylog2 client library for Node.js -## Synopsis +## Installation + +```sh +npm install graylog2 --save +``` + +## What is Graylog? + +Graylog is popular logging software. You can get it at http://www.graylog2.org. Incidentally, since this package +uses Graylog's [GELF](http://docs.graylog.org/en/latest/pages/gelf.html) protocol, you can also use this with other +logging software that has GELF support. + +## Usage ### Available functions -* graylog.emergency -* graylog.alert -* graylog.critical -* graylog.error -* graylog.warning -* graylog.notice -* graylog.info -* graylog.debug +* graylog.emergency(short, full, fields, timestamp) +* graylog.alert(short, full, fields, timestamp) +* graylog.critical(short, full, fields, timestamp) +* graylog.error(short, full, fields, timestamp) +* graylog.warning(short, full, fields, timestamp) +* graylog.notice(short, full, fields, timestamp) +* graylog.info(short, full, fields, timestamp) +* graylog.debug(short, full, fields, timestamp) + +Arguments: + +- short (string): A short message to log. +- full (string, optional): Additional details. +- fields (object, optional): An object of key/value pairs to help with filtering. +- timestamp (integer, optional): A custom timestamp (milliseconds). ### Code snippets -```javascript -var graylog2 = require("graylog2"); -var logger = new graylog2.graylog({ +```js +var Graylog = require('graylog2'); +var logger = new Graylog({ servers: [ - { 'host': 127.0.0.1, port: 12201 }, - { 'host': 127.0.0.2, port: 12201 } + { host: '127.0.0.1', port: 12201 }, + { host: '127.0.0.2', port: 12201 } ], - hostname: 'server.name', // the name of this host - // (optional, default: os.hostname()) - facility: 'Node.js', // the facility for these log messages - // (optional, default: "Node.js") - bufferSize: 1350 // max UDP packet size, should never exceed the - // MTU of your system (optional, default: 1400) + hostname: 'server.name', // the name of this host (optional, default: os.hostname()) + facility: 'Node.js', // the facility for these log messages (optional, default: "Node.js") + bufferSize: 1350 // max UDP packet size, should not exceed the MTU of your network (optional, default: 1400) }); logger.on('error', function (error) { - console.error('Error while trying to write to graylog2:', error); + console.error('Error while trying to write to Graylog2:', error); }); +logger.on('warning', function (error) { + console.error('Non-fatal error while trying to write to Graylog2:', error); +}); ``` Short message: -```javascript -logger.log("What we've got here is...failure to communicate"); +```js +logger.debug("What we've got here is...failure to communicate"); ``` Long message: -```javascript -logger.log("What we've got here is...failure to communicate", "Some men you just - can't reach. So you get what we had here last week, which is the way he wants - it... well, he gets it. I don't like it any more than you men."); +```js +var short = "What we've got here is...failure to communicate"; +var long = "Some men you just can't reach. So you get what we had here last week, " + + "which is the way he wants it... well, he gets it. I don't like it any more than you men."; + +logger.debug(short, long); ``` Short with additional data: -```javascript -logger.log("What we've got here is...failure to communicate", { cool: 'beans' }); +```js +logger.debug("What we've got here is...failure to communicate", { cool: 'beans' }); ``` Long with additional data: -```javascript -logger.log("What we've got here is...failure to communicate", "Some men you just - can't reach. So you get what we had here last week, which is the way he wants - it... well, he gets it. I don't like it any more than you men.", - { - cool: "beans" - } -); -``` +```js +var short = "What we've got here is...failure to communicate"; +var long = "Some men you just can't reach. So you get what we had here last week, " + + "which is the way he wants it... well, he gets it. I don't like it any more than you men."; -Flush all log messages and close down: -```javascript -logger.close(function(){ - console.log('All done - cookie now?'); - process.exit(); -}); +logger.debug(short, long, { cool: 'beans' }); ``` -## Example - -See `test.js`. +Send all pending log messages and close the socket: -## What is graylog2 after all? - -It's a miracle. Get it at http://www.graylog2.org/ +```js +logger.close(function () { + console.log('All done!'); +}); +``` -## Installation +### More examples - npm install graylog2 +See the files in the `test` folder. diff --git a/bench.js b/bench.js new file mode 100644 index 0000000..6f750de --- /dev/null +++ b/bench.js @@ -0,0 +1,75 @@ +var Graylog = require('.').graylog; +var fs = require('fs'); +var client; +var servers = [ + { host: '127.0.0.1', port: 12201 } +]; + +function createClient() { + client = new Graylog({ + servers: servers, + facility: 'node-graylog benchmark' + }); + + client.on('error', function (error) { + throw error; + }); +} + + +var i = 0; +var count = 20000; +var small = 'h'.repeat(1000); +var big = 'h'.repeat(25000); +var bigRandom = require('crypto').randomBytes(20000).toString('base64'); + +console.log(''); + +function log(str, label, i, n, cb) { + if (i === 0) { + createClient(); + console.time(label + ' x' + n); + + client.on('drain', function () { + console.timeEnd(label + ' x' + n); + + console.log('Sent:', client.sent, '- Compressed:', client.compressed); + console.log(''); + + if (client.sent !== n) { + throw new Error('Should have sent: ' + n); + } + + cb(); + }); + } + + if (i < n) { + client.log('test', str); + process.nextTick(log, str, label, i + 1, n, cb); + } +} + +function testSmall(cb) { + log(small, 'small', 0, 10000, cb); +} + +function testBig(cb) { + log(big, 'big', 0, 5000, cb); +} + +function testBigAndRandom(cb) { + log(bigRandom, 'bigAndRandom', 0, 2000, cb); +} + +function end() { + console.log('Complete.'); + console.log('Please check your logging service and verify that insertion was successful.'); + console.log(''); +} + +testSmall(function () { + testBig(function () { + testBigAndRandom(end); + }); +}); diff --git a/graylog.js b/graylog.js deleted file mode 100644 index 250b2bf..0000000 --- a/graylog.js +++ /dev/null @@ -1,320 +0,0 @@ -var zlib = require('zlib'), - crypto = require('crypto'), - dgram = require('dgram'), - util = require('util'), - EventEmitter = require('events').EventEmitter, - assert = require('assert'); - -/** - * Graylog instances emit errors. That means you really really should listen for them, - * or accept uncaught exceptions (node throws if you don't listen for "error"). - */ - -var graylog = function graylog(config) { - EventEmitter.call(this); - - this.config = config; - - this.servers = config.servers; - this.client = null; - this.hostname = config.hostname || require('os').hostname(); - this.facility = config.facility || 'Node.js'; - this.deflate = config.deflate || 'optimal'; - assert( - this.deflate === 'optimal' || this.deflate === 'always' || this.deflate === 'never', - 'deflate must be one of "optimal", "always", or "never". was "' + this.deflate + '"'); - - this._unsentMessages = 0; - this._unsentChunks = 0; - this._callCount = 0; - - this._onClose = null; - this._isDestroyed = false; - - this._bufferSize = config.bufferSize || this.DEFAULT_BUFFERSIZE; -}; - -util.inherits(graylog, EventEmitter); - -graylog.prototype.DEFAULT_BUFFERSIZE = 1400; // a bit less than a typical MTU of 1500 to be on the safe side - -graylog.prototype.level = { - EMERG: 0, // system is unusable - ALERT: 1, // action must be taken immediately - CRIT: 2, // critical conditions - ERR: 3, // error conditions - ERROR: 3, // because people WILL typo - WARNING: 4, // warning conditions - NOTICE: 5, // normal, but significant, condition - INFO: 6, // informational message - DEBUG: 7 // debug level message -}; - -graylog.prototype.getServer = function () { - return this.servers[this._callCount++ % this.servers.length]; -}; - -graylog.prototype.getClient = function () { - if (!this.client && !this._isDestroyed) { - this.client = dgram.createSocket("udp4"); - - var that = this; - this.client.on('error', function (err) { - that.emit('error', err); - }); - } - - return this.client; -}; - -graylog.prototype.destroy = function () { - if (this.client) { - this.client.close(); - this.client.removeAllListeners(); - this.client = null; - this._onClose = null; - this._isDestroyed = true; - } -}; - -graylog.prototype.emergency = function (short_message, full_message, additionalFields, timestamp) { - return this._log(short_message, full_message, additionalFields, timestamp, this.level.EMERG); -}; - -graylog.prototype.alert = function (short_message, full_message, additionalFields, timestamp) { - return this._log(short_message, full_message, additionalFields, timestamp, this.level.ALERT); -}; - -graylog.prototype.critical = function (short_message, full_message, additionalFields, timestamp) { - return this._log(short_message, full_message, additionalFields, timestamp, this.level.CRIT); -}; - -graylog.prototype.error = function (short_message, full_message, additionalFields, timestamp) { - return this._log(short_message, full_message, additionalFields, timestamp, this.level.ERROR); -}; - -graylog.prototype.warning = function (short_message, full_message, additionalFields, timestamp) { - return this._log(short_message, full_message, additionalFields, timestamp, this.level.WARNING); -}; -graylog.prototype.warn = graylog.prototype.warning; - -graylog.prototype.notice = function (short_message, full_message, additionalFields, timestamp) { - return this._log(short_message, full_message, additionalFields, timestamp, this.level.NOTICE); -}; - -graylog.prototype.info = function (short_message, full_message, additionalFields, timestamp) { - return this._log(short_message, full_message, additionalFields, timestamp, this.level.INFO); -}; -graylog.prototype.log = graylog.prototype.info; - -graylog.prototype.debug = function (short_message, full_message, additionalFields, timestamp) { - return this._log(short_message, full_message, additionalFields, timestamp, this.level.DEBUG); -}; - -graylog.prototype._log = function log(short_message, full_message, additionalFields, timestamp, level) { - this._unsentMessages += 1; - - var payload, - fileinfo, - that = this, - field = '', - message = { - version : '1.0', - timestamp : (timestamp || new Date()).getTime() / 1000, - host : this.hostname, - facility : this.facility, - level : level - }; - - if (typeof(short_message) !== 'object' && typeof(full_message) === 'object' && additionalFields === undefined) { - // Only short message and additional fields are available - message.short_message = short_message; - message.full_message = short_message; - - additionalFields = full_message; - } else if (typeof(short_message) !== 'object') { - // We normally set the data - message.short_message = short_message; - message.full_message = full_message || short_message; - } else if (short_message.stack && short_message.message) { - - // Short message is an Error message, we process accordingly - message.short_message = short_message.message; - message.full_message = short_message.stack; - - // extract error file and line - fileinfo = message.stack.split('\n')[0]; - fileinfo = fileinfo.substr(fileinfo.indexOf('('), fileinfo.indeOf(')')); - fileinfo = fileinfo.split(':'); - - message.file = fileinfo[0]; - message.line = fileinfo[1]; - - additionalFields = full_message || additionalFields; - } else { - message.full_message = message.short_message = JSON.stringify(short_message); - } - - // We insert additional fields - for (field in additionalFields) { - message['_' + field] = additionalFields[field]; - } - - // https://github.com/Graylog2/graylog2-docs/wiki/GELF - if (message._id) { - message.__id = message._id; - delete message._id; - } - - // Compression - payload = new Buffer(JSON.stringify(message)); - - function sendPayload(err, buffer) { - if (err) { - that._unsentMessages -= 1; - return that.emitError(err); - } - - // If it all fits, just send it - if (buffer.length <= that._bufferSize) { - that._unsentMessages -= 1; - return that.send(buffer, that.getServer()); - } - - // It didn't fit, so prepare for a chunked stream - - var bufferSize = that._bufferSize; - var dataSize = bufferSize - 12; // the data part of the buffer is the buffer size - header size - var chunkCount = Math.ceil(buffer.length / dataSize); - - if (chunkCount > 128) { - that._unsentMessages -= 1; - return that.emitError('Cannot log messages bigger than ' + (dataSize * 128) + ' bytes'); - } - - // Generate a random id in buffer format - crypto.randomBytes(8, function (err, id) { - if (err) { - that._unsentMessages -= 1; - return that.emitError(err); - } - - // To be tested: what's faster, sending as we go or prebuffering? - var server = that.getServer(); - var chunk = new Buffer(bufferSize); - var chunkSequenceNumber = 0; - - // Prepare the header - - // Set up magic number (bytes 0 and 1) - chunk[0] = 30; - chunk[1] = 15; - - // Set the total number of chunks (byte 11) - chunk[11] = chunkCount; - - // Set message id (bytes 2-9) - id.copy(chunk, 2, 0, 8); - - function send(err) { - if (err || chunkSequenceNumber >= chunkCount) { - // We have reached the end, or had an error (which will already have been emitted) - that._unsentMessages -= 1; - return; - } - - // Set chunk sequence number (byte 10) - chunk[10] = chunkSequenceNumber; - - // Copy data from full buffer into the chunk - var start = chunkSequenceNumber * dataSize; - var stop = Math.min((chunkSequenceNumber + 1) * dataSize, buffer.length); - - buffer.copy(chunk, 12, start, stop); - - chunkSequenceNumber++; - - // Send the chunk - that.send(chunk.slice(0, stop - start + 12), server, send); - } - - send(); - }); - } - - if (this.deflate === 'never' || (this.deflate === 'optimal' && payload.length <= this._bufferSize)) { - sendPayload(null, payload); - } else { - zlib.deflate(payload, sendPayload); - } -}; - -graylog.prototype.send = function (chunk, server, cb) { - var that = this, - client = this.getClient(); - - if (!client) { - var error = new Error('Socket was already destroyed'); - - this.emit('error', error); - return cb(error); - } - - this._unsentChunks += 1; - - client.send(chunk, 0, chunk.length, server.port, server.host, function (err) { - that._unsentChunks -= 1; - - if (err) { - that.emit('error', err); - } - - if (cb) { - cb(err); - } - - if (that._unsentChunks === 0 && that._unsentMessages === 0 && that._onClose) { - that._onClose(); - } - }); -}; - -graylog.prototype.emitError = function (err) { - this.emit('error', err); - - if (this._unsentChunks === 0 && this._unsentMessages === 0 && this._onClose) { - this._onClose(); - } -}; - -graylog.prototype.close = function (cb) { - var that = this; - - if (this._onClose || this._isDestroyed) { - return process.nextTick(function () { - var error = new Error('Close was already called once'); - - if (cb) { - return cb(error); - } - - that.emit('error', error); - }); - } - - this._onClose = function () { - that.destroy(); - - if (cb) { - cb(); - } - }; - - if (this._unsentChunks === 0 && this._unsentMessages === 0) { - process.nextTick(function () { - that._onClose(); - }); - } -}; - -exports.graylog = graylog; diff --git a/index.js b/index.js new file mode 100644 index 0000000..586d787 --- /dev/null +++ b/index.js @@ -0,0 +1 @@ +module.exports = require('./lib/Graylog'); diff --git a/lib/Graylog.js b/lib/Graylog.js new file mode 100644 index 0000000..606b171 --- /dev/null +++ b/lib/Graylog.js @@ -0,0 +1,427 @@ +var zlib = require('zlib'); +var crypto = require('crypto'); +var dgram = require('dgram'); +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var Queue = require('./Queue'); + +var MAX_SAFE_INT = 9007199254740991; // Number.MAX_SAFE_INTEGER + +/** + * Graylog instances emit errors. That means you really really should listen for them, + * or accept uncaught exceptions (node throws if you don't listen for "error"). + */ + +function Graylog(config) { + EventEmitter.call(this); + + // settings (all safe to change at runtime through these setters) + this.setServers(config.servers); + this.setHostname(config.hostname || require('os').hostname()); + this.setFacility(config.facility || 'Node.js'); + this.setBufferSize(config.bufferSize || this.DEFAULT_BUFFERSIZE); + this.setDeflate(config.deflate || 'optimal'); + + // state + this._client = null; + this._serverIterator = 0; + this._headerPool = []; + this._isDeflating = false; + this._isSending = false; + + this._sendQueue = new Queue(); + this._deflateQueue = new Queue(); + + // stats + this.sent = 0; + this.compressed = 0; +} + +util.inherits(Graylog, EventEmitter); + +Graylog.prototype.DEFAULT_BUFFERSIZE = 1400; // a bit less than a typical MTU of 1500 to be on the safe side + +Graylog.prototype.level = { + EMERG: 0, // system is unusable + ALERT: 1, // action must be taken immediately + CRIT: 2, // critical conditions + ERROR: 3, // error conditions + WARNING: 4, // warning conditions + NOTICE: 5, // normal, but significant, condition + INFO: 6, // informational message + DEBUG: 7 // debug level message +}; + + +Graylog.prototype.setServers = function (servers) { + if (!Array.isArray(servers)) { + throw new TypeError('Servers must be an array'); + } + + if (servers.length === 0) { + throw new Error('Servers array cannot be empty'); + } + + for (var i = 0; i < servers.length; i += 1) { + var server = servers[i]; + if (!server || typeof server !== 'object') { + throw new TypeError('A server entry must be an object with "host" and "port" properties'); + } + + if (!server.hasOwnProperty('host') || !server.hasOwnProperty('port')) { + throw new TypeError('A server entry must be an object with "host" and "port" properties'); + } + + if (typeof server.host !== 'string') { + throw new TypeError('A server host must be a string'); + } + + if (typeof server.port !== 'number') { + throw new TypeError('A server port must be a number'); + } + } + + this._servers = servers; +}; + + +Graylog.prototype.setHostname = function (hostname) { + if (typeof hostname !== 'string') { + throw new TypeError('Host name must be a string'); + } + + this._hostname = hostname; +}; + + +Graylog.prototype.setFacility = function (facility) { + if (typeof facility !== 'string') { + throw new TypeError('Facility must be a string'); + } + + this._facility = facility; +}; + + +Graylog.prototype.setBufferSize = function (bufferSize) { + if (typeof bufferSize !== 'number') { + throw new TypeError('Buffer size must be a number'); + } + + this._bufferSize = bufferSize; +}; + + +Graylog.prototype.setDeflate = function (deflate) { + if (deflate !== 'optimal' && deflate !== 'always' && deflate !== 'never') { + throw new Error('deflate must be "optimal", "always", or "never". was "' + deflate + '"'); + } + + this._neverDeflate = deflate === 'never'; + this._alwaysDeflate = deflate === 'always'; +}; + + +Graylog.prototype._getServer = function () { + if (this._servers.length === 1) { + // common case + return this._servers[0]; + } + + if (this._serverIterator >= MAX_SAFE_INT) { + this._serverIterator = 0; + } + + return this._servers[this._serverIterator++ % this._servers.length]; +}; + + +Graylog.prototype._getClient = function () { + if (!this._client) { + this._client = dgram.createSocket('udp4'); + + this._client.unref(); + + var that = this; + + this._client.on('error', function (error) { + // When a callback is passed to client.send(), this event does not fire. + that.emit('error', error); + }); + } + + return this._client; +}; + + +Graylog.prototype.destroy = function () { + this._sendQueue = null; + this._deflateQueue = null; + this._headerPool = []; + this._isDeflating = false; + this._isSending = false; + + if (this._client) { + this._client.close(); + this._client.removeAllListeners(); + this._client = null; + } +}; + +Graylog.prototype.emergency = function (short, full, fields, timestamp) { + this._log(this.level.EMERG, short, full, fields, timestamp); +}; + +Graylog.prototype.alert = function (short, full, fields, timestamp) { + this._log(this.level.ALERT, short, full, fields, timestamp); +}; + +Graylog.prototype.critical = function (short, full, fields, timestamp) { + this._log(this.level.CRIT, short, full, fields, timestamp); +}; + +Graylog.prototype.error = function (short, full, fields, timestamp) { + this._log(this.level.ERROR, short, full, fields, timestamp); +}; + +Graylog.prototype.warning = function (short, full, fields, timestamp) { + this._log(this.level.WARNING, short, full, fields, timestamp); +}; + +Graylog.prototype.notice = function (short, full, fields, timestamp) { + this._log(this.level.NOTICE, short, full, fields, timestamp); +}; + +Graylog.prototype.info = function (short, full, fields, timestamp) { + this._log(this.level.INFO, short, full, fields, timestamp); +}; + +Graylog.prototype.debug = function (short, full, fields, timestamp) { + this._log(this.level.DEBUG, short, full, fields, timestamp); +}; + +Graylog.prototype.log = Graylog.prototype.info; +Graylog.prototype.warn = Graylog.prototype.warning; + + +Graylog.prototype._serialize = function (level, short, full, fields, timestamp) { + var message = { + version: '1.0', + timestamp: parseInt((timestamp || Date.now()) / 1000, 10), + host: this._hostname, + facility: this._facility, + level: level, + short_message: short === null ? undefined : short, + full_message: full === null ? undefined : full + }; + + // We insert additional fields + + if (fields) { + for (var field in fields) { + if (field === 'id') { + // http://docs.graylog.org/en/2.1/pages/gelf.html + message.__id = fields.id; + } else { + message['_' + field] = fields[field]; + } + } + } + + return new Buffer(JSON.stringify(message), 'utf8'); +}; + + +Graylog.prototype._getHeadersFromPool = function (n) { + for (var i = this._headerPool.length; i < n; i += 1) { + var header = this._headerPool[i] = new Buffer(12); + + // Set the magic number (bytes 0 and 1) + header[0] = 30; + header[1] = 15; + + // Set the chunk sequence number (byte 10) + header[10] = i; + } + + return this._headerPool; +}; + + +Graylog.prototype._sendChunked = function (id, message, cb) { + var maxDataSize = this._bufferSize - 12; // the message part of each chunk is the buffer size - header size + var chunkCount = Math.ceil(message.length / maxDataSize); + + if (chunkCount > 128) { + return cb(new Error('Graylog2 message too long: ' + message.length + ' bytes')); + } + + var client = this._getClient(); + var server = this._getServer(); + + var headers = this._getHeadersFromPool(chunkCount); + var msgOffset = 0; + + for (var i = 0; i < chunkCount; i += 1) { + var header = headers[i]; + + // Set the message id (bytes 2-9) + id.copy(header, 2); + + // Set the total number of chunks (byte 11) + header[11] = chunkCount; + + // Slice out the message part + var data = message.slice(msgOffset, msgOffset + maxDataSize); + + this.emit('chunk', header, data, i, chunkCount, server); + + if (i < chunkCount - 1) { + client.send([header, data], server.port, server.host); + + msgOffset += maxDataSize; + } else { + client.send([header, data], server.port, server.host, cb); + } + } +}; + + +var count = 0; + +Graylog.prototype._tickDeflate = function () { + if (this._isDeflating || this._deflateQueue.isEmpty()) { + return; + } + + this._isDeflating = true; + + var that = this; + var msg = this._deflateQueue.getOne(); + + function done() { + that._isDeflating = false; + that._sendQueue.append(msg); + + that._tickSend(); + that._tickDeflate(); + } + + if (!this._alwaysDeflate && msg.buff.length <= this._bufferSize) { + process.nextTick(done); + return; + } + + zlib.deflate(msg.buff, function (error, compressed) { + if (error) { + that.emit('warning', error); + } else { + that.compressed += 1; + + if (that._alwaysDeflate || compressed.length < msg.buff.length) { + msg.buff = compressed; + } + } + + done(); + }); +}; + + +Graylog.prototype._tickSend = function () { + if (this._isSending) { + return; + } + + if (this._sendQueue.isEmpty()) { + if (!this._isDeflating) { + this.emit('drain'); + } + return; + } + + this._isSending = true; + + var that = this; + var msg = this._sendQueue.getOne(); + + function done(error) { + that._isSending = false; + + if (error) { + that.emit('error', error); + } + + that.sent += 1; + that._tickSend(); + } + + var buff = msg.buff; + msg.buff = null; // help GC a bit + + if (buff.length <= this._bufferSize) { + // No need to chunk this message + + var client = this._getClient(); + var server = this._getServer(); + + this.emit('message', buff, server); + + client.send(buff, 0, buff.length, server.port, server.host, done); + return; + } + + // Generate a random ID (buffer) + crypto.randomBytes(8, function (error, id) { + if (error) { + return done(error); + } + + that._sendChunked(id, buff, done); + }); +}; + + +Graylog.prototype._log = function log(level, short, full, fields, timestamp) { + if (!this._sendQueue) { + // destroyed + this.emit('warning', new Error('Trying to send on a closed client')); + return; + } + + var message = { + buff: this._serialize(level, short, full, fields, timestamp), + next: null + }; + + if (!this._neverDeflate) { + this._deflateQueue.append(message); + this._tickDeflate(); + } else { + this._sendQueue.append(message); + this._tickSend(); + } +}; + + +Graylog.prototype.close = function (cb) { + if (!cb) { + cb = function () {}; + } + + if (!this._isSending && !this._isDeflating) { + this.destroy(); + return cb(); + } + + var that = this; + + this.once('drain', function () { + that.destroy(); + cb(); + }); +}; + + +module.exports = Graylog; +module.exports.graylog = Graylog; // deprecated diff --git a/lib/Queue.js b/lib/Queue.js new file mode 100644 index 0000000..642d437 --- /dev/null +++ b/lib/Queue.js @@ -0,0 +1,37 @@ +function Queue() { + this.first = null; + this.last = null; +} + +module.exports = Queue; + + +Queue.prototype.append = function (obj) { + if (this.last) { + this.last.next = obj; + this.last = obj; + } else { + this.first = this.last = obj; + } +}; + + +Queue.prototype.getOne = function () { + var result = this.first; + + if (result) { + this.first = result.next; + result.next = null; + + if (result === this.last) { + this.last = null; + } + } + + return result; +}; + + +Queue.prototype.isEmpty = function () { + return this.last === null; +}; diff --git a/package.json b/package.json index 121bb32..ccb08cf 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,10 @@ "url": "http://github.com/Wizcorp/node-graylog2/issues" }, "contributors": [ + { + "name": "Ron Korving", + "email": "rkorving@wizcorp.jp" + }, { "name": "Egor Egorov", "email": "me@egorfine.com" @@ -17,19 +21,25 @@ } ], "devDependencies": { - "jshint": "0.9.1" + "jshint": "^2.9.4", + "nyc": "^10.0.0", + "tap-spec": "^4.1.1", + "tape": "^4.6.3" }, "engines": { - "node": ">=0.6.11" + "node": ">=5.7.0" }, "homepage": "http://github.com/Wizcorp/node-graylog2", "license": "MIT", - "main": "./graylog.js", + "main": "./index.js", "repository": { "type": "git", "url": "http://github.com/Wizcorp/node-graylog2.git" }, "scripts": { - "test": "node ./test" + "test": "tape ./test/*.js | tap-spec", + "cover": "nyc tape ./test/*.js", + "lint": "jshint ./lib", + "bench": "node ./bench.js" } } diff --git a/test.js b/test.js deleted file mode 100644 index 373810e..0000000 --- a/test.js +++ /dev/null @@ -1,85 +0,0 @@ -var graylog = require('./graylog'), - fs = require('fs'), - assert = require('assert'), - file, - data, - servers = [ - { 'host': '127.0.0.1', 'port': 12201 } - ]; - - -var client = new graylog.graylog({ - servers: servers, - facility: 'Test logger / Node.JS Test Script' - }); - -console.log('---------------------------------------------'); -console.log('Sending three test as info, warning and error'); -console.log('---------------------------------------------'); -client.log('test1', 'i get this1', {cool: 'beans'}); -client.warn('test2', 'i get this2', {cool: 'beans'}); -client.error('test3', 'i get this3', {cool: 'beans'}); -client.error('customTime', 'i get this3', {cool: 'beans'}, new Date('2012-10-10 13:20:31.619Z')); -console.log(''); - -console.log('---------------------------------------------'); -console.log('Sending Sean Connery\' picture (as critical)'); -console.log('---------------------------------------------'); -file = './data/sean.jpg'; -data = fs.readFileSync(file); -client.critical('My Nice Sean Connery Picture', data.toString(), {name: 'James Bond'}); -console.log(''); - -console.log('---------------------------------------------'); -console.log('Sending data of different sizes (as critical)'); -console.log('---------------------------------------------'); -for (var i = 4; i <= 128; i *= 2) { - file = './data/' + i + '.dat'; - data = fs.readFileSync(file); - console.log('sending', file); - client.critical('Test 4 ' + file, data.toString(), {datafile: i + '.dat'}); -} -console.log(''); - -console.log('---------------------------------------------'); -console.log('Sending different parameters'); -console.log('---------------------------------------------'); -client.log('ParametersTest - Only short message'); -client.log('ParametersTest - Short message and json', {cool: 'beans'}); -client.log('ParametersTest - Short message and full message', 'Full message'); -client.log('ParametersTest - Short Message with full message and json', 'Full message', {cool: 'beans'}); -console.log(''); - -console.log('---------------------------------------------'); -console.log('Sending without deflate'); -console.log('---------------------------------------------'); -client.deflate = 'never'; -for (var i = 4; i <= 64; i *= 2) { - file = './data/' + i + '.dat'; - data = fs.readFileSync(file); - console.log('sending', file); - client.critical('Test 4 ' + file, data.toString(), {datafile: i + '.dat'}); -} -client.deflate = 'optimal'; -console.log(''); - -client.close(function () { - console.log('Insertion complete. Please check', 'http://' + servers[0].host + ':3000', 'and verify that insertion was successfull'); - console.log(''); -}); - -console.log('---------------------------------------------'); -console.log('Checking deflate assertion'); -console.log('---------------------------------------------'); -try { - new graylog.graylog({ - servers: servers, - facility: 'Test logger / Node.JS Test Script', - deflate: 'not an option' - }); - throw new Error('should not get here') -} catch (err) { - assert( - err.message === 'deflate must be one of "optimal", "always", or "never". was "not an option"', - 'assertion msg was wrong: ' + err.message); -} diff --git a/test/chunked.js b/test/chunked.js new file mode 100644 index 0000000..a86969c --- /dev/null +++ b/test/chunked.js @@ -0,0 +1,79 @@ +var test = require('tape'); +var zlib = require('zlib'); +var Graylog = require('..'); + +test('chunked', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }], deflate: 'never' }); + var buffers = []; + + client.on('chunk', function (header, data, i, count) { + buffers.push(data); + + t.equal(header[10], i); + t.equal(header[11], count); + + if (i === count - 1) { + var message = JSON.parse(Buffer.concat(buffers)); + + t.equal(message.short_message, 'short message'); + t.equal(message.full_message, 'full message'.repeat(1000)); + t.end(); + } + }); + + client.info('short message', 'full message'.repeat(1000)); +}); + + +test('compressed chunked', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }], deflate: 'optimal', bufferSize: 100 }); + var buffers = []; + + client.on('chunk', function (header, data, i, count) { + buffers.push(data); + + t.equal(header[10], i); + t.equal(header[11], count); + + if (i === count - 1) { + var message = JSON.parse(zlib.inflateSync(Buffer.concat(buffers))); + + t.equal(message.short_message, 'short message'); + t.equal(message.full_message, 'full message'.repeat(1000)); + t.end(); + } + + }); + + client.info('short message', 'full message'.repeat(1000)); +}); + + +test('too big', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }], deflate: 'never', bufferSize: 20 }); + + client.on('error', function () { + t.end(); + }); + + client.info('short message', 'full message'.repeat(1000)); +}); + + +test('crypto error', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }], deflate: 'never' }); + var crypto = require('crypto'); + + var oldRandomBytes = crypto.randomBytes; + + crypto.randomBytes = function (n, cb) { + cb(new Error('Oops')); + }; + + client.on('error', function () { + crypto.randomBytes = oldRandomBytes; + t.end(); + }); + + client.info('short message', 'full message'.repeat(1000)); +}); diff --git a/test/close.js b/test/close.js new file mode 100644 index 0000000..99eada6 --- /dev/null +++ b/test/close.js @@ -0,0 +1,73 @@ +var test = require('tape'); +var Graylog = require('..'); + +test('drain', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }], deflate: 'never' }); + var total = 100; + var sent = 0; + var received = 0; + + client.on('chunk', function (header, data, i, count) { + if (i === count - 1) { + received += 1; + } + }); + + client.on('drain', function () { + t.equal(sent, total); + t.equal(received, total); + t.end(); + }); + + for (var i = 0; i < total; i++) { + client.info('short message', 'full message'.repeat(1000)); + sent += 1; + } +}); + + +test('graceful close', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }], deflate: 'never' }); + var total = 100; + var sent = 0; + var received = 0; + + client.on('chunk', function (header, data, i, count) { + if (i === count - 1) { + received += 1; + } + }); + + for (var i = 0; i < total; i++) { + client.info('short message', 'full message'.repeat(1000)); + sent += 1; + } + + client.close(function () { + t.equal(sent, total); + t.equal(received, total); + t.end(); + }); +}); + + +test('instant close', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }] }); + + client.close(function () { + t.end(); + }); +}); + + +test('send after close', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }] }); + + client.on('warning', function () { + t.end(); + }); + + client.close(function () { + client.info('short message'); + }); +}); diff --git a/test/compressed.js b/test/compressed.js new file mode 100644 index 0000000..732b2a7 --- /dev/null +++ b/test/compressed.js @@ -0,0 +1,49 @@ +var test = require('tape'); +var zlib = require('zlib'); +var Graylog = require('..'); + +test('compressed', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }], deflate: 'optimal' }); + + client.on('message', function (message) { + message = JSON.parse(zlib.inflateSync(message)); + + t.equal(message.short_message, 'short message'); + t.equal(message.full_message, 'full message'.repeat(1000)); + t.end(); + }); + + client.info('short message', 'full message'.repeat(1000)); +}); + + +test('forced compressed', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }], deflate: 'always' }); + + client.on('message', function (message) { + message = JSON.parse(zlib.inflateSync(message)); + + t.equal(message.short_message, 'short message'); + t.equal(message.full_message, 'full message'); + t.end(); + }); + + client.info('short message', 'full message'); +}); + + +test('compression error handling', function (t) { + var oldDeflate = zlib.deflate; + zlib.deflate = function (buff, cb) { + cb(new Error('Oops')); + }; + + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }], deflate: 'always' }); + + client.on('warning', function (error) { + zlib.deflate = oldDeflate; + t.end(); + }); + + client.info('short message', 'full message'); +}); diff --git a/test/levels.js b/test/levels.js new file mode 100644 index 0000000..a16207c --- /dev/null +++ b/test/levels.js @@ -0,0 +1,41 @@ +var test = require('tape'); +var Graylog = require('..'); + +test('levels', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }] }); + + var levels = { + debug: 7, + info: 6, + notice: 5, + warning: 4, + error: 3, + critical: 2, + alert: 1, + emergency: 0 + }; + + var levelNames = Object.keys(levels); + var cursor = 0; // traverses over levelNames + + client.on('message', function (message) { + message = JSON.parse(message); + + var expectedLevelName = levelNames[cursor]; + var expectedLevel = levels[expectedLevelName]; + + t.equal(message.short_message, 'short message for level ' + expectedLevelName); + t.equal(message.level, expectedLevel); + + cursor += 1; + + if (cursor === levelNames.length) { + t.end(); + } + }); + + for (var i = 0; i < levelNames.length; i += 1) { + var level = levelNames[i]; + client[level]('short message for level ' + level); + } +}); diff --git a/test/options.js b/test/options.js new file mode 100644 index 0000000..d35b8dd --- /dev/null +++ b/test/options.js @@ -0,0 +1,86 @@ +var test = require('tape'); +var Graylog = require('..'); + +test('options', function (t) { + t.throws(function () { + new Graylog(); + }); + + t.throws(function () { + new Graylog({ servers: 'foo' }); + }); + + t.throws(function () { + new Graylog({ servers: [] }); + }); + + t.throws(function () { + new Graylog({ servers: ['foo'] }); + }); + + t.throws(function () { + new Graylog({ servers: [{}] }); + }); + + t.throws(function () { + new Graylog({ servers: [{ host: 1, port: 1 }] }); + }); + + t.throws(function () { + new Graylog({ servers: [{ host: 0, port: 'bar' }] }); + }); + + t.throws(function () { + new Graylog({ servers: [{ host: 'foo', port: 'foo' }] }); + }); + + t.doesNotThrow(function () { + new Graylog({ servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.throws(function () { + new Graylog({ hostname: 1, servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.doesNotThrow(function () { + new Graylog({ hostname: 'bar', servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.throws(function () { + new Graylog({ facility: 1, servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.doesNotThrow(function () { + new Graylog({ facility: 'bar', servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.throws(function () { + new Graylog({ bufferSize: 'foo', servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.doesNotThrow(function () { + new Graylog({ bufferSize: 100, servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.throws(function () { + new Graylog({ deflate: 1, servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.throws(function () { + new Graylog({ deflate: 'foo', servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.doesNotThrow(function () { + new Graylog({ deflate: 'optimal', servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.doesNotThrow(function () { + new Graylog({ deflate: 'always', servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.doesNotThrow(function () { + new Graylog({ deflate: 'never', servers: [{ host: 'foo', port: 12345 }] }); + }); + + t.end(); +}); diff --git a/test/small.js b/test/small.js new file mode 100644 index 0000000..f7734c6 --- /dev/null +++ b/test/small.js @@ -0,0 +1,67 @@ +var test = require('tape'); +var Graylog = require('..'); + +test('small', function (t) { + var client = new Graylog({ servers: [{ host: '127.0.0.1', port: 12345 }], hostname: 'foo', facility: 'bar' }); + + client.on('message', function (message) { + message = JSON.parse(message); + + t.equal(message.version, '1.0'); + t.equal(typeof message.timestamp, 'number'); + t.equal(message.host, 'foo'); + t.equal(message.facility, 'bar'); + t.equal(message.level, client.level.INFO) + t.equal(message.short_message, 'short message'); + t.equal(message.full_message, 'full message'); + t.equal(message.__id, 1); + t.equal(message._foo, 'bar'); + t.end(); + }); + + client.info('short message', 'full message', { id: 1, foo: 'bar' }); +}); + + +test('multiserver', function (t) { + var servers = [ + { host: '127.0.0.1', port: 12345 }, + { host: '127.0.0.1', port: 12345 } + ]; + + var client = new Graylog({ servers: servers }); + var cursor = 0; + var count = 0; + var total = 5; + + client.on('message', function (message, server) { + t.equal(server, servers[cursor]); + + cursor += 1; + count += 1; + + if (cursor >= servers.length) { + cursor = 0; + } + + if (count === total) { + t.end(); + } + }); + + for (var i = 0; i < total; i += 1) { + client.info('short message'); + } +}); + + +test('bad host', function (t) { + var client = new Graylog({ servers: [{ host: 'foobar', port: 12345 }] }); + + client.on('error', function (error) { + t.equal(error.code, 'ENOTFOUND'); + t.end(); + }); + + client.info('short message'); +});