Skip to content

Commit

Permalink
Fix duplicate triples being emitted multiple times
Browse files Browse the repository at this point in the history
Closes #6
  • Loading branch information
maartyman authored Feb 6, 2024
1 parent 9805457 commit 4d0d540
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 4 deletions.
20 changes: 17 additions & 3 deletions lib/StreamingStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ import type { Readable } from 'readable-stream';
import { PassThrough } from 'readable-stream';
import { PendingStreamsIndex } from './PendingStreamsIndex';

interface ILocalStore<Q extends RDF.BaseQuad> extends RDF.Store<Q> {
countQuads: (
subject: RDF.Term | null,
predicate: RDF.Term | null,
object: RDF.Term | null,
graph: RDF.Term | null,
) => number;
}

/**
* A StreamingStore allows data lookup and insertion to happen in parallel.
* Concretely, this means that `match()` calls happening before `import()` calls, will still consider those triples that
Expand All @@ -14,7 +23,7 @@ import { PendingStreamsIndex } from './PendingStreamsIndex';
*
* WARNING: `end()` MUST be called at some point, otherwise all `match` streams will remain unended.
*/
export class StreamingStore<Q extends RDF.BaseQuad = RDF.Quad, S extends RDF.Store<Q> = Store<Q>>
export class StreamingStore<Q extends RDF.BaseQuad = RDF.Quad, S extends ILocalStore<Q> = Store<Q>>
implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
protected readonly store: S;
protected readonly pendingStreams: PendingStreamsIndex<Q> = new PendingStreamsIndex();
Expand Down Expand Up @@ -42,8 +51,13 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {

protected importToListeners(stream: RDF.Stream<Q>): void {
stream.on('data', (quad: Q) => {
for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) {
if (!this.ended) {
if (!this.ended && !this.store.countQuads(
quad.subject,
quad.predicate,
quad.object,
quad.graph,
)) {
for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) {
pendingStream.push(quad);
pendingStream.emit('quad', quad);
}
Expand Down
53 changes: 52 additions & 1 deletion test/StreamingStore-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import arrayifyStream from 'arrayify-stream';
import { promisifyEventEmitter } from 'event-emitter-promisify/dist';
import { Store } from 'n3';
import { DataFactory } from 'rdf-data-factory';
import { Readable } from 'readable-stream';
import { Readable, PassThrough } from 'readable-stream';
import { StreamingStore } from '../lib/StreamingStore';
const quad = require('rdf-quad');
const streamifyArray = require('streamify-array');
Expand Down Expand Up @@ -465,4 +465,55 @@ describe('StreamingStore', () => {
quad('s4', 'p4', 'o', 'g'),
]);
});

it('handles duplicates in import (set-semantics)', async() => {
const match = store.match();
await promisifyEventEmitter(store.import(streamifyArray([
quad('s1', 'p1', 'o1'),
quad('s1', 'p1', 'o1'),
])));
store.end();

expect(await arrayifyStream(match)).toEqualRdfQuadArray(
[ quad('s1', 'p1', 'o1') ],
);
});

it('handles duplicates in import (set-semantics) during slow import', async() => {
const match = store.match();

const importStream = new Readable({ objectMode: true });
importStream._read = () => {
setImmediate(() => {
importStream.push(quad('s1', 'p1', 'o1'));
});
setImmediate(() => {
importStream.push(quad('s1', 'p1', 'o1'));
});
setImmediate(() => {
importStream.push(null);
});
};
store.import(importStream);
await new Promise(resolve => importStream.on('end', resolve));
store.end();

expect(await arrayifyStream(match)).toEqualRdfQuadArray(
[ quad('s1', 'p1', 'o1') ],
);
});

it('handles errors in import', async() => {
const importStream = new PassThrough({ objectMode: true });
const returnStream = store.import(importStream);
const error = new Error('myError');
const callback = jest.fn();

returnStream.on('error', callback);
importStream.emit('error', error);

expect(callback).toHaveBeenCalled();
expect(callback).toHaveBeenCalledWith(error);
store.end();
});
});

0 comments on commit 4d0d540

Please sign in to comment.