コードリーディングでAirflowの仕組みを理解する

Ryohei Katayama
The Finatext Tech Blog
20 min readAug 29, 2021

--

こんにちは。Nowcastでエンジニアをしている片山(@fozzhey)です。

NowcastではワークフローマネジメントツールとしてAirflowを採用しています。 Airflowは日本でもよく採用されており、実際に活用されている方も多いと思います。
しかし、Airflowを使うことはできるけど、実際にどういう仕組みで動いているのかは分からない方が多いのではないでしょうか?

せっかくPythonで書かれているのに、Airflowのコードを読まないのはもったいない!
ということで、この記事ではAirflowのコードリーディングを行いたいと思います。

なるべくコードやGithubのリンクを貼っていますが、手元のエディターでAirflowのリポジトリを開きながら読んでいただくとより理解が深まると思います。

コードリーディングの題材

題材とするDAGとタスク

この記事ではAirflowが提供するサンプルDAGの中から、example_bash_operatorを題材として扱います。
DAG中のrunme_0というタスクをローカル環境で実行し、実行結果が表示されるまでの流れを実際にコードを読みながら追っていきます。

example_bash_operatorをWEB UIから見るとこんな感じです。

DAGのコードはこちらから確認できます。
コードを見ると、実行対象のrunme_0は以下の様に定義されていることが分かります。

Github

bash_commandの部分を見るとrunme_0は何かしらの文字列をechoした後、1秒間sleepするだけの簡単なタスクであることが分かります。 後述しますが{{ task_instance_key_str }}には実行時に実際の文字列がレンダリングされます。

実際に実行してみる

では、実際にタスクを実行してみましょう。ローカル環境の起動方法についてはこの記事では解説しませんが、Airflow Breezeを使ってやるのが無難かと思います。 Airflow Breezeについてはこちらの記事が参考になるので、気になる方はチェックしてみてください。

タスク実行は以下のコマンドを実行するだけで行うことが可能です。

>>> airflow tasks test example_bash_operator runme_0 2021-06-20

普通に実行するならairflow tasks runを使うのですが、依存関係や副作用が発生するので、今回はairflow tasks testを使っています。 こうすることでタスクだけを冪等に実行することが可能です。

以下が実行結果とになります。

赤色で囲った箇所を見ると、example_bash_operator__runme0__20210620という文字列が標準出力されているのが分かります。 こちらは先ほどの{{ task_instance_key_str }}がレンダリングされた後の文字列になります。

タスクとは何か?

タスクの実行フローを追う前に、まずはAirflowにおけるタスクとは何か解説します。

Airflowにおいてタスクは実行の最小単位であり、BaseOperatorというクラスのサブクラスとして実装されています。
以下、この記事ではBaseOperatorのサブクラスのことを Operator と呼ぶことにします。 今回読み進めるコード中では、Operatorのインスタンスはtaskという変数名で用いられます。
公式の説明はこちら

Operatorはざっくりまとめると以下の情報を保持してます。

  • どんなタスクを実行するか
  • どうやってそのタスクを実行するか
  • そのタスクの前後の依存関係

※厳密には他にも色んな情報を管理していますが。

タスクの実例を見てみる

こちらは今回実行するexample_bash_operatorrunme_0が定義されている箇所のコードです。 先ほども紹介しましたが、今回は補足情報を足してみました。

先ほどあげた3つの情報が確かに含まれていることがわかります。

TaskInstanceとは?

タスクを理解する上で、もう一つ重要なクラスがTaskInstanceです。

TaskInstanceはあるexecution_dateにおけるタスクの実行を管理するクラスです。 TaskInstanceの主な責務は以下の2つになります。

  • タスクの実行 タスクの実行に必要な情報を取得し、タスクを実行する。
  • 実行したタスクのステータスの管理 タスクの実行ステータスを管理する。また、ステータスをDBに格納し永続化する。

公式の説明はこちら

タスクとTaskInstanceの関係性

TaskInstanceのインスタンスは上記の様にOperatorのインスタンスとタスクの実行日から生成されます。
コードでは以下の様にTaskInstanceのインスタンスが生成されています。

ti = TaskInstance(
task, # Operatorのインスタンス
execution_date # 実行日
)

確かに、引数としてタスクと実行日が渡されているのが分かります。
タスクとTaskInstanceの関係性は、AirflowのTree Viewを見ると直感的に理解できます。

1つのタスクに対して実行日/実行時間ごとに複数のTaskInstanceが作られることが分かりますね。

タスクの実行

公式ドキュメントではざっくり「TaskInstanceはステータス管理のためのもの」という説明になっていますが、実際にコードを読んでみると「タスクの実行」もその重要な責務であることが分かります。

以下にタスク実行に関連する処理をいくつか挙げてみます。

  • テンプレートへのレンダリングの実行
  • XComのデータ管理
  • Operatorexecute()を呼び出す

※他にもたくさんあります。

この様に、TaskInstanceは対応するタスクを実行するために必要な情報を取得/生成し、テンプレートのレンダリング等の事前処理を行い、Operatorのメソッドを呼び出してタスクを開始する重要な役割を担っています。

登場人物と処理の流れ

Airflowの基本コンセプトをおさらいしたので、コードの話に入ります。具体的な処理を追う前に、処理の大まかなフローとそこで登場するクラスの説明を行います。

クラス図

次に、タスクの実行に関わるクラスとそれらの属性/メソッドを整理していきます。 主要なクラスとメソッドをまとめたクラス図が以下です。

今回の実行フローにおいて、それぞれのクラスは大まかに以下のような役割を持ちます。

DAG(Github

  • Operatorのインスタンス(タスク)を保持する

DagBag(Github

  • DAGの定義ファイルの読み込み、DAGインスタンスを生成する

BashOperator(Github

  • bashコマンドの保持とテンプレートへのレンダリング
  • bashコマンドの実行

TaskInstance(Github

  • タスク実行の制御
  • Contextの生成

処理の流れ

CLIコマンドの実行からタスクの完了までの処理の流れをまとめると以下の様になります。 先ほどクラス図で紹介してクラスが連携して以下の様な1つの処理が実行されます。

[タスク実行の流れ]

  1. CLIコマンドのパーシング
  2. DAGの生成
  3. BashOpeartorの取得
  4. TaskInstanceの生成
  5. タスクの実行
    ・Contextの生成
    ・テンプレートのレンダリング
    ・Bashの実行

以降では、この流れに沿って実際のコードを読んでいきます!

1. CLIコマンドのパーシング

処理の始まり

>>> airflow tasks test example_bash_operator runme_0 2021-06-20

冒頭で紹介したコマンドをコマンドラインからairflowコマンドを実行すると airflow/__main__.pyが実行されて処理が始まります。

Github

実際の処理はargs.func(args)の部分から開始されます。

コマンドのパーシング

__main__.pyではCLIから渡されたコマンドをパーシングし、関数を実行することで処理が始まりますが、このパーシングの仕組みを詳しくみていきましょう。

パーシングを責務とするパーサーは以下の箇所で生成されます。

parser = cli_parser.get_parser()

このパーサーはPythonの標準ライブラリであるargparseで実装されています。 argparseはpythonのコマンドライン引数を柔軟に扱うことができるライブラリで、僕も普段の開発で重宝しています。

AirlowではCLIコマンドごとに対応する関数が用意されており、airflow/cli/commandsに格納されています。
こちらがコマンドの一覧になります。commandsディレクトリにあるファイルとよく見比べると、各コマンドに対応するファイルがあるのが分かります。 例えば、dagsコマンドはdag_command.pyと対応します。

今回実行するコマンドであるairflow tasks testにはairflow/cli/commands/task_command.pytask_test()という関数が対応します。 parserによりtasks testという引数とtask_test()が関連付けられ、args.func(args)とすることで関連づけられた関数を実行することができます。

大まかに説明するとargsにはexample_bash_operator runme_0 2021-06-23が格納されています。
※他にもデフォルト引数が色々格納されているし、厳密にはtask_test()との関連もargsに含まれる

まとめると、parserは与えられたコマンドを以下の様に解釈し、処理を開始します。

task_test()の呼び出し

次に、task_test()の中身を追っていきましょう。task_test()のソースコードは以下の通りです。
※全体の流れに対応する箇所にコメントを足しています。

Github

[タスク実行の流れ]で整理した流れと、コードが一致していることが分かります。
以降では、それぞれの処理の詳細を追っていきます。

2. DAGの生成

get_dag()によるDAGインスタンスの取得

Airflowではタスク(Operatorのインスタンス)はDAGインスタンスに格納されます。 なので、タスクを実行するためには、まずDAGインスタンスを生成する必要があります。

task_test()の中では、DAGインスタンスはget_dag()というutil関数から取得されています。

Github

これを見ると、DagBagdagsという属性からdag_idをKeyとしてDAGのインスタンスを取得していることが分かります。 先述のクラス図でも紹介しましたが、DagBagDAGインスタンスを管理するクラスです。

では、どこでdagsDAGインスタンスが格納されているのでしょうか?

DAGの定義ファイルの読み込み

Airflowにおいて、DAGインスタンスの生成はDAGの定義ファイルの中で行われます。
そして、DAGの定義ファイルを読み込み、DAGインスタンスをdagsへ格納する処理はDagBagprocess_file()というメソッドによって行われます。

Github

process_file()は以下の順で定義ファイルを読み込みます。

  1. DAGの定義ファイルをモジュールとして読み込む。
  2. 読み込んだモジュールに含まれる全てのオブジェクトからDAGのインスタンスを抽出する。

以降で各処理の詳細を見ていきます。

定義ファイルをモジュールとして読み込む

ファイルの読み込みはDagBagの_load_modules_from_file()というメソッド(Github)で行われます。 以下に、実際にファイルを読み込んでいる箇所を抜粋しました。

Github

file_pathは定義ファイルのパスです。 Pythonの標準ライブラリのimportlibを用いて、定義ファイルがnew_moduleとして読み込まれていることが分かります。

モジュールからDAGインスタンスを抽出

DAGインスタンスの抽出は_process_modules()というメソッド(Github)で実行されます。

以下の箇所で読み込んだモジュールの中からDAGインスタンスのみを抽出し、top_level_dagsという配列に格納しています。

top_level_dags = [o for m in mods for o in list(m.__dict__.values()) if isinstance(o, DAG)]

内包表記がネストされており見にくいですが、if isinstance(o, DAG)の部分であるobjectがDAGのインスタンスかどうかをチェックしています。

そして、以下の箇所でDagBagのbag_dag()メソッドを呼び出し、DAGインスタンスをdagsに追加しています。

self.bag_dag(dag=dag, root_dag=dag)

これで、ようやく定義ファイルからDAGインスタンスを取得することができました!

3. BashOpeartorの取得

BashOpeartorインスタンスの取得

DAGインスタンスが取得できたので、次はBashOpeartorインスタンス(タスク)を取得しましょう。BashOpeartorインスタンスの取得はget_task()というDAGのメソッドで行うことができます。
内容は以下の通りで、task_dictというDAGの属性からタスクを取得するだけの簡単な処理です。 task_dictはKeyがtask_idで、ValueがOperatorのインスタンスの辞書オブジェクトになります。

Github

タスクとDAGの紐付け

では、BashOpeartorインスタンスはどこでDAGインスタンスに格納されるのでしょうか?

今回例として取り上げたDAGの定義ファイルではContext Managerを使った少しトリッキーな方法でDAGインスタンスにBashOpeartorインスタンスを格納しています。 このプロセスは理解できると楽しいのですが、それだけでブログが1本書けそうなので今回は割愛します。
一旦、生成されたBashOperatorインスタンスは、DAGインスタンスのtask_dictに格納されるものだと考えてください。

4. TaskInstanceの生成

TaskInstanceの取得

BashOpeartorインスタンスが取得できたので、次にTaskInstanceのインスタンスの生成に移ります。先述の通り、Airflowにおいてタスクを実行するにはTaskInstanceのインスタンスを生成する必要があります。

ここでtask_test()の説明に戻りましょう。TaskInstanceのインスタンスは以下で生成されます。

ti = TaskInstance(task, args.execution_date)

Github

その後、以下の箇所からタスクの実行が開始されます。ti.run() の部分です。

Github

ti.run()の実行を起点に連鎖的にいくつかの関数が呼び出され、TaskInstanceのメソッドである_run_raw_task()にたどり着きます。
このメソッドがタスクの実行を司る重要な関数です。以降でこのメソッドの中身を解説していきます。

5. タスクの実行

タスク実行の流れ

_run_raw_task()Github)以降の処理の大まかな流れは以下の様になります。

  1. Contextの生成
  2. テンプレートのレンダリング
  3. Bashの実行

本番環境でAirflowを動かすと他にもたくさんの処理が実行されますが、今回はタスクの単体実行の流れを追うだけなので大きく3つくらいに収まります。

Contextとは?

ContextとはTaskの実行にまつわる情報を含む辞書オブジェクトのことです。 Airflowを普段から使っている方にとってはお馴染みかと思います。
Contextは後述のJinjaテンプレート内でアクセスできたり、直接タスクに引数として渡すことができます。

ちなみに、Contextには主に以下の情報が含まれます。

  • execution_dateなどの日付関連の情報
  • DAG, DagRun, TaskInstanceなど、タスクの実行に関連するクラスのインスタンス
  • Variables
  • マクロ関数
  • conf (設定情報を管理するオブジェクト)

Contextから取得できる値の一覧はこちらの公式ドキュメントで確認できます。

Contextの生成

Contextの生成はget_template_context()Github)という関数によって行われます。

こちらは、_run_raw_task()のcontextを生成している箇所です。

context = self.get_template_context()の箇所でget_template_context()からContextが生成されているのが分かります。

説明は省略しますが、get_template_context()の終わりに大きめの辞書オブジェクトを生成しており、それがContextです。 興味のある方は見てみてください。「普段何気なく使っているContextがここで生成されてるのか」という気持ちになれます。

Contextの生成後、_prepare_and_execute_task_with_callbacks()Github)が実行され、いよいよタスクの実行が開始されます!

Jinjaによるレンダリング

Airflowではタスクの定義時にJinjaのテンプレートを用いることができ、テンプレート内ではContextの値を用いることができます。こちらの機能もAirflowユーザーからすればお馴染みですね。
公式の説明はこちら

普段何気なく使っているテンプレートですが、どのタイミングでレンダリングが行われるかは知らない方も多いのではないでしょうか?
コードを読んでいくと、テンプレートはタスクの実行時にTastInstanceによってレンダリングされることがわかります。

今回の例ではタスクの定義時、bash_commandに以下の値を指定しています。

bash_command='echo "{{ task_instance_key_str }}" && sleep 1',

task_instance_key_strはContextから取得できる情報の1つで、{dag_id}__{task_id}__{execution_date}の様な文字列となります。
では、実際にこの値がレンダリングされる過程を見ていきましょう。

レンダリングの実施

レンダリングは_prepare_and_execute_task_with_callbacks()内の以下の箇所で行われます。

self.render_templates(context=context)

Github

この関数はBaseOperatorrender_template_fields()Github)を実行しており、この関数が実際にレンダリングを行っている関数となっています。

render_template_fields()はさらに_do_render_template_fields()を呼び出します。 _do_render_template_fields()の内容は以下の通りです。処理が少しややこしいのでコメントを足しています。

parentには自身(self)が渡されています。 また、template_fieldsはレンダリングの対象となるBashOperatorの属性名が格納されています。 これより、contentにはレンダリング対象の属性の値が格納されることが分かります。

レンダリングの実行部分で呼び出されているrender_template()は、content内のテンプレートをcontextの値でレンダリングするJinjaの薄いラッパーです。 以下の箇所でレンダリングを行っています。

Github

ちなみにレンダリングの対象となる属性を指定するtemplate_fieldsはサブクラスの中で指定されています。
BashOperatortemplate_fieldsは以下の箇所で指定されており、bash_commandがレンダリングの対象になっていることがわかります。

template_fields = ('bash_command', 'env')

Github

少し複雑な処理でしたが、テンプレートのレンダリングのフローを追うことができました。

Bashの実行

Contextを取得し、その値のレンダリングも完了し、次はいよいよタスクの実行です!
タスクの実行は_prepare_and_execute_task_with_callbacks()から_execute_task()という関数が呼ばれることで行われます。

Github

_execute_task()BaseOperatorのメソッドのexecute()を呼び出すだけの関数で実質的なタスクの実行はこの関数で行われます。
execute()BaseOperatorの各サブクラスで実装されます。

今回用いているBashOperatorの実装箇所はこちらです。以下の箇所でsubprocessライブラリを用いてbash_commandが実行されていることが分かります!!

Github

最後に

長くなりましたが、これでCLIコマンドの実行からBashが実行されるまでの流れを追うことができました! 実際にコードを追うことでAirflowへの理解が深まってのではないでしょうか?
Airflowには他にも面白い機能がたくさんあるので、今後もこういうブログを出していきたいなと思います。

Twitterもやってますので、よかったらフォローしてください。Airflowに限らず新しい技術をどんどん取り入れながら、データプロダクトを一緒に開発してくれるデータエンジニアを募集しています!

エンジニア向けのイベントや、開発組織についてもっと知りたい方は、以下に最新情報をまとめていますのでぜひチェックしてみてください!

--

--