diff --git a/index.js b/index.js index 18022fa..719132a 100644 --- a/index.js +++ b/index.js @@ -94,10 +94,6 @@ function Node(address, options) { this.leader = ''; // Leader in our cluster. this.term = 0; // Our current term. - if ('function' === this.type(this.initialize)) { - this.once('initialize', this.initialize); - } - this._initialize(options); } @@ -139,36 +135,38 @@ for (var s = 0; s < Node.states.length; s++) { * @param {Object} options The configuration you passed in the constructor. * @api private */ -Node.prototype._initialize = function initialize(options) { +Node.prototype._initialize = function initializing(options) { + var node = this; + // // Reset our vote as we're starting a new term. Votes only last one term. // - this.on('term change', function change() { - this.votes.for = null; - this.votes.granted = 0; + node.on('term change', function change() { + node.votes.for = null; + node.votes.granted = 0; }); // // Reset our times and start the heartbeat again. If we're promoted to leader // the heartbeat will automatically be broadcasted to users as well. // - this.on('state change', function change(state) { - this.timers.clear('heartbeat, election'); - this.heartbeat(Node.LEADER === this.state ? this.beat : this.timeout()); - this.emit(Node.states[state].toLowerCase()); + node.on('state change', function change(state) { + node.timers.clear('heartbeat, election'); + node.heartbeat(Node.LEADER === node.state ? node.beat : node.timeout()); + node.emit(Node.states[state].toLowerCase()); }); // // Receive incoming messages and process them. // - this.on('data', function incoming(packet, write) { + node.on('data', function incoming(packet, write) { write = write || nope; var reason; - if ('object' !== this.type(packet)) { + if ('object' !== node.type(packet)) { reason = 'Invalid packet received'; - this.emit('error', new Error(reason)); - return write(this.packet('error', reason)); + node.emit('error', new Error(reason)); + return write(node.packet('error', reason)); } // @@ -181,16 +179,16 @@ Node.prototype._initialize = function initialize(options) { // If the node receives a request with a stale term number it should be // rejected. // - if (packet.term > this.term) { - this.change({ - leader: Node.LEADER === packet.state ? packet.address : packet.leader || this.leader, + if (packet.term > node.term) { + node.change({ + leader: Node.LEADER === packet.state ? packet.address : packet.leader || node.leader, state: Node.FOLLOWER, term: packet.term }); - } else if (packet.term < this.term) { - reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ this.term; - this.emit('error', new Error(reason)); - return write(this.packet('error', reason)); + } else if (packet.term < node.term) { + reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ node.term; + node.emit('error', new Error(reason)); + return write(node.packet('error', reason)); } // @@ -204,14 +202,14 @@ Node.prototype._initialize = function initialize(options) { // would be changed or prevented above.. // if (Node.LEADER === packet.state) { - if (Node.FOLLOWER !== this.state) this.change({ state: Node.FOLLOWER }); - if (packet.address !== this.leader) this.change({ leader: packet.address }); + if (Node.FOLLOWER !== node.state) node.change({ state: Node.FOLLOWER }); + if (packet.address !== node.leader) node.change({ leader: packet.address }); // // Always when we receive an message from the Leader we need to reset our // heartbeat. // - this.heartbeat(this.timeout()); + node.heartbeat(node.timeout()); } switch (packet.type) { @@ -227,9 +225,9 @@ Node.prototype._initialize = function initialize(options) { // The term of the vote is bigger then ours so we need to update it. If // it's the same and we already voted, we need to deny the vote. // - if (this.votes.for && this.votes.for !== packet.address) { - this.emit('vote', packet, false); - return write(this.packet('voted', { granted: false })); + if (node.votes.for && node.votes.for !== packet.address) { + node.emit('vote', packet, false); + return write(node.packet('voted', { granted: false })); } // @@ -239,12 +237,12 @@ Node.prototype._initialize = function initialize(options) { // @TODO point to index of last commit entry. // @TODO point to term of last commit entry. // - if (this.log && packet.last && ( - this.log.index > packet.last.index - || this.term > packet.last.term + if (node.log && packet.last && ( + node.log.index > packet.last.index + || node.term > packet.last.term )) { - this.emit('vote', packet, false); - return write(this.packet('voted', { granted: false })); + node.emit('vote', packet, false); + return write(node.packet('voted', { granted: false })); } // @@ -252,10 +250,10 @@ Node.prototype._initialize = function initialize(options) { // candidate came in first so it gets our vote as all requirements are // met. // - this.votes.for = packet.address; - this.emit('vote', packet, true); - this.change({ leader: packet.address, term: packet.term }); - write(this.packet('voted', { granted: true })); + node.votes.for = packet.address; + node.emit('vote', packet, true); + node.change({ leader: packet.address, term: packet.term }); + write(node.packet('voted', { granted: true })); // // We've accepted someone as potential new leader, so we should reset @@ -263,7 +261,7 @@ Node.prototype._initialize = function initialize(options) { // Which would again increment the term causing us to be next CANDIDATE // and invalidates the request we just got, so that's silly willy. // - this.heartbeat(this.timeout()); + node.heartbeat(node.timeout()); break; // @@ -273,8 +271,8 @@ Node.prototype._initialize = function initialize(options) { // // Only accepts votes while we're still in a CANDIDATE state. // - if (Node.CANDIDATE !== this.state) { - return write(this.packet('error', 'No longer a candidate, ignoring vote')); + if (Node.CANDIDATE !== node.state) { + return write(node.packet('error', 'No longer a candidate, ignoring vote')); } // @@ -282,20 +280,20 @@ Node.prototype._initialize = function initialize(options) { // granted by the node that received the data. // if (packet.data.granted) { - this.votes.granted++; + node.votes.granted++; } // // Check if we've received the minimal amount of votes required for this // current voting round to be considered valid. // - if (this.quorum(this.votes.granted)) { - this.change({ leader: this.address, state: Node.LEADER }); + if (node.quorum(node.votes.granted)) { + node.change({ leader: node.address, state: Node.LEADER }); // // Send a heartbeat message to all connected clients. // - this.message(Node.FOLLOWER, this.packet('append')); + node.message(Node.FOLLOWER, node.packet('append')); } // @@ -305,7 +303,7 @@ Node.prototype._initialize = function initialize(options) { break; case 'error': - this.emit('error', new Error(packet.data)); + node.emit('error', new Error(packet.data)); break; // @@ -332,10 +330,10 @@ Node.prototype._initialize = function initialize(options) { // return an error. // default: - if (this.listeners('rpc').length) { - this.emit('rpc', packet, write); + if (node.listeners('rpc').length) { + node.emit('rpc', packet, write); } else { - write(this.packet('error', 'Unknown message type: '+ packet.type)); + write(node.packet('error', 'Unknown message type: '+ packet.type)); } } }); @@ -344,27 +342,40 @@ Node.prototype._initialize = function initialize(options) { // We do not need to execute the rest of the functionality below as we're // currently running as "child" node of the cluster not as the "root" node. // - if (Node.CHILD === this.state) return; + if (Node.CHILD === node.state) return node.emit('initialize'); // // Setup the log & appends. Assume that if we're given a function log that it // needs to be initialized as it requires access to our node instance so it // can read our information like our leader, state, term etc. // - if ('function' === this.type(this.Log)) { - this.log = new this.Log(this, options); + if ('function' === node.type(node.Log)) { + node.log = new node.Log(node, options); } - // - // The node is now listening to events so we can start our heartbeat timeout. - // So that if we don't hear anything from a leader we can promote our selfs to - // a candidate state. - // - // We want to call the `initialize` event before starting a heartbeat so - // implementors have some time to start listening for incoming ping packets. - // - this.emit('initialize'); - this.heartbeat(this.timeout()); + /** + * The node is now listening to events so we can start our heartbeat timeout. + * So that if we don't hear anything from a leader we can promote our selfs to + * a candidate state. + * + * Start listening listening for heartbeats when implementors are also ready + * with setting up their code. + * + * @api private + */ + function initialize(err) { + if (err) return node.emit('error', err); + + node.emit('initialize'); + node.heartbeat(node.timeout()); + } + + if ('function' === node.type(node.initialize)) { + if (node.initialize.length > 1) return node.initialize(options, initialize); + node.initialize(options); + } + + initialize(); }; /** diff --git a/test.js b/test.js index bc79edc..052793a 100644 --- a/test.js +++ b/test.js @@ -91,6 +91,48 @@ describe('liferaft', function () { new MyRaft(); }); + + it('async emits the initialize event once the initialize method is done', function (next) { + var ready = false; + + var MyRaft = Raft.extend({ + initialize: function initialize(options, init) { + assume(options.custom).equals('options'); + assume(ready).is.false(); + + setTimeout(function () { + ready = true; + init(); + }, 100); + } + }); + + var raft = new MyRaft('foobar', { custom: 'options' }); + + raft.on('initialize', function () { + assume(ready).is.true(); + + next(); + }); + }); + + it('emits error when the initialize fails', function (next) { + var MyRaft = Raft.extend({ + initialize: function initialize(options, init) { + setTimeout(function () { + init(new Error('Failure')); + }, 100); + } + }); + + var raft = new MyRaft(); + + raft.on('error', function (err) { + assume(err.message).equals('Failure'); + + next(); + }); + }); }); describe('#indefinitely', function () {