Skip to content

Commit

Permalink
Fix duplicate triples being emitted multiple times
Browse files Browse the repository at this point in the history
Closes #6
  • Loading branch information
maartyman authored Jan 11, 2024
1 parent 6f45397 commit d11c8ce
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 10 deletions.
73 changes: 64 additions & 9 deletions lib/StreamingStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,65 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
}
}

protected importToListeners(stream: RDF.Stream<Q>): void {
stream.on('data', (quad: Q) => {
for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) {
if (!this.ended) {
pendingStream.push(quad);
pendingStream.emit('quad', quad);
}
/**
* This function will read the quad stream in an on-demand fashion, and will check if the quads already exist in the
* store. If they don't, they will be pushed into the storeImportStream, and the matching pendingStreams.
* @param stream A quad stream.
* @param storeImportStream A stream to import the quads into the store.
*/
protected importToListeners(stream: RDF.Stream<Q>, storeImportStream: PassThrough): void {
let streamEnded = false;
stream.once('readable', async() => {
let quad: Q | null = stream.read();
while (quad) {
const staticQuad = quad;
await new Promise<void>(resolve => {
// Match the new quad with the store.
const matchStream = this.store.match(
staticQuad.subject,
staticQuad.predicate,
staticQuad.object,
staticQuad.graph,
);

// If the StreamingStore hasn't ended, we add the quad to the storeImportStream and the corresponding
// pendingStreams and resolve to handle the next quad.
const handleEnd = (): void => {
if (!this.ended) {
storeImportStream.push(staticQuad);
for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(staticQuad)) {
pendingStream.push(staticQuad);
pendingStream.emit('quad', staticQuad);
}
}
resolve();
};

// If the matchStream has a result, the quad already exists.
// We remove the 'end' listener and continue to the next quad.
matchStream.once('data', () => {
matchStream.removeListener('end', handleEnd);
resolve();
});

// If the matchStream has ended (and this listener isn't removed), the quad doesn't exist yet.
// So we call the handleEnd function.
matchStream.once('end', handleEnd);
});

quad = stream.read();
}
// If the stream has ended, all quads will be read from the quad stream, so we can end the storeImportStream.
if (streamEnded) {
storeImportStream.end();
return;
}
// If the stream hasn't ended, we recursively call this function to wait for the stream to become readable again.
this.importToListeners(stream, storeImportStream);
});

stream.on('end', () => {
streamEnded = true;
});
}

Expand All @@ -56,8 +107,12 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
throw new Error('Attempted to import into an ended StreamingStore');
}

this.importToListeners(stream);
return this.store.import(stream);
const storeImportStream = new PassThrough({ objectMode: true });
stream.on('error', error => storeImportStream.emit('error', error));

this.importToListeners(stream, storeImportStream);

return this.store.import(storeImportStream);
}

public match(
Expand Down
53 changes: 52 additions & 1 deletion test/StreamingStore-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import arrayifyStream from 'arrayify-stream';
import { promisifyEventEmitter } from 'event-emitter-promisify/dist';
import { Store } from 'n3';
import { DataFactory } from 'rdf-data-factory';
import { Readable } from 'readable-stream';
import { PassThrough, Readable } from 'readable-stream';
import { StreamingStore } from '../lib/StreamingStore';
const quad = require('rdf-quad');
const streamifyArray = require('streamify-array');
Expand Down Expand Up @@ -465,4 +465,55 @@ describe('StreamingStore', () => {
quad('s4', 'p4', 'o', 'g'),
]);
});

it('handles duplicates in import (set-semantics)', async() => {
const match = store.match();
await promisifyEventEmitter(store.import(streamifyArray([
quad('s1', 'p1', 'o1'),
quad('s1', 'p1', 'o1'),
])));
store.end();

expect(await arrayifyStream(match)).toEqualRdfQuadArray(
[ quad('s1', 'p1', 'o1') ],
);
});

it('handles duplicates in import (set-semantics) during slow import', async() => {
const match = store.match();

const importStream = new Readable({ objectMode: true });
importStream._read = () => {
setImmediate(() => {
importStream.push(quad('s1', 'p1', 'o1'));
});
setImmediate(() => {
importStream.push(quad('s1', 'p1', 'o1'));
});
setImmediate(() => {
importStream.push(null);
});
};
store.import(importStream);
await new Promise(resolve => importStream.on('end', resolve));
store.end();

expect(await arrayifyStream(match)).toEqualRdfQuadArray(
[ quad('s1', 'p1', 'o1') ],
);
});

it('handles errors in import', async() => {
const importStream = new PassThrough({ objectMode: true });
const returnStream = store.import(importStream);
const error = new Error('myError');
const callback = jest.fn();

returnStream.on('error', callback);
importStream.emit('error', error);

expect(callback).toHaveBeenCalled();
expect(callback).toHaveBeenCalledWith(error);
store.end();
});
});

0 comments on commit d11c8ce

Please sign in to comment.