Apache Flink

Apache Flink 徹底解説:アーキテクチャ・機能・設定の全容

目次

  1. はじめに
  2. Apache Flink の概要と歴史
  3. アーキテクチャ全体像
  4. ランタイムアーキテクチャ
  5. DataStream API
  6. Table API と Flink SQL
  7. ウィンドウ処理
  8. 状態管理とステートバックエンド
  9. チェックポイントとセーブポイント
  10. Exactly-Once セマンティクス
  11. 時間の概念とウォーターマーク
  12. コネクターとフォーマット
  13. CEP(Complex Event Processing)
  14. バッチ処理
  15. デプロイメント
  16. 設定とチューニング
  17. メモリ管理
  18. 監視と運用
  19. セキュリティ
  20. ユースケースとベストプラクティス
  21. Flink vs 他エンジン比較
  22. 最新動向と将来展望
  23. まとめ

1. はじめに

Apache Flink は、ステートフルな計算をバウンデッド(有限)およびアンバウンデッド(無限)のデータストリームに対して行うための分散処理フレームワークである。「ストリーム処理ファースト」の設計思想を持ち、真のイベント単位のストリーム処理を実現する。

Flink の最大の特徴は、Exactly-Once の状態整合性を持つステートフルストリーム処理であり、低レイテンシと高スループットを両立させながら、障害時の状態回復を保証する。

1.1 本記事の対象読者

  • リアルタイムデータパイプラインを構築するデータエンジニア
  • ストリーミング分析基盤を設計するアーキテクト
  • Flink クラスターの運用を行う SRE / インフラエンジニア
  • イベント駆動アプリケーションを開発するバックエンドエンジニア

1.2 Flink の位置づけ

┌────────────────────────────────────────────────────────────┐
│                    Applications                             │
│  (Real-time Analytics, ETL, Event-Driven, ML Inference)    │
├────────────────────────────────────────────────────────────┤
│                Apache Flink Runtime                         │
│         (Stateful Stream Processing Engine)                 │
├──────────┬──────────┬──────────┬──────────────────────────┤
│DataStream│ Table    │ Flink   │  CEP / ML /              │
│   API    │ API      │  SQL    │  Graph (Gelly)           │
├──────────┴──────────┴──────────┴──────────────────────────┤
│              State Backends                                 │
│      (HashMapStateBackend / EmbeddedRocksDB)               │
├────────────────────────────────────────────────────────────┤
│              Cluster Manager                                │
│     (Standalone / YARN / Kubernetes / Mesos)               │
├────────────────────────────────────────────────────────────┤
│              Source / Sink Connectors                       │
│     (Kafka / Kinesis / JDBC / FileSystem / Iceberg /       │
│      Elasticsearch / HBase / Cassandra / etc.)             │
└────────────────────────────────────────────────────────────┘

2. Apache Flink の概要と歴史

2.1 誕生の背景

Apache Flink は、ドイツのベルリン工科大学の研究プロジェクト「Stratosphere」を起源とし、2014年に Apache Software Foundation のインキュベーションプロジェクトとなった。

従来のストリーム処理エンジン(Apache Storm等)がイベント単位の低レイテンシ処理に特化し、バッチエンジン(MapReduce, Spark等)がスループット重視であったのに対し、Flink はストリーム処理を基盤としてバッチ処理も統一的に扱うというアプローチを採用した。

2.2 バージョンの変遷

バージョンリリース年主な特徴
0.92015最初の安定版リリース
1.02016Exactly-Once チェックポイント
1.22016セーブポイント機能
1.32017インクリメンタルチェックポイント
1.52018Blink SQL エンジン統合開始
1.72018Scala-free クラスパス
1.92019Blink Planner(新SQL プランナー)
1.112020Application Mode、Unaligned Checkpoints
1.122020Kubernetes HA
1.132021Reactive Mode
1.142021Buffer Debloating
1.152022チェックポイントの高速化
1.162022Generalized Incremental Checkpoints
1.172023Async Sink、Watermark Alignment
1.182023Flink SQL Gateway
1.192024Materialized Table、Flink CDC統合強化
1.202024Disaggregated State Management
2.02025DataStream API v2、Thread Model改善

2.3 主要な特徴

  • 真のストリーム処理: イベント単位の低レイテンシ処理(マイクロバッチではない)
  • ステートフル処理: 大規模な状態をExactly-Onceで管理
  • イベントタイム処理: イベント時刻に基づく正確なウィンドウ処理
  • Exactly-Once保証: チェックポイントメカニズムによる状態整合性
  • 高スループット: 秒間数百万イベントの処理
  • 統一バッチ・ストリーム: 同一APIでバッチとストリームを処理
  • CEP: 複合イベント処理のネイティブサポート
  • スケーラブル: 数千ノードまでスケール

3. アーキテクチャ全体像

3.1 レイヤードアーキテクチャ

┌──────────────────────────────────────────────────────────┐
│                     API Layer                             │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐  │
│  │DataStream│ │ Table    │ │ Flink   │ │ PyFlink    │  │
│  │   API    │ │ API      │ │  SQL    │ │ (Python)   │  │
│  └──────────┘ └──────────┘ └──────────┘ └────────────┘  │
├──────────────────────────────────────────────────────────┤
│                  Dataflow Runtime                         │
│  ┌──────────────────────────────────────────────────┐    │
│  │    JobGraph → ExecutionGraph → Physical Graph     │    │
│  └──────────────────────────────────────────────────┘    │
├──────────────────────────────────────────────────────────┤
│               State & Checkpoint                          │
│  ┌──────────┐ ┌─────────────────┐ ┌──────────────────┐  │
│  │ HashMap  │ │  EmbeddedRocksDB│ │  Changelog       │  │
│  │ State    │ │  StateBackend   │ │  StateBackend     │  │
│  │ Backend  │ │                 │ │                   │  │
│  └──────────┘ └─────────────────┘ └──────────────────┘  │
├──────────────────────────────────────────────────────────┤
│                 Cluster Management                        │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐  │
│  │Standalone│ │  YARN    │ │Kubernetes│ │  Session /  │  │
│  │          │ │          │ │          │ │ Application │  │
│  └──────────┘ └──────────┘ └──────────┘ └────────────┘  │
└──────────────────────────────────────────────────────────┘

3.2 ストリーム処理の基本概念

Flink ではすべてのデータを ストリーム(Stream) として扱う:

  • Unbounded Stream(無限ストリーム): 終わりのないデータストリーム(リアルタイムイベント)
  • Bounded Stream(有限ストリーム): 終わりのあるデータセット(バッチデータ)
Unbounded Stream (ストリーム処理):
──●──●──●──●──●──●──●──●──●──●──●──●──►  (終わりなし)
  e1  e2  e3  e4  e5  e6  e7  e8  e9 ...

Bounded Stream (バッチ処理):
──●──●──●──●──●──●──●──●──●──●|
  e1  e2  e3  e4  e5  e6  e7  e8  e9  e10  (終了あり)

4. ランタイムアーキテクチャ

4.1 コンポーネント構成

┌────────────────────────────────────────────────────────────┐
│                        Client                               │
│            (CLI / REST API / Dashboard)                      │
└──────────────────────┬─────────────────────────────────────┘
                       │ Submit JobGraph
                       ▼
┌────────────────────────────────────────────────────────────┐
│                   JobManager (JM)                            │
│  ┌─────────────┐ ┌──────────────┐ ┌────────────────────┐  │
│  │  Dispatcher  │ │ ResourceMgr  │ │   JobMaster        │  │
│  │  (Job受付)   │ │ (リソース管理)│ │ (Job実行管理)      │  │
│  └─────────────┘ └──────────────┘ └────────────────────┘  │
│  ┌─────────────┐ ┌──────────────┐ ┌────────────────────┐  │
│  │ Checkpoint  │ │   Scheduler  │ │   REST Endpoint    │  │
│  │ Coordinator │ │              │ │                    │  │
│  └─────────────┘ └──────────────┘ └────────────────────┘  │
└──────────────────────┬─────────────────────────────────────┘
                       │
          ┌────────────┼────────────┐
          ▼            ▼            ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ TaskManager 1│ │ TaskManager 2│ │ TaskManager N│
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │ Slot 1   │ │ │ │ Slot 1   │ │ │ │ Slot 1   │ │
│ │┌────────┐│ │ │ │┌────────┐│ │ │ │┌────────┐│ │
│ ││ Task   ││ │ │ ││ Task   ││ │ │ ││ Task   ││ │
│ │└────────┘│ │ │ │└────────┘│ │ │ │└────────┘│ │
│ ├──────────┤ │ │ ├──────────┤ │ │ ├──────────┤ │
│ │ Slot 2   │ │ │ │ Slot 2   │ │ │ │ Slot 2   │ │
│ │┌────────┐│ │ │ │┌────────┐│ │ │ │┌────────┐│ │
│ ││ Task   ││ │ │ ││ Task   ││ │ │ ││ Task   ││ │
│ │└────────┘│ │ │ │└────────┘│ │ │ │└────────┘│ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
│              │ │              │ │              │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │  State   │ │ │ │  State   │ │ │ │  State   │ │
│ │ Backend  │ │ │ │ Backend  │ │ │ │ Backend  │ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │ Network  │ │ │ │ Network  │ │ │ │ Network  │ │
│ │ Buffer   │ │ │ │ Buffer   │ │ │ │ Buffer   │ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
└──────────────┘ └──────────────┘ └──────────────┘

4.2 JobManager

JobManager はFlinkクラスターのマスターノードであり、以下のコンポーネントで構成される:

  • Dispatcher: ジョブの受付とJobMasterの起動
  • ResourceManager: TaskManager のスロット管理とリソース割り当て
  • JobMaster: 個々のジョブの実行を管理
  • Checkpoint Coordinator: チェックポイントのトリガーと調整

4.3 TaskManager

TaskManager はワーカーノードであり、実際のデータ処理を行う:

  • Task Slot: リソース分離の単位。各Slotは固定量のメモリを持つ
  • Task: 演算子のサブタスク。1つのSlot内で複数のTaskをチェーン実行可能
  • Network Buffer: TaskManager間のデータ交換用バッファ
  • State Backend: ローカルの状態ストレージ

4.4 ジョブの実行フロー

User Code (DataStream / SQL)
    │
    ▼
StreamGraph (論理グラフ)
    │
    ▼
JobGraph (最適化済みグラフ - operator chaining)
    │
    ▼
ExecutionGraph (並列化された実行グラフ)
    │
    ▼
Physical Execution (各TaskManagerでの実行)

4.5 Operator Chaining

Flink は連続する演算子を1つのTaskにチェーンして実行することで、シリアライゼーションやネットワーク通信のオーバーヘッドを削減する。

チェーン前:
Source → Map → Filter → Sink (4つの別々のTask)

チェーン後:
[Source → Map → Filter] → Sink (2つのTask)
  1つのスレッドで実行     別スレッド
// Operator Chainingの制御
stream
    .map(value -> process(value))
    .disableChaining()  // このオペレータからチェーンを切断
    .filter(value -> value > 0)
    .startNewChain()    // 新しいチェーンを開始
    .keyBy(value -> value.getKey())
    .process(new MyProcessFunction());

5. DataStream API

5.1 基本的なストリーム処理

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FilterFunction;

// 実行環境の作成
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 並列度の設定
env.setParallelism(4);

// ソースからの読み込み
DataStream<String> source = env.fromSource(
    KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("input-topic")
        .setGroupId("flink-consumer")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build(),
    WatermarkStrategy.noWatermarks(),
    "Kafka Source"
);

// 変換処理
DataStream<Event> events = source
    .map(json -> Event.fromJson(json))
    .filter(event -> event.getType() != null)
    .name("Parse and Filter Events");

// キー付きストリーム
DataStream<EventStats> stats = events
    .keyBy(event -> event.getUserId())
    .process(new EventStatsProcessor())
    .name("Compute Event Stats");

// シンクへの書き出し
stats.sinkTo(
    KafkaSink.<EventStats>builder()
        .setBootstrapServers("kafka:9092")
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new EventStatsSchema())
                .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .build()
).name("Kafka Sink");

// 実行
env.execute("Event Processing Pipeline");

5.2 PyFlink (Python API)

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy

# 環境の作成
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# Kafkaソース
kafka_source = KafkaSource.builder() \
    .set_bootstrap_servers("kafka:9092") \
    .set_topics("input-topic") \
    .set_group_id("flink-python-consumer") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

stream = env.from_source(
    kafka_source,
    WatermarkStrategy.no_watermarks(),
    "Kafka Source"
)

# 処理
result = stream \
    .map(lambda x: json.loads(x)) \
    .filter(lambda x: x.get('event_type') is not None) \
    .key_by(lambda x: x['user_id']) \
    .process(MyProcessFunction())

env.execute("Python Flink Job")

5.3 主要な変換演算子

// map: 1:1変換
DataStream<Integer> doubled = stream.map(x -> x * 2);

// flatMap: 1:N変換
DataStream<String> words = lines.flatMap(
    (String line, Collector<String> out) -> {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }
).returns(Types.STRING);

// filter: フィルタリング
DataStream<Event> errors = events.filter(e -> e.getLevel().equals("ERROR"));

// keyBy: キー分割(論理パーティション)
KeyedStream<Event, String> keyed = events.keyBy(Event::getUserId);

// reduce: キーごとの集約
DataStream<Integer> sums = keyed.reduce((a, b) -> a + b);

// union: 複数ストリームの結合
DataStream<Event> combined = stream1.union(stream2, stream3);

// connect: 2つのストリームの接続(異なる型可)
ConnectedStreams<Event, Rule> connected = events.connect(rules.broadcast());

// Side Output: 条件に応じた分岐
final OutputTag<Event> errorTag = new OutputTag<Event>("errors") {};

SingleOutputStreamOperator<Event> mainStream = events
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) {
            if (event.isError()) {
                ctx.output(errorTag, event);
            } else {
                out.collect(event);
            }
        }
    });

DataStream<Event> errorStream = mainStream.getSideOutput(errorTag);

// Async I/O: 非同期外部呼び出し
AsyncDataStream.unorderedWait(
    events,
    new AsyncDatabaseLookup(),
    30, TimeUnit.SECONDS,  // タイムアウト
    100  // 同時リクエスト数
);

5.4 ProcessFunction(低レベルAPI)

ProcessFunction はFlink の最も強力なAPIであり、タイマー、状態アクセス、Side Output を提供する。

public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

    // 状態の定義
    private ValueState<Boolean> flagState;
    private ValueState<Long> timerState;

    @Override
    public void open(OpenContext openContext) {
        ValueStateDescriptor<Boolean> flagDescriptor =
            new ValueStateDescriptor<>("flag", Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor =
            new ValueStateDescriptor<>("timer-state", Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(Transaction tx, Context ctx, Collector<Alert> out) throws Exception {
        Boolean lastWasSmall = flagState.value();

        if (lastWasSmall != null && lastWasSmall) {
            if (tx.getAmount() > 500.0) {
                // 不正検知: 小額の直後に大額
                out.collect(new Alert(tx.getAccountId(), "Potential fraud detected"));
            }
            clearState();
        }

        if (tx.getAmount() < 1.0) {
            flagState.update(true);
            // 1分後のタイマー設定
            long timer = ctx.timerService().currentProcessingTime() + 60000;
            ctx.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // タイマー発火: フラグをクリア
        timerState.clear();
        flagState.clear();
    }

    private void clearState() {
        flagState.clear();
        Long timer = timerState.value();
        if (timer != null) {
            timerState.clear();
        }
    }
}

6. Table API と Flink SQL

6.1 Table API の概要

Table APIはFlink のリレーショナルAPIであり、SQLに似た宣言的な処理を行う。

import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Table環境の作成
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// テーブルの登録
tableEnv.executeSql("""
    CREATE TABLE kafka_events (
        event_id BIGINT,
        user_id BIGINT,
        event_type STRING,
        amount DECIMAL(10, 2),
        event_time TIMESTAMP(3),
        proc_time AS PROCTIME(),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-sql-group',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )
""");

// Table APIクエリ
Table result = tableEnv.from("kafka_events")
    .filter($("event_type").isEqual("purchase"))
    .groupBy($("user_id"), $("event_type"))
    .select(
        $("user_id"),
        $("event_type"),
        $("amount").sum().as("total_amount"),
        $("event_id").count().as("event_count")
    );

6.2 Flink SQL

-- Kafka ソーステーブル
CREATE TABLE page_views (
    user_id BIGINT,
    page_url STRING,
    view_duration INT,
    event_time TIMESTAMP(3),
    proc_time AS PROCTIME(),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'page-views',
    'properties.bootstrap.servers' = 'kafka:9092',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Icebergシンクテーブル
CREATE TABLE page_view_stats (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    page_url STRING,
    view_count BIGINT,
    unique_users BIGINT,
    avg_duration DOUBLE,
    PRIMARY KEY (window_start, page_url) NOT ENFORCED
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'iceberg_catalog',
    'catalog-type' = 'hive',
    'uri' = 'thrift://hive-metastore:9083',
    'warehouse' = 's3://data-lake/warehouse'
);

-- ウィンドウ集約
INSERT INTO page_view_stats
SELECT
    window_start,
    window_end,
    page_url,
    COUNT(*) as view_count,
    COUNT(DISTINCT user_id) as unique_users,
    AVG(CAST(view_duration AS DOUBLE)) as avg_duration
FROM TABLE(
    TUMBLE(TABLE page_views, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, page_url;

-- Temporal Join(時間的結合)
CREATE TABLE currency_rates (
    currency STRING,
    rate DECIMAL(10, 6),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'currency-rates',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- イベント時刻でのTemporal Join
SELECT
    o.order_id,
    o.amount,
    o.currency,
    r.rate,
    o.amount * r.rate AS amount_usd
FROM orders AS o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;

-- Lookup Join(外部テーブル参照)
CREATE TABLE user_profiles (
    user_id BIGINT,
    name STRING,
    country STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://db:5432/users',
    'table-name' = 'user_profiles',
    'lookup.cache.max-rows' = '10000',
    'lookup.cache.ttl' = '5min'
);

SELECT
    e.event_id,
    e.user_id,
    u.name,
    u.country,
    e.event_type,
    e.event_time
FROM page_views AS e
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

-- Deduplication(重複排除)
SELECT event_id, user_id, event_type, event_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_time ASC) AS rn
    FROM page_views
)
WHERE rn = 1;

-- Top-N クエリ
SELECT user_id, page_url, view_count
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY page_url ORDER BY view_count DESC) AS rn
    FROM page_view_summary
)
WHERE rn <= 10;

-- Pattern Recognition (MATCH_RECOGNIZE)
SELECT *
FROM page_views
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        FIRST(A.page_url) AS first_page,
        LAST(B.page_url) AS checkout_page,
        A.event_time AS start_time,
        LAST(B.event_time) AS end_time
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A B+ C)
    DEFINE
        A AS A.page_url LIKE '%/product/%',
        B AS B.page_url LIKE '%/cart%',
        C AS C.page_url LIKE '%/checkout/complete%'
);

7. ウィンドウ処理

7.1 ウィンドウの種類

Tumbling Window(タンブリングウィンドウ)- 固定長、重複なし:
|----W1----|----W2----|----W3----|----W4----|
0    5    10    15    20    25    30    35    40 (秒)

Sliding Window(スライディングウィンドウ)- 固定長、重複あり:
|--------W1--------|
     |--------W2--------|
          |--------W3--------|
0    5    10    15    20    25    30 (秒)

Session Window(セッションウィンドウ)- ギャップベース:
|--W1--|  |----W2----|  |--W3--|
●●●    ●     ●●●●●       ●●●
  gap=5s   gap=5s      gap=5s

Global Window(グローバルウィンドウ)- カスタムトリガー:
|----────────────── entire stream ──────────────────|

7.2 DataStream API でのウィンドウ

// Tumbling Window(イベントタイム)
events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new EventAggregator());

// Sliding Window
events
    .keyBy(Event::getUserId)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
    .reduce((a, b) -> a.merge(b));

// Session Window
events
    .keyBy(Event::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .process(new SessionWindowFunction());

// Processing Time Window
events
    .keyBy(Event::getUserId)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
    .sum("amount");

// カスタムトリガー
events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))  // 1分ごとに中間結果
    .aggregate(new EventAggregator());

// 遅延データの許容
events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(10))
    .sideOutputLateData(lateOutputTag)
    .aggregate(new EventAggregator());

7.3 Flink SQL でのウィンドウ(TVF)

-- Tumbling Window (TVF)
SELECT
    window_start, window_end,
    user_id,
    COUNT(*) as event_count,
    SUM(amount) as total_amount
FROM TABLE(
    TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, user_id;

-- Hop Window (Sliding) (TVF)
SELECT
    window_start, window_end,
    COUNT(*) as event_count
FROM TABLE(
    HOP(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
)
GROUP BY window_start, window_end;

-- Cumulate Window (TVF)
SELECT
    window_start, window_end,
    SUM(amount) as cumulative_amount
FROM TABLE(
    CUMULATE(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;

-- Session Window (TVF)
SELECT
    window_start, window_end,
    user_id,
    COUNT(*) as event_count
FROM TABLE(
    SESSION(TABLE events PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE)
)
GROUP BY window_start, window_end, user_id;

8. 状態管理とステートバックエンド

8.1 状態の種類

Flink はオペレーター内で管理される状態(State)を提供する:

状態タイプ説明API
ValueState単一値value(), update(T)
ListStateリストadd(T), get(), update(List<T>)
MapState<K, V>マップget(K), put(K, V), entries()
ReducingStateリデュース関数付きadd(T), get()
AggregatingState<IN, OUT>集約関数付きadd(IN), get()
public class UserSessionTracker extends KeyedProcessFunction<String, Event, SessionResult> {

    private ValueState<Long> sessionStart;
    private ValueState<Long> lastActivity;
    private MapState<String, Integer> pageVisits;
    private ListState<String> recentPages;

    @Override
    public void open(OpenContext openContext) {
        sessionStart = getRuntimeContext().getState(
            new ValueStateDescriptor<>("session-start", Long.class));
        lastActivity = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-activity", Long.class));
        pageVisits = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("page-visits", String.class, Integer.class));
        recentPages = getRuntimeContext().getListState(
            new ListStateDescriptor<>("recent-pages", String.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<SessionResult> out)
            throws Exception {
        long currentTime = event.getTimestamp();

        if (sessionStart.value() == null) {
            // 新しいセッション開始
            sessionStart.update(currentTime);
        }

        // ページ訪問カウント
        String page = event.getPage();
        Integer count = pageVisits.get(page);
        pageVisits.put(page, count == null ? 1 : count + 1);

        // 最近のページリスト
        recentPages.add(page);

        // タイマー設定(30分アイドルでセッション終了)
        lastActivity.update(currentTime);
        ctx.timerService().registerEventTimeTimer(currentTime + 30 * 60 * 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<SessionResult> out)
            throws Exception {
        if (lastActivity.value() != null && timestamp >= lastActivity.value() + 30 * 60 * 1000) {
            // セッション結果を出力
            out.collect(new SessionResult(
                ctx.getCurrentKey(),
                sessionStart.value(),
                lastActivity.value(),
                collectPages()
            ));
            // 状態をクリア
            sessionStart.clear();
            lastActivity.clear();
            pageVisits.clear();
            recentPages.clear();
        }
    }
}

8.2 ステートバックエンド

バックエンド状態の格納先特徴推奨用途
HashMapStateBackendJVMヒープ高速、メモリ制約あり小〜中規模状態
EmbeddedRocksDBStateBackendRocksDB(ローカルディスク)大規模状態対応、やや低速大規模状態
# flink-conf.yaml

# HashMapStateBackend
state.backend.type: hashmap
state.checkpoints.dir: s3://flink-state/checkpoints
state.savepoints.dir: s3://flink-state/savepoints

# EmbeddedRocksDBStateBackend
state.backend.type: rocksdb
state.backend.rocksdb.localdir: /var/flink/rocksdb
state.backend.incremental: true
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4
state.backend.rocksdb.compaction.level.max-size-level-base: 320mb
// プログラム内でのステートバックエンド設定
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));  // incremental = true
env.getCheckpointConfig().setCheckpointStorage("s3://flink-state/checkpoints");

8.3 Broadcast State

Broadcast State は、小さなデータセット(ルール、設定等)を全パーティションで共有するためのパターンである。

// ルールストリーム
DataStream<Rule> ruleStream = env.fromSource(ruleSource, ...);

// イベントストリーム
DataStream<Event> eventStream = env.fromSource(eventSource, ...);

// Broadcast State Descriptor
MapStateDescriptor<String, Rule> ruleStateDesc =
    new MapStateDescriptor<>("rules", String.class, Rule.class);

// ルールをブロードキャスト
BroadcastStream<Rule> broadcastRules = ruleStream.broadcast(ruleStateDesc);

// イベントストリームとブロードキャストストリームの接続
DataStream<Alert> alerts = eventStream
    .connect(broadcastRules)
    .process(new BroadcastProcessFunction<Event, Rule, Alert>() {
        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out) {
            ReadOnlyBroadcastState<String, Rule> rules =
                ctx.getBroadcastState(ruleStateDesc);
            // ルールに基づいてイベントを評価
            for (Map.Entry<String, Rule> entry : rules.immutableEntries()) {
                if (entry.getValue().matches(event)) {
                    out.collect(new Alert(event, entry.getValue()));
                }
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out) {
            BroadcastState<String, Rule> state = ctx.getBroadcastState(ruleStateDesc);
            state.put(rule.getId(), rule);
        }
    });

9. チェックポイントとセーブポイント

9.1 チェックポイント

チェックポイントはFlinkの障害回復メカニズムであり、分散スナップショットアルゴリズム(Chandy-Lamportの変形)に基づく。

Checkpoint Barrier の伝播:

Source 1: ──●──●──|CB|──●──●──●──|CB|──●──●──►
Source 2: ──●──●──●──|CB|──●──●──|CB|──●──●──►
                      ↓                ↓
              Barrier Alignment    State Snapshot
                                       ↓
                            Checkpoint Storage (S3/HDFS)
# flink-conf.yaml

# チェックポイント設定
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600s
execution.checkpointing.min-pause: 30s
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.prefer-checkpoint-for-recovery: true

# チェックポイントストレージ
state.checkpoints.dir: s3://flink-state/checkpoints
state.checkpoints.num-retained: 3

# Unaligned Checkpoints(バックプレッシャー下で有効)
execution.checkpointing.unaligned.enabled: true
execution.checkpointing.unaligned.forced: false

# Incremental Checkpoints(RocksDB使用時に推奨)
state.backend.incremental: true

# Changelog State Backend(高速リカバリ)
state.changelog.enabled: true
state.changelog.storage: filesystem
dstl.dfs.base-path: s3://flink-state/changelog
// プログラム内でのチェックポイント設定
env.enableCheckpointing(60000);  // 60秒ごと
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(600000);  // 10分
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

// ジョブキャンセル時にチェックポイントを保持
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

9.2 セーブポイント

セーブポイントは手動でトリガーされるチェックポイントであり、ジョブのアップグレードやマイグレーションに使用する。

# セーブポイントの作成
flink savepoint <jobId> s3://flink-state/savepoints/

# セーブポイントからのジョブ再開
flink run -s s3://flink-state/savepoints/savepoint-xxxxx \
    -c com.example.MyJob my-job.jar

# セーブポイントの破棄
flink savepoint -d s3://flink-state/savepoints/savepoint-xxxxx

セーブポイント互換性のためのUID設定:

// 各オペレーターにUIDを設定(セーブポイント互換性に必須)
events
    .keyBy(Event::getUserId)
    .uid("user-keyed-stream")       // UID設定
    .process(new UserProcessor())
    .uid("user-processor")          // UID設定
    .name("User Event Processor");  // 表示名

10. Exactly-Once セマンティクス

10.1 End-to-End Exactly-Once

Flink は以下のメカニズムで End-to-End の Exactly-Once を実現する:

Source                Flink Internal              Sink
┌─────────┐     ┌──────────────────┐     ┌────────────┐
│ Kafka   │     │  Checkpoint +    │     │ 2-Phase    │
│ Consumer│────►│  Barrier         │────►│ Commit     │
│ Offset  │     │  Alignment       │     │ (Kafka     │
│ Tracking│     │                  │     │  Producer) │
└─────────┘     └──────────────────┘     └────────────┘
    ↓                    ↓                      ↓
 Replayable       State Snapshot          Transactional
 Source            (exactly-once          Write
                   internal state)        (pre-commit → commit)

10.2 Two-Phase Commit Sink

// Kafka Sink with Exactly-Once
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-tx")
    .setProperty("transaction.timeout.ms", "900000")  // 15分
    .build();

注意: Exactly-Once Sink を使用する場合、Kafka の transaction.timeout.ms はチェックポイント間隔より大きく設定する必要がある。


11. 時間の概念とウォーターマーク

11.1 3つの時間概念

Event Time:      実際のイベント発生時刻
                 ●(10:00:00)  ●(10:00:01)  ●(10:00:05)  ●(10:00:02)
                                                          ↑ 遅延到着

Ingestion Time:  Flinkソースに到達した時刻
                 ●(10:00:03)  ●(10:00:03)  ●(10:00:08)  ●(10:00:08)

Processing Time: オペレーターで処理された時刻
                 ●(10:00:05)  ●(10:00:05)  ●(10:00:10)  ●(10:00:10)

11.2 ウォーターマーク

ウォーターマークは「この時刻までのイベントは全て到着した」という宣言であり、ウィンドウのクローズタイミングを制御する。

// ウォーターマーク戦略の設定
DataStream<Event> events = env.fromSource(source,
    WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
        .withIdleness(Duration.ofMinutes(5)),  // アイドルソースの検出
    "Event Source"
);

// カスタムウォーターマーク
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forGenerator(ctx -> new WatermarkGenerator<Event>() {
        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTimestamp - 10000));  // 10秒の遅延許容
        }
    })
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

11.3 Watermark Alignment(Flink 1.17+)

複数ソースパーティション間のウォーターマーク進行を揃える機能。

WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, ts) -> event.getTimestamp())
    .withWatermarkAlignment("alignment-group", Duration.ofSeconds(30), Duration.ofSeconds(5));

12. コネクターとフォーマット

12.1 主要なソースコネクター

コネクター用途Exactly-Once Source
Kafkaメッセージキュー
Amazon KinesisAWSストリーミング
Apache Pulsarメッセージング
FileSystem (S3/HDFS)ファイルソース
JDBCリレーショナルDB
CDC (Debezium)データベースCDC
MongoDBドキュメントDB
Elasticsearch検索エンジンN/A

12.2 主要なシンクコネクター

// Kafka Sink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .build();

// JDBC Sink
SinkFunction<Event> jdbcSink = JdbcSink.sink(
    "INSERT INTO events (event_id, user_id, event_type, amount) VALUES (?, ?, ?, ?)",
    (statement, event) -> {
        statement.setLong(1, event.getEventId());
        statement.setLong(2, event.getUserId());
        statement.setString(3, event.getEventType());
        statement.setDouble(4, event.getAmount());
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(3)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://db:5432/events")
        .withDriverName("org.postgresql.Driver")
        .withUsername("flink")
        .withPassword("password")
        .build()
);

// Elasticsearch Sink
ElasticsearchSink<Event> esSink = new ElasticsearchSinkBuilder<Event>()
    .setHosts(new HttpHost("es-host", 9200, "https"))
    .setEmitter((event, context, indexer) -> {
        indexer.add(
            Requests.indexRequest()
                .index("events-" + event.getDate())
                .id(String.valueOf(event.getEventId()))
                .source(event.toJson(), XContentType.JSON)
        );
    })
    .setBulkFlushMaxActions(1000)
    .setBulkFlushInterval(5000)
    .build();

// FileSink(ストリーミングファイル書き出し)
FileSink<String> fileSink = FileSink
    .forRowFormat(
        new Path("s3://data-lake/events"),
        new SimpleStringEncoder<String>("UTF-8")
    )
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(256))
            .build()
    )
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
    .build();

12.3 Flink CDC(Change Data Capture)

-- MySQL CDC Source
CREATE TABLE mysql_orders (
    order_id BIGINT,
    customer_id BIGINT,
    product_name STRING,
    price DECIMAL(10, 2),
    order_status STRING,
    order_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql-host',
    'port' = '3306',
    'username' = 'flink_cdc',
    'password' = 'password',
    'database-name' = 'orders_db',
    'table-name' = 'orders',
    'server-time-zone' = 'UTC',
    'scan.startup.mode' = 'initial'  -- スナップショット + 増分
);

-- PostgreSQL CDC Source
CREATE TABLE pg_customers (
    customer_id BIGINT,
    name STRING,
    email STRING,
    updated_at TIMESTAMP(3),
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'pg-host',
    'port' = '5432',
    'username' = 'flink_cdc',
    'password' = 'password',
    'database-name' = 'crm',
    'schema-name' = 'public',
    'table-name' = 'customers',
    'slot.name' = 'flink_slot',
    'decoding.plugin.name' = 'pgoutput'
);

-- CDC → Iceberg パイプライン
INSERT INTO iceberg_orders
SELECT order_id, customer_id, product_name, price, order_status, order_time
FROM mysql_orders;

13. CEP(Complex Event Processing)

13.1 CEP ライブラリ

Flink CEP は、ストリーム上のイベントパターンを検出するためのライブラリである。

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// パターン定義: ログイン失敗3回 → ログイン成功
Pattern<LoginEvent, ?> loginFailPattern = Pattern
    .<LoginEvent>begin("first-fail")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent event) {
                return event.getType().equals("FAIL");
            }
        })
    .next("second-fail")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent event) {
                return event.getType().equals("FAIL");
            }
        })
    .next("third-fail")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent event) {
                return event.getType().equals("FAIL");
            }
        })
    .followedBy("success")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent event) {
                return event.getType().equals("SUCCESS");
            }
        })
    .within(Time.minutes(5));

// パターンの適用
PatternStream<LoginEvent> patternStream = CEP.pattern(
    loginEvents.keyBy(LoginEvent::getUserId),
    loginFailPattern
);

// マッチした結果の処理
DataStream<Alert> alerts = patternStream.process(
    new PatternProcessFunction<LoginEvent, Alert>() {
        @Override
        public void processMatch(Map<String, List<LoginEvent>> match,
                Context ctx, Collector<Alert> out) {
            LoginEvent firstFail = match.get("first-fail").get(0);
            LoginEvent success = match.get("success").get(0);
            out.collect(new Alert(
                firstFail.getUserId(),
                "Suspicious login: 3 failures followed by success",
                firstFail.getTimestamp(),
                success.getTimestamp()
            ));
        }
    }
);

14. バッチ処理

Flink はストリーム処理エンジンの上にバッチ処理を構築する。Bounded Stream として扱い、バッチ最適化を適用する。

// バッチモードの設定
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// FileSystemからの読み込み
DataStream<String> input = env.readTextFile("s3://data-lake/input/");

DataStream<WordCount> wordCounts = input
    .flatMap((String line, Collector<WordCount> out) -> {
        for (String word : line.split("\\s+")) {
            out.collect(new WordCount(word, 1));
        }
    })
    .returns(WordCount.class)
    .keyBy(wc -> wc.word)
    .sum("count");

wordCounts.sinkTo(
    FileSink.forRowFormat(new Path("s3://data-lake/output/"), new SimpleStringEncoder<>())
        .build()
);
-- Flink SQL でのバッチ処理
SET 'execution.runtime-mode' = 'batch';

-- ファイルソース
CREATE TABLE batch_input (
    user_id BIGINT,
    action STRING,
    amount DECIMAL(10, 2),
    event_date DATE
) WITH (
    'connector' = 'filesystem',
    'path' = 's3://data-lake/input/',
    'format' = 'parquet'
);

-- 集約クエリ
INSERT INTO output_table
SELECT
    user_id,
    event_date,
    COUNT(*) as action_count,
    SUM(amount) as total_amount
FROM batch_input
GROUP BY user_id, event_date;

15. デプロイメント

15.1 デプロイモードの比較

モード説明推奨用途
Session Mode共有クラスターに複数ジョブを投入開発、テスト、短時間ジョブ
Per-Job Mode (deprecated)ジョブごとに専用クラスターレガシー
Application Modeアプリケーションごとに専用クラスター本番環境(推奨)

15.2 Kubernetes デプロイ(推奨)

# flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    jobmanager.memory.process.size: 4096m
    taskmanager.memory.process.size: 8192m
    taskmanager.numberOfTaskSlots: 4
    parallelism.default: 8
    
    # Checkpointing
    execution.checkpointing.interval: 60s
    execution.checkpointing.mode: EXACTLY_ONCE
    state.backend.type: rocksdb
    state.backend.incremental: true
    state.checkpoints.dir: s3://flink-state/checkpoints
    state.savepoints.dir: s3://flink-state/savepoints
    
    # HA (Kubernetes)
    high-availability.type: kubernetes
    high-availability.storageDir: s3://flink-state/ha
    kubernetes.cluster-id: my-flink-cluster
    
    # REST
    rest.port: 8081
    
    # Metrics
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9249

  log4j-console.properties: |+
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# flink-jobmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:1.19-java17
          args: ["jobmanager"]
          ports:
            - containerPort: 6123
            - containerPort: 8081
            - containerPort: 9249
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "4Gi"
              cpu: "4"
          volumeMounts:
            - name: flink-config
              mountPath: /opt/flink/conf
      volumes:
        - name: flink-config
          configMap:
            name: flink-config
# flink-taskmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 5
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: flink:1.19-java17
          args: ["taskmanager"]
          ports:
            - containerPort: 9249
          resources:
            requests:
              memory: "8Gi"
              cpu: "4"
            limits:
              memory: "8Gi"
              cpu: "8"
          volumeMounts:
            - name: flink-config
              mountPath: /opt/flink/conf
            - name: rocksdb-storage
              mountPath: /var/flink/rocksdb
      volumes:
        - name: flink-config
          configMap:
            name: flink-config
        - name: rocksdb-storage
          emptyDir:
            sizeLimit: 100Gi

15.3 Flink Kubernetes Operator

# FlinkDeployment CRD
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-job
spec:
  image: my-registry/flink-job:latest
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend.type: rocksdb
    state.backend.incremental: "true"
    execution.checkpointing.interval: "60s"
    state.checkpoints.dir: s3://flink-state/checkpoints
    high-availability.type: kubernetes
    high-availability.storageDir: s3://flink-state/ha
  serviceAccount: flink
  jobManager:
    resource:
      memory: "4096m"
      cpu: 2
    replicas: 1
  taskManager:
    resource:
      memory: "8192m"
      cpu: 4
    replicas: 5
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar
    entryClass: com.example.MyFlinkJob
    parallelism: 20
    upgradeMode: savepoint
    savepointTriggerNonce: 0
    state: running

16. 設定とチューニング

16.1 主要な設定パラメータ

# flink-conf.yaml (主要設定)

# ===== JobManager設定 =====
jobmanager.memory.process.size: 4096m
jobmanager.memory.jvm-overhead.min: 192m
jobmanager.memory.jvm-overhead.max: 1g
jobmanager.memory.jvm-overhead.fraction: 0.1
jobmanager.rpc.address: jobmanager-host
jobmanager.rpc.port: 6123

# ===== TaskManager設定 =====
taskmanager.memory.process.size: 8192m
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.task.heap.size: 4096m
taskmanager.memory.managed.size: 2048m
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64m
taskmanager.memory.network.max: 1g
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.jvm-overhead.fraction: 0.1

# ===== 並列度 =====
parallelism.default: 8

# ===== チェックポイント =====
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600s
execution.checkpointing.min-pause: 30s
execution.checkpointing.max-concurrent-checkpoints: 1

# ===== State Backend =====
state.backend.type: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://flink-state/checkpoints
state.savepoints.dir: s3://flink-state/savepoints
state.backend.rocksdb.localdir: /var/flink/rocksdb

# ===== HA =====
high-availability.type: kubernetes
high-availability.storageDir: s3://flink-state/ha

# ===== ネットワーク =====
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
taskmanager.network.request-backoff.initial: 100
taskmanager.network.request-backoff.max: 10000

# ===== REST =====
rest.port: 8081
rest.address: 0.0.0.0
rest.bind-port: 8081

# ===== メトリクス =====
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249

16.2 パフォーマンスチューニング

# バックプレッシャー対策
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8

# Buffer Debloating(自動バッファ調整)
taskmanager.network.memory.buffer-debloat.enabled: true
taskmanager.network.memory.buffer-debloat.target: 1s

# Unaligned Checkpoints(バックプレッシャー下での高速CP)
execution.checkpointing.unaligned.enabled: true

# RocksDB チューニング
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4
state.backend.rocksdb.compaction.level.max-size-level-base: 320mb
state.backend.rocksdb.thread.num: 4

# オブジェクト再利用(GC軽減)
pipeline.object-reuse: true

# Async State Processing(Flink 2.0+)
execution.async-state.enabled: true

17. メモリ管理

17.1 TaskManager メモリモデル

┌──────────────────────────────────────────────────┐
│            Total Process Memory                   │
│          (taskmanager.memory.process.size)        │
│                                                   │
│  ┌────────────────────────────────────────────┐  │
│  │        Total Flink Memory                   │  │
│  │                                             │  │
│  │  ┌──────────────────────────────────────┐  │  │
│  │  │    Framework Heap (128MB)            │  │  │
│  │  │    (Flink 内部オブジェクト)            │  │  │
│  │  ├──────────────────────────────────────┤  │  │
│  │  │    Task Heap                         │  │  │
│  │  │    (ユーザーコード、UDF)              │  │  │
│  │  ├──────────────────────────────────────┤  │  │
│  │  │    Managed Memory (40%)              │  │  │
│  │  │    (RocksDB State, Batch Sort/Hash) │  │  │
│  │  ├──────────────────────────────────────┤  │  │
│  │  │    Network Memory (10%)              │  │  │
│  │  │    (ネットワークバッファ)              │  │  │
│  │  ├──────────────────────────────────────┤  │  │
│  │  │    Framework Off-Heap (128MB)        │  │  │
│  │  ├──────────────────────────────────────┤  │  │
│  │  │    Task Off-Heap (0)                 │  │  │
│  │  └──────────────────────────────────────┘  │  │
│  └────────────────────────────────────────────┘  │
│                                                   │
│  ┌────────────────────────────────────────────┐  │
│  │    JVM Overhead (10%)                       │  │
│  │    (JVM Metaspace, GC, etc.)                │  │
│  └────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────┘

18. 監視と運用

18.1 Flink Web Dashboard

Flink Web Dashboard(デフォルト: ポート8081)で確認可能な情報:

タブ情報
Overviewクラスター概要、TaskManager数、Slot状態
Running Jobs実行中ジョブの DAG、各オペレーターの処理状況
Completed Jobs完了/失敗ジョブの履歴
Task ManagersTaskManager の詳細情報、メモリ使用状況
Job ManagerJobManager の設定、ログ

18.2 メトリクス

# Prometheus Metrics Reporter
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249

主要なメトリクス:

メトリクス説明監視の意義
numRecordsInPerSecond秒あたりの入力レコード数スループット
numRecordsOutPerSecond秒あたりの出力レコード数スループット
currentInputWatermark現在のウォーターマークイベント時間の進行
lastCheckpointDuration最後のCP所要時間CP性能
lastCheckpointSize最後のCPサイズ状態サイズ
numberOfFailedCheckpoints失敗したCP数安定性
isBackPressuredバックプレッシャー有無ボトルネック検出
busyTimeMsPerSecondオペレーターのビジー時間負荷

18.3 Grafana ダッシュボード

推奨パネル:

  • Throughput: Records In/Out per Second (by operator)
  • Latency: End-to-end latency distribution
  • Checkpointing: Duration, Size, Success/Failure rate
  • Backpressure: Backpressure status per operator
  • Resource Usage: CPU, Memory, Network per TaskManager
  • State Size: State backend size trends

18.4 REST API

# ジョブ一覧
curl http://jobmanager:8081/jobs

# ジョブ詳細
curl http://jobmanager:8081/jobs/<job-id>

# ジョブのキャンセル
curl -X PATCH http://jobmanager:8081/jobs/<job-id>?mode=cancel

# セーブポイントの作成
curl -X POST http://jobmanager:8081/jobs/<job-id>/savepoints \
    -d '{"target-directory": "s3://flink-state/savepoints/", "cancel-job": false}'

# チェックポイント統計
curl http://jobmanager:8081/jobs/<job-id>/checkpoints

# TaskManager一覧
curl http://jobmanager:8081/taskmanagers

19. セキュリティ

# SSL/TLS設定
security.ssl.internal.enabled: true
security.ssl.internal.keystore: /etc/flink/tls/keystore.jks
security.ssl.internal.keystore-password: keystore-pass
security.ssl.internal.truststore: /etc/flink/tls/truststore.jks
security.ssl.internal.truststore-password: truststore-pass

security.ssl.rest.enabled: true
security.ssl.rest.keystore: /etc/flink/tls/keystore.jks
security.ssl.rest.keystore-password: keystore-pass

# Kerberos
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /etc/flink/keytab/flink.keytab
security.kerberos.login.principal: flink/hostname@REALM.COM

20. ユースケースとベストプラクティス

20.1 代表的なユースケース

リアルタイム不正検知

// クレジットカード不正検知パイプライン
DataStream<Transaction> transactions = env.fromSource(kafkaSource, ...);

// 1. 地理的に不可能な取引の検出
DataStream<Alert> geoAlerts = transactions
    .keyBy(Transaction::getCardId)
    .process(new GeoImpossibilityDetector());  // 状態: 最後の取引位置と時刻

// 2. 短時間での異常な取引頻度
DataStream<Alert> frequencyAlerts = transactions
    .keyBy(Transaction::getCardId)
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
    .aggregate(new TransactionCountAggregator())
    .filter(count -> count.getValue() > 10);

// 3. CEPによるパターン検出
Pattern<Transaction, ?> pattern = Pattern
    .<Transaction>begin("small").where(t -> t.getAmount() < 1.0).times(3)
    .followedBy("large").where(t -> t.getAmount() > 1000.0)
    .within(Time.minutes(10));

リアルタイムETL(CDC → Data Lake)

-- MySQL CDC → Iceberg パイプライン
-- ソース:MySQL CDC
CREATE TABLE mysql_orders (
    order_id BIGINT, customer_id BIGINT, amount DECIMAL(10,2),
    status STRING, created_at TIMESTAMP(3), updated_at TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc', ...);

-- シンク:Iceberg
CREATE TABLE iceberg_orders (
    order_id BIGINT, customer_id BIGINT, amount DECIMAL(10,2),
    status STRING, created_at TIMESTAMP(3), updated_at TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'iceberg', 'write.upsert.enabled' = 'true', ...);

-- CDC→Iceberg同期
INSERT INTO iceberg_orders SELECT * FROM mysql_orders;

20.2 ベストプラクティス

  1. 全オペレーターにUIDを設定: セーブポイント互換性の確保
  2. Incremental Checkpoint を使用: RocksDB + incremental で大規模状態に対応
  3. ウォーターマーク戦略を慎重に設計: 遅延データの許容範囲を適切に設定
  4. バックプレッシャーを監視: ボトルネックの早期発見
  5. 状態のTTLを設定: 無限に成長する状態を防止
  6. Operator Chaining を活用: 不要な分離を避ける
  7. Async I/O を使用: 外部システム呼び出しの非同期化
  8. 適切な並列度を設定: Kafkaパーティション数に合わせる
  9. Application Modeでデプロイ: 本番環境ではSession Modeを避ける
  10. チェックポイント間隔を適切に設定: 短すぎるとオーバーヘッド、長すぎるとリカバリが遅い

21. Flink vs 他エンジン比較

項目FlinkSpark StreamingKafka StreamsStorm
処理モデル真のストリームマイクロバッチ真のストリーム真のストリーム
レイテンシミリ秒〜秒秒〜分ミリ秒ミリ秒
Exactly-Once✓(E2E)✓(E2E)At-least-once
状態管理強力(大規模対応)中程度限定的弱い
SQLFlink SQLSpark SQLKSQLなし
バッチ処理✓(主要機能)
CEP✓(ネイティブ)
スケーラビリティ高い高い中程度高い
運用複雑性中〜高

22. 最新動向と将来展望

22.1 Flink 2.0 の主な変更

  • DataStream API v2: より直感的で型安全なAPI
  • Disaggregated State Management: 状態のストレージと計算の分離
  • Thread Model改善: マルチスレッド実行モデルの最適化
  • 非同期状態処理: 状態アクセスの非同期化による高スループット化

22.2 将来の方向性

  • Streaming Lakehouse: Iceberg/Paimon/Hudiとの深い統合
  • AI/ML Integration: リアルタイム特徴量エンジニアリング、オンライン推論
  • Serverless Flink: クラウドネイティブなオンデマンド実行
  • Unified Batch & Stream: バッチ・ストリーム統合の更なる成熟

23. まとめ

Apache Flink は、ステートフルストリーム処理のデファクトスタンダードであり、低レイテンシ・高スループット・Exactly-Once保証を兼ね備えた分散処理エンジンである。

技術的要点

カテゴリ要点
アーキテクチャJobManager-TaskManager、パイプライン実行
処理モデル真のイベント単位ストリーム処理
状態管理Keyed State / Operator State + RocksDB
耐障害性Checkpoint(分散スナップショット)+ Savepoint
時間処理Event Time + Watermark による正確な処理
SQLFlink SQL + Table API
コネクターKafka, Kinesis, JDBC, CDC, FileSystem, Iceberg等
デプロイKubernetes (Flink Operator) 推奨

選定の指針

Flink が適しているケース:

  • リアルタイムストリーム処理: ミリ秒〜秒レベルのレイテンシが必要
  • ステートフル処理: 大規模な状態を伴うストリーム処理
  • Exactly-Once保証: End-to-Endの厳密なデータ整合性
  • CEP: 複合イベントパターンの検出
  • CDC パイプライン: データベースの変更をリアルタイムに同期

他のツールを検討すべきケース:

  • 大規模バッチETL主体: Apache Spark
  • シンプルなメッセージ処理: Kafka Streams
  • アドホックSQLクエリ: Apache Trino
  • データウェアハウス: BigQuery / Snowflake / Redshift

参考文献: