Description
I am using Piscina to help process some file data by chunking up the data set and using `maxQueue: 'auto'`, and I ran into an issue with the queue not draining, which caused the process to never complete.
My original implementation was:
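```js
import { once } from 'node:events';

// `piscina` is the Piscina instance, created with { maxQueue: 'auto' }
const tasks = [];
const data = []; // a large array
const chunkSize = 1000;

for (let i = 0; i < data.length; i += chunkSize) {
  const chunk = data.slice(i, i + chunkSize);

  // Back-pressure: when the queue is full, log its size every 500 ms
  // while waiting for the pool to emit 'drain'.
  if (piscina.queueSize >= piscina.options.maxQueue) {
    const interval = setInterval(() => console.log(piscina.queueSize), 500);
    await once(piscina, 'drain');
    clearInterval(interval);
  }

  tasks.push(piscina.run(chunk));
}

const results = await Promise.all(tasks);
```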
This seemed to work in most scenarios, but for one particular file in one particular environment the queue filled up and the `'drain'` event never arrived, even though the interval log shows `queueSize` dropping to zero. I also tried using `piscina.needsDrain`, but this didn't help.
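A different approach, which sidesteps the `'drain'` event altogether, is to cap how many chunks are in flight at once instead of watching the queue: a fixed number of async loops each take the next chunk and await its result before taking another. This is a swapped-in concurrency-limiting pattern, not something from the code above or a Piscina API; `runChunksWithLimit` and the `runTask` callback are hypothetical names, and with Piscina `runTask` would presumably be `(chunk) => piscina.run(chunk)`. Choosing a `limit` no larger than what the pool can hold at once (roughly `maxThreads` plus `maxQueue`, assuming one task per worker) should keep the queue from overflowing, but that sizing is an assumption worth verifying.

```javascript
// Hypothetical helper: processes `data` in chunks of `chunkSize`, keeping at
// most `limit` calls to `runTask` pending at any moment. Results keep chunk order.
async function runChunksWithLimit(data, chunkSize, limit, runTask) {
  if (!(limit >= 1)) throw new RangeError('limit must be at least 1');

  const chunks = [];
  for (let i = 0; i < data.length; i += chunkSize) {
    chunks.push(data.slice(i, i + chunkSize));
  }

  const results = new Array(chunks.length);
  let next = 0; // index of the next chunk to hand out, shared by all loops

  // Each loop claims the next unclaimed chunk, awaits it, then repeats.
  // The loops run on one JavaScript thread, so `next++` cannot race.
  async function worker() {
    while (next < chunks.length) {
      const idx = next++;
      results[idx] = await runTask(chunks[idx]);
    }
  }

  // Start at most `limit` loops; each keeps one task in flight at a time.
  const loops = Array.from({ length: Math.min(limit, chunks.length) }, () => worker());
  await Promise.all(loops);
  return results;
}
```

Because each loop writes into the slot of the chunk it claimed, `results` comes back in chunk order even though tasks may finish out of order, and a rejected task still rejects the whole call through `Promise.all`.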
I was able to work around this problem by adding a function that polls the queue size on a timer and racing it against the `'drain'` event with `Promise.race`, like so:
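```js
// Poll every 100 ms until the queue is empty.
const checkQueueSize = async () => {
  while (piscina.queueSize > 0) {
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
};

// Continue as soon as either the 'drain' event fires or the queue is empty.
await Promise.race([
  once(piscina, 'drain'),
  checkQueueSize()
]);
```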
This custom implementation perhaps makes listening for the `'drain'` event somewhat redundant, but I'm curious about what is going on, and whether I'm misunderstanding how it's supposed to work.
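One side effect of the `Promise.race` workaround is worth noting: when the polling branch wins, the listeners that `once(piscina, 'drain')` registered (for `'drain'` and `'error'`) stay attached until a `'drain'` event eventually fires, so repeated waits can pile them up and, past Node's default limit of 10 per event, trigger a `MaxListenersExceededWarning`. Below is a minimal sketch of a variant that cancels whichever branch loses, assuming a Node.js version with the `signal` option on `events.once` and the `node:timers/promises` module (16 or later). `waitForDrain` and `isDrained` are hypothetical names, not Piscina APIs.

```javascript
const { once } = require('node:events');
const { setTimeout: sleep } = require('node:timers/promises');

// Hypothetical helper: resolves when `emitter` emits 'drain' OR when
// `isDrained()` returns true on a poll, whichever happens first.
// Returns 'already', 'event' or 'poll' to show which path finished.
async function waitForDrain(emitter, isDrained, pollMs = 100) {
  if (isDrained()) return 'already';

  const ac = new AbortController();

  // Branch 1: the event. Passing `signal` lets us remove its listeners later.
  const viaEvent = once(emitter, 'drain', { signal: ac.signal })
    .then(() => 'event');

  // Branch 2: poll the condition; each sleep is cancellable via the same signal.
  const viaPoll = (async () => {
    while (!isDrained()) {
      await sleep(pollMs, undefined, { signal: ac.signal });
    }
    return 'poll';
  })();

  try {
    return await Promise.race([viaEvent, viaPoll]);
  } finally {
    // Cancel the losing branch: removes the 'drain'/'error' listeners or
    // clears the pending timer. The resulting AbortError rejection is
    // already handled by Promise.race, so it is not reported as unhandled.
    ac.abort();
  }
}
```

With Piscina, the race could then presumably be written as `await waitForDrain(piscina, () => piscina.queueSize === 0)`, leaving no listeners behind whichever way it resolves.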