サーバーレスでスケールするメール送信システムをAWS CDKで構築する

概要

タイトル通りですが、AWSでサーバーレスなメール送信システムを、AWS CDKを使って構築してみました。

サーバーレスなので、スケールする良い感じの構成になります。


荒いところはあるのですが結構便利なのでよかったらぜひご参考に、またはそのまま使ってくださいという感じで読んでいただけたら光栄です。

Amazon SESを使用してメールを送信していますが、(少し変えれば)SES以外でも送信することができるようになっています。


目次

目次


要件

こんな要件をもとに構築しました。

  • 添付ファイルあり
  • メールはキューイングし、送信失敗時にリトライもしたい
  • AWSで構築されたバックエンドからアクセスしたい
    • APIとして公開するのではなくて良い
  • サーバーレスでスケールしてほしい


前提

言語はアプリケーション・CDK共にTypeScript(ver 4.7.4)、またCDKはv2(ver 2.31.0)を使用しております。


アーキテクチャ

構成図


使用AWSサービス

  • AWS CDK
  • Amazon SES
  • SQS
  • S3
  • DynamoDB
  • Lambda
  • その他
    • SQS(Dead Letter Queue)
    • CloudWatch Alarm
    • SNS Topic
    • Chatbot


コード

アプリケーション・CDKのコード量的にちょっと量が多く、詳細をブログに載せるのは諦めました。。。

GitHubに全コードを載せております。READMEに使い方なども記載しています。

github.com


アーキテクチャ解説と補足

AWS CDK

具体的なリソースの話は次以降の各サービスごとの項目になるのでそんなに書くことはないのですが、何点かピックアップしました。

aws_lambda_nodejs

アプリケーション(Lambda)とインフラ(CDK)にTypeScriptを使用していますが、"aws-cdk-lib/aws-lambda-nodejs"というモジュールで、LambdaとCDKで1つのpackage.jsonを共有して構築することができるのです。

(もちろんそれぞれの依存関係は良い感じに切り分けてくれるそうで、余計なモジュールが入って重くなったりとかもなさそうです。)

docs.aws.amazon.com


ルールとして、CDKスタッククラスのファイル名が「my-construct.ts」の場合に、Lambdaは「my-construct.mailer.ts」というようなファイルの命名が必要です。

そして、NodejsFunctionコンストラクタの第2引数にて拡張子の前の部分(mailer)を指定することで、そのファイルをLambdaのソースコードとして扱ってくれます。

  • CDK例
import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs";
......(省略)
......(省略)
    const mailerHandler = new NodejsFunction(this, "mailer", {
       ...(省略)
    });


※このルールでなく別の名前でも構築可能ですが、ここではその方法には触れません。詳細は公式をご覧下さい。

docs.aws.amazon.com


パラメータ用コンフィグファイル

CDKスタックに渡すパラメータはコンフィグ用TypeScriptファイルで作ってみました。

StackPropsを継承するConfigStackPropsというinterfaceを作成し、スタックのコンストラクタにインジェクト(注入)して渡すことで、ユニットテスト時にパラメータを変更したテストを行えるようになりました。

※テスト用パラメータオブジェクトをinterfaceに沿って作成してテストコード内でスタックインスタンス生成時に渡してテストを行う

  • CDK例
import { StackProps } from "aws-cdk-lib";

export interface Config {
  slackWorkspaceId: string;
  slackChannelId: string;
  senderAddress: string;
}

export interface ConfigStackProps extends StackProps {
  config: Config;
}

export const configStackProps: ConfigStackProps = {
  env: {
    region: "ap-northeast-1",
  },
  config: {
    slackWorkspaceId: "*********",
    slackChannelId: "*********",
    senderAddress: "***@***.***",
  },
};


バリデーション

CDKスタックもバリデーションをしています。zodというバリデーションモジュールで行っています。

こちらの記事と同じことをしているのでよかったらご覧下さい。

go-to-k.hatenablog.com


Amazon SES

AWSでサーバーレスにメールを送るとなると、やっぱりSESですよね。(ただの「SES」と書くとお仕事の方の印象が浮かぶかなと思ってここだけ正式に「Amazon」と書いてます。。。)


ただ最初に書いておいてあれなのですが、SESは構築済みの前提で、このリポジトリでは作成しません。

SESはCloudFormationでは作れず(SPF/DKIM/DMARCなどの設定とかもあるので)CLIやコンソールでの作成になるので、今回はスコープ外としました。(私は普段はCLIスクリプトを作成して構築しています。)


また使用の際には、サンドボックス外への移動(=制限解除)をしている前提になります。が、動くは動くと思います。

docs.aws.amazon.com


SQS

こちらは要件にあったメールのキューイング・リトライ用として用意しています。また、水平スケーリングに有効なサービスですね。

APIという要件ではなくAWSで構築されたバックエンドアプリケーションからのアクセスという要件であったため、アプリケーションはこのSQSにメールのメッセージを投げる形でメール送信システムを使用することになります。


標準キュー

今回要件にはありませんが、同じメールが2回送られるのはNGだという暗黙の要件があるため、メールの重複排除が必要になります。

そこで、まず考えられるのがSQSのFIFOキューかなと思います。


しかし、FIFOキューでもLambdaをトリガーする場合は重複の可能性がある(exactly onceを保証しない)ということがAWS公式ブログで書かれています。

Amazon SQS FIFO queues ensure that the order of processing follows the message order within a message group. However, it does not guarantee only once delivery when used as a Lambda trigger. If only once delivery is important in your serverless application, it’s recommended to make your function idempotent. You could achieve this by tracking a unique attribute of the message using a scalable, low-latency control database like Amazon DynamoDB.

aws.amazon.com


そのためFIFOキューは使わず標準キューにして、後述するDynamoDBを使用して重複排除を行うことにしました。(スケーラビリティや料金という面でも標準キューの方が良いという点もありました。)


可視性タイムアウト

「可視性タイムアウト」とは、コンシューマー(Lambda)がSQSからメッセージを受け取って処理をしている間、まだキューに残っていて処理中のメッセージを他のコンシューマーが処理できないようにするために、指定した時間だけそのメッセージを「見えない」状態にするための設定です。

Lambdaのタイムアウトと同じか少し多めにしておかないと、Lambdaの処理中に他のLambdaが同じメッセージを受け取れるようになってしまうので注意です。

docs.aws.amazon.com


visibilityTimeoutというパラメータで設定が可能です。

  • CDK例
    const queue = new Queue(this, "MailQueue", {
      visibilityTimeout: Duration.seconds(30),
      ...
      ...
    });


ロングポーリング

SQSのデフォルトの挙動である「ショートポーリング」では、SQS内にメッセージが空のときでもポーリング時にSQSがすぐにレスポンスを返してしまい、無駄なポーリングが増えてしまうという特徴があります。

それに対して「ロングポーリング」は、ポーリング時にメッセージが空の場合、指定した時間までメッセージが溜まるのを待機してからSQSがレスポンスを返すため、「ショートポーリング」に比べポーリング回数を削減することができ、効率的であるという特徴があります。

docs.aws.amazon.com


receiveMessageWaitTimeというパラメータで設定が可能です。0より大きい値にすると「ロングポーリング」になります。

  • CDK例
    const queue = new Queue(this, "MailQueue", {
      ...
      receiveMessageWaitTime: Duration.seconds(10),
      ...
      ...
    });


部分バッチ応答

今回SQSをイベントとするLambdaは、1メッセージごとに1Lambda起動するのではなく、1回の起動で複数のメッセージを取得し効率よく処理する「バッチ処理」にしています。

(maxBatchingWindow: 10秒経つか、batchSize: 5個メッセージが貯まるまで待つ)

  • CDK例
    const eventSource = new SqsEventSource(queue, {
      batchSize: 5,
      maxBatchingWindow: Duration.seconds(10),
      reportBatchItemFailures: true,
    });


また、今回SQSの機能の特徴として比較的新しくて便利な、部分バッチ応答を使用しました。

ざっくりいうと、「Lambdaでバッチ処理した複数のSQSメッセージのうち、失敗したメッセージだけをリトライできる」というような機能になります。

aws.amazon.com


こんな感じで、LambdaでSQSの部分バッチ応答によって、失敗したメッセージのIDのみSQSに戻してリトライすることができます。

※catchでthrowさせないで配列に貯めて、最後にreturnするのが特徴です。

  • メール送信Lambda例
export const handler: SQSHandler = async (event: SQSEvent) => {
  const batchItemFailureArray: SQSBatchItemFailure[] = [];

  for (const record of event.Records) {
    try {
      ...
      ...
    } catch (e) {
      const batchItemFailure: SQSBatchItemFailure = {
        itemIdentifier: record.messageId,
      };

      batchItemFailureArray.push(batchItemFailure);
    }
  }

  const sqsBatchResponse: SQSBatchResponse = {
    batchItemFailures: batchItemFailureArray,
  };

  return sqsBatchResponse;
};


SQSの設定としては、reportBatchItemFailuresという1つの設定だけで使用可能になります。

  • CDK例
    const eventSource = new SqsEventSource(queue, {
      ...
      ...
      reportBatchItemFailures: true, // <-こちら
    });


S3

添付ファイル用としてS3を使用しました。

SQSを使用しているのでメッセージに画像をバイナリで載せることも可能ではあるのですが、SQSは256KBがペイロードの上限サイズになり容量的にオーバーしそうな感じがしました。

それを超えるとなると、Java 用のAmazon SQS 拡張クライアントライブラリを使用することになるのですが、TypeScriptでの使用を考えて無理にSQSを通さず、S3を使用することにしました。


使い方としては、SQSとは完全に分けて、別途アップロードするような以下の流れになります。

  • メールシステム呼び出し側でS3に画像をアップロード
  • 画像のパスをメッセージに含めてSQSに渡す
  • Lambdaが受け取ったパスをもとにS3に取りにいく


DynamoDB

条件付き書き込み

上記SQSの標準キューにて説明したように、SQSのFIFOキューの代わりに、メールの重複排除を行うためにDynamoDBを使用しました。

条件付き書き込みによるロック(排他制御によって重複排除を実現するのですが、具体的にはConditionExpressionを使用したputによって実現できます。


これによって、「書き込みが成功した=ロックが確保できたらメールを送信する」、「書き込みに失敗した=ロックが確保できなかったすでにメールが送信されたものと見なしてメールを送信しない」という仕組みを実現しています。

  • メール送信Lambda例
  const params: DynamoDB.DocumentClient.PutItemInput = {
    TableName: tableName,
    Item: {
      LockMailKey: lockMailKey,
      ExpirationUnixTime: expirationUnixTime,
    },
    ConditionExpression: "attribute_not_exists(#hash)",
    ExpressionAttributeNames: {
      "#hash": "LockMailKey",
    },
  };


TTL

DynamoDBはビジネスデータではなく同時起動などの重複排除のための使用用途だったので、しばらくしたら消えても構わないデータです。

そのため、DynamoDBのTTL(TimeToLive)」という機能を使用して、指定した期間後にデータが削除されるようにしてストレージのコスト削減を図っています。


具体的には、DynamoDBのリソース定義でtimeToLiveAttributeというパラメータに、TTLの値(いつ削除するかという削除時刻)を格納するキー名を指定します。

  • CDK例
    const table = new Table(this, "QueueLockTable", {
      ...
      ...
      timeToLiveAttribute: "ExpirationUnixTime",
    });


そして、メール送信Lambdaで実際にテーブルに書き込む際、timeToLiveAttributeで指定したキーに対して削除を実行する時刻のUnixタイムスタンプを入れて書き込みます。

これで、格納した値の時刻になったら自動でデータが削除されるようになり、データの保存料が節約できるというわけです。

  • メール送信Lambda例
  const params: DynamoDB.DocumentClient.PutItemInput = {
    ...
    Item: {
      ...
      ExpirationUnixTime: expirationUnixTime,
    },
    ...
    ...
  };


注意点として、すぐデータを消して良いわけではないので、重複起動がもう起こらないだろうという値を指定してください。

SQSで指定した回数だけリトライされ終わるであろう時刻であったり、SQSのメッセージ保持期間に合わせたり、ワークロードによるかなと思います。

(本システムでは、SQSのメッセージ保持期間のデフォルトである4日に合わせています。)


Lambda

TypeScriptで実装しています。

が、AWS SDK for JavaScript v2を使用しています。どこかでv3に移行しようとは思っています。。。

まだモノとしてはプロトタイプレベルなため、loggerなども使用していないのでまとめて仕上げたいなあと。。。


nodemailer

LambdaでSESのSDKを使用してメール送信しているのですが、nodemailerというメール送信モジュールを使用しています。

nodemailerがSESのメール処理をラップして、SESのSDKよりも楽に送信することができるようになっています。

  • メール送信Lambda例
const ses = new SES({
  apiVersion: "2010-12-01",
});
...
...
const transporter = nodemailer.createTransport({ SES: ses });
...
...
await transporter.sendMail(mailParam);


nodemailerではSES以外でもメールが送れるため、SES環境を構築していない方も少し変えればこのメール送信システムを使用できます!


直列処理

上記SQSで記載したように、今回LambdaはSQSのメッセージをバッチで複数一括処理するように構築しました。

そのためLambda内では複数メッセージを扱うことになるのですが、それらを直列(for ... of)で処理しています。

  • メール送信Lambda例
  for (const record of event.Records) {


これは、一応Amazon SESの送信API制限(スロットル)を加味してこのようにしています。

SQSによって水平スケーリング(Lambdaが並列起動)するようになっているため、Lambda内でも並列(並行)処理してしまうとすぐSESの上限に引っかかるなと思い、このようにしました。

また1メッセージごとに吐くログの可視性を担保する(順番関係なくなるとごっちゃになってわかりづらい)などの(後付けの)理由もあります。


これでもSESのスロットリングが不安な方は、Lambdaの同時実行数(Reseved Concurrency)を制限するとより安心かと思います。

Lambdaの同時実行数を減らす場合スケーラビリティは下がりますが、SQSのキューイング・リトライという保証があるため、メール送信成功の可能性は変わらず担保できるかと思います。


※SES処理は直列にしていますが、複数添付ファイルがあった場合にS3からファイルを取得する部分は並行処理にて行っています。

const attachmentsPromises = mail?.attachedFileKeys.map((attachedFileKey) => {
  return getMailAttachmentFromS3(mail.mailKey, attachedFileKey);
});

mailParam.attachments = await Promise.allSettled(attachmentsPromises).then((results) =>
  results.map((result) => {
    if (result.status === "rejected") {
      throw new Error(result.reason);
    }
    return MailAttachmentSchema.parse(result.value);
  }),
);


バリデーション(zod)

CDKスタックと同じくzodというバリデーションモジュールを使用して、SES(nodemailer)へのパラメータやSQSからのリクエストに対してzodスキーマを定義し、バリデーションを行っています。

  • メール送信Lambda例
import { z } from "zod";

export const MailAttachmentSchema = z.object({
  filename: z.string().min(1).optional(),
  path: z.string().min(1),
});

export type MailAttachment = z.infer<typeof MailAttachmentSchema>;

export const MailParamSchema = z.object({
  from: z.string().min(1),
  to: z.string().min(1),
  subject: z.string(),
  text: z.string(),
  attachments: z.array(MailAttachmentSchema).optional(),
});

export type MailParam = z.infer<typeof MailParamSchema>;


何が言いたかったか?

  • こんなの(サーバーレスなメール送信システム)を作ったよ
  • CDK(TypeScript)で作ったよ
  • まだ荒いけど結構便利なのでよかったらぜひ使ってくださいね


最後に

あまりCDKの話をしない記事になってしまいました。。。

もしもっと詳しく見てみたい方がいらっしゃいましたら、GitHubを見てくださると全て載っております。READMEに使用方法などの詳細も記載してあります。(以下リンク再掲)

github.com



Twitter始めました!

良かったらぜひ。お知り合い増やせたら嬉しいです。

Twitter ID → @365_step_tech