Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mount extensions on channel, support mux on top of corestore & corestore-swarm-networker #45

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 107 additions & 73 deletions mux.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ var inherits = require('inherits')
var events = require('events')
var debug = require('debug')('multifeed')
var once = require('once')
var AbstractExtension = require('abstract-extension')

// constants
var MULTIFEED = 'MULTIFEED'
Expand All @@ -19,6 +20,14 @@ var EXT_REPLICATE_FEEDS = 'MULTIFEED_REPLICATE_FEEDS'
var ERR_VERSION_MISMATCH = 'ERR_VERSION_MISMATCH'
var ERR_CLIENT_MISMATCH = 'ERR_CLIENT_MISMATCH'

var DEFAULT_TIMEOUT = 10000

class Extension extends AbstractExtension {
send (message) {
this.local.handlers.send(this.id, this.encode(message))
}
}

// `key` - protocol encryption key
function Multiplexer (isInitiator, key, opts) {
if (!(this instanceof Multiplexer)) return new Multiplexer(isInitiator, key, opts)
Expand All @@ -34,76 +43,88 @@ function Multiplexer (isInitiator, key, opts) {
self._remoteOffer = []
self._activeFeedStreams = {}

var onFirstKey = true
var stream = this.stream = new Protocol(isInitiator, Object.assign({}, opts, {
ondiscoverykey: function (key) {
if (onFirstKey) {
onFirstKey = false
if (!self.stream.remoteVerified(key)) {
self._finalize(new Error('Exchange key did not match remote'))
}
}
}
}))

this._handshakeExt = this.stream.registerExtension(EXT_HANDSHAKE, {
onmessage: onHandshake,
onerror: function (err) {
self._finalize(err)
},
encoding: 'json'
})

function onHandshake (header) {
debug(self._id + ' [REPLICATION] recv\'d handshake: ', JSON.stringify(header))
var err
if (opts.stream) {
self.stream = opts.stream
} else {
self.stream = new Protocol(isInitiator, Object.assign({}, opts))
}

if (!compatibleVersions(header.version, PROTOCOL_VERSION)) {
debug(self._id + ' [REPLICATION] aborting; version mismatch (us=' + PROTOCOL_VERSION + ')')
err = new Error('protocol version mismatch! us=' + PROTOCOL_VERSION + ' them=' + header.version)
err.code = ERR_VERSION_MISMATCH
err.usVersion = PROTOCOL_VERSION
err.themVersion = header.version
self._finalize(err)
return
// Prepare the extension handlers.
self._extensions = Extension.createLocal({
send (id, message) {
self._channel.extension(id, message)
}
})
self._remoteExtensions = self._extensions.remote()

if (header.client !== MULTIFEED) {
debug(self._id + ' [REPLICATION] aborting; Client mismatch! expected ', MULTIFEED, 'but got', header.client)
err = new Error('Client mismatch! expected ' + MULTIFEED + ' but got ' + header.client)
err.code = ERR_CLIENT_MISMATCH
err.usClient = MULTIFEED
err.themClient = header.client
self._finalize(err)
return
}

// Wait a tick, otherwise the _ready handler below won't be listening for this event yet.
process.nextTick(function () {
self.emit('ready', header)
})
}

// Open a virtual feed that has the key set to the shared key.
this._feed = stream.open(key, {
// Open a protocol channel on the shared key.
self._channel = self.stream.open(key, {
onopen: function () {
onFirstKey = false
if (!stream.remoteVerified(key)) {
if (!self.stream.remoteVerified(key)) {
debug(self._id + ' [REPLICATION] aborting; shared key mismatch')
self._finalize(new Error('shared key version mismatch!'))
self._finalize(new Error('shared key mismatch!'))
return
}

self._registerExtensions()

// send handshake
self._handshakeExt.send(Object.assign({}, opts, {
self._handshakeExt.send(Object.assign({}, {
client: MULTIFEED,
version: PROTOCOL_VERSION,
userData: opts.userData
}))
},
onextension (id, message) {
self._remoteExtensions.onmessage(id, message, self._channel)
},
onoptions (options) {
self._remoteExtensions.update(options.extensions)
}
})

this._manifestExt = stream.registerExtension(EXT_MANIFEST, {
if (!self._opts.live) {
self.stream.on('prefinalize', function () {
self._channel.close()
debug(self._id + ' [REPLICATION] feed finish/prefinalize (' + self.stream.prefinalize._tick + ')')
})
}

// The timeout will occur if the remote does not open a channel for
// the shared key.
if (opts.timeout !== false) {
this.timeout = setTimeout(() => {
this._finalize(new Error('Multifeed handshake remote timeout'))
}, opts.timeout || DEFAULT_TIMEOUT)
}

this._ready = readify(function (done) {
self.on('ready', function (remote) {
if (this.timeout) clearTimeout(this.timeout)
debug(self._id + ' [REPLICATION] remote connected and ready')
done(remote)
})
})
}

inherits(Multiplexer, events.EventEmitter)

Multiplexer.prototype.ready = function (cb) {
this._ready(cb)
}

Multiplexer.prototype._registerExtensions = function () {
var self = this

self._handshakeExt = self._extensions.add(EXT_HANDSHAKE, {
onmessage: onHandshake,
onerror: function (err) {
self._finalize(err)
},
encoding: 'json'
})

self._manifestExt = self._extensions.add(EXT_MANIFEST, {
onmessage: function (msg) {
debug(self._id, 'RECV\'D Ext MANIFEST:', JSON.stringify(msg))
self._remoteOffer = uniq(self._remoteOffer.concat(msg.keys))
Expand All @@ -115,7 +136,7 @@ function Multiplexer (isInitiator, key, opts) {
encoding: 'json'
})

this._requestFeedsExt = stream.registerExtension(EXT_REQUEST_FEEDS, {
self._requestFeedsExt = self._extensions.add(EXT_REQUEST_FEEDS, {
onmessage: function (msg) {
debug(self._id, 'RECV\'D Ext REQUEST_FEEDS:', msg)
self._onRequestFeeds(msg)
Expand All @@ -126,7 +147,7 @@ function Multiplexer (isInitiator, key, opts) {
encoding: 'json'
})

this._replicateFeedsExt = stream.registerExtension(EXT_REPLICATE_FEEDS, {
self._replicateFeedsExt = self._extensions.add(EXT_REPLICATE_FEEDS, {
onmessage: function (msg) {
debug(self._id, 'RECV\'D Ext REPLICATE_FEEDS:', msg)
self._onRemoteReplicate(msg)
Expand All @@ -137,25 +158,38 @@ function Multiplexer (isInitiator, key, opts) {
encoding: 'json'
})

if (!self._opts.live) {
self.stream.on('prefinalize', function () {
self._feed.close()
debug(self._id + ' [REPLICATION] feed finish/prefinalize (' + self.stream.prefinalize._tick + ')')
})
}
// Send the extension names to our remote peer.
self._channel.options({ extensions: self._extensions.names() })

this._ready = readify(function (done) {
self.on('ready', function (remote) {
debug(self._id + ' [REPLICATION] remote connected and ready')
done(remote)
})
})
}
function onHandshake (header) {
debug(self._id + ' [REPLICATION] recv\'d handshake: ', JSON.stringify(header))
var err

inherits(Multiplexer, events.EventEmitter)
if (!compatibleVersions(header.version, PROTOCOL_VERSION)) {
debug(self._id + ' [REPLICATION] aborting; version mismatch (us=' + PROTOCOL_VERSION + ')')
err = new Error('protocol version mismatch! us=' + PROTOCOL_VERSION + ' them=' + header.version)
err.code = ERR_VERSION_MISMATCH
err.usVersion = PROTOCOL_VERSION
err.themVersion = header.version
self._finalize(err)
return
}

Multiplexer.prototype.ready = function (cb) {
this._ready(cb)
if (header.client !== MULTIFEED) {
debug(self._id + ' [REPLICATION] aborting; Client mismatch! expected ', MULTIFEED, 'but got', header.client)
err = new Error('Client mismatch! expected ' + MULTIFEED + ' but got ' + header.client)
err.code = ERR_CLIENT_MISMATCH
err.usClient = MULTIFEED
err.themClient = header.client
self._finalize(err)
return
}

// Wait a tick, otherwise the _ready handler below won't be listening for this event yet.
process.nextTick(function () {
self.emit('ready', header)
})
}
}

Multiplexer.prototype._finalize = function (err) {
Expand Down Expand Up @@ -291,7 +325,7 @@ Multiplexer.prototype._replicateFeeds = function (keys, cb) {
// Bail on replication entirely if there were no feeds to add, and none are pending or active.
if (feeds.length === 0 && Object.keys(self._activeFeedStreams).length === 0) {
debug('[REPLICATION] terminating mux: no feeds to sync')
self._feed.close()
self._channel.close()
process.nextTick(cb)
}
}
Expand Down
12 changes: 8 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
},
"keywords": [],
"dependencies": {
"abstract-extension": "^3.1.0",
"debug": "^4.1.0",
"hypercore": "^8.3.0",
"hypercore-protocol": "^7.7.1",
"hypercore": "^9",
"hypercore-protocol": "^8",
"inherits": "^2.0.3",
"mutexify": "^1.2.0",
"once": "^1.4.0",
Expand All @@ -26,13 +27,16 @@
"through2": "^3.0.0"
},
"devDependencies": {
"hypercore-crypto": "^1.0.0",
"@hyperswarm/dht": "^3.6.5",
"corestore": "^5.3.2",
"corestore-swarm-networking": "^5.4.3",
"hypercore-crypto": "^2.0.0",
"pump": "^3.0.0",
"pumpify": "^1.5.1",
"random-access-latency": "^1.0.0",
"rimraf": "^2.6.3",
"standard": "~10.0.0",
"tape": "~4.6.2",
"tape": "^5",
"tmp": "0.0.33"
},
"license": "ISC"
Expand Down
Loading