Kafka

Apache Kafka:分散イベントストリーミングプラットフォーム完全ガイド

目次

  1. 概要
  2. 基本概念
  3. アーキテクチャ
  4. 主要コンポーネント
  5. トピックとパーティション
  6. プロデューサー
  7. コンシューマー
  8. ブローカー
  9. レプリケーション
  10. 設定とチューニング
  11. 運用
  12. ユースケース

概要

Apache Kafkaは、LinkedInによって開発され、現在はApache Softwareファウンデーションの下で開発される、スケーラブルで耐障害性の高い分散イベントストリーミングプラットフォームです。Kafkaは、大規模なリアルタイムデータストリームの処理、永続化、分析を可能にします。

Kafkaが解決する問題

従来のメッセージキューと異なり、Kafkaは以下の課題に対応しています:

  • スケーラビリティ: 数百万のメッセージ/秒を処理
  • 永続性: ディスク上にイベントを長期保存
  • 再生可能性: 過去のイベントを再度処理可能
  • 低遅延: リアルタイムデータ処理

主な特徴

  • 高スループット: パーティション化による並列処理
  • 低レイテンシー: ミリ秒単位の遅延
  • 耐障害性: レプリケーションによる冗長性
  • スケーラビリティ: ノード追加による線形スケーリング
  • 持続性: 設定可能な保持期間

基本概念

イベントストリーミングとは

イベントストリーミングは、イベント駆動アーキテクチャの中核を成す技術です。システム内で発生する重要なビジネスイベント(ユーザーアクション、トランザクション、センサーデータなど)を、発生順に記録し、複数のシステムやアプリケーションに配信します。

メッセージ vs トピック vs パーティション

メッセージ: Kafkaの最小単位のデータ。キー、値、タイムスタンプで構成されます。

Message {
  key: "user_123",
  value: {
    event: "purchase",
    amount: 99.99,
    timestamp: "2026-04-07T10:30:00Z"
  },
  partition: 2,
  offset: 1524
}

トピック: メッセージを分類するカテゴリ。関連するメッセージをグループ化します。

例: user-events, payments, logs, sensor-data

パーティション: トピック内の並列ストレージユニット。スケーラビリティを実現します。

オフセットとシーケンス性

各パーティション内のメッセージには、オフセットという単調増加するIDが割り当てられます。

パーティション 0:
  オフセット 0: Message A
  オフセット 1: Message B
  オフセット 2: Message C

パーティション 1:
  オフセット 0: Message D
  オフセット 1: Message E
  オフセット 2: Message F

同じキーを持つメッセージは常に同じパーティションに配置され、順序が保証されます。

リーダーとフォロワー

Kafkaのレプリケーション設計:

  • リーダー: パーティションのプライマリレプリカ。すべての読み書きを処理
  • フォロワー: パーティションのセカンダリレプリカ。リーダーからデータをレプリケート

アーキテクチャ

全体構成

┌─────────────────────────────────────────────────────────────┐
│                        クライアント層                        │
│  ┌──────────────────┐         ┌──────────────────┐         │
│  │   Producers      │         │   Consumers      │         │
│  │  (データ送信)     │         │  (データ受信)     │         │
│  └────────┬─────────┘         └────────┬─────────┘         │
└───────────┼──────────────────────────────┼──────────────────┘
            │                              │
            │          ┌──────────────────────────┐
            │          │  Kafka Cluster           │
            └─────────►│  ┌────────────────────┐ │
                       │  │   Broker 1         │ │
                       │  │  Topic A [P0][P1]  │ │
                       │  └────────────────────┘ │
                       │  ┌────────────────────┐ │
                       │  │   Broker 2         │ │
                       │  │  Topic A [P2][P3]  │ │
                       │  └────────────────────┘ │
                       │  ┌────────────────────┐ │
                       │  │   Broker 3         │ │
                       │  │  Topic B [P0][P1]  │ │
                       │  └────────────────────┘ │
                       └──────────────────────────┘
                              │
                       ┌──────────────────────────┐
                       │  ZooKeeper Ensemble      │
                       │  (メタデータ管理)         │
                       │  Node 1, Node 2, Node 3  │
                       └──────────────────────────┘

レイヤー構造

1. ストレージレイヤー

  • パーティション: トピックのデータを保持
  • セグメント: ファイルシステム上の物理的な保存単位

2. ネットワークレイヤー

  • ブローカー間通信
  • クライアント・ブローカー通信
  • ZooKeeperとの通信

3. 処理レイヤー

  • パーティション割り当て
  • レプリケーション
  • リバランシング

主要コンポーネント

ブローカー (Broker)

ブローカーはKafkaクラスタの中核となるサーバープロセスです。

ブローカーの責務:

  • トピックのパーティションを保有
  • プロデューサーからのメッセージを受け取る
  • コンシューマーにメッセージを配信
  • レプリケーションの管理
  • パーティションのリーダー選出

ブローカー設定例 (server.properties):

# ブローカーID(クラスタ内で一意)
broker.id=1

# リスニングポート
listeners=PLAINTEXT://kafka-broker-1.example.com:9092
advertised.listeners=PLAINTEXT://kafka-broker-1.example.com:9092

# ログディレクトリ
log.dirs=/var/kafka-logs

# リテンション設定
log.retention.hours=168
log.retention.bytes=1073741824

# パフォーマンス設定
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# レプリケーション設定
min.insync.replicas=2
unclean.leader.election.enable=false
auto.leader.rebalance.enable=true

# ZooKeeper設定
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.session.timeout.ms=18000

ZooKeeper

ZooKeeperはKafkaのメタデータとクラスタ状態を管理します。

ZooKeeperが管理する情報:

  • ブローカーのリスト
  • トピック・パーティション構成
  • パーティションリーダー情報
  • コンシューマーグループのオフセット
  • クォーラムリーダー情報

ZooKeeper設定例 (zoo.cfg):

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181

# ZooKeeperクラス��メンバー
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888

# セッションタイムアウト
4lw.commands.whitelist=stat,ruok,conf,isro
autopurge.snapRetainCount=3
autopurge.purgeInterval=1

トピックとパーティション

トピックの作成

# 基本的なトピック作成
kafka-topics.sh --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 2 \
  --bootstrap-server localhost:9092

# より詳細な設定を含む作成
kafka-topics.sh --create \
  --topic events-topic \
  --partitions 6 \
  --replication-factor 3 \
  --bootstrap-server localhost:9092 \
  --config retention.ms=604800000 \
  --config compression.type=snappy \
  --config min.insync.replicas=2

パーティション戦略

1. ラウンドロビン分配:

キーなしメッセージが送信されるとき:
Message 1 → Partition 0
Message 2 → Partition 1
Message 3 → Partition 2
Message 4 → Partition 0
...

2. キーベース分配:

同じキーは同じパーティションへ:
Message(key="user_123") → Partition 2
Message(key="user_123") → Partition 2
Message(key="order_456") → Partition 0

3. カスタム分配: 実装例:

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes,
                        Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        if (key == null) {
            return new Random().nextInt(numPartitions);
        }
        
        // キーのハッシュ値に基づいてパーティションを決定
        String keyStr = key.toString();
        if (keyStr.startsWith("priority_")) {
            return 0;  // 優先度の高いメッセージは Partition 0
        }
        
        return Math.abs(keyStr.hashCode()) % numPartitions;
    }
}

パーティション数の決定

パーティション数を決定する際の考慮事項:

スループット要件:

必要なスループット = 100,000 msg/sec
単一パーティションのスループット = 10,000 msg/sec
推奨パーティション数 = 100,000 / 10,000 = 10 パーティション

コンシューマー並列性:

パーティション数 >= コンシューマー数
例:4つのコンシューマーで処理する場合、最小4パーティション

レイテンシー:

パーティション数が多い → 管理オーバーヘッドが増加
パーティション数が少ない → スループット制限
バランスが重要

プロデューサー

プロデューサー設定例

# 基本設定
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

# 信頼性設定
acks=all
retries=3
max.in.flight.requests.per.connection=5
enable.idempotence=true

# パフォーマンス設定
batch.size=16384
linger.ms=10
compression.type=snappy

# タイムアウト設定
request.timeout.ms=30000
delivery.timeout.ms=120000

プロデューサー設定の詳細

acks設定:

acks=0: ファイア・アンド・フォーゲット
  - 最もパフォーマンスが高い
  - メッセージ喪失のリスク
  
acks=1: リーダーのみ確認
  - 中程度の信頼性
  - リーダーがダウンするとメッセージ喪失の可能性
  
acks=all (-1): ISR (In-Sync Replicas) すべてが確認
  - 最も信頼性が高い
  - レイテンシーが増加
  - 推奨: 金融取引など重要なデータ

べき等性:

# べき等プロデューサーの設定
enable.idempotence=true
max.in.flight.requests.per.connection=5

# 結果:同じメッセージが複数回送信されてもファイナル状態は同じ
# Kafkaが内部で重複排除を行う

Javaプロデューサーの実装例

import org.apache.kafka.clients.producer.*;
import java.util.*;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", 
            "org.apache.kafka.common.serialization.JsonSerializer");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("compression.type", "snappy");

        KafkaProducer<String, String> producer = 
            new KafkaProducer<>(props);

        try {
            // 同期送信
            for (int i = 0; i < 100; i++) {
                String key = "key-" + i;
                String value = "{\"id\": " + i + ", \"data\": \"test-" + i + "\"}";
                
                ProducerRecord<String, String> record = 
                    new ProducerRecord<>("my-topic", key, value);

                RecordMetadata metadata = producer.send(record).get();
                
                System.out.println("Topic: " + metadata.topic() +
                                 " Partition: " + metadata.partition() +
                                 " Offset: " + metadata.offset());
            }

            // 非同期送信 (コールバック付き)
            for (int i = 100; i < 200; i++) {
                String key = "async-key-" + i;
                String value = "{\"id\": " + i + ", \"type\": \"async\"}";
                
                ProducerRecord<String, String> record = 
                    new ProducerRecord<>("my-topic", key, value);

                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, 
                                            Exception exception) {
                        if (exception != null) {
                            System.err.println("送信エラー: " + exception);
                        } else {
                            System.out.println("成功 - オフセット: " + 
                                            metadata.offset());
                        }
                    }
                });
            }
        } finally {
            producer.close();
        }
    }
}

コンシューマー

コンシューマーグループ

複数のコンシューマーが同じトピックを処理する場合、コンシューマーグループを使用します。

トピック(3パーティション):
  ┌─────────┬─────────┬─────────┐
  │ Partition 0 │ Partition 1 │ Partition 2 │
  └─────────┴─────────┴─────────┘
       ↓           ↓           ↓
  ┌──────────┬──────────┬──────────┐
  │Consumer 1│Consumer 2│Consumer 3│
  │(group A) │(group A) │(group A) │
  └──────────┴──────────┴──────────┘

グループ内の各コンシューマーは異なるパーティションを処理

コンシューマー設定例

# 基本設定
bootstrap.servers=localhost:9092
group.id=my-consumer-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# オフセット管理
auto.offset.reset=earliest
enable.auto.commit=true
auto.commit.interval.ms=1000

# パフォーマンス設定
fetch.min.bytes=1024
fetch.max.wait.ms=500
max.poll.records=500
session.timeout.ms=30000

# セキュリティ設定
security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=password

Javaコンシューマーの実装例

import org.apache.kafka.clients.consumer.*;
import java.util.*;
import java.time.Duration;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", 
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", 
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", false);  // 手動コミット

        KafkaConsumer<String, String> consumer = 
            new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("パーティション: " + 
                        record.partition() + 
                        " オフセット: " + record.offset() +
                        " キー: " + record.key() +
                        " 値: " + record.value());
                    
                    // メッセージ処理
                    processMessage(record);
                }

                // 手動コミット
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }

    private static void processMessage(ConsumerRecord<String, String> record) {
        // ビジネスロジック
        System.out.println("処理中: " + record.value());
    }
}

オフセット管理

public class OffsetManagementExample {
    public static void main(String[] args) {
        // ...コンシューマー設定...

        KafkaConsumer<String, String> consumer = 
            new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                try {
                    // メッセージ処理
                    processMessage(record);
                    
                    // 処理成功時のみコミット
                    consumer.commitSync();
                    
                } catch (Exception e) {
                    System.err.println("処理失敗: " + e.getMessage());
                    // オフセットをコミットしない = 再度処理される
                    continue;
                }
            }

            // シーク機能:特定のオフセットまでリセット
            TopicPartition tp = 
                new TopicPartition("my-topic", 0);
            consumer.seek(tp, 100);  // オフセット100から開始
        }
    }
}

ブローカー

ブローカークラスタの管理

クラスタ構成例:

3ノードクラスタ:
┌─────────────────────────────────────────────┐
│                Kafka Cluster                │
├─────────────────────────────────────────────┤
│                                             │
│  Broker 1 (ID: 1)                          │
│  ├── Topic A: [Leader P0, P1]              │
│  ├── Topic B: [Follower P0]                │
│  └── JMX Port: 9999                        │
│                                             │
│  Broker 2 (ID: 2)                          │
│  ├── Topic A: [Follower P2, Leader P1]    │
│  ├── Topic B: [Leader P1]                  │
│  └── JMX Port: 9999                        │
│                                             │
│  Broker 3 (ID: 3)                          │
│  ├── Topic A: [Follower P0, Follower P2]  │
│  ├── Topic B: [Follower P1]                │
│  └── JMX Port: 9999                        │
│                                             │
└─────────────────────────────────────────────┘

リーダー選出

Kafkaはファイル内の最後のメッセージから制御するメカニズムを使用:

初期状態:
Broker 1 (Partition 0 Leader)
Broker 2 (Partition 0 Replica)
Broker 3 (Partition 0 Replica)

Broker 1がダウン:
→ ZooKeeperが検知
→ ISR (In-Sync Replicas) から新しいリーダーを選出
→ Broker 2が新しいリーダーに昇格

Broker 1が復帰:
→ Broker 2 に追いつく
→ 元のリーダーには戻らない (デフォルト)

ディスク管理

ログセグメントの管理:

# ファイルサイズによるロテーション
log.segment.bytes=1073741824  # 1GB

# 時間によるロテーション
log.roll.ms=86400000  # 24時間

# アクティブなセグメント以外の保持
log.retention.hours=168  # 7日間
log.retention.bytes=1073741824  # 1GB

# クリーンアップポリシー
log.cleanup.policy=delete  # or 'compact'

ログクリーンアップポリシー:

  1. Delete Policy (削除ポリシー)

    時間または容量に基づいてログを削除
    設定:log.cleanup.policy=delete
    用途:イベントログ、一時データ
    
  2. Compact Policy (圧縮ポリシー)

    最新の状態のみを保持
    設定:log.cleanup.policy=compact
    用途:ユーザーデータ、設定情報
    
    例:
    キー1→値A → 削除
    キー1→値B → 削除
    キー1→値C → 保持(最新)
    

レプリケーション

レプリケーションの仕組み

パーティションのレプリケーション要因 (RF) = 3:

Broker 1 (Leader)
├── Write from Producer
├── Replicate → Broker 2
└── Replicate → Broker 3

Broker 2 (In-Sync Replica)
└── Ack → Broker 1

Broker 3 (In-Sync Replica)
└── Ack → Broker 1

Producer
└── Ack from Leader

ISR (In-Sync Replicas)

ISRはリーダーと同期しているレプリカのセットです:

# ISR設定
min.insync.replicas=2

# replica.lag.time.max.msを超えてフェッチしないレプリカは
# ISRから除外される
replica.lag.time.max.ms=10000

ISRから除外されるケース:

  • ネットワーク遅延
  • ブローカーのハング
  • ディスク I/O の問題

高信頼性設定

# 最も安全な設定
acks=all
min.insync.replicas=2
unclean.leader.election.enable=false
log.flush.interval.messages=10000
log.flush.interval.ms=1000

この設定では:

  • すべてのISRレプリカが確認するまでメッセージ送信は完了しない
  • ISRが min.insync.replicas より少ないと書き込み不可
  • 不潔なリーダー選出を防止(データ喪失を防ぐ)

設定とチューニング

パフォーマンスチューニング

1. ネットワークスループットの最適化:

# プロデューサー側
batch.size=32768  # バッチサイズを増やす
linger.ms=20      # メッセージ溜め込み時間
compression.type=snappy  # 圧縮を有効化

# ブローカー側
num.network.threads=16  # ネットワークスレッド数
num.io.threads=16       # I/O スレッド数
socket.send.buffer.bytes=262144
socket.receive.buffer.bytes=262144

2. ディスク I/O の最適化:

# ブローカー側
log.flush.interval.ms=0  # 非同期フラッシュ
log.segment.bytes=1073741824
num.recovery.threads.per.data.dir=4

# コンシューマー側
fetch.min.bytes=10240
fetch.max.wait.ms=500

3. メモリ管理:

# ブローカー JVM 設定
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
export KAFKA_JVM_PERFORMANCE_OPTS="
  -XX:+UseG1GC
  -XX:MaxGCPauseMillis=20
  -XX:InitiatingHeapOccupancyPercent=35
  -XX:+DisableExplicitGC
"

CPU 最適化

# マルチコア CPU の活用
num.network.threads=8
num.io.threads=8
background.threads=4

# キャッシュ効率の向上
log.index.interval.bytes=4096
socket.request.max.bytes=104857600

ストレージの考慮事項

計算例:
- スループット:100,000 msg/sec
- メッセージサイズ:1KB
- 保持期間:7日間

必要なストレージ容量 = 
  100,000 * 1KB * 60 * 60 * 24 * 7 * 3 (RF=3)
  = 1.8 TB

監視対象メトリクス

プロデューサーメトリクス:

kafka.producer:type=producer-metrics,client-id=*
- record-send-total
- record-send-rate
- record-error-total
- record-error-rate
- record-retry-total
- batch-size-avg
- compression-rate-avg

ブローカーメトリクス:

kafka.server:type=BrokerTopicMetrics,name=*
- MessagesInPerSec
- MessagesOutPerSec
- BytesInPerSec
- BytesOutPerSec
- BytesRejectedPerSec

コンシューマーメトリクス:

kafka.consumer:type=consumer-metrics,client-id=*
- records-consumed-rate
- records-lag-max
- fetch-latency-avg
- fetch-latency-max

運用

クラスタのセットアップ

最小構成例(開発環境):

# 1. ZooKeeperの起動
bin/zookeeper-server-start.sh config/zookeeper.properties &

# 2. Kafkaブローカーの起動
bin/kafka-server-start.sh config/server.properties &

# 3. トピック作成
bin/kafka-topics.sh --create \
  --topic test-topic \
  --partitions 1 \
  --replication-factor 1 \
  --bootstrap-server localhost:9092

本番構成例(3ノード):

Node 1: Kafka Broker 1 + ZooKeeper 1
Node 2: Kafka Broker 2 + ZooKeeper 2
Node 3: Kafka Broker 3 + ZooKeeper 3

各ノードの server.properties:

# Node 1
broker.id=1
log.dirs=/data/kafka-logs-1
listeners=PLAINTEXT://kafka1.example.com:9092
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

# Node 2
broker.id=2
log.dirs=/data/kafka-logs-2
listeners=PLAINTEXT://kafka2.example.com:9092
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

# Node 3
broker.id=3
log.dirs=/data/kafka-logs-3
listeners=PLAINTEXT://kafka3.example.com:9092
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

トピック管理

# トピック一覧表示
kafka-topics.sh --list --bootstrap-server localhost:9092

# トピック詳細表示
kafka-topics.sh --describe --topic my-topic \
  --bootstrap-server localhost:9092

# パーティション数の増加(減少は不可)
kafka-topics.sh --alter \
  --topic my-topic \
  --partitions 6 \
  --bootstrap-server localhost:9092

# トピック設定の変更
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name my-topic \
  --add-config retention.ms=604800000 \
  --bootstrap-server localhost:9092

# トピック削除
kafka-topics.sh --delete --topic my-topic \
  --bootstrap-server localhost:9092

レプリカ管理

# 復旧スクリプト作成
cat > reassign.json << 'EOF'
{
  "version": 1,
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 0,
      "replicas": [1, 2, 3]
    },
    {
      "topic": "my-topic",
      "partition": 1,
      "replicas": [2, 3, 1]
    }
  ]
}
EOF

# レプリカ再配置の実行
kafka-reassign-partitions.sh \
  --zookeeper localhost:2181 \
  --reassignment-json-file reassign.json \
  --execute

# ステータス確認
kafka-reassign-partitions.sh \
  --zookeeper localhost:2181 \
  --reassignment-json-file reassign.json \
  --verify

バックアップとリカバリ

# トピックのバックアップ(MirrorMaker 使用)
bin/connect-mirror-maker.sh \
  config/connect-mirror-maker.properties

# 手動バックアップ
tar -czf kafka-backup-$(date +%Y%m%d).tar.gz \
  /var/kafka-logs

# リカバリ
tar -xzf kafka-backup-20260407.tar.gz -C /

トラブルシューティング

1. プロデューサーがハング:

# ブローカーの状態確認
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# コンシューマーグループのオフセット確認
kafka-consumer-groups.sh --describe \
  --group my-group \
  --bootstrap-server localhost:9092

# メッセージ確認
kafka-console-consumer.sh \
  --topic my-topic \
  --from-beginning \
  --bootstrap-server localhost:9092 \
  --max-messages 10

2. ディスク満杯:

# ログサイズ確認
du -sh /var/kafka-logs

# 古いセグメント削除
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name my-topic \
  --add-config retention.ms=86400000 \
  --bootstrap-server localhost:9092

ユースケース

1. リアルタイムアナリティクス

電子商取引プラットフォームでのユーザー行動追跡:

ユーザー行動
  ↓
クライアント側 Kafka Producer
  ├── ページビュー イベント
  ├── クリック イベント
  ├── 購入 イベント
  └── レビュー イベント
     ↓
  Kafka Topic: "user-events"
  (パーティション: 8, レプリケーション: 3)
     ↓
複数のコンシューマー
  ├── リアルタイム ダッシュボード
  ├── 異常検知 システム
  ├── 推奨エンジン
  └── データ ウェアハウス

設定:

# トピック設定
retention.ms=2592000000  # 30日間
partitions=8
replication.factor=3
compression.type=snappy
min.insync.replicas=2

# プロデューサー設定
acks=all
retries=3
max.in.flight.requests.per.connection=5
enable.idempotence=true

2. マイクロサービス間通信

サービス間の非同期通信:

Service A (注文サービス)
  ├── 新規注文 イベント
  └── 注文更新 イベント
       ↓
    Kafka Topic: "order-events"
       ↓
  Service B (在庫サービス)
  Service C (配送サービス)
  Service D (請求サービス)
  Service E (通知サービス)

トピック設計:

topics:
  - name: order-events
    partitions: 4
    replication-factor: 3
    config:
      retention.ms: 604800000  # 7日間
      compression.type: snappy

  - name: inventory-events
    partitions: 4
    replication-factor: 3
    config:
      retention.ms: 604800000

  - name: notification-events
    partitions: 2
    replication-factor: 3
    config:
      retention.ms: 86400000  # 1日間

3. ログアグリゲーション

複数のサーバーから集約されたログ:

Application Servers
  ├── Server 1 ← Filebeat → Kafka
  ├── Server 2 ← Filebeat → Kafka
  ├── Server 3 ← Filebeat → Kafka
  └── Server 4 ← Filebeat → Kafka
          ↓
    Kafka Topic: "logs"
          ↓
   Multiple Consumers
    ├── Elasticsearch
    ├── Data Lake
    ├── Real-time Alerts
    └── Archive (S3)

Filebeat 設定例:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/application/*.log

output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  topic: 'logs'
  partition.round_robin:
    reachable_only: true
  compression: gzip
  ssl.enabled: true
  ssl.verification_mode: full

4. CDC (Change Data Capture)

データベースの変更検知と配信:

Source Database
    ↓
CDC Tools (e.g., Debezium)
    ├── Insert → Kafka
    ├── Update → Kafka
    └── Delete → Kafka
         ↓
    Kafka Topics
    ├── customer-cdc
    ├── order-cdc
    └── product-cdc
         ↓
    Target Destinations
    ├── Target Database
    ├── Data Warehouse
    ├── Cache (Redis)
    └── Search Engine (Elasticsearch)

Debezium MySQL Connector 設定:

{
  "name": "mysql-connector-customers",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.com",
    "database.port": 3306,
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": 1,
    "database.server.name": "mysql-server-1",
    "database.include.list": "ecommerce",
    "table.include.list": "ecommerce.customers,ecommerce.orders",
    "database.history.kafka.bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092",
    "database.history.kafka.topic": "schema_changes_history",
    "include.schema.changes": true,
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "decimal.handling.mode": "string"
  }
}

5. ストリーミング分析

リアルタイムデータ処理(Kafka Streams 使用):

Input Streams
    ↓
Kafka Streams
├── Stateless Operations
│   ├── filter
│   ├── map
│   └── flatMap
├── Stateful Operations
│   ├── aggregate
│   ├── reduce
│   └── count
└── Join Operations
    ├── KStream-KStream
    ├── KStream-KTable
    └── KTable-KTable
    ↓
Output Streams
├── Real-time Alerts
├── Aggregated Results
├── Joined Data
└── Back to Kafka Topics

Kafka Streams アプリケーション例:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class StreamingAnalyticsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
                 "streaming-analytics-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
                 "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                 Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                 Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // ソーストピックから読み込み
        KStream<String, String> purchases = 
            builder.stream("purchase-events");

        // フィルタリング
        KStream<String, String> highValuePurchases = 
            purchases.filter((key, value) -> {
                try {
                    double amount = Double.parseDouble(value.split(",")[1]);
                    return amount > 1000;
                } catch (Exception e) {
                    return false;
                }
            });

        // 集計(30秒のウィンドウ)
        highValuePurchases
            .groupByKey()
            .windowedBy(TimeWindows.of(
                java.time.Duration.ofSeconds(30)))
            .count()
            .toStream()
            .to("high-value-alerts");

        // トピック結合
        KStream<String, String> customers = 
            builder.stream("customer-events");
        
        KStream<String, String> enriched = 
            purchases.join(customers,
                (purchase, customer) -> {
                    return purchase + " | " + customer;
                },
                JoinWindows.of(
                    java.time.Duration.ofSeconds(60)));
        
        enriched.to("enriched-events");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(
            new Thread(streams::close));
    }
}

まとめ

Apache Kafkaは、現代的な分散システムにおいてイベントストリーミングの中心的な役割を果たしています。

主な利点

  1. スケーラビリティ: 水平スケーリングにより数百万メッセージ/秒を処理可能
  2. 永続性: ディスクベースの保存により、メッセージの再処理が可能
  3. 信頼性: レプリケーションにより高可用性を実現
  4. 低レイテンシー: ミリ秒単位の処理遅延
  5. 柔軟性: 多様なユースケースに対応

設計上の考慮事項

  • パーティショニング戦略: スループット要件に基づいて適切なパーティション数を選択
  • レプリケーション係数: 信頼性要件に応じてバランスを取る
  • 保持期間: ビジネス要件と容量計画に基づいて設定
  • 監視: メトリクス収集と異常検知の実装

ベストプラクティス

  1. 運用面

    • 定期的なバックアップ
    • 適切なカパシティプランニング
    • 継続的な監視とアラート
  2. 設定面

    • 環境に応じた細粒度なチューニング
    • セキュリティ設定の適切な実装
    • ドキュメンテーションの維持
  3. 開発面

    • べき等なプロデューサーの実装
    • 手動オフセット管理の検討
    • エラーハンドリングの充実

Kafkaは成熟したプラットフォームであり、適切に設計・運用することで、エンタープライズグレードのイベントストリーミングシステムを構築できます。


参考資料:


最終更新: 2026年4月7日 ドキュメント作成: Claude AI 対象読者: システムアーキテクト、DevOpsエンジニア、バックエンドデベロッパー