Process and deliver a "Daily digest" to thousands of users with SQS & Lambda


Serverless functions make it easy to do the heavy lifting: export data from a platform like Mixpanel or Amplitude, process it, and send the result back to your users.

You could simply create a cron schedule in AWS that executes your lambda once a day and fetches the data from wherever you log it.
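If you want to script that schedule rather than click through the console, here is a minimal sketch using the aws-sdk; the rule name, cron expression, and ARN placeholder are all assumptions you would swap for your own:

import * as AWS from 'aws-sdk';

const events = new AWS.CloudWatchEvents();

const setupDailyCron = async () => {
  // Fire once a day at 09:00 UTC
  await events.putRule({
    Name: 'daily-digest-cron',
    ScheduleExpression: 'cron(0 9 * * ? *)'
  }).promise();

  // Point the rule at your lambda (you also need to grant the rule
  // permission to invoke the function, e.g. via lambda.addPermission)
  await events.putTargets({
    Rule: 'daily-digest-cron',
    Targets: [{ Id: 'daily-digest-target', Arn: '{{YOUR_LAMBDA_ARN}}' }]
  }).promise();
};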

Problem Statement

The challenge is that when you run a lambda as a cron, processing the data for all the users on your platform becomes the responsibility of a single lambda invocation.

Processing every user in one invocation takes time, and although a lambda can run for up to 15 minutes, sometimes that's not enough. You end up in a state where some of your users never received the digest at all, and they are not going to receive it later either, because:

  • You run the lambda once a day, so the digest goes out only once.
  • You don't know who received it and who didn't.

Solution

The trick is to invoke multiple lambdas, each handling a group of users. You can either build your own fan-out logic or simply use SQS.

SQS

There are two types of SQS queues:

  • FIFO
  • Standard

To quickly give you an overview of what to use when: if the order of delivery matters, use FIFO; if you need higher throughput regardless of order, use Standard.

For our daily digest, the order is not important, and neither is the throughput.

How to decide which one to use?

At the time this was written, Standard queues supported lambda triggers while FIFO queues did not (AWS has since added FIFO support), and that's why we go with the Standard queue.

Let's do some code

Before we jump in, keep in mind that we are going to use:

  • Standard Queue
  • Producer (lambda)
  • Consumer (lambda)
 

Create a Standard Queue

This is the easiest step. Log in to your AWS account, go to SQS, and create a queue.
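If you prefer to script it, here is a rough equivalent using the aws-sdk; the queue name is just an example:

import * as AWS from 'aws-sdk';

const sqs = new AWS.SQS();

const createDigestQueue = async () => {
  // createQueue builds a Standard queue by default; the QueueUrl in the
  // response is what the producer lambda below needs.
  const { QueueUrl } = await sqs.createQueue({
    QueueName: 'daily-digest-queue'
  }).promise();

  console.log(QueueUrl);
};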

 

Create a Producer lambda

import * as AWS from 'aws-sdk';

const sqs = new AWS.SQS();

export const handler = async (event, context) => {

  const sqsParams = {
    // Pass unique information the consumer can use to fetch the data
    // (can be user information or group information)
    MessageBody: "{{UNIQUE_USER_OR_GROUP_INFO}}",
    QueueUrl: "{{YOUR_QUEUE_URL}}",
    DelaySeconds: 120
  };

  const response = await sqs.sendMessage(sqsParams).promise();

  return {
    statusCode: 200,
    body: JSON.stringify(response),
    isBase64Encoded: false
  };
};

The DelaySeconds parameter postpones delivery of each message, which helps you avoid exhausting the limited lambda concurrency available per AWS account. It gives the consumer lambda some breathing room to finish processing before the next message arrives.

This lets you run fewer concurrent lambdas and reuse instances that are already warm.
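To make that concrete, here is one way a producer could stagger a whole batch of groups with increasing delays. It reuses the sqs client from the producer above; getUserGroups() is a hypothetical helper standing in for however you partition your users:

// Hypothetical: however you split your users into groups
const groups = await getUserGroups();

await Promise.all(groups.map((group, i) =>
  sqs.sendMessage({
    MessageBody: JSON.stringify(group),
    QueueUrl: "{{YOUR_QUEUE_URL}}",
    // Stagger deliveries; SQS caps DelaySeconds at 900 (15 minutes)
    DelaySeconds: Math.min(i * 30, 900)
  }).promise()
));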

 

Create a Consumer lambda

The consumer lambda is the one that does all the work.

import middy from '@middy/core';

/** handler function */
const handler = middy(async (event, context) => {
  try {
    /** Each SQS record carries one message body from the queue */
    const workPromises = event.Records.map(async (record) => {
      const data = JSON.parse(record.body);
      /** do your work here **/
    });

    const response = await Promise.all(workPromises);

    return {
      statusCode: 200,
      body: JSON.stringify(response),
      isBase64Encoded: false
    };
  } catch (error) {
    return {
      statusCode: 500,
      body: JSON.stringify({ message: error.message }),
      isBase64Encoded: false
    };
  }
});

export { handler };

You can find your message body inside the event object. Once you have your data, parse it (it arrives as a string) and do your thing.

 

The last thing is to set up the SQS trigger for the consumer lambda.

Go back to the SQS page, select your queue, and find the "Lambda triggers" tab.


Click the orange "Configure Lambda Function Trigger" button and select your consumer lambda.
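If you'd rather script this trigger as well, the same mapping can be created with the aws-sdk; the queue ARN and function name below are placeholders:

import * as AWS from 'aws-sdk';

const lambda = new AWS.Lambda();

const wireTrigger = async () => {
  // Connects the queue to the consumer lambda; BatchSize controls how
  // many messages a single invocation receives.
  await lambda.createEventSourceMapping({
    EventSourceArn: '{{YOUR_QUEUE_ARN}}',
    FunctionName: 'daily-digest-consumer',
    BatchSize: 10
  }).promise();
};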

You are done and ready to go. Invoke your producer lambda through API Gateway. You don't need an API Gateway endpoint for the consumer lambda, as it is triggered by SQS.

Have a Great Day.

 
 