Codementor Events

Building a Concurrent Promise Queue with JavaScript

Published Sep 25, 2020Last updated May 18, 2021
Building a Concurrent Promise Queue with JavaScript

Spent hours looking for a cheesy quote to start with🧐, couldn't find one that went with promises😫.

Well, let's hop in🚶🏿‍♂️.

Promises!!!🚦 If you work with JavaScript, you will encounter them every day. If you work on a large product, you would have come across the problem of having to serve multiple requests to a third-party API. As a Backend developer working in the fintech space, one recurring issue I keep encountering is, serving and processing customer's bulk requests. Most people want to be able to upload a CSV file of a 1000 account (debit/credit) requests and have them executed promptly.

Although there are more optimal methods to solving this kind of problems, JavaScript non-blocking Asynchronous pattern provides a fairly naive approach that helps you process multiple I/O tasks in a manageable manner that doesn't overwhelm the event loop.
Let's do a quick run-through on the basics of Promises.

What is a Promise?

Simply put, a Promise is an object that can be used to represent the eventual completion of an asynchronous process. Not my quote and if this doesn't explain Promises to like you are a kid, this might though🙃. The official doc does a good job to explain promises.

Resolving Promises

const transactionStatus = (seconds) => new Promise((resolve, reject) => {
  setTimeout(() => {
    	resolve('Your transaction is successful');
    }, seconds * 500); //second argument is in milliseconds
})

Invoking our promise

transactionStatus(2)
  .then((result) => console.log(result));
result: Your transaction is successful

PS: There is a nifty little trick we can do above. Our .then() block takes in a function that takes an argument and logs it. Since console.log does the same thing, I can replace the entire function in the .then() method with console.log

transactionStatus(2)
  .then(console.log);

Result will be the same

result: Your transaction is successful

Promises are chainable. Invoking our promise above can include multiple .then() methods.

transactionStatus(2)
  .then(console.log)
  .then(() => 200)
  .then((status) => console.log('Transaction fetched', status));
result: Your transaction is successful
result: Transaction fetched 200

Rejecting Promises

If something goes wrong within the promise, we can chain a .catch() method to handle it (errors that occur within our codes).
We can also pass an error to the .catch() method if we evoke the reject() method.

const transactionStatus = (seconds) => new Promise((resolve, reject) => {
    if(seconds > 25){
        reject(new Error('Request timed out'));
    }
    setTimeout(() => {
        resolve('Your transaction is successful');
    }, seconds * 1000); 
 });
 
transactionStatus(26)
    .then(console.log)
    .then((status) => console.log('Transaction fetched', status))
    .catch((error) => {
         console.log(error.message);
    });

result: Request timed out

The ability to chain multiple .then() offer us the opportunity to execute Promises sequentially.

const fetchTransactionStatus = () => Promise.resolve()
  .then(() => console.log('Begin request')
  .then(() => transactionStatus(5))
  .then(() => console.log)
  .then(() => console.log('End Request'))

Each of the .then() executes sequentially and produce the following result.

result: Begin request
result: Your transaction is successful
result: End Request

Parallel Execution

Executing multiple tasks at the same time is referred to as parallel execution.
JavaScript provides the Promise.all() method that enables us to implement parallel promise execution.

Promise.all([
  transactionStatus(15),
    transactionStatus(5),
    transactionStatus(20)
]).then(() => console.log('All transaction requests resolved'))

result: All transaction requests resolved //prints after 20seconds

The code above will wait for all three promises to resolve before logging the message in the .then() block.That means it will take 20seconds to completely resolve.

If we want our .then() block to log our message after the first promise with less time is fulfilled, wrap the promises in a Promise.race().

Promise.race([
  transactionStatus(15),
    transactionStatus(5),
    transactionStatus(20)
]).then(() => console.log('transaction requests resolved'))
result: transaction requests resolved (logs after 5seconds)

Concurrent Promise Queue

Handling multiple asynchronous tasks is something NodeJS has the mechanism to handle well. Although there are better ways to handling large aysnchronous tasks such as implementing a proper queuing system with RabbitMQ, Amazon SQS, custom SQL Queue etc, a naive approach can be implemented with a promise queue.

I have talked about executing promises both sequentially and in parallel. Running multiple asynchronous tasks sequentially will take longer and if you run them in parallel, it will consume more resources. Yet, we can combine both methods to create a promise queue that will be faster and consume fewer resources.

Step 1: Create an array of promises.

const tasks = [
  transactionStatus(5),
  transactionStatus(15),
  transactionStatus(2),
  transactionStatus(4),
  transactionStatus(6),
  transactionStatus(9),
  transactionStatus(11),
  transactionStatus(12),
  transactionStatus(10)
];

Step 2: Create a constructor function that takes in two parameters, A tasks array and a concurrentCount integer to set the maximum number of promises that should be in the running state.
We also create an array that holds the promises currently running and another array that holds the promises that are completed.

Creating our promise queue constructor function

function PromiseQueue(tasks = [], concurrentCount = 1) {
 this.total = tasks.length;
 this.todo = tasks;
 this.running = [];
 this.complete = [];
 this.count = concurrentCount;
}

Step 3: Create a method that tracks the state of the tasks queue. This will determine if the runner should keep executing or quit. It also ensures we don't have more than the specified concurrent count in the task runner.

PromiseQueue.prototype.runNext = function(){
  return ((this.running.length < this.count) && this.todo.length);
}

Step 4 : Create a task runner. We would move tasks from the todo list to the running list, after execution, we move them to the completed list.

PromiseQueue.prototype.run = function () {
  while (this.runNext()) {
    const promise = this.todo.shift();
    promise.then(() => {
      this.complete.push(this.running.shift());
      this.run();
    });
    this.running.push(promise);
  }

}

Step 5 : Go ahead and instantiate our Promise queue function. we can call the .run() method to run our task queue.

const taskQueue = new PromiseQueue(tasks, 3); //tasks = an array of aysnc functions, 3 = number of tasks to run in parallel
taskQueue.run();

Conclusion

One reason why this naive approach is not scalable is, it becomes very complex to track failures and initiate retries.
What about if requests grow to 20, 000 tasks, it becomes extremely unmanageable. That is why a more robust queuing system that makes queue management easier to implement is the preferred approach.
Nonetheless, learning how to create concurrent promise queue gives us a better understanding of how JavaScript promises work.

Discover and read more posts from Edafe Emunotor
get started
post comments12Replies
Francis Duvivier
2 years ago

Does not look good to me.

      this.complete.push(this.running.shift());

here you take the first item from the running list and assume that it is completed. But this will not be correct if that promise resolves a lot later than other ones.

Here a better simpler version in typescript:

async function queuePromiseMap<IN, OUT>(items: IN[], asyncProcessor: (item: IN) => Promise<OUT>, queueSize?: number): Promise<OUT[]> {
    if (!queueSize) {
        queueSize = PROMISE_QUEUE_SIZE_FALLBACK; // Handle NaN and 0
    }
    queueSize = Math.min(queueSize, items.length);
    const toProcess: IN[] = [...items];
    const results: OUT[] = [];
    const initialInputItems = Array.from(Array(queueSize));
    await Promise.all(initialInputItems.map(async () => {
        while (toProcess.length) {
            const item = toProcess.shift()!;
            results.push(await asyncProcessor(item));
        }
    }));
    return results;
}

With usage:

`const results = await queuePromiseMap(items, processorFunction, 3);`
Joey de Jonge
2 years ago

Hi, do you happen to have the full code for me? I’ve been looking for promise queue with typescript for a while

Aidan Fraser
3 years ago

Hey, nice post. Thanks for the writeup.

I’m looking around for a lightweight Promise-based queue and I found this post. I tried this Queue code as written and unfortunately all tasks run at once – there is no queuing functionality.

Looking closely, when we call transactionStatus in the tasks array, that function immediately returns a Promise, which also includes executing the content of that Promise, which is the task itself. (To create a new promise is to start it running. That’s just how promises work.)

So, as it stands, there’s no mechanism in this class to allow a task to wait (or be delayed) until it’s the right time.

To see this in action, try running:

let tasks = [
  new Promise(res => { console.log('Starting...'); setTimeout(() => { console.log("Finished!"); res() }, 1000) }),
  new Promise(res => { console.log('Starting...'); setTimeout(() => { console.log("Finished!"); res() }, 1000) }),
  new Promise(res => { console.log('Starting...'); setTimeout(() => { console.log("Finished!"); res() }, 1000) }),
];
let pq = new PromiseQueue(tasks, 1);
pq.run();

Given a concurrentCount of 1, we might expect the 2nd and 3rd promises to start only after the previous promise finishes. Something like:

Starting...
Finished!
Starting...
Finished!
Starting...
Finished!

But in fact, we get

Starting...
Starting...
Starting...
Finished!
Finished!
Finished!

This shows how the resolve function of a task’s promise is not delaying other tasks.

For this to work correctly, there needs to be some way for the queue to signal to a task that it’s time to begin, and for the task not to start until then. A simple way to do this might be for the queue to receive tasks as functions that return promises, rather than just “raw” promises. That way, the work inside the task’s promise only begins once the task function is called, and that allows the queue to control when a task begins.

From outside the class, this could work something like this:

let tasks = [
    () => transactionStatus(1),
    () => transactionStatus(2),
    () => transactionStatus(3),
]
let pq = new PromiseQueue(tasks, 1);

Hope this makes sense. Let me know if you want to clarify or discuss.

Salvatore Caputi
3 years ago

Hi, maybe I’m missing something, but when you create the promises array, in step 1, you are executing functions in that moment, so how can you control the concurrency later?
Thanks

Show more replies