業界・業務から探す
導入目的・課題から探す
データ・AIについて学ぶ
News
Hakkyについて
ウェビナーコラム
◆トップ【データ基盤】
クラウドDWHを比較
Architecture
Redshift ServerlessRedshift
Amplify
データの保守運用
AI

執筆者:Handbook編集部

SNSとSQSとLambdaを使用したメッセージ基盤

概要

この記事では、SNS と SQS と Lambda を使用したメッセージ基盤について紹介します。

アーキテクチャは下図の通りです。

大まかな処理の流れとしては以下の流れとなります。

  1. SNS でメッセージが発行される(失敗した場合はデッドレターキューに送る)
  2. SQS が Lambda にメッセージをキューイングする(失敗した場合はデッドレターキューに送る)
  3. Lambda がイベント起因で駆動する

アーキテクチャ図下部の、DLQ にメッセージが入ったら Slack に通知する箇所については以下の記事で紹介します。

  • DLQ にキューが入ったら Slack へアラートを送信する

Terraform

まずは環境を構築します。 この記事で作成する環境は下図の通りです。
× N個としている箇所については、今回は 3 個にしています。

terraform {
  required_version = "~> 1.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 3.0"
    }
    random = {
      source  = "hashicorp/random"
      version = "3.4.3"
    }
    archive = {
      source  = "hashicorp/archive"
      version = "2.3.0"
    }
  }

  backend "s3" {
    bucket  = "***"
    key     = "***/***.tfstate"
    region  = "ap-northeast-1"
    profile = "***"
  }
}

provider "aws" {
  profile = "***"
  region  = "ap-northeast-1"
}

variable "aws_account_id" {
  type    = string
  default = "***"
}

variable "region" {
  type    = string
  default = "ap-northeast-1"
}

variable "app_name" {
  type    = string
  default = "sns-sqs-lambda-demo"
}

variable "slack_webhook_url" {
  type    = string
  default = "https://hooks.slack.com/services/***"
}

variable "api_handler" {
  type = map(any)
  default = {
    api_a = {
      name = "api_a"
    }
    api_b = {
      name = "api_b"
    }
    api_c = {
      name = "api_c"
    }
  }
}

resource "aws_sns_topic" "main" {
  name                        = "${var.app_name}.fifo"
  fifo_topic                  = true
  content_based_deduplication = true
  kms_master_key_id           = "alias/aws/sns"
  policy                      = <<EOF
{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:Publish",
        "SNS:RemovePermission",
        "SNS:SetTopicAttributes",
        "SNS:DeleteTopic",
        "SNS:ListSubscriptionsByTopic",
        "SNS:GetTopicAttributes",
        "SNS:AddPermission",
        "SNS:Subscribe"
      ],
      "Resource": "arn:aws:sns:${var.region}:${var.aws_account_id}:${var.app_name}.fifo"
    },
    {
      "Sid": "__console_pub_0",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "SNS:Publish",
      "Resource": "arn:aws:sns:${var.region}:${var.aws_account_id}:${var.app_name}.fifo"
    }
  ]
}
EOF
}

resource "aws_sns_topic_subscription" "main" {
  for_each  = var.api_handler
  protocol  = "sqs"
  topic_arn = aws_sns_topic.main.arn
  endpoint  = aws_sqs_queue.main[each.key].arn
  redrive_policy = jsonencode(
    {
      deadLetterTargetArn = aws_sqs_queue.sns_dlq.arn
    }
  )
  raw_message_delivery = true
}

# main sqs
resource "aws_sqs_queue" "main" {
  for_each   = var.api_handler
  name       = "${var.app_name}-${each.value.name}.fifo"
  fifo_queue = true

  content_based_deduplication = false
  delay_seconds               = 0
  receive_wait_time_seconds   = 0
  # リトライ時も適用される
  visibility_timeout_seconds = 60
  message_retention_seconds  = 345600


  policy = jsonencode(
    {
      Id = "__default_policy_ID"
      Statement = [
        {
          Sid       = "__owner_statement"
          Action    = "SQS:*"
          Effect    = "Allow"
          Principal = "*"
          Resource  = "arn:aws:sqs:ap-northeast-1:${var.aws_account_id}:${var.app_name}-${each.value.name}.fifo"
        }
      ]
      Version = "2008-10-17"
    }
  )

  redrive_policy = jsonencode(
    {
      deadLetterTargetArn = aws_sqs_queue.sqs_dlq[each.key].arn
      # lambda向け配信の最大リトライ回数
      maxReceiveCount = 3
    }
  )
}

# sns dlq
resource "aws_sqs_queue" "sns_dlq" {
  name       = "${var.app_name}-sns-dlq.fifo"
  fifo_queue = true

  content_based_deduplication = false
  delay_seconds               = 0
  receive_wait_time_seconds   = 0
  visibility_timeout_seconds  = 30
  message_retention_seconds   = 345600

  policy = jsonencode(
    {
      Id = "__default_policy_ID"
      Statement = [
        {
          Sid      = "__owner_statement"
          Action   = "SQS:*"
          Effect   = "Allow"
          Resource = "arn:aws:sqs:ap-northeast-1:${var.aws_account_id}:${var.app_name}-sns-dlq.fifo"
        },
        {
          Action = "sqs:SendMessage"
          Effect = "Allow"
          Principal = {
            Service = "sns.amazonaws.com"
          }
          Resource = "arn:aws:sqs:ap-northeast-1:${var.aws_account_id}:${var.app_name}-sns-dlq.fifo"
        },
      ]
      Version = "2008-10-17"
    }
  )

  redrive_allow_policy = jsonencode(
    {
      redrivePermission = "allowAll"
    }
  )
}

# sqs dlq
resource "aws_sqs_queue" "sqs_dlq" {
  for_each   = var.api_handler
  name       = "${var.app_name}-${each.value.name}-sqs-dlq.fifo"
  fifo_queue = true

  content_based_deduplication = false
  delay_seconds               = 0
  receive_wait_time_seconds   = 0
  visibility_timeout_seconds  = 30
  message_retention_seconds   = 1209600

  policy = jsonencode(
    {
      Id = "__default_policy_ID"
      Statement = [
        {
          Sid       = "__owner_statement"
          Action    = "SQS:*"
          Effect    = "Allow"
          Principal = "*"
          Resource  = "arn:aws:sqs:ap-northeast-1:${var.aws_account_id}:${var.app_name}-${each.value.name}-sqs-dlq.fifo"
        },
      ]
      Version = "2008-10-17"
    }
  )

  redrive_allow_policy = jsonencode(
    {
      redrivePermission = "allowAll"
    }
  )
}

resource "aws_iam_role" "lambda" {
  name               = "${var.app_name}-lambda-iam-role"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_role_policy" "lambda" {
  name   = "${var.app_name}-lambda-iam-role-policy"
  role   = aws_iam_role.lambda.id
  policy = <<POLICY
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:PutMetricFilter",
                "logs:PutRetentionPolicy"
            ],
            "Resource": [
                "arn:aws:logs:${var.region}:${var.aws_account_id}:*"
            ]
        },
        {
            "Action": [
                "s3:*",
                "sqs:*",
                "logs:*"
            ],
            "Effect": "Allow",
            "Resource": "*"
        }
    ]
}
POLICY
}

locals {
  ecr-lifecycle-policy = {
    rules = [
      {
        action = {
          type = "expire"
        }
        description  = "最新の5つを残してイメージを削除する"
        rulePriority = 1
        selection = {
          countNumber = 5
          countType   = "imageCountMoreThan"
          tagStatus   = "any"
        }
      },
    ]
  }
}

resource "aws_ecr_repository" "main" {
  name = var.app_name

  encryption_configuration {
    encryption_type = "AES256"
  }
  image_scanning_configuration {
    scan_on_push = "true"
  }
  image_tag_mutability = "IMMUTABLE"
}

resource "aws_ecr_lifecycle_policy" "main" {
  repository = aws_ecr_repository.main.name
  policy     = jsonencode(local.ecr-lifecycle-policy)
}

resource "aws_lambda_function" "main" {
  for_each      = var.api_handler
  function_name = "${var.app_name}-lambda-${each.value.name}"
  role          = aws_iam_role.lambda.arn
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.main.repository_url}:v1"
  timeout       = 60
  memory_size   = 128

  lifecycle {
    ignore_changes = [image_uri]
  }

  environment {
    variables = {
      SLACK_WEBHOOK_URL = var.slack_webhook_url
    }
  }
}

resource "aws_lambda_event_source_mapping" "main" {
  for_each         = var.api_handler
  event_source_arn = aws_sqs_queue.main[each.key].arn
  enabled          = true
  batch_size       = 10
  function_name    = aws_lambda_function.main[each.key].arn
}

上記 terraform を apply します。

現時点ではまだ ECR にイメージが存在しないため、Lambda の作成はエラーとなります。そのため、ECR にイメージを push した後で再度 terraform apply する必要があります。

Lambda 用 Docker イメージ作成

次に Lambda 用 Docker イメージを ECR へ push します。

lambda.py

今回はメッセージを確認できれば十分なので、下記のような簡単な関数を使用します。

import json

# for test lambda
def lambda_handler(event, context) -> None:
    body = event['Records'][0]['body']
    print(body)

    return 200

Dockerfile

Dockerfile は下記です。Docker イメージを作成して ECR へ push します。

FROM public.ecr.aws/lambda/python:3.8

COPY . .

CMD ["lambda.lambda_handler"]
info
備考

ECR へイメージを push したら、再度 terraform apply を実施して Lambda を作成します。

メッセージ発行

環境が整ったので、SNS に対してメッセージを発行します。 今回は AWS SDK を使用してメッセージを発行します。

import boto3
import random

TOPIC_ARN = "arn:aws:sns:ap-northeast-1:************:sns-sqs-lambda-demo.fifo"

session = boto3.session.Session(profile_name="***")
client = session.client("sns")

subject = "sample message"
msg = """
sample message
sns-sqs-lambda
boto3
"""

# 重複排除を避けるためIDを重複させない
duplication_id = random.randint(0, 10000)
print(duplication_id)

request = {
"TopicArn": TOPIC_ARN,
"MessageGroupId": "sample",
"MessageDeduplicationId": f'sample-dupulication-id-{duplication_id}',
"Message": msg,
"Subject": subject
}

response = client.publish(**request)
print(response)

上記 python を使用してメッセージを発行してみると、全ての Lambda が実行され、メッセージも渡されていることが確認できます。

デッドレターキューを試す

SNS→SQS→Lambda の順でメッセージを渡せることは確認できました。
次は、意図的に Lambda の処理を失敗させて、デッドレターキューに失敗したキューを格納してみます。

lambda.py

Lambda 関数を下記のように修正して、エラーが発生するようにします。

import json

# for test dead letter queue
def lambda_handler(event, context):
    msg = event['Records'][0]['body']
    print(msg)
    try:
        a = 1 / 0
    except:
        raise Exception('ERROR!!')
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

実行してみる

再度、AWS SDK を使用して SNS にメッセージを発行してみると、全ての Lamdba はそれぞれ 3 回実行され、その後デッドレターキューに格納されていることが分かります。

リトライ回数はaws_sqs_queue.mainmaxReceiveCountで定義しています。

resource "aws_sqs_queue" "main" {
  .
  .
  redrive_policy = jsonencode(
    {
      .
      .
      # lambda向け配信の最大リトライ回数
      maxReceiveCount = 3
    }
  )
}

まとめ

この記事では、SNS と SQS と Lambda を使用したメッセージ基盤の構築について紹介しました。メッセージを複数の処理に渡したい、かつメッセージの発付に失敗した際も確認できるようにしたい場合に有効な構成です。

参考

info
備考

Hakky ではエンジニアを募集中です!まずは話してみたいなどでも構いませんので、ぜひお気軽に採用ページからお問い合わせくださいませ。

2025年07月06日に最終更新
読み込み中...