Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix set semantics bug #9

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
78 changes: 69 additions & 9 deletions lib/StreamingStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,69 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
}
}

protected importToListeners(stream: RDF.Stream<Q>): void {
stream.on('data', (quad: Q) => {
for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) {
if (!this.ended) {
pendingStream.push(quad);
pendingStream.emit('quad', quad);
}
/**
* This function will read the quad stream in an on-demand fashion, and will check if the quads already exist in the
* store. If they don't, they will be pushed into the storeImportStream, and the matching pendingStreams.
* @param stream A quad stream.
* @param storeImportStream A stream to import the quads into the store.
*/
protected importToListeners(stream: RDF.Stream<Q>, storeImportStream: PassThrough): void {
let streamEnded = false;
let streamReadable = false;
stream.on('readable', async() => {
streamReadable = true;
let quad: Q | null = stream.read();
while (quad) {
const staticQuad = quad;
await new Promise<void>(resolve => {
// Match the new quad with the store.
const matchStream = this.store.match(
staticQuad.subject,
staticQuad.predicate,
staticQuad.object,
staticQuad.graph,
);

// If the StreamingStore hasn't ended, we add the quad to the storeImportStream and the corresponding
// pendingStreams and resolve to handle the next quad.
const handleEnd = (): void => {
if (!this.ended) {
storeImportStream.push(staticQuad);
for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(staticQuad)) {
pendingStream.push(staticQuad);
pendingStream.emit('quad', staticQuad);
}
}
resolve();
};

// If the matchStream has a result, the quad already exists.
// We remove the 'end' listener and continue to the next quad.
matchStream.once('data', () => {
matchStream.removeListener('end', handleEnd);
resolve();
});

// If the matchStream has ended (and this listener isn't removed), the quad doesn't exist yet.
// So we call the handleEnd function.
matchStream.once('end', handleEnd);
});

quad = stream.read();
}
// If the stream has ended, all quads will be read from the quad stream, so we can end the storeImportStream.
if (streamEnded) {
storeImportStream.end();
}
streamReadable = false;
});

stream.on('end', () => {
// If the stream is still readable let the on readable function end the `storeImportStream`, else do it now.
if (streamReadable) {
streamEnded = true;
} else {
storeImportStream.end();
}
});
}
Expand All @@ -56,8 +112,12 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
throw new Error('Attempted to import into an ended StreamingStore');
}

this.importToListeners(stream);
return this.store.import(stream);
const storeImportStream = new PassThrough({ objectMode: true });
stream.on('error', error => storeImportStream.emit('error', error));

this.importToListeners(stream, storeImportStream);

return this.store.import(storeImportStream);
}

public match(
Expand Down
76 changes: 75 additions & 1 deletion test/StreamingStore-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import arrayifyStream from 'arrayify-stream';
import { promisifyEventEmitter } from 'event-emitter-promisify/dist';
import { Store } from 'n3';
import { DataFactory } from 'rdf-data-factory';
import { Readable } from 'readable-stream';
import { PassThrough, Readable } from 'readable-stream';
import { StreamingStore } from '../lib/StreamingStore';
const quad = require('rdf-quad');
const streamifyArray = require('streamify-array');
Expand Down Expand Up @@ -465,4 +465,78 @@ describe('StreamingStore', () => {
quad('s4', 'p4', 'o', 'g'),
]);
});

it('handles duplicates in import (set-semantics)', async() => {
const match = store.match();
await promisifyEventEmitter(store.import(streamifyArray([
quad('s1', 'p1', 'o1'),
quad('s1', 'p1', 'o1'),
])));
store.end();

expect(await arrayifyStream(match)).toEqualRdfQuadArray(
[ quad('s1', 'p1', 'o1') ],
);
});

it('handles duplicates in import (set-semantics) with slow store', async() => {
(<Store>(<any>store).store).import = stream => {
stream.on('data', data => {
setTimeout(() => {
(<Store>(<any>store).store).add(data);
}, 50);
});

return stream;
};

const match = store.match();
await promisifyEventEmitter(store.import(streamifyArray([
quad('s1', 'p1', 'o1'),
quad('s1', 'p1', 'o1'),
])));
store.end();

expect(await arrayifyStream(match)).toEqualRdfQuadArray(
[ quad('s1', 'p1', 'o1') ],
);
});

it('handles duplicates in import (set-semantics) during slow import', async() => {
const match = store.match();

const importStream = new Readable({ objectMode: true });
importStream._read = () => {
setImmediate(() => {
importStream.push(quad('s1', 'p1', 'o1'));
});
setImmediate(() => {
importStream.push(quad('s1', 'p1', 'o1'));
});
setImmediate(() => {
importStream.push(null);
});
};
store.import(importStream);
await new Promise(resolve => importStream.on('end', resolve));
store.end();

expect(await arrayifyStream(match)).toEqualRdfQuadArray(
[ quad('s1', 'p1', 'o1') ],
);
});

it('handles errors in import', async() => {
const importStream = new PassThrough({ objectMode: true });
const returnStream = store.import(importStream);
const error = new Error('myError');
const callback = jest.fn();

returnStream.on('error', callback);
importStream.emit('error', error);

expect(callback).toHaveBeenCalled();
expect(callback).toHaveBeenCalledWith(error);
store.end();
});
});
Loading