Kafka
Apache Kafka:分散イベントストリーミングプラットフォーム完全ガイド
目次
概要
Apache Kafkaは、LinkedInによって開発され、現在はApache Softwareファウンデーションの下で開発される、スケーラブルで耐障害性の高い分散イベントストリーミングプラットフォームです。Kafkaは、大規模なリアルタイムデータストリームの処理、永続化、分析を可能にします。
Kafkaが解決する問題
従来のメッセージキューと異なり、Kafkaは以下の課題に対応しています:
- スケーラビリティ: 数百万のメッセージ/秒を処理
- 永続性: ディスク上にイベントを長期保存
- 再生可能性: 過去のイベントを再度処理可能
- 低遅延: リアルタイムデータ処理
主な特徴
- 高スループット: パーティション化による並列処理
- 低レイテンシー: ミリ秒単位の遅延
- 耐障害性: レプリケーションによる冗長性
- スケーラビリティ: ノード追加による線形スケーリング
- 持続性: 設定可能な保持期間
基本概念
イベントストリーミングとは
イベントストリーミングは、イベント駆動アーキテクチャの中核を成す技術です。システム内で発生する重要なビジネスイベント(ユーザーアクション、トランザクション、センサーデータなど)を、発生順に記録し、複数のシステムやアプリケーションに配信します。
メッセージ vs トピック vs パーティション
メッセージ: Kafkaの最小単位のデータ。キー、値、タイムスタンプで構成されます。
Message {
key: "user_123",
value: {
event: "purchase",
amount: 99.99,
timestamp: "2026-04-07T10:30:00Z"
},
partition: 2,
offset: 1524
}
トピック: メッセージを分類するカテゴリ。関連するメッセージをグループ化します。
例: user-events, payments, logs, sensor-data
パーティション: トピック内の並列ストレージユニット。スケーラビリティを実現します。
オフセットとシーケンス性
各パーティション内のメッセージには、オフセットという単調増加するIDが割り当てられます。
パーティション 0:
オフセット 0: Message A
オフセット 1: Message B
オフセット 2: Message C
パーティション 1:
オフセット 0: Message D
オフセット 1: Message E
オフセット 2: Message F
同じキーを持つメッセージは常に同じパーティションに配置され、順序が保証されます。
リーダーとフォロワー
Kafkaのレプリケーション設計:
- リーダー: パーティションのプライマリレプリカ。すべての読み書きを処理
- フォロワー: パーティションのセカンダリレプリカ。リーダーからデータをレプリケート
アーキテクチャ
全体構成
┌─────────────────────────────────────────────────────────────┐
│ クライアント層 │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Producers │ │ Consumers │ │
│ │ (データ送信) │ │ (データ受信) │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
└───────────┼──────────────────────────────┼──────────────────┘
│ │
│ ┌──────────────────────────┐
│ │ Kafka Cluster │
└─────────►│ ┌────────────────────┐ │
│ │ Broker 1 │ │
│ │ Topic A [P0][P1] │ │
│ └────────────────────┘ │
│ ┌────────────────────┐ │
│ │ Broker 2 │ │
│ │ Topic A [P2][P3] │ │
│ └────────────────────┘ │
│ ┌────────────────────┐ │
│ │ Broker 3 │ │
│ │ Topic B [P0][P1] │ │
│ └────────────────────┘ │
└──────────────────────────┘
│
┌──────────────────────────┐
│ ZooKeeper Ensemble │
│ (メタデータ管理) │
│ Node 1, Node 2, Node 3 │
└──────────────────────────┘
レイヤー構造
1. ストレージレイヤー
- パーティション: トピックのデータを保持
- セグメント: ファイルシステム上の物理的な保存単位
2. ネットワークレイヤー
- ブローカー間通信
- クライアント・ブローカー通信
- ZooKeeperとの通信
3. 処理レイヤー
- パーティション割り当て
- レプリケーション
- リバランシング
主要コンポーネント
ブローカー (Broker)
ブローカーはKafkaクラスタの中核となるサーバープロセスです。
ブローカーの責務:
- トピックのパーティションを保有
- プロデューサーからのメッセージを受け取る
- コンシューマーにメッセージを配信
- レプリケーションの管理
- パーティションのリーダー選出
ブローカー設定例 (server.properties):
# ブローカーID(クラスタ内で一意)
broker.id=1
# リスニングポート
listeners=PLAINTEXT://kafka-broker-1.example.com:9092
advertised.listeners=PLAINTEXT://kafka-broker-1.example.com:9092
# ログディレクトリ
log.dirs=/var/kafka-logs
# リテンション設定
log.retention.hours=168
log.retention.bytes=1073741824
# パフォーマンス設定
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# レプリケーション設定
min.insync.replicas=2
unclean.leader.election.enable=false
auto.leader.rebalance.enable=true
# ZooKeeper設定
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.session.timeout.ms=18000
ZooKeeper
ZooKeeperはKafkaのメタデータとクラスタ状態を管理します。
ZooKeeperが管理する情報:
- ブローカーのリスト
- トピック・パーティション構成
- パーティションリーダー情報
- コンシューマーグループのオフセット
- クォーラムリーダー情報
ZooKeeper設定例 (zoo.cfg):
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
# ZooKeeperクラス��メンバー
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888
# セッションタイムアウト
4lw.commands.whitelist=stat,ruok,conf,isro
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
トピックとパーティション
トピックの作成
# 基本的なトピック作成
kafka-topics.sh --create \
--topic my-topic \
--partitions 3 \
--replication-factor 2 \
--bootstrap-server localhost:9092
# より詳細な設定を含む作成
kafka-topics.sh --create \
--topic events-topic \
--partitions 6 \
--replication-factor 3 \
--bootstrap-server localhost:9092 \
--config retention.ms=604800000 \
--config compression.type=snappy \
--config min.insync.replicas=2
パーティション戦略
1. ラウンドロビン分配:
キーなしメッセージが送信されるとき:
Message 1 → Partition 0
Message 2 → Partition 1
Message 3 → Partition 2
Message 4 → Partition 0
...
2. キーベース分配:
同じキーは同じパーティションへ:
Message(key="user_123") → Partition 2
Message(key="user_123") → Partition 2
Message(key="order_456") → Partition 0
3. カスタム分配: 実装例:
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (key == null) {
return new Random().nextInt(numPartitions);
}
// キーのハッシュ値に基づいてパーティションを決定
String keyStr = key.toString();
if (keyStr.startsWith("priority_")) {
return 0; // 優先度の高いメッセージは Partition 0
}
return Math.abs(keyStr.hashCode()) % numPartitions;
}
}
パーティション数の決定
パーティション数を決定する際の考慮事項:
スループット要件:
必要なスループット = 100,000 msg/sec
単一パーティションのスループット = 10,000 msg/sec
推奨パーティション数 = 100,000 / 10,000 = 10 パーティション
コンシューマー並列性:
パーティション数 >= コンシューマー数
例:4つのコンシューマーで処理する場合、最小4パーティション
レイテンシー:
パーティション数が多い → 管理オーバーヘッドが増加
パーティション数が少ない → スループット制限
バランスが重要
プロデューサー
プロデューサー設定例
# 基本設定
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# 信頼性設定
acks=all
retries=3
max.in.flight.requests.per.connection=5
enable.idempotence=true
# パフォーマンス設定
batch.size=16384
linger.ms=10
compression.type=snappy
# タイムアウト設定
request.timeout.ms=30000
delivery.timeout.ms=120000
プロデューサー設定の詳細
acks設定:
acks=0: ファイア・アンド・フォーゲット
- 最もパフォーマンスが高い
- メッセージ喪失のリスク
acks=1: リーダーのみ確認
- 中程度の信頼性
- リーダーがダウンするとメッセージ喪失の可能性
acks=all (-1): ISR (In-Sync Replicas) すべてが確認
- 最も信頼性が高い
- レイテンシーが増加
- 推奨: 金融取引など重要なデータ
べき等性:
# べき等プロデューサーの設定
enable.idempotence=true
max.in.flight.requests.per.connection=5
# 結果:同じメッセージが複数回送信されてもファイナル状態は同じ
# Kafkaが内部で重複排除を行う
Javaプロデューサーの実装例
import org.apache.kafka.clients.producer.*;
import java.util.*;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.JsonSerializer");
props.put("acks", "all");
props.put("retries", 3);
props.put("compression.type", "snappy");
KafkaProducer<String, String> producer =
new KafkaProducer<>(props);
try {
// 同期送信
for (int i = 0; i < 100; i++) {
String key = "key-" + i;
String value = "{\"id\": " + i + ", \"data\": \"test-" + i + "\"}";
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", key, value);
RecordMetadata metadata = producer.send(record).get();
System.out.println("Topic: " + metadata.topic() +
" Partition: " + metadata.partition() +
" Offset: " + metadata.offset());
}
// 非同期送信 (コールバック付き)
for (int i = 100; i < 200; i++) {
String key = "async-key-" + i;
String value = "{\"id\": " + i + ", \"type\": \"async\"}";
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception) {
if (exception != null) {
System.err.println("送信エラー: " + exception);
} else {
System.out.println("成功 - オフセット: " +
metadata.offset());
}
}
});
}
} finally {
producer.close();
}
}
}
コンシューマー
コンシューマーグループ
複数のコンシューマーが同じトピックを処理する場合、コンシューマーグループを使用します。
トピック(3パーティション):
┌─────────┬─────────┬─────────┐
│ Partition 0 │ Partition 1 │ Partition 2 │
└─────────┴─────────┴─────────┘
↓ ↓ ↓
┌──────────┬──────────┬──────────┐
│Consumer 1│Consumer 2│Consumer 3│
│(group A) │(group A) │(group A) │
└──────────┴──────────┴──────────┘
グループ内の各コンシューマーは異なるパーティションを処理
コンシューマー設定例
# 基本設定
bootstrap.servers=localhost:9092
group.id=my-consumer-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# オフセット管理
auto.offset.reset=earliest
enable.auto.commit=true
auto.commit.interval.ms=1000
# パフォーマンス設定
fetch.min.bytes=1024
fetch.max.wait.ms=500
max.poll.records=500
session.timeout.ms=30000
# セキュリティ設定
security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=password
Javaコンシューマーの実装例
import org.apache.kafka.clients.consumer.*;
import java.util.*;
import java.time.Duration;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", false); // 手動コミット
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("パーティション: " +
record.partition() +
" オフセット: " + record.offset() +
" キー: " + record.key() +
" 値: " + record.value());
// メッセージ処理
processMessage(record);
}
// 手動コミット
consumer.commitSync();
}
} finally {
consumer.close();
}
}
private static void processMessage(ConsumerRecord<String, String> record) {
// ビジネスロジック
System.out.println("処理中: " + record.value());
}
}
オフセット管理
public class OffsetManagementExample {
public static void main(String[] args) {
// ...コンシューマー設定...
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// メッセージ処理
processMessage(record);
// 処理成功時のみコミット
consumer.commitSync();
} catch (Exception e) {
System.err.println("処理失敗: " + e.getMessage());
// オフセットをコミットしない = 再度処理される
continue;
}
}
// シーク機能:特定のオフセットまでリセット
TopicPartition tp =
new TopicPartition("my-topic", 0);
consumer.seek(tp, 100); // オフセット100から開始
}
}
}
ブローカー
ブローカークラスタの管理
クラスタ構成例:
3ノードクラスタ:
┌─────────────────────────────────────────────┐
│ Kafka Cluster │
├─────────────────────────────────────────────┤
│ │
│ Broker 1 (ID: 1) │
│ ├── Topic A: [Leader P0, P1] │
│ ├── Topic B: [Follower P0] │
│ └── JMX Port: 9999 │
│ │
│ Broker 2 (ID: 2) │
│ ├── Topic A: [Follower P2, Leader P1] │
│ ├── Topic B: [Leader P1] │
│ └── JMX Port: 9999 │
│ │
│ Broker 3 (ID: 3) │
│ ├── Topic A: [Follower P0, Follower P2] │
│ ├── Topic B: [Follower P1] │
│ └── JMX Port: 9999 │
│ │
└─────────────────────────────────────────────┘
リーダー選出
Kafkaはファイル内の最後のメッセージから制御するメカニズムを使用:
初期状態:
Broker 1 (Partition 0 Leader)
Broker 2 (Partition 0 Replica)
Broker 3 (Partition 0 Replica)
Broker 1がダウン:
→ ZooKeeperが検知
→ ISR (In-Sync Replicas) から新しいリーダーを選出
→ Broker 2が新しいリーダーに昇格
Broker 1が復帰:
→ Broker 2 に追いつく
→ 元のリーダーには戻らない (デフォルト)
ディスク管理
ログセグメントの管理:
# ファイルサイズによるロテーション
log.segment.bytes=1073741824 # 1GB
# 時間によるロテーション
log.roll.ms=86400000 # 24時間
# アクティブなセグメント以外の保持
log.retention.hours=168 # 7日間
log.retention.bytes=1073741824 # 1GB
# クリーンアップポリシー
log.cleanup.policy=delete # or 'compact'
ログクリーンアップポリシー:
-
Delete Policy (削除ポリシー)
時間または容量に基づいてログを削除 設定:log.cleanup.policy=delete 用途:イベントログ、一時データ -
Compact Policy (圧縮ポリシー)
最新の状態のみを保持 設定:log.cleanup.policy=compact 用途:ユーザーデータ、設定情報 例: キー1→値A → 削除 キー1→値B → 削除 キー1→値C → 保持(最新)
レプリケーション
レプリケーションの仕組み
パーティションのレプリケーション要因 (RF) = 3:
Broker 1 (Leader)
├── Write from Producer
├── Replicate → Broker 2
└── Replicate → Broker 3
Broker 2 (In-Sync Replica)
└── Ack → Broker 1
Broker 3 (In-Sync Replica)
└── Ack → Broker 1
Producer
└── Ack from Leader
ISR (In-Sync Replicas)
ISRはリーダーと同期しているレプリカのセットです:
# ISR設定
min.insync.replicas=2
# replica.lag.time.max.msを超えてフェッチしないレプリカは
# ISRから除外される
replica.lag.time.max.ms=10000
ISRから除外されるケース:
- ネットワーク遅延
- ブローカーのハング
- ディスク I/O の問題
高信頼性設定
# 最も安全な設定
acks=all
min.insync.replicas=2
unclean.leader.election.enable=false
log.flush.interval.messages=10000
log.flush.interval.ms=1000
この設定では:
- すべてのISRレプリカが確認するまでメッセージ送信は完了しない
- ISRが min.insync.replicas より少ないと書き込み不可
- 不潔なリーダー選出を防止(データ喪失を防ぐ)
設定とチューニング
パフォーマンスチューニング
1. ネットワークスループットの最適化:
# プロデューサー側
batch.size=32768 # バッチサイズを増やす
linger.ms=20 # メッセージ溜め込み時間
compression.type=snappy # 圧縮を有効化
# ブローカー側
num.network.threads=16 # ネットワークスレッド数
num.io.threads=16 # I/O スレッド数
socket.send.buffer.bytes=262144
socket.receive.buffer.bytes=262144
2. ディスク I/O の最適化:
# ブローカー側
log.flush.interval.ms=0 # 非同期フラッシュ
log.segment.bytes=1073741824
num.recovery.threads.per.data.dir=4
# コンシューマー側
fetch.min.bytes=10240
fetch.max.wait.ms=500
3. メモリ管理:
# ブローカー JVM 設定
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
export KAFKA_JVM_PERFORMANCE_OPTS="
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:+DisableExplicitGC
"
CPU 最適化
# マルチコア CPU の活用
num.network.threads=8
num.io.threads=8
background.threads=4
# キャッシュ効率の向上
log.index.interval.bytes=4096
socket.request.max.bytes=104857600
ストレージの考慮事項
計算例:
- スループット:100,000 msg/sec
- メッセージサイズ:1KB
- 保持期間:7日間
必要なストレージ容量 =
100,000 * 1KB * 60 * 60 * 24 * 7 * 3 (RF=3)
= 1.8 TB
監視対象メトリクス
プロデューサーメトリクス:
kafka.producer:type=producer-metrics,client-id=*
- record-send-total
- record-send-rate
- record-error-total
- record-error-rate
- record-retry-total
- batch-size-avg
- compression-rate-avg
ブローカーメトリクス:
kafka.server:type=BrokerTopicMetrics,name=*
- MessagesInPerSec
- MessagesOutPerSec
- BytesInPerSec
- BytesOutPerSec
- BytesRejectedPerSec
コンシューマーメトリクス:
kafka.consumer:type=consumer-metrics,client-id=*
- records-consumed-rate
- records-lag-max
- fetch-latency-avg
- fetch-latency-max
運用
クラスタのセットアップ
最小構成例(開発環境):
# 1. ZooKeeperの起動
bin/zookeeper-server-start.sh config/zookeeper.properties &
# 2. Kafkaブローカーの起動
bin/kafka-server-start.sh config/server.properties &
# 3. トピック作成
bin/kafka-topics.sh --create \
--topic test-topic \
--partitions 1 \
--replication-factor 1 \
--bootstrap-server localhost:9092
本番構成例(3ノード):
Node 1: Kafka Broker 1 + ZooKeeper 1
Node 2: Kafka Broker 2 + ZooKeeper 2
Node 3: Kafka Broker 3 + ZooKeeper 3
各ノードの server.properties:
# Node 1
broker.id=1
log.dirs=/data/kafka-logs-1
listeners=PLAINTEXT://kafka1.example.com:9092
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
# Node 2
broker.id=2
log.dirs=/data/kafka-logs-2
listeners=PLAINTEXT://kafka2.example.com:9092
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
# Node 3
broker.id=3
log.dirs=/data/kafka-logs-3
listeners=PLAINTEXT://kafka3.example.com:9092
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
トピック管理
# トピック一覧表示
kafka-topics.sh --list --bootstrap-server localhost:9092
# トピック詳細表示
kafka-topics.sh --describe --topic my-topic \
--bootstrap-server localhost:9092
# パーティション数の増加(減少は不可)
kafka-topics.sh --alter \
--topic my-topic \
--partitions 6 \
--bootstrap-server localhost:9092
# トピック設定の変更
kafka-configs.sh --alter \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=604800000 \
--bootstrap-server localhost:9092
# トピック削除
kafka-topics.sh --delete --topic my-topic \
--bootstrap-server localhost:9092
レプリカ管理
# 復旧スクリプト作成
cat > reassign.json << 'EOF'
{
"version": 1,
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"replicas": [1, 2, 3]
},
{
"topic": "my-topic",
"partition": 1,
"replicas": [2, 3, 1]
}
]
}
EOF
# レプリカ再配置の実行
kafka-reassign-partitions.sh \
--zookeeper localhost:2181 \
--reassignment-json-file reassign.json \
--execute
# ステータス確認
kafka-reassign-partitions.sh \
--zookeeper localhost:2181 \
--reassignment-json-file reassign.json \
--verify
バックアップとリカバリ
# トピックのバックアップ(MirrorMaker 使用)
bin/connect-mirror-maker.sh \
config/connect-mirror-maker.properties
# 手動バックアップ
tar -czf kafka-backup-$(date +%Y%m%d).tar.gz \
/var/kafka-logs
# リカバリ
tar -xzf kafka-backup-20260407.tar.gz -C /
トラブルシューティング
1. プロデューサーがハング:
# ブローカーの状態確認
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# コンシューマーグループのオフセット確認
kafka-consumer-groups.sh --describe \
--group my-group \
--bootstrap-server localhost:9092
# メッセージ確認
kafka-console-consumer.sh \
--topic my-topic \
--from-beginning \
--bootstrap-server localhost:9092 \
--max-messages 10
2. ディスク満杯:
# ログサイズ確認
du -sh /var/kafka-logs
# 古いセグメント削除
kafka-configs.sh --alter \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=86400000 \
--bootstrap-server localhost:9092
ユースケース
1. リアルタイムアナリティクス
電子商取引プラットフォームでのユーザー行動追跡:
ユーザー行動
↓
クライアント側 Kafka Producer
├── ページビュー イベント
├── クリック イベント
├── 購入 イベント
└── レビュー イベント
↓
Kafka Topic: "user-events"
(パーティション: 8, レプリケーション: 3)
↓
複数のコンシューマー
├── リアルタイム ダッシュボード
├── 異常検知 システム
├── 推奨エンジン
└── データ ウェアハウス
設定:
# トピック設定
retention.ms=2592000000 # 30日間
partitions=8
replication.factor=3
compression.type=snappy
min.insync.replicas=2
# プロデューサー設定
acks=all
retries=3
max.in.flight.requests.per.connection=5
enable.idempotence=true
2. マイクロサービス間通信
サービス間の非同期通信:
Service A (注文サービス)
├── 新規注文 イベント
└── 注文更新 イベント
↓
Kafka Topic: "order-events"
↓
Service B (在庫サービス)
Service C (配送サービス)
Service D (請求サービス)
Service E (通知サービス)
トピック設計:
topics:
- name: order-events
partitions: 4
replication-factor: 3
config:
retention.ms: 604800000 # 7日間
compression.type: snappy
- name: inventory-events
partitions: 4
replication-factor: 3
config:
retention.ms: 604800000
- name: notification-events
partitions: 2
replication-factor: 3
config:
retention.ms: 86400000 # 1日間
3. ログアグリゲーション
複数のサーバーから集約されたログ:
Application Servers
├── Server 1 ← Filebeat → Kafka
├── Server 2 ← Filebeat → Kafka
├── Server 3 ← Filebeat → Kafka
└── Server 4 ← Filebeat → Kafka
↓
Kafka Topic: "logs"
↓
Multiple Consumers
├── Elasticsearch
├── Data Lake
├── Real-time Alerts
└── Archive (S3)
Filebeat 設定例:
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/application/*.log
output.kafka:
hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
topic: 'logs'
partition.round_robin:
reachable_only: true
compression: gzip
ssl.enabled: true
ssl.verification_mode: full
4. CDC (Change Data Capture)
データベースの変更検知と配信:
Source Database
↓
CDC Tools (e.g., Debezium)
├── Insert → Kafka
├── Update → Kafka
└── Delete → Kafka
↓
Kafka Topics
├── customer-cdc
├── order-cdc
└── product-cdc
↓
Target Destinations
├── Target Database
├── Data Warehouse
├── Cache (Redis)
└── Search Engine (Elasticsearch)
Debezium MySQL Connector 設定:
{
"name": "mysql-connector-customers",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql.example.com",
"database.port": 3306,
"database.user": "debezium",
"database.password": "password",
"database.server.id": 1,
"database.server.name": "mysql-server-1",
"database.include.list": "ecommerce",
"table.include.list": "ecommerce.customers,ecommerce.orders",
"database.history.kafka.bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092",
"database.history.kafka.topic": "schema_changes_history",
"include.schema.changes": true,
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"decimal.handling.mode": "string"
}
}
5. ストリーミング分析
リアルタイムデータ処理(Kafka Streams 使用):
Input Streams
↓
Kafka Streams
├── Stateless Operations
│ ├── filter
│ ├── map
│ └── flatMap
├── Stateful Operations
│ ├── aggregate
│ ├── reduce
│ └── count
└── Join Operations
├── KStream-KStream
├── KStream-KTable
└── KTable-KTable
↓
Output Streams
├── Real-time Alerts
├── Aggregated Results
├── Joined Data
└── Back to Kafka Topics
Kafka Streams アプリケーション例:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
public class StreamingAnalyticsApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,
"streaming-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// ソーストピックから読み込み
KStream<String, String> purchases =
builder.stream("purchase-events");
// フィルタリング
KStream<String, String> highValuePurchases =
purchases.filter((key, value) -> {
try {
double amount = Double.parseDouble(value.split(",")[1]);
return amount > 1000;
} catch (Exception e) {
return false;
}
});
// 集計(30秒のウィンドウ)
highValuePurchases
.groupByKey()
.windowedBy(TimeWindows.of(
java.time.Duration.ofSeconds(30)))
.count()
.toStream()
.to("high-value-alerts");
// トピック結合
KStream<String, String> customers =
builder.stream("customer-events");
KStream<String, String> enriched =
purchases.join(customers,
(purchase, customer) -> {
return purchase + " | " + customer;
},
JoinWindows.of(
java.time.Duration.ofSeconds(60)));
enriched.to("enriched-events");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(
new Thread(streams::close));
}
}
まとめ
Apache Kafkaは、現代的な分散システムにおいてイベントストリーミングの中心的な役割を果たしています。
主な利点
- スケーラビリティ: 水平スケーリングにより数百万メッセージ/秒を処理可能
- 永続性: ディスクベースの保存により、メッセージの再処理が可能
- 信頼性: レプリケーションにより高可用性を実現
- 低レイテンシー: ミリ秒単位の処理遅延
- 柔軟性: 多様なユースケースに対応
設計上の考慮事項
- パーティショニング戦略: スループット要件に基づいて適切なパーティション数を選択
- レプリケーション係数: 信頼性要件に応じてバランスを取る
- 保持期間: ビジネス要件と容量計画に基づいて設定
- 監視: メトリクス収集と異常検知の実装
ベストプラクティス
-
運用面
- 定期的なバックアップ
- 適切なカパシティプランニング
- 継続的な監視とアラート
-
設定面
- 環境に応じた細粒度なチューニング
- セキュリティ設定の適切な実装
- ドキュメンテーションの維持
-
開発面
- べき等なプロデューサーの実装
- 手動オフセット管理の検討
- エラーハンドリングの充実
Kafkaは成熟したプラットフォームであり、適切に設計・運用することで、エンタープライズグレードのイベントストリーミングシステムを構築できます。
参考資料:
- Apache Kafka Official Documentation: https://kafka.apache.org/documentation/
- Kafka Design Document: https://kafka.apache.org/documentation/#design
- Confluent Kafka Best Practices: https://docs.confluent.io/
- "Kafka: The Definitive Guide" - O'Reilly Media
最終更新: 2026年4月7日 ドキュメント作成: Claude AI 対象読者: システムアーキテクト、DevOpsエンジニア、バックエンドデベロッパー