Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use lokijs #88

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
52 changes: 19 additions & 33 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@creately/rxdata",
"version": "5.1.4",
"version": "6.2.4",
"description": "",
"main": "dist/index.js",
"typings": "dist/index.d.ts",
Expand Down Expand Up @@ -39,10 +39,11 @@
"dependencies": {
"@creately/lschannel": "^2.0.3",
"@creately/mungo": "^1.2.1",
"localforage": "^1.7.3",
"localforage-setitems": "^1.4.0",
"@types/lokijs": "^1.5.7",
"lodash.clonedeep": "^4.5.0",
"lodash.isequal": "^4.5.0",
"lodash.omit": "^4.5.0",
"lokijs": "^1.5.12",
"mingo": "^2.2.6",
"rxjs": "^6.0.0"
}
Expand Down
1 change: 1 addition & 0 deletions src/__module.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ declare module 'mingo';
declare module 'clone';
declare module 'lodash.isequal';
declare module 'lodash.clonedeep';
declare module 'lodash.omit';
24 changes: 24 additions & 0 deletions src/__tests__/collection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,16 @@ describe('Collection', () => {
it('should emit modified documents if they match the selector', async done => {
const { col } = await prepare();
const watchPromise = watchN(col, 1, { z: 3 });
const watchPromise2 = watchN(col, 1, { z: 2 });
await col.insert(TEST_DOCS);
const out = await watchPromise;
const out2 = await watchPromise2;
expect(out).toEqual([
{ id: (jasmine.any(Number) as any) as number, type: 'insert', docs: TEST_DOCS.filter(doc => doc.z === 3) },
]);
expect(out2).toEqual([
{ id: (jasmine.any(Number) as any) as number, type: 'insert', docs: TEST_DOCS.filter(doc => doc.z === 2) },
]);
done();
});

Expand Down Expand Up @@ -336,6 +341,25 @@ describe('Collection', () => {
done();
});

it('should update the exsiting query / upsert', async done => {
const { col } = await prepare();
const promise1 = watchN(col, 1);
await col.insert(TEST_DOCS);
let out = await promise1;
expect(out).toEqual([{ id: (jasmine.any(Number) as any) as number, type: 'insert', docs: [...TEST_DOCS] }]);

const promise2 = watchN(col, 1);
await col.insert(Object.freeze({ id: 'd111', x: 2, y: 2, z: 2 }));
out = await promise2;
expect((col as any).storage.data.length).toEqual(TEST_DOCS.length);
expect((col as any).storage.data.filter((doc: any) => doc.id === 'd111').length).toEqual(1);
const updated = (col as any).storage.data.find((doc: any) => doc.id === 'd111');
expect(updated.x).toEqual(2);
expect(updated.y).toEqual(2);
expect(updated.z).toEqual(2);
done();
});

it('should emit the inserted document as a change (remote)');
});

Expand Down
1 change: 1 addition & 0 deletions src/__tests__/database.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ describe('Database', () => {
const { db } = prepare();
const c1 = db.collection('test');
await c1.insert([{ id: 'd1' }]);
expect(Collection.loki.getCollection(`rxdata.test-db.test`).data[0].id).toEqual('d1');
expect(await findN(c1, 1)).toEqual([[{ id: 'd1' }]]);
await db.drop();
expect(await findN(c1, 1)).toEqual([[]]);
Expand Down
95 changes: 39 additions & 56 deletions src/collection.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
/// <reference types="localforage" />

import mingo from 'mingo';
import * as LocalForage from 'localforage';
import 'localforage-setitems';
import * as Loki from 'lokijs';
import * as isequal from 'lodash.isequal';
import * as cloneDeep from 'lodash.clonedeep';
import * as omit from 'lodash.omit';
import { Observable, Subject, empty, of, defer, from, Subscription } from 'rxjs';
import { switchMap, concatMap, concat, map, distinctUntilChanged } from 'rxjs/operators';
import { modify } from '@creately/mungo';
Expand Down Expand Up @@ -72,17 +70,14 @@ export type DocumentChange<T> = InsertDocumentChange<T> | RemoveDocumentChange<T
// Collection
// Collection ...?
export class Collection<T extends IDocument> {
public static loki = new Loki('rxdatalokijs');
// allDocs
// allDocs emits all documents in the collection when they get modified.
protected allDocs: Subject<T[]>;

// storage
// storage stores documents in a suitable storage backend.
protected storage: LocalForage;

// cachedDocs
// cachedDocs is an in memory cache of all documents in the database.
protected cachedDocs: T[] | null = null;
protected storage: Loki.Collection;

// channel
// channel sends/receives messages between browser tabs.
Expand All @@ -107,7 +102,7 @@ export class Collection<T extends IDocument> {
// constructor
constructor(public name: string) {
this.allDocs = new Subject();
this.storage = LocalForage.createInstance({ name });
this.storage = Collection.loki.addCollection(name);
this.changes = Channel.create(`rxdata.${name}.channel`);
this.changesSub = this.changes.pipe(concatMap(change => from(this.apply(change)))).subscribe();
}
Expand All @@ -130,12 +125,17 @@ export class Collection<T extends IDocument> {
// made to documents which match the selector.
public watch(selector?: Selector): Observable<DocumentChange<T>> {
if (!selector) {
return this.changes.asObservable();
return this.changes.asObservable().pipe(
map(change => {
const docs = change.docs.map(doc => omit(doc, '$loki', 'meta'));
return Object.assign({}, change, { docs });
})
);
}
const mq = new mingo.Query(selector);
return this.changes.pipe(
switchMap(change => {
const docs = change.docs.filter(doc => mq.test(doc));
const docs = change.docs.filter(doc => mq.test(doc)).map(doc => omit(doc, '$loki', 'meta'));
if (!docs.length) {
return empty();
}
Expand Down Expand Up @@ -178,7 +178,17 @@ export class Collection<T extends IDocument> {
// with the id already exists in the collection, it will be replaced.
public async insert(docOrDocs: T | T[]): Promise<void> {
const docs: T[] = cloneDeep(Array.isArray(docOrDocs) ? docOrDocs : [docOrDocs]);
await this.storage.setItems(docs.map(doc => ({ key: (doc as any).id, value: doc })));
docs.forEach(doc => {
var existingDoc = this.storage.findOne({ id: doc.id });
if (existingDoc) {
// If a matching document exists, update it
Object.assign(existingDoc, doc);
this.storage.update(existingDoc);
} else {
// If no matching document exists, insert the new document
this.storage.insert(doc);
}
});
await this.emitAndApply({ id: this.nextChangeId(), type: 'insert', docs: docs });
}

Expand All @@ -189,8 +199,10 @@ export class Collection<T extends IDocument> {
const modifier = cloneDeep(_modifier);
const filter = this.createFilter(selector);
const docs: T[] = cloneDeep((await this.load()).filter(doc => filter(doc)));
docs.forEach(doc => modify(doc, modifier));
await this.storage.setItems(docs.map(doc => ({ key: (doc as any).id, value: doc })));
docs.forEach(doc => {
modify(doc, modifier);
this.storage.update(doc);
});
await this.emitAndApply({ id: this.nextChangeId(), type: 'update', docs: docs, modifier: modifier });
}

Expand All @@ -200,10 +212,15 @@ export class Collection<T extends IDocument> {
public async remove(selector: Selector): Promise<void> {
const filter = this.createFilter(selector);
const docs = (await this.load()).filter(doc => filter(doc));
await Promise.all(docs.map(doc => this.storage.removeItem((doc as any).id)));
docs.map(doc => this.storage.remove(doc));
await this.emitAndApply({ id: this.nextChangeId(), type: 'remove', docs: docs });
}

public async dropCollection() {
Collection.loki.removeCollection(this.name);
await this.reload();
}

// filter
// filter returns an array of documents which match the selector and
// filter options. The selector, options and all option fields are optional.
Expand All @@ -218,7 +235,7 @@ export class Collection<T extends IDocument> {
if (options.limit) {
cursor = cursor.limit(options.limit);
}
return cursor.all();
return cursor.all().map((doc: any) => omit(doc, '$loki', 'meta'));
}

// createFilter
Expand All @@ -231,64 +248,30 @@ export class Collection<T extends IDocument> {
// load
// load loads all documents from the database to the in-memory cache.
protected load(): Promise<T[]> {
if (this.cachedDocs) {
return Promise.resolve(this.cachedDocs);
}
if (!this.loadPromise) {
this.loadPromise = this.loadAll().then(docs => (this.cachedDocs = docs));
}
return this.loadPromise.then(() => this.cachedDocs as T[]);
return Promise.resolve(this.storage.data);
}

// Reload
// Reload loads all document from storage, update
// the cachedDocs and emit the updated docs.
public async reload() {
return this.loadAll().then(docs => {
this.cachedDocs = docs;
this.allDocs.next(this.cachedDocs);
this.allDocs.next(docs);
});
}

// loadAll
// loadAll loads all documents from storage without filtering.
// Returns a promise which resolves to an array of documents.
protected async loadAll(): Promise<T[]> {
const docs: T[] = [];
await this.storage.iterate((doc: T) => {
docs.push(doc);
// If a non-undefined value is returned,
// the localforage iterator will stop.
return undefined;
});
return docs;
return Promise.resolve(this.storage.data);
}

// refresh
// refresh loads all documents from localForage storage and emits it
// to all listening queries. Called when the collection gets changed.
protected async apply(change: DocumentChange<T>) {
if (!this.cachedDocs) {
this.cachedDocs = await this.load();
}
if (change.type === 'insert' || change.type === 'update') {
for (const doc of change.docs) {
const index = this.cachedDocs.findIndex(d => d.id === doc.id);
if (index === -1) {
this.cachedDocs.push(doc);
} else {
this.cachedDocs[index] = doc;
}
}
} else if (change.type === 'remove') {
for (const doc of change.docs) {
const index = this.cachedDocs.findIndex(d => d.id === doc.id);
if (index !== -1) {
this.cachedDocs.splice(index, 1);
}
}
}
this.allDocs.next(this.cachedDocs);
this.allDocs.next(this.storage.data);
const resolveFn = this.changeResolve[change.id];
if (resolveFn) {
resolveFn();
Expand All @@ -306,7 +289,7 @@ export class Collection<T extends IDocument> {
// emitAndApply emits the change and waits until it is applied.
private async emitAndApply(change: DocumentChange<T>): Promise<void> {
await new Promise(resolve => {
this.changeResolve[change.id] = resolve;
this.changeResolve[change.id] = resolve as any;
this.changes.next(change);
});
}
Expand Down
6 changes: 3 additions & 3 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ export class Database {
// drop clears all data in all collections in the database.
// It also closes all active subscriptions in all collections.
public async drop(): Promise<void> {
this.collections = new Map();
const collections = this.collectionsList;
this.collectionsList = [];
await Promise.all(collections.map(name => new Collection<any>(name).remove({})));
this.collections.forEach(async c => await c.dropCollection());
this.collections = new Map();
await Promise.resolve();
}

// collectionsListKey
Expand Down