業界・業務から探す
導入目的・課題から探す
データ・AIについて学ぶ
News
Hakkyについて
ウェビナーコラム
◆トップ【データ基盤】
クラウドDWHを比較
データの保守運用
AI

執筆者:Handbook編集部

Snowflakeのコストを毎日Slackへ通知する方法

はじめに

この記事では、Snowflake のコストを毎日 Slack へ通知する方法について紹介します。
この記事の内容を実装することで以下のような Slack メッセージが定期的に送信されるようになります。

環境

実装する環境は AWS を使用します。

  • AWS
  • Lambda(ECR の Docker イメージ使用)
  • Secret Manager(Snowflake のパスワード格納)
  • Event bridge(Lambda を定期実行する)

Terraform

まずは terraform で環境を構築します。

terraform {
  required_version = "~> 1.0.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 3.0"
    }
    random = {
      source  = "hashicorp/random"
      version = "3.4.3"
    }
  }
}
# Slackのwebhook url
variable "slack_webhook_url" {
  type    = string
  default = "https://hooks.slack.com/*********************************"
}
variable "app_name" {
  type    = string
  default = "snowflake-billing-report"
}
resource "aws_secretsmanager_secret" "snowflake_user_password" {
  name = "${var.app_name}-snowflake-user-password"
}
resource "aws_cloudwatch_event_rule" "billing_report_schedule_event" {
  name                = "${var.app_name}-schedule-event"
  description         = "Schedule event to report billing"
  # 毎日9時
  schedule_expression = "cron(0 0 * * ? *)"
}
resource "aws_cloudwatch_event_target" "billing_report_schedule_event_target" {
  rule      = aws_cloudwatch_event_rule.billing_report_schedule_event.name
  target_id = var.app_name
  arn       = aws_lambda_function.billing_lambda.arn
}
resource "aws_lambda_permission" "allow_cloudwatch_to_call_lambda" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.billing_lambda.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.billing_report_schedule_event.arn
locals {
  ecr-lifecycle-policy = {
    rules = [
      {
        action = {
          type = "expire"
        }
        description  = "最新の5つを残してイメージを削除する"
        rulePriority = 1
        selection = {
          countNumber = 5
          countType   = "imageCountMoreThan"
          tagStatus   = "any"
        }
      },
    ]
  }
}
resource "aws_ecr_repository" "billing" {
  encryption_configuration {
    encryption_type = "AES256"
  }
  image_scanning_configuration {
    scan_on_push = "true"
  }
  image_tag_mutability = "IMMUTABLE"
  name                 = var.app_name
}
resource "aws_ecr_lifecycle_policy" "billing" {
  repository = aws_ecr_repository.billing.name
  policy     = jsonencode(local.ecr-lifecycle-policy)
resource "aws_iam_role" "billing_lambda_role" {
  name               = "${var.app_name}-lambda-iam-role"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}
resource "aws_iam_role_policy" "billing_lambda_role_policy" {
  name   = "${var.app_name}-lambda-iam-role-policy"
  role   = aws_iam_role.billing_lambda_role.id
  policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "NotifySlackToBillingLambdaPolicy",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:CreateLogGroup",
        "logs:PutLogEvents",
        "logs:PutMetricFilter",
        "logs:PutRetentionPolicy",
        "secretsmanager:GetSecretValue"
      ],
      "Resource": [
        "arn:aws:logs:*",
        "${aws_secretsmanager_secret.snowflake_user_password.arn}"
      ]
    }
  ]
}
POLICY
}
resource "aws_lambda_function" "billing_lambda" {
  function_name = "${var.app_name}-lambda"
  role          = aws_iam_role.billing_lambda_role.arn
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.billing.repository_url}:v1"
  timeout       = 600
  memory_size   = 128
  lifecycle {
    ignore_changes = [image_uri]
  }
  environment {
    variables = {
      SLACK_WEBHOOK_URL = var.slack_webhook_url
    }
  }
}

terraform apply すると、 ECR にイメージが存在しないため Lambda の作成はエラーになりますが、 それ以外の ECR、Secret Manager、Event bridge が構築されます。
ECR へイメージを push した後で再度 apply することで Lambda が作成されます。

Lambda

Lambda は Docker イメージを使用します。

Dockerfile

FROM public.ecr.aws/lambda/python:3.8
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["lambda.lambda_handler"]

requirements.txt

snowflake-connector-python==2.8.1
urllib3==1.26.12
boto3==1.26.83

lambda.py

Snowflake のコスト情報は、snowflake.organization_usage.usage_in_currency_dailyにあります。

import snowflake.connector
import boto3
import urllib3
import json
import os
import datetime
jst = datetime.timezone(datetime.timedelta(hours=+9), "JST")
SLACK_CHANNEL = "#*******"
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")
def lambda_handler(event, context) -> None:
    password = get_snowflake_user_password()
    conn = snowflake.connector.connect(
        user='*******',
        password=password,
        account='*****',
        )
    # 1日の場合は先月のコスト取得
    if datetime.datetime.now(jst).strftime("%d") == "01":
        cost = get_snowflake_cost_prev_month(conn)
        period = get_cost_period_prev_month(conn)
    else:
        cost = get_snowflake_cost(conn)
        period = get_cost_period(conn)
    print(cost)
    print(period)
    # Slackに通知するメッセージ生成
    body = text_generator(period, cost)
    print(body)
    # Slackに通知
    resp = notify_slack(body)
    print(resp)
    conn.close()
def get_snowflake_user_password() -> str:
    secret_name = "snowflake-billing-report-snowflake-user-password"
    region_name = "ap-northeast-1"
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)
    res = client.get_secret_value(SecretId=secret_name)
    secret = res['SecretString']
    password = json.loads(secret)["SNOWFLAKE_USER_PASSWORD"]
    return password
def get_snowflake_cost(conn) -> float:
    cs = conn.cursor()
    try:
        cs.execute("""
        select account_name,
        round(sum(usage_in_currency), 2) as usage_in_currency
        from snowflake.organization_usage.usage_in_currency_daily
        where usage_date BETWEEN DATE_TRUNC('MONTH', current_date()) AND DATE_TRUNC('DAY', current_date())
        group by 1
        """)
        cost = cs.fetchone()
    finally:
        cs.close()
    return float(cost[len(cost)-1])
def get_cost_period(conn) -> list:
    cs = conn.cursor()
    try:
        cs.execute("""
        select
        MIN(usage_date),
        MAX(usage_date)
        from snowflake.organization_usage.usage_in_currency_daily
        where usage_date BETWEEN DATE_TRUNC('MONTH', current_date()) AND DATE_TRUNC('DAY', current_date())
        """)
        period = cs.fetchone()
    finally:
        cs.close()
    period_list = [
        period[0].strftime("%m/%d"),
        period[1].strftime("%m/%d"),
    ]
    return(period_list)
def get_snowflake_cost_prev_month(conn) -> float:
    cs = conn.cursor()
    try:
        cs.execute("""
        select account_name,
        round(sum(usage_in_currency), 2) as usage_in_currency
        from snowflake.organization_usage.usage_in_currency_daily
        where usage_date BETWEEN DATEADD(Month,-1,current_date()) AND LAST_DAY(DATEADD(Month,-1,current_date()))
        group by 1
        """)
        cost = cs.fetchone()
    finally:
        cs.close()
    return float(cost[len(cost)-1])
def get_cost_period_prev_month(conn) -> list:
    cs = conn.cursor()
    try:
        cs.execute("""
        select
        MIN(usage_date),
        MAX(usage_date)
        from snowflake.organization_usage.usage_in_currency_daily
        where usage_date BETWEEN DATEADD(Month,-1,current_date()) AND LAST_DAY(DATEADD(Month,-1,current_date()))
        """)
        period = cs.fetchone()
    finally:
        cs.close()
    period_list = [
        period[0].strftime("%m/%d"),
        period[1].strftime("%m/%d"),
    ]
    return(period_list)
def text_generator(period:list, cost:str) -> str:
    body = f"""
【SNOWFLAKE料金通知】
{period[0]}{period[1]}の請求額は、{cost} USDです。
"""
    return body
def notify_slack(body:str):
    http = urllib3.PoolManager()
    msg = {"channel": SLACK_CHANNEL, "username": "", "text": body, "icon_emoji": ""}
    encoded_msg = json.dumps(msg).encode("utf-8")
    resp = http.request("POST", SLACK_WEBHOOK_URL, body=encoded_msg)
    return resp

ECR にイメージ push

下記シェルスクリプトを実行して ECR にイメージを push します。 イメージを push した後で再度 terraform apply を実施して Lambda を作成してください。

# image_name
image_name=snowflake-billing-report
# lambda_name
lambda_name=snowflake-billing-report-lambda
# アカウントID
account=************
# リージョン名
region='ap-northeast-1'
# ECR最新バージョン取得
prev_tag=`aws --profile *** ecr describe-images --repository-name ${image_name} --query "reverse(sort_by(imageDetails,&amp;imagePushedAt))[0].imageTags[0]"`
prev_tag_ver=`echo $prev_tag | sed -e 's/[^0-9]//g'`
image_tag_ver=$(($prev_tag_ver + 1))
# リポジトリarn
fullname="${account}.dkr.ecr.${region}.amazonaws.com/${image_name}:v${image_tag_ver}"
echo $fullname
# ECRへのログインコマンドを取得し、ログインする
aws ecr get-login-password --region ${region} --profile *** | docker login --username AWS --password-stdin "https://${account}.dkr.ecr.${region}.amazonaws.com"
# コンテナイメージをビルドする
docker build  -t ${image_name} --no-cache .
docker tag ${image_name} ${fullname}
# ECRのリポジトリへプッシュする
docker push ${fullname}
# Lambdaが参照するイメージを更新する(初回実行時はLambdaが存在しないため不要)
aws --profile *** lambda update-function-code --function-name ${lambda_name} --image-uri ${fullname} --publish --region ${region}

まとめ

上記を実装することで、毎日 9 時に Snowflake のコストが Slack へ送られてくるようになります。

参考

info
備考

Hakky では社内のデータ活用やサービスとしてデータ提供を行うためのソリューションを展開しております。

Snowflake など具体的な相談はもちろんのこと、「どんなことをお願いできるのか知りたい」や「こんなことをやりたい」など、ご検討段階でも構いませんので、ぜひお気軽にフォームよりお問い合わせくださいませ。

Hakkyへのお問い合わせ
2025年07月05日に最終更新
読み込み中...