フューチャー技術ブログ

AWSマネージドAirflow(MWAA)についてのFAQ

概要

フューチャー Advent Calendar 6日目 です。TIG DXチーム所属の多賀です。

AWS マネージド Airflow (MWAA) が 2020/11/24 にリリースされました。

サービスを利用するにあたって知りたかったことを調査し、FAQ ベースで整理しましたので公開します。

[Airflow logos](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+logos) からの画像

MWAA (Managed Workflow for Apache Airflow) とは?

Airflow のマネージドサービスで、インスタンスやDB管理不要で、Airflowを利用できます。Airflow 完全互換を謳っており、フォークしたソースではなく、Airflow 本体が利用されています。Auto Scaling に対応しており、 worker 数を設定した最大数まで自動でスケールアップしてくれます。また、ログインのための、ユーザー権限制御に IAM を利用しており、詳細な権限制御が可能です。

MWAA 公式ドキュメント

MWAA FAQ

Airflow のバージョンは?

現状(2020/12/03)は 1.10.12 のみが指定できます。
パッチバージョンアップグレードは7日以内、マイナーバージョンアップグレードは30日以内に自動で実行されます。
アップグレードに失敗した場合は、自動復旧されます。

Executorはどのタイプ?

Celery Executor を利用している旨が、公式ドキュメントに記載されています。

Celery Executor の構成は以下の図の通りです。

Architecture-Celery Executor からの画像

プロセスの種類は全部で3つで、webserver, scheduler と worker になります。
履歴やメタデータ管理のため、DB (metadata DB) があります。scheduler と worker の間に キューが存在しているはずですが、公式には記載されていません。(Elatsic Cache (Redis) の可能性が高いかなと推測してます。)

各プロセスは Fargate を利用しており、コンテナ起動です。scheduler と worker は VPC 内での実行が保証されています。metadata DB と webserver は サービスアカウントレベルで共有して利用する模様です。

DAGの定義と配置方法は?

DAGファイルは S3 に配置することで、自動で読み込みを実施してくれます。

配置先は、MWAA 作成時に指定します(更新も可能です)。plugin も同様に S3 に配置します。 配置する際は、plugin のみ zip に固めます。

※ S3 バケット名は、 airflow- プレフィックスで始まる必要があります。

Python のライブラリの取得方法は?

requirements.txt を S3 に配置することで、ライブラリを読み込んでくれます。
配置先は MWAA に設定します。

階層化した dag の読み込みは可能か?

Airflow 本来の実装方法と変わらずに実現できます。
以下コードを ./dags 直下に指定します。

import os
from airflow.models import DagBag
# ディレクトリを指定
dags_dirs = ['~/dags/sample']

for dir in dags_dirs:
dag_bag = DagBag(os.path.expanduser(dir))

if dag_bag:
for dag_id, dag in dag_bag.dags.items():
globals()[dag_id] = dag

S3 への配置方法は、以下です。

.
└── dags
├── add_dag_bags.py
└── sample
└── test_dag.py

Airflow UI へのアクセス方法は?

AWS コンソール上に UI へのリンクが表示されます。

上記リンクを押下すると、認証を自動で実施後に以下の画面が表示されます。

ちなみに IAM での認証が必須のため、直接URLにアクセスするとログインを求められます。

metadata DBの移行は可能?

現在(2020/12/03) サポートされていません。過去の実行履歴は metadata DB に保持されているため、現状MWAA へ移行する際は、履歴なしでの移行になります。

ネットワーク構成は?

VPC の設定が必須です。

構成としては、以下が必要との記載があります。

詳細はこちら

  • public subnet: 2リージョン
  • private subnet: 2リージョン
  • inbound/outbound all security group
  • Elastic IP: 2つ
  • Internet Gateway
  • Nat Gateway

※ 適当に subnet 指定した場合、起動しませんでした.。

Airflow CLIの実行方法は?

CLI は http ごしに実行できます。

(1) aws cli でトークンを取得

aws mwaa create-cli-token --name ${airflow name}
{
"CliToken": "${トークン}",
"WebServerHostname": "${ホスト名}"
}

(2) airflow cli を実行
リクエスト Body にコマンドを指定します。標準出力と、標準エラー出力が base64 エンコードされて返ってきます。

export WEB_SERVER_HOSTNAME="${ホスト名}"
export CLI_TOKEN="${トークン}"
curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
--header "Authorization: Bearer $CLI_TOKEN" \
--header "Content-Type: text/plain" \
--data-raw "version"
{
"stderr": "",
"stdout": "Q2xvdWR3YXRjaCBsb2dnaW5nIGlzIGRpc2FibGVkIGZvciBDbG91ZHdhdGNoUHJvY2Vzc29ySGFuZGxlcgoxLjEwLjEyCg=="
}

デコードすると、コマンド実行結果を表示できます。

echo 'Q2xvdWR3YXRjaCBsb2dnaW5nIGlzIGRpc2FibGVkIGZvciBDbG91ZHdhdGNoUHJvY2Vzc29ySGFuZGxlcgoxLjEwLjEyCg==' | base64 -D
Cloudwatch logging is disabled for CloudwatchProcessorHandler
1.10.12

ワンタイムログイントークンの発行方法は?

Airflow UI にAWS コンソールからでなく、トークンのみでログインさせることもできます。
まず、ログイン用のトークンを取得します。トークンは60秒間のみ、有効です。

❯ aws mwaa create-web-login-token --name MyAirflowEnvironment --profile midori
{
"WebServerHostname": "${ホスト名}",
"WebToken": "${トークン}"
}

取得したトークンを # 以降に指定して、URLアクセスすることでログインできます。

https://${WebServerHostname}/aws_mwaa/aws-console-sso?login=true#${WebTokenを指定}

感想

MWAA を利用するにあたって、気になる点を調べてみました。

EC2 上への構築では、EC2複数台、RDS、Redis と管理するコンポーネントが多かったので、マネージドで気軽に利用できるようになり、より今後広がりを見せるかなと思います。

現状、機能的には十分足りており、実利用は問題なさそうです。ただ、metadata DBの移行はできないので機能としてサポートされると嬉しいですね。

参考