From dc42faec4a84acbb0b3c902a77daaebda339ebb8 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 31 Jul 2019 16:39:11 +0100 Subject: [PATCH 1/7] (WIP) bulk atomic appends. --- index.js | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/index.js b/index.js index f5f0495..30f9dc9 100644 --- a/index.js +++ b/index.js @@ -240,6 +240,53 @@ exports.append = function (state, hmac_key, msg) { return exports.appendKVT(state, hmac_key, exports.toKeyValueTimestamp(msg)) } +exports.appendBulk = function(state, hmac_key, messages) { + + var kvtMessages = messages.map(function (msg) { + if(err = exports.checkInvalid(_state, hmac_key, msg.message)) + throw err + + if (!msg.message.author === keys.id) { + throw new Error("Bulk append author must be equal to key author.") + } + + return exports.toKeyValueTimestamp(msg.message, msg.id) + }); + + var msgAuthor = keys.id; + var lowestSequence = kvtMessages[0].value.sequence + var lastMessage = kvtMessages[kvtMessages.length - 1] + var highestSequence = lastMessage.value.sequence + var lastMessageId = lastMessage.value.key + var timestamp = kvtMessages[0].timestamp + + // Dequeue anything on the per-feed queue to the main queue before making the new write + if (state.feeds[msgAuthor]) { + var a = state.feeds[msgAuthor] + a.id = lastMessageId + a.sequence = highestSequence + a.timestamp = timestamp + var q = state.feeds[msgAuthor].queue + state.validated += q.length + state.queued -= q.length + for (var i = 0; i < q.length; ++i) + state.queue.push(q[i]) + q = [] + } else if (lowestSequence === 1) { + state.feeds[msg.author] = { + id: lastMessageId, + sequence: highestSequence, + timestamp: timestamp, + queue: [] + } + } + + state.queue.push(kvtMessages) + state.validated += 1 + + return state +} + exports.validate = function (state, hmac_key, feed) { if(!state.feeds[feed] || !state.feeds[feed].queue.length) { return state @@ -274,6 +321,46 @@ exports.create = function (state, keys, hmac_key, content, timestamp) { return ssbKeys.signObj(keys, hmac_key, msg) } +exports.createAll = function (state, keys, hmac_key, messages, timestamp) { + if(timestamp == null || isNaN(+timestamp)) throw new Error('timestamp must be provided for messages') + if(state && +timestamp <= state.timestamp) throw new Error('timestamp must be increasing') + + var previous = state ? state.id : null + var nextSequenceNumber = state ? state.sequence + 1 : 1 + var result = []; + + messages.forEach(function (content) { + if(!isObject(content) && !isEncrypted(content)) { + throw new Error('invalid message content, must be object or encrypted string') + } + + var msg = { + previous: previous, + sequence: nextSequenceNumber, + author: keys.id, + timestamp: +timestamp, + hash: 'sha256', + content: content + } + + var err = isInvalidShape(msg) + if(err) throw err + + var signedMsg = ssbKeys.signObj(keys, hmac_key, msg); + + var msgId = exports.id(signedMsg) + nextSequenceNumber = nextSequenceNumber + 1 + previous = msgId + + result.push({ + message: signedMsg, + id: msgId + }) + }) + + return result; +} + exports.id = function (msg) { return '%'+ssbKeys.hash(JSON.stringify(msg, null, 2)) } From bd3f83972e6f0ba4e29c7ada0a54e95db12c5f5d Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Tue, 6 Aug 2019 00:17:02 +0100 Subject: [PATCH 2/7] WIP --- index.js | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/index.js b/index.js index 30f9dc9..81deec7 100644 --- a/index.js +++ b/index.js @@ -157,6 +157,30 @@ exports.checkInvalid = function (state, hmac_key, msg) { return false //not invalid } +exports.checkInvalidBulk = function (state, hmac_key, messages) { + + for (var i = 0; i < messages.length; i++) { + var message = messages[i].message + + if(!ref.isFeedId(message.author)) + return new Error('invalid message: must have author') + if(!isSigMatchesCurve(message)) + return new Error('invalid message: signature type must match author type') + + if(!isValidOrder(message, true)) + return fatal(new Error('message must have keys in allowed order')) + + var invalidShape = isInvalidShape(message) + + if (invalidShape) return invalidShape + + if(!ssbKeys.verifyObj({public: message.author.substring(1)}, hmac_key, message)) + return fatal(new Error('invalid signature')) + + } + +} + /* { //an array of messages which have been validated, but not written to the database yet. @@ -242,18 +266,17 @@ exports.append = function (state, hmac_key, msg) { exports.appendBulk = function(state, hmac_key, messages) { - var kvtMessages = messages.map(function (msg) { - if(err = exports.checkInvalid(_state, hmac_key, msg.message)) - throw err + var err = exports.checkInvalidBulk(state, hmac_key, messages) - if (!msg.message.author === keys.id) { - throw new Error("Bulk append author must be equal to key author.") - } + if (err) throw err; + var kvtMessages = messages.map(function (msg) { return exports.toKeyValueTimestamp(msg.message, msg.id) }); - var msgAuthor = keys.id; + // todo: validate all authors are ourself + var msgAuthor = kvtMessages[0].value.author; + var lowestSequence = kvtMessages[0].value.sequence var lastMessage = kvtMessages[kvtMessages.length - 1] var highestSequence = lastMessage.value.sequence @@ -273,7 +296,7 @@ exports.appendBulk = function(state, hmac_key, messages) { state.queue.push(q[i]) q = [] } else if (lowestSequence === 1) { - state.feeds[msg.author] = { + state.feeds[msgAuthor] = { id: lastMessageId, sequence: highestSequence, timestamp: timestamp, From 8bc135c7db05d546f0d167805b100490a0e3700b Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Tue, 6 Aug 2019 22:50:10 +0100 Subject: [PATCH 3/7] wip --- index.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 81deec7..c8f8283 100644 --- a/index.js +++ b/index.js @@ -352,7 +352,7 @@ exports.createAll = function (state, keys, hmac_key, messages, timestamp) { var nextSequenceNumber = state ? state.sequence + 1 : 1 var result = []; - messages.forEach(function (content) { + messages.forEach(function (content, idx) { if(!isObject(content) && !isEncrypted(content)) { throw new Error('invalid message content, must be object or encrypted string') } @@ -361,7 +361,9 @@ exports.createAll = function (state, keys, hmac_key, messages, timestamp) { previous: previous, sequence: nextSequenceNumber, author: keys.id, - timestamp: +timestamp, + // The createFeedStream index relies on increasing timestamps. + // todo: start a discussion about this... + timestamp: +timestamp + (idx / 100), hash: 'sha256', content: content } From fe168523955155fd00355f6686f585a132fe680d Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 7 Aug 2019 11:03:08 +0100 Subject: [PATCH 4/7] Expect monotonic timestamp to be supplied with each message for bulk appends. --- index.js | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/index.js b/index.js index c8f8283..b3cbe5c 100644 --- a/index.js +++ b/index.js @@ -344,15 +344,18 @@ exports.create = function (state, keys, hmac_key, content, timestamp) { return ssbKeys.signObj(keys, hmac_key, msg) } -exports.createAll = function (state, keys, hmac_key, messages, timestamp) { - if(timestamp == null || isNaN(+timestamp)) throw new Error('timestamp must be provided for messages') - if(state && +timestamp <= state.timestamp) throw new Error('timestamp must be increasing') +exports.createAll = function (state, keys, hmac_key, messages) { var previous = state ? state.id : null var nextSequenceNumber = state ? state.sequence + 1 : 1 var result = []; - messages.forEach(function (content, idx) { + messages.forEach(function (message, idx) { + var content = message.content + var timestamp = message.timestamp + + // todo: Validate timestamps are increasing + if(!isObject(content) && !isEncrypted(content)) { throw new Error('invalid message content, must be object or encrypted string') } @@ -361,9 +364,7 @@ exports.createAll = function (state, keys, hmac_key, messages, timestamp) { previous: previous, sequence: nextSequenceNumber, author: keys.id, - // The createFeedStream index relies on increasing timestamps. - // todo: start a discussion about this... - timestamp: +timestamp + (idx / 100), + timestamp: +timestamp, hash: 'sha256', content: content } From 5882231bce3748852b2be539ac8129ae57466762 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 7 Aug 2019 12:35:30 +0100 Subject: [PATCH 5/7] Fix last timestamp field for bulk appends. --- index.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index b3cbe5c..961a6f3 100644 --- a/index.js +++ b/index.js @@ -279,9 +279,11 @@ exports.appendBulk = function(state, hmac_key, messages) { var lowestSequence = kvtMessages[0].value.sequence var lastMessage = kvtMessages[kvtMessages.length - 1] + var highestSequence = lastMessage.value.sequence var lastMessageId = lastMessage.value.key - var timestamp = kvtMessages[0].timestamp + + var timestamp = lastMessage.timestamp // Dequeue anything on the per-feed queue to the main queue before making the new write if (state.feeds[msgAuthor]) { From 7277624a5fe443f83d721fb42f051dc2ce3db8ae Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 7 Aug 2019 14:26:28 +0100 Subject: [PATCH 6/7] Fix issue with feed state timestamp on bulk append --- index.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 961a6f3..38becdf 100644 --- a/index.js +++ b/index.js @@ -265,7 +265,6 @@ exports.append = function (state, hmac_key, msg) { } exports.appendBulk = function(state, hmac_key, messages) { - var err = exports.checkInvalidBulk(state, hmac_key, messages) if (err) throw err; @@ -283,7 +282,7 @@ exports.appendBulk = function(state, hmac_key, messages) { var highestSequence = lastMessage.value.sequence var lastMessageId = lastMessage.value.key - var timestamp = lastMessage.timestamp + var timestamp = lastMessage.value.timestamp // Dequeue anything on the per-feed queue to the main queue before making the new write if (state.feeds[msgAuthor]) { @@ -323,6 +322,7 @@ exports.validate = function (state, hmac_key, feed) { //pass in your own timestamp, so it's completely deterministic exports.create = function (state, keys, hmac_key, content, timestamp) { + if(timestamp == null || isNaN(+timestamp)) throw new Error('timestamp must be provided') if(!isObject(content) && !isEncrypted(content)) From 2d99f215e4406347192ab4671846d4bc16631674 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 7 Aug 2019 15:02:01 +0100 Subject: [PATCH 7/7] Add comments --- index.js | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/index.js b/index.js index 38becdf..47d9be4 100644 --- a/index.js +++ b/index.js @@ -159,6 +159,8 @@ exports.checkInvalid = function (state, hmac_key, msg) { exports.checkInvalidBulk = function (state, hmac_key, messages) { + // todo: validate all authors are ourself + for (var i = 0; i < messages.length; i++) { var message = messages[i].message @@ -273,12 +275,12 @@ exports.appendBulk = function(state, hmac_key, messages) { return exports.toKeyValueTimestamp(msg.message, msg.id) }); - // todo: validate all authors are ourself - var msgAuthor = kvtMessages[0].value.author; - - var lowestSequence = kvtMessages[0].value.sequence + var firstMessage = kvtMessages[0]; var lastMessage = kvtMessages[kvtMessages.length - 1] + var msgAuthor = firstMessage.value.author; + var lowestSequence = firstMessage.value.sequence + var highestSequence = lastMessage.value.sequence var lastMessageId = lastMessage.value.key @@ -348,7 +350,13 @@ exports.create = function (state, keys, hmac_key, content, timestamp) { exports.createAll = function (state, keys, hmac_key, messages) { + // The 'previous' for the first message in the bulk append will be the + // message currently at the head of the log or null if this is the first + // append var previous = state ? state.id : null + + // The first sequence number is the head of the log's sequence number + 1, or 1 if this is the first + // append var nextSequenceNumber = state ? state.sequence + 1 : 1 var result = []; @@ -374,12 +382,18 @@ exports.createAll = function (state, keys, hmac_key, messages) { var err = isInvalidShape(msg) if(err) throw err + // We need to sign the message this stage to know what its ID is to use it as the 'previous' + // for the next message var signedMsg = ssbKeys.signObj(keys, hmac_key, msg); var msgId = exports.id(signedMsg) nextSequenceNumber = nextSequenceNumber + 1 + + // The next 'previous' for the next message in the bulk append will be this message + // once the message has been appended previous = msgId + // We return the ID with the signed message as well as the ID so we don't have to compute it again result.push({ message: signedMsg, id: msgId