Build A Queue For Processing Webhooks With Node.js And MongoDB

Manage The Flood Of Webhooks With A Database Queue

Many of the third-party services we integrate into our applications use webhooks to notify us of events. Responding to an incoming webhook with a success response (status 200) is all that’s required to acknowledge receipt. Whether you process the webhook immediately or put it in a queue to be processed later is up to you. In this post I share a way to build and process a MongoDB-based webhook queue for Node/Express applications.

MongoDB and Mongoose are, of course, required.

Webhook Schema

const mongoose = require('mongoose');

const WebhookSchema = new mongoose.Schema({
  accountId: { type: String, index: true },
  notificationType: { type: String, index: true },
  receivedAt: { type: Date, default: Date.now, index: true, expires: '14d' },
  processed: { type: Boolean, index: true, default: false },
  processedAt: { type: Date }
  //more relevant fields related to the webhook 
});
module.exports = mongoose.model("Webhook", WebhookSchema);

  • accountId, or any other field (such as subscriptionId), references the document (such as a user) you want to update when the webhook arrives
  • notificationType identifies the type of webhook and determines which changes are made to the user document. Common examples are subscription.created, subscription.upgraded and account.deleted
  • receivedAt records when the webhook was received. It also sets the document to expire after 14 days (time to live), since we only need to keep it for a few days of record after processing. The countdown starts at receivedAt, so a webhook that is still unprocessed after 14 days is deleted as well
  • processed marks whether the webhook has been handled; the queue looks only for unprocessed webhooks
  • processedAt records when the webhook was processed, if it was

Controller Method (Route)

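const express = require('express');
const Webhook = require('./webhook.js');

const router = express.Router();

// req.body is only populated if a JSON body parser (such as express.json()) is mounted first
router.post('/api/webhook', function (req, res) {
  // Keep only the fields the queue needs from each incoming webhook
  const webhooks = req.body.webhooks.map((w) => {
    return {
      notificationType: w.type,
      accountId: w.accountId
    };
  });

  Webhook.create(webhooks, function (err) {
    if (err) {
      console.log(err);
      return res.status(500).end(); // something went wrong, the service should send the webhook again
    }
    else {
      return res.status(200).end(); // acknowledge that the webhook(s) were received
    }
  });
});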

This is the route you register with the service provider as the endpoint where webhooks will be sent. It dumps the incoming webhooks into the Webhook collection.
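The route above uses the callback form of Webhook.create, which Mongoose 7 and later no longer accept. A minimal promise-based sketch of the same route might look like the following. Note that makeWebhookHandler is a hypothetical helper name, and answering 400 for a payload without a webhooks array is my own assumption; check how your provider treats non-200 responses before relying on it.

```javascript
// Sketch only: makeWebhookHandler is a hypothetical helper, not part of Express or Mongoose.
// It takes the model as an argument so the handler can be exercised without a database.
function makeWebhookHandler(Webhook) {
  return async function handleWebhook(req, res) {
    const incoming = req.body && req.body.webhooks;
    if (!Array.isArray(incoming)) {
      // Assumption: a payload without a webhooks array is malformed, so reject it
      return res.status(400).end();
    }

    // Keep only the fields the queue needs
    const docs = incoming.map((w) => ({
      notificationType: w.type,
      accountId: w.accountId,
    }));

    try {
      await Webhook.create(docs); // resolves once the documents are saved
      return res.status(200).end(); // acknowledge receipt
    } catch (err) {
      console.log(err);
      return res.status(500).end(); // the service should send the webhook again
    }
  };
}

// Usage: router.post('/api/webhook', makeWebhookHandler(Webhook));
```

Passing the model in keeps the handler free of any direct database dependency, so the same function works with the real Webhook model in production and a stub in tests.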

This assumes that one call to the endpoint can carry multiple webhooks (many providers bundle webhooks this way). If each call carries a single webhook, insert a single document:

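// w is the single webhook object from the request; where it sits in req.body depends on the provider
const webhook = {
  notificationType: w.type,
  accountId: w.accountId
};
Webhook.create(webhook, function (err) {...});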
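If you are not sure which of the two shapes a provider sends, one option is to normalize the request body into an array before inserting. The sketch below is only an illustration: toWebhookDocs is a hypothetical helper, and it assumes that a single webhook arrives with type and accountId at the top level of the body, which may not match your provider.

```javascript
// Hypothetical helper: turns either payload shape into an array of queue documents.
// Assumption: a bundled payload has a webhooks array, a single one carries
// type and accountId at the top level of the body.
function toWebhookDocs(body) {
  if (!body) {
    return [];
  }
  const items = Array.isArray(body.webhooks) ? body.webhooks : [body];
  return items.map((w) => ({
    notificationType: w.type,
    accountId: w.accountId,
  }));
}

// Usage: Webhook.create(toWebhookDocs(req.body), callback)
```

Webhook.create accepts an array of documents, so the rest of the route stays the same for both shapes.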

Webhook Queue

Finally, the queue script. Let’s call it webhookqueue.js.

const mongoose = require('mongoose');
const Webhook = require('./webhook.js');

mongoose.connect('mongodb://127.0.0.1/webapp');


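// Holds the single WebhookQueue for this process. 'connected' can fire again
// after a reconnect, so the check below avoids starting a second queue.
let queueInstance = null;

mongoose.connection.on('connected', () => {
  console.log("connected");
  if (!queueInstance) {
    console.log("creating new WebhookQueue instance");
    queueInstance = new WebhookQueue();
  } else {
    console.log("WebhookQueue instance already exists");
  }
});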

const WebhookQueue = function () {
  const self = this;

  self.checkHooksInterval = 5000;
  setTimeout(self.checkHooks.bind(self), self.checkHooksInterval);
}

const pt = WebhookQueue.prototype;


pt.checkHooks = function () {
  const self = this;
  console.log("*** Checking Webhook Queue ***");
  Webhook.find({ processed: false })
    .limit(10)
    .exec()
    .then(function (webhooks) {
      if (webhooks.length) {
        console.log("Pending Webhooks Found");
        // Process the webhooks and mark them processed: true
      }
    })
    .catch(function (err) {
      console.log("something went wrong", err);
    })
    .then(function () {
      // Schedule the next check on every pass, including when the queue
      // was empty or the query failed, so polling never stops
      setTimeout(self.checkHooks.bind(self), self.checkHooksInterval);
    });
};

We instantiate the WebhookQueue as soon as the MongoDB connection is made. Then we check for pending webhooks in the queue every 5 seconds (increase or decrease this interval based on how urgently the webhooks need processing). Each pass looks for up to 10 unprocessed webhooks and, if any are found, processes them and sets processed to true. Finally, the checkHooks method is scheduled with setTimeout to run again after another 5 seconds.
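The processing step itself is left as a comment in the script. One possible way to fill it in is sketched below. The names processPendingWebhooks and handlers are hypothetical, the handler bodies are placeholders, and sorting by receivedAt (oldest first) is my own choice rather than something the script above does. A webhook whose handler throws stays unprocessed and is picked up again on a later pass.

```javascript
// Hypothetical handlers keyed by notificationType; replace the bodies with real updates.
const handlers = {
  'subscription.created': async (webhook) => { /* e.g. activate the subscription */ },
  'account.deleted': async (webhook) => { /* e.g. clean up the user document */ },
};

// Processes one batch of pending webhooks and returns how many were marked processed.
// Webhook is the mongoose model, passed in so the function can be tested with a stub.
async function processPendingWebhooks(Webhook, handlers, batchSize = 10) {
  const webhooks = await Webhook.find({ processed: false })
    .sort({ receivedAt: 1 }) // oldest first
    .limit(batchSize)
    .exec();

  let processedCount = 0;
  for (const webhook of webhooks) {
    const handler = handlers[webhook.notificationType];
    try {
      if (handler) {
        await handler(webhook);
      } else {
        // Assumption: unknown types are logged and still marked processed,
        // otherwise they would fill every batch forever
        console.log('no handler for', webhook.notificationType);
      }
      await Webhook.updateOne(
        { _id: webhook._id },
        { $set: { processed: true, processedAt: new Date() } }
      );
      processedCount += 1;
    } catch (err) {
      // processed stays false, so this webhook is retried on a later pass
      console.log('failed to process webhook', webhook._id, err);
    }
  }
  return processedCount;
}
```

Inside checkHooks this could replace the placeholder comment, for example by returning processPendingWebhooks(Webhook, handlers) from the then callback so that the next check is scheduled only after the batch finishes.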

And that’s it. Start this script by requiring it when the Express server starts (the one that serves '/api/webhook'), or run it as a standalone script with node webhookqueue.

Why Use Webhook Queue?

Why have a queue in the first place? Why not just process the webhooks as they are received? That’s perfectly workable if webhooks are few and far between. In fact, it’s better that way, since you avoid the extra overhead of the queue.

But under high load with frequent webhooks, especially if the processing those webhooks trigger is complex or expensive, a queue is the better option for a couple of reasons:

  • Sometimes the service provider goes down. When it comes back up, it tries to clear as many of your stuck webhooks as possible by sending them to your API endpoint, resulting in an unexpected spike in your CPU usage (since you process each one immediately upon receipt), and probably some failures.

  • Though unlikely, it’s still possible that you acknowledge a webhook and the server crashes (for any reason) before you’re done processing it. Since you have already acknowledged it, the service will not send that webhook again, and because of the crash it was never processed, so the webhook is lost forever. [You can avoid this by acknowledging receipt only after processing is complete. But that’s not a good idea for complex processing, which may take a while, and some services consider it a failure when no acknowledgement arrives within a few seconds.]

Some Providers With Webhook Communication