コードリーディングでAirflowの仕組みを理解する
こんにちは。Nowcastでエンジニアをしている片山(@fozzhey)です。
NowcastではワークフローマネジメントツールとしてAirflowを採用しています。 Airflowは日本でもよく採用されており、実際に活用されている方も多いと思います。
しかし、Airflowを使うことはできるけど、実際にどういう仕組みで動いているのかは分からない方が多いのではないでしょうか?
せっかくPythonで書かれているのに、Airflowのコードを読まないのはもったいない!
ということで、この記事ではAirflowのコードリーディングを行いたいと思います。
なるべくコードやGithubのリンクを貼っていますが、手元のエディターでAirflowのリポジトリを開きながら読んでいただくとより理解が深まると思います。
コードリーディングの題材
題材とするDAGとタスク
この記事ではAirflowが提供するサンプルDAGの中から、example_bash_operator
を題材として扱います。
DAG中のrunme_0
というタスクをローカル環境で実行し、実行結果が表示されるまでの流れを実際にコードを読みながら追っていきます。
example_bash_operator
をWEB UIから見るとこんな感じです。
DAGのコードはこちらから確認できます。
コードを見ると、実行対象のrunme_0
は以下の様に定義されていることが分かります。
bash_command
の部分を見るとrunme_0
は何かしらの文字列をechoした後、1秒間sleepするだけの簡単なタスクであることが分かります。 後述しますが{{ task_instance_key_str }}
には実行時に実際の文字列がレンダリングされます。
実際に実行してみる
では、実際にタスクを実行してみましょう。ローカル環境の起動方法についてはこの記事では解説しませんが、Airflow Breezeを使ってやるのが無難かと思います。 Airflow Breezeについてはこちらの記事が参考になるので、気になる方はチェックしてみてください。
タスク実行は以下のコマンドを実行するだけで行うことが可能です。
>>> airflow tasks test example_bash_operator runme_0 2021-06-20
普通に実行するならairflow tasks run
を使うのですが、依存関係や副作用が発生するので、今回はairflow tasks test
を使っています。 こうすることでタスクだけを冪等に実行することが可能です。
以下が実行結果とになります。
赤色で囲った箇所を見ると、example_bash_operator__runme0__20210620
という文字列が標準出力されているのが分かります。 こちらは先ほどの{{ task_instance_key_str }}
がレンダリングされた後の文字列になります。
タスクとは何か?
タスクの実行フローを追う前に、まずはAirflowにおけるタスクとは何か解説します。
Airflowにおいてタスクは実行の最小単位であり、BaseOperator
というクラスのサブクラスとして実装されています。
以下、この記事ではBaseOperator
のサブクラスのことを Operator と呼ぶことにします。 今回読み進めるコード中では、Operatorのインスタンスはtask
という変数名で用いられます。
公式の説明はこちら。
Operatorはざっくりまとめると以下の情報を保持してます。
- どんなタスクを実行するか
- どうやってそのタスクを実行するか
- そのタスクの前後の依存関係
※厳密には他にも色んな情報を管理していますが。
タスクの実例を見てみる
こちらは今回実行するexample_bash_operator
のrunme_0
が定義されている箇所のコードです。 先ほども紹介しましたが、今回は補足情報を足してみました。
先ほどあげた3つの情報が確かに含まれていることがわかります。
TaskInstanceとは?
タスクを理解する上で、もう一つ重要なクラスがTaskInstance
です。
TaskInstance
はあるexecution_date
におけるタスクの実行を管理するクラスです。 TaskInstance
の主な責務は以下の2つになります。
- タスクの実行 タスクの実行に必要な情報を取得し、タスクを実行する。
- 実行したタスクのステータスの管理 タスクの実行ステータスを管理する。また、ステータスをDBに格納し永続化する。
公式の説明はこちら。
タスクとTaskInstanceの関係性
TaskInstance
のインスタンスは上記の様にOperator
のインスタンスとタスクの実行日から生成されます。
コードでは以下の様にTaskInstance
のインスタンスが生成されています。
ti = TaskInstance(
task, # Operatorのインスタンス
execution_date # 実行日
)
確かに、引数としてタスクと実行日が渡されているのが分かります。
タスクとTaskInstanceの関係性は、AirflowのTree Viewを見ると直感的に理解できます。
1つのタスクに対して実行日/実行時間ごとに複数のTaskInstanceが作られることが分かりますね。
タスクの実行
公式ドキュメントではざっくり「TaskInstance
はステータス管理のためのもの」という説明になっていますが、実際にコードを読んでみると「タスクの実行」もその重要な責務であることが分かります。
以下にタスク実行に関連する処理をいくつか挙げてみます。
- テンプレートへのレンダリングの実行
- XComのデータ管理
Operator
のexecute()
を呼び出す
※他にもたくさんあります。
この様に、TaskInstance
は対応するタスクを実行するために必要な情報を取得/生成し、テンプレートのレンダリング等の事前処理を行い、Operator
のメソッドを呼び出してタスクを開始する重要な役割を担っています。
登場人物と処理の流れ
Airflowの基本コンセプトをおさらいしたので、コードの話に入ります。具体的な処理を追う前に、処理の大まかなフローとそこで登場するクラスの説明を行います。
クラス図
次に、タスクの実行に関わるクラスとそれらの属性/メソッドを整理していきます。 主要なクラスとメソッドをまとめたクラス図が以下です。
今回の実行フローにおいて、それぞれのクラスは大まかに以下のような役割を持ちます。
DAG(Github)
Operator
のインスタンス(タスク)を保持する
DagBag(Github)
- DAGの定義ファイルの読み込み、
DAG
インスタンスを生成する
BashOperator(Github)
- bashコマンドの保持とテンプレートへのレンダリング
- bashコマンドの実行
TaskInstance(Github)
- タスク実行の制御
- Contextの生成
処理の流れ
CLIコマンドの実行からタスクの完了までの処理の流れをまとめると以下の様になります。 先ほどクラス図で紹介してクラスが連携して以下の様な1つの処理が実行されます。
[タスク実行の流れ]
- CLIコマンドのパーシング
DAG
の生成BashOpeartor
の取得TaskInstance
の生成- タスクの実行
・Contextの生成
・テンプレートのレンダリング
・Bashの実行
以降では、この流れに沿って実際のコードを読んでいきます!
1. CLIコマンドのパーシング
処理の始まり
>>> airflow tasks test example_bash_operator runme_0 2021-06-20
冒頭で紹介したコマンドをコマンドラインからairflow
コマンドを実行すると airflow/__main__.py
が実行されて処理が始まります。
実際の処理はargs.func(args)
の部分から開始されます。
コマンドのパーシング
__main__.py
ではCLIから渡されたコマンドをパーシングし、関数を実行することで処理が始まりますが、このパーシングの仕組みを詳しくみていきましょう。
パーシングを責務とするパーサーは以下の箇所で生成されます。
parser = cli_parser.get_parser()
このパーサーはPythonの標準ライブラリであるargparseで実装されています。 argparseはpythonのコマンドライン引数を柔軟に扱うことができるライブラリで、僕も普段の開発で重宝しています。
AirlowではCLIコマンドごとに対応する関数が用意されており、airflow/cli/commands
に格納されています。
こちらがコマンドの一覧になります。commandsディレクトリにあるファイルとよく見比べると、各コマンドに対応するファイルがあるのが分かります。 例えば、dagsコマンドはdag_command.py
と対応します。
今回実行するコマンドであるairflow tasks test
にはairflow/cli/commands/task_command.py
のtask_test()
という関数が対応します。 parserによりtasks test
という引数とtask_test()
が関連付けられ、args.func(args)
とすることで関連づけられた関数を実行することができます。
大まかに説明するとargs
にはexample_bash_operator
runme_0
2021-06-23
が格納されています。
※他にもデフォルト引数が色々格納されているし、厳密にはtask_test()
との関連もargs
に含まれる
まとめると、parserは与えられたコマンドを以下の様に解釈し、処理を開始します。
task_test()
の呼び出し
次に、task_test()
の中身を追っていきましょう。task_test()
のソースコードは以下の通りです。
※全体の流れに対応する箇所にコメントを足しています。
[タスク実行の流れ]で整理した流れと、コードが一致していることが分かります。
以降では、それぞれの処理の詳細を追っていきます。
2. DAG
の生成
get_dag()
によるDAG
インスタンスの取得
Airflowではタスク(Operatorのインスタンス)はDAG
インスタンスに格納されます。 なので、タスクを実行するためには、まずDAG
インスタンスを生成する必要があります。
task_test()
の中では、DAG
インスタンスはget_dag()
というutil関数から取得されています。
これを見ると、DagBag
のdags
という属性からdag_id
をKeyとしてDAG
のインスタンスを取得していることが分かります。 先述のクラス図でも紹介しましたが、DagBag
はDAG
インスタンスを管理するクラスです。
では、どこでdags
にDAG
インスタンスが格納されているのでしょうか?
DAGの定義ファイルの読み込み
Airflowにおいて、DAG
インスタンスの生成はDAGの定義ファイルの中で行われます。
そして、DAGの定義ファイルを読み込み、DAG
インスタンスをdags
へ格納する処理はDagBag
のprocess_file()
というメソッドによって行われます。
process_file()
は以下の順で定義ファイルを読み込みます。
- DAGの定義ファイルをモジュールとして読み込む。
- 読み込んだモジュールに含まれる全てのオブジェクトから
DAG
のインスタンスを抽出する。
以降で各処理の詳細を見ていきます。
定義ファイルをモジュールとして読み込む
ファイルの読み込みはDagBagの_load_modules_from_file()
というメソッド(Github)で行われます。 以下に、実際にファイルを読み込んでいる箇所を抜粋しました。
file_path
は定義ファイルのパスです。 Pythonの標準ライブラリのimportlib
を用いて、定義ファイルがnew_module
として読み込まれていることが分かります。
モジュールからDAG
インスタンスを抽出
DAG
インスタンスの抽出は_process_modules()
というメソッド(Github)で実行されます。
以下の箇所で読み込んだモジュールの中からDAG
インスタンスのみを抽出し、top_level_dags
という配列に格納しています。
top_level_dags = [o for m in mods for o in list(m.__dict__.values()) if isinstance(o, DAG)]
内包表記がネストされており見にくいですが、if isinstance(o, DAG)
の部分であるobjectがDAG
のインスタンスかどうかをチェックしています。
そして、以下の箇所でDagBagのbag_dag()
メソッドを呼び出し、DAG
インスタンスをdags
に追加しています。
self.bag_dag(dag=dag, root_dag=dag)
これで、ようやく定義ファイルからDAG
インスタンスを取得することができました!
3. BashOpeartor
の取得
BashOpeartor
インスタンスの取得
DAG
インスタンスが取得できたので、次はBashOpeartor
インスタンス(タスク)を取得しましょう。BashOpeartor
インスタンスの取得はget_task()
というDAG
のメソッドで行うことができます。
内容は以下の通りで、task_dict
というDAG
の属性からタスクを取得するだけの簡単な処理です。 task_dict
はKeyがtask_id
で、ValueがOperatorのインスタンスの辞書オブジェクトになります。
タスクとDAGの紐付け
では、BashOpeartor
インスタンスはどこでDAG
インスタンスに格納されるのでしょうか?
今回例として取り上げたDAGの定義ファイルではContext Managerを使った少しトリッキーな方法でDAG
インスタンスにBashOpeartor
インスタンスを格納しています。 このプロセスは理解できると楽しいのですが、それだけでブログが1本書けそうなので今回は割愛します。
一旦、生成されたBashOperator
インスタンスは、DAG
インスタンスのtask_dict
に格納されるものだと考えてください。
4. TaskInstanceの生成
TaskInstanceの取得
BashOpeartor
インスタンスが取得できたので、次にTaskInstance
のインスタンスの生成に移ります。先述の通り、Airflowにおいてタスクを実行するにはTaskInstance
のインスタンスを生成する必要があります。
ここでtask_test()
の説明に戻りましょう。TaskInstance
のインスタンスは以下で生成されます。
ti = TaskInstance(task, args.execution_date)
その後、以下の箇所からタスクの実行が開始されます。ti.run()
の部分です。
ti.run()
の実行を起点に連鎖的にいくつかの関数が呼び出され、TaskInstance
のメソッドである_run_raw_task()
にたどり着きます。
このメソッドがタスクの実行を司る重要な関数です。以降でこのメソッドの中身を解説していきます。
5. タスクの実行
タスク実行の流れ
_run_raw_task()
(Github)以降の処理の大まかな流れは以下の様になります。
- Contextの生成
- テンプレートのレンダリング
- Bashの実行
本番環境でAirflowを動かすと他にもたくさんの処理が実行されますが、今回はタスクの単体実行の流れを追うだけなので大きく3つくらいに収まります。
Contextとは?
ContextとはTaskの実行にまつわる情報を含む辞書オブジェクトのことです。 Airflowを普段から使っている方にとってはお馴染みかと思います。
Contextは後述のJinjaテンプレート内でアクセスできたり、直接タスクに引数として渡すことができます。
ちなみに、Contextには主に以下の情報が含まれます。
execution_date
などの日付関連の情報DAG
,DagRun
,TaskInstance
など、タスクの実行に関連するクラスのインスタンス- Variables
- マクロ関数
conf
(設定情報を管理するオブジェクト)
Contextから取得できる値の一覧はこちらの公式ドキュメントで確認できます。
Contextの生成
Contextの生成はget_template_context()
(Github)という関数によって行われます。
こちらは、_run_raw_task()
のcontextを生成している箇所です。
context = self.get_template_context()
の箇所でget_template_context()
からContextが生成されているのが分かります。
説明は省略しますが、get_template_context()
の終わりに大きめの辞書オブジェクトを生成しており、それがContextです。 興味のある方は見てみてください。「普段何気なく使っているContextがここで生成されてるのか」という気持ちになれます。
Contextの生成後、_prepare_and_execute_task_with_callbacks()
(Github)が実行され、いよいよタスクの実行が開始されます!
Jinjaによるレンダリング
Airflowではタスクの定義時にJinjaのテンプレートを用いることができ、テンプレート内ではContextの値を用いることができます。こちらの機能もAirflowユーザーからすればお馴染みですね。
公式の説明はこちら。
普段何気なく使っているテンプレートですが、どのタイミングでレンダリングが行われるかは知らない方も多いのではないでしょうか?
コードを読んでいくと、テンプレートはタスクの実行時にTastInstance
によってレンダリングされることがわかります。
今回の例ではタスクの定義時、bash_command
に以下の値を指定しています。
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
task_instance_key_str
はContextから取得できる情報の1つで、{dag_id}__{task_id}__{execution_date}
の様な文字列となります。
では、実際にこの値がレンダリングされる過程を見ていきましょう。
レンダリングの実施
レンダリングは_prepare_and_execute_task_with_callbacks()
内の以下の箇所で行われます。
self.render_templates(context=context)
この関数はBaseOperator
のrender_template_fields()
(Github)を実行しており、この関数が実際にレンダリングを行っている関数となっています。
render_template_fields()
はさらに_do_render_template_fields()
を呼び出します。 _do_render_template_fields()
の内容は以下の通りです。処理が少しややこしいのでコメントを足しています。
parent
には自身(self
)が渡されています。 また、template_fields
はレンダリングの対象となるBashOperator
の属性名が格納されています。 これより、content
にはレンダリング対象の属性の値が格納されることが分かります。
レンダリングの実行部分で呼び出されているrender_template()
は、content
内のテンプレートをcontext
の値でレンダリングするJinjaの薄いラッパーです。 以下の箇所でレンダリングを行っています。
ちなみにレンダリングの対象となる属性を指定するtemplate_fields
はサブクラスの中で指定されています。BashOperator
のtemplate_fields
は以下の箇所で指定されており、bash_command
がレンダリングの対象になっていることがわかります。
template_fields = ('bash_command', 'env')
少し複雑な処理でしたが、テンプレートのレンダリングのフローを追うことができました。
Bashの実行
Contextを取得し、その値のレンダリングも完了し、次はいよいよタスクの実行です!
タスクの実行は_prepare_and_execute_task_with_callbacks()
から_execute_task()
という関数が呼ばれることで行われます。
_execute_task()
はBaseOperator
のメソッドのexecute()
を呼び出すだけの関数で実質的なタスクの実行はこの関数で行われます。execute()
はBaseOperator
の各サブクラスで実装されます。
今回用いているBashOperator
の実装箇所はこちらです。以下の箇所でsubprocess
ライブラリを用いてbash_command
が実行されていることが分かります!!
最後に
長くなりましたが、これでCLIコマンドの実行からBashが実行されるまでの流れを追うことができました! 実際にコードを追うことでAirflowへの理解が深まってのではないでしょうか?
Airflowには他にも面白い機能がたくさんあるので、今後もこういうブログを出していきたいなと思います。
Twitterもやってますので、よかったらフォローしてください。Airflowに限らず新しい技術をどんどん取り入れながら、データプロダクトを一緒に開発してくれるデータエンジニアを募集しています!
エンジニア向けのイベントや、開発組織についてもっと知りたい方は、以下に最新情報をまとめていますのでぜひチェックしてみてください!