Apache Flink
Apache Flink 徹底解説:アーキテクチャ・機能・設定の全容
目次
- はじめに
- Apache Flink の概要と歴史
- アーキテクチャ全体像
- ランタイムアーキテクチャ
- DataStream API
- Table API と Flink SQL
- ウィンドウ処理
- 状態管理とステートバックエンド
- チェックポイントとセーブポイント
- Exactly-Once セマンティクス
- 時間の概念とウォーターマーク
- コネクターとフォーマット
- CEP(Complex Event Processing)
- バッチ処理
- デプロイメント
- 設定とチューニング
- メモリ管理
- 監視と運用
- セキュリティ
- ユースケースとベストプラクティス
- Flink vs 他エンジン比較
- 最新動向と将来展望
- まとめ
1. はじめに
Apache Flink は、ステートフルな計算をバウンデッド(有限)およびアンバウンデッド(無限)のデータストリームに対して行うための分散処理フレームワークである。「ストリーム処理ファースト」の設計思想を持ち、真のイベント単位のストリーム処理を実現する。
Flink の最大の特徴は、Exactly-Once の状態整合性を持つステートフルストリーム処理であり、低レイテンシと高スループットを両立させながら、障害時の状態回復を保証する。
1.1 本記事の対象読者
- リアルタイムデータパイプラインを構築するデータエンジニア
- ストリーミング分析基盤を設計するアーキテクト
- Flink クラスターの運用を行う SRE / インフラエンジニア
- イベント駆動アプリケーションを開発するバックエンドエンジニア
1.2 Flink の位置づけ
┌────────────────────────────────────────────────────────────┐
│ Applications │
│ (Real-time Analytics, ETL, Event-Driven, ML Inference) │
├────────────────────────────────────────────────────────────┤
│ Apache Flink Runtime │
│ (Stateful Stream Processing Engine) │
├──────────┬──────────┬──────────┬──────────────────────────┤
│DataStream│ Table │ Flink │ CEP / ML / │
│ API │ API │ SQL │ Graph (Gelly) │
├──────────┴──────────┴──────────┴──────────────────────────┤
│ State Backends │
│ (HashMapStateBackend / EmbeddedRocksDB) │
├────────────────────────────────────────────────────────────┤
│ Cluster Manager │
│ (Standalone / YARN / Kubernetes / Mesos) │
├────────────────────────────────────────────────────────────┤
│ Source / Sink Connectors │
│ (Kafka / Kinesis / JDBC / FileSystem / Iceberg / │
│ Elasticsearch / HBase / Cassandra / etc.) │
└────────────────────────────────────────────────────────────┘
2. Apache Flink の概要と歴史
2.1 誕生の背景
Apache Flink は、ドイツのベルリン工科大学の研究プロジェクト「Stratosphere」を起源とし、2014年に Apache Software Foundation のインキュベーションプロジェクトとなった。
従来のストリーム処理エンジン(Apache Storm等)がイベント単位の低レイテンシ処理に特化し、バッチエンジン(MapReduce, Spark等)がスループット重視であったのに対し、Flink はストリーム処理を基盤としてバッチ処理も統一的に扱うというアプローチを採用した。
2.2 バージョンの変遷
| バージョン | リリース年 | 主な特徴 |
|---|---|---|
| 0.9 | 2015 | 最初の安定版リリース |
| 1.0 | 2016 | Exactly-Once チェックポイント |
| 1.2 | 2016 | セーブポイント機能 |
| 1.3 | 2017 | インクリメンタルチェックポイント |
| 1.5 | 2018 | Blink SQL エンジン統合開始 |
| 1.7 | 2018 | Scala-free クラスパス |
| 1.9 | 2019 | Blink Planner(新SQL プランナー) |
| 1.11 | 2020 | Application Mode、Unaligned Checkpoints |
| 1.12 | 2020 | Kubernetes HA |
| 1.13 | 2021 | Reactive Mode |
| 1.14 | 2021 | Buffer Debloating |
| 1.15 | 2022 | チェックポイントの高速化 |
| 1.16 | 2022 | Generalized Incremental Checkpoints |
| 1.17 | 2023 | Async Sink、Watermark Alignment |
| 1.18 | 2023 | Flink SQL Gateway |
| 1.19 | 2024 | Materialized Table、Flink CDC統合強化 |
| 1.20 | 2024 | Disaggregated State Management |
| 2.0 | 2025 | DataStream API v2、Thread Model改善 |
2.3 主要な特徴
- 真のストリーム処理: イベント単位の低レイテンシ処理(マイクロバッチではない)
- ステートフル処理: 大規模な状態をExactly-Onceで管理
- イベントタイム処理: イベント時刻に基づく正確なウィンドウ処理
- Exactly-Once保証: チェックポイントメカニズムによる状態整合性
- 高スループット: 秒間数百万イベントの処理
- 統一バッチ・ストリーム: 同一APIでバッチとストリームを処理
- CEP: 複合イベント処理のネイティブサポート
- スケーラブル: 数千ノードまでスケール
3. アーキテクチャ全体像
3.1 レイヤードアーキテクチャ
┌──────────────────────────────────────────────────────────┐
│ API Layer │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐ │
│ │DataStream│ │ Table │ │ Flink │ │ PyFlink │ │
│ │ API │ │ API │ │ SQL │ │ (Python) │ │
│ └──────────┘ └──────────┘ └──────────┘ └────────────┘ │
├──────────────────────────────────────────────────────────┤
│ Dataflow Runtime │
│ ┌──────────────────────────────────────────────────┐ │
│ │ JobGraph → ExecutionGraph → Physical Graph │ │
│ └──────────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────────────┤
│ State & Checkpoint │
│ ┌──────────┐ ┌─────────────────┐ ┌──────────────────┐ │
│ │ HashMap │ │ EmbeddedRocksDB│ │ Changelog │ │
│ │ State │ │ StateBackend │ │ StateBackend │ │
│ │ Backend │ │ │ │ │ │
│ └──────────┘ └─────────────────┘ └──────────────────┘ │
├──────────────────────────────────────────────────────────┤
│ Cluster Management │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐ │
│ │Standalone│ │ YARN │ │Kubernetes│ │ Session / │ │
│ │ │ │ │ │ │ │ Application │ │
│ └──────────┘ └──────────┘ └──────────┘ └────────────┘ │
└──────────────────────────────────────────────────────────┘
3.2 ストリーム処理の基本概念
Flink ではすべてのデータを ストリーム(Stream) として扱う:
- Unbounded Stream(無限ストリーム): 終わりのないデータストリーム(リアルタイムイベント)
- Bounded Stream(有限ストリーム): 終わりのあるデータセット(バッチデータ)
Unbounded Stream (ストリーム処理):
──●──●──●──●──●──●──●──●──●──●──●──●──► (終わりなし)
e1 e2 e3 e4 e5 e6 e7 e8 e9 ...
Bounded Stream (バッチ処理):
──●──●──●──●──●──●──●──●──●──●|
e1 e2 e3 e4 e5 e6 e7 e8 e9 e10 (終了あり)
4. ランタイムアーキテクチャ
4.1 コンポーネント構成
┌────────────────────────────────────────────────────────────┐
│ Client │
│ (CLI / REST API / Dashboard) │
└──────────────────────┬─────────────────────────────────────┘
│ Submit JobGraph
▼
┌────────────────────────────────────────────────────────────┐
│ JobManager (JM) │
│ ┌─────────────┐ ┌──────────────┐ ┌────────────────────┐ │
│ │ Dispatcher │ │ ResourceMgr │ │ JobMaster │ │
│ │ (Job受付) │ │ (リソース管理)│ │ (Job実行管理) │ │
│ └─────────────┘ └──────────────┘ └────────────────────┘ │
│ ┌─────────────┐ ┌──────────────┐ ┌────────────────────┐ │
│ │ Checkpoint │ │ Scheduler │ │ REST Endpoint │ │
│ │ Coordinator │ │ │ │ │ │
│ └─────────────┘ └──────────────┘ └────────────────────┘ │
└──────────────────────┬─────────────────────────────────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ TaskManager 1│ │ TaskManager 2│ │ TaskManager N│
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │ Slot 1 │ │ │ │ Slot 1 │ │ │ │ Slot 1 │ │
│ │┌────────┐│ │ │ │┌────────┐│ │ │ │┌────────┐│ │
│ ││ Task ││ │ │ ││ Task ││ │ │ ││ Task ││ │
│ │└────────┘│ │ │ │└────────┘│ │ │ │└────────┘│ │
│ ├──────────┤ │ │ ├──────────┤ │ │ ├──────────┤ │
│ │ Slot 2 │ │ │ │ Slot 2 │ │ │ │ Slot 2 │ │
│ │┌────────┐│ │ │ │┌────────┐│ │ │ │┌────────┐│ │
│ ││ Task ││ │ │ ││ Task ││ │ │ ││ Task ││ │
│ │└────────┘│ │ │ │└────────┘│ │ │ │└────────┘│ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
│ │ │ │ │ │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │ State │ │ │ │ State │ │ │ │ State │ │
│ │ Backend │ │ │ │ Backend │ │ │ │ Backend │ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │ Network │ │ │ │ Network │ │ │ │ Network │ │
│ │ Buffer │ │ │ │ Buffer │ │ │ │ Buffer │ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
└──────────────┘ └──────────────┘ └──────────────┘
4.2 JobManager
JobManager はFlinkクラスターのマスターノードであり、以下のコンポーネントで構成される:
- Dispatcher: ジョブの受付とJobMasterの起動
- ResourceManager: TaskManager のスロット管理とリソース割り当て
- JobMaster: 個々のジョブの実行を管理
- Checkpoint Coordinator: チェックポイントのトリガーと調整
4.3 TaskManager
TaskManager はワーカーノードであり、実際のデータ処理を行う:
- Task Slot: リソース分離の単位。各Slotは固定量のメモリを持つ
- Task: 演算子のサブタスク。1つのSlot内で複数のTaskをチェーン実行可能
- Network Buffer: TaskManager間のデータ交換用バッファ
- State Backend: ローカルの状態ストレージ
4.4 ジョブの実行フロー
User Code (DataStream / SQL)
│
▼
StreamGraph (論理グラフ)
│
▼
JobGraph (最適化済みグラフ - operator chaining)
│
▼
ExecutionGraph (並列化された実行グラフ)
│
▼
Physical Execution (各TaskManagerでの実行)
4.5 Operator Chaining
Flink は連続する演算子を1つのTaskにチェーンして実行することで、シリアライゼーションやネットワーク通信のオーバーヘッドを削減する。
チェーン前:
Source → Map → Filter → Sink (4つの別々のTask)
チェーン後:
[Source → Map → Filter] → Sink (2つのTask)
1つのスレッドで実行 別スレッド
// Operator Chainingの制御
stream
.map(value -> process(value))
.disableChaining() // このオペレータからチェーンを切断
.filter(value -> value > 0)
.startNewChain() // 新しいチェーンを開始
.keyBy(value -> value.getKey())
.process(new MyProcessFunction());
5. DataStream API
5.1 基本的なストリーム処理
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FilterFunction;
// 実行環境の作成
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 並列度の設定
env.setParallelism(4);
// ソースからの読み込み
DataStream<String> source = env.fromSource(
KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("input-topic")
.setGroupId("flink-consumer")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build(),
WatermarkStrategy.noWatermarks(),
"Kafka Source"
);
// 変換処理
DataStream<Event> events = source
.map(json -> Event.fromJson(json))
.filter(event -> event.getType() != null)
.name("Parse and Filter Events");
// キー付きストリーム
DataStream<EventStats> stats = events
.keyBy(event -> event.getUserId())
.process(new EventStatsProcessor())
.name("Compute Event Stats");
// シンクへの書き出し
stats.sinkTo(
KafkaSink.<EventStats>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new EventStatsSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build()
).name("Kafka Sink");
// 実行
env.execute("Event Processing Pipeline");
5.2 PyFlink (Python API)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
# 環境の作成
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
# Kafkaソース
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("input-topic") \
.set_group_id("flink-python-consumer") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
stream = env.from_source(
kafka_source,
WatermarkStrategy.no_watermarks(),
"Kafka Source"
)
# 処理
result = stream \
.map(lambda x: json.loads(x)) \
.filter(lambda x: x.get('event_type') is not None) \
.key_by(lambda x: x['user_id']) \
.process(MyProcessFunction())
env.execute("Python Flink Job")
5.3 主要な変換演算子
// map: 1:1変換
DataStream<Integer> doubled = stream.map(x -> x * 2);
// flatMap: 1:N変換
DataStream<String> words = lines.flatMap(
(String line, Collector<String> out) -> {
for (String word : line.split(" ")) {
out.collect(word);
}
}
).returns(Types.STRING);
// filter: フィルタリング
DataStream<Event> errors = events.filter(e -> e.getLevel().equals("ERROR"));
// keyBy: キー分割(論理パーティション)
KeyedStream<Event, String> keyed = events.keyBy(Event::getUserId);
// reduce: キーごとの集約
DataStream<Integer> sums = keyed.reduce((a, b) -> a + b);
// union: 複数ストリームの結合
DataStream<Event> combined = stream1.union(stream2, stream3);
// connect: 2つのストリームの接続(異なる型可)
ConnectedStreams<Event, Rule> connected = events.connect(rules.broadcast());
// Side Output: 条件に応じた分岐
final OutputTag<Event> errorTag = new OutputTag<Event>("errors") {};
SingleOutputStreamOperator<Event> mainStream = events
.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event event, Context ctx, Collector<Event> out) {
if (event.isError()) {
ctx.output(errorTag, event);
} else {
out.collect(event);
}
}
});
DataStream<Event> errorStream = mainStream.getSideOutput(errorTag);
// Async I/O: 非同期外部呼び出し
AsyncDataStream.unorderedWait(
events,
new AsyncDatabaseLookup(),
30, TimeUnit.SECONDS, // タイムアウト
100 // 同時リクエスト数
);
5.4 ProcessFunction(低レベルAPI)
ProcessFunction はFlink の最も強力なAPIであり、タイマー、状態アクセス、Side Output を提供する。
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
// 状態の定義
private ValueState<Boolean> flagState;
private ValueState<Long> timerState;
@Override
public void open(OpenContext openContext) {
ValueStateDescriptor<Boolean> flagDescriptor =
new ValueStateDescriptor<>("flag", Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor =
new ValueStateDescriptor<>("timer-state", Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
@Override
public void processElement(Transaction tx, Context ctx, Collector<Alert> out) throws Exception {
Boolean lastWasSmall = flagState.value();
if (lastWasSmall != null && lastWasSmall) {
if (tx.getAmount() > 500.0) {
// 不正検知: 小額の直後に大額
out.collect(new Alert(tx.getAccountId(), "Potential fraud detected"));
}
clearState();
}
if (tx.getAmount() < 1.0) {
flagState.update(true);
// 1分後のタイマー設定
long timer = ctx.timerService().currentProcessingTime() + 60000;
ctx.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
// タイマー発火: フラグをクリア
timerState.clear();
flagState.clear();
}
private void clearState() {
flagState.clear();
Long timer = timerState.value();
if (timer != null) {
timerState.clear();
}
}
}
6. Table API と Flink SQL
6.1 Table API の概要
Table APIはFlink のリレーショナルAPIであり、SQLに似た宣言的な処理を行う。
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// Table環境の作成
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// テーブルの登録
tableEnv.executeSql("""
CREATE TABLE kafka_events (
event_id BIGINT,
user_id BIGINT,
event_type STRING,
amount DECIMAL(10, 2),
event_time TIMESTAMP(3),
proc_time AS PROCTIME(),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-sql-group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
""");
// Table APIクエリ
Table result = tableEnv.from("kafka_events")
.filter($("event_type").isEqual("purchase"))
.groupBy($("user_id"), $("event_type"))
.select(
$("user_id"),
$("event_type"),
$("amount").sum().as("total_amount"),
$("event_id").count().as("event_count")
);
6.2 Flink SQL
-- Kafka ソーステーブル
CREATE TABLE page_views (
user_id BIGINT,
page_url STRING,
view_duration INT,
event_time TIMESTAMP(3),
proc_time AS PROCTIME(),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'page-views',
'properties.bootstrap.servers' = 'kafka:9092',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
-- Icebergシンクテーブル
CREATE TABLE page_view_stats (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
page_url STRING,
view_count BIGINT,
unique_users BIGINT,
avg_duration DOUBLE,
PRIMARY KEY (window_start, page_url) NOT ENFORCED
) WITH (
'connector' = 'iceberg',
'catalog-name' = 'iceberg_catalog',
'catalog-type' = 'hive',
'uri' = 'thrift://hive-metastore:9083',
'warehouse' = 's3://data-lake/warehouse'
);
-- ウィンドウ集約
INSERT INTO page_view_stats
SELECT
window_start,
window_end,
page_url,
COUNT(*) as view_count,
COUNT(DISTINCT user_id) as unique_users,
AVG(CAST(view_duration AS DOUBLE)) as avg_duration
FROM TABLE(
TUMBLE(TABLE page_views, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, page_url;
-- Temporal Join(時間的結合)
CREATE TABLE currency_rates (
currency STRING,
rate DECIMAL(10, 6),
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'currency-rates',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
-- イベント時刻でのTemporal Join
SELECT
o.order_id,
o.amount,
o.currency,
r.rate,
o.amount * r.rate AS amount_usd
FROM orders AS o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
-- Lookup Join(外部テーブル参照)
CREATE TABLE user_profiles (
user_id BIGINT,
name STRING,
country STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://db:5432/users',
'table-name' = 'user_profiles',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '5min'
);
SELECT
e.event_id,
e.user_id,
u.name,
u.country,
e.event_type,
e.event_time
FROM page_views AS e
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;
-- Deduplication(重複排除)
SELECT event_id, user_id, event_type, event_time
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_time ASC) AS rn
FROM page_views
)
WHERE rn = 1;
-- Top-N クエリ
SELECT user_id, page_url, view_count
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY page_url ORDER BY view_count DESC) AS rn
FROM page_view_summary
)
WHERE rn <= 10;
-- Pattern Recognition (MATCH_RECOGNIZE)
SELECT *
FROM page_views
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY event_time
MEASURES
FIRST(A.page_url) AS first_page,
LAST(B.page_url) AS checkout_page,
A.event_time AS start_time,
LAST(B.event_time) AS end_time
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B+ C)
DEFINE
A AS A.page_url LIKE '%/product/%',
B AS B.page_url LIKE '%/cart%',
C AS C.page_url LIKE '%/checkout/complete%'
);
7. ウィンドウ処理
7.1 ウィンドウの種類
Tumbling Window(タンブリングウィンドウ)- 固定長、重複なし:
|----W1----|----W2----|----W3----|----W4----|
0 5 10 15 20 25 30 35 40 (秒)
Sliding Window(スライディングウィンドウ)- 固定長、重複あり:
|--------W1--------|
|--------W2--------|
|--------W3--------|
0 5 10 15 20 25 30 (秒)
Session Window(セッションウィンドウ)- ギャップベース:
|--W1--| |----W2----| |--W3--|
●●● ● ●●●●● ●●●
gap=5s gap=5s gap=5s
Global Window(グローバルウィンドウ)- カスタムトリガー:
|----────────────── entire stream ──────────────────|
7.2 DataStream API でのウィンドウ
// Tumbling Window(イベントタイム)
events
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new EventAggregator());
// Sliding Window
events
.keyBy(Event::getUserId)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
.reduce((a, b) -> a.merge(b));
// Session Window
events
.keyBy(Event::getUserId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.process(new SessionWindowFunction());
// Processing Time Window
events
.keyBy(Event::getUserId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
.sum("amount");
// カスタムトリガー
events
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) // 1分ごとに中間結果
.aggregate(new EventAggregator());
// 遅延データの許容
events
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(10))
.sideOutputLateData(lateOutputTag)
.aggregate(new EventAggregator());
7.3 Flink SQL でのウィンドウ(TVF)
-- Tumbling Window (TVF)
SELECT
window_start, window_end,
user_id,
COUNT(*) as event_count,
SUM(amount) as total_amount
FROM TABLE(
TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, user_id;
-- Hop Window (Sliding) (TVF)
SELECT
window_start, window_end,
COUNT(*) as event_count
FROM TABLE(
HOP(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
)
GROUP BY window_start, window_end;
-- Cumulate Window (TVF)
SELECT
window_start, window_end,
SUM(amount) as cumulative_amount
FROM TABLE(
CUMULATE(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;
-- Session Window (TVF)
SELECT
window_start, window_end,
user_id,
COUNT(*) as event_count
FROM TABLE(
SESSION(TABLE events PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE)
)
GROUP BY window_start, window_end, user_id;
8. 状態管理とステートバックエンド
8.1 状態の種類
Flink はオペレーター内で管理される状態(State)を提供する:
| 状態タイプ | 説明 | API |
|---|---|---|
| ValueState | 単一値 | value(), update(T) |
| ListState | リスト | add(T), get(), update(List<T>) |
| MapState<K, V> | マップ | get(K), put(K, V), entries() |
| ReducingState | リデュース関数付き | add(T), get() |
| AggregatingState<IN, OUT> | 集約関数付き | add(IN), get() |
public class UserSessionTracker extends KeyedProcessFunction<String, Event, SessionResult> {
private ValueState<Long> sessionStart;
private ValueState<Long> lastActivity;
private MapState<String, Integer> pageVisits;
private ListState<String> recentPages;
@Override
public void open(OpenContext openContext) {
sessionStart = getRuntimeContext().getState(
new ValueStateDescriptor<>("session-start", Long.class));
lastActivity = getRuntimeContext().getState(
new ValueStateDescriptor<>("last-activity", Long.class));
pageVisits = getRuntimeContext().getMapState(
new MapStateDescriptor<>("page-visits", String.class, Integer.class));
recentPages = getRuntimeContext().getListState(
new ListStateDescriptor<>("recent-pages", String.class));
}
@Override
public void processElement(Event event, Context ctx, Collector<SessionResult> out)
throws Exception {
long currentTime = event.getTimestamp();
if (sessionStart.value() == null) {
// 新しいセッション開始
sessionStart.update(currentTime);
}
// ページ訪問カウント
String page = event.getPage();
Integer count = pageVisits.get(page);
pageVisits.put(page, count == null ? 1 : count + 1);
// 最近のページリスト
recentPages.add(page);
// タイマー設定(30分アイドルでセッション終了)
lastActivity.update(currentTime);
ctx.timerService().registerEventTimeTimer(currentTime + 30 * 60 * 1000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<SessionResult> out)
throws Exception {
if (lastActivity.value() != null && timestamp >= lastActivity.value() + 30 * 60 * 1000) {
// セッション結果を出力
out.collect(new SessionResult(
ctx.getCurrentKey(),
sessionStart.value(),
lastActivity.value(),
collectPages()
));
// 状態をクリア
sessionStart.clear();
lastActivity.clear();
pageVisits.clear();
recentPages.clear();
}
}
}
8.2 ステートバックエンド
| バックエンド | 状態の格納先 | 特徴 | 推奨用途 |
|---|---|---|---|
| HashMapStateBackend | JVMヒープ | 高速、メモリ制約あり | 小〜中規模状態 |
| EmbeddedRocksDBStateBackend | RocksDB(ローカルディスク) | 大規模状態対応、やや低速 | 大規模状態 |
# flink-conf.yaml
# HashMapStateBackend
state.backend.type: hashmap
state.checkpoints.dir: s3://flink-state/checkpoints
state.savepoints.dir: s3://flink-state/savepoints
# EmbeddedRocksDBStateBackend
state.backend.type: rocksdb
state.backend.rocksdb.localdir: /var/flink/rocksdb
state.backend.incremental: true
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4
state.backend.rocksdb.compaction.level.max-size-level-base: 320mb
// プログラム内でのステートバックエンド設定
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // incremental = true
env.getCheckpointConfig().setCheckpointStorage("s3://flink-state/checkpoints");
8.3 Broadcast State
Broadcast State は、小さなデータセット(ルール、設定等)を全パーティションで共有するためのパターンである。
// ルールストリーム
DataStream<Rule> ruleStream = env.fromSource(ruleSource, ...);
// イベントストリーム
DataStream<Event> eventStream = env.fromSource(eventSource, ...);
// Broadcast State Descriptor
MapStateDescriptor<String, Rule> ruleStateDesc =
new MapStateDescriptor<>("rules", String.class, Rule.class);
// ルールをブロードキャスト
BroadcastStream<Rule> broadcastRules = ruleStream.broadcast(ruleStateDesc);
// イベントストリームとブロードキャストストリームの接続
DataStream<Alert> alerts = eventStream
.connect(broadcastRules)
.process(new BroadcastProcessFunction<Event, Rule, Alert>() {
@Override
public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out) {
ReadOnlyBroadcastState<String, Rule> rules =
ctx.getBroadcastState(ruleStateDesc);
// ルールに基づいてイベントを評価
for (Map.Entry<String, Rule> entry : rules.immutableEntries()) {
if (entry.getValue().matches(event)) {
out.collect(new Alert(event, entry.getValue()));
}
}
}
@Override
public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out) {
BroadcastState<String, Rule> state = ctx.getBroadcastState(ruleStateDesc);
state.put(rule.getId(), rule);
}
});
9. チェックポイントとセーブポイント
9.1 チェックポイント
チェックポイントはFlinkの障害回復メカニズムであり、分散スナップショットアルゴリズム(Chandy-Lamportの変形)に基づく。
Checkpoint Barrier の伝播:
Source 1: ──●──●──|CB|──●──●──●──|CB|──●──●──►
Source 2: ──●──●──●──|CB|──●──●──|CB|──●──●──►
↓ ↓
Barrier Alignment State Snapshot
↓
Checkpoint Storage (S3/HDFS)
# flink-conf.yaml
# チェックポイント設定
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600s
execution.checkpointing.min-pause: 30s
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.prefer-checkpoint-for-recovery: true
# チェックポイントストレージ
state.checkpoints.dir: s3://flink-state/checkpoints
state.checkpoints.num-retained: 3
# Unaligned Checkpoints(バックプレッシャー下で有効)
execution.checkpointing.unaligned.enabled: true
execution.checkpointing.unaligned.forced: false
# Incremental Checkpoints(RocksDB使用時に推奨)
state.backend.incremental: true
# Changelog State Backend(高速リカバリ)
state.changelog.enabled: true
state.changelog.storage: filesystem
dstl.dfs.base-path: s3://flink-state/changelog
// プログラム内でのチェックポイント設定
env.enableCheckpointing(60000); // 60秒ごと
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10分
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// ジョブキャンセル時にチェックポイントを保持
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
9.2 セーブポイント
セーブポイントは手動でトリガーされるチェックポイントであり、ジョブのアップグレードやマイグレーションに使用する。
# セーブポイントの作成
flink savepoint <jobId> s3://flink-state/savepoints/
# セーブポイントからのジョブ再開
flink run -s s3://flink-state/savepoints/savepoint-xxxxx \
-c com.example.MyJob my-job.jar
# セーブポイントの破棄
flink savepoint -d s3://flink-state/savepoints/savepoint-xxxxx
セーブポイント互換性のためのUID設定:
// 各オペレーターにUIDを設定(セーブポイント互換性に必須)
events
.keyBy(Event::getUserId)
.uid("user-keyed-stream") // UID設定
.process(new UserProcessor())
.uid("user-processor") // UID設定
.name("User Event Processor"); // 表示名
10. Exactly-Once セマンティクス
10.1 End-to-End Exactly-Once
Flink は以下のメカニズムで End-to-End の Exactly-Once を実現する:
Source Flink Internal Sink
┌─────────┐ ┌──────────────────┐ ┌────────────┐
│ Kafka │ │ Checkpoint + │ │ 2-Phase │
│ Consumer│────►│ Barrier │────►│ Commit │
│ Offset │ │ Alignment │ │ (Kafka │
│ Tracking│ │ │ │ Producer) │
└─────────┘ └──────────────────┘ └────────────┘
↓ ↓ ↓
Replayable State Snapshot Transactional
Source (exactly-once Write
internal state) (pre-commit → commit)
10.2 Two-Phase Commit Sink
// Kafka Sink with Exactly-Once
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-tx")
.setProperty("transaction.timeout.ms", "900000") // 15分
.build();
注意: Exactly-Once Sink を使用する場合、Kafka の transaction.timeout.ms はチェックポイント間隔より大きく設定する必要がある。
11. 時間の概念とウォーターマーク
11.1 3つの時間概念
Event Time: 実際のイベント発生時刻
●(10:00:00) ●(10:00:01) ●(10:00:05) ●(10:00:02)
↑ 遅延到着
Ingestion Time: Flinkソースに到達した時刻
●(10:00:03) ●(10:00:03) ●(10:00:08) ●(10:00:08)
Processing Time: オペレーターで処理された時刻
●(10:00:05) ●(10:00:05) ●(10:00:10) ●(10:00:10)
11.2 ウォーターマーク
ウォーターマークは「この時刻までのイベントは全て到着した」という宣言であり、ウィンドウのクローズタイミングを制御する。
// ウォーターマーク戦略の設定
DataStream<Event> events = env.fromSource(source,
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
.withIdleness(Duration.ofMinutes(5)), // アイドルソースの検出
"Event Source"
);
// カスタムウォーターマーク
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forGenerator(ctx -> new WatermarkGenerator<Event>() {
private long maxTimestamp = Long.MIN_VALUE;
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - 10000)); // 10秒の遅延許容
}
})
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
11.3 Watermark Alignment(Flink 1.17+)
複数ソースパーティション間のウォーターマーク進行を揃える機能。
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.getTimestamp())
.withWatermarkAlignment("alignment-group", Duration.ofSeconds(30), Duration.ofSeconds(5));
12. コネクターとフォーマット
12.1 主要なソースコネクター
| コネクター | 用途 | Exactly-Once Source |
|---|---|---|
| Kafka | メッセージキュー | ✓ |
| Amazon Kinesis | AWSストリーミング | ✓ |
| Apache Pulsar | メッセージング | ✓ |
| FileSystem (S3/HDFS) | ファイルソース | ✓ |
| JDBC | リレーショナルDB | ✓ |
| CDC (Debezium) | データベースCDC | ✓ |
| MongoDB | ドキュメントDB | ✓ |
| Elasticsearch | 検索エンジン | N/A |
12.2 主要なシンクコネクター
// Kafka Sink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
// JDBC Sink
SinkFunction<Event> jdbcSink = JdbcSink.sink(
"INSERT INTO events (event_id, user_id, event_type, amount) VALUES (?, ?, ?, ?)",
(statement, event) -> {
statement.setLong(1, event.getEventId());
statement.setLong(2, event.getUserId());
statement.setString(3, event.getEventType());
statement.setDouble(4, event.getAmount());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(3)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://db:5432/events")
.withDriverName("org.postgresql.Driver")
.withUsername("flink")
.withPassword("password")
.build()
);
// Elasticsearch Sink
ElasticsearchSink<Event> esSink = new ElasticsearchSinkBuilder<Event>()
.setHosts(new HttpHost("es-host", 9200, "https"))
.setEmitter((event, context, indexer) -> {
indexer.add(
Requests.indexRequest()
.index("events-" + event.getDate())
.id(String.valueOf(event.getEventId()))
.source(event.toJson(), XContentType.JSON)
);
})
.setBulkFlushMaxActions(1000)
.setBulkFlushInterval(5000)
.build();
// FileSink(ストリーミングファイル書き出し)
FileSink<String> fileSink = FileSink
.forRowFormat(
new Path("s3://data-lake/events"),
new SimpleStringEncoder<String>("UTF-8")
)
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.withMaxPartSize(MemorySize.ofMebiBytes(256))
.build()
)
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
.build();
12.3 Flink CDC(Change Data Capture)
-- MySQL CDC Source
CREATE TABLE mysql_orders (
order_id BIGINT,
customer_id BIGINT,
product_name STRING,
price DECIMAL(10, 2),
order_status STRING,
order_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql-host',
'port' = '3306',
'username' = 'flink_cdc',
'password' = 'password',
'database-name' = 'orders_db',
'table-name' = 'orders',
'server-time-zone' = 'UTC',
'scan.startup.mode' = 'initial' -- スナップショット + 増分
);
-- PostgreSQL CDC Source
CREATE TABLE pg_customers (
customer_id BIGINT,
name STRING,
email STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'pg-host',
'port' = '5432',
'username' = 'flink_cdc',
'password' = 'password',
'database-name' = 'crm',
'schema-name' = 'public',
'table-name' = 'customers',
'slot.name' = 'flink_slot',
'decoding.plugin.name' = 'pgoutput'
);
-- CDC → Iceberg パイプライン
INSERT INTO iceberg_orders
SELECT order_id, customer_id, product_name, price, order_status, order_time
FROM mysql_orders;
13. CEP(Complex Event Processing)
13.1 CEP ライブラリ
Flink CEP は、ストリーム上のイベントパターンを検出するためのライブラリである。
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
// パターン定義: ログイン失敗3回 → ログイン成功
Pattern<LoginEvent, ?> loginFailPattern = Pattern
.<LoginEvent>begin("first-fail")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent event) {
return event.getType().equals("FAIL");
}
})
.next("second-fail")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent event) {
return event.getType().equals("FAIL");
}
})
.next("third-fail")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent event) {
return event.getType().equals("FAIL");
}
})
.followedBy("success")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent event) {
return event.getType().equals("SUCCESS");
}
})
.within(Time.minutes(5));
// パターンの適用
PatternStream<LoginEvent> patternStream = CEP.pattern(
loginEvents.keyBy(LoginEvent::getUserId),
loginFailPattern
);
// マッチした結果の処理
DataStream<Alert> alerts = patternStream.process(
new PatternProcessFunction<LoginEvent, Alert>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> match,
Context ctx, Collector<Alert> out) {
LoginEvent firstFail = match.get("first-fail").get(0);
LoginEvent success = match.get("success").get(0);
out.collect(new Alert(
firstFail.getUserId(),
"Suspicious login: 3 failures followed by success",
firstFail.getTimestamp(),
success.getTimestamp()
));
}
}
);
14. バッチ処理
Flink はストリーム処理エンジンの上にバッチ処理を構築する。Bounded Stream として扱い、バッチ最適化を適用する。
// バッチモードの設定
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// FileSystemからの読み込み
DataStream<String> input = env.readTextFile("s3://data-lake/input/");
DataStream<WordCount> wordCounts = input
.flatMap((String line, Collector<WordCount> out) -> {
for (String word : line.split("\\s+")) {
out.collect(new WordCount(word, 1));
}
})
.returns(WordCount.class)
.keyBy(wc -> wc.word)
.sum("count");
wordCounts.sinkTo(
FileSink.forRowFormat(new Path("s3://data-lake/output/"), new SimpleStringEncoder<>())
.build()
);
-- Flink SQL でのバッチ処理
SET 'execution.runtime-mode' = 'batch';
-- ファイルソース
CREATE TABLE batch_input (
user_id BIGINT,
action STRING,
amount DECIMAL(10, 2),
event_date DATE
) WITH (
'connector' = 'filesystem',
'path' = 's3://data-lake/input/',
'format' = 'parquet'
);
-- 集約クエリ
INSERT INTO output_table
SELECT
user_id,
event_date,
COUNT(*) as action_count,
SUM(amount) as total_amount
FROM batch_input
GROUP BY user_id, event_date;
15. デプロイメント
15.1 デプロイモードの比較
| モード | 説明 | 推奨用途 |
|---|---|---|
| Session Mode | 共有クラスターに複数ジョブを投入 | 開発、テスト、短時間ジョブ |
| Per-Job Mode (deprecated) | ジョブごとに専用クラスター | レガシー |
| Application Mode | アプリケーションごとに専用クラスター | 本番環境(推奨) |
15.2 Kubernetes デプロイ(推奨)
# flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4096m
taskmanager.memory.process.size: 8192m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 8
# Checkpointing
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
state.backend.type: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://flink-state/checkpoints
state.savepoints.dir: s3://flink-state/savepoints
# HA (Kubernetes)
high-availability.type: kubernetes
high-availability.storageDir: s3://flink-state/ha
kubernetes.cluster-id: my-flink-cluster
# REST
rest.port: 8081
# Metrics
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
log4j-console.properties: |+
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# flink-jobmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: flink:1.19-java17
args: ["jobmanager"]
ports:
- containerPort: 6123
- containerPort: 8081
- containerPort: 9249
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "4Gi"
cpu: "4"
volumeMounts:
- name: flink-config
mountPath: /opt/flink/conf
volumes:
- name: flink-config
configMap:
name: flink-config
# flink-taskmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 5
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: flink:1.19-java17
args: ["taskmanager"]
ports:
- containerPort: 9249
resources:
requests:
memory: "8Gi"
cpu: "4"
limits:
memory: "8Gi"
cpu: "8"
volumeMounts:
- name: flink-config
mountPath: /opt/flink/conf
- name: rocksdb-storage
mountPath: /var/flink/rocksdb
volumes:
- name: flink-config
configMap:
name: flink-config
- name: rocksdb-storage
emptyDir:
sizeLimit: 100Gi
15.3 Flink Kubernetes Operator
# FlinkDeployment CRD
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: my-flink-job
spec:
image: my-registry/flink-job:latest
flinkVersion: v1_19
flinkConfiguration:
taskmanager.numberOfTaskSlots: "4"
state.backend.type: rocksdb
state.backend.incremental: "true"
execution.checkpointing.interval: "60s"
state.checkpoints.dir: s3://flink-state/checkpoints
high-availability.type: kubernetes
high-availability.storageDir: s3://flink-state/ha
serviceAccount: flink
jobManager:
resource:
memory: "4096m"
cpu: 2
replicas: 1
taskManager:
resource:
memory: "8192m"
cpu: 4
replicas: 5
job:
jarURI: local:///opt/flink/usrlib/my-job.jar
entryClass: com.example.MyFlinkJob
parallelism: 20
upgradeMode: savepoint
savepointTriggerNonce: 0
state: running
16. 設定とチューニング
16.1 主要な設定パラメータ
# flink-conf.yaml (主要設定)
# ===== JobManager設定 =====
jobmanager.memory.process.size: 4096m
jobmanager.memory.jvm-overhead.min: 192m
jobmanager.memory.jvm-overhead.max: 1g
jobmanager.memory.jvm-overhead.fraction: 0.1
jobmanager.rpc.address: jobmanager-host
jobmanager.rpc.port: 6123
# ===== TaskManager設定 =====
taskmanager.memory.process.size: 8192m
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.task.heap.size: 4096m
taskmanager.memory.managed.size: 2048m
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64m
taskmanager.memory.network.max: 1g
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.jvm-overhead.fraction: 0.1
# ===== 並列度 =====
parallelism.default: 8
# ===== チェックポイント =====
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600s
execution.checkpointing.min-pause: 30s
execution.checkpointing.max-concurrent-checkpoints: 1
# ===== State Backend =====
state.backend.type: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://flink-state/checkpoints
state.savepoints.dir: s3://flink-state/savepoints
state.backend.rocksdb.localdir: /var/flink/rocksdb
# ===== HA =====
high-availability.type: kubernetes
high-availability.storageDir: s3://flink-state/ha
# ===== ネットワーク =====
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
taskmanager.network.request-backoff.initial: 100
taskmanager.network.request-backoff.max: 10000
# ===== REST =====
rest.port: 8081
rest.address: 0.0.0.0
rest.bind-port: 8081
# ===== メトリクス =====
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
16.2 パフォーマンスチューニング
# バックプレッシャー対策
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8
# Buffer Debloating(自動バッファ調整)
taskmanager.network.memory.buffer-debloat.enabled: true
taskmanager.network.memory.buffer-debloat.target: 1s
# Unaligned Checkpoints(バックプレッシャー下での高速CP)
execution.checkpointing.unaligned.enabled: true
# RocksDB チューニング
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4
state.backend.rocksdb.compaction.level.max-size-level-base: 320mb
state.backend.rocksdb.thread.num: 4
# オブジェクト再利用(GC軽減)
pipeline.object-reuse: true
# Async State Processing(Flink 2.0+)
execution.async-state.enabled: true
17. メモリ管理
17.1 TaskManager メモリモデル
┌──────────────────────────────────────────────────┐
│ Total Process Memory │
│ (taskmanager.memory.process.size) │
│ │
│ ┌────────────────────────────────────────────┐ │
│ │ Total Flink Memory │ │
│ │ │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ Framework Heap (128MB) │ │ │
│ │ │ (Flink 内部オブジェクト) │ │ │
│ │ ├──────────────────────────────────────┤ │ │
│ │ │ Task Heap │ │ │
│ │ │ (ユーザーコード、UDF) │ │ │
│ │ ├──────────────────────────────────────┤ │ │
│ │ │ Managed Memory (40%) │ │ │
│ │ │ (RocksDB State, Batch Sort/Hash) │ │ │
│ │ ├──────────────────────────────────────┤ │ │
│ │ │ Network Memory (10%) │ │ │
│ │ │ (ネットワークバッファ) │ │ │
│ │ ├──────────────────────────────────────┤ │ │
│ │ │ Framework Off-Heap (128MB) │ │ │
│ │ ├──────────────────────────────────────┤ │ │
│ │ │ Task Off-Heap (0) │ │ │
│ │ └──────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────┐ │
│ │ JVM Overhead (10%) │ │
│ │ (JVM Metaspace, GC, etc.) │ │
│ └────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘
18. 監視と運用
18.1 Flink Web Dashboard
Flink Web Dashboard(デフォルト: ポート8081)で確認可能な情報:
| タブ | 情報 |
|---|---|
| Overview | クラスター概要、TaskManager数、Slot状態 |
| Running Jobs | 実行中ジョブの DAG、各オペレーターの処理状況 |
| Completed Jobs | 完了/失敗ジョブの履歴 |
| Task Managers | TaskManager の詳細情報、メモリ使用状況 |
| Job Manager | JobManager の設定、ログ |
18.2 メトリクス
# Prometheus Metrics Reporter
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
主要なメトリクス:
| メトリクス | 説明 | 監視の意義 |
|---|---|---|
numRecordsInPerSecond | 秒あたりの入力レコード数 | スループット |
numRecordsOutPerSecond | 秒あたりの出力レコード数 | スループット |
currentInputWatermark | 現在のウォーターマーク | イベント時間の進行 |
lastCheckpointDuration | 最後のCP所要時間 | CP性能 |
lastCheckpointSize | 最後のCPサイズ | 状態サイズ |
numberOfFailedCheckpoints | 失敗したCP数 | 安定性 |
isBackPressured | バックプレッシャー有無 | ボトルネック検出 |
busyTimeMsPerSecond | オペレーターのビジー時間 | 負荷 |
18.3 Grafana ダッシュボード
推奨パネル:
- Throughput: Records In/Out per Second (by operator)
- Latency: End-to-end latency distribution
- Checkpointing: Duration, Size, Success/Failure rate
- Backpressure: Backpressure status per operator
- Resource Usage: CPU, Memory, Network per TaskManager
- State Size: State backend size trends
18.4 REST API
# ジョブ一覧
curl http://jobmanager:8081/jobs
# ジョブ詳細
curl http://jobmanager:8081/jobs/<job-id>
# ジョブのキャンセル
curl -X PATCH http://jobmanager:8081/jobs/<job-id>?mode=cancel
# セーブポイントの作成
curl -X POST http://jobmanager:8081/jobs/<job-id>/savepoints \
-d '{"target-directory": "s3://flink-state/savepoints/", "cancel-job": false}'
# チェックポイント統計
curl http://jobmanager:8081/jobs/<job-id>/checkpoints
# TaskManager一覧
curl http://jobmanager:8081/taskmanagers
19. セキュリティ
# SSL/TLS設定
security.ssl.internal.enabled: true
security.ssl.internal.keystore: /etc/flink/tls/keystore.jks
security.ssl.internal.keystore-password: keystore-pass
security.ssl.internal.truststore: /etc/flink/tls/truststore.jks
security.ssl.internal.truststore-password: truststore-pass
security.ssl.rest.enabled: true
security.ssl.rest.keystore: /etc/flink/tls/keystore.jks
security.ssl.rest.keystore-password: keystore-pass
# Kerberos
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /etc/flink/keytab/flink.keytab
security.kerberos.login.principal: flink/hostname@REALM.COM
20. ユースケースとベストプラクティス
20.1 代表的なユースケース
リアルタイム不正検知
// クレジットカード不正検知パイプライン
DataStream<Transaction> transactions = env.fromSource(kafkaSource, ...);
// 1. 地理的に不可能な取引の検出
DataStream<Alert> geoAlerts = transactions
.keyBy(Transaction::getCardId)
.process(new GeoImpossibilityDetector()); // 状態: 最後の取引位置と時刻
// 2. 短時間での異常な取引頻度
DataStream<Alert> frequencyAlerts = transactions
.keyBy(Transaction::getCardId)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.aggregate(new TransactionCountAggregator())
.filter(count -> count.getValue() > 10);
// 3. CEPによるパターン検出
Pattern<Transaction, ?> pattern = Pattern
.<Transaction>begin("small").where(t -> t.getAmount() < 1.0).times(3)
.followedBy("large").where(t -> t.getAmount() > 1000.0)
.within(Time.minutes(10));
リアルタイムETL(CDC → Data Lake)
-- MySQL CDC → Iceberg パイプライン
-- ソース:MySQL CDC
CREATE TABLE mysql_orders (
order_id BIGINT, customer_id BIGINT, amount DECIMAL(10,2),
status STRING, created_at TIMESTAMP(3), updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc', ...);
-- シンク:Iceberg
CREATE TABLE iceberg_orders (
order_id BIGINT, customer_id BIGINT, amount DECIMAL(10,2),
status STRING, created_at TIMESTAMP(3), updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'iceberg', 'write.upsert.enabled' = 'true', ...);
-- CDC→Iceberg同期
INSERT INTO iceberg_orders SELECT * FROM mysql_orders;
20.2 ベストプラクティス
- 全オペレーターにUIDを設定: セーブポイント互換性の確保
- Incremental Checkpoint を使用: RocksDB + incremental で大規模状態に対応
- ウォーターマーク戦略を慎重に設計: 遅延データの許容範囲を適切に設定
- バックプレッシャーを監視: ボトルネックの早期発見
- 状態のTTLを設定: 無限に成長する状態を防止
- Operator Chaining を活用: 不要な分離を避ける
- Async I/O を使用: 外部システム呼び出しの非同期化
- 適切な並列度を設定: Kafkaパーティション数に合わせる
- Application Modeでデプロイ: 本番環境ではSession Modeを避ける
- チェックポイント間隔を適切に設定: 短すぎるとオーバーヘッド、長すぎるとリカバリが遅い
21. Flink vs 他エンジン比較
| 項目 | Flink | Spark Streaming | Kafka Streams | Storm |
|---|---|---|---|---|
| 処理モデル | 真のストリーム | マイクロバッチ | 真のストリーム | 真のストリーム |
| レイテンシ | ミリ秒〜秒 | 秒〜分 | ミリ秒 | ミリ秒 |
| Exactly-Once | ✓(E2E) | ✓(E2E) | ✓ | At-least-once |
| 状態管理 | 強力(大規模対応) | 中程度 | 限定的 | 弱い |
| SQL | Flink SQL | Spark SQL | KSQL | なし |
| バッチ処理 | ✓ | ✓(主要機能) | ✗ | ✗ |
| CEP | ✓(ネイティブ) | ✗ | ✗ | ✗ |
| スケーラビリティ | 高い | 高い | 中程度 | 高い |
| 運用複雑性 | 中〜高 | 中 | 低 | 高 |
22. 最新動向と将来展望
22.1 Flink 2.0 の主な変更
- DataStream API v2: より直感的で型安全なAPI
- Disaggregated State Management: 状態のストレージと計算の分離
- Thread Model改善: マルチスレッド実行モデルの最適化
- 非同期状態処理: 状態アクセスの非同期化による高スループット化
22.2 将来の方向性
- Streaming Lakehouse: Iceberg/Paimon/Hudiとの深い統合
- AI/ML Integration: リアルタイム特徴量エンジニアリング、オンライン推論
- Serverless Flink: クラウドネイティブなオンデマンド実行
- Unified Batch & Stream: バッチ・ストリーム統合の更なる成熟
23. まとめ
Apache Flink は、ステートフルストリーム処理のデファクトスタンダードであり、低レイテンシ・高スループット・Exactly-Once保証を兼ね備えた分散処理エンジンである。
技術的要点
| カテゴリ | 要点 |
|---|---|
| アーキテクチャ | JobManager-TaskManager、パイプライン実行 |
| 処理モデル | 真のイベント単位ストリーム処理 |
| 状態管理 | Keyed State / Operator State + RocksDB |
| 耐障害性 | Checkpoint(分散スナップショット)+ Savepoint |
| 時間処理 | Event Time + Watermark による正確な処理 |
| SQL | Flink SQL + Table API |
| コネクター | Kafka, Kinesis, JDBC, CDC, FileSystem, Iceberg等 |
| デプロイ | Kubernetes (Flink Operator) 推奨 |
選定の指針
Flink が適しているケース:
- リアルタイムストリーム処理: ミリ秒〜秒レベルのレイテンシが必要
- ステートフル処理: 大規模な状態を伴うストリーム処理
- Exactly-Once保証: End-to-Endの厳密なデータ整合性
- CEP: 複合イベントパターンの検出
- CDC パイプライン: データベースの変更をリアルタイムに同期
他のツールを検討すべきケース:
- 大規模バッチETL主体: Apache Spark
- シンプルなメッセージ処理: Kafka Streams
- アドホックSQLクエリ: Apache Trino
- データウェアハウス: BigQuery / Snowflake / Redshift
参考文献: