CloudEvents

CloudEvents 完全ガイド -- CNCF イベントデータ仕様の全容


第1章 はじめに

1.1 本書の目的と対象読者

本書は、Cloud Native Computing Foundation (CNCF) が策定したイベントデータ仕様である CloudEvents について、その設計思想、アーキテクチャ、具体的な実装方法を包括的に解説する技術文書である。マイクロサービスアーキテクチャやイベント駆動型システムの設計・開発・運用に携わるエンジニアを主な対象読者とし、CloudEvents の全体像から実装詳細に至るまでを体系的にまとめている。

1.2 イベント駆動型アーキテクチャの課題

現代のクラウドネイティブシステムでは、サービス間の通信手段としてイベント駆動型アーキテクチャ (Event-Driven Architecture: EDA) が広く採用されている。Apache Kafka、Amazon EventBridge、Google Cloud Pub/Sub、Azure Event Grid、RabbitMQ など多数のメッセージングプラットフォームが存在し、それぞれが独自のイベントフォーマットを定義している。

この状況は以下の課題を生み出していた。

  1. 相互運用性の欠如: 異なるプラットフォーム間でイベントを交換する際、フォーマット変換が必要となり、ブリッジコンポーネントの開発・保守コストが増大する。
  2. ツールチェーンの分断: イベントのルーティング、フィルタリング、監視などの共通機能を各プラットフォーム向けに個別開発する必要がある。
  3. 開発者体験の劣化: プラットフォームごとに異なる SDK やライブラリを習得する学習コストが高い。
  4. ベンダーロックイン: 特定のプラットフォームに依存したイベント設計は、将来の移行を困難にする。
  5. 可搬性の制約: FaaS (Function as a Service) 環境において、関数を異なるクラウドプロバイダーへ移行する際にイベント処理ロジックの書き直しが必要になる。

1.3 CloudEvents とは何か

CloudEvents は、これらの課題に対するソリューションとして 2018 年に CNCF Serverless Working Group の中で誕生したプロジェクトである。2019 年 10 月に v1.0 仕様がリリースされ、2024 年には CNCF の Graduated プロジェクト (最上位ステータス) に昇格した。

CloudEvents を一言で定義すると以下のようになる。

CloudEvents は、イベントデータを共通の方法で記述するための仕様 (specification) である。

CloudEvents は特定の実装やランタイム、プロトコルではない。あくまでイベントの「エンベロープ (封筒)」の形式を標準化する仕様であり、以下の特徴を持つ。

  • ベンダー中立: 特定のクラウドプロバイダーやメッセージングプラットフォームに依存しない。
  • プロトコル非依存: HTTP、AMQP、MQTT、Kafka、NATS など様々なトランスポートプロトコル上で利用可能。
  • シリアライズ形式非依存: JSON、Avro、Protobuf など様々なエンコーディングに対応。
  • 拡張可能: コア仕様を拡張して、ドメイン固有のメタデータを追加可能。
  • 軽量: 必須属性はわずか 4 つ。最小構成のイベントは数行で記述できる。

1.4 CloudEvents が解決する問題

CloudEvents の導入により、以下の恩恵が得られる。

課題CloudEvents による解決
相互運用性の欠如共通仕様により、異なるプラットフォーム間でイベントをそのまま交換可能
ツールチェーンの分断仕様準拠の共通ライブラリ・ミドルウェアを共有利用
開発者体験の劣化統一された SDK と一貫した概念モデルにより学習コスト削減
ベンダーロックイン標準仕様に準拠することでプラットフォーム移行が容易
可搬性の制約FaaS 関数のイベント処理を移植可能な形で記述

1.5 エコシステムと採用事例

CloudEvents は多くの主要クラウドプロバイダーとプラットフォームに採用されている。

  • Microsoft Azure: Azure Event Grid のネイティブイベントスキーマとして CloudEvents v1.0 をサポート。
  • Google Cloud: Eventarc が CloudEvents 形式でイベントを配信。Cloud Functions (第2世代) は CloudEvents をネイティブにサポート。
  • Amazon Web Services: Amazon EventBridge が CloudEvents との変換をサポート。
  • Knative: Knative Eventing は CloudEvents を基盤仕様として全面採用。
  • Oracle Cloud: Oracle Cloud Infrastructure Events は CloudEvents 準拠。
  • SAP: SAP Event Mesh が CloudEvents をサポート。
  • Red Hat: OpenShift Serverless が Knative ベースで CloudEvents をネイティブサポート。

1.6 CNCF における位置づけ

CNCF のプロジェクト成熟度モデルにおいて、CloudEvents は以下の経緯を辿っている。

2018年5月  : CNCF Sandbox プロジェクトとして採択
2019年10月 : CloudEvents v1.0 仕様リリース
2021年11月 : CNCF Incubating プロジェクトに昇格
2024年     : CNCF Graduated プロジェクトに昇格

Graduated ステータスは、Kubernetes、Prometheus、Envoy などと同等の成熟度を示し、本番環境での広範な利用と、ガバナンス・セキュリティ面での成熟を意味する。

1.7 本書の構成

本書は以下の構成でCloudEventsの全容を解説する。

内容
第1章はじめに (本章)
第2章コア仕様 -- 属性とイベント構造
第3章プロトコルバインディング
第4章イベントフォーマット
第5章コンテンツモード
第6章拡張属性 (Extensions)
第7章サブスクリプション API
第8章ディスカバリー API
第9章SDK と実装
第10章アーキテクチャパターン
第11章セキュリティ
第12章実装例とチュートリアル
第13章運用と可観測性
第14章既存システムとの統合
第15章ベストプラクティスとアンチパターン
第16章まとめと今後の展望

第2章 コア仕様 -- 属性とイベント構造

2.1 CloudEvents のデータモデル

CloudEvents の核心は、イベントの「メタデータ」と「データ」を明確に分離するデータモデルにある。すべての CloudEvent は以下の構成要素から成る。

+----------------------------------------------+
|              CloudEvent                       |
|                                               |
|  +------------------+  +------------------+   |
|  |   Context        |  |   Event Data     |   |
|  |   Attributes     |  |   (payload)      |   |
|  |                  |  |                  |   |
|  |  - Required      |  |  任意の形式の    |   |
|  |  - Optional      |  |  ビジネスデータ  |   |
|  |  - Extension     |  |                  |   |
|  +------------------+  +------------------+   |
+----------------------------------------------+

コンテキスト属性 (Context Attributes): イベントの発生元、種類、一意識別子などのメタデータ。ルーティングやフィルタリングに使用される。

イベントデータ (Event Data / payload): イベントの実質的な内容を表すドメイン固有のデータ。CloudEvents 仕様はこの部分の形式を規定しない。

この分離により、ミドルウェアやルーターはイベントデータの中身を解釈することなく、コンテキスト属性だけを参照してイベントのルーティングやフィルタリングを実行できる。

2.2 必須属性 (REQUIRED Attributes)

CloudEvents v1.0 で定義される必須属性は以下の 4 つである。これらの属性がすべて存在しないイベントは、有効な CloudEvent とは見なされない。

2.2.1 id

項目内容
String
制約空文字列不可
意味イベントの一意識別子

idsource と組み合わせてイベントの一意性を担保する。同じ source から発行される 2 つのイベントが同じ id を持つ場合、それらは重複 (duplicate) と見なされる。

"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"

UUID v4 が一般的に使用されるが、仕様はフォーマットを規定しない。ULID、Snowflake ID、その他のスキームも使用可能である。

2.2.2 source

項目内容
URI-Reference (RFC 3986)
制約空文字列不可
意味イベントの発生元を識別する

イベントがどのシステム、サービス、リソースから発生したかを示す。絶対 URI が推奨されるが、相対 URI 参照も許容される。

"source": "https://example.com/services/order-service"
"source": "/myapp/order/12345"
"source": "urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66"

推奨される source の設計パターン:

# クラウドリソースを示す場合
"source": "//pubsub.googleapis.com/projects/my-project/topics/my-topic"

# サービスとリソースの組み合わせ
"source": "https://api.example.com/orders/12345"

# Kubernetes リソース
"source": "/apis/apps/v1/namespaces/default/deployments/my-app"

2.2.3 specversion

項目内容
String
制約空文字列不可
意味使用する CloudEvents 仕様のバージョン

現時点で有効な値は "1.0" のみである。

"specversion": "1.0"

2.2.4 type

項目内容
String
制約空文字列不可
意味イベントの種類を示す

type はイベントのカテゴリを表し、ルーティングやサブスクリプションフィルタリングで最も頻繁に使用される属性である。

"type": "com.example.order.created"
"type": "com.github.pull_request.opened"
"type": "io.cloudevents.test"

推奨される命名規則は逆ドメイン名表記 (reverse domain name notation) である。

com.{company}.{service}.{action}
com.example.warehouse.item.shipped

2.3 オプション属性 (OPTIONAL Attributes)

2.3.1 datacontenttype

data 属性の MIME タイプを示す。型は String (RFC 2046 Content-Type)。

"datacontenttype": "application/json"
"datacontenttype": "application/xml"
"datacontenttype": "text/plain"

2.3.2 dataschema

data 属性のスキーマを示す URI。

"dataschema": "https://schemas.example.com/order/v1.0/created.json"

2.3.3 subject

source のコンテキストにおけるイベントの対象。

"source": "https://example.com/storage/buckets",
"subject": "images/photo.jpg"

2.3.4 time

イベントの発生時刻。型は Timestamp (RFC 3339)。

"time": "2025-01-15T09:30:00.000Z"

2.4 data 属性

data (または data_base64) はイベントの実際のペイロードを格納する。

{
    "specversion": "1.0",
    "id": "evt-001",
    "source": "https://example.com/orders",
    "type": "com.example.order.created",
    "datacontenttype": "application/json",
    "time": "2025-01-15T09:30:00Z",
    "data": {
        "orderId": "ORD-12345",
        "customerId": "CUST-678",
        "items": [
            {"productId": "PROD-001", "quantity": 2, "price": 29.99}
        ],
        "totalAmount": 59.98,
        "currency": "JPY"
    }
}

バイナリデータを含める場合は data_base64 を使用する。datadata_base64 は相互排他である。

2.5 属性の型システム

説明
Boolean真偽値true, false
Integer整数値42, -1
StringUnicode 文字列"hello"
Binaryバイト列Base64 エンコード
URIRFC 3986 準拠の絶対 URI"https://example.com"
URI-ReferenceRFC 3986 準拠の URI 参照"/path/to/resource"
TimestampRFC 3339 準拠のタイムスタンプ"2025-01-15T09:30:00Z"

2.6 属性命名規則

  • 小文字の英数字 (a-z, 0-9) のみで構成
  • 1文字以上20文字以下
  • 英字で始まること

2.7 完全な CloudEvent 例

{
    "specversion": "1.0",
    "id": "b3c4d5e6-f7a8-9012-bcde-f34567890123",
    "source": "https://example.com/services/payment-service",
    "type": "com.example.payment.completed",
    "datacontenttype": "application/json",
    "dataschema": "https://schemas.example.com/payment/v2.0/completed.json",
    "subject": "payment/PAY-98765",
    "time": "2025-01-15T09:30:00.123456789Z",
    "data": {
        "paymentId": "PAY-98765",
        "orderId": "ORD-12345",
        "amount": 15000,
        "currency": "JPY",
        "method": "credit_card",
        "status": "completed",
        "processedAt": "2025-01-15T09:30:00.100Z"
    }
}

第3章 プロトコルバインディング

3.1 プロトコルバインディングとは

プロトコルバインディング (Protocol Binding) は、CloudEvents をトランスポートプロトコル上でどのように送受信するかを定義する仕様である。

プロトコル状態主な用途
HTTPStable (v1.0)REST API、Webhook
AMQPWorking Draftメッセージキュー (RabbitMQ 等)
MQTTWorking DraftIoT デバイス通信
KafkaStable (v1.0)ストリーミングプラットフォーム
NATSWorking Draftクラウドネイティブメッセージング
WebSocketWorking Draftリアルタイム双方向通信

3.2 HTTP プロトコルバインディング

3.2.1 バイナリコンテンツモード (Binary Content Mode)

コンテキスト属性を HTTP ヘッダーに、イベントデータを HTTP ボディに格納する。

POST /events HTTP/1.1
Host: example.com
Content-Type: application/json
ce-specversion: 1.0
ce-id: evt-001
ce-source: https://example.com/orders
ce-type: com.example.order.created
ce-time: 2025-01-15T09:30:00Z
ce-subject: order/ORD-12345

{
    "orderId": "ORD-12345",
    "customerId": "CUST-678",
    "totalAmount": 59.98,
    "currency": "JPY"
}

HTTP ヘッダー名の規則:

  • 必須/オプション属性: ce- プレフィックス + 属性名
  • datacontenttype は HTTP の Content-Type ヘッダーにマッピング
  • data は HTTP ボディにそのまま格納

3.2.2 構造化コンテンツモード (Structured Content Mode)

POST /events HTTP/1.1
Host: example.com
Content-Type: application/cloudevents+json

{
    "specversion": "1.0",
    "id": "evt-001",
    "source": "https://example.com/orders",
    "type": "com.example.order.created",
    "time": "2025-01-15T09:30:00Z",
    "subject": "order/ORD-12345",
    "datacontenttype": "application/json",
    "data": {
        "orderId": "ORD-12345",
        "customerId": "CUST-678",
        "totalAmount": 59.98,
        "currency": "JPY"
    }
}

3.2.3 バッチモード (Batched Content Mode)

POST /events HTTP/1.1
Host: example.com
Content-Type: application/cloudevents-batch+json

[
    {
        "specversion": "1.0",
        "id": "batch-001",
        "source": "https://example.com/orders",
        "type": "com.example.order.created",
        "data": {"orderId": "ORD-001"}
    },
    {
        "specversion": "1.0",
        "id": "batch-002",
        "source": "https://example.com/orders",
        "type": "com.example.order.created",
        "data": {"orderId": "ORD-002"}
    }
]

3.2.4 HTTP レスポンスのハンドリング

ステータスコード意味
200 OK / 201 Created / 202 Accepted / 204 No Contentイベント受理成功
400 Bad Request不正な CloudEvent (属性不足等)
415 Unsupported Media Type未対応の Content-Type
429 Too Many Requestsレート制限超過

3.3 Kafka プロトコルバインディング

3.3.1 バイナリコンテンツモード

Kafka Record:
  Key:     "ORD-12345"
  Headers:
    ce_specversion: "1.0"
    ce_id:          "evt-001"
    ce_source:      "https://example.com/orders"
    ce_type:        "com.example.order.created"
    ce_time:        "2025-01-15T09:30:00Z"
    content-type:   "application/json"
  Value:   {"orderId":"ORD-12345","totalAmount":59.98}

3.3.2 構造化コンテンツモード

Kafka Record:
  Key:     "ORD-12345"
  Headers:
    content-type: "application/cloudevents+json"
  Value:   {
               "specversion": "1.0",
               "id": "evt-001",
               "source": "https://example.com/orders",
               "type": "com.example.order.created",
               "datacontenttype": "application/json",
               "data": {"orderId":"ORD-12345","totalAmount":59.98}
           }

3.4 AMQP プロトコルバインディング

AMQP Message:
  Application Properties:
    cloudEvents:specversion: "1.0"
    cloudEvents:id:          "evt-001"
    cloudEvents:source:      "https://example.com/orders"
    cloudEvents:type:        "com.example.order.created"
  Content-Type: "application/json"
  Body: {"orderId":"ORD-12345"}

3.5 MQTT プロトコルバインディング

MQTT v5.0 バイナリモード

MQTT PUBLISH:
  Topic: /cloudevents/orders
  User Properties:
    specversion: "1.0"
    id:          "evt-001"
    source:      "urn:device:sensor-001"
    type:        "io.example.temperature.reading"
  Content Type: "application/json"
  Payload: {"temperature":23.5,"unit":"celsius"}

3.6 NATS プロトコルバインディング

NATS Message:
  Subject: "cloudevents.orders.created"
  Headers:
    ce-specversion: 1.0
    ce-id: evt-001
    ce-source: https://example.com/orders
    ce-type: com.example.order.created
    Content-Type: application/json
  Payload: {"orderId":"ORD-12345"}

3.7 プロトコルバインディング選択の指針

要件推奨バインディング理由
Webhook / REST APIHTTP広範なサポート、シンプルな統合
高スループットストリーミングKafkaパーティション並列性、永続化
メッセージキューイングAMQP柔軟なルーティング、確実な配信
IoT デバイスMQTT軽量プロトコル、低帯域幅
低レイテンシメッセージングNATS高速、軽量
リアルタイム WebWebSocket双方向通信、プッシュ配信

第4章 イベントフォーマット

4.1 イベントフォーマットの役割

イベントフォーマット (Event Format) は、CloudEvents をシリアライズ/デシリアライズする方法を定義する。プロトコルバインディングが「どのプロトコルで送るか」を定義するのに対し、イベントフォーマットは「どの形式でエンコードするか」を定義する。

4.2 JSON イベントフォーマット

CloudEvents 型JSON 型
Booleanboolean
Integernumber (整数)
Stringstring
Binarystring (Base64 エンコード)
URIstring
URI-Referencestring
Timestampstring (RFC 3339)

4.3 Avro イベントフォーマット

Apache Avro はコンパクトなバイナリフォーマットであり、高スループット環境に適している。スキーマレジストリとの親和性が高い。

4.4 Protobuf イベントフォーマット

syntax = "proto3";
package io.cloudevents.v1;

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

message CloudEvent {
    string id = 1;
    string source = 2;
    string spec_version = 3;
    string type = 4;
    map<string, CloudEventAttributeValue> attributes = 5;
    oneof data {
        bytes binary_data = 6;
        string text_data = 7;
        google.protobuf.Any proto_data = 8;
    }
}

message CloudEventAttributeValue {
    oneof attr {
        bool ce_boolean = 1;
        int32 ce_integer = 2;
        string ce_string = 3;
        bytes ce_bytes = 4;
        string ce_uri = 5;
        string ce_uri_ref = 6;
        google.protobuf.Timestamp ce_timestamp = 7;
    }
}

message CloudEventBatch {
    repeated CloudEvent events = 1;
}

4.5 フォーマット選択の指針

フォーマットMIME タイプ適用場面
JSONapplication/cloudevents+json汎用、デバッグ容易、Webhook
Avroapplication/cloudevents+avro高スループット、Kafka 連携
Protobufapplication/cloudevents+protobufgRPC 連携、型安全性重視
XMLapplication/cloudevents+xmlレガシー統合、エンタープライズ

第5章 コンテンツモード

5.1 コンテンツモードの概要

コンテンツモード (Content Mode) は、CloudEvents のメタデータとイベントデータをトランスポートメッセージのどこに配置するかを決定する。

5.2 バイナリコンテンツモード

+---------------------------------------------------+
| Transport Message                                  |
|  Metadata Section (Headers)                        |
|  +-----------------------------------------------+|
|  | ce-specversion: 1.0                            ||
|  | ce-id: evt-001                                 ||
|  | ce-source: https://example.com/orders          ||
|  | ce-type: com.example.order.created             ||
|  | Content-Type: application/json                 ||
|  +-----------------------------------------------+|
|  Body Section                                      |
|  +-----------------------------------------------+|
|  | {"orderId":"ORD-12345","totalAmount":59.98}    ||
|  +-----------------------------------------------+|
+---------------------------------------------------+

5.3 構造化コンテンツモード

+---------------------------------------------------+
| Transport Message                                  |
|  Metadata Section (Headers)                        |
|  +-----------------------------------------------+|
|  | Content-Type: application/cloudevents+json     ||
|  +-----------------------------------------------+|
|  Body Section                                      |
|  +-----------------------------------------------+|
|  | {                                              ||
|  |   "specversion": "1.0",                       ||
|  |   "id": "evt-001",                            ||
|  |   "source": "...",                             ||
|  |   "type": "...",                               ||
|  |   "data": { ... }                              ||
|  | }                                              ||
|  +-----------------------------------------------+|
+---------------------------------------------------+

5.4 モード選択のフローチャート

イベント送信が必要
     |
     v
バイナリデータを含むか? -- Yes -> バイナリモード
     |
    No
     v
中間プロキシが CloudEvents を認識しないか? -- Yes -> バイナリモード
     |
    No
     v
イベントのログ/監査/保存が重要か? -- Yes -> 構造化モード
     |
    No
     v
大量のイベントを一括送信するか? -- Yes -> バッチモード
     |
    No -> バイナリモードまたは構造化モード

第6章 拡張属性 (Extensions)

6.1 拡張メカニズムの概要

拡張属性 (Extension Attributes) は、コア仕様に含まれないドメイン固有のメタデータをイベントに付加するためのメカニズムである。

6.2 公式に文書化された拡張属性

6.2.1 Distributed Tracing Extension

{
    "specversion": "1.0",
    "id": "trace-001",
    "source": "https://example.com/orders",
    "type": "com.example.order.created",
    "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
    "tracestate": "congo=t61rcWkgMzE",
    "data": {"orderId": "ORD-12345"}
}

6.2.2 Sequence Extension

{
    "specversion": "1.0",
    "id": "seq-005",
    "source": "https://example.com/event-log",
    "type": "com.example.log.entry",
    "sequence": "5",
    "sequencetype": "Integer",
    "data": {"message": "User logged in", "userId": "USER-123"}
}

6.2.3 Partitioning Extension

{
    "specversion": "1.0",
    "id": "part-001",
    "source": "https://example.com/orders",
    "type": "com.example.order.created",
    "partitionkey": "customer-678",
    "data": {"orderId": "ORD-12345", "customerId": "CUST-678"}
}

6.2.4 Dataref Extension

{
    "specversion": "1.0",
    "id": "ref-001",
    "source": "https://example.com/image-processor",
    "type": "com.example.image.uploaded",
    "dataref": "https://storage.example.com/images/photo-12345.jpg",
    "data": {"imageId": "IMG-12345", "width": 1920, "height": 1080}
}

6.2.5 Sampling Rate Extension

{
    "specversion": "1.0",
    "id": "sample-001",
    "source": "https://example.com/metrics",
    "type": "com.example.metric.collected",
    "sampledrate": 100,
    "data": {"metric": "response_time", "value": 45.2, "unit": "ms"}
}

6.3 カスタム拡張属性の設計原則

  1. 属性名の衝突回避: 公式拡張属性や将来のコア属性と衝突しない名前を選択する。
  2. 最小限のメタデータ: ルーティングやフィルタリングに必要なメタデータのみを拡張属性とし、ビジネスデータは data に格納する。
  3. 型の適切な選択: CloudEvents の型システムがサポートする型を使用する。
  4. ドキュメント化: 拡張属性の意味、型、制約を明文化する。

6.4 拡張属性の使用パターン

パターン1: マルチテナント環境

{
    "specversion": "1.0",
    "id": "mt-001",
    "source": "https://saas.example.com/billing",
    "type": "com.example.invoice.generated",
    "tenantid": "tenant-12345",
    "tenantregion": "jp",
    "data": {"invoiceId": "INV-001", "amount": 50000}
}

パターン2: Saga/ワークフロー管理

{
    "specversion": "1.0",
    "id": "saga-step-003",
    "source": "https://example.com/inventory-service",
    "type": "com.example.inventory.reserved",
    "sagaid": "saga-order-12345",
    "sagastep": "3",
    "correlationid": "corr-order-12345",
    "data": {
        "orderId": "ORD-12345",
        "items": [{"productId": "PROD-001", "quantity": 2, "reserved": true}]
    }
}

第7章 サブスクリプション API

7.1 概要

CloudEvents Subscription API は、イベントのサブスクリプション (購読) を管理するための RESTful API 仕様である。

7.2 サブスクリプションオブジェクト

属性必須説明
idStringYesサブスクリプションの一意識別子
sourceURINoフィルタ対象の source 値
typesString[]Noフィルタ対象の type 値のリスト
sinkURIYesイベント配信先の URI
protocolStringYes配信プロトコル
filtersFilter[]Noイベントフィルタの配列

7.3 CRUD 操作

POST /subscriptions HTTP/1.1
Content-Type: application/json

{
    "protocol": "HTTP",
    "sink": "https://consumer.example.com/events",
    "types": ["com.example.order.created", "com.example.order.updated"],
    "source": "https://example.com/order-service",
    "config": {
        "maxretries": 3,
        "deadletterqueue": "https://consumer.example.com/dlq"
    }
}

7.4 フィルタリングダイアレクト

exact フィルタ

{"filters": [{"exact": {"type": "com.example.order.created"}}]}

prefix フィルタ

{"filters": [{"prefix": {"type": "com.example.order."}}]}

複合フィルタ (NOT / AND / OR)

{
    "filters": [{
        "all": [
            {"prefix": {"type": "com.example.order."}},
            {"not": {"exact": {"type": "com.example.order.deleted"}}}
        ]
    }]
}

CESQL フィルタ

-- CESQL 構文例
type LIKE 'com.example.order.%' AND source = 'https://example.com/orders'
type IN ('com.example.order.created', 'com.example.order.updated')
priority = 'high' AND tenantid = 'tenant-12345'
NOT (type = 'com.example.test' OR type = 'com.example.debug')

第8章 ディスカバリー API

8.1 概要

CloudEvents Discovery API は、イベントプロデューサーが「どのようなイベントを発行しているか」をプログラマティックに公開するための仕様である。

8.2 サービスオブジェクト

{
    "id": "svc-order-service",
    "name": "Order Service",
    "url": "https://api.example.com/order-service",
    "description": "Manages order lifecycle events",
    "specversions": ["1.0"],
    "subscriptionurl": "https://api.example.com/order-service/subscriptions",
    "protocols": ["HTTP"],
    "events": [
        {
            "type": "com.example.order.created",
            "description": "Emitted when a new order is placed",
            "datacontenttype": "application/json",
            "dataschema": "https://schemas.example.com/order/v1/created.json"
        },
        {
            "type": "com.example.order.updated",
            "description": "Emitted when an order is modified"
        },
        {
            "type": "com.example.order.cancelled",
            "description": "Emitted when an order is cancelled"
        }
    ]
}

8.3 イベントカタログの構築

+-------------------+     +-----------------+     +-------------------+
| Order Service     |     | Event Catalog   |     | Developer Portal  |
|                   | --> | Service         | --> |                   |
| GET /services     |     | (Aggregator)    |     | - 検索可能な       |
+-------------------+     |                 |     |   イベント一覧     |
                          |                 |     | - スキーマ閲覧     |
+-------------------+     |                 |     | - サンプルコード    |
| Payment Service   | --> |                 |     |   自動生成         |
| GET /services     |     +-----------------+     +-------------------+
+-------------------+

第9章 SDK と実装

9.1 公式 SDK 一覧

言語/ランタイムリポジトリ成熟度
Gocloudevents/sdk-goStable
Javacloudevents/sdk-javaStable
JavaScript/TypeScriptcloudevents/sdk-javascriptStable
Pythoncloudevents/sdk-pythonStable
C# (.NET)cloudevents/sdk-csharpStable
Rustcloudevents/sdk-rustActive Development
Rubycloudevents/sdk-rubyActive Development
PHPcloudevents/sdk-phpActive Development

9.2 Go SDK の使用例

package main

import (
    "context"
    "log"
    "time"
    cloudevents "github.com/cloudevents/sdk-go/v2"
)

type OrderCreated struct {
    OrderID     string  `json:"orderId"`
    CustomerID  string  `json:"customerId"`
    TotalAmount float64 `json:"totalAmount"`
    Currency    string  `json:"currency"`
}

func main() {
    c, err := cloudevents.NewClientHTTP()
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }

    event := cloudevents.NewEvent()
    event.SetID("go-evt-001")
    event.SetSource("https://example.com/go-service/orders")
    event.SetType("com.example.order.created")
    event.SetTime(time.Now())

    data := OrderCreated{
        OrderID: "ORD-12345", CustomerID: "CUST-678",
        TotalAmount: 15000.0, Currency: "JPY",
    }
    event.SetData(cloudevents.ApplicationJSON, data)

    ctx := cloudevents.ContextWithTarget(context.Background(),
        "http://localhost:8080/events")
    result := c.Send(ctx, event)
    if cloudevents.IsUndelivered(result) {
        log.Fatalf("Failed to send: %v", result)
    }
}

9.3 Python SDK の使用例

from uuid import uuid4
from cloudevents.http import CloudEvent, to_structured
import requests
import datetime

attributes = {
    "id": str(uuid4()),
    "source": "https://example.com/python-service/orders",
    "type": "com.example.order.created",
    "time": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    "datacontenttype": "application/json",
}

data = {
    "orderId": "ORD-12345",
    "customerId": "CUST-678",
    "totalAmount": 15000,
    "currency": "JPY",
}

event = CloudEvent(attributes, data)
headers, body = to_structured(event)
response = requests.post("http://localhost:8080/events", headers=headers, data=body)

9.4 JavaScript/TypeScript SDK の使用例

import { CloudEvent, httpTransport, emitterFor } from "cloudevents";

const event = new CloudEvent({
    id: "ts-evt-001",
    source: "https://example.com/ts-service/orders",
    type: "com.example.order.created",
    datacontenttype: "application/json",
    data: {
        orderId: "ORD-12345",
        customerId: "CUST-678",
        totalAmount: 15000,
        currency: "JPY",
    },
});

const emit = emitterFor(httpTransport("http://localhost:8080/events"));
await emit(event);

9.5 Java SDK の使用例 (Spring Framework)

import io.cloudevents.CloudEvent;
import io.cloudevents.spring.http.CloudEventHttpUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/events")
public class CloudEventController {

    @PostMapping
    public ResponseEntity<Void> receiveEvent(
            @RequestHeader HttpHeaders headers,
            @RequestBody byte[] body) {

        CloudEvent event = CloudEventHttpUtils.toReader(headers, body).toEvent();
        System.out.printf("Received: id=%s, type=%s, source=%s%n",
            event.getId(), event.getType(), event.getSource());
        return ResponseEntity.ok().build();
    }
}

第10章 アーキテクチャパターン

10.1 Point-to-Point (P2P) パターン

+-----------+     CloudEvent (HTTP)     +-----------+
| Producer  | ------------------------> | Consumer  |
| (Order    |  POST /events             | (Email    |
|  Service) |  ce-type: order.created   |  Service) |
+-----------+                           +-----------+

10.2 Publish/Subscribe パターン

+-----------+                    +----------------+                   +-----------+
| Producer  | -- CloudEvent --> | Event Broker   | -- CloudEvent --> | Consumer A|
+-----------+                   | (Kafka/Knative)|                   +-----------+
                                |                | -- CloudEvent --> | Consumer B|
+-----------+                   |                |                   +-----------+
| Producer  | -- CloudEvent --> +----------------+ -- CloudEvent --> | Consumer C|
+-----------+                                                       +-----------+

10.3 Event Sourcing パターン

すべての状態変更を CloudEvent として記録し、イベントストアに永続化する。

// Event 1: 注文作成
{
    "specversion": "1.0", "id": "es-001",
    "source": "https://example.com/orders/ORD-123",
    "type": "com.example.order.created",
    "sequence": "1", "sequencetype": "Integer",
    "data": {"orderId": "ORD-123", "total": 2000}
}

// Event 2: アイテム追加
{
    "specversion": "1.0", "id": "es-002",
    "source": "https://example.com/orders/ORD-123",
    "type": "com.example.order.item_added",
    "sequence": "2", "sequencetype": "Integer",
    "data": {"orderId": "ORD-123", "item": {"productId": "P-002", "qty": 1}, "newTotal": 5000}
}

10.4 Saga パターン

Order Service         Payment Service       Inventory Service      Shipping Service
     |                      |                      |                      |
     | order.created        |                      |                      |
     |--------------------->|                      |                      |
     |                      | payment.completed    |                      |
     |                      |--------------------->|                      |
     |                      |                      | inventory.reserved   |
     |                      |                      |--------------------->|
     |                      |                      |                      | shipping.scheduled
     |<--------------------------------------------------------------------|

10.5 Claim Check パターン

大きなイベントデータを外部ストレージに保存し、CloudEvent には参照のみを含める。

{
    "specversion": "1.0",
    "id": "claim-001",
    "source": "https://example.com/document-service",
    "type": "com.example.document.uploaded",
    "dataref": "https://storage.example.com/documents/doc-12345.pdf",
    "data": {
        "documentId": "DOC-12345",
        "fileName": "contract.pdf",
        "fileSize": 5242880,
        "mimeType": "application/pdf"
    }
}

10.6 Dead Letter Queue パターン

処理に失敗したイベントを DLQ に退避し、後でリトライまたは調査する。


第11章 セキュリティ

11.1 WebHook Abuse Protection

# Step 1: 送信者が OPTIONS リクエストで検証
OPTIONS /events HTTP/1.1
Host: consumer.example.com
WebHook-Request-Origin: https://producer.example.com
WebHook-Request-Callback: https://producer.example.com/webhook/confirm?token=abc123

# Step 2: 受信者が許可を返す
HTTP/1.1 200 OK
WebHook-Allowed-Origin: https://producer.example.com

11.2 トランスポートレベルのセキュリティ

  • すべての通信は TLS 1.2 以上で暗号化
  • サービスメッシュ内では mTLS を推奨
  • OAuth 2.0 トークンによる認証

11.3 データレベルのセキュリティ

  • PII はコンテキスト属性に含めない (subject にメールアドレス等を使用しない)
  • 機密データのペイロード暗号化を検討 (JWE 等)

11.4 セキュリティチェックリスト

カテゴリチェック項目
トランスポートTLS 1.2 以上を使用しているか
トランスポートWebhook エンドポイントは検証ハンドシェイクを実装しているか
認証イベント送信者の認証メカニズムがあるか
認可イベントタイプごとのアクセス制御ポリシーがあるか
データ保護PII がコンテキスト属性に含まれていないか
監査イベントの送受信ログを適切に記録しているか
可用性レート制限が設定されているか

第12章 実装例とチュートリアル

12.1 Knative Eventing を使った CloudEvents パイプライン

# Broker の作成
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  namespace: demo
spec:
  delivery:
    retry: 3
    backoffPolicy: exponential
    backoffDelay: "PT2S"
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dead-letter-service
---
# Trigger (フィルタリングルール)
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-created-trigger
  namespace: demo
spec:
  broker: default
  filter:
    attributes:
      type: com.example.order.created
      source: https://example.com/order-service
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-processor

12.2 Azure Event Grid での CloudEvents

# CloudEvents スキーマを使用する Event Grid トピックの作成
az eventgrid topic create \
  --name my-cloudevents-topic \
  --resource-group my-rg \
  --location japaneast \
  --input-schema cloudeventschemav1_0

# イベントの発行
curl -X POST \
  "https://my-cloudevents-topic.japaneast-1.eventgrid.azure.net/api/events" \
  -H "Content-Type: application/cloudevents+json" \
  -H "aeg-sas-key: {your-sas-key}" \
  -d '{
    "specversion": "1.0",
    "id": "azure-001",
    "source": "/azure/my-app",
    "type": "com.example.order.created",
    "datacontenttype": "application/json",
    "data": {"orderId": "ORD-12345", "amount": 5000}
  }'

12.3 Google Cloud Eventarc での CloudEvents

gcloud eventarc triggers create storage-trigger \
  --location=asia-northeast1 \
  --destination-run-service=my-service \
  --destination-run-region=asia-northeast1 \
  --event-filters="type=google.cloud.storage.object.v1.finalized" \
  --event-filters="bucket=my-bucket" \
  --service-account=my-sa@my-project.iam.gserviceaccount.com

第13章 運用と可観測性

13.1 メトリクスの収集

メトリクス説明アラート閾値例
cloudevents_produced_total発行されたイベント数急激な増減
cloudevents_consumed_total消費されたイベント数急激な増減
cloudevents_failed_total処理失敗したイベント数> 0
cloudevents_dlq_totalDLQ に送られたイベント数> 0
cloudevents_delivery_latency配信レイテンシ (ms)p99 > 1000ms
cloudevents_processing_duration処理時間 (ms)p99 > 5000ms
cloudevents_retry_countリトライ回数> 3

13.2 分散トレーシング

CloudEvents の Distributed Tracing Extension (traceparent/tracestate) を使用して、OpenTelemetry によるイベントフロー追跡を実現する。

13.3 ログの構造化

{
    "timestamp": "2025-01-15T09:30:00.123Z",
    "level": "INFO",
    "message": "Event processed successfully",
    "cloudevent.id": "evt-001",
    "cloudevent.type": "com.example.order.created",
    "cloudevent.source": "https://example.com/orders",
    "trace.id": "0af7651916cd43dd8448eb211c80319c",
    "processing.duration_ms": 45
}

第14章 既存システムとの統合

14.1 Adapter パターン (アンチコラプションレイヤー)

from cloudevents.http import CloudEvent
import datetime, uuid

def transform_legacy_to_cloudevent(legacy_event: dict) -> CloudEvent:
    event_type_mapping = {
        "new_order": "com.example.order.created",
        "update_order": "com.example.order.updated",
        "cancel_order": "com.example.order.cancelled",
    }
    ce_type = event_type_mapping.get(
        legacy_event.get("evt"),
        f"com.example.legacy.{legacy_event.get('evt', 'unknown')}"
    )
    timestamp = datetime.datetime.fromtimestamp(
        legacy_event.get("ts", 0), tz=datetime.timezone.utc
    ).isoformat()
    attributes = {
        "id": str(uuid.uuid4()),
        "source": "https://example.com/legacy-adapter",
        "type": ce_type,
        "time": timestamp,
        "datacontenttype": "application/json",
    }
    return CloudEvent(attributes, legacy_event.get("payload", {}))

14.2 Change Data Capture (CDC) + CloudEvents

+---------------+     +-----------+     +------------------+     +----------+
| PostgreSQL    | --> | Debezium  | --> | Kafka            | --> | Consumer |
| (Source DB)   |     | Connector |     | (CloudEvents)    |     |          |
+---------------+     +-----------+     +------------------+     +----------+

14.3 API Gateway / サービスメッシュとの統合

# Istio VirtualService での CloudEvents ルーティング
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: cloudevents-routing
spec:
  hosts:
    - events.internal.example.com
  http:
    - match:
        - headers:
            ce-type:
              prefix: "com.example.order."
      route:
        - destination:
            host: order-service
            port:
              number: 8080
    - match:
        - headers:
            ce-type:
              prefix: "com.example.payment."
      route:
        - destination:
            host: payment-service
            port:
              number: 8080

第15章 ベストプラクティスとアンチパターン

15.1 ベストプラクティス

BP-1: イベントタイプの命名は逆ドメイン名表記を使用する (com.example.order.created)

BP-2: source URI は一意で永続的な識別子とする

BP-3: イベントデータにはイベント発生時のスナップショットを含める (コンシューマーが追加 API 呼び出し不要)

BP-4: 冪等性を確保する (id + source で重複検知)

BP-5: イベントサイズは 64KB 以下に保つ (大きなデータは dataref を使用)

BP-6: スキーマバージョニングを実施する

BP-7: 後方互換性を維持する (フィールド追加は互換、削除/型変更は破壊的変更)

BP-8: Dead Letter Queue (DLQ) を必ず設定する

BP-9: time 属性を必ず設定する

15.2 アンチパターン

AP-1: Fat Event - イベントに過剰なデータを詰め込む

AP-2: Command Disguised as Event - イベントを命令形にする (send_confirmation_email ではなく order.created とし、コンシューマー側でアクションを決定)

AP-3: Chatty Events - フィールド単位でイベントを発行する (業務単位でまとめる)

AP-4: Missing Idempotency - 重複チェックなしの処理

AP-5: Temporal Coupling - イベントの処理順序に依存する設計

15.3 イベント設計チェックリスト

#チェック項目必須/推奨
1type は逆ドメイン名表記か推奨
2source は永続的で一意な URI か必須
3id は source スコープ内で一意か必須
4time 属性が設定されているか推奨
5イベントサイズは 64KB 以下か推奨
6PII がコンテキスト属性に含まれていないか必須
7冪等性を確保する仕組みがあるか必須
8DLQ が設定されているか推奨
9スキーマの後方互換性が維持されているか推奨
10分散トレーシング拡張が設定されているか推奨

第16章 まとめと今後の展望

16.1 CloudEvents の価値

  1. 標準化によるエコシステムの統一: 異なるクラウドプロバイダー、メッセージングプラットフォーム間でイベントを統一的に扱える。ベンダーロックインのリスク低減。

  2. 開発者生産性の向上: 公式 SDK、標準化されたプロトコルバインディング、明確な型システムにより開発効率化。

  3. 運用性の向上: 統一されたメタデータ構造により、監視・トレーシング・デバッグの共通ツールチェーンを構築可能。

16.2 仕様の現在の状態

仕様バージョン状態
Core Specification1.0.2Stable
JSON Event Format1.0.2Stable
HTTP Protocol Binding1.0.2Stable
Kafka Protocol Binding1.0.2Stable
AMQP Protocol Binding1.0.1Working Draft
MQTT Protocol Binding1.0.1Working Draft
Avro Event Format1.0Working Draft
Protobuf Event Format1.0Working Draft
Subscription API0.1Working Draft
Discovery API0.1Working Draft
CESQL0.1Working Draft

16.3 今後の展望

  1. Subscription API と Discovery API の安定化: イベントカタログの自動化が進む。
  2. CESQL の拡張: より複雑なフィルタリングや変換がイベントルーター層で実行可能になる。
  3. Schema Registry との統合: 標準的な統合パターンが確立される。
  4. AsyncAPI との連携: イベント駆動型 API の設計・文書化・コード生成のワークフロー統一。
  5. AI/ML パイプラインとの統合: 機械学習パイプラインのイベント標準化。
  6. エッジコンピューティング対応: IoT エッジデバイスでの CloudEvents 処理の軽量化。

16.4 参考資料

リソースURL
CloudEvents 公式仕様https://github.com/cloudevents/spec
CloudEvents SDK 一覧https://github.com/cloudevents
CNCF CloudEvents ページhttps://cloudevents.io
CloudEvents Primerhttps://github.com/cloudevents/spec/blob/main/cloudevents/primer.md
Knative Eventinghttps://knative.dev/docs/eventing/
AsyncAPIhttps://www.asyncapi.com/

本書は2026年4月時点の情報に基づいて執筆されている。CloudEvents 仕様は継続的に更新されているため、最新の仕様は公式リポジトリを参照されたい。