CloudEvents
CloudEvents 完全ガイド -- CNCF イベントデータ仕様の全容
第1章 はじめに
1.1 本書の目的と対象読者
本書は、Cloud Native Computing Foundation (CNCF) が策定したイベントデータ仕様である CloudEvents について、その設計思想、アーキテクチャ、具体的な実装方法を包括的に解説する技術文書である。マイクロサービスアーキテクチャやイベント駆動型システムの設計・開発・運用に携わるエンジニアを主な対象読者とし、CloudEvents の全体像から実装詳細に至るまでを体系的にまとめている。
1.2 イベント駆動型アーキテクチャの課題
現代のクラウドネイティブシステムでは、サービス間の通信手段としてイベント駆動型アーキテクチャ (Event-Driven Architecture: EDA) が広く採用されている。Apache Kafka、Amazon EventBridge、Google Cloud Pub/Sub、Azure Event Grid、RabbitMQ など多数のメッセージングプラットフォームが存在し、それぞれが独自のイベントフォーマットを定義している。
この状況は以下の課題を生み出していた。
- 相互運用性の欠如: 異なるプラットフォーム間でイベントを交換する際、フォーマット変換が必要となり、ブリッジコンポーネントの開発・保守コストが増大する。
- ツールチェーンの分断: イベントのルーティング、フィルタリング、監視などの共通機能を各プラットフォーム向けに個別開発する必要がある。
- 開発者体験の劣化: プラットフォームごとに異なる SDK やライブラリを習得する学習コストが高い。
- ベンダーロックイン: 特定のプラットフォームに依存したイベント設計は、将来の移行を困難にする。
- 可搬性の制約: FaaS (Function as a Service) 環境において、関数を異なるクラウドプロバイダーへ移行する際にイベント処理ロジックの書き直しが必要になる。
1.3 CloudEvents とは何か
CloudEvents は、これらの課題に対するソリューションとして 2018 年に CNCF Serverless Working Group の中で誕生したプロジェクトである。2019 年 10 月に v1.0 仕様がリリースされ、2024 年には CNCF の Graduated プロジェクト (最上位ステータス) に昇格した。
CloudEvents を一言で定義すると以下のようになる。
CloudEvents は、イベントデータを共通の方法で記述するための仕様 (specification) である。
CloudEvents は特定の実装やランタイム、プロトコルではない。あくまでイベントの「エンベロープ (封筒)」の形式を標準化する仕様であり、以下の特徴を持つ。
- ベンダー中立: 特定のクラウドプロバイダーやメッセージングプラットフォームに依存しない。
- プロトコル非依存: HTTP、AMQP、MQTT、Kafka、NATS など様々なトランスポートプロトコル上で利用可能。
- シリアライズ形式非依存: JSON、Avro、Protobuf など様々なエンコーディングに対応。
- 拡張可能: コア仕様を拡張して、ドメイン固有のメタデータを追加可能。
- 軽量: 必須属性はわずか 4 つ。最小構成のイベントは数行で記述できる。
1.4 CloudEvents が解決する問題
CloudEvents の導入により、以下の恩恵が得られる。
| 課題 | CloudEvents による解決 |
|---|---|
| 相互運用性の欠如 | 共通仕様により、異なるプラットフォーム間でイベントをそのまま交換可能 |
| ツールチェーンの分断 | 仕様準拠の共通ライブラリ・ミドルウェアを共有利用 |
| 開発者体験の劣化 | 統一された SDK と一貫した概念モデルにより学習コスト削減 |
| ベンダーロックイン | 標準仕様に準拠することでプラットフォーム移行が容易 |
| 可搬性の制約 | FaaS 関数のイベント処理を移植可能な形で記述 |
1.5 エコシステムと採用事例
CloudEvents は多くの主要クラウドプロバイダーとプラットフォームに採用されている。
- Microsoft Azure: Azure Event Grid のネイティブイベントスキーマとして CloudEvents v1.0 をサポート。
- Google Cloud: Eventarc が CloudEvents 形式でイベントを配信。Cloud Functions (第2世代) は CloudEvents をネイティブにサポート。
- Amazon Web Services: Amazon EventBridge が CloudEvents との変換をサポート。
- Knative: Knative Eventing は CloudEvents を基盤仕様として全面採用。
- Oracle Cloud: Oracle Cloud Infrastructure Events は CloudEvents 準拠。
- SAP: SAP Event Mesh が CloudEvents をサポート。
- Red Hat: OpenShift Serverless が Knative ベースで CloudEvents をネイティブサポート。
1.6 CNCF における位置づけ
CNCF のプロジェクト成熟度モデルにおいて、CloudEvents は以下の経緯を辿っている。
2018年5月 : CNCF Sandbox プロジェクトとして採択
2019年10月 : CloudEvents v1.0 仕様リリース
2021年11月 : CNCF Incubating プロジェクトに昇格
2024年 : CNCF Graduated プロジェクトに昇格
Graduated ステータスは、Kubernetes、Prometheus、Envoy などと同等の成熟度を示し、本番環境での広範な利用と、ガバナンス・セキュリティ面での成熟を意味する。
1.7 本書の構成
本書は以下の構成でCloudEventsの全容を解説する。
| 章 | 内容 |
|---|---|
| 第1章 | はじめに (本章) |
| 第2章 | コア仕様 -- 属性とイベント構造 |
| 第3章 | プロトコルバインディング |
| 第4章 | イベントフォーマット |
| 第5章 | コンテンツモード |
| 第6章 | 拡張属性 (Extensions) |
| 第7章 | サブスクリプション API |
| 第8章 | ディスカバリー API |
| 第9章 | SDK と実装 |
| 第10章 | アーキテクチャパターン |
| 第11章 | セキュリティ |
| 第12章 | 実装例とチュートリアル |
| 第13章 | 運用と可観測性 |
| 第14章 | 既存システムとの統合 |
| 第15章 | ベストプラクティスとアンチパターン |
| 第16章 | まとめと今後の展望 |
第2章 コア仕様 -- 属性とイベント構造
2.1 CloudEvents のデータモデル
CloudEvents の核心は、イベントの「メタデータ」と「データ」を明確に分離するデータモデルにある。すべての CloudEvent は以下の構成要素から成る。
+----------------------------------------------+
| CloudEvent |
| |
| +------------------+ +------------------+ |
| | Context | | Event Data | |
| | Attributes | | (payload) | |
| | | | | |
| | - Required | | 任意の形式の | |
| | - Optional | | ビジネスデータ | |
| | - Extension | | | |
| +------------------+ +------------------+ |
+----------------------------------------------+
コンテキスト属性 (Context Attributes): イベントの発生元、種類、一意識別子などのメタデータ。ルーティングやフィルタリングに使用される。
イベントデータ (Event Data / payload): イベントの実質的な内容を表すドメイン固有のデータ。CloudEvents 仕様はこの部分の形式を規定しない。
この分離により、ミドルウェアやルーターはイベントデータの中身を解釈することなく、コンテキスト属性だけを参照してイベントのルーティングやフィルタリングを実行できる。
2.2 必須属性 (REQUIRED Attributes)
CloudEvents v1.0 で定義される必須属性は以下の 4 つである。これらの属性がすべて存在しないイベントは、有効な CloudEvent とは見なされない。
2.2.1 id
| 項目 | 内容 |
|---|---|
| 型 | String |
| 制約 | 空文字列不可 |
| 意味 | イベントの一意識別子 |
id は source と組み合わせてイベントの一意性を担保する。同じ source から発行される 2 つのイベントが同じ id を持つ場合、それらは重複 (duplicate) と見なされる。
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
UUID v4 が一般的に使用されるが、仕様はフォーマットを規定しない。ULID、Snowflake ID、その他のスキームも使用可能である。
2.2.2 source
| 項目 | 内容 |
|---|---|
| 型 | URI-Reference (RFC 3986) |
| 制約 | 空文字列不可 |
| 意味 | イベントの発生元を識別する |
イベントがどのシステム、サービス、リソースから発生したかを示す。絶対 URI が推奨されるが、相対 URI 参照も許容される。
"source": "https://example.com/services/order-service"
"source": "/myapp/order/12345"
"source": "urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66"
推奨される source の設計パターン:
# クラウドリソースを示す場合
"source": "//pubsub.googleapis.com/projects/my-project/topics/my-topic"
# サービスとリソースの組み合わせ
"source": "https://api.example.com/orders/12345"
# Kubernetes リソース
"source": "/apis/apps/v1/namespaces/default/deployments/my-app"
2.2.3 specversion
| 項目 | 内容 |
|---|---|
| 型 | String |
| 制約 | 空文字列不可 |
| 意味 | 使用する CloudEvents 仕様のバージョン |
現時点で有効な値は "1.0" のみである。
"specversion": "1.0"
2.2.4 type
| 項目 | 内容 |
|---|---|
| 型 | String |
| 制約 | 空文字列不可 |
| 意味 | イベントの種類を示す |
type はイベントのカテゴリを表し、ルーティングやサブスクリプションフィルタリングで最も頻繁に使用される属性である。
"type": "com.example.order.created"
"type": "com.github.pull_request.opened"
"type": "io.cloudevents.test"
推奨される命名規則は逆ドメイン名表記 (reverse domain name notation) である。
com.{company}.{service}.{action}
com.example.warehouse.item.shipped
2.3 オプション属性 (OPTIONAL Attributes)
2.3.1 datacontenttype
data 属性の MIME タイプを示す。型は String (RFC 2046 Content-Type)。
"datacontenttype": "application/json"
"datacontenttype": "application/xml"
"datacontenttype": "text/plain"
2.3.2 dataschema
data 属性のスキーマを示す URI。
"dataschema": "https://schemas.example.com/order/v1.0/created.json"
2.3.3 subject
source のコンテキストにおけるイベントの対象。
"source": "https://example.com/storage/buckets",
"subject": "images/photo.jpg"
2.3.4 time
イベントの発生時刻。型は Timestamp (RFC 3339)。
"time": "2025-01-15T09:30:00.000Z"
2.4 data 属性
data (または data_base64) はイベントの実際のペイロードを格納する。
{
"specversion": "1.0",
"id": "evt-001",
"source": "https://example.com/orders",
"type": "com.example.order.created",
"datacontenttype": "application/json",
"time": "2025-01-15T09:30:00Z",
"data": {
"orderId": "ORD-12345",
"customerId": "CUST-678",
"items": [
{"productId": "PROD-001", "quantity": 2, "price": 29.99}
],
"totalAmount": 59.98,
"currency": "JPY"
}
}
バイナリデータを含める場合は data_base64 を使用する。data と data_base64 は相互排他である。
2.5 属性の型システム
| 型 | 説明 | 例 |
|---|---|---|
| Boolean | 真偽値 | true, false |
| Integer | 整数値 | 42, -1 |
| String | Unicode 文字列 | "hello" |
| Binary | バイト列 | Base64 エンコード |
| URI | RFC 3986 準拠の絶対 URI | "https://example.com" |
| URI-Reference | RFC 3986 準拠の URI 参照 | "/path/to/resource" |
| Timestamp | RFC 3339 準拠のタイムスタンプ | "2025-01-15T09:30:00Z" |
2.6 属性命名規則
- 小文字の英数字 (
a-z,0-9) のみで構成 - 1文字以上20文字以下
- 英字で始まること
2.7 完全な CloudEvent 例
{
"specversion": "1.0",
"id": "b3c4d5e6-f7a8-9012-bcde-f34567890123",
"source": "https://example.com/services/payment-service",
"type": "com.example.payment.completed",
"datacontenttype": "application/json",
"dataschema": "https://schemas.example.com/payment/v2.0/completed.json",
"subject": "payment/PAY-98765",
"time": "2025-01-15T09:30:00.123456789Z",
"data": {
"paymentId": "PAY-98765",
"orderId": "ORD-12345",
"amount": 15000,
"currency": "JPY",
"method": "credit_card",
"status": "completed",
"processedAt": "2025-01-15T09:30:00.100Z"
}
}
第3章 プロトコルバインディング
3.1 プロトコルバインディングとは
プロトコルバインディング (Protocol Binding) は、CloudEvents をトランスポートプロトコル上でどのように送受信するかを定義する仕様である。
| プロトコル | 状態 | 主な用途 |
|---|---|---|
| HTTP | Stable (v1.0) | REST API、Webhook |
| AMQP | Working Draft | メッセージキュー (RabbitMQ 等) |
| MQTT | Working Draft | IoT デバイス通信 |
| Kafka | Stable (v1.0) | ストリーミングプラットフォーム |
| NATS | Working Draft | クラウドネイティブメッセージング |
| WebSocket | Working Draft | リアルタイム双方向通信 |
3.2 HTTP プロトコルバインディング
3.2.1 バイナリコンテンツモード (Binary Content Mode)
コンテキスト属性を HTTP ヘッダーに、イベントデータを HTTP ボディに格納する。
POST /events HTTP/1.1
Host: example.com
Content-Type: application/json
ce-specversion: 1.0
ce-id: evt-001
ce-source: https://example.com/orders
ce-type: com.example.order.created
ce-time: 2025-01-15T09:30:00Z
ce-subject: order/ORD-12345
{
"orderId": "ORD-12345",
"customerId": "CUST-678",
"totalAmount": 59.98,
"currency": "JPY"
}
HTTP ヘッダー名の規則:
- 必須/オプション属性:
ce-プレフィックス + 属性名 datacontenttypeは HTTP のContent-Typeヘッダーにマッピングdataは HTTP ボディにそのまま格納
3.2.2 構造化コンテンツモード (Structured Content Mode)
POST /events HTTP/1.1
Host: example.com
Content-Type: application/cloudevents+json
{
"specversion": "1.0",
"id": "evt-001",
"source": "https://example.com/orders",
"type": "com.example.order.created",
"time": "2025-01-15T09:30:00Z",
"subject": "order/ORD-12345",
"datacontenttype": "application/json",
"data": {
"orderId": "ORD-12345",
"customerId": "CUST-678",
"totalAmount": 59.98,
"currency": "JPY"
}
}
3.2.3 バッチモード (Batched Content Mode)
POST /events HTTP/1.1
Host: example.com
Content-Type: application/cloudevents-batch+json
[
{
"specversion": "1.0",
"id": "batch-001",
"source": "https://example.com/orders",
"type": "com.example.order.created",
"data": {"orderId": "ORD-001"}
},
{
"specversion": "1.0",
"id": "batch-002",
"source": "https://example.com/orders",
"type": "com.example.order.created",
"data": {"orderId": "ORD-002"}
}
]
3.2.4 HTTP レスポンスのハンドリング
| ステータスコード | 意味 |
|---|---|
| 200 OK / 201 Created / 202 Accepted / 204 No Content | イベント受理成功 |
| 400 Bad Request | 不正な CloudEvent (属性不足等) |
| 415 Unsupported Media Type | 未対応の Content-Type |
| 429 Too Many Requests | レート制限超過 |
3.3 Kafka プロトコルバインディング
3.3.1 バイナリコンテンツモード
Kafka Record:
Key: "ORD-12345"
Headers:
ce_specversion: "1.0"
ce_id: "evt-001"
ce_source: "https://example.com/orders"
ce_type: "com.example.order.created"
ce_time: "2025-01-15T09:30:00Z"
content-type: "application/json"
Value: {"orderId":"ORD-12345","totalAmount":59.98}
3.3.2 構造化コンテンツモード
Kafka Record:
Key: "ORD-12345"
Headers:
content-type: "application/cloudevents+json"
Value: {
"specversion": "1.0",
"id": "evt-001",
"source": "https://example.com/orders",
"type": "com.example.order.created",
"datacontenttype": "application/json",
"data": {"orderId":"ORD-12345","totalAmount":59.98}
}
3.4 AMQP プロトコルバインディング
AMQP Message:
Application Properties:
cloudEvents:specversion: "1.0"
cloudEvents:id: "evt-001"
cloudEvents:source: "https://example.com/orders"
cloudEvents:type: "com.example.order.created"
Content-Type: "application/json"
Body: {"orderId":"ORD-12345"}
3.5 MQTT プロトコルバインディング
MQTT v5.0 バイナリモード
MQTT PUBLISH:
Topic: /cloudevents/orders
User Properties:
specversion: "1.0"
id: "evt-001"
source: "urn:device:sensor-001"
type: "io.example.temperature.reading"
Content Type: "application/json"
Payload: {"temperature":23.5,"unit":"celsius"}
3.6 NATS プロトコルバインディング
NATS Message:
Subject: "cloudevents.orders.created"
Headers:
ce-specversion: 1.0
ce-id: evt-001
ce-source: https://example.com/orders
ce-type: com.example.order.created
Content-Type: application/json
Payload: {"orderId":"ORD-12345"}
3.7 プロトコルバインディング選択の指針
| 要件 | 推奨バインディング | 理由 |
|---|---|---|
| Webhook / REST API | HTTP | 広範なサポート、シンプルな統合 |
| 高スループットストリーミング | Kafka | パーティション並列性、永続化 |
| メッセージキューイング | AMQP | 柔軟なルーティング、確実な配信 |
| IoT デバイス | MQTT | 軽量プロトコル、低帯域幅 |
| 低レイテンシメッセージング | NATS | 高速、軽量 |
| リアルタイム Web | WebSocket | 双方向通信、プッシュ配信 |
第4章 イベントフォーマット
4.1 イベントフォーマットの役割
イベントフォーマット (Event Format) は、CloudEvents をシリアライズ/デシリアライズする方法を定義する。プロトコルバインディングが「どのプロトコルで送るか」を定義するのに対し、イベントフォーマットは「どの形式でエンコードするか」を定義する。
4.2 JSON イベントフォーマット
| CloudEvents 型 | JSON 型 |
|---|---|
| Boolean | boolean |
| Integer | number (整数) |
| String | string |
| Binary | string (Base64 エンコード) |
| URI | string |
| URI-Reference | string |
| Timestamp | string (RFC 3339) |
4.3 Avro イベントフォーマット
Apache Avro はコンパクトなバイナリフォーマットであり、高スループット環境に適している。スキーマレジストリとの親和性が高い。
4.4 Protobuf イベントフォーマット
syntax = "proto3";
package io.cloudevents.v1;
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
message CloudEvent {
string id = 1;
string source = 2;
string spec_version = 3;
string type = 4;
map<string, CloudEventAttributeValue> attributes = 5;
oneof data {
bytes binary_data = 6;
string text_data = 7;
google.protobuf.Any proto_data = 8;
}
}
message CloudEventAttributeValue {
oneof attr {
bool ce_boolean = 1;
int32 ce_integer = 2;
string ce_string = 3;
bytes ce_bytes = 4;
string ce_uri = 5;
string ce_uri_ref = 6;
google.protobuf.Timestamp ce_timestamp = 7;
}
}
message CloudEventBatch {
repeated CloudEvent events = 1;
}
4.5 フォーマット選択の指針
| フォーマット | MIME タイプ | 適用場面 |
|---|---|---|
| JSON | application/cloudevents+json | 汎用、デバッグ容易、Webhook |
| Avro | application/cloudevents+avro | 高スループット、Kafka 連携 |
| Protobuf | application/cloudevents+protobuf | gRPC 連携、型安全性重視 |
| XML | application/cloudevents+xml | レガシー統合、エンタープライズ |
第5章 コンテンツモード
5.1 コンテンツモードの概要
コンテンツモード (Content Mode) は、CloudEvents のメタデータとイベントデータをトランスポートメッセージのどこに配置するかを決定する。
5.2 バイナリコンテンツモード
+---------------------------------------------------+
| Transport Message |
| Metadata Section (Headers) |
| +-----------------------------------------------+|
| | ce-specversion: 1.0 ||
| | ce-id: evt-001 ||
| | ce-source: https://example.com/orders ||
| | ce-type: com.example.order.created ||
| | Content-Type: application/json ||
| +-----------------------------------------------+|
| Body Section |
| +-----------------------------------------------+|
| | {"orderId":"ORD-12345","totalAmount":59.98} ||
| +-----------------------------------------------+|
+---------------------------------------------------+
5.3 構造化コンテンツモード
+---------------------------------------------------+
| Transport Message |
| Metadata Section (Headers) |
| +-----------------------------------------------+|
| | Content-Type: application/cloudevents+json ||
| +-----------------------------------------------+|
| Body Section |
| +-----------------------------------------------+|
| | { ||
| | "specversion": "1.0", ||
| | "id": "evt-001", ||
| | "source": "...", ||
| | "type": "...", ||
| | "data": { ... } ||
| | } ||
| +-----------------------------------------------+|
+---------------------------------------------------+
5.4 モード選択のフローチャート
イベント送信が必要
|
v
バイナリデータを含むか? -- Yes -> バイナリモード
|
No
v
中間プロキシが CloudEvents を認識しないか? -- Yes -> バイナリモード
|
No
v
イベントのログ/監査/保存が重要か? -- Yes -> 構造化モード
|
No
v
大量のイベントを一括送信するか? -- Yes -> バッチモード
|
No -> バイナリモードまたは構造化モード
第6章 拡張属性 (Extensions)
6.1 拡張メカニズムの概要
拡張属性 (Extension Attributes) は、コア仕様に含まれないドメイン固有のメタデータをイベントに付加するためのメカニズムである。
6.2 公式に文書化された拡張属性
6.2.1 Distributed Tracing Extension
{
"specversion": "1.0",
"id": "trace-001",
"source": "https://example.com/orders",
"type": "com.example.order.created",
"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
"tracestate": "congo=t61rcWkgMzE",
"data": {"orderId": "ORD-12345"}
}
6.2.2 Sequence Extension
{
"specversion": "1.0",
"id": "seq-005",
"source": "https://example.com/event-log",
"type": "com.example.log.entry",
"sequence": "5",
"sequencetype": "Integer",
"data": {"message": "User logged in", "userId": "USER-123"}
}
6.2.3 Partitioning Extension
{
"specversion": "1.0",
"id": "part-001",
"source": "https://example.com/orders",
"type": "com.example.order.created",
"partitionkey": "customer-678",
"data": {"orderId": "ORD-12345", "customerId": "CUST-678"}
}
6.2.4 Dataref Extension
{
"specversion": "1.0",
"id": "ref-001",
"source": "https://example.com/image-processor",
"type": "com.example.image.uploaded",
"dataref": "https://storage.example.com/images/photo-12345.jpg",
"data": {"imageId": "IMG-12345", "width": 1920, "height": 1080}
}
6.2.5 Sampling Rate Extension
{
"specversion": "1.0",
"id": "sample-001",
"source": "https://example.com/metrics",
"type": "com.example.metric.collected",
"sampledrate": 100,
"data": {"metric": "response_time", "value": 45.2, "unit": "ms"}
}
6.3 カスタム拡張属性の設計原則
- 属性名の衝突回避: 公式拡張属性や将来のコア属性と衝突しない名前を選択する。
- 最小限のメタデータ: ルーティングやフィルタリングに必要なメタデータのみを拡張属性とし、ビジネスデータは
dataに格納する。 - 型の適切な選択: CloudEvents の型システムがサポートする型を使用する。
- ドキュメント化: 拡張属性の意味、型、制約を明文化する。
6.4 拡張属性の使用パターン
パターン1: マルチテナント環境
{
"specversion": "1.0",
"id": "mt-001",
"source": "https://saas.example.com/billing",
"type": "com.example.invoice.generated",
"tenantid": "tenant-12345",
"tenantregion": "jp",
"data": {"invoiceId": "INV-001", "amount": 50000}
}
パターン2: Saga/ワークフロー管理
{
"specversion": "1.0",
"id": "saga-step-003",
"source": "https://example.com/inventory-service",
"type": "com.example.inventory.reserved",
"sagaid": "saga-order-12345",
"sagastep": "3",
"correlationid": "corr-order-12345",
"data": {
"orderId": "ORD-12345",
"items": [{"productId": "PROD-001", "quantity": 2, "reserved": true}]
}
}
第7章 サブスクリプション API
7.1 概要
CloudEvents Subscription API は、イベントのサブスクリプション (購読) を管理するための RESTful API 仕様である。
7.2 サブスクリプションオブジェクト
| 属性 | 型 | 必須 | 説明 |
|---|---|---|---|
| id | String | Yes | サブスクリプションの一意識別子 |
| source | URI | No | フィルタ対象の source 値 |
| types | String[] | No | フィルタ対象の type 値のリスト |
| sink | URI | Yes | イベント配信先の URI |
| protocol | String | Yes | 配信プロトコル |
| filters | Filter[] | No | イベントフィルタの配列 |
7.3 CRUD 操作
POST /subscriptions HTTP/1.1
Content-Type: application/json
{
"protocol": "HTTP",
"sink": "https://consumer.example.com/events",
"types": ["com.example.order.created", "com.example.order.updated"],
"source": "https://example.com/order-service",
"config": {
"maxretries": 3,
"deadletterqueue": "https://consumer.example.com/dlq"
}
}
7.4 フィルタリングダイアレクト
exact フィルタ
{"filters": [{"exact": {"type": "com.example.order.created"}}]}
prefix フィルタ
{"filters": [{"prefix": {"type": "com.example.order."}}]}
複合フィルタ (NOT / AND / OR)
{
"filters": [{
"all": [
{"prefix": {"type": "com.example.order."}},
{"not": {"exact": {"type": "com.example.order.deleted"}}}
]
}]
}
CESQL フィルタ
-- CESQL 構文例
type LIKE 'com.example.order.%' AND source = 'https://example.com/orders'
type IN ('com.example.order.created', 'com.example.order.updated')
priority = 'high' AND tenantid = 'tenant-12345'
NOT (type = 'com.example.test' OR type = 'com.example.debug')
第8章 ディスカバリー API
8.1 概要
CloudEvents Discovery API は、イベントプロデューサーが「どのようなイベントを発行しているか」をプログラマティックに公開するための仕様である。
8.2 サービスオブジェクト
{
"id": "svc-order-service",
"name": "Order Service",
"url": "https://api.example.com/order-service",
"description": "Manages order lifecycle events",
"specversions": ["1.0"],
"subscriptionurl": "https://api.example.com/order-service/subscriptions",
"protocols": ["HTTP"],
"events": [
{
"type": "com.example.order.created",
"description": "Emitted when a new order is placed",
"datacontenttype": "application/json",
"dataschema": "https://schemas.example.com/order/v1/created.json"
},
{
"type": "com.example.order.updated",
"description": "Emitted when an order is modified"
},
{
"type": "com.example.order.cancelled",
"description": "Emitted when an order is cancelled"
}
]
}
8.3 イベントカタログの構築
+-------------------+ +-----------------+ +-------------------+
| Order Service | | Event Catalog | | Developer Portal |
| | --> | Service | --> | |
| GET /services | | (Aggregator) | | - 検索可能な |
+-------------------+ | | | イベント一覧 |
| | | - スキーマ閲覧 |
+-------------------+ | | | - サンプルコード |
| Payment Service | --> | | | 自動生成 |
| GET /services | +-----------------+ +-------------------+
+-------------------+
第9章 SDK と実装
9.1 公式 SDK 一覧
| 言語/ランタイム | リポジトリ | 成熟度 |
|---|---|---|
| Go | cloudevents/sdk-go | Stable |
| Java | cloudevents/sdk-java | Stable |
| JavaScript/TypeScript | cloudevents/sdk-javascript | Stable |
| Python | cloudevents/sdk-python | Stable |
| C# (.NET) | cloudevents/sdk-csharp | Stable |
| Rust | cloudevents/sdk-rust | Active Development |
| Ruby | cloudevents/sdk-ruby | Active Development |
| PHP | cloudevents/sdk-php | Active Development |
9.2 Go SDK の使用例
package main
import (
"context"
"log"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
type OrderCreated struct {
OrderID string `json:"orderId"`
CustomerID string `json:"customerId"`
TotalAmount float64 `json:"totalAmount"`
Currency string `json:"currency"`
}
func main() {
c, err := cloudevents.NewClientHTTP()
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
event := cloudevents.NewEvent()
event.SetID("go-evt-001")
event.SetSource("https://example.com/go-service/orders")
event.SetType("com.example.order.created")
event.SetTime(time.Now())
data := OrderCreated{
OrderID: "ORD-12345", CustomerID: "CUST-678",
TotalAmount: 15000.0, Currency: "JPY",
}
event.SetData(cloudevents.ApplicationJSON, data)
ctx := cloudevents.ContextWithTarget(context.Background(),
"http://localhost:8080/events")
result := c.Send(ctx, event)
if cloudevents.IsUndelivered(result) {
log.Fatalf("Failed to send: %v", result)
}
}
9.3 Python SDK の使用例
from uuid import uuid4
from cloudevents.http import CloudEvent, to_structured
import requests
import datetime
attributes = {
"id": str(uuid4()),
"source": "https://example.com/python-service/orders",
"type": "com.example.order.created",
"time": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"datacontenttype": "application/json",
}
data = {
"orderId": "ORD-12345",
"customerId": "CUST-678",
"totalAmount": 15000,
"currency": "JPY",
}
event = CloudEvent(attributes, data)
headers, body = to_structured(event)
response = requests.post("http://localhost:8080/events", headers=headers, data=body)
9.4 JavaScript/TypeScript SDK の使用例
import { CloudEvent, httpTransport, emitterFor } from "cloudevents";
const event = new CloudEvent({
id: "ts-evt-001",
source: "https://example.com/ts-service/orders",
type: "com.example.order.created",
datacontenttype: "application/json",
data: {
orderId: "ORD-12345",
customerId: "CUST-678",
totalAmount: 15000,
currency: "JPY",
},
});
const emit = emitterFor(httpTransport("http://localhost:8080/events"));
await emit(event);
9.5 Java SDK の使用例 (Spring Framework)
import io.cloudevents.CloudEvent;
import io.cloudevents.spring.http.CloudEventHttpUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/events")
public class CloudEventController {
@PostMapping
public ResponseEntity<Void> receiveEvent(
@RequestHeader HttpHeaders headers,
@RequestBody byte[] body) {
CloudEvent event = CloudEventHttpUtils.toReader(headers, body).toEvent();
System.out.printf("Received: id=%s, type=%s, source=%s%n",
event.getId(), event.getType(), event.getSource());
return ResponseEntity.ok().build();
}
}
第10章 アーキテクチャパターン
10.1 Point-to-Point (P2P) パターン
+-----------+ CloudEvent (HTTP) +-----------+
| Producer | ------------------------> | Consumer |
| (Order | POST /events | (Email |
| Service) | ce-type: order.created | Service) |
+-----------+ +-----------+
10.2 Publish/Subscribe パターン
+-----------+ +----------------+ +-----------+
| Producer | -- CloudEvent --> | Event Broker | -- CloudEvent --> | Consumer A|
+-----------+ | (Kafka/Knative)| +-----------+
| | -- CloudEvent --> | Consumer B|
+-----------+ | | +-----------+
| Producer | -- CloudEvent --> +----------------+ -- CloudEvent --> | Consumer C|
+-----------+ +-----------+
10.3 Event Sourcing パターン
すべての状態変更を CloudEvent として記録し、イベントストアに永続化する。
// Event 1: 注文作成
{
"specversion": "1.0", "id": "es-001",
"source": "https://example.com/orders/ORD-123",
"type": "com.example.order.created",
"sequence": "1", "sequencetype": "Integer",
"data": {"orderId": "ORD-123", "total": 2000}
}
// Event 2: アイテム追加
{
"specversion": "1.0", "id": "es-002",
"source": "https://example.com/orders/ORD-123",
"type": "com.example.order.item_added",
"sequence": "2", "sequencetype": "Integer",
"data": {"orderId": "ORD-123", "item": {"productId": "P-002", "qty": 1}, "newTotal": 5000}
}
10.4 Saga パターン
Order Service Payment Service Inventory Service Shipping Service
| | | |
| order.created | | |
|--------------------->| | |
| | payment.completed | |
| |--------------------->| |
| | | inventory.reserved |
| | |--------------------->|
| | | | shipping.scheduled
|<--------------------------------------------------------------------|
10.5 Claim Check パターン
大きなイベントデータを外部ストレージに保存し、CloudEvent には参照のみを含める。
{
"specversion": "1.0",
"id": "claim-001",
"source": "https://example.com/document-service",
"type": "com.example.document.uploaded",
"dataref": "https://storage.example.com/documents/doc-12345.pdf",
"data": {
"documentId": "DOC-12345",
"fileName": "contract.pdf",
"fileSize": 5242880,
"mimeType": "application/pdf"
}
}
10.6 Dead Letter Queue パターン
処理に失敗したイベントを DLQ に退避し、後でリトライまたは調査する。
第11章 セキュリティ
11.1 WebHook Abuse Protection
# Step 1: 送信者が OPTIONS リクエストで検証
OPTIONS /events HTTP/1.1
Host: consumer.example.com
WebHook-Request-Origin: https://producer.example.com
WebHook-Request-Callback: https://producer.example.com/webhook/confirm?token=abc123
# Step 2: 受信者が許可を返す
HTTP/1.1 200 OK
WebHook-Allowed-Origin: https://producer.example.com
11.2 トランスポートレベルのセキュリティ
- すべての通信は TLS 1.2 以上で暗号化
- サービスメッシュ内では mTLS を推奨
- OAuth 2.0 トークンによる認証
11.3 データレベルのセキュリティ
- PII はコンテキスト属性に含めない (subject にメールアドレス等を使用しない)
- 機密データのペイロード暗号化を検討 (JWE 等)
11.4 セキュリティチェックリスト
| カテゴリ | チェック項目 |
|---|---|
| トランスポート | TLS 1.2 以上を使用しているか |
| トランスポート | Webhook エンドポイントは検証ハンドシェイクを実装しているか |
| 認証 | イベント送信者の認証メカニズムがあるか |
| 認可 | イベントタイプごとのアクセス制御ポリシーがあるか |
| データ保護 | PII がコンテキスト属性に含まれていないか |
| 監査 | イベントの送受信ログを適切に記録しているか |
| 可用性 | レート制限が設定されているか |
第12章 実装例とチュートリアル
12.1 Knative Eventing を使った CloudEvents パイプライン
# Broker の作成
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: default
namespace: demo
spec:
delivery:
retry: 3
backoffPolicy: exponential
backoffDelay: "PT2S"
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: dead-letter-service
---
# Trigger (フィルタリングルール)
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: order-created-trigger
namespace: demo
spec:
broker: default
filter:
attributes:
type: com.example.order.created
source: https://example.com/order-service
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: order-processor
12.2 Azure Event Grid での CloudEvents
# CloudEvents スキーマを使用する Event Grid トピックの作成
az eventgrid topic create \
--name my-cloudevents-topic \
--resource-group my-rg \
--location japaneast \
--input-schema cloudeventschemav1_0
# イベントの発行
curl -X POST \
"https://my-cloudevents-topic.japaneast-1.eventgrid.azure.net/api/events" \
-H "Content-Type: application/cloudevents+json" \
-H "aeg-sas-key: {your-sas-key}" \
-d '{
"specversion": "1.0",
"id": "azure-001",
"source": "/azure/my-app",
"type": "com.example.order.created",
"datacontenttype": "application/json",
"data": {"orderId": "ORD-12345", "amount": 5000}
}'
12.3 Google Cloud Eventarc での CloudEvents
gcloud eventarc triggers create storage-trigger \
--location=asia-northeast1 \
--destination-run-service=my-service \
--destination-run-region=asia-northeast1 \
--event-filters="type=google.cloud.storage.object.v1.finalized" \
--event-filters="bucket=my-bucket" \
--service-account=my-sa@my-project.iam.gserviceaccount.com
第13章 運用と可観測性
13.1 メトリクスの収集
| メトリクス | 説明 | アラート閾値例 |
|---|---|---|
| cloudevents_produced_total | 発行されたイベント数 | 急激な増減 |
| cloudevents_consumed_total | 消費されたイベント数 | 急激な増減 |
| cloudevents_failed_total | 処理失敗したイベント数 | > 0 |
| cloudevents_dlq_total | DLQ に送られたイベント数 | > 0 |
| cloudevents_delivery_latency | 配信レイテンシ (ms) | p99 > 1000ms |
| cloudevents_processing_duration | 処理時間 (ms) | p99 > 5000ms |
| cloudevents_retry_count | リトライ回数 | > 3 |
13.2 分散トレーシング
CloudEvents の Distributed Tracing Extension (traceparent/tracestate) を使用して、OpenTelemetry によるイベントフロー追跡を実現する。
13.3 ログの構造化
{
"timestamp": "2025-01-15T09:30:00.123Z",
"level": "INFO",
"message": "Event processed successfully",
"cloudevent.id": "evt-001",
"cloudevent.type": "com.example.order.created",
"cloudevent.source": "https://example.com/orders",
"trace.id": "0af7651916cd43dd8448eb211c80319c",
"processing.duration_ms": 45
}
第14章 既存システムとの統合
14.1 Adapter パターン (アンチコラプションレイヤー)
from cloudevents.http import CloudEvent
import datetime, uuid
def transform_legacy_to_cloudevent(legacy_event: dict) -> CloudEvent:
event_type_mapping = {
"new_order": "com.example.order.created",
"update_order": "com.example.order.updated",
"cancel_order": "com.example.order.cancelled",
}
ce_type = event_type_mapping.get(
legacy_event.get("evt"),
f"com.example.legacy.{legacy_event.get('evt', 'unknown')}"
)
timestamp = datetime.datetime.fromtimestamp(
legacy_event.get("ts", 0), tz=datetime.timezone.utc
).isoformat()
attributes = {
"id": str(uuid.uuid4()),
"source": "https://example.com/legacy-adapter",
"type": ce_type,
"time": timestamp,
"datacontenttype": "application/json",
}
return CloudEvent(attributes, legacy_event.get("payload", {}))
14.2 Change Data Capture (CDC) + CloudEvents
+---------------+ +-----------+ +------------------+ +----------+
| PostgreSQL | --> | Debezium | --> | Kafka | --> | Consumer |
| (Source DB) | | Connector | | (CloudEvents) | | |
+---------------+ +-----------+ +------------------+ +----------+
14.3 API Gateway / サービスメッシュとの統合
# Istio VirtualService での CloudEvents ルーティング
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: cloudevents-routing
spec:
hosts:
- events.internal.example.com
http:
- match:
- headers:
ce-type:
prefix: "com.example.order."
route:
- destination:
host: order-service
port:
number: 8080
- match:
- headers:
ce-type:
prefix: "com.example.payment."
route:
- destination:
host: payment-service
port:
number: 8080
第15章 ベストプラクティスとアンチパターン
15.1 ベストプラクティス
BP-1: イベントタイプの命名は逆ドメイン名表記を使用する (com.example.order.created)
BP-2: source URI は一意で永続的な識別子とする
BP-3: イベントデータにはイベント発生時のスナップショットを含める (コンシューマーが追加 API 呼び出し不要)
BP-4: 冪等性を確保する (id + source で重複検知)
BP-5: イベントサイズは 64KB 以下に保つ (大きなデータは dataref を使用)
BP-6: スキーマバージョニングを実施する
BP-7: 後方互換性を維持する (フィールド追加は互換、削除/型変更は破壊的変更)
BP-8: Dead Letter Queue (DLQ) を必ず設定する
BP-9: time 属性を必ず設定する
15.2 アンチパターン
AP-1: Fat Event - イベントに過剰なデータを詰め込む
AP-2: Command Disguised as Event - イベントを命令形にする (send_confirmation_email ではなく order.created とし、コンシューマー側でアクションを決定)
AP-3: Chatty Events - フィールド単位でイベントを発行する (業務単位でまとめる)
AP-4: Missing Idempotency - 重複チェックなしの処理
AP-5: Temporal Coupling - イベントの処理順序に依存する設計
15.3 イベント設計チェックリスト
| # | チェック項目 | 必須/推奨 |
|---|---|---|
| 1 | type は逆ドメイン名表記か | 推奨 |
| 2 | source は永続的で一意な URI か | 必須 |
| 3 | id は source スコープ内で一意か | 必須 |
| 4 | time 属性が設定されているか | 推奨 |
| 5 | イベントサイズは 64KB 以下か | 推奨 |
| 6 | PII がコンテキスト属性に含まれていないか | 必須 |
| 7 | 冪等性を確保する仕組みがあるか | 必須 |
| 8 | DLQ が設定されているか | 推奨 |
| 9 | スキーマの後方互換性が維持されているか | 推奨 |
| 10 | 分散トレーシング拡張が設定されているか | 推奨 |
第16章 まとめと今後の展望
16.1 CloudEvents の価値
-
標準化によるエコシステムの統一: 異なるクラウドプロバイダー、メッセージングプラットフォーム間でイベントを統一的に扱える。ベンダーロックインのリスク低減。
-
開発者生産性の向上: 公式 SDK、標準化されたプロトコルバインディング、明確な型システムにより開発効率化。
-
運用性の向上: 統一されたメタデータ構造により、監視・トレーシング・デバッグの共通ツールチェーンを構築可能。
16.2 仕様の現在の状態
| 仕様 | バージョン | 状態 |
|---|---|---|
| Core Specification | 1.0.2 | Stable |
| JSON Event Format | 1.0.2 | Stable |
| HTTP Protocol Binding | 1.0.2 | Stable |
| Kafka Protocol Binding | 1.0.2 | Stable |
| AMQP Protocol Binding | 1.0.1 | Working Draft |
| MQTT Protocol Binding | 1.0.1 | Working Draft |
| Avro Event Format | 1.0 | Working Draft |
| Protobuf Event Format | 1.0 | Working Draft |
| Subscription API | 0.1 | Working Draft |
| Discovery API | 0.1 | Working Draft |
| CESQL | 0.1 | Working Draft |
16.3 今後の展望
- Subscription API と Discovery API の安定化: イベントカタログの自動化が進む。
- CESQL の拡張: より複雑なフィルタリングや変換がイベントルーター層で実行可能になる。
- Schema Registry との統合: 標準的な統合パターンが確立される。
- AsyncAPI との連携: イベント駆動型 API の設計・文書化・コード生成のワークフロー統一。
- AI/ML パイプラインとの統合: 機械学習パイプラインのイベント標準化。
- エッジコンピューティング対応: IoT エッジデバイスでの CloudEvents 処理の軽量化。
16.4 参考資料
| リソース | URL |
|---|---|
| CloudEvents 公式仕様 | https://github.com/cloudevents/spec |
| CloudEvents SDK 一覧 | https://github.com/cloudevents |
| CNCF CloudEvents ページ | https://cloudevents.io |
| CloudEvents Primer | https://github.com/cloudevents/spec/blob/main/cloudevents/primer.md |
| Knative Eventing | https://knative.dev/docs/eventing/ |
| AsyncAPI | https://www.asyncapi.com/ |
本書は2026年4月時点の情報に基づいて執筆されている。CloudEvents 仕様は継続的に更新されているため、最新の仕様は公式リポジトリを参照されたい。