Found a Nice Async Batching Library

NodeJS

Yesterday, I was doing a little work, and noticed that I was getting a lot of connection resets on a service that has been flawless for more than 18 months, but to be fair, the load has been rising, and after digging into the cause, it appeared that the issue was overloading the Client with so many requests, it just failed.

Typically, a client will apply back-pressure on the caller to make sure that things don't get to this point, or they will queue the requests in memory so that they will be processed, in turn, as they arrived. I'm not exactly sure what's happening, the developers of the Client are looking at this, but I needed to find something to ease the load, and so I found asyncBatch().

Let's say I had the following code:

  const balances = (await Promise.all(companies
    .map(async c => {
      const bal = await minimumDueForCompany(user, c)
      if (bal?.success && !isNil(bal?.interestDue) && bal.billDate === today) {
        bal.company = c
        return bal
      }
      return undefined
    })))
    .filter(bal => bal !== undefined)

we're running through all the items in the companies array, and for each, we are calling minimumDueForCompany() and then checking a few things, and then filtering on those that we want to see. Simple.

But if we have more than 200 elements in the companies array, and the minimumDueForCompany() employs several database queries, we could get to the point of launching more than a thousand hits at nearly the same time. If this is a background task, this might be able to starve some more important tasks with all the database aork.

A batching solution was needed. And so I went looking.

asyncBatch() follows much the same style as the Promise.all(), it just takes the values as arguments: the array, the function, and the batch size:

  const asyncBatch = require('async-batch').default
 
  const balances = (await asyncBatch(companies,
    async c => {
      const bal = await minimumDueForCompany(user, c)
      if (bal?.success && !isNil(bal?.interestDue) && bal.billDate === today) {
        bal.company = c
        return bal
      }
      return undefined
    }, 2))
    .filter(bal => bal !== undefined)

With a batch size of 2, we'll start simply, and let the background task take a little longer, while preserving the more immediate user-facing calls can have priority access.

Put this in and things are working better. It's not a perfect solution, and we still need to have the Client improved, but it gets around the two problems: Flooding the database when the use-case doesn't require it... and Failures on the Client to handle the flood. We can fine-tune the batch size later.

UPDATE: it turned out that the library launched all the work in an initial Promise.all() so it really wasn't batching the work as I'd expected. So I wrote my own using the chunk library:

  const chunk = require('chunk')
 
  /*
   * We need a function that will batch the equivalent of:
   *
   *   const resp = await Promise.all(arr.map(itm => fcn(itm)))
   *
   * but do it in batches, so that when we get a large workload, we don't
   * overwhelm the system. This is that function. The first argument is the
   * array to process, the second is the async function, that takes one
   * argument, and the last is the batch size that defaults to a reasonable
   * value.
   */
  const asyncBatch = async (arr, fcn, batchSize = 4) => {
    const ans = []
    for (const b of chunk(arr, batchSize)) {
      const blk = await Promise.all(b.map(async itm => await fcn.apply(null, [itm])))
      ans.push(...blk)
    }
    return ans
  }

This works exactly as expected, working on n of the elements at a time, and then moving to the next batch. Much cleaner.