Airflow
Apache Airflow 技術詳解 - ワークフローオーケストレーションの全容
目次
- はじめに
- Apache Airflow とは
- コアコンセプト
- アーキテクチャ概要
- Executor の種類と選定
- DAG の設計と実装
- Operator 詳解
- タスク間の依存関係とデータ受け渡し
- スケジューリングの仕組み
- 接続管理と Variable
- センサーとトリガー
- TaskFlow API
- ブランチングと条件分岐
- テンプレートエンジンと Jinja
- プラグインシステム
- セキュリティと認証
- モニタリングとログ管理
- パフォーマンスチューニング
- Kubernetes 環境でのデプロイ
- CI/CD とテスト戦略
- ベストプラクティス
- トラブルシューティング
- 他ツールとの比較
- まとめ
1. はじめに
現代のデータエンジニアリングにおいて、複雑なデータパイプラインを確実かつ効率的にオーケストレーションすることは極めて重要な課題である。バッチ処理、ETL(Extract, Transform, Load)、機械学習パイプライン、データウェアハウスへのロードなど、データチームが扱うワークフローは年々複雑化している。
Apache Airflow は、こうした複雑なワークフローをプログラム的に作成・スケジュール・監視するためのオープンソースプラットフォームである。2014年に Airbnb のエンジニアである Maxime Beauchemin によって開発が始まり、2016年に Apache Incubator プロジェクトとなり、2019年にトップレベルの Apache プロジェクトに昇格した。
本記事では、Apache Airflow の機能、アーキテクチャ、設計思想、そして実践的な設定・実装例を包括的に解説する。運用現場で必要となる知識を網羅的にカバーし、初学者から上級者までが参照できる技術資料を目指す。
2. Apache Airflow とは
2.1 基本的な位置づけ
Apache Airflow は「ワークフローオーケストレーター」に分類されるツールである。ワークフローをDAG(Directed Acyclic Graph:有向非巡回グラフ)として定義し、各タスクの実行順序、依存関係、スケジュールを管理する。
Airflow の最大の特徴は「Configuration as Code」の思想にある。ワークフローの定義をPythonコードで記述するため、以下のような利点がある。
- バージョン管理: Git 等でワークフローの変更履歴を追跡可能
- テスト可能性: ユニットテスト・統合テストが記述可能
- 再利用性: Pythonの関数・クラスとして部品化が可能
- 柔軟性: Pythonの全機能を活用した動的なワークフロー生成が可能
- コードレビュー: チーム開発におけるレビュープロセスが適用可能
2.2 Airflow が得意とするユースケース
- バッチETLパイプライン: データソースからデータを抽出し、変換してデータウェアハウスにロードする定期的な処理
- 機械学習パイプライン: データの前処理、特徴量エンジニアリング、モデルの学習・評価・デプロイの自動化
- データ品質チェック: データの整合性検証、品質メトリクスの計測と通知
- レポート生成: 定期的なビジネスレポートの自動生成と配信
- インフラストラクチャ管理: クラウドリソースのプロビジョニングやメンテナンスタスクの自動化
2.3 Airflow が不向きなユースケース
- ストリーミング処理: Airflow はバッチ処理向けに設計されている。リアルタイムストリーミングには Apache Kafka + Flink 等が適している
- 高頻度実行: 秒単位やミリ秒単位のスケジューリングは想定されていない
- データ処理エンジン: Airflow 自体はデータを処理しない。Spark、Presto、BigQuery 等の実行を「指示」するオーケストレーターである
3. コアコンセプト
3.1 DAG(Directed Acyclic Graph)
DAG は Airflow の最も基本的な概念である。DAG はタスクのコレクションであり、それらの間の依存関係と実行順序を定義する。
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email': ['data-team@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=60),
'execution_timeout': timedelta(hours=2),
'sla': timedelta(hours=4),
}
dag = DAG(
dag_id='example_etl_pipeline',
default_args=default_args,
description='本番ETLパイプラインの例',
schedule_interval='0 2 * * *', # 毎日午前2時
start_date=datetime(2025, 1, 1),
end_date=None,
catchup=False,
max_active_runs=1,
max_active_tasks=10,
dagrun_timeout=timedelta(hours=6),
tags=['etl', 'production', 'data-warehouse'],
doc_md="""
## ETL パイプライン
データソースから抽出し、変換後にデータウェアハウスにロードするパイプライン。
### オーナー
Data Engineering チーム
### SLA
午前6時までに完了すること
""",
)
DAG の主要パラメータ解説
| パラメータ | 説明 | 推奨値 |
|---|---|---|
dag_id | DAGの一意識別子 | チーム名_用途_対象 の命名規則を推奨 |
schedule_interval | 実行スケジュール(cron式またはtimedelta) | ユースケースに応じて設定 |
start_date | DAGの開始日 | 固定の過去日を推奨 |
catchup | 過去分の実行を補完するか | 本番では通常 False |
max_active_runs | 同時実行可能なDAG Run数 | リソースに応じて1-3 |
max_active_tasks | 同時実行可能なタスク数 | ワーカーリソースに応じて設定 |
dagrun_timeout | DAG Run全体のタイムアウト | SLAに基づいて設定 |
tags | 分類用タグ | チーム名、環境、用途を付与 |
3.2 Task
Task は DAG 内の処理の最小単位である。各 Task は Operator のインスタンスとして定義される。
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_function,
op_kwargs={'source': 'postgresql', 'table': 'users'},
pool='db_connection_pool',
priority_weight=10,
weight_rule='downstream',
queue='high_priority',
dag=dag,
)
3.3 Task Instance
Task Instance は、特定の DAG Run における特定の Task の実行インスタンスである。Task Instance は以下の状態遷移を持つ。
none -> scheduled -> queued -> running -> success
-> failed -> up_for_retry -> scheduled -> ...
-> upstream_failed
-> skipped
-> deferred -> scheduled -> ...
主要な状態:
- none: まだスケジュールされていない
- scheduled: スケジューラによって実行がスケジュールされた
- queued: Executor のキューに投入された
- running: 実行中
- success: 正常終了
- failed: 異常終了
- up_for_retry: リトライ待ち
- upstream_failed: 上流タスクの失敗により実行されなかった
- skipped: 条件分岐等でスキップされた
- deferred: Deferrable Operator により延期中(ワーカーリソースを解放)
3.4 DAG Run
DAG Run は DAG の1回の実行インスタンスである。以下の方法でトリガーされる。
- スケジュール実行:
schedule_intervalに基づく自動実行 - 手動トリガー: Web UI または CLI からの手動実行
- 外部トリガー: REST API、
TriggerDagRunOperator、外部システムからのトリガー
# CLI からの手動トリガー
# airflow dags trigger -c '{"key": "value"}' example_etl_pipeline
# REST API からのトリガー
# curl -X POST "http://localhost:8080/api/v1/dags/example_etl_pipeline/dagRuns" \
# -H "Content-Type: application/json" \
# -d '{"conf": {"key": "value"}}'
3.5 logical_date(旧 execution_date)
Airflow 2.2 以降、execution_date は logical_date にリネームされた。これは DAG Run が「論理的に」対象とする時刻であり、実際の実行時刻ではない点に注意が必要である。
例えば、毎日午前2時に実行される DAG の場合:
logical_date = 2025-01-15 02:00:00の DAG Run は、2025-01-16 02:00:00 頃に実行される- つまり、
logical_dateは「処理対象データの期間の開始時刻」を示す
sql = """
SELECT * FROM events
WHERE event_date >= '{{ ds }}'
AND event_date < '{{ next_ds }}'
"""
4. アーキテクチャ概要
4.1 コンポーネント構成
Airflow は以下の主要コンポーネントで構成される。
+------------------------------------------------------------------+
| Airflow Architecture |
| |
| +-------------+ +--------------+ +-----------------------+ |
| | Web Server | | Scheduler | | Triggerer | |
| | (Flask/ | | | | (async event loop) | |
| | Gunicorn) | | - DAG Parse | | | |
| | | | - Task | | Deferrable Operator | |
| | - Web UI | | Schedule | | Event Monitoring | |
| | - REST API | | - Executor | | | |
| +------+-------+ +------+------+ +-----------+-----------+ |
| | | | |
| v v v |
| +-------------------------------------------------------------+ |
| | Metadata Database | |
| | (PostgreSQL / MySQL) | |
| +-------------------------------------------------------------+ |
| ^ ^ |
| +------+-------+ +-----+-----------------------------------+ |
| | DAG Files | | Executor / Worker | |
| | (.py) | | [Worker 1] [Worker 2] ... [Worker N] | |
| +---------------+ +-----------------------------------------+ |
| |
| +-------------------------------------------------------------+ |
| | Log Storage (Local/S3/GCS/Elasticsearch) | |
| +-------------------------------------------------------------+ |
+------------------------------------------------------------------+
4.2 Web Server
Web Server は Airflow の UI と REST API を提供するコンポーネントである。Flask フレームワーク上に構築され、Gunicorn をWSGIサーバーとして使用する。
# airflow.cfg - Web Server 設定例
[webserver]
base_url = https://airflow.example.com
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_worker_timeout = 120
workers = 4
rbac = True
default_ui_timezone = Asia/Tokyo
dag_default_view = grid
4.3 Scheduler
Scheduler は Airflow の心臓部であり、DAGファイルの解析、DAG Runの生成、Taskのスケジューリング、状態管理を担当する。
[scheduler]
dag_dir_list_interval = 300
min_file_process_interval = 30
parsing_processes = 2
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
scheduler_idle_sleep_time = 1
max_tis_per_query = 512
num_runs = -1 # 無限ループ(本番推奨)
Scheduler の内部動作
1. DAG ファイルのパース
+-- 各 .py ファイルを子プロセスで解析し、DAG オブジェクトをDBに保存
2. DAG Run の作成
+-- schedule_interval に基づいて新規 DAG Run を作成
3. Task Instance のスケジューリング
+-- 依存関係の完了チェック -> Trigger Rule の評価 -> Pool のスロット確認
4. Task Instance のキューイング
+-- Executor に Task Instance を送信
5. Executor からの結果収集
+-- 完了した Task Instance の状態更新
4.4 Triggerer
Triggerer は Airflow 2.2 で導入されたコンポーネントで、Deferrable Operator と連携し、外部イベントの待機を非同期的に処理する。待機中のタスクはワーカーリソースを解放し、イベント発生時に再びワーカーで実行される。
[triggerer]
default_capacity = 1000
4.5 Metadata Database
Metadata Database は Airflow の全状態を永続化する。PostgreSQL または MySQL の使用が推奨される。
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
5. Executor の種類と選定
5.1 SequentialExecutor
タスクを1つずつ逐次実行。開発・テスト・デバッグ専用。
5.2 LocalExecutor
単一マシン上で子プロセスを fork して並列実行。中小規模のワークロードに適している。
[core]
executor = LocalExecutor
parallelism = 32
5.3 CeleryExecutor
Celery を利用した分散タスク実行。水平スケーリングが可能。
[core]
executor = CeleryExecutor
parallelism = 64
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres:5432/airflow
worker_concurrency = 16
worker_autoscale = 16,4
5.4 KubernetesExecutor
各タスクを個別の Kubernetes Pod として実行。タスクごとにリソースを分離でき、アイドル時のリソース消費がゼロ。
[core]
executor = KubernetesExecutor
[kubernetes_executor]
namespace = airflow
delete_worker_pods = True
worker_pods_creation_batch_size = 16
# Pod テンプレートの使用例
from kubernetes.client import models as k8s
pod_template = k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
image="my-registry/airflow-worker:2.8.1",
resources=k8s.V1ResourceRequirements(
requests={"cpu": "1", "memory": "2Gi"},
limits={"cpu": "2", "memory": "4Gi"},
),
)
],
service_account_name="airflow-worker",
),
)
5.5 CeleryKubernetesExecutor
CeleryExecutor と KubernetesExecutor のハイブリッド。タスクのキュー名に基づいて使い分ける。
5.6 Executor 選定ガイド
| 要件 | 推奨 Executor |
|---|---|
| 開発・テスト | SequentialExecutor / LocalExecutor |
| 中小規模(単一サーバー) | LocalExecutor |
| 大規模・高可用性 | CeleryExecutor |
| Kubernetes 環境 | KubernetesExecutor |
| 混合ワークロード | CeleryKubernetesExecutor |
6. DAG の設計と実装
6.1 DAG ファイルの推奨構成
dags/
+-- common/
| +-- callbacks.py, default_args.py, utils.py
+-- etl/
| +-- dag_sales_etl.py, dag_user_etl.py
| +-- sql/
+-- ml/
| +-- dag_model_training.py
+-- maintenance/
| +-- dag_cleanup_logs.py
+-- config/
+-- dag_configs.yaml
6.2 標準的な ETL パイプライン
from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
with DAG(
dag_id='sales_etl_pipeline',
schedule_interval='0 18 * * *', # UTC 18:00 = JST 03:00
start_date=datetime(2025, 1, 1),
catchup=False,
max_active_runs=1,
tags=['etl', 'sales', 'production'],
) as dag:
start = EmptyOperator(task_id='start')
extract_sales = PostgresToGCSOperator(
task_id='extract_sales_to_gcs',
postgres_conn_id='sales_db',
sql="SELECT * FROM sales WHERE sale_date = '{{ ds }}'",
bucket='my-data-lake',
filename='raw/sales/{{ ds }}/sales_{{ ds_nodash }}.json',
export_format='json', gzip=True,
)
extract_customers = PostgresToGCSOperator(
task_id='extract_customers_to_gcs',
postgres_conn_id='sales_db',
sql="SELECT * FROM customers WHERE updated_at >= '{{ ds }}' AND updated_at < '{{ next_ds }}'",
bucket='my-data-lake',
filename='raw/customers/{{ ds }}/customers_{{ ds_nodash }}.json',
export_format='json', gzip=True,
)
load_sales_raw = GCSToBigQueryOperator(
task_id='load_sales_to_bq_raw',
bucket='my-data-lake',
source_objects=['raw/sales/{{ ds }}/sales_{{ ds_nodash }}.json.gz'],
destination_project_dataset_table='project.raw.sales_{{ ds_nodash }}',
source_format='NEWLINE_DELIMITED_JSON',
write_disposition='WRITE_TRUNCATE', autodetect=True,
)
load_customers_raw = GCSToBigQueryOperator(
task_id='load_customers_to_bq_raw',
bucket='my-data-lake',
source_objects=['raw/customers/{{ ds }}/customers_{{ ds_nodash }}.json.gz'],
destination_project_dataset_table='project.raw.customers_{{ ds_nodash }}',
source_format='NEWLINE_DELIMITED_JSON',
write_disposition='WRITE_TRUNCATE', autodetect=True,
)
transform_sales = BigQueryInsertJobOperator(
task_id='transform_sales',
configuration={"query": {"query": """
CREATE OR REPLACE TABLE `project.staging.fact_sales_{{ ds_nodash }}` AS
SELECT s.*, c.name AS customer_name, c.segment, c.region,
CURRENT_TIMESTAMP() AS etl_loaded_at
FROM `project.raw.sales_{{ ds_nodash }}` s
LEFT JOIN `project.raw.customers_{{ ds_nodash }}` c ON s.customer_id = c.customer_id
WHERE s.total_amount > 0
""", "useLegacySql": False}},
)
data_quality_check = BigQueryInsertJobOperator(
task_id='data_quality_check',
configuration={"query": {"query": """
SELECT CASE
WHEN COUNT(*) = 0 THEN ERROR('No records loaded for {{ ds }}')
ELSE 'PASS'
END FROM `project.staging.fact_sales_{{ ds_nodash }}`
""", "useLegacySql": False}},
)
load_to_mart = BigQueryInsertJobOperator(
task_id='load_to_mart',
configuration={"query": {"query": """
MERGE `project.mart.fact_sales` T
USING `project.staging.fact_sales_{{ ds_nodash }}` S ON T.sale_id = S.sale_id
WHEN MATCHED THEN UPDATE SET customer_name = S.customer_name, total_amount = S.total_amount
WHEN NOT MATCHED THEN INSERT ROW
""", "useLegacySql": False}},
)
end = EmptyOperator(task_id='end')
start >> [extract_sales, extract_customers]
extract_sales >> load_sales_raw
extract_customers >> load_customers_raw
[load_sales_raw, load_customers_raw] >> transform_sales
transform_sales >> data_quality_check >> load_to_mart >> end
6.3 動的 DAG 生成
import yaml
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
config_path = Path(__file__).parent / 'config' / 'dag_configs.yaml'
with open(config_path, 'r') as f:
configs = yaml.safe_load(f)
def create_etl_dag(config):
dag = DAG(
dag_id=f"dynamic_{config['name']}_etl",
schedule_interval=config.get('schedule', '@daily'),
start_date=datetime(2025, 1, 1),
catchup=False,
tags=config.get('tags', []) + ['dynamic'],
)
with dag:
for table in config['tables']:
extract = PythonOperator(task_id=f'extract_{table["name"]}', python_callable=extract_table,
op_kwargs={'source_conn': config['source_connection'], 'table': table['name']})
load = PythonOperator(task_id=f'load_{table["name"]}', python_callable=load_table,
op_kwargs={'dest_conn': config['dest_connection'], 'table': table['name']})
extract >> load
return dag
for config in configs.get('etl_pipelines', []):
dag_obj = create_etl_dag(config)
globals()[dag_obj.dag_id] = dag_obj
7. Operator 詳解
7.1 Operator の分類
Action Operators: PythonOperator, BashOperator, SparkSubmitOperator 等。実際の処理を実行する。
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
python_task = PythonOperator(task_id='process_data', python_callable=my_function,
op_kwargs={'key1': 'value1'})
bash_task = BashOperator(task_id='run_script',
bash_command='python /opt/scripts/process.py --date {{ ds }}',
env={'DATA_PATH': '/data/{{ ds }}/'}, append_env=True)
spark_task = SparkSubmitOperator(task_id='spark_etl',
application='/opt/spark/jobs/etl_job.py', conn_id='spark_default',
conf={'spark.executor.memory': '4g', 'spark.executor.instances': '10'})
Transfer Operators: PostgresToGCSOperator, S3ToRedshiftOperator 等。システム間のデータ転送。
Sensor Operators: S3KeySensor, ExternalTaskSensor, SqlSensor 等。条件が満たされるまで待機する。
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_for_s3 = S3KeySensor(
task_id='wait_for_upstream_data',
bucket_name='data-lake',
bucket_key='processed/{{ ds }}/complete.flag',
mode='reschedule', # ワーカースロットを解放
poke_interval=300, timeout=7200,
exponential_backoff=True,
)
7.2 カスタム Operator の作成
from airflow.models import BaseOperator
from typing import Any, Dict, Optional, Sequence
class DataQualityOperator(BaseOperator):
"""データ品質チェックを実行するカスタム Operator"""
template_fields: Sequence[str] = ('sql', 'parameters')
template_ext: Sequence[str] = ('.sql',)
ui_color = '#89DA59'
def __init__(self, sql: str, conn_id: str, expected_result: Any = None,
check_type: str = 'not_empty', **kwargs):
super().__init__(**kwargs)
self.sql = sql
self.conn_id = conn_id
self.expected_result = expected_result
self.check_type = check_type
def execute(self, context):
from airflow.hooks.base import BaseHook
hook = BaseHook.get_hook(self.conn_id)
result = hook.get_first(self.sql)
actual_value = result[0] if result else None
if self.check_type == 'not_empty' and (actual_value is None or actual_value <= 0):
raise ValueError(f"Data quality check failed: {actual_value}")
elif self.check_type == 'exact' and actual_value != self.expected_result:
raise ValueError(f"Expected {self.expected_result}, got {actual_value}")
self.log.info(f"Data quality check passed: {actual_value}")
return actual_value
8. タスク間の依存関係とデータ受け渡し
8.1 依存関係の定義方法
# ビットシフト演算子(推奨)
task_a >> task_b >> task_c
task_a >> [task_b, task_c] >> task_d
# cross_downstream
from airflow.models.baseoperator import cross_downstream
cross_downstream([extract_1, extract_2], [transform_1, transform_2])
# chain
from airflow.models.baseoperator import chain
chain(task_a, [task_b, task_c], [task_d, task_e], task_f)
8.2 Trigger Rule
| Trigger Rule | 条件 |
|---|---|
ALL_SUCCESS | 全上流が成功(デフォルト) |
ALL_DONE | 全上流が完了(状態不問) |
ONE_SUCCESS | 1つ以上の上流が成功 |
ONE_FAILED | 1つ以上の上流が失敗 |
NONE_FAILED | 失敗した上流がない |
NONE_FAILED_MIN_ONE_SUCCESS | 失敗がなく、少なくとも1つ成功 |
ALWAYS | 常に実行 |
8.3 XCom(Cross-Communication)
def extract_data(**context):
return {'record_count': 1500, 'file_path': f'/data/{context["ds"]}/output.csv'}
def validate_data(**context):
ti = context['ti']
result = ti.xcom_pull(task_ids='extract_data')
if result['record_count'] == 0:
raise ValueError("No records extracted")
ti.xcom_push(key='status', value='passed')
XCom の注意点: サイズ制限あり。大量データは外部ストレージに保存し、パスのみXComで渡す。カスタムXCom Backendで S3 等への自動オフロードも可能。
9. スケジューリングの仕組み
dag = DAG('cron_example', schedule_interval='0 2 * * *') # 毎日午前2時
dag = DAG('preset_daily', schedule_interval='@daily') # 毎日 00:00
dag = DAG('delta', schedule_interval=timedelta(hours=6)) # 6時間ごと
dag = DAG('manual_only', schedule_interval=None) # 手動実行のみ
# Timetable(Airflow 2.2+)
from airflow.timetables.trigger import CronTriggerTimetable
dag = DAG('timetable', timetable=CronTriggerTimetable('0 9 * * 1-5', timezone='Asia/Tokyo'))
Data Interval の概念
Data Interval: [start ---- end)
| |
+-- data_interval_start (= logical_date)
+-- data_interval_end
実行はここで行われる
10. 接続管理と Variable
Connection
# 環境変数: AIRFLOW_CONN_POSTGRES_DEFAULT=postgresql://user:pass@host:5432/dbname
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id='my_postgres')
records = hook.get_records("SELECT COUNT(*) FROM users")
Secrets Backend
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}
Variable
from airflow.models import Variable
Variable.set('etl_config', {'batch_size': 10000}, serialize_json=True)
# テンプレートでの参照(推奨)
task = BashOperator(task_id='t', bash_command='echo {{ var.json.etl_config.batch_size }}')
11. センサーとトリガー
Sensor の動作モード
| モード | 特徴 |
|---|---|
poke | ワーカースロットを占有し続ける |
reschedule | チェック間はワーカースロットを解放 |
deferrable=True | Triggerer に処理を委譲(最もリソース効率が良い) |
Deferrable Operator
execute() 内で self.defer() を呼び出し、Trigger オブジェクトを Triggerer に渡す。Triggerer は非同期イベントループで外部イベントを監視し、イベント発生時にタスクを再びワーカーで実行する。
12. TaskFlow API
from airflow.decorators import dag, task, task_group
from datetime import datetime
@dag(dag_id='taskflow_etl', schedule_interval='@daily',
start_date=datetime(2025, 1, 1), catchup=False)
def taskflow_etl():
@task()
def extract() -> dict:
return {'users': [{'id': 1, 'name': 'Alice'}], 'count': 1}
@task(multiple_outputs=True)
def transform(data: dict) -> dict:
return {'transformed': data['users'], 'stats': {'count': data['count']}}
@task()
def load(data: list, stats: dict):
print(f"Loading {stats['count']} records")
# Dynamic Task Mapping(Airflow 2.3+)
@task()
def get_partitions() -> list:
return ['part_a', 'part_b', 'part_c']
@task()
def process_partition(partition: str) -> dict:
return {'partition': partition, 'records': 1000}
raw = extract()
result = transform(raw)
load(result['transformed'], result['stats'])
partitions = get_partitions()
process_partition.expand(partition=partitions)
dag_instance = taskflow_etl()
13. ブランチングと条件分岐
from airflow.operators.python import BranchPythonOperator, ShortCircuitOperator
def choose_branch(**context):
return 'full_load' if context['execution_date'].weekday() == 0 else 'incremental_load'
branch = BranchPythonOperator(task_id='branch', python_callable=choose_branch)
branch >> [full_load, incremental_load] >> EmptyOperator(
task_id='join', trigger_rule='none_failed_min_one_success')
# ShortCircuitOperator: False で下流全タスクをスキップ
should_run = ShortCircuitOperator(task_id='check', python_callable=check_data_exists)
14. テンプレートエンジンと Jinja
# 主要なテンプレート変数
# {{ ds }} - YYYY-MM-DD
# {{ ds_nodash }} - YYYYMMDD
# {{ logical_date }} - logical_date オブジェクト
# {{ data_interval_start }} - Data Interval の開始
# {{ data_interval_end }} - Data Interval の終了
# {{ macros.ds_add(ds, -1) }} - 前日
# {{ var.value.my_var }} - Variable の参照
# {{ var.json.config.key }} - JSON Variable の参照
# {{ ti.xcom_pull(...) }} - XCom の参照
# {{ params.my_param }} - パラメータの参照
# カスタムマクロ
dag = DAG('example', user_defined_macros={
'ds_to_quarter': lambda ds: f"{ds[:4]}Q{(int(ds[5:7])-1)//3+1}",
})
15. プラグインシステム
Airflow 2.0 以降、外部サービスとの統合は Provider パッケージとして分離された。
pip install apache-airflow-providers-amazon # AWS
pip install apache-airflow-providers-google # GCP
pip install apache-airflow-providers-microsoft-azure # Azure
pip install apache-airflow-providers-apache-spark # Spark
pip install apache-airflow-providers-databricks # Databricks
pip install apache-airflow-providers-snowflake # Snowflake
pip install apache-airflow-providers-slack # Slack
pip install apache-airflow-providers-kubernetes # Kubernetes
pip install apache-airflow-providers-dbt-cloud # dbt Cloud
16. セキュリティと認証
RBAC
dag = DAG('restricted_dag', access_control={
'data_engineering': {'can_read', 'can_edit'},
'data_science': {'can_read'},
})
認証バックエンド
LDAP、OAuth(Google, GitHub等)、SAML をサポート。webserver_config.py で設定する。
Fernet Key による暗号化
[core]
fernet_key = your-fernet-key-here
# キーローテーション: fernet_key = new-key,old-key-1,old-key-2
17. モニタリングとログ管理
ログの設定
[logging]
remote_logging = True
remote_log_conn_id = aws_default
remote_base_log_folder = s3://my-airflow-logs/logs
メトリクス
[metrics]
statsd_on = True
statsd_host = statsd-exporter
statsd_port = 9125
statsd_prefix = airflow
コールバック
def on_failure_callback(context):
ti = context['task_instance']
# Slack / PagerDuty / メール等で通知
dag = DAG('monitored', on_failure_callback=on_failure_callback,
sla_miss_callback=sla_miss_callback)
18. パフォーマンスチューニング
[scheduler]
dag_dir_list_interval = 300
parsing_processes = 4
[core]
parallelism = 64
max_active_tasks_per_dag = 16
store_serialized_dags = True
[database]
sql_alchemy_pool_size = 10
sql_alchemy_max_overflow = 20
sql_alchemy_pool_pre_ping = True
アンチパターンと推奨
# NG: トップレベルでの重い処理
df = pd.read_csv('large_file.csv') # パース時に毎回実行
# OK: タスク内に記述
def process():
import pandas as pd
df = pd.read_csv('large_file.csv')
# NG: DAGファイル内でのDB接続
env = Variable.get('environment') # パース時にDB問い合わせ
# OK: テンプレートで参照
# {{ var.value.environment }}
Pool によるリソース制御
task = PythonOperator(task_id='query_db', pool='db_connection_pool',
pool_slots=1, priority_weight=10)
19. Kubernetes 環境でのデプロイ
helm repo add apache-airflow https://airflow.apache.org
helm install airflow apache-airflow/airflow \
--namespace airflow --create-namespace --values values.yaml
# values.yaml(主要部分)
executor: KubernetesExecutor
scheduler:
replicas: 2
webserver:
replicas: 2
triggerer:
enabled: true
dags:
gitSync:
enabled: true
repo: git@github.com:org/airflow-dags.git
branch: main
config:
core:
parallelism: 64
store_serialized_dags: true
kubernetes_executor:
delete_worker_pods: true
logging:
remote_logging: true
remote_base_log_folder: s3://airflow-logs/logs
20. CI/CD とテスト戦略
DAG のテスト
import pytest
from airflow.models import DagBag
@pytest.fixture(scope='session')
def dagbag():
return DagBag(dag_folder='dags/', include_examples=False)
class TestDagIntegrity:
def test_no_import_errors(self, dagbag):
assert len(dagbag.import_errors) == 0
def test_all_dags_have_tags(self, dagbag):
for dag_id, dag in dagbag.dags.items():
assert len(dag.tags) > 0
def test_retries_configured(self, dagbag):
for dag_id, dag in dagbag.dags.items():
for task in dag.tasks:
assert task.retries >= 1
CI/CD パイプライン
name: Airflow CI/CD
on:
push:
branches: [main]
paths: ['dags/**', 'plugins/**', 'tests/**']
jobs:
lint-and-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install ruff black apache-airflow==2.8.1 pytest
- run: ruff check dags/ && black --check dags/
- run: pytest tests/ -v
deploy:
needs: lint-and-test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: aws s3 sync dags/ s3://airflow-production-dags/ --delete
21. ベストプラクティス
| カテゴリ | チェック項目 |
|---|---|
| DAG設計 | 冪等性が保証されているか |
| タスクの粒度は適切か | |
| リトライとタイムアウトが設定されているか | |
| catchup の設定は意図通りか | |
| 監視 | SLA が設定されているか |
| 失敗時のアラートが設定されているか | |
| ログが永続化されているか | |
| セキュリティ | 接続情報が暗号化されているか |
| RBAC が適切に設定されているか | |
| パフォーマンス | Pool が適切に設定されているか |
| DAGパース時に重い処理を実行していないか | |
| 可用性 | Scheduler は HA 構成か |
| Metadata DB はバックアップされているか |
22. トラブルシューティング
よくある問題と対処法
DAG が Web UI に表示されない:
airflow dags list-import-errors # インポートエラーを確認
タスクが stuck 状態になる:
airflow pools list # Pool のスロット不足を確認
メモリ不足(OOM):
- タスク内でチャンク処理を実装する
- XCom に大きなデータを保存しない
便利な管理コマンド
airflow dags list # DAG一覧
airflow dags trigger sales_etl_pipeline # 手動トリガー
airflow tasks test dag_id task_id 2025-01-15 # タスクのテスト実行
airflow tasks clear dag_id -s 2025-01-15 -e 2025-01-16 # タスク状態のクリア
airflow db check # DB接続チェック
airflow jobs check --job-type SchedulerJob # Scheduler の動作確認
23. 他ツールとの比較
| 特徴 | Apache Airflow | Prefect | Dagster | Luigi | Argo Workflows |
|---|---|---|---|---|---|
| 言語 | Python | Python | Python | Python | YAML/任意 |
| 定義方式 | Python (DAG) | Python (Flow) | Python (Job) | Python (Task) | YAML |
| UIの充実度 | 高い | 高い | 非常に高い | 低い | 中程度 |
| スケーラビリティ | 高い | 高い | 高い | 中程度 | 非常に高い |
| 動的ワークフロー | 可能 | 得意 | 可能 | 限定的 | 可能 |
| データリネージ | 限定的 | あり | 優れている | なし | なし |
| Kubernetes統合 | 良好 | 良好 | 良好 | なし | ネイティブ |
| コミュニティ | 最大 | 成長中 | 成長中 | 安定 | 大きい |
| 学習コスト | 中程度 | 低い | 中程度 | 低い | 高い |
24. まとめ
Apache Airflow の位置づけ
Apache Airflow は、2014年の誕生以来、データエンジニアリング領域で最も広く採用されているワークフローオーケストレーターである。
- Configuration as Code: Pythonによるワークフロー定義でソフトウェアエンジニアリングのベストプラクティスを適用可能
- 豊富なエコシステム: 数百のProvider パッケージで主要クラウドサービスとの統合が容易
- 柔軟なアーキテクチャ: 小規模開発環境から大規模本番環境まで対応
- 活発なコミュニティ: Apache トップレベルプロジェクトとして世界中の開発者がコントリビュート
今後の展望
- Airflow 3.x: より軽量で高速なアーキテクチャへの進化
- Deferrable Operator の普及: リソース効率の大幅な改善
- Data-Aware Scheduling: データセットの更新イベントに基づくスケジューリング
- Edge Worker: ハイブリッドクラウド・エッジ環境への対応
学習リソース
- 公式ドキュメント: https://airflow.apache.org/docs/
- GitHub リポジトリ: https://github.com/apache/airflow
- Astronomer ブログ: https://www.astronomer.io/blog/
本記事は Apache Airflow 2.8.x を基準に執筆されている。バージョンによって一部の機能や設定が異なる場合がある。