Real-time Data Aggregation using DynamoDB Streams and AWS Lambda


DynamoDB streams can be used to aggregate data by using an AWS Lambda function to process the stream records and perform the necessary calculations.

In this tutorial, we will follow the scenario outlined in this article.

Imagine a table of invoices, with each invoice consisting of multiple transactions with different amounts. Whenever a new transaction is added to an invoice, we want to recalculate the invoice's total amount.

Here is how it's going to work:

  1. Create an InvoiceTransactions table to store the invoices and their transactions.
  2. Enable DynamoDB Streams on the InvoiceTransactions table.
  3. Create a new InvoiceTotal table to store the total amount for each invoice.
  4. Whenever a new transaction is added to an invoice, it will be recorded by the DynamoDB stream.
  5. A lambda will be triggered by the stream and update the InvoiceTotal table with the new total amount.

Diagram of lambda function triggered by a DynamoDB stream that aggregates data and writes it to another table

Creating the system

Create the InvoiceTransactions table

An invoice is made of multiple transactions, and each transaction has an amount and date.

The partition key for this table is the InvoiceNumber and the sort key is the TransactionId.

Here is the CDK code to create the InvoiceTransactions table:

const invoiceTransactionsTable = new dynamodb.Table(
    this,
    "InvoiceTransactions",
    {
        partitionKey: {
            name: "InvoiceNumber",
            type: dynamodb.AttributeType.STRING,
        },
        sortKey: {
            name: "TransactionId",
            type: dynamodb.AttributeType.STRING,
        },
        stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
        billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
        removalPolicy: cdk.RemovalPolicy.DESTROY,
    }
);

When filled up, the InvoiceTransactions table will look like this:

Partition Key    Sort Key          Attribute 1    Attribute 2
InvoiceNumber    TransactionId     Amount         InvoiceDate
1212121          Client1_trans1    $100           06062016
1212121          Client1_trans2    $500           06062016
1212122          Client2_trans1    $200           06062016
1212121          Client2_trans1    $500           06062016

Create the InvoiceTotal table

The InvoiceTotal table will store the aggregated total amount for each invoice.

Here is the CDK code for the InvoiceTotal table:

const invoiceTotalTable = new dynamodb.Table(this, "InvoiceTotal", {
    partitionKey: {
        name: "InvoiceNumber",
        type: dynamodb.AttributeType.STRING,
    },
    billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    removalPolicy: cdk.RemovalPolicy.DESTROY,
});

When filled up, the InvoiceTotal table will look like this:

InvoiceNumber    Total    UpdateDate
1212121          600      06062016
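
To make the two item shapes concrete, here is a minimal sketch of how they could be described as TypeScript types. The interface names are hypothetical and not part of the original source code; the attribute names and the example values come from the tables above.

```typescript
// Hypothetical type for an item in the InvoiceTransactions table.
interface InvoiceTransaction {
  InvoiceNumber: string; // partition key
  TransactionId: string; // sort key
  Amount: number;
  InvoiceDate: string; // e.g. "06062016"
}

// Hypothetical type for an item in the InvoiceTotal table.
interface InvoiceTotal {
  InvoiceNumber: string; // partition key
  Total: number;
  UpdateDate: string;
}

// The example row shown for the InvoiceTotal table.
const exampleTotal: InvoiceTotal = {
  InvoiceNumber: "1212121",
  Total: 600,
  UpdateDate: "06062016",
};
```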

Create the lambda to calculate the total amount

Now let's create the lambda that'll update the total amount for each invoice:

const updateInvoiceTotal = new nodejs.NodejsFunction(
    this,
    `UpdateInvoiceTotal`,
    {
        entry: join(__dirname, "..", "functions", "update-invoice-total.ts"),
        handler: "handler",
        logRetention: logs.RetentionDays.ONE_DAY,
        environment: {
            INVOICE_TOTAL_TABLE: invoiceTotalTable.tableName,
        },
    }
);

invoiceTransactionsTable.grantStreamRead(updateInvoiceTotal);
invoiceTotalTable.grantWriteData(updateInvoiceTotal);

Since the lambda will write to the InvoiceTotal table, we pass the table name as an environment variable. We also need to give the lambda permission to write to this table, which is what grantWriteData does.

For the lambda to use the data stream we also need to set the correct permissions on the InvoiceTransactions table with the grantStreamRead method.

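The grantStreamRead method grants the stream read actions for the table's stream: dynamodb:DescribeStream, dynamodb:GetRecords, dynamodb:GetShardIterator, and dynamodb:ListStreams.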

The following code will make sure that the lambda is triggered by the DynamoDB stream:

updateInvoiceTotal.addEventSource(
    new lambdaEventSources.DynamoEventSource(invoiceTransactionsTable, {
        startingPosition: lambda.StartingPosition.LATEST,
    })
);

Here is the code for the lambda itself:

import { DynamoDBStreamHandler } from "aws-lambda";
import {
  AttributeValue,
  DynamoDBClient,
  ReturnValue,
} from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, UpdateCommand } from "@aws-sdk/lib-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";

const client = new DynamoDBClient({});
const ddbDocClient = DynamoDBDocumentClient.from(client);

const { INVOICE_TOTAL_TABLE } = process.env;

export const handler: DynamoDBStreamHandler = async (event) => {
  // Process the stream records one by one, in the order they arrive.
  for (const record of event.Records) {
    // Only records that carry a NewImage are aggregated.
    if (record.dynamodb && record.dynamodb.NewImage) {
      // Convert the DynamoDB attribute map into a plain JavaScript object.
      const newImage = unmarshall(
        record.dynamodb.NewImage as { [key: string]: AttributeValue }
      );

      // Add the transaction amount to the invoice total in a single update,
      // starting from 0 when the invoice has no total yet.
      await ddbDocClient.send(
        new UpdateCommand({
          TableName: INVOICE_TOTAL_TABLE,
          Key: { InvoiceNumber: newImage["InvoiceNumber"] },
          UpdateExpression: `SET #Total = if_not_exists(#Total, :initial) + :num, #UpdateDate = :date`,
          ExpressionAttributeNames: {
            "#Total": "Total",
            "#UpdateDate": "UpdateDate",
          },
          ExpressionAttributeValues: {
            ":num": newImage["Amount"],
            ":initial": 0,
            ":date": newImage["InvoiceDate"],
          },
          ReturnValues: ReturnValue.UPDATED_NEW,
        })
      );
    }
  }
};

We're using the TypeScript types from @types/aws-lambda to help us with type-safety :)

Since the lambda is triggered by a DynamoDB stream, we define the handler function as a DynamoDBStreamHandler, so we can expect an event of type DynamoDBStreamEvent. We loop through each record, and if there is a NewImage on the stream record, we update the InvoiceTotal table with the new amount.
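
One thing to keep in mind: MODIFY records also carry a NewImage, so editing an existing transaction would add its amount to the total a second time, and REMOVE records (which have no NewImage) are skipped. Because the table streams NEW_AND_OLD_IMAGES, one possible refinement is to add the difference between the new and the old amount instead. The sketch below is only an illustration under that assumption and is not part of the original source code; amountDelta, amountOf, and the local types are hypothetical stand-ins for the real stream record types.

```typescript
// Minimal local types standing in for the stream record shape (illustration only).
type AttrValue = { S?: string; N?: string };
type Image = { [key: string]: AttrValue };

interface StreamRecordLike {
  eventName?: "INSERT" | "MODIFY" | "REMOVE";
  dynamodb?: { NewImage?: Image; OldImage?: Image };
}

// Read the numeric Amount attribute from an image, defaulting to 0 when absent.
function amountOf(image?: Image): number {
  const raw = image?.["Amount"]?.N;
  return raw === undefined ? 0 : Number(raw);
}

// Hypothetical helper: how much the invoice total should change for one record.
// INSERT has no OldImage (delta = new amount), REMOVE has no NewImage
// (delta = -old amount), and MODIFY yields the difference between the two.
function amountDelta(record: StreamRecordLike): number {
  const oldAmount = amountOf(record.dynamodb?.OldImage);
  const newAmount = amountOf(record.dynamodb?.NewImage);
  return newAmount - oldAmount;
}
```

With a helper like this, the handler would pass the delta as :num instead of the raw Amount, and would read the InvoiceNumber from OldImage when a record has no NewImage.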

It's easier to unmarshall the DynamoDB record, so we can use JavaScript objects.
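
To show what unmarshalling does, here is a simplified stand-in that handles only the S, N, and BOOL attribute types. It is not the library implementation; simpleUnmarshall and SimpleAttr are hypothetical names used for illustration, and the real unmarshall from @aws-sdk/util-dynamodb supports every DynamoDB type.

```typescript
// Simplified attribute value: only strings, numbers and booleans are covered here.
type SimpleAttr = { S: string } | { N: string } | { BOOL: boolean };

// Hypothetical helper that converts a DynamoDB attribute map into a plain object.
function simpleUnmarshall(
  item: { [key: string]: SimpleAttr }
): { [key: string]: string | number | boolean } {
  const out: { [key: string]: string | number | boolean } = {};
  for (const key of Object.keys(item)) {
    const attr = item[key];
    if ("S" in attr) {
      out[key] = attr.S;
    } else if ("N" in attr) {
      // DynamoDB sends numbers as strings; convert them to JavaScript numbers.
      out[key] = Number(attr.N);
    } else {
      out[key] = attr.BOOL;
    }
  }
  return out;
}

const plainItem = simpleUnmarshall({
  InvoiceNumber: { S: "ABC" },
  Amount: { N: "100" },
});
// plainItem is { InvoiceNumber: "ABC", Amount: 100 }
```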

Testing the system

Let's test the scenario by adding some items to the InvoiceTransactions table:

aws dynamodb put-item \
    --table-name InvoiceTransactions \
    --item '{
      "InvoiceNumber": { "S": "ABC" },
      "TransactionId": { "S": "123" },
      "Amount": { "N": "100" },
      "InvoiceDate": { "S": "06062016" }
    }'

aws dynamodb put-item \
    --table-name InvoiceTransactions \
    --item '{
      "InvoiceNumber": { "S": "ABC" },
      "TransactionId": { "S": "456" },
      "Amount": { "N": "300" },
      "InvoiceDate": { "S": "06062016" }
    }'

We're adding two transactions ($100 and $300) for the same invoice.

After adding the two items, the InvoiceTransactions table contains the two transactions:

InvoiceTransactions:

InvoiceNumber    TransactionId    Amount    InvoiceDate
ABC              123              $100      06062016
ABC              456              $300      06062016

As expected, the InvoiceTotal table contains the total amount of $400 ($100 + $300) for the invoice:

InvoiceNumber    Total    UpdateDate
ABC              400      06062016
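
The arithmetic behind this result can be reproduced without AWS. The sketch below mimics the update expression (SET Total = if_not_exists(Total, 0) + amount) with an in-memory Map standing in for the InvoiceTotal table. The names totalsTable, addToTotal, and InvoiceTotalItem are hypothetical and exist only for this illustration.

```typescript
interface InvoiceTotalItem {
  Total: number;
  UpdateDate: string;
}

// In-memory stand-in for the InvoiceTotal table, keyed by InvoiceNumber.
const totalsTable = new Map<string, InvoiceTotalItem>();

// Mirrors the update expression: start from 0 when no total exists yet,
// add the transaction amount, and overwrite the update date.
function addToTotal(
  invoiceNumber: string,
  amount: number,
  date: string
): InvoiceTotalItem {
  const current = totalsTable.get(invoiceNumber)?.Total ?? 0;
  const updated: InvoiceTotalItem = { Total: current + amount, UpdateDate: date };
  totalsTable.set(invoiceNumber, updated);
  return updated;
}

// The two test transactions from this section.
addToTotal("ABC", 100, "06062016");
addToTotal("ABC", 300, "06062016");
// totalsTable.get("ABC") is now { Total: 400, UpdateDate: "06062016" }
```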

Conclusion

DynamoDB streams and AWS Lambda are a powerful combination for aggregating data in real time. The stream captures every change to the source table, and a Lambda function processes those changes to compute the aggregate and store it in a separate table.

Source code available on GitHub