Airflow

Apache Airflow 技術詳解 - ワークフローオーケストレーションの全容

目次

  1. はじめに
  2. Apache Airflow とは
  3. コアコンセプト
  4. アーキテクチャ概要
  5. Executor の種類と選定
  6. DAG の設計と実装
  7. Operator 詳解
  8. タスク間の依存関係とデータ受け渡し
  9. スケジューリングの仕組み
  10. 接続管理と Variable
  11. センサーとトリガー
  12. TaskFlow API
  13. ブランチングと条件分岐
  14. テンプレートエンジンと Jinja
  15. プラグインシステム
  16. セキュリティと認証
  17. モニタリングとログ管理
  18. パフォーマンスチューニング
  19. Kubernetes 環境でのデプロイ
  20. CI/CD とテスト戦略
  21. ベストプラクティス
  22. トラブルシューティング
  23. 他ツールとの比較
  24. まとめ

1. はじめに

現代のデータエンジニアリングにおいて、複雑なデータパイプラインを確実かつ効率的にオーケストレーションすることは極めて重要な課題である。バッチ処理、ETL(Extract, Transform, Load)、機械学習パイプライン、データウェアハウスへのロードなど、データチームが扱うワークフローは年々複雑化している。

Apache Airflow は、こうした複雑なワークフローをプログラム的に作成・スケジュール・監視するためのオープンソースプラットフォームである。2014年に Airbnb のエンジニアである Maxime Beauchemin によって開発が始まり、2016年に Apache Incubator プロジェクトとなり、2019年にトップレベルの Apache プロジェクトに昇格した。

本記事では、Apache Airflow の機能、アーキテクチャ、設計思想、そして実践的な設定・実装例を包括的に解説する。運用現場で必要となる知識を網羅的にカバーし、初学者から上級者までが参照できる技術資料を目指す。


2. Apache Airflow とは

2.1 基本的な位置づけ

Apache Airflow は「ワークフローオーケストレーター」に分類されるツールである。ワークフローをDAG(Directed Acyclic Graph:有向非巡回グラフ)として定義し、各タスクの実行順序、依存関係、スケジュールを管理する。

Airflow の最大の特徴は「Configuration as Code」の思想にある。ワークフローの定義をPythonコードで記述するため、以下のような利点がある。

  • バージョン管理: Git 等でワークフローの変更履歴を追跡可能
  • テスト可能性: ユニットテスト・統合テストが記述可能
  • 再利用性: Pythonの関数・クラスとして部品化が可能
  • 柔軟性: Pythonの全機能を活用した動的なワークフロー生成が可能
  • コードレビュー: チーム開発におけるレビュープロセスが適用可能

2.2 Airflow が得意とするユースケース

  1. バッチETLパイプライン: データソースからデータを抽出し、変換してデータウェアハウスにロードする定期的な処理
  2. 機械学習パイプライン: データの前処理、特徴量エンジニアリング、モデルの学習・評価・デプロイの自動化
  3. データ品質チェック: データの整合性検証、品質メトリクスの計測と通知
  4. レポート生成: 定期的なビジネスレポートの自動生成と配信
  5. インフラストラクチャ管理: クラウドリソースのプロビジョニングやメンテナンスタスクの自動化

2.3 Airflow が不向きなユースケース

  • ストリーミング処理: Airflow はバッチ処理向けに設計されている。リアルタイムストリーミングには Apache Kafka + Flink 等が適している
  • 高頻度実行: 秒単位やミリ秒単位のスケジューリングは想定されていない
  • データ処理エンジン: Airflow 自体はデータを処理しない。Spark、Presto、BigQuery 等の実行を「指示」するオーケストレーターである

3. コアコンセプト

3.1 DAG(Directed Acyclic Graph)

DAG は Airflow の最も基本的な概念である。DAG はタスクのコレクションであり、それらの間の依存関係と実行順序を定義する。

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email': ['data-team@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=60),
    'execution_timeout': timedelta(hours=2),
    'sla': timedelta(hours=4),
}

dag = DAG(
    dag_id='example_etl_pipeline',
    default_args=default_args,
    description='本番ETLパイプラインの例',
    schedule_interval='0 2 * * *',  # 毎日午前2時
    start_date=datetime(2025, 1, 1),
    end_date=None,
    catchup=False,
    max_active_runs=1,
    max_active_tasks=10,
    dagrun_timeout=timedelta(hours=6),
    tags=['etl', 'production', 'data-warehouse'],
    doc_md="""
    ## ETL パイプライン
    データソースから抽出し、変換後にデータウェアハウスにロードするパイプライン。
    ### オーナー
    Data Engineering チーム
    ### SLA
    午前6時までに完了すること
    """,
)

DAG の主要パラメータ解説

パラメータ説明推奨値
dag_idDAGの一意識別子チーム名_用途_対象 の命名規則を推奨
schedule_interval実行スケジュール(cron式またはtimedelta)ユースケースに応じて設定
start_dateDAGの開始日固定の過去日を推奨
catchup過去分の実行を補完するか本番では通常 False
max_active_runs同時実行可能なDAG Run数リソースに応じて1-3
max_active_tasks同時実行可能なタスク数ワーカーリソースに応じて設定
dagrun_timeoutDAG Run全体のタイムアウトSLAに基づいて設定
tags分類用タグチーム名、環境、用途を付与

3.2 Task

Task は DAG 内の処理の最小単位である。各 Task は Operator のインスタンスとして定義される。

extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_function,
    op_kwargs={'source': 'postgresql', 'table': 'users'},
    pool='db_connection_pool',
    priority_weight=10,
    weight_rule='downstream',
    queue='high_priority',
    dag=dag,
)

3.3 Task Instance

Task Instance は、特定の DAG Run における特定の Task の実行インスタンスである。Task Instance は以下の状態遷移を持つ。

none -> scheduled -> queued -> running -> success
                                       -> failed -> up_for_retry -> scheduled -> ...
                                       -> upstream_failed
                                       -> skipped
                            -> deferred -> scheduled -> ...

主要な状態:

  • none: まだスケジュールされていない
  • scheduled: スケジューラによって実行がスケジュールされた
  • queued: Executor のキューに投入された
  • running: 実行中
  • success: 正常終了
  • failed: 異常終了
  • up_for_retry: リトライ待ち
  • upstream_failed: 上流タスクの失敗により実行されなかった
  • skipped: 条件分岐等でスキップされた
  • deferred: Deferrable Operator により延期中(ワーカーリソースを解放)

3.4 DAG Run

DAG Run は DAG の1回の実行インスタンスである。以下の方法でトリガーされる。

  1. スケジュール実行: schedule_interval に基づく自動実行
  2. 手動トリガー: Web UI または CLI からの手動実行
  3. 外部トリガー: REST API、TriggerDagRunOperator、外部システムからのトリガー
# CLI からの手動トリガー
# airflow dags trigger -c '{"key": "value"}' example_etl_pipeline

# REST API からのトリガー
# curl -X POST "http://localhost:8080/api/v1/dags/example_etl_pipeline/dagRuns" \
#   -H "Content-Type: application/json" \
#   -d '{"conf": {"key": "value"}}'

3.5 logical_date(旧 execution_date)

Airflow 2.2 以降、execution_datelogical_date にリネームされた。これは DAG Run が「論理的に」対象とする時刻であり、実際の実行時刻ではない点に注意が必要である。

例えば、毎日午前2時に実行される DAG の場合:

  • logical_date = 2025-01-15 02:00:00 の DAG Run は、2025-01-16 02:00:00 頃に実行される
  • つまり、logical_date は「処理対象データの期間の開始時刻」を示す
sql = """
SELECT * FROM events
WHERE event_date >= '{{ ds }}'
  AND event_date < '{{ next_ds }}'
"""

4. アーキテクチャ概要

4.1 コンポーネント構成

Airflow は以下の主要コンポーネントで構成される。

+------------------------------------------------------------------+
|                        Airflow Architecture                       |
|                                                                   |
|  +-------------+   +--------------+   +-----------------------+  |
|  |  Web Server  |   |  Scheduler   |   |   Triggerer           |  |
|  |  (Flask/     |   |              |   |   (async event loop)  |  |
|  |   Gunicorn)  |   |  - DAG Parse |   |                       |  |
|  |              |   |  - Task      |   |  Deferrable Operator  |  |
|  |  - Web UI    |   |    Schedule  |   |  Event Monitoring     |  |
|  |  - REST API  |   |  - Executor  |   |                       |  |
|  +------+-------+   +------+------+   +-----------+-----------+  |
|         |                  |                       |              |
|         v                  v                       v              |
|  +-------------------------------------------------------------+ |
|  |                    Metadata Database                          | |
|  |            (PostgreSQL / MySQL)                               | |
|  +-------------------------------------------------------------+ |
|         ^                  ^                                      |
|  +------+-------+   +-----+-----------------------------------+  |
|  |    DAG Files  |   |         Executor / Worker               |  |
|  |    (.py)      |   |  [Worker 1] [Worker 2] ... [Worker N]   |  |
|  +---------------+   +-----------------------------------------+  |
|                                                                   |
|  +-------------------------------------------------------------+ |
|  |           Log Storage (Local/S3/GCS/Elasticsearch)           | |
|  +-------------------------------------------------------------+ |
+------------------------------------------------------------------+

4.2 Web Server

Web Server は Airflow の UI と REST API を提供するコンポーネントである。Flask フレームワーク上に構築され、Gunicorn をWSGIサーバーとして使用する。

# airflow.cfg - Web Server 設定例
[webserver]
base_url = https://airflow.example.com
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_worker_timeout = 120
workers = 4
rbac = True
default_ui_timezone = Asia/Tokyo
dag_default_view = grid

4.3 Scheduler

Scheduler は Airflow の心臓部であり、DAGファイルの解析、DAG Runの生成、Taskのスケジューリング、状態管理を担当する。

[scheduler]
dag_dir_list_interval = 300
min_file_process_interval = 30
parsing_processes = 2
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
scheduler_idle_sleep_time = 1
max_tis_per_query = 512
num_runs = -1  # 無限ループ(本番推奨)

Scheduler の内部動作

1. DAG ファイルのパース
   +-- 各 .py ファイルを子プロセスで解析し、DAG オブジェクトをDBに保存

2. DAG Run の作成
   +-- schedule_interval に基づいて新規 DAG Run を作成

3. Task Instance のスケジューリング
   +-- 依存関係の完了チェック -> Trigger Rule の評価 -> Pool のスロット確認

4. Task Instance のキューイング
   +-- Executor に Task Instance を送信

5. Executor からの結果収集
   +-- 完了した Task Instance の状態更新

4.4 Triggerer

Triggerer は Airflow 2.2 で導入されたコンポーネントで、Deferrable Operator と連携し、外部イベントの待機を非同期的に処理する。待機中のタスクはワーカーリソースを解放し、イベント発生時に再びワーカーで実行される。

[triggerer]
default_capacity = 1000

4.5 Metadata Database

Metadata Database は Airflow の全状態を永続化する。PostgreSQL または MySQL の使用が推奨される。

[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True

5. Executor の種類と選定

5.1 SequentialExecutor

タスクを1つずつ逐次実行。開発・テスト・デバッグ専用。

5.2 LocalExecutor

単一マシン上で子プロセスを fork して並列実行。中小規模のワークロードに適している。

[core]
executor = LocalExecutor
parallelism = 32

5.3 CeleryExecutor

Celery を利用した分散タスク実行。水平スケーリングが可能。

[core]
executor = CeleryExecutor
parallelism = 64

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres:5432/airflow
worker_concurrency = 16
worker_autoscale = 16,4

5.4 KubernetesExecutor

各タスクを個別の Kubernetes Pod として実行。タスクごとにリソースを分離でき、アイドル時のリソース消費がゼロ。

[core]
executor = KubernetesExecutor

[kubernetes_executor]
namespace = airflow
delete_worker_pods = True
worker_pods_creation_batch_size = 16
# Pod テンプレートの使用例
from kubernetes.client import models as k8s

pod_template = k8s.V1Pod(
    spec=k8s.V1PodSpec(
        containers=[
            k8s.V1Container(
                name="base",
                image="my-registry/airflow-worker:2.8.1",
                resources=k8s.V1ResourceRequirements(
                    requests={"cpu": "1", "memory": "2Gi"},
                    limits={"cpu": "2", "memory": "4Gi"},
                ),
            )
        ],
        service_account_name="airflow-worker",
    ),
)

5.5 CeleryKubernetesExecutor

CeleryExecutor と KubernetesExecutor のハイブリッド。タスクのキュー名に基づいて使い分ける。

5.6 Executor 選定ガイド

要件推奨 Executor
開発・テストSequentialExecutor / LocalExecutor
中小規模(単一サーバー)LocalExecutor
大規模・高可用性CeleryExecutor
Kubernetes 環境KubernetesExecutor
混合ワークロードCeleryKubernetesExecutor

6. DAG の設計と実装

6.1 DAG ファイルの推奨構成

dags/
+-- common/
|   +-- callbacks.py, default_args.py, utils.py
+-- etl/
|   +-- dag_sales_etl.py, dag_user_etl.py
|   +-- sql/
+-- ml/
|   +-- dag_model_training.py
+-- maintenance/
|   +-- dag_cleanup_logs.py
+-- config/
    +-- dag_configs.yaml

6.2 標準的な ETL パイプライン

from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='sales_etl_pipeline',
    schedule_interval='0 18 * * *',  # UTC 18:00 = JST 03:00
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'sales', 'production'],
) as dag:

    start = EmptyOperator(task_id='start')
    
    extract_sales = PostgresToGCSOperator(
        task_id='extract_sales_to_gcs',
        postgres_conn_id='sales_db',
        sql="SELECT * FROM sales WHERE sale_date = '{{ ds }}'",
        bucket='my-data-lake',
        filename='raw/sales/{{ ds }}/sales_{{ ds_nodash }}.json',
        export_format='json', gzip=True,
    )
    
    extract_customers = PostgresToGCSOperator(
        task_id='extract_customers_to_gcs',
        postgres_conn_id='sales_db',
        sql="SELECT * FROM customers WHERE updated_at >= '{{ ds }}' AND updated_at < '{{ next_ds }}'",
        bucket='my-data-lake',
        filename='raw/customers/{{ ds }}/customers_{{ ds_nodash }}.json',
        export_format='json', gzip=True,
    )
    
    load_sales_raw = GCSToBigQueryOperator(
        task_id='load_sales_to_bq_raw',
        bucket='my-data-lake',
        source_objects=['raw/sales/{{ ds }}/sales_{{ ds_nodash }}.json.gz'],
        destination_project_dataset_table='project.raw.sales_{{ ds_nodash }}',
        source_format='NEWLINE_DELIMITED_JSON',
        write_disposition='WRITE_TRUNCATE', autodetect=True,
    )
    
    load_customers_raw = GCSToBigQueryOperator(
        task_id='load_customers_to_bq_raw',
        bucket='my-data-lake',
        source_objects=['raw/customers/{{ ds }}/customers_{{ ds_nodash }}.json.gz'],
        destination_project_dataset_table='project.raw.customers_{{ ds_nodash }}',
        source_format='NEWLINE_DELIMITED_JSON',
        write_disposition='WRITE_TRUNCATE', autodetect=True,
    )
    
    transform_sales = BigQueryInsertJobOperator(
        task_id='transform_sales',
        configuration={"query": {"query": """
            CREATE OR REPLACE TABLE `project.staging.fact_sales_{{ ds_nodash }}` AS
            SELECT s.*, c.name AS customer_name, c.segment, c.region,
                   CURRENT_TIMESTAMP() AS etl_loaded_at
            FROM `project.raw.sales_{{ ds_nodash }}` s
            LEFT JOIN `project.raw.customers_{{ ds_nodash }}` c ON s.customer_id = c.customer_id
            WHERE s.total_amount > 0
        """, "useLegacySql": False}},
    )
    
    data_quality_check = BigQueryInsertJobOperator(
        task_id='data_quality_check',
        configuration={"query": {"query": """
            SELECT CASE
                WHEN COUNT(*) = 0 THEN ERROR('No records loaded for {{ ds }}')
                ELSE 'PASS'
            END FROM `project.staging.fact_sales_{{ ds_nodash }}`
        """, "useLegacySql": False}},
    )
    
    load_to_mart = BigQueryInsertJobOperator(
        task_id='load_to_mart',
        configuration={"query": {"query": """
            MERGE `project.mart.fact_sales` T
            USING `project.staging.fact_sales_{{ ds_nodash }}` S ON T.sale_id = S.sale_id
            WHEN MATCHED THEN UPDATE SET customer_name = S.customer_name, total_amount = S.total_amount
            WHEN NOT MATCHED THEN INSERT ROW
        """, "useLegacySql": False}},
    )
    
    end = EmptyOperator(task_id='end')
    
    start >> [extract_sales, extract_customers]
    extract_sales >> load_sales_raw
    extract_customers >> load_customers_raw
    [load_sales_raw, load_customers_raw] >> transform_sales
    transform_sales >> data_quality_check >> load_to_mart >> end

6.3 動的 DAG 生成

import yaml
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

config_path = Path(__file__).parent / 'config' / 'dag_configs.yaml'
with open(config_path, 'r') as f:
    configs = yaml.safe_load(f)

def create_etl_dag(config):
    dag = DAG(
        dag_id=f"dynamic_{config['name']}_etl",
        schedule_interval=config.get('schedule', '@daily'),
        start_date=datetime(2025, 1, 1),
        catchup=False,
        tags=config.get('tags', []) + ['dynamic'],
    )
    with dag:
        for table in config['tables']:
            extract = PythonOperator(task_id=f'extract_{table["name"]}', python_callable=extract_table,
                op_kwargs={'source_conn': config['source_connection'], 'table': table['name']})
            load = PythonOperator(task_id=f'load_{table["name"]}', python_callable=load_table,
                op_kwargs={'dest_conn': config['dest_connection'], 'table': table['name']})
            extract >> load
    return dag

for config in configs.get('etl_pipelines', []):
    dag_obj = create_etl_dag(config)
    globals()[dag_obj.dag_id] = dag_obj

7. Operator 詳解

7.1 Operator の分類

Action Operators: PythonOperator, BashOperator, SparkSubmitOperator 等。実際の処理を実行する。

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

python_task = PythonOperator(task_id='process_data', python_callable=my_function,
    op_kwargs={'key1': 'value1'})

bash_task = BashOperator(task_id='run_script',
    bash_command='python /opt/scripts/process.py --date {{ ds }}',
    env={'DATA_PATH': '/data/{{ ds }}/'}, append_env=True)

spark_task = SparkSubmitOperator(task_id='spark_etl',
    application='/opt/spark/jobs/etl_job.py', conn_id='spark_default',
    conf={'spark.executor.memory': '4g', 'spark.executor.instances': '10'})

Transfer Operators: PostgresToGCSOperator, S3ToRedshiftOperator 等。システム間のデータ転送。

Sensor Operators: S3KeySensor, ExternalTaskSensor, SqlSensor 等。条件が満たされるまで待機する。

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_s3 = S3KeySensor(
    task_id='wait_for_upstream_data',
    bucket_name='data-lake',
    bucket_key='processed/{{ ds }}/complete.flag',
    mode='reschedule',  # ワーカースロットを解放
    poke_interval=300, timeout=7200,
    exponential_backoff=True,
)

7.2 カスタム Operator の作成

from airflow.models import BaseOperator
from typing import Any, Dict, Optional, Sequence

class DataQualityOperator(BaseOperator):
    """データ品質チェックを実行するカスタム Operator"""
    
    template_fields: Sequence[str] = ('sql', 'parameters')
    template_ext: Sequence[str] = ('.sql',)
    ui_color = '#89DA59'
    
    def __init__(self, sql: str, conn_id: str, expected_result: Any = None,
                 check_type: str = 'not_empty', **kwargs):
        super().__init__(**kwargs)
        self.sql = sql
        self.conn_id = conn_id
        self.expected_result = expected_result
        self.check_type = check_type
    
    def execute(self, context):
        from airflow.hooks.base import BaseHook
        hook = BaseHook.get_hook(self.conn_id)
        result = hook.get_first(self.sql)
        actual_value = result[0] if result else None
        
        if self.check_type == 'not_empty' and (actual_value is None or actual_value <= 0):
            raise ValueError(f"Data quality check failed: {actual_value}")
        elif self.check_type == 'exact' and actual_value != self.expected_result:
            raise ValueError(f"Expected {self.expected_result}, got {actual_value}")
        
        self.log.info(f"Data quality check passed: {actual_value}")
        return actual_value

8. タスク間の依存関係とデータ受け渡し

8.1 依存関係の定義方法

# ビットシフト演算子(推奨)
task_a >> task_b >> task_c
task_a >> [task_b, task_c] >> task_d

# cross_downstream
from airflow.models.baseoperator import cross_downstream
cross_downstream([extract_1, extract_2], [transform_1, transform_2])

# chain
from airflow.models.baseoperator import chain
chain(task_a, [task_b, task_c], [task_d, task_e], task_f)

8.2 Trigger Rule

Trigger Rule条件
ALL_SUCCESS全上流が成功(デフォルト)
ALL_DONE全上流が完了(状態不問)
ONE_SUCCESS1つ以上の上流が成功
ONE_FAILED1つ以上の上流が失敗
NONE_FAILED失敗した上流がない
NONE_FAILED_MIN_ONE_SUCCESS失敗がなく、少なくとも1つ成功
ALWAYS常に実行

8.3 XCom(Cross-Communication)

def extract_data(**context):
    return {'record_count': 1500, 'file_path': f'/data/{context["ds"]}/output.csv'}

def validate_data(**context):
    ti = context['ti']
    result = ti.xcom_pull(task_ids='extract_data')
    if result['record_count'] == 0:
        raise ValueError("No records extracted")
    ti.xcom_push(key='status', value='passed')

XCom の注意点: サイズ制限あり。大量データは外部ストレージに保存し、パスのみXComで渡す。カスタムXCom Backendで S3 等への自動オフロードも可能。


9. スケジューリングの仕組み

dag = DAG('cron_example', schedule_interval='0 2 * * *')    # 毎日午前2時
dag = DAG('preset_daily', schedule_interval='@daily')        # 毎日 00:00
dag = DAG('delta', schedule_interval=timedelta(hours=6))     # 6時間ごと
dag = DAG('manual_only', schedule_interval=None)             # 手動実行のみ

# Timetable(Airflow 2.2+)
from airflow.timetables.trigger import CronTriggerTimetable
dag = DAG('timetable', timetable=CronTriggerTimetable('0 9 * * 1-5', timezone='Asia/Tokyo'))

Data Interval の概念

Data Interval:  [start ---- end)
                   |           |
                   +-- data_interval_start (= logical_date)
                               +-- data_interval_end
                                   実行はここで行われる

10. 接続管理と Variable

Connection

# 環境変数: AIRFLOW_CONN_POSTGRES_DEFAULT=postgresql://user:pass@host:5432/dbname
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id='my_postgres')
records = hook.get_records("SELECT COUNT(*) FROM users")

Secrets Backend

[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}

Variable

from airflow.models import Variable
Variable.set('etl_config', {'batch_size': 10000}, serialize_json=True)

# テンプレートでの参照(推奨)
task = BashOperator(task_id='t', bash_command='echo {{ var.json.etl_config.batch_size }}')

11. センサーとトリガー

Sensor の動作モード

モード特徴
pokeワーカースロットを占有し続ける
rescheduleチェック間はワーカースロットを解放
deferrable=TrueTriggerer に処理を委譲(最もリソース効率が良い)

Deferrable Operator

execute() 内で self.defer() を呼び出し、Trigger オブジェクトを Triggerer に渡す。Triggerer は非同期イベントループで外部イベントを監視し、イベント発生時にタスクを再びワーカーで実行する。


12. TaskFlow API

from airflow.decorators import dag, task, task_group
from datetime import datetime

@dag(dag_id='taskflow_etl', schedule_interval='@daily',
     start_date=datetime(2025, 1, 1), catchup=False)
def taskflow_etl():
    
    @task()
    def extract() -> dict:
        return {'users': [{'id': 1, 'name': 'Alice'}], 'count': 1}
    
    @task(multiple_outputs=True)
    def transform(data: dict) -> dict:
        return {'transformed': data['users'], 'stats': {'count': data['count']}}
    
    @task()
    def load(data: list, stats: dict):
        print(f"Loading {stats['count']} records")
    
    # Dynamic Task Mapping(Airflow 2.3+)
    @task()
    def get_partitions() -> list:
        return ['part_a', 'part_b', 'part_c']
    
    @task()
    def process_partition(partition: str) -> dict:
        return {'partition': partition, 'records': 1000}
    
    raw = extract()
    result = transform(raw)
    load(result['transformed'], result['stats'])
    
    partitions = get_partitions()
    process_partition.expand(partition=partitions)

dag_instance = taskflow_etl()

13. ブランチングと条件分岐

from airflow.operators.python import BranchPythonOperator, ShortCircuitOperator

def choose_branch(**context):
    return 'full_load' if context['execution_date'].weekday() == 0 else 'incremental_load'

branch = BranchPythonOperator(task_id='branch', python_callable=choose_branch)
branch >> [full_load, incremental_load] >> EmptyOperator(
    task_id='join', trigger_rule='none_failed_min_one_success')

# ShortCircuitOperator: False で下流全タスクをスキップ
should_run = ShortCircuitOperator(task_id='check', python_callable=check_data_exists)

14. テンプレートエンジンと Jinja

# 主要なテンプレート変数
# {{ ds }}                    - YYYY-MM-DD
# {{ ds_nodash }}             - YYYYMMDD
# {{ logical_date }}          - logical_date オブジェクト
# {{ data_interval_start }}   - Data Interval の開始
# {{ data_interval_end }}     - Data Interval の終了
# {{ macros.ds_add(ds, -1) }} - 前日
# {{ var.value.my_var }}      - Variable の参照
# {{ var.json.config.key }}   - JSON Variable の参照
# {{ ti.xcom_pull(...) }}     - XCom の参照
# {{ params.my_param }}       - パラメータの参照

# カスタムマクロ
dag = DAG('example', user_defined_macros={
    'ds_to_quarter': lambda ds: f"{ds[:4]}Q{(int(ds[5:7])-1)//3+1}",
})

15. プラグインシステム

Airflow 2.0 以降、外部サービスとの統合は Provider パッケージとして分離された。

pip install apache-airflow-providers-amazon       # AWS
pip install apache-airflow-providers-google        # GCP
pip install apache-airflow-providers-microsoft-azure  # Azure
pip install apache-airflow-providers-apache-spark  # Spark
pip install apache-airflow-providers-databricks    # Databricks
pip install apache-airflow-providers-snowflake     # Snowflake
pip install apache-airflow-providers-slack         # Slack
pip install apache-airflow-providers-kubernetes    # Kubernetes
pip install apache-airflow-providers-dbt-cloud     # dbt Cloud

16. セキュリティと認証

RBAC

dag = DAG('restricted_dag', access_control={
    'data_engineering': {'can_read', 'can_edit'},
    'data_science': {'can_read'},
})

認証バックエンド

LDAP、OAuth(Google, GitHub等)、SAML をサポート。webserver_config.py で設定する。

Fernet Key による暗号化

[core]
fernet_key = your-fernet-key-here
# キーローテーション: fernet_key = new-key,old-key-1,old-key-2

17. モニタリングとログ管理

ログの設定

[logging]
remote_logging = True
remote_log_conn_id = aws_default
remote_base_log_folder = s3://my-airflow-logs/logs

メトリクス

[metrics]
statsd_on = True
statsd_host = statsd-exporter
statsd_port = 9125
statsd_prefix = airflow

コールバック

def on_failure_callback(context):
    ti = context['task_instance']
    # Slack / PagerDuty / メール等で通知

dag = DAG('monitored', on_failure_callback=on_failure_callback,
          sla_miss_callback=sla_miss_callback)

18. パフォーマンスチューニング

[scheduler]
dag_dir_list_interval = 300
parsing_processes = 4

[core]
parallelism = 64
max_active_tasks_per_dag = 16
store_serialized_dags = True

[database]
sql_alchemy_pool_size = 10
sql_alchemy_max_overflow = 20
sql_alchemy_pool_pre_ping = True

アンチパターンと推奨

# NG: トップレベルでの重い処理
df = pd.read_csv('large_file.csv')  # パース時に毎回実行

# OK: タスク内に記述
def process():
    import pandas as pd
    df = pd.read_csv('large_file.csv')

# NG: DAGファイル内でのDB接続
env = Variable.get('environment')  # パース時にDB問い合わせ

# OK: テンプレートで参照
# {{ var.value.environment }}

Pool によるリソース制御

task = PythonOperator(task_id='query_db', pool='db_connection_pool',
    pool_slots=1, priority_weight=10)

19. Kubernetes 環境でのデプロイ

helm repo add apache-airflow https://airflow.apache.org
helm install airflow apache-airflow/airflow \
  --namespace airflow --create-namespace --values values.yaml
# values.yaml(主要部分)
executor: KubernetesExecutor
scheduler:
  replicas: 2
webserver:
  replicas: 2
triggerer:
  enabled: true
dags:
  gitSync:
    enabled: true
    repo: git@github.com:org/airflow-dags.git
    branch: main
config:
  core:
    parallelism: 64
    store_serialized_dags: true
  kubernetes_executor:
    delete_worker_pods: true
  logging:
    remote_logging: true
    remote_base_log_folder: s3://airflow-logs/logs

20. CI/CD とテスト戦略

DAG のテスト

import pytest
from airflow.models import DagBag

@pytest.fixture(scope='session')
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)

class TestDagIntegrity:
    def test_no_import_errors(self, dagbag):
        assert len(dagbag.import_errors) == 0
    
    def test_all_dags_have_tags(self, dagbag):
        for dag_id, dag in dagbag.dags.items():
            assert len(dag.tags) > 0
    
    def test_retries_configured(self, dagbag):
        for dag_id, dag in dagbag.dags.items():
            for task in dag.tasks:
                assert task.retries >= 1

CI/CD パイプライン

name: Airflow CI/CD
on:
  push:
    branches: [main]
    paths: ['dags/**', 'plugins/**', 'tests/**']
jobs:
  lint-and-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install ruff black apache-airflow==2.8.1 pytest
      - run: ruff check dags/ && black --check dags/
      - run: pytest tests/ -v
  deploy:
    needs: lint-and-test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: aws s3 sync dags/ s3://airflow-production-dags/ --delete

21. ベストプラクティス

カテゴリチェック項目
DAG設計冪等性が保証されているか
タスクの粒度は適切か
リトライとタイムアウトが設定されているか
catchup の設定は意図通りか
監視SLA が設定されているか
失敗時のアラートが設定されているか
ログが永続化されているか
セキュリティ接続情報が暗号化されているか
RBAC が適切に設定されているか
パフォーマンスPool が適切に設定されているか
DAGパース時に重い処理を実行していないか
可用性Scheduler は HA 構成か
Metadata DB はバックアップされているか

22. トラブルシューティング

よくある問題と対処法

DAG が Web UI に表示されない:

airflow dags list-import-errors  # インポートエラーを確認

タスクが stuck 状態になる:

airflow pools list               # Pool のスロット不足を確認

メモリ不足(OOM):

  • タスク内でチャンク処理を実装する
  • XCom に大きなデータを保存しない

便利な管理コマンド

airflow dags list                          # DAG一覧
airflow dags trigger sales_etl_pipeline    # 手動トリガー
airflow tasks test dag_id task_id 2025-01-15  # タスクのテスト実行
airflow tasks clear dag_id -s 2025-01-15 -e 2025-01-16  # タスク状態のクリア
airflow db check                           # DB接続チェック
airflow jobs check --job-type SchedulerJob # Scheduler の動作確認

23. 他ツールとの比較

特徴Apache AirflowPrefectDagsterLuigiArgo Workflows
言語PythonPythonPythonPythonYAML/任意
定義方式Python (DAG)Python (Flow)Python (Job)Python (Task)YAML
UIの充実度高い高い非常に高い低い中程度
スケーラビリティ高い高い高い中程度非常に高い
動的ワークフロー可能得意可能限定的可能
データリネージ限定的あり優れているなしなし
Kubernetes統合良好良好良好なしネイティブ
コミュニティ最大成長中成長中安定大きい
学習コスト中程度低い中程度低い高い

24. まとめ

Apache Airflow の位置づけ

Apache Airflow は、2014年の誕生以来、データエンジニアリング領域で最も広く採用されているワークフローオーケストレーターである。

  1. Configuration as Code: Pythonによるワークフロー定義でソフトウェアエンジニアリングのベストプラクティスを適用可能
  2. 豊富なエコシステム: 数百のProvider パッケージで主要クラウドサービスとの統合が容易
  3. 柔軟なアーキテクチャ: 小規模開発環境から大規模本番環境まで対応
  4. 活発なコミュニティ: Apache トップレベルプロジェクトとして世界中の開発者がコントリビュート

今後の展望

  • Airflow 3.x: より軽量で高速なアーキテクチャへの進化
  • Deferrable Operator の普及: リソース効率の大幅な改善
  • Data-Aware Scheduling: データセットの更新イベントに基づくスケジューリング
  • Edge Worker: ハイブリッドクラウド・エッジ環境への対応

学習リソース


本記事は Apache Airflow 2.8.x を基準に執筆されている。バージョンによって一部の機能や設定が異なる場合がある。