
Stream database file line by line to avoid string length limit in Node.js #5

Merged
36 commits merged on Oct 19, 2021

Commits
a67622d
Stream files line by line when parsing to avoid memory limits
eliot-akira Aug 11, 2021
2a3afcb
Do not stream on browser side, since localforage returns entire datab…
eliot-akira Aug 11, 2021
b7e7f2b
Optimize browser build by replacing "byline" with empty object
eliot-akira Aug 11, 2021
a7ce6a7
Remove unused library "util" for browser side
eliot-akira Aug 11, 2021
f3f6252
Restore original tests for non-streaming method treatRawData
eliot-akira Aug 14, 2021
842f3e9
Lint: Formatting
eliot-akira Oct 3, 2021
84eee58
Use regular expression literal instead of RegExp
eliot-akira Oct 3, 2021
e6388c2
Test: Assert error is null
eliot-akira Oct 3, 2021
24dd632
On error, result is null; on result, error is null
eliot-akira Oct 3, 2021
3729d4c
Use arrow functions to preserve this
eliot-akira Oct 3, 2021
5b37cff
Use template literals for string interpolation
eliot-akira Oct 3, 2021
143a08e
Remove console.log from test
eliot-akira Oct 3, 2021
631d672
Return treated data even after corrupt threshold
eliot-akira Oct 3, 2021
ec5f3ff
Include fork of byline library
eliot-akira Oct 3, 2021
6884f23
Lint byline: Use strict equal; const instead of var; arrow function f…
eliot-akira Oct 3, 2021
1cd3381
Handle error: For this test, it is expected to be *not* null
eliot-akira Oct 3, 2021
af5c5d7
Remove dependency on byline
eliot-akira Oct 3, 2021
a826a9c
Align behavior of treatRawData and treatRawStream in handling last bl…
eliot-akira Oct 5, 2021
575f46c
Include tests for byline
eliot-akira Oct 5, 2021
225fe0f
Adapt tests from byline: Lint; Use assert from chai; Local paths for …
eliot-akira Oct 5, 2021
0af674d
Tests: Simplify getting local paths for test files
eliot-akira Oct 5, 2021
a610cf8
treatRawData and treatRawStream: Get values more efficiently with Obj…
eliot-akira Oct 7, 2021
823db2a
assert.deepStrictEqual instead of deepEqual
eliot-akira Oct 7, 2021
fc1d0e0
Write file line by line when persisting cached database
eliot-akira Oct 9, 2021
af1d64f
Write new line separately, instead of adding to line string
eliot-akira Oct 9, 2021
437a841
Lint
eliot-akira Oct 9, 2021
ba14f11
Add final new line to align behavior with Node.js version
eliot-akira Oct 12, 2021
7404ee1
Stream lines using setImmediate to ensure it doesn't block event loop
eliot-akira Oct 13, 2021
8c4f56a
streaming write with `Readable.from`
arantes555 Oct 15, 2021
148d7c8
fix error callbacks
arantes555 Oct 15, 2021
cbfd31f
changelog and contributors
Oct 5, 2021
3fe1df2
fix package.json browser field & update changelog
Oct 7, 2021
3d42f30
changelog
arantes555 Oct 15, 2021
6960424
2.1.0-2
arantes555 Oct 15, 2021
8b984f1
extract writeFileLines into its own function
arantes555 Oct 15, 2021
84e19ea
2.1.0-3
arantes555 Oct 15, 2021
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.1.0-2] - 2021-10-14
### Changed
- properly stream the writing of the database file

## [2.1.0-1] - 2021-10-07
### Changed
- fixed package.json browser field for byline.js
- last-minute improvements on [PR](https://github.com/seald/nedb/pull/5)

## [2.1.0-0] - 2021-10-05
Thanks to [@eliot-akira](https://github.com/eliot-akira) for the amazing work on this.
### Changed
- [implement file streaming of the database](https://github.com/seald/nedb/pull/5) like [a PR on the original repo](https://github.com/louischatriot/nedb/pull/463) did;
- internalize [`byline`](https://github.com/jahewson/node-byline) package because it is unmaintained.

## [2.0.4] - 2021-07-12
### Fixed
- switch back to an AVLTree instead of a BinarySearchTree like the original nedb to fix [#1](https://github.com/seald/nedb/issues/1).
1 change: 1 addition & 0 deletions browser-version/lib/byline.js
@@ -0,0 +1 @@
module.exports = {}
7 changes: 6 additions & 1 deletion browser-version/lib/storage.js
@@ -68,11 +68,16 @@ const mkdir = (dir, options, callback) => callback()
// Nothing to do, no data corruption possible in the browser
const ensureDatafileIntegrity = (filename, callback) => callback(null)

const crashSafeWriteFileLines = (filename, lines, callback) => {
lines.push('') // Add final new line
writeFile(filename, lines.join('\n'), callback)
}

// Interface
module.exports.exists = exists
module.exports.rename = rename
module.exports.writeFile = writeFile
module.exports.crashSafeWriteFile = writeFile // No need for a crash safe function in the browser
module.exports.crashSafeWriteFileLines = crashSafeWriteFileLines
module.exports.appendFile = appendFile
module.exports.readFile = readFile
module.exports.unlink = unlink
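To see what the browser-side `crashSafeWriteFileLines` produces, here is a self-contained sketch with `writeFile` stubbed by an in-memory store; the real implementation is backed by localforage, and no crash safety is needed in the browser.

```javascript
// Sketch of the browser-side crashSafeWriteFileLines with writeFile stubbed.
const store = {}
const writeFile = (filename, contents, callback) => {
  store[filename] = contents
  callback(null)
}

const crashSafeWriteFileLines = (filename, lines, callback) => {
  lines.push('') // Add final new line, matching the Node.js streaming writer
  writeFile(filename, lines.join('\n'), callback)
}

crashSafeWriteFileLines('demo.db', ['{"_id":"1"}', '{"_id":"2"}'], err => {
  if (err) throw err
})
```

Pushing the empty string before joining is what yields the trailing newline, so browser-written files stay byte-compatible with the server-side output.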
153 changes: 153 additions & 0 deletions lib/byline.js
@@ -0,0 +1,153 @@
// Forked from https://github.com/jahewson/node-byline

// Copyright (C) 2011-2015 John Hewson
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.

const stream = require('stream')
const util = require('util')
const timers = require('timers')

// convenience API
module.exports = function (readStream, options) {
return module.exports.createStream(readStream, options)
}

// basic API
module.exports.createStream = function (readStream, options) {
if (readStream) {
return createLineStream(readStream, options)
} else {
return new LineStream(options)
}
}

// deprecated API
module.exports.createLineStream = function (readStream) {
console.log('WARNING: byline#createLineStream is deprecated and will be removed soon')
return createLineStream(readStream)
}

function createLineStream (readStream, options) {
if (!readStream) {
throw new Error('expected readStream')
}
if (!readStream.readable) {
throw new Error('readStream must be readable')
}
const ls = new LineStream(options)
readStream.pipe(ls)
return ls
}

//
// using the new node v0.10 "streams2" API
//

module.exports.LineStream = LineStream

function LineStream (options) {
stream.Transform.call(this, options)
options = options || {}

// use objectMode to stop the output from being buffered
// which re-concatenates the lines, just without newlines.
this._readableState.objectMode = true
this._lineBuffer = []
this._keepEmptyLines = options.keepEmptyLines || false
this._lastChunkEndedWithCR = false

// take the source's encoding if we don't have one
const self = this
this.on('pipe', function (src) {
if (!self.encoding) {
// but we can't do this for old-style streams
if (src instanceof stream.Readable) {
self.encoding = src._readableState.encoding
}
}
})
}
util.inherits(LineStream, stream.Transform)

LineStream.prototype._transform = function (chunk, encoding, done) {
// decode binary chunks as UTF-8
encoding = encoding || 'utf8'

if (Buffer.isBuffer(chunk)) {
if (encoding === 'buffer') {
chunk = chunk.toString() // utf8
encoding = 'utf8'
} else {
chunk = chunk.toString(encoding)
}
}
this._chunkEncoding = encoding

// see: http://www.unicode.org/reports/tr18/#Line_Boundaries
const lines = chunk.split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g)

// don't split CRLF which spans chunks
if (this._lastChunkEndedWithCR && chunk[0] === '\n') {
lines.shift()
}

if (this._lineBuffer.length > 0) {
this._lineBuffer[this._lineBuffer.length - 1] += lines[0]
lines.shift()
}

this._lastChunkEndedWithCR = chunk[chunk.length - 1] === '\r'
this._lineBuffer = this._lineBuffer.concat(lines)
this._pushBuffer(encoding, 1, done)
}

LineStream.prototype._pushBuffer = function (encoding, keep, done) {
// always buffer the last (possibly partial) line
while (this._lineBuffer.length > keep) {
const line = this._lineBuffer.shift()
// skip empty lines
if (this._keepEmptyLines || line.length > 0) {
if (!this.push(this._reencode(line, encoding))) {
// when the high-water mark is reached, defer pushes until the next tick
timers.setImmediate(() => {
this._pushBuffer(encoding, keep, done)
})
return
}
}
}
done()
}

LineStream.prototype._flush = function (done) {
this._pushBuffer(this._chunkEncoding, 0, done)
}

// see Readable::push
LineStream.prototype._reencode = function (line, chunkEncoding) {
if (this.encoding && this.encoding !== chunkEncoding) {
return Buffer.from(line, chunkEncoding).toString(this.encoding)
} else if (this.encoding) {
// this should be the most common case, i.e. we're using an encoded source stream
return line
} else {
return Buffer.from(line, chunkEncoding)
}
}
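The line-boundary regular expression in `_transform` follows the Unicode line boundaries from UTS #18: it splits on CRLF, LF, CR, vertical tab, form feed, NEL, and the Unicode line/paragraph separators, with CRLF matched first so `\r\n` produces a single break. A quick sketch of its behavior:

```javascript
// The same line-boundary regex LineStream._transform uses above.
// The \r\n alternative comes first so CRLF yields one break, not two.
const boundary = /\r\n|[\n\v\f\r\x85\u2028\u2029]/g

const lines = 'a\r\nb\nc\u2028d'.split(boundary)
console.log(lines)
```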
90 changes: 76 additions & 14 deletions lib/persistence.js
@@ -6,6 +6,7 @@
*/
const path = require('path')
const async = require('async')
const byline = require('./byline')
const customUtils = require('./customUtils.js')
const Index = require('./indexes.js')
const model = require('./model.js')
@@ -72,26 +73,26 @@ class Persistence {
* @param {Function} callback Optional callback, signature: err
*/
persistCachedDatabase (callback = () => {}) {
let toPersist = ''
const lines = []

if (this.inMemoryOnly) return callback(null)

this.db.getAllData().forEach(doc => {
toPersist += this.afterSerialization(model.serialize(doc)) + '\n'
lines.push(this.afterSerialization(model.serialize(doc)))
})
Object.keys(this.db.indexes).forEach(fieldName => {
if (fieldName !== '_id') { // The special _id index is managed by datastore.js, the others need to be persisted
toPersist += this.afterSerialization(model.serialize({
lines.push(this.afterSerialization(model.serialize({
$$indexCreated: {
fieldName: fieldName,
unique: this.db.indexes[fieldName].unique,
sparse: this.db.indexes[fieldName].sparse
}
})) + '\n'
})))
}
})

storage.crashSafeWriteFile(this.filename, toPersist, err => {
storage.crashSafeWriteFileLines(this.filename, lines, err => {
if (err) return callback(err)
this.db.emit('compaction.done')
return callback(null)
@@ -155,8 +156,9 @@ class Persistence {
treatRawData (rawData) {
const data = rawData.split('\n')
const dataById = {}
const tdata = []
const indexes = {}

// Last line of every data file is usually blank so not really corrupt
let corruptItems = -1

for (const datum of data) {
@@ -178,11 +180,58 @@
corruptItems / data.length > this.corruptAlertThreshold
) throw new Error(`More than ${Math.floor(100 * this.corruptAlertThreshold)}% of the data file is corrupt, the wrong beforeDeserialization hook may be used. Cautiously refusing to start NeDB to prevent dataloss`)

tdata.push(...Object.values(dataById))
const tdata = Object.values(dataById)

return { data: tdata, indexes: indexes }
}

/**
* From a database's raw stream, return the corresponding
* machine understandable collection
*/
treatRawStream (rawStream, cb) {
const dataById = {}
const indexes = {}

// Last line of every data file is usually blank so not really corrupt
let corruptItems = -1

const lineStream = byline(rawStream, { keepEmptyLines: true })
let length = 0

lineStream.on('data', (line) => {
try {
const doc = model.deserialize(this.beforeDeserialization(line))
if (doc._id) {
if (doc.$$deleted === true) delete dataById[doc._id]
else dataById[doc._id] = doc
} else if (doc.$$indexCreated && doc.$$indexCreated.fieldName != null) indexes[doc.$$indexCreated.fieldName] = doc.$$indexCreated
else if (typeof doc.$$indexRemoved === 'string') delete indexes[doc.$$indexRemoved]
} catch (e) {
corruptItems += 1
}

length++
})

lineStream.on('end', () => {
// A bit lenient on corruption
if (length > 0 && corruptItems / length > this.corruptAlertThreshold) {
const err = new Error(`More than ${Math.floor(100 * this.corruptAlertThreshold)}% of the data file is corrupt, the wrong beforeDeserialization hook may be used. Cautiously refusing to start NeDB to prevent dataloss`)
cb(err, null)
return
}

const data = Object.values(dataById)

cb(null, { data, indexes: indexes })
})

lineStream.on('error', function (err) {
cb(err)
})
}
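The per-line bookkeeping shared by `treatRawData` and `treatRawStream` can be seen in isolation. In this sketch, `JSON.parse` stands in for `model.deserialize` plus the `beforeDeserialization` hook, and the input lines are illustrative:

```javascript
// Isolated sketch of the per-line reduction above: the last document with
// a given _id wins, $$deleted drops it, and $$indexCreated records an index.
const rawLines = [
  '{"_id":"1","a":1}',
  '{"_id":"2","a":2}',
  '{"_id":"1","$$deleted":true}',
  '{"$$indexCreated":{"fieldName":"a","unique":false,"sparse":false}}'
]

const dataById = {}
const indexes = {}

for (const line of rawLines) {
  const doc = JSON.parse(line)
  if (doc._id) {
    if (doc.$$deleted === true) delete dataById[doc._id]
    else dataById[doc._id] = doc
  } else if (doc.$$indexCreated && doc.$$indexCreated.fieldName != null) {
    indexes[doc.$$indexCreated.fieldName] = doc.$$indexCreated
  } else if (typeof doc.$$indexRemoved === 'string') {
    delete indexes[doc.$$indexRemoved]
  }
}

const data = Object.values(dataById)
```

Only `dataById` and `indexes` are held in memory while the stream runs, which is why the streaming path scales with document count rather than file size.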

/**
* Load the database
* 1) Create all indexes
@@ -207,14 +256,8 @@
// eslint-disable-next-line node/handle-callback-err
storage.ensureDatafileIntegrity(this.filename, err => {
// TODO: handle error
storage.readFile(this.filename, 'utf8', (err, rawData) => {
const treatedDataCallback = (err, treatedData) => {
if (err) return cb(err)
let treatedData
try {
treatedData = this.treatRawData(rawData)
} catch (e) {
return cb(e)
}

// Recreate all indexes in the datafile
Object.keys(treatedData.indexes).forEach(key => {
@@ -230,6 +273,25 @@
}

this.db.persistence.persistCachedDatabase(cb)
}

if (storage.readFileStream) {
// Server side
const fileStream = storage.readFileStream(this.filename, { encoding: 'utf8' })
this.treatRawStream(fileStream, treatedDataCallback)
return
}

// Browser
storage.readFile(this.filename, 'utf8', (err, rawData) => {
if (err) return cb(err)

try {
const treatedData = this.treatRawData(rawData)
treatedDataCallback(null, treatedData)
} catch (e) {
return cb(e)
}
})
})
})