Fanout pattern with SNS, SQS and TypeScript Lambda

---- views

Fanout is a messaging pattern where a message is distributed (fanned-out) to multiple destinations in parallel.

You can think of fanout as a Pub/Sub messaging pattern. In this pattern, publishers send messages to a topic. Subscribers receive the messages from the topic.

pub/sub model

The advantage of this architecture is that the publishers and subscribers are decoupled from each other and can act independently.

Examples:

  • Uploading a video to an S3 bucket, we might want to transcode the video to different resolutions, generate the thumbnail, extract the audio, automatically generate captions. These tasks can all be done in parallel and are independant.
  • In a social media application, adding a post to your profile, you'd want to notify the followers of your profile, the followers of the hashtag targeted by the post.

Example scenario

In this tutorial, we're going to build a simple ticketing system.

When a reserving a ticket, the client can opt to receive a confirmation either by email, sms or both. In any case the system needs to send the reservation information to the analytics system.

The fanout pattern will help us design the system in a decoupled way, so each system (reservation, email, sms, analytics) are independant one from the other.

SNS

In AWS, the fanout pattern can be implemented with Amazon Simple Notification Service (SNS).

SNS is a Publisher/Subscriber managed service.

The diagram below from AWS describes SNS:

Diagram showing how Amazon SNS transmits messages by topic and fans them out to subscriber systems

When a publisher publishes a message to the SNS topic, the topic pushes the message to multiple subscribers.

Lambdas directly subscribed to the SNS topic

The simplest implementation of our system would be for lambdas to directly subscribe to the SNS topic. Here is a diagram of this solution:

Diagram showing how a reservation lambda can publish messages to a SNS topic.  Email, sms and Analytics systems subscribe to the topic

When the reservation lambda is triggered, it sends a message to the SNS topic. The SNS topic immediatly pushes the message to its subscribers (the email, sms and analytics lambdas).

This approach will work but there are a few problems:

  • Both email and sms lambdas will always be triggered for a reservation. But our system requires that a client can get either email or sms or both confirmations
  • What if for whatever reason a lambda fails? For example if the email system is down for a while, the email lambda will fail. So while the analytics lambda registers the reservation, the client may never get the email confirmation.
  • What if we want to throttle a specific lambda? For example if the sms system has a limited throughput, we would not want to overload it, if there is a spike of reservations.

Let's address the first problem: how can we send the message to the email and/or sms lambda, depending on the client preference?

Filter messages by attribute

By default, all subscribers of a topic receive a message when it is published. In our example, the email, sms and analytics lambdas all receive a messsage published by the reservation lambda to the topic.

In order to receive a subset of messages, the SNS subscriber can specify a filter policy.
SNS will try to match the attributes of the message, with those defined in the filter policy.
If there is a match, the subscriber will receive the message, otherwise SNS will skip the subscriber.

We'll defined an attribute confirmationType in the message published to the topic, an array of strings.

  • confirmationType === ["email"] means only the email lambda should receive the message
  • confirmationType === ["sms"] means only the sms lambda should receive the message
  • confirmationType === ["email", "sms"] means both the email and sms lambda should receive the message

Since no filter policy is applied to the analytics lambda subscription, it will receive all messages.

Diagram showing how a filter policy can be applied to a SNS subscription

Here is how to implement this functionnality when combining SNS with SQS:

topic.addSubscription(
    new subscriptions.SqsSubscription(confirmationEmailQueue, {
      filterPolicy: {
        confirmationType: sns.SubscriptionFilter.stringFilter({
          allowlist: ["EMAIL"],
        }),
      },
    })
  );
  topic.addSubscription(
    new subscriptions.SqsSubscription(confirmationSMSQueue, {
      filterPolicy: {
        confirmationType: sns.SubscriptionFilter.stringFilter({
          allowlist: ["SMS"],
        }),
      },
    })
  );
}

When publishing a message to the topic, the message attributes will contain the filter:

Filtering by message attributes

SQS

Let's address the two remaining issues we have identified earlier:

  • What if for whatever reason a lambda fails? For example if the email system is down for a while, the email lambda will fail. So while the analytics lambda registers the reservation, the client may never get the email confirmation.
  • What if we want to throttle a specific lambda? For example if the sms system has a limited throughput, we would not want to overload it, if there is a spike of reservations.

Turns out AWS provides the Amazon Simple Queue Service (SQS).

In our case, we'll use SQS to make sure the application can scale and is more reliable.

The queue will act as a buffer between the sender and the receiver of the messages. This will decrease the temporal coupling of the SNS topic and the lambda that receives the messages.

In other words, the communication between the two does not have to synchronous, it can be asynchronous. This is also called the asynchronous point-to-point model.

Here is how is works, at a high level:

  1. The SNS topic sends the message to the queue
  2. The queue buffers the message
  3. The lambda polls the queue when it is ready
  4. If there is a message in the queue, the lambda picks it up and processes it
  5. If the lambda processes the message sucessfully, the message is removed from the queue
  6. If the lambda fail to process the message, it is added back to the queue

See how this solves our problems?

What if for whatever reason a lambda fails? For example if the email system is down for a while, the email lambda will fail. So while the analytics lambda registers the reservation, the client may never get the email confirmation.

If the lambda fails, the message is added back to the queue, to be processed again

What if we want to throttle a specific lambda? For example if the sms system has a limited throughput, we would not want to overload it, if there is a spike of reservations.

The queue acts as a buffer in front of the lambda. The lambda can poll the queue at its own pace and can process a message (or a batch of messages) when it is ready to.

The integration of lambda with SQS is super easy: you just need to set the SQS queue as an event source for the lambda.

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as nodejs from "aws-cdk-lib/aws-lambda-nodejs";
import * as lambdaEventSources from "aws-cdk-lib/aws-lambda-event-sources";

export class MyStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const queue = new sqs.Queue(this, "Queue");

    const lambda = new nodejs.NodejsFunction(this, "Lambda", {
      ...
    });

    lambda.addEventSource(
      new lambdaEventSources.SqsEventSource(queue)
    );
  }
}

Diagram showing how a fan-out architecture, with SNS and SQS

Batching message processing

It might be more cost optimized to batch the messages received by the lambda. If the workoad of the lambda is not time-sensitive, you can batch the messages with the batchSize option. You can also control how long should a lambda wait for the batch to be filled up with the maxBatchingWindow option.

Let's take an example:

lambda.addEventSource(
  new lambdaEventSources.SqsEventSource(queue, {
    batchSize: 10,
    maxBatchingWindow: cdk.Duration.minutes(1),
  })
);

The lambda requires a batch of 10 messages to be invoked.

Hence, it will be triggered when there is a batch of 10 messages in the queue to be processed.

But if after 1 minute the batch is not yet filled up, the lambda will be triggered anyway with how many messages there are in the queue.

Handling errors

Batch errors

Let's go to the case of a lambda not being able to process messages. There are several scenarios to consider:

  1. The lambda processes all batched messages successfully
  2. The lambda was not able to process any message (they all failed)
  3. The lambda was able to process some messages, but not all of them

Let's consider each scenario, with an example. A SQS queue has 3 messages, that will be processed in batch by a lambda.

Lambda processes all messages sucessfully

In this scenario, the lambda picks up the 3 messages and processes them all successfully.

Let's go through the exact steps:

Diagram of lambda processing three messages from SQS successfully

  1. There are 3 messages in the SQS queue
  2. The lambda polls for the available messages, and picks up the 3 messages
  3. The lambda successfully processes the 3 messages
  4. The messages are automatically removed from the queue

This is the happy path! ☺️

Lambda fails to processes all messages

In this scenario, the lambda fails to process any message. Maybe there is an error related to the processing of the message itself, or maybe the lambda fails.

Diagram of lambda failing to process three messages from SQS.  The messages are not removed from the queue.

  1. There are 3 messages in the SQS queue
  2. The lambda polls for the available messages, and picks up the 3 messages
  3. The lambda fails to process any message
  4. The messages are not removed from the queue. After the visibility timeout has passed, the messages are visible again, and available for another message receiver to process.

This is OK and what we actually want. We don't want to lose any message.

Lambda fails to processes some messages

In this scenario, the lambda only process the first 2 messages successfully.

The last message processing fails.

Since there is no way for SQS to know which messages have been processed successfully and which ones have fails, no message is removed from the queue.

Just like the precedent scenario, after the visibility timeout has passed, all messages are visibile again and ready to processed.

Diagram of lambda failing to process a message (out of three) from SQS.  No message is removed from the queue

  1. There are 3 messages in the SQS queue
  2. The lambda polls for the available messages, and picks up the 3 messages
  3. The lambda processes the first 2 messages, but fails to process the last message
  4. Just like in the preceding scenario, the messages are not removed from the queue. After the visibility timeout has passed, the messages are visible again, and available for another message receiver to process.

Since SQS does not know which messages were succesfully processed and which were not, it does not take any chance and make them all available for processing.

This could be a problem.

While messages A and B were processed successfully, they're gonna be processed again.

One solution to this problem is to make the processing of message idempotent.

What is idempotence?

According to wikipedia, idempotence is:

The property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application

For our need, Enterprise Integration Patterns clarifies the definition of idempotence as:

In Messaging this concepts translates into a message that has the same effect whether it is received once or multiple times.

The AWS Lambda Powertools offers a great utility to convert Lambda functions into idempotent operations which are safe to retry.

While idempotence is a solution, it's still not perfect, in that we're wasting processing power for messages that don't need it anymore.

Handling partial batch failures gracefully

There are two ways to handle partial batch failures.

Deleting message from the SQS queue manually

If a message has been successfully processed, you can delete the message from the SQS queue with the deleteMessage function from the AWS SDK.

Diagram of lambda calling the deleteMessage function to delete successful messages from a the SQS queue

  1. There are 3 messages in the SQS queue
  2. The lambda polls for the available messages, and picks up the 3 messages
  3. The lambda processes the first 2 messages succesfully
  4. The lambda calls the deleteMessage function to delete the successfull messages from the queue
  5. The lambda fails to process the last message
  6. Message C is not removed from the queue. After the visibility timeout has passed, message C is visible again, and available for another message receiver to process.

Here is how to do it in code:

import { SQSRecord } from "aws-lambda";
import { SQS } from "aws-sdk";

export const deleteSQSRecord = async (record: SQSRecord) => {
  const params = {
    QueueUrl: getQueueUrl(record),
    ReceiptHandle: record.receiptHandle,
  };
  const deletedMessage = await sqs.deleteMessage(params).promise();
  console.log("Deleted message", deletedMessage);
};

Notice that you need to pass the QueueUrl to the deleteMessage function. You could pass the queue url as an environment variable.

You can also extract the queue url from the SQSRecord itself, then pass it to deleteMessage. Here is how to do it in TypeScript:

const getQueueUrl = (record: SQSRecord): string => {
  const splits = record.eventSourceARN.split(":");
  const service = splits[2];
  const region = splits[3];
  const accountId = splits[4];
  const queueName = splits[5];
  return `https://${service}.${region}.amazonaws.com/${accountId}/${queueName}`;
};

Report failed messages

Another way to achieve the same effect, it to report to SQS which messages failed and should be kept in the queue.

Diagram of lambda returning the list of messages that were not successful, and that should be kept in the SQS queue

  1. There are 3 messages in the SQS queue
  2. The lambda polls for the available messages, and picks up the 3 messages
  3. The lambda processes the first 2 messages, but fails to process the last message
  4. When the lambda completes, it returns the unsuccessful messages (message C).
  5. Message C is not removed from the queue. After the visibility timeout has passed, message C is visible again, and available for another message receiver to process.

Here is how to do it in code:

import { SQSHandler, SQSBatchItemFailure, SQSBatchResponse } from "aws-lambda";

export const handler: SQSHandler = async (event, context) => {
  // the list of unsuccessful messages
  const batchItemFailures: SQSBatchItemFailure[] = [];

  for (const record of event.Records) {
    try {
      // process the record
      console.log("Record: %j", record);
    } catch {
      // add message to list
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }

  // return list of unsuccessful messages to SQS
  const response: SQSBatchResponse = { batchItemFailures };
  return response;
};

Dead-letter queue

What if a message keeps on failing to be processed... indefinitely?

I will never be removed from the queue, and the lambda will keep trying to to process it, hence wasting processing resources.

Diagram of a message that will never be removed from the SQS queue

To fix this, you can use a dead-letter queue.

A dead-letter queue is just another SQS queue, with another receiver (a lambda in our case). After a specificied number of attempts to be processed (the maxReceiveCount property), if a message still fails, it will be moved to the dead-letter queue.

The AWS documentation suggests a few good uses for the dead-letter queue:

  • Configure an alarm for any messages moved to a dead-letter queue.
  • Examine logs for exceptions that might have caused messages to be moved to a dead-letter queue.
  • Analyze the contents of messages moved to a dead-letter queue to diagnose software or the producer's or consumer's hardware issues.
  • Determine whether you have given your consumer sufficient time to process messages.

The dead-letter queue is associated to the queue via a redrive policy.

Here is how to setup the dead-letter queue in CDK:


// Step 1: create the dead-letter queue, and its lambda
const dlq = new sqs.Queue(this, "DeadLetterQueue", {
      queueName: "DeadLetterQueue",
      visibilityTimeout: cdk.Duration.seconds(30)
    });

const dlqLambda = new nodejs.NodejsFunction(this, "DeadLetterLambda", {
  entry: lambdaPath,
  handler: "handler",
});

dlqLambda.addEventSource(
  new lambdaEventSources.SqsEventSource(dlq)
);

// Step 2: create a redrive policy
const deadLetterQueue: DeadLetterQueue = {
  queue: dlq,
  maxReceiveCount: 3, // after 3 retries, the message will be moved to the dead-letter queue
}

// Step 3: create the queue, and its lambda
const queue = new sqs.Queue(this, "Queue", {
  queueName: "Queue",
  visibilityTimeout: cdk.Duration.seconds(30),
  deadLetterQueue // assign the dead-letter queue to the queue
});

const lambda = new nodejs.NodejsFunction(this, `"Lambda", {
  entry: lambdaPath,
  handler: "handler",
});

lambda.addEventSource(
  new lambdaEventSources.SqsEventSource(queue)
);

With this code, a message will fails 3 times before being moved to the dead-letter queue.

It will then be processed by the dead-letter queue lambda.

Diagram of a message moved from a SQS queue to a dead-letter queue

Accessing the SNS message from the SQS record

since the SQS record wraps the SNS Message, to access the original SNS message, we need to:

// record is a SQSRecord
const body = JSON.parse(record.body) as {
  Subject: string;
  Message: string;
};

const message = { subject: body.Subject, message: body.Message };

Conclusion

In this tutorial, we have implemented a fan-out pattern with AWS services:

  • SNS (simple notification service)
  • SQS (simple queue service)
  • Lambda
  • CDK in TypeScript

Diagram of the project architecture

Code is available on github