概要
TIG DXチームの多賀です。Airflowの SLA 設定方法を紹介します。
TL;DR
sla_miss_callback 関数は以下の引数が必要
def sla_alert( |
SLA設定
SLA とは Service Level Agreement の略で、サービス提供者が保証する品質を明示する際に利用されます。当 Airflow の SLA は、各 Task が指定の時間内に終わることを保証する形で定義されます。
そのため、Airflow は 各 Task に対して SLA 値(時間) を設定できます。
SLA 値は以下のように Task に対して定義します。10秒以上 Task が実行されると SLA 違反となります。
t1 = PythonOperator( |
SLA 違反をした Task が発生した場合、以下2パターンで通知をできます
- sla_miss_callback (Function) の任意実装
注意点として、SLA 違反のチェックタイミングは、対象タスクの実行完了後ではなく、対象タスクの次のタスクの実行前(別 run_id の可能性もあり) になります。
Task 実行時に、過去の SLA 違反の一覧 (sla_miss テーブル) を取得して通知する仕組みになっています。
また、Task が実行されているいないに関わらず、 start_time と schedule_interval と 現在時刻から 実行されているべき Task を洗い出してエラー通知します。過去分の再実行等を実施すると、現在時刻まで実行されているべき Task まとめて SLA 違反が出たりします。
Email 設定方法
ドキュメントと実装を読む限り、Operator の引数に渡す必要がありそうでした。
email が設定されていない場合は、エラーになることなく Task の実行が完了します。
t1 = PythonOperator( |
sla_miss_callback 設定方法
ドキュメント上は、関数を設定と記載されてますが、以下のように設定しても動きません。
https://airflow.readthedocs.io/en/1.10.11/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
|
Scheduelr のログを見ると以下がはかれています。
[2020-08-11 18:27:00,766] {{scheduler_job.py:541}} ERROR - Could not call sla_miss_callback for DAG example_sla |
修正
ソースコードを grep してみると 以下の実装を見つけました。
引数が必要みたいですね。
https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L455
# schduler_job.py |
DAG の設定を修正してみると、無事に動きました。
def sla_alert( |
sla_miss_callback の引数
引数 | 型 | 値 |
---|---|---|
dag | airflow.DAG | DAG オブジェクト dag.dag_id で ID取得可能 |
task_list | str | 過去のSLA エラーの Task の List 以下フォーマットの文字列リスト ${task_id} on ${execution_date} |
blocking_task_list | str | 過去の失敗した Task の List 以下フォーマットの文字列リスト ${task_id} on ${execution_date} |
slas | List[airflow.models.SlaMiss] | 過去のSLA エラーオブジェクトの List |
blocking_tis | List[airflow.models.TaskInstance] | 失敗した TaskInstance の List |
SlaMiss
https://airflow.apache.org/docs/stable/_api/airflow/models/slamiss/index.html
TaskInstance
https://airflow.apache.org/docs/stable/_api/airflow/models/taskinstance/index.html
所感
ドキュメントに記載してほしいところでしたが、見つけられなかったため別途整理しました。