diff --git a/lib/StreamingStore.ts b/lib/StreamingStore.ts
--- a/lib/StreamingStore.ts
+++ b/lib/StreamingStore.ts
@@ -40,14 +40,71 @@ implements RDF.Source, RDF.Sink, EventEmitter> {
     }
   }
 
-  protected importToListeners(stream: RDF.Stream): void {
-    stream.on('data', (quad: Q) => {
-      for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) {
-        if (!this.ended) {
-          pendingStream.push(quad);
-          pendingStream.emit('quad', quad);
-        }
-      }
-    });
-  }
+  /**
+   * Reads the quad stream on demand and checks for every quad whether it already exists in the store.
+   * Quads that don't exist yet are pushed into the storeImportStream and the matching pendingStreams.
+   * The storeImportStream is ended once the quad stream has ended and all quads that were read are handled.
+   * @param stream A quad stream.
+   * @param storeImportStream A stream to import the quads into the store.
+   */
+  protected importToListeners(stream: RDF.Stream, storeImportStream: PassThrough): void {
+    let streamEnded = false;
+    let draining = false;
+
+    // Pushes the quad into the storeImportStream and the matching pendingStreams if the store doesn't have it.
+    const importIfNew = (quad: Q): Promise<void> => new Promise<void>(resolve => {
+      // Match the new quad with the store.
+      const matchStream = this.store.match(quad.subject, quad.predicate, quad.object, quad.graph);
+
+      // If the matchStream ends without a result, the quad doesn't exist yet.
+      // Unless the StreamingStore has ended, we add it to the storeImportStream and the pendingStreams.
+      const handleEnd = (): void => {
+        if (!this.ended) {
+          storeImportStream.push(quad);
+          for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) {
+            pendingStream.push(quad);
+            pendingStream.emit('quad', quad);
+          }
+        }
+        resolve();
+      };
+
+      // If the matchStream has a result, the quad already exists.
+      // We remove the 'end' listener and continue to the next quad.
+      matchStream.once('data', () => {
+        matchStream.removeListener('end', handleEnd);
+        resolve();
+      });
+      matchStream.once('end', handleEnd);
+    });
+
+    // Handles all quads that can currently be read from the stream, one at a time.
+    const drain = async(): Promise<void> => {
+      // A drain that is already running also reads the quads announced by this 'readable' event.
+      if (draining) {
+        return;
+      }
+      draining = true;
+      let quad: Q | null = stream.read();
+      while (quad) {
+        await importIfNew(quad);
+        quad = stream.read();
+      }
+      draining = false;
+      // If the stream ended while quads were being handled, the storeImportStream still has to be ended.
+      if (streamEnded) {
+        storeImportStream.end();
+      }
+    };
+
+    // Both listeners are registered only once, so streams with many 'readable' events don't accumulate listeners.
+    stream.on('readable', drain);
+    stream.once('end', () => {
+      streamEnded = true;
+      // If no drain is running at this point, nothing else would end the storeImportStream.
+      if (!draining) {
+        storeImportStream.end();
+      }
+    });
+  }
 
@@ -56,8 +113,12 @@ implements RDF.Source, RDF.Sink, EventEmitter> {
       throw new Error('Attempted to import into an ended StreamingStore');
     }
 
-    this.importToListeners(stream);
-    return this.store.import(stream);
+    const storeImportStream = new PassThrough({ objectMode: true });
+    stream.on('error', error => storeImportStream.emit('error', error));
+
+    this.importToListeners(stream, storeImportStream);
+
+    return this.store.import(storeImportStream);
   }
 
   public match(
diff --git a/test/StreamingStore-test.ts b/test/StreamingStore-test.ts
index a10b7bc..87c359a 100644
--- a/test/StreamingStore-test.ts
+++ b/test/StreamingStore-test.ts
@@ -3,7 +3,7 @@ import arrayifyStream from 'arrayify-stream';
 import { promisifyEventEmitter } from 'event-emitter-promisify/dist';
 import { Store } from 'n3';
 import { DataFactory } from 'rdf-data-factory';
-import { Readable } from 'readable-stream';
+import { PassThrough, Readable } from 'readable-stream';
 import { StreamingStore } from '../lib/StreamingStore';
 const quad = require('rdf-quad');
 const streamifyArray = require('streamify-array');
@@ -465,4 +465,55 @@ describe('StreamingStore', () => {
       quad('s4', 'p4', 'o', 'g'),
     ]);
   });
+
+  it('handles duplicates in import (set-semantics)', async() => {
+    const match = store.match();
+    await promisifyEventEmitter(store.import(streamifyArray([
+      quad('s1', 'p1', 'o1'),
+      quad('s1', 'p1', 'o1'),
+    ])));
+    store.end();
+
+    expect(await arrayifyStream(match)).toEqualRdfQuadArray(
+      [ quad('s1', 'p1', 'o1') ],
+    );
+  });
+
+  it('handles duplicates in import (set-semantics) during slow import', async() => {
+    const match = store.match();
+
+    const importStream = new Readable({ objectMode: true });
+    importStream._read = () => {
+      setImmediate(() => {
+        importStream.push(quad('s1', 'p1', 'o1'));
+      });
+      setImmediate(() => {
+        importStream.push(quad('s1', 'p1', 'o1'));
+      });
+      setImmediate(() => {
+        importStream.push(null);
+      });
+    };
+    store.import(importStream);
+    await new Promise(resolve => importStream.on('end', resolve));
+    store.end();
+
+    expect(await arrayifyStream(match)).toEqualRdfQuadArray(
+      [ quad('s1', 'p1', 'o1') ],
+    );
+  });
+
+  it('handles errors in import', async() => {
+    const importStream = new PassThrough({ objectMode: true });
+    const returnStream = store.import(importStream);
+    const error = new Error('myError');
+    const callback = jest.fn();
+
+    returnStream.on('error', callback);
+    importStream.emit('error', error);
+
+    expect(callback).toHaveBeenCalled();
+    expect(callback).toHaveBeenCalledWith(error);
+    store.end();
+  });
 });