From e884915c38190fcb2ceed0ae3f7db17984429139 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Mon, 12 Jul 2021 17:45:28 +0300 Subject: [PATCH] refactor benchmarks to avoid DeferredPromise --- benchmark/index.js | 552 +++++++++++++++++++++++---------------------- 1 file changed, 279 insertions(+), 273 deletions(-) diff --git a/benchmark/index.js b/benchmark/index.js index 65c2a688..df996eb1 100644 --- a/benchmark/index.js +++ b/benchmark/index.js @@ -7,14 +7,13 @@ const generateFixture = require('ssb-fixtures') const SecretStack = require('secret-stack') const caps = require('ssb-caps') const ssbKeys = require('ssb-keys') -const multicb = require('multicb') const pull = require('pull-stream') const asyncFilter = require('pull-async-filter') const validate = require('ssb-validate') const fromEvent = require('pull-stream-util/from-event') -const DeferredPromise = require('p-defer') const trammel = require('trammel') -const sleep = require('util').promisify(setTimeout) +const pify = require('util').promisify +const sleep = pify(setTimeout) const { and, where, @@ -29,6 +28,7 @@ const { paginate, descending, toCallback, + toPromise, } = require('../operators') const dir = '/tmp/ssb-db2-benchmark' @@ -93,45 +93,6 @@ test('setup', (t) => { t.end() }) -test('add a bunch of messages', async (t) => { - const sbot = SecretStack({ appKey: caps.shs }) - .use(require('../')) - .call(null, { keys, path: dirAdd }) - - let state = validate.initial() - for (var i = 0; i < 1000; ++i) { - state = validate.appendNew( - state, - null, - keys, - { type: 'tick', count: i }, - Date.now() - ) - } - - const messages = state.queue.map((x) => x.value) - - const ended = DeferredPromise() - const start = Date.now() - - pull( - pull.values(messages), - asyncFilter(sbot.db.add), - pull.collect((err) => { - const duration = Date.now() - start - - if (err) t.fail(err) - - t.pass(`duration: ${duration}ms`) - fs.appendFileSync(reportPath, `| add 1000 elements | ${duration}ms |\n`) - - sbot.close(() => ended.resolve()) - }) - ) - - await ended.promise -}) - test('migrate (+db1)', async (t) => { rimraf.sync(db2Path) t.pass('delete db2 folder to start clean') @@ -150,24 +111,26 @@ test('migrate (+db1)', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - const start = Date.now() - sbot.db2migrate.start() - - pull( - fromEvent('ssb:db2:migrate:progress', sbot), - pull.filter((progress) => progress === 1), - pull.take(1), - pull.drain(async () => { - const duration = Date.now() - start - t.pass(`duration: ${duration}ms`) - fs.appendFileSync(reportPath, `| Migrate (+db1) | ${duration}ms |\n`) - await sleep(2000) // wait for new log FS writes to finalize - sbot.close(() => ended.resolve()) - }) - ) + const duration = await new Promise((resolve) => { + const start = Date.now() + sbot.db2migrate.start() + pull( + fromEvent('ssb:db2:migrate:progress', sbot), + pull.filter((progress) => progress === 1), + pull.take(1), + pull.drain(() => { + resolve(Date.now() - start) + }) + ) + }) - await ended.promise + t.pass(`duration: ${duration}ms`) + fs.appendFileSync(reportPath, `| Migrate (+db1) | ${duration}ms |\n`) + + await sleep(2000) // wait for new log FS writes to finalize + + await pify(sbot.close)(true) + t.end() }) test('migrate (alone)', async (t) => { @@ -180,24 +143,26 @@ test('migrate (alone)', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - const start = Date.now() - sbot.db2migrate.start() - - pull( - fromEvent('ssb:db2:migrate:progress', sbot), - pull.filter((progress) => progress === 1), - pull.take(1), - pull.drain(async () => { - const duration = Date.now() - start - t.pass(`duration: ${duration}ms`) - fs.appendFileSync(reportPath, `| Migrate (alone) | ${duration}ms |\n`) - await sleep(2000) // wait for new log FS writes to finalize - sbot.close(() => ended.resolve()) - }) - ) + const duration = await new Promise((resolve) => { + const start = Date.now() + sbot.db2migrate.start() + pull( + fromEvent('ssb:db2:migrate:progress', sbot), + pull.filter((progress) => progress === 1), + pull.take(1), + pull.drain(() => { + resolve(Date.now() - start) + }) + ) + }) - await ended.promise + t.pass(`duration: ${duration}ms`) + fs.appendFileSync(reportPath, `| Migrate (alone) | ${duration}ms |\n`) + + await sleep(2000) // wait for new log FS writes to finalize + + await pify(sbot.close)(true) + t.end() }) test('migrate (+db1 +db2)', async (t) => { @@ -211,24 +176,25 @@ test('migrate (+db1 +db2)', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - const start = Date.now() - sbot.db2migrate.start() - - pull( - fromEvent('ssb:db2:migrate:progress', sbot), - pull.filter((progress) => progress === 1), - pull.take(1), - pull.drain(async () => { - const duration = Date.now() - start - t.pass(`duration: ${duration}ms`) - fs.appendFileSync(reportPath, `| Migrate (+db1 +db2) | ${duration}ms |\n`) - await new Promise((resolve) => sbot.db.onDrain(resolve)) - sbot.close(() => ended.resolve()) - }) - ) + const duration = await new Promise((resolve) => { + const start = Date.now() + sbot.db2migrate.start() + pull( + fromEvent('ssb:db2:migrate:progress', sbot), + pull.filter((progress) => progress === 1), + pull.take(1), + pull.drain(() => { + resolve(Date.now() - start) + }) + ) + }) - await ended.promise + t.pass(`duration: ${duration}ms`) + fs.appendFileSync(reportPath, `| Migrate (+db1 +db2) | ${duration}ms |\n`) + + await pify(sbot.db.onDrain)() + await pify(sbot.close)(true) + t.end() }) test('migrate (+db2)', async (t) => { @@ -241,24 +207,25 @@ test('migrate (+db2)', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - const start = Date.now() - sbot.db2migrate.start() - - pull( - fromEvent('ssb:db2:migrate:progress', sbot), - pull.filter((progress) => progress === 1), - pull.take(1), - pull.drain(async () => { - const duration = Date.now() - start - t.pass(`duration: ${duration}ms`) - fs.appendFileSync(reportPath, `| Migrate (+db2) | ${duration}ms |\n`) - await new Promise((resolve) => sbot.db.onDrain(resolve)) - sbot.close(() => ended.resolve()) - }) - ) + const duration = await new Promise((resolve) => { + const start = Date.now() + sbot.db2migrate.start() + pull( + fromEvent('ssb:db2:migrate:progress', sbot), + pull.filter((progress) => progress === 1), + pull.take(1), + pull.drain(() => { + resolve(Date.now() - start) + }) + ) + }) - await ended.promise + t.pass(`duration: ${duration}ms`) + fs.appendFileSync(reportPath, `| Migrate (+db2) | ${duration}ms |\n`) + + await pify(sbot.db.onDrain)() + await pify(sbot.close)(true) + t.end() }) test('migrate continuation (+db2)', async (t) => { @@ -271,50 +238,54 @@ test('migrate continuation (+db2)', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - sbot.db2migrate.start() - - pull( - fromEvent('ssb:db2:migrate:progress', sbot), - pull.filter((progress) => progress > 0.9), - pull.take(1), - pull.drain(async () => { - sbot.db2migrate.stop() - await new Promise((resolve) => sbot.db.onDrain(resolve)) - await new Promise((resolve) => sbot.close(resolve)) - await sleep(500) // some silence - t.pass('migrated 90%, will reset sbot') - - sbot = SecretStack({ appKey: caps.shs }) - .use(require('../')) - .call(null, { keys, path: dir }) - - global.gc() - await sleep(500) - updateMaxRAM() // will report later, just to make the report order pretty - - const start = Date.now() - sbot.db2migrate.start() - - pull( - fromEvent('ssb:db2:migrate:progress', sbot), - pull.filter((progress) => progress === 1), - pull.take(1), - pull.drain(async () => { - const duration = Date.now() - start - t.pass(`duration: ${duration}ms`) - fs.appendFileSync( - reportPath, - `| Migrate continuation (+db2) | ${duration}ms |\n` - ) - await new Promise((resolve) => sbot.db.onDrain(resolve)) - sbot.close(() => ended.resolve()) - }) - ) - }) + await new Promise((resolve) => { + sbot.db2migrate.start() + pull( + fromEvent('ssb:db2:migrate:progress', sbot), + pull.filter((progress) => progress > 0.9), + pull.take(1), + pull.drain(() => { + sbot.db2migrate.stop() + resolve() + }) + ) + }) + + await pify(sbot.db.onDrain)() + await pify(sbot.close)(true) + await sleep(500) // some silence + t.pass('migrated 90%, will reset sbot') + + sbot = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .call(null, { keys, path: dir }) + + global.gc() + await sleep(500) + updateMaxRAM() // will report later, just to make the report order pretty + + const duration = await new Promise((resolve) => { + const start = Date.now() + sbot.db2migrate.start() + pull( + fromEvent('ssb:db2:migrate:progress', sbot), + pull.filter((progress) => progress === 1), + pull.take(1), + pull.drain(() => { + resolve(Date.now() - start) + }) + ) + }) + + t.pass(`duration: ${duration}ms`) + fs.appendFileSync( + reportPath, + `| Migrate continuation (+db2) | ${duration}ms |\n` ) - await ended.promise + await pify(sbot.db.onDrain)() + await pify(sbot.close)(true) + t.end() }) test('Memory usage without indexes', (t) => { @@ -333,29 +304,31 @@ test('initial indexing', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - const start = Date.now() + const duration = await new Promise((resolve) => { + const start = Date.now() + sbot.db.query( + where(type('post')), + descending(), + paginate(1), + toCallback((err, { results, total }) => { + const duration = Date.now() - start + t.error(err) + if (total === 0) t.fail('should respond with msgs') + if (results.length !== 1) t.fail('should respond with 1 msg') + if (!results[0].value.content.text.includes('LATESTMSG')) + t.fail('should have LATESTMSG') - sbot.db.query( - where(type('post')), - descending(), - paginate(1), - toCallback((err, { results, total }) => { - const duration = Date.now() - start - t.error(err) - if (total === 0) t.fail('should respond with msgs') - if (results.length !== 1) t.fail('should respond with 1 msg') - if (!results[0].value.content.text.includes('LATESTMSG')) - t.fail('should have LATESTMSG') - t.pass(`duration: ${duration}ms`) - fs.appendFileSync(reportPath, `| Initial indexing | ${duration}ms |\n`) - updateMaxRAM() - global.gc() - sbot.close(() => ended.resolve()) - }) - ) + resolve(duration) + }) + ) + }) - await ended.promise + t.pass(`duration: ${duration}ms`) + fs.appendFileSync(reportPath, `| Initial indexing | ${duration}ms |\n`) + updateMaxRAM() + global.gc() + await pify(sbot.close)(true) + t.end() }) test('initial indexing maxcpu 86', async (t) => { @@ -368,32 +341,33 @@ test('initial indexing maxcpu 86', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - const start = Date.now() + const duration = await new Promise((resolve) => { + const start = Date.now() + sbot.db.query( + where(type('post')), + descending(), + paginate(1), + toCallback((err, { results, total }) => { + const duration = Date.now() - start + t.error(err) + if (total === 0) t.fail('should respond with msgs') + if (results.length !== 1) t.fail('should respond with 1 msg') + if (!results[0].value.content.text.includes('LATESTMSG')) + t.fail('should have LATESTMSG') + resolve(duration) + }) + ) + }) - sbot.db.query( - where(type('post')), - descending(), - paginate(1), - toCallback((err, { results, total }) => { - const duration = Date.now() - start - t.error(err) - if (total === 0) t.fail('should respond with msgs') - if (results.length !== 1) t.fail('should respond with 1 msg') - if (!results[0].value.content.text.includes('LATESTMSG')) - t.fail('should have LATESTMSG') - t.pass(`duration: ${duration}ms`) - fs.appendFileSync( - reportPath, - `| Initial indexing maxCpu=86 | ${duration}ms |\n` - ) - updateMaxRAM() - global.gc() - sbot.close(() => ended.resolve()) - }) + t.pass(`duration: ${duration}ms`) + fs.appendFileSync( + reportPath, + `| Initial indexing maxCpu=86 | ${duration}ms |\n` ) - - await ended.promise + updateMaxRAM() + global.gc() + await pify(sbot.close)(true) + t.end() }) test('initial indexing compat', async (t) => { @@ -404,24 +378,22 @@ test('initial indexing compat', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - const start = Date.now() - - sbot.db.onDrain('base', () => { - sbot.db.onDrain('ebt', () => { - const duration = Date.now() - start - t.pass(`duration: ${duration}ms`) - fs.appendFileSync( - reportPath, - `| Initial indexing compat | ${duration}ms |\n` - ) - updateMaxRAM() - global.gc() - sbot.close(() => ended.resolve()) + const duration = await new Promise((resolve) => { + const start = Date.now() + sbot.db.onDrain('base', () => { + sbot.db.onDrain('ebt', () => { + resolve(Date.now() - start) + }) }) }) - await ended.promise + t.pass(`duration: ${duration}ms`) + fs.appendFileSync(reportPath, `| Initial indexing compat | ${duration}ms |\n`) + updateMaxRAM() + global.gc() + + await pify(sbot.close)(true) + t.end() }) test('Two indexes updating concurrently', async (t) => { @@ -432,27 +404,23 @@ test('Two indexes updating concurrently', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - const done = multicb({ pluck: 1 }) const start = Date.now() + await Promise.all([ + sbot.db.query(where(type('about')), toPromise()), + sbot.db.query(where(and(type('about'), isPublic())), toPromise()), + ]) + const duration = Date.now() - start - sbot.db.query(where(type('about')), toCallback(done())) - sbot.db.query(where(and(type('about'), isPublic())), toCallback(done())) - - done((err) => { - if (err) t.fail(err) - const duration = Date.now() - start - t.pass(`duration: ${duration}ms`) - fs.appendFileSync( - reportPath, - `| Two indexes updating concurrently | ${duration}ms |\n` - ) - updateMaxRAM() - global.gc() - sbot.close(() => ended.resolve()) - }) + t.pass(`duration: ${duration}ms`) + fs.appendFileSync( + reportPath, + `| Two indexes updating concurrently | ${duration}ms |\n` + ) + updateMaxRAM() + global.gc() - await ended.promise + await pify(sbot.close)(true) + t.end() }) test.skip('ssb-threads and ssb-friends', async (t) => { @@ -464,31 +432,31 @@ test.skip('ssb-threads and ssb-friends', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - const start = Date.now() - - pull( - sbot.threads.publicSummary({ allowlist: ['post', 'contact'] }), - pull.take(1), - pull.collect(async (err, threads) => { - const duration = Date.now() - start - if (err) t.fail(err) - if (threads.length !== 1) t.fail('missing results') - t.pass(`duration: ${duration}ms`) - fs.appendFileSync( - reportPath, - `| ssb-threads and ssb-friends | ${duration}ms |\n` - ) - updateMaxRAM() - global.gc() - await sleep(2000) // wait for jitdb indexes to save to disk - sbot.close(() => { - ended.resolve() + const duration = await new Promise((resolve) => { + const start = Date.now() + pull( + sbot.threads.publicSummary({ allowlist: ['post', 'contact'] }), + pull.take(1), + pull.collect((err, threads) => { + const duration = Date.now() - start + if (err) t.fail(err) + if (threads.length !== 1) t.fail('missing results') + resolve(duration) }) - }) + ) + }) + + t.pass(`duration: ${duration}ms`) + fs.appendFileSync( + reportPath, + `| ssb-threads and ssb-friends | ${duration}ms |\n` ) + updateMaxRAM() + global.gc() - await ended.promise + await sleep(2000) // wait for jitdb indexes to save to disk + await pify(sbot.close)(true) + t.end() }) test.skip('ssb-threads and ssb-friends again', async (t) => { @@ -500,29 +468,29 @@ test.skip('ssb-threads and ssb-friends again', async (t) => { await sleep(500) // some silence to make it easier to read the CPU profiler - const ended = DeferredPromise() - const start = Date.now() - - pull( - sbot.threads.publicSummary({ allowlist: ['post', 'contact'] }), - pull.take(1), - pull.collect(async (err, threads) => { - const duration = Date.now() - start - if (err) t.fail(err) - if (threads.length !== 1) t.fail('missing results') - t.pass(`duration: ${duration}ms`) - fs.appendFileSync( - reportPath, - `| ssb-threads and ssb-friends again | ${duration}ms |\n` - ) - updateMaxRAM() - global.gc() - await sleep(2000) // wait for jitdb indexes to save to disk - sbot.close(() => { - ended.resolve() + const duration = await new Promise((resolve) => { + const start = Date.now() + pull( + sbot.threads.publicSummary({ allowlist: ['post', 'contact'] }), + pull.take(1), + pull.collect((err, threads) => { + const duration = Date.now() - start + if (err) t.fail(err) + if (threads.length !== 1) t.fail('missing results') + resolve(duration) }) - }) + ) + }) + + t.pass(`duration: ${duration}ms`) + fs.appendFileSync( + reportPath, + `| ssb-threads and ssb-friends again | ${duration}ms |\n` ) + updateMaxRAM() + global.gc() + await sleep(2000) // wait for jitdb indexes to save to disk + await pify(sbot.close)(true) await ended.promise }) @@ -632,6 +600,44 @@ for (const title in queries) { }) } +test('add a bunch of messages', async (t) => { + const sbot = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .call(null, { keys, path: dirAdd }) + + let state = validate.initial() + for (var i = 0; i < 1000; ++i) { + state = validate.appendNew( + state, + null, + keys, + { type: 'tick', count: i }, + Date.now() + ) + } + + const messages = state.queue.map((x) => x.value) + + const duration = await new Promise((resolve) => { + const start = Date.now() + pull( + pull.values(messages), + asyncFilter(sbot.db.add), + pull.collect((err) => { + if (err) t.fail(err) + resolve(Date.now() - start) + }) + ) + }) + + t.pass(`duration: ${duration}ms`) + fs.appendFileSync(reportPath, `| add 1000 elements | ${duration}ms |\n`) + + await pify(sbot.db.onDrain)() + await pify(sbot.close)(true) + t.end() +}) + test('maximum RAM used', (t) => { t.pass(`maximum memory usage: ${reportMem()}`) fs.appendFileSync(reportPath, `| Maximum memory usage | ${reportMem()} |\n`)