diff --git a/index.js b/index.js index 6a72a4c..cb10f35 100644 --- a/index.js +++ b/index.js @@ -13,7 +13,7 @@ const pLimit = concurrency => { activeCount--; if (queue.length > 0) { - queue.shift()(); + queue.shift().run(); } }; @@ -27,15 +27,17 @@ const pLimit = concurrency => { result.then(next, next); }; - const enqueue = (fn, resolve, ...args) => { + const enqueue = (fn, resolve, reject, ...args) => { if (activeCount < concurrency) { run(fn, resolve, ...args); } else { - queue.push(run.bind(null, fn, resolve, ...args)); + queue.push({run: run.bind(null, fn, resolve, ...args), reject}); } }; - const generator = (fn, ...args) => new Promise(resolve => enqueue(fn, resolve, ...args)); + const generator = (fn, ...args) => new Promise( + (resolve, reject) => enqueue(fn, resolve, reject, ...args) + ); Object.defineProperties(generator, { activeCount: { get: () => activeCount @@ -45,6 +47,7 @@ const pLimit = concurrency => { }, clearQueue: { value: () => { + queue.forEach(({reject}) => reject(new Error('queue cleared before function was invoked'))); queue.length = 0; } } diff --git a/test.js b/test.js index e06d7d4..905ac4a 100644 --- a/test.js +++ b/test.js @@ -125,7 +125,9 @@ test('clearQueue', t => { const limit = pLimit(1); Array.from({length: 1}, () => limit(() => delay(1000))); - Array.from({length: 3}, () => limit(() => delay(1000))); + Array.from({length: 3}, () => limit(() => delay(1000)).catch( + error => t.is(error.message, 'queue cleared before function was invoked') + )); t.is(limit.pendingCount, 3); limit.clearQueue();