概要
Airflow 利用にあたって知っておいたほうが良いんじゃないかなと思う情報をまとめました。いわゆるハマりどころです。
Airflow 自体の基本的な説明(用語,DAG 定義方法,etc..) は省略しています。
基礎参照先
バージョン
- Airflow 1.10.4
- Postgres 10.7
Tips 11 選
- 1. 日時
- 2. リトライ
- 3. start_date の変更ができない
- 4. 開始日時を過去日時にすると過去ジョブが自動実行される
- 5. context の活用
- 6. Macros
- 7. Task 間で値を受け渡す方法
- 8. dags ディレクトリ以下のディレクトリの切り方
- 9. UI の表示が UTC 固定
- 10. Web UI 上からジョブのパラメータを渡せない
- 11. DAG/Task の同時実行数制御
1. 日時
Airflow は基本的には、日時のみを変数として DAG を実行します。
日時にも種類があるため、各日時について整理します。
execution_date
実行日時。いつ実行されても、リトライで実行されても変わることのない日時。
特定日時のデータを取得したいといったケースの日時指定は、実行日時を利用しておくと良いです。
start_date
DAG の開始日時。実際に DAG が動作した日時。
リトライ時にはリトライした日時になります。
DAG ファイル上では特に利用しないほうが良いです。
end_date
DAG の終了日時。実際に DAG が動作した日時。
schedule_interval
DAG の実行間隔。
上記の用語を元に、初回の Dag 実行までを時系列で追いかけてみます。
条件
2020/01/29 15:30:00+09:00
よりスケジュール実行- 実行間隔は
1min
DAG ファイルを作成
./dag 配下に配置dag = DAG(
'blog_example_execution_date',
schedule_interval=timedelta(minutes=1),
start_date=datetime(
2020, 1, 29, 15, 30, tzinfo=pendulum.timezone('Asia/Tokyo')), # 明示的にタイムゾーンを指定
)
BashOperator(
task_id='test',
dag=dag,
bash_command='echo 1',
)Web UI 上から
OFF
->ON
に変更
start_date に指定した時刻には 何も起きない
start_date + schecule_interval 経過後に 初回 DAG 実行
各パラメータの日時は以下のようになります。(※ 各パラメータの取得方法は5
で後述)
start_date と execution_date がずれていることがわかります。execution_date = 2019/01/29 15:30:00 +09:00
=> DAG に指定した start_date と同一
start_date = 2019/01/29 15:31:00 +09:00
=> DAG に指定した start_date に schedule_interval を足した日時と同一2 回目以降は、schedule_interval 経過後に順次実行
2. リトライ
基本的には Task の状態を Clear
とすることで、リトライができます。
Tree View より指定
○
を Click- Clear を指定
- Clear する Task の一覧が表示され OK を 指定する
- Clear (=リトライ) される
Task Instances から指定
基本的には、 Tree View
の画面より指定してリトライする方法が簡単なのですが、いくつかのケースにて、下記の画面が出てきて失敗するケースがあります。
(筆者の環境では Task 指定の Clear の際に失敗します。)
こちらが発生した際は、他の手段を利用して、Clear できます。
Browse > Task Instances
を指定- 対象 Task を検索
- 対象 Task を指定して Clear
- Clear 完了
3. start_date の変更ができない
一度でも DAG を実行してしまった場合、start_date を変更できない仕様になっています。
DAG ファイル上で変更しても、反映されなくなってしまいます。
(※ 筆者の経験談ですが、DAG が実行されていない ( ON
にしていない) 場合であれば、 start_date の変更が反映されます。)
よくあるケースとしては、毎日03:00実行のDAGを1時間ずらす変更がありますが、簡易にはできません。
1度でも DAG を実行してしまった場合は、下記の手段にて start_date を変更する必要があります。
過去履歴が消えても良い場合
- DAG ファイルを一度 ./dag 配下から削除
- Web UI 上から DAG を削除
- ファイルを復元して開始日時を修正
- ./dag 配下にファイルを配置
過去履歴を残したい場合
DAGファイルをコピーして、別DAG ID として再作成して開始日時を変更する。
4. 開始日時を過去日時にすると過去ジョブが自動実行される
過去日を指定するとデフォルトで、backfillという機能により、過去日から schedule_interval
ごとにDAGを実行します。
実行したくない場合は、 catchup=False
指定により回避できます。
dag = DAG( |
5. context の活用
各 DAG 実行ごとの変数は、context として取得できます。
PythonOperator として呼び出す際は、呼び出す関数に **context
と可変長引数を定義することで扱えます。context には辞書型で値が入ります。
(context として渡ってくる値は、 6
参照)
def print_context(**context): |
6. Macros
context の値は、 { { } }
の構文を利用することでも取得できます。
BashOperator( |
以下が、context で取得できる値一覧です。 (Macros で取得できる値と同義)
https://airflow.apache.org/docs/stable/macros.html
7. Task 間で値を受け渡す方法
処理を Task 粒度で分割して定義することが多いですが、Task間で値を渡したいケースが有るかと思います。その場合は、XComs と呼ばれる機能を利用します。
利用方法は下記の実装のとおりです。
def push(**context): |
8. dags ディレクトリ以下のディレクトリの切り方
下記ファイルを dags 直下に配置することで、実現できます。
再帰的に参照してくれるため、指定したディレクトリ以下でさらにディレクトリを切っても問題ないです。
import os |
9. UI の表示が UTC 固定
Web UI は UTC のみ対応との記載がドキュメントにあるので、現状は UTC 時刻をみて脳内変換しています。
</a loading=”lazy”>
timezone.html#time-zones
回避策はある模様ですが、試したことはないです。
Airflow 日本語化
10. Web UI 上からジョブのパラメータを渡せない
Web UI 上の、 Trigger DAG
より手動での実行は可能ですが、任意のパラメータを指定して実行することはできません。
execution date は Trigger DAG を押下した時刻になります。
CLI で実行する際は、パラメータを渡すことができます。
渡し方は、下記の通りです。
(※ 追記
:筆者のローカルで試したところ DAG が見つからないエラーが出ており、未解消です。)
airflow trigger_dag ${dag_id} -c '{"key": "value"}' |
パラメータの受け取り方
# context より取得 |
https://airflow.apache.org/docs/stable/cli.html#trigger_dag
11. DAG/Task の同時実行数制御
DAG オブジェクトに対して、concurrency/max_active_runs フィールドを指定することで制御できます。max_active_runs
の数値が DAG の最大同時実行数で、 concurrency
の数値が Task の最大同時実行数になります。
dag = DAG( |
Airflow 全体/デフォルトの concurrency/max_active_runs は、airflow.cfg に指定してあります。
# Airflow 全体の task の同時実行数のデフォルト値 |
まとめ
Airflow利用にあたっての Tips をまとめました。
ハマりどころはありますが、ジョブをソースコード管理でき、Pythonで自由に定義できる点が非常に魅力的ですので、利用できるシーンで使っていきたいと思います。
参考
- For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI? - Stack Overflow
- Gotcha’s — ETL Best Practices with Airflow v1.8
- Apache Airflow でエンドユーザーのための機械学習パイプラインを構築する Part4 - programming-soda - Medium
- How to use several dag_folders? Airflow DAGBags. - Iuliia Volkova - Medium