Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

draft: synchronousTransformIterator #77

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 90 additions & 10 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items
*/
take(limit: number): AsyncIterator<T> {
return this.transform({ limit });
return new UntilIterator(this, limit);
}

/**
Expand Down Expand Up @@ -795,14 +795,12 @@ export function identity<S>(item: S): typeof item {
return item;
}


/**
An iterator that synchronously transforms every item from its source
by applying a mapping function.
@extends module:asynciterator.AsyncIterator
*/
export class MappingIterator<S, D = S> extends AsyncIterator<D> {
protected readonly _map: MapFunction<S, D>;
export abstract class SynchronousTransformIterator<S, D = S> extends AsyncIterator<D> {
protected readonly _source: InternalSource<S>;
protected readonly _destroySource: boolean;

Expand All @@ -811,11 +809,9 @@ export class MappingIterator<S, D = S> extends AsyncIterator<D> {
*/
constructor(
source: AsyncIterator<S>,
map: MapFunction<S, D> = identity as MapFunction<S, D>,
options: SourcedIteratorOptions = {}
) {
super();
this._map = map;
this._source = ensureSourceAvailable(source);
this._destroySource = options.destroySource !== false;

Expand All @@ -833,15 +829,16 @@ export class MappingIterator<S, D = S> extends AsyncIterator<D> {
}
}

protected abstract safeRead(): D | null;

/* Tries to read the next item from the iterator. */
read(): D | null {
if (!this.done) {
// Try to read an item that maps to a non-null value
if (this._source.readable) {
let item: S | null, mapped: D | null;
while ((item = this._source.read()) !== null) {
if ((mapped = this._map(item)) !== null)
return mapped;
const item = this.safeRead();
if (item !== null) {
return item;
}
}
this.readable = false;
Expand All @@ -865,6 +862,89 @@ export class MappingIterator<S, D = S> extends AsyncIterator<D> {
}
}

export class MappingIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
private _map: MapFunction<S, D>;

constructor(
source: AsyncIterator<S>,
map: MapFunction<S, D> = identity as MapFunction<S, D>,
options: SourcedIteratorOptions = {}
) {
super(source, options);
this._map = map;
}

safeRead() {
let item: S | null;
while ((item = this._source.read()) !== null) {
const mapped = this._map(item);
if (mapped !== null)
return mapped;
}
return null;
}

map<K>(map: MapFunction<D, K>, self?: any): AsyncIterator<K> {
return new CompositeMappingIterator(this._source, [this._map, bind(map, self)], this);
}
}

export class CompositeMappingIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
constructor(
private root: AsyncIterator<S>,
private mappings: MapFunction<any, any>[] = [],
source: AsyncIterator<any>,
options: SourcedIteratorOptions = {},
) {
super(source, options);
}

safeRead() {
// TODO: See if this is actually necessary
// A source should only be read from if readable is true
if (!this.root.readable) {
this.readable = false;
// TODO: See if this should be here
if (this.root.done)
this.close();
return null;
}

let mapped : any = null;
while (mapped === null && (mapped = this.root.read()) !== null) {
for (let i = 0; i < this.mappings.length; i++) {
mapped = this.mappings[i](mapped);
if (mapped === null)
break;
}
}
return mapped;
}

map<K>(map: MapFunction<D, K>, self?: any): AsyncIterator<K> {
return new CompositeMappingIterator(this.root, [...this.mappings, bind(map, self)], this);
}
}

export class UntilIterator<S> extends SynchronousTransformIterator<S> {
constructor(
source: AsyncIterator<S>,
private limit: number,
options: SourcedIteratorOptions = {}
) {
super(source, options);
}

safeRead() {
if (this.limit <= 0) {
this.close();
return null;
}
this.limit -= 1;
return this._source.read();
}
}

// Validates an AsyncIterator for use as a source within another AsyncIterator
function ensureSourceAvailable<S>(source?: AsyncIterator<S>, allowDestination = false) {
if (!source || !isFunction(source.read) || !isFunction(source.on))
Expand Down
4 changes: 0 additions & 4 deletions test/SimpleTransformIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1198,10 +1198,6 @@ describe('SimpleTransformIterator', () => {
result.on('end', done);
});

it('should be a SimpleTransformIterator', () => {
result.should.be.an.instanceof(SimpleTransformIterator);
});

it('should take the given number of items', () => {
items.should.deep.equal(['a', 'b', 'c']);
});
Expand Down