Future Tech Blog
フューチャー技術ブログ

AirflowのTips 11選


概要

Airflow 利用にあたって知っておいたほうが良いんじゃないかなと思う情報をまとめました。いわゆるハマりどころです。

Airflow 自体の基本的な説明(用語,DAG 定義方法,etc..) は省略しています。

基礎参照先

バージョン

  • Airflow 1.10.4
  • Postgres 10.7

Tips 11 選

1. 日時

Airflow は基本的には、日時のみを変数として DAG を実行します。
日時にも種類があるため、各日時について整理します。

execution_date

実行日時。いつ実行されても、リトライで実行されても変わることのない日時。
特定日時のデータを取得したいといったケースの日時指定は、実行日時を利用しておくと良いです。

start_date

DAG の開始日時。実際に DAG が動作した日時。
リトライ時にはリトライした日時になります。
DAG ファイル上では特に利用しないほうが良いです。

end_date

DAG の終了日時。実際に DAG が動作した日時。

schedule_interval

DAG の実行間隔。

上記の用語を元に、初回の Dag 実行までを時系列で追いかけてみます。

条件

  • 2020/01/29 15:30:00+09:00 よりスケジュール実行
  • 実行間隔は 1min
  1. DAG ファイルを作成
    ./dag 配下に配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    dag = DAG(
    'blog_example_execution_date',
    schedule_interval=timedelta(minutes=1),
    start_date=datetime(
    2020, 1, 29, 15, 30, tzinfo=pendulum.timezone('Asia/Tokyo')), # 明示的にタイムゾーンを指定
    )

    BashOperator(
    task_id='test',
    dag=dag,
    bash_command='echo 1',
    )
  1. Web UI 上から OFF -> ON に変更
  1. start_date に指定した時刻には 何も起きない

  2. start_date + schecule_interval 経過後に 初回 DAG 実行
    各パラメータの日時は以下のようになります。(※ 各パラメータの取得方法は 5 で後述)
    start_date と execution_date がずれていることがわかります。

    1
    2
    3
    4
    5
    execution_date = 2019/01/29 15:30:00 +09:00
    => DAG に指定した start_date と同一

    start_date = 2019/01/29 15:31:00 +09:00
    => DAG に指定した start_date に schedule_interval を足した日時と同一
  3. 2 回目以降は、schedule_interval 経過後に順次実行

2. リトライ

基本的には Task の状態を Clear とすることで、リトライができます。

Tree View より指定

  1. を Click
  1. Clear を指定
  1. Clear する Task の一覧が表示され OK を 指定する
  1. Clear (=リトライ) される

Task Instances から指定

基本的には、 Tree View の画面より指定してリトライする方法が簡単なのですが、いくつかのケースにて、下記の画面が出てきて失敗するケースがあります。
(筆者の環境では Task 指定の Clear の際に失敗します。)

こちらが発生した際は、他の手段を利用して、Clear することができます。

  1. Browse > Task Instances を指定
  1. 対象 Task を検索
  1. 対象 Task を指定して Clear

  2. Clear 完了

3. start_date の変更ができない

一度でも DAG を実行してしまった場合、start_date を変更できない仕様になっています。
DAG ファイル上で変更しても、反映されなくなってしまいます。
(※ 筆者の経験談ですが、DAG が実行されていない ( ON にしていない) 場合であれば、 start_date の変更が反映されます。)
よくあるケースとしては、毎日03:00実行のDAGを1時間ずらす変更がありますが、簡易にはできません。

1度でも DAG を実行してしまった場合は、下記の手段にて start_date を変更する必要があります。

過去履歴が消えても良い場合

  1. DAG ファイルを一度 ./dag 配下から削除
  2. Web UI 上から DAG を削除
  3. ファイルを復元して開始日時を修正
  4. ./dag 配下にファイルを配置

過去履歴を残したい場合

DAGファイルをコピーして、別DAG ID として再作成して開始日時を変更する。

4. 開始日時を過去日時にすると過去ジョブが自動実行される

過去日を指定するとデフォルトで、backfillという機能により、過去日から schedule_interval ごとにDAGを実行します。
実行したくない場合は、 catchup=False 指定により回避できます。

1
2
3
4
5
6
7
8
dag = DAG(
'blog_example_backfill',
schedule_interval=timedelta(days=1),
start_date=datetime(
2019, 1, 1, tzinfo=pendulum.timezone('Asia/Tokyo')),
default_args=args,
catchup=False, # こちらを指定
)

5. context の活用

各 DAG 実行ごとの変数は、context として取得することができます。
PythonOperator として呼び出す際は、呼び出す関数に **context と可変長引数を定義することで扱えます。context には辞書型で値が入ります。

(context として渡ってくる値は、 6 参照)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def print_context(**context):
# 最も利用するケースが多い
# pendulum.Pendulum 型 であることに注意
execution_date = context['execution_date']

dag_id = context['dag'].dag_id
task_id = context['task'].task_id

print(execution_date)
print(dag_id)
print(task_id)

pprint.pprint(context)


PythonOperator(
task_id='context',
dag=dag,
python_callable=print_context,
provide_context=True, # context を渡したい場合必須
)

6. Macros

context の値は、 { { } } の構文を利用することでも取得できます。

1
2
3
4
5
BashOperator(
task_id='test',
dag=dag,
bash_command='echo {{ dag.dag_id }}',
)

以下が、context で取得できる値一覧です。 (Macros で取得できる値と同義)
https://airflow.apache.org/docs/stable/macros.html

7. Task 間で値を受け渡す方法

処理を Task 粒度で分割して定義することが多いですが、Task間で値を渡したいケースが有るかと思います。その場合は、XComs と呼ばれる機能を利用します。
利用方法は下記の実装のとおりです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def push(**context):
# 後続の Task に渡したい値を戻り値として定義
return 'hoge'


def pull(**context):
# context 内より task_id を key に取得
v = context['task_instance'].xcom_pull(task_ids='push_task')
print(v)
return


push = PythonOperator(
task_id='push_task',
dag=dag,
python_callable=push,
)

pull = PythonOperator(
task_id='pull_task',
dag=dag,
python_callable=pull,
)

push >> pull

8. dags ディレクトリ以下のディレクトリの切り方

下記ファイルを dags 直下に配置することで、実現できます。
再帰的に参照してくれるため、指定したディレクトリ以下でさらにディレクトリを切っても問題ないです。

1
2
3
4
5
6
7
8
9
10
11
12
import os
from airflow.models import DagBag

# 追加したいディレクトリを指定
dags_dirs = ['~/dags/blog']

for dir in dags_dirs:
dag_bag = DagBag(os.path.expanduser(dir))

if dag_bag:
for dag_id, dag in dag_bag.dags.items():
globals()[dag_id] = dag

9. UI の表示が UTC 固定

Web UI は UTC のみ対応との記載がドキュメントにあるので、現状は UTC 時刻をみて脳内変換しています。

timezone.html#time-zones

回避策はある模様ですが、試したことはないです。
Airflow 日本語化

10. Web UI 上からジョブのパラメータを渡せない

Web UI 上の、 Trigger DAG より手動での実行は可能ですが、任意のパラメータを指定して実行することはできません。
execution date は Trigger DAG を押下した時刻になります。

CLI で実行する際は、パラメータを渡すことができます。
渡し方は、下記の通りです。
(※ 追記 :筆者のローカルで試したところ DAG が見つからないエラーが出ており、未解消です。)

1
airflow trigger_dag ${dag_id} -c '{"key": "value"}'

パラメータの受け取り方

1
2
3
4
5
# context より取得
value = context['dag_run']['conf']['key']

# macros 利用
'{{ DAG_run.conf["key"] }}'

https://airflow.apache.org/docs/stable/cli.html#trigger_dag

11. DAG/Task の同時実行数制御

DAG オブジェクトに対して、concurrency/max_active_runs フィールドを指定することで制御できます。
max_active_runs の数値が DAG の最大同時実行数で、 concurrency の数値が Task の最大同時実行数になります。

1
2
3
4
5
6
7
dag = DAG(
'blog_example_concurrency',
start_date=start_date,
schedule_interval=timedelta(days=1),
concurrency=3, # task は 3つまで同時起動できる
max_active_runs=1, # DAG は 1つまで同時起動できる
)

Airflow 全体/デフォルトの concurrency/max_active_runs は、airflow.cfg に指定してあります。

1
2
3
4
5
6
7
8
# Airflow 全体の task の同時実行数のデフォルト値
parallelism = 32

# DAG ごとの task の同時実行数のデフォルト値
DAG_concurrency = 16

# DAG ごとの DAG の同時実行数 のデフォルト値
max_active_runs_per_DAG = 16

まとめ

Airflow利用にあたっての Tips をまとめました。
ハマりどころはありますが、ジョブをソースコード管理でき、Pythonで自由に定義できる点が非常に魅力的ですので、利用できるシーンで使っていきたいと思います。

参考