Temporal
Temporal 徹底解説:耐障害性を備えた分散アプリケーション基盤
1. はじめに
1.1 Temporalとは何か
Temporalは、耐障害性を備えた分散アプリケーションを構築するためのオープンソースプラットフォームである。従来、分散システムにおける障害処理やステート管理は開発者にとって大きな負担であったが、Temporalはこれらの複雑さを抽象化し、開発者がビジネスロジックに集中できる環境を提供する。
Temporalの核心的な価値提案は「Durable Execution(永続的実行)」である。アプリケーションがクラッシュ、ネットワーク障害、インフラ障害に遭遇した場合でも、Temporalは実行を中断した正確な箇所から再開することを保証する。これにより、注文処理、顧客オンボーディング、決済処理などのミッションクリティカルなプロセスを、障害に対して堅牢な形で構築できる。
1.2 Temporalの歴史と背景
TemporalはUberで開発された「Cadence」ワークフローエンジンの後継プロジェクトとして2019年に誕生した。Cadenceの開発者であるMaxim FateevとSamar Abbasが設立したTemporal Technologies社が開発を主導している。Cadenceで培われた技術と知見を基に、より洗練されたAPIとアーキテクチャを持つプラットフォームとして再設計された。
1.3 従来のアプローチとの比較
従来の分散システム開発では、以下のような課題に対処する必要があった:
| 課題 | 従来のアプローチ | Temporalのアプローチ |
|---|---|---|
| 障害復旧 | try-catch、リトライロジック、冪等性の実装 | 自動的なリトライとステート復元 |
| ステート管理 | データベース、キャッシュ、メッセージキュー | Event Historyによる自動管理 |
| 長時間実行プロセス | Cronジョブ、ステートマシン、キューシステム | Workflowとして直接コード化 |
| 分散トランザクション | Sagaパターンの手動実装 | Workflowによる自然な実装 |
| タイムアウト管理 | タイマーサービス、スケジューラー | 組み込みのタイムアウト機構 |
1.4 ユースケース
Temporalは以下のような幅広いユースケースに適用される:
- マイクロサービスオーケストレーション:複数のサービス間の呼び出しを確実に調整
- 決済・金融処理:トランザクションの一貫性と障害復旧の保証
- データパイプライン:大規模なデータ処理ワークフローの管理
- インフラプロビジョニング:クラウドリソースの自動構築と管理
- ユーザーオンボーディング:マルチステップの登録・設定プロセス
- バッチ処理:大量のタスクの並列実行と進捗管理
- 長期実行ビジネスプロセス:サブスクリプション管理、保険請求処理など
2. Temporalのアーキテクチャ
2.1 プラットフォーム全体構成
Temporalプラットフォームは、大きく分けて以下の2つのコンポーネントで構成される:
- Temporal Service:ワークフローの実行を監督・調整するサーバーサイドコンポーネント
- Worker Process:開発者がホストし、実際のビジネスロジックを実行するクライアントサイドコンポーネント
┌─────────────────────────────────────────────────────────────┐
│ Temporal Platform │
│ │
│ ┌──────────────────────┐ ┌─────────────────────────┐ │
│ │ Temporal Service │ │ Worker Processes │ │
│ │ │ │ │ │
│ │ ┌────────────────┐ │ │ ┌──────────────────┐ │ │
│ │ │ Frontend │ │◄──►│ │ Workflow Worker │ │ │
│ │ │ Service │ │ │ │ │ │ │
│ │ ├────────────────┤ │ │ ├──────────────────┤ │ │
│ │ │ History │ │ │ │ Activity Worker │ │ │
│ │ │ Service │ │ │ │ │ │ │
│ │ ├────────────────┤ │ │ └──────────────────┘ │ │
│ │ │ Matching │ │ │ │ │
│ │ │ Service │ │ │ ┌──────────────────┐ │ │
│ │ ├────────────────┤ │ │ │ Temporal SDK │ │ │
│ │ │ Worker │ │ │ │ (Go/Java/TS/ │ │ │
│ │ │ Service │ │ │ │ Python/PHP/.NET) │ │ │
│ │ └────────────────┘ │ │ └──────────────────┘ │ │
│ │ │ │ │ │
│ │ ┌────────────────┐ │ └─────────────────────────┘ │
│ │ │ Persistence │ │ │
│ │ │ (DB) │ │ │
│ │ ├────────────────┤ │ │
│ │ │ Visibility │ │ │
│ │ │ Store │ │ │
│ │ └────────────────┘ │ │
│ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
2.2 Temporal Serviceの内部構造
Temporal Serviceは、Go言語で実装されたサーバーとデータベースで構成される。内部は以下の4つのサービスに分かれている:
2.2.1 Frontend Service
Frontend Serviceは、Temporal Serviceへのすべてのリクエストのエントリーポイントである。gRPCベースのAPIを提供し、以下の機能を担う:
- クライアントからのリクエストの受付と認証
- リクエストのルーティング
- レート制限の適用
- 認可チェック
2.2.2 History Service
History Serviceは、Temporalの中核を担うコンポーネントである:
- ワークフロー実行のEvent Historyの管理
- ワークフローの状態遷移の制御
- タイマーとリマインダーの管理
- ワークフロー実行のライフサイクル管理
各ワークフロー実行はHistory Serviceの特定のシャードに割り当てられ、イベントの一貫性が保証される。
2.2.3 Matching Service
Matching Serviceは、Task Queueの管理を担当する:
- Workflow TaskとActivity Taskのキュー管理
- タスクとWorkerのマッチング
- Worker間のタスク分配
- 同期的マッチングと非同期的マッチングの両方をサポート
2.2.4 Worker Service(内部)
Temporal Service内部のWorker Serviceは、バックグラウンドタスクを実行する:
- アーカイブ処理
- レプリケーション
- クロスクラスター通信
注意: この内部Worker Serviceは、アプリケーション開発者が運用するWorker Processとは異なるものである。
2.3 Persistence Layer
Temporal Serviceのデータ永続化は、以下のデータベースをサポートしている:
- Apache Cassandra:大規模な本番環境向け
- MySQL (8.0.17以上):中小規模のデプロイメント向け
- PostgreSQL (12以上):中小規模のデプロイメント向け
- SQLite:開発・テスト環境向け
永続化されるデータには以下が含まれる:
- ワークフロー実行のEvent History
- ワークフローの現在の状態(ミュータブルステート)
- Task Queueの状態
- Namespace設定
- Visibility情報
2.4 Visibility Store
Visibility Storeは、ワークフロー実行の検索・フィルタリング機能を提供するサブシステムである。以下のデータストアをサポートする:
- MySQL (8.0.17以上)
- PostgreSQL (12以上)
- Elasticsearch (v7以上 / v8以上)
- OpenSearch (v2以上)
Visibility Storeにより、以下のような操作が可能となる:
- ワークフロー実行のリスト表示・フィルタリング・ソート
- カスタム検索属性を使用した高度なクエリ
- ワークフローの実行状態の監視
-- Visibility クエリの例(List Filter構文)
WorkflowType = 'OrderProcessing'
AND ExecutionStatus = 'Running'
AND StartTime > '2024-01-01T00:00:00Z'
ORDER BY StartTime DESC
2.5 Event Sourcing モデル
Temporalのアーキテクチャの根幹をなすのがEvent Sourcingモデルである。ワークフローの実行中に発生するすべてのアクションがイベントとして記録され、不変なイベントログ(Event History)を形成する。
イベントの記録フロー
- Workflowがアクション(例:Activityの実行)を要求
- Workerが
ScheduleActivityTaskCommandを生成 - Temporal Serviceがコマンドを受信し、対応するEventをEvent Historyに追記
- タスクがTask Queueに配置される(Scheduled Event)
- Workerがタスクをデキューする(Started Event)
- Workerがタスクを完了する(Completed Event)
Event History の例:
───────────────────────────────────────────────
Event 1: WorkflowExecutionStarted
Event 2: WorkflowTaskScheduled
Event 3: WorkflowTaskStarted
Event 4: WorkflowTaskCompleted
Event 5: ActivityTaskScheduled (SendEmail)
Event 6: ActivityTaskStarted
Event 7: ActivityTaskCompleted
Event 8: WorkflowTaskScheduled
Event 9: WorkflowTaskStarted
Event 10: WorkflowTaskCompleted
Event 11: WorkflowExecutionCompleted
───────────────────────────────────────────────
ワークフローリプレイ
障害からの復旧時、Temporal SDKはEvent Historyを使ってワークフローコードをリプレイする。このリプレイにより、ワークフローは障害前の正確な状態を再構築し、中断した箇所から実行を再開する。
リプレイ中、SDKはイベント履歴と照合しながらワークフローコードを再実行する。もしリプレイ中にEvent Historyと一致しないイベントが検出された場合、非決定性エラー(Non-Determinism Error)がスローされる。
3. コアコンセプト
3.1 Workflow(ワークフロー)
3.1.1 Workflow Definitionとは
Workflow Definition(ワークフロー定義)は、ワークフローのロジックと手順を指定するコードである。通常の関数やメソッドとして記述され、Temporalの各SDKに対応するプログラミング言語で実装される。
// TypeScript での Workflow Definition
import { proxyActivities, sleep } from '@temporalio/workflow';
import type * as activities from './activities';
const { sendEmail, processPayment, updateInventory } = proxyActivities<typeof activities>({
startToCloseTimeout: '30 seconds',
retry: {
maximumAttempts: 3,
},
});
export async function orderProcessingWorkflow(
orderId: string,
customerId: string,
items: OrderItem[]
): Promise<OrderResult> {
// 1. 在庫の確認と更新
await updateInventory(orderId, items);
// 2. 決済処理
const paymentResult = await processPayment(orderId, customerId);
// 3. 確認メールの送信
await sendEmail(customerId, `注文 ${orderId} が完了しました`);
return {
orderId,
status: 'completed',
paymentId: paymentResult.transactionId,
};
}
// Go での Workflow Definition
package workflows
import (
"time"
"go.temporal.io/sdk/workflow"
)
func OrderProcessingWorkflow(ctx workflow.Context, order Order) (OrderResult, error) {
options := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, options)
// 1. 在庫の確認と更新
err := workflow.ExecuteActivity(ctx, UpdateInventory, order.ID, order.Items).Get(ctx, nil)
if err != nil {
return OrderResult{}, err
}
// 2. 決済処理
var paymentResult PaymentResult
err = workflow.ExecuteActivity(ctx, ProcessPayment, order.ID, order.CustomerID).Get(ctx, &paymentResult)
if err != nil {
return OrderResult{}, err
}
// 3. 確認メールの送信
err = workflow.ExecuteActivity(ctx, SendEmail, order.CustomerID, order.ID).Get(ctx, nil)
if err != nil {
return OrderResult{}, err
}
return OrderResult{
OrderID: order.ID,
Status: "completed",
PaymentID: paymentResult.TransactionID,
}, nil
}
3.1.2 Workflow Type
Workflow Typeは、ワークフロー定義を識別するための名前である。同じWorkflow Typeのワークフローは同じロジックを共有するが、異なる入力パラメータで複数回実行できる。
3.1.3 Workflow Execution
Workflow Executionは、ワークフロー定義と実行リクエストのペアから生成される実際の実行インスタンスである。各実行は以下の識別子を持つ:
- Workflow ID:ビジネスロジックに基づく一意の識別子(例:
order-12345) - Run ID:実行インスタンスごとのUUID
3.1.4 決定性の制約
Temporalワークフローにおいて最も重要な制約が「決定性(Determinism)」である。ワークフローコードはリプレイ時に同一の実行パスを再現する必要があるため、以下の操作をワークフローコード内で直接行うことは禁止されている:
禁止される操作:
- 乱数生成(
Math.random(),rand.Float64()等) - 現在時刻の取得(
Date.now(),time.Now()等) - 外部API呼び出し
- ファイルI/O
- データベースアクセス
- UUIDの生成
- スレッド/ゴルーチンの直接生成
代替手段:
// ❌ 非決定的:直接の乱数生成
const id = Math.random().toString(36);
// ✅ 決定的:Activityを通じて実行
const id = await generateId();
// ❌ 非決定的:直接の時刻取得
const now = new Date();
// ✅ 決定的:ワークフロー専用APIの使用(Replayで再現可能)
// TypeScriptでは workflow モジュールには直接の時刻取得用のAPIはないが、
// Activityを通じて取得するか、workflow.now() に相当する機能を使用
// ❌ 非決定的
currentTime := time.Now()
// ✅ 決定的
currentTime := workflow.Now(ctx)
// ❌ 非決定的
id := uuid.New()
// ✅ 決定的
id := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.New()
})
3.2 Activity(アクティビティ)
3.2.1 Activity Definitionとは
Activityは、単一の明確に定義されたアクション(短時間のものから長時間実行のものまで)を実行する関数またはメソッドである。ワークフローとは異なり、Activityコードは非決定的であってよい。
Activityは以下のような処理に適している:
- 外部APIの呼び出し
- データベースの読み書き
- ファイル操作
- メール送信
- サードパーティサービスとの連携
// TypeScript での Activity Definition
import { Context } from '@temporalio/activity';
export async function sendEmail(
recipient: string,
message: string
): Promise<void> {
// 外部メールサービスの呼び出し(非決定的操作OK)
const response = await fetch('https://api.email-service.com/send', {
method: 'POST',
body: JSON.stringify({ to: recipient, body: message }),
});
if (!response.ok) {
throw new Error(`メール送信に失敗: ${response.status}`);
}
}
export async function processPayment(
orderId: string,
customerId: string
): Promise<PaymentResult> {
// 決済ゲートウェイの呼び出し
const result = await paymentGateway.charge(customerId, orderId);
return {
transactionId: result.id,
status: result.status,
};
}
export async function updateInventory(
orderId: string,
items: OrderItem[]
): Promise<void> {
// データベース更新
for (const item of items) {
await db.inventory.decrement(item.productId, item.quantity);
}
}
// Go での Activity Definition
package activities
import (
"context"
"fmt"
)
type Activities struct {
EmailClient EmailClient
PaymentClient PaymentClient
DB *sql.DB
}
func (a *Activities) SendEmail(ctx context.Context, recipient, message string) error {
return a.EmailClient.Send(ctx, recipient, message)
}
func (a *Activities) ProcessPayment(ctx context.Context, orderId, customerId string) (PaymentResult, error) {
result, err := a.PaymentClient.Charge(ctx, customerId, orderId)
if err != nil {
return PaymentResult{}, fmt.Errorf("決済処理に失敗: %w", err)
}
return PaymentResult{
TransactionID: result.ID,
Status: result.Status,
}, nil
}
func (a *Activities) UpdateInventory(ctx context.Context, orderId string, items []OrderItem) error {
for _, item := range items {
if err := a.DB.DecrementInventory(ctx, item.ProductID, item.Quantity); err != nil {
return fmt.Errorf("在庫更新に失敗: %w", err)
}
}
return nil
}
3.2.2 Activity Execution
Activity Executionは、Activity Definitionに基づいて実際に実行されるインスタンスである。実行中にWorkerがクラッシュした場合、Activity Executionは初期状態から再実行される(Heartbeat Detailsによるチェックポイントが設定されていない場合)。
3.2.3 スタンドアロンActivity
単一のActivity呼び出しのみが必要なケースでは、ワークフローを経由せず、SDKクライアントを使って直接Activityを呼び出す「Standalone Activity」として実行することも可能である。
3.3 Worker(ワーカー)
3.3.1 Workerの概念
Temporalにおける「Worker」は、3つの関連する概念を指す:
- Worker Program:Temporal SDK APIを使用して開発された静的コード
- Worker Entity:特定のTask Queueをリッスンする個々のWorker
- Worker Process:Task Queueをポーリングし、タスクをデキューし、コードを実行し、結果を返すランタイムインスタンス
// TypeScript での Worker 設定
import { Worker } from '@temporalio/worker';
import * as activities from './activities';
async function run() {
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'order-processing',
// Worker の設定
maxConcurrentActivityTaskExecutions: 100,
maxConcurrentWorkflowTaskExecutions: 50,
});
await worker.run();
}
run().catch((err) => {
console.error('Worker起動エラー:', err);
process.exit(1);
});
// Go での Worker 設定
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func main() {
// Temporal Client の作成
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
})
if err != nil {
log.Fatalln("Temporal Clientの作成に失敗:", err)
}
defer c.Close()
// Worker の作成
w := worker.New(c, "order-processing", worker.Options{
MaxConcurrentActivityExecutionSize: 100,
MaxConcurrentWorkflowTaskExecutionSize: 50,
})
// Workflow と Activity の登録
w.RegisterWorkflow(OrderProcessingWorkflow)
activities := &Activities{
EmailClient: emailClient,
PaymentClient: paymentClient,
DB: db,
}
w.RegisterActivity(activities)
// Worker の起動
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Worker起動エラー:", err)
}
}
3.3.2 Worker のステートレス性
Workerはステートレスに設計されており、1つのWorkerで数百万のオープンなWorkflow Executionを管理できる。ブロックされたワークフローは安全にメモリから削除され、後で同じまたは異なるWorkerで復元される。これにより、スケーラビリティとリソース効率の両方が実現される。
3.3.3 Worker Identity
Worker Identityはデフォルトで${process.pid}@${os.hostname()}に設定される。コンテナ環境(Docker, ECS, Kubernetes)ではプロセスIDが通常1であり、ホスト名がランダムに生成されるため、環境固有の識別子を設定することが推奨される。
const worker = await Worker.create({
// ...
identity: `worker-${process.env.POD_NAME || process.pid}@${process.env.NODE_NAME || os.hostname()}`,
});
3.4 Task Queue(タスクキュー)
3.4.1 Task Queueの役割
Task Queueは、Temporal ServiceとWorker間のタスク配信メカニズムである。Workerは指定されたTask Queueをポーリングし、処理すべきタスクを受け取る。
Task Queueには2種類のタスクが存在する:
- Workflow Task:ワークフローロジックの実行を指示するタスク
- Activity Task:アクティビティの実行を指示するタスク
3.4.2 Task Queue の設計パターン
// 機能別のTask Queue分離
// 注文処理用Worker
const orderWorker = await Worker.create({
taskQueue: 'order-processing',
activities: orderActivities,
workflowsPath: require.resolve('./workflows/order'),
});
// 通知処理用Worker
const notificationWorker = await Worker.create({
taskQueue: 'notification',
activities: notificationActivities,
workflowsPath: require.resolve('./workflows/notification'),
});
// GPUを必要とするML処理用Worker
const mlWorker = await Worker.create({
taskQueue: 'ml-inference',
activities: mlActivities,
});
Task Queueを分離することで、以下の利点が得られる:
- リソース分離:GPU必須のタスクを専用Workerにルーティング
- スケーリング独立性:各機能ごとに独立してスケーリング可能
- 優先度管理:重要度に応じた処理順序の制御
- デプロイメント分離:Workerごとに独立したデプロイが可能
4. メッセージパッシングと通信
4.1 Signal(シグナル)
Signalは、実行中のワークフローに対して非同期的にデータを送信するメカニズムである。Signal送信者はレスポンスを待たず、「fire-and-forget」のセマンティクスで動作する。
Signalの特性
- 非同期的な書き込み操作
- クライアントはレスポンスを待つ必要がない
- 高スループットの並列メッセージングに適している
- Worker の可用性に依存しない(Signalはバッファリングされる)
// TypeScript での Signal の定義と使用
import { defineSignal, setHandler, condition } from '@temporalio/workflow';
// Signal の定義
export const approvalSignal = defineSignal<[boolean]>('approval');
export const cancelSignal = defineSignal('cancel');
export async function purchaseOrderWorkflow(order: Order): Promise<OrderResult> {
let isApproved = false;
let isCancelled = false;
// Signal ハンドラの設定
setHandler(approvalSignal, (approved: boolean) => {
isApproved = approved;
});
setHandler(cancelSignal, () => {
isCancelled = true;
});
// 承認待ち(タイムアウト付き)
const approved = await condition(
() => isApproved || isCancelled,
'24 hours' // 24時間のタイムアウト
);
if (isCancelled || !approved) {
return { status: 'cancelled', orderId: order.id };
}
if (!isApproved) {
return { status: 'timeout', orderId: order.id };
}
// 承認後の処理を続行
await processOrder(order);
return { status: 'completed', orderId: order.id };
}
// Signal の送信(クライアント側)
import { Client } from '@temporalio/client';
const client = new Client();
const handle = client.workflow.getHandle('purchase-order-123');
// 承認Signalの送信
await handle.signal(approvalSignal, true);
// キャンセルSignalの送信
await handle.signal(cancelSignal);
// Go での Signal の使用
func PurchaseOrderWorkflow(ctx workflow.Context, order Order) (OrderResult, error) {
isApproved := false
isCancelled := false
// Signal チャネルの作成
approvalCh := workflow.GetSignalChannel(ctx, "approval")
cancelCh := workflow.GetSignalChannel(ctx, "cancel")
// セレクタを使ったSignal受信
selector := workflow.NewSelector(ctx)
selector.AddReceive(approvalCh, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &isApproved)
})
selector.AddReceive(cancelCh, func(c workflow.ReceiveChannel, more bool) {
isCancelled = true
c.Receive(ctx, nil)
})
// タイムアウト付きで待機
timerCtx, cancelTimer := workflow.WithCancel(ctx)
selector.AddReceive(workflow.NewTimer(timerCtx, 24*time.Hour), func(f workflow.ReceiveChannel, more bool) {})
selector.Select(ctx)
cancelTimer()
if isCancelled {
return OrderResult{Status: "cancelled", OrderID: order.ID}, nil
}
if !isApproved {
return OrderResult{Status: "timeout", OrderID: order.ID}, nil
}
// 承認後の処理
err := workflow.ExecuteActivity(ctx, ProcessOrder, order).Get(ctx, nil)
if err != nil {
return OrderResult{}, err
}
return OrderResult{Status: "completed", OrderID: order.ID}, nil
}
4.2 Query(クエリ)
Queryは、ワークフローの現在の状態を読み取るための同期的な読み取り専用操作である。Event Historyにエントリを追加しないため、効率的である。
// TypeScript での Query の定義
import { defineQuery, setHandler } from '@temporalio/workflow';
export const getOrderStatusQuery = defineQuery<OrderStatus>('getOrderStatus');
export const getProgressQuery = defineQuery<number>('getProgress');
export async function orderWorkflow(order: Order): Promise<OrderResult> {
let status: OrderStatus = 'pending';
let progress = 0;
// Query ハンドラの設定
setHandler(getOrderStatusQuery, () => status);
setHandler(getProgressQuery, () => progress);
status = 'processing';
progress = 25;
await validateOrder(order);
progress = 50;
await processPayment(order);
progress = 75;
await shipOrder(order);
status = 'completed';
progress = 100;
return { status, orderId: order.id };
}
// Query の実行(クライアント側)
const handle = client.workflow.getHandle('order-12345');
const status = await handle.query(getOrderStatusQuery);
const progress = await handle.query(getProgressQuery);
console.log(`注文状態: ${status}, 進捗: ${progress}%`);
4.3 Update(アップデート)
Updateは、Temporal 1.21以降で導入された同期的な書き込み操作である。Signalとは異なり、送信者は処理の完了を待ち、結果やエラーを受け取ることができる。
Signal vs Update の比較
| 特性 | Signal | Update |
|---|---|---|
| 通信パターン | 非同期(Fire & Forget) | 同期(結果を待つ) |
| レスポンス | なし | あり |
| バリデーション | なし | あり(受け入れ前にバリデーション可能) |
| Event History | イベント追加あり | イベント追加あり |
| スループット | 高 | 中 |
| 使用場面 | 通知、トリガー | 状態変更と確認が必要な操作 |
// TypeScript での Update の定義
import { defineUpdate, setHandler } from '@temporalio/workflow';
export const addItemUpdate = defineUpdate<CartResult, [CartItem]>('addItem');
export async function shoppingCartWorkflow(): Promise<Cart> {
const cart: Cart = { items: [], total: 0 };
setHandler(addItemUpdate, async (item: CartItem) => {
// バリデーション
if (item.quantity <= 0) {
throw new Error('数量は1以上である必要があります');
}
// カートへの追加
cart.items.push(item);
cart.total += item.price * item.quantity;
return {
itemCount: cart.items.length,
total: cart.total,
};
});
// カートのタイムアウトまで待機
await condition(() => false, '30 minutes');
return cart;
}
4.4 メッセージタイプの選択指針
┌─────────────────┐
│ 操作の種類は? │
└─────────┬───────┘
┌─────────┴───────┐
┌─────┤ ├─────┐
│ │ │ │
読み取り 読み書き 書き込み
│ │ │
┌────┴──┐ │ ┌─────┴─────┐
│ Query │ │ 結果が │ │ 結果が
└───────┘ │ 必要? │ │ 不要?
│ │ │
┌────┴──┐ ┌───┴───┐ ┌───┴────┐
│Update │ │Update │ │ Signal │
└───────┘ └───────┘ └────────┘
5. Namespace(名前空間)
5.1 Namespaceの概念
Namespaceは、Temporalプラットフォームにおける基本的な分離単位である。ワークフロー実行とTask Queueはすべて特定のNamespace内に存在する。
Namespaceの主要機能
- Workflow IDの一意性保証:同一Namespace内でWorkflow IDの一意性が保証される(異なるNamespace間では同じIDが使用可能)
- リソース分離:あるNamespaceの高負荷が他のNamespaceに影響しない
- 設定の独立性:リテンション期間やアーカイブ先をNamespaceごとに設定可能
- セキュリティ境界:アクセス制御をNamespace単位で管理
5.2 Namespace の設定
# Namespace の作成(Temporal CLI)
temporal operator namespace create \
--namespace order-service \
--retention 30d \
--description "注文処理サービス用Namespace"
# Namespace の情報表示
temporal operator namespace describe --namespace order-service
# Namespace の更新
temporal operator namespace update \
--namespace order-service \
--retention 60d
5.3 マルチテナント設計
単一のNamespaceは依然としてマルチテナントである。複数のアプリケーションやチームがNamespaceを共有できるが、Workflow IDとTask Queueの命名規則を調整する必要がある。
推奨される命名パターン:
# Workflow ID の命名規則
{service-name}/{entity-type}/{entity-id}
例: order-service/order/12345
# Task Queue の命名規則
{service-name}-{function}
例: order-service-processing
5.4 Temporal Cloud での追加機能
Temporal Cloudでは、Namespaceに以下の追加機能が提供される:
- mTLS認証:相互TLS認証によるセキュアな通信
- ロールベースアクセス制御(RBAC):細粒度の権限管理
- 高可用性レプリケーション:マルチリージョンレプリケーション
- Namespace タグ:メタデータによる管理
- コードの変更なし:セルフホストからCloudへの移行時にコード変更不要
6. リトライポリシーとタイムアウト
6.1 Retry Policy(リトライポリシー)
Retry Policyは、失敗時にTemporalがどのように・いつ再試行するかを定義する設定の集合である。Activityはデフォルトでリトライポリシーが有効であるが、Workflowはデフォルトではリトライしない。
6.1.1 デフォルトのリトライ設定(Activity)
| パラメータ | デフォルト値 | 説明 |
|---|---|---|
| Initial Interval | 1秒 | 最初のリトライまでの待機時間 |
| Backoff Coefficient | 2.0 | リトライ間隔の増加率 |
| Maximum Interval | 100秒 | リトライ間隔の上限 |
| Maximum Attempts | 無制限 | 最大リトライ回数 |
| Non-Retryable Errors | なし | リトライしないエラータイプ |
6.1.2 リトライポリシーの設定例
// TypeScript でのリトライポリシー設定
import { proxyActivities, ApplicationFailure } from '@temporalio/workflow';
// 厳格なリトライポリシー(決済処理向け)
const paymentActivities = proxyActivities({
startToCloseTimeout: '30 seconds',
retry: {
initialInterval: '1 second',
backoffCoefficient: 2,
maximumInterval: '30 seconds',
maximumAttempts: 5,
nonRetryableErrorTypes: [
'InvalidCardError',
'InsufficientFundsError',
],
},
});
// 寛容なリトライポリシー(通知処理向け)
const notificationActivities = proxyActivities({
startToCloseTimeout: '10 seconds',
retry: {
initialInterval: '2 seconds',
backoffCoefficient: 3,
maximumInterval: '120 seconds',
maximumAttempts: 10,
},
});
// Go でのリトライポリシー設定
activityOptions := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 30 * time.Second,
MaximumAttempts: 5,
NonRetryableErrorTypes: []string{
"InvalidCardError",
"InsufficientFundsError",
},
},
}
6.1.3 Exponential Backoff の動作
リトライ間隔は指数関数的に増加する:
リトライ 1: 1秒後 (Initial Interval)
リトライ 2: 2秒後 (1 × 2^1)
リトライ 3: 4秒後 (1 × 2^2)
リトライ 4: 8秒後 (1 × 2^3)
リトライ 5: 16秒後 (1 × 2^4)
リトライ 6: 32秒後 → 30秒後 (Maximum Intervalで制限)
6.1.4 Non-Retryable Errors
バリデーションエラーや入力データの不正など、リトライしても解決しない永続的な障害に対しては、Non-Retryable Errorを指定して即座に失敗をサーフェスする:
// TypeScript での Non-Retryable Error
import { ApplicationFailure } from '@temporalio/common';
export async function processPayment(orderId: string): Promise<PaymentResult> {
const card = await getCardInfo(orderId);
if (!card) {
// リトライ不要なエラーとしてスロー
throw ApplicationFailure.nonRetryable(
'カード情報が見つかりません',
'InvalidCardError',
{ orderId }
);
}
// 処理続行...
}
6.2 タイムアウト
Temporalには4種類のタイムアウトが定義されている:
6.2.1 タイムアウトの種類
Timeline:
─────────────────────────────────────────────────────────────►
│ │
│ Schedule-to-Close Timeout │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Schedule-to-Start │ Start-to-Close │ │
│ │ Timeout │ Timeout │ │
│ │ ┌──────────┐ │ ┌──────────────────┐ │ │
│ │ │キュー待ち │ │ │実行中 │ │ │
│ │ └──────────┘ │ └──────────────────┘ │ │
│ │ Scheduled Started │ Completed │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Heartbeat Timeout │
│ (実行中のActivity内で定期的に報告が必要) │
| タイムアウト | 説明 | 推奨使用場面 |
|---|---|---|
| Schedule-to-Close | タスクのスケジュールから完了までの総時間 | Activity全体の最大実行時間を制限 |
| Start-to-Close | タスクの開始から完了までの時間 | 個々の実行試行の時間制限 |
| Schedule-to-Start | スケジュールからWorkerがタスクを取得するまでの時間 | Worker枯渇の検出 |
| Heartbeat | Activity実行中の生存報告間隔 | 長時間実行Activityの監視 |
// タイムアウトの包括的な設定例
const activities = proxyActivities({
// 各試行の最大実行時間
startToCloseTimeout: '30 seconds',
// 全リトライを含む最大時間
scheduleToCloseTimeout: '5 minutes',
// Workerがタスクを取得するまでの最大待機時間
scheduleToStartTimeout: '10 seconds',
// 長時間Activityのハートビート間隔
heartbeatTimeout: '10 seconds',
});
6.3 Heartbeat(ハートビート)
長時間実行されるActivityでは、Heartbeatを使用して進捗を定期的に報告する。Workerがクラッシュした場合、ハートビートに付加されたチェックポイント情報を使って、中断箇所から再開できる。
// TypeScript での Heartbeat の使用
import { heartbeat, Context } from '@temporalio/activity';
export async function processLargeFile(filePath: string): Promise<void> {
const lines = await readLines(filePath);
const lastCheckpoint = Context.current().info.heartbeatDetails;
// チェックポイントから再開
const startIndex = lastCheckpoint ? lastCheckpoint as number : 0;
for (let i = startIndex; i < lines.length; i++) {
await processLine(lines[i]);
// 定期的にハートビートを送信(チェックポイント情報付き)
heartbeat(i + 1);
}
}
// Go での Heartbeat の使用
func ProcessLargeFile(ctx context.Context, filePath string) error {
lines, err := readLines(filePath)
if err != nil {
return err
}
// 前回のチェックポイントを取得
startIndex := 0
if activity.HasHeartbeatDetails(ctx) {
var checkpoint int
if err := activity.GetHeartbeatDetails(ctx, &checkpoint); err == nil {
startIndex = checkpoint
}
}
for i := startIndex; i < len(lines); i++ {
if err := processLine(lines[i]); err != nil {
return err
}
// ハートビート送信
activity.RecordHeartbeat(ctx, i+1)
}
return nil
}
7. 高度なワークフローパターン
7.1 Child Workflow(子ワークフロー)
Child Workflowは、親ワークフローから起動される別のワークフロー実行である。複雑なビジネスロジックを分割し、モジュール化するために使用される。
// TypeScript での Child Workflow
import { executeChild, ParentClosePolicy } from '@temporalio/workflow';
export async function orderFulfillmentWorkflow(order: Order): Promise<void> {
// 決済処理を子ワークフローとして実行
const paymentResult = await executeChild(paymentWorkflow, {
args: [order.paymentInfo],
workflowId: `payment-${order.id}`,
parentClosePolicy: ParentClosePolicy.TERMINATE,
});
// 配送処理を子ワークフローとして実行
await executeChild(shippingWorkflow, {
args: [order.shippingInfo, paymentResult.transactionId],
workflowId: `shipping-${order.id}`,
parentClosePolicy: ParentClosePolicy.TERMINATE,
});
// 複数の通知を並列で実行
await Promise.all([
executeChild(emailNotificationWorkflow, {
args: [order.customerEmail, order.id],
workflowId: `email-${order.id}`,
}),
executeChild(smsNotificationWorkflow, {
args: [order.customerPhone, order.id],
workflowId: `sms-${order.id}`,
}),
]);
}
Parent Close Policy
親ワークフローが終了した際の子ワークフローの動作を制御する:
| ポリシー | 動作 |
|---|---|
| TERMINATE | 親の終了時に子を強制終了 |
| ABANDON | 親の終了後も子は独立して実行を継続 |
| REQUEST_CANCEL | 親の終了時に子にキャンセルリクエストを送信 |
7.2 Continue-As-New
Continue-As-Newは、Event Historyの肥大化を防ぐためにワークフローを新しい実行として再起動するパターンである。長時間実行されるワークフロー(例:数ヶ月〜数年にわたるサブスクリプション管理)で特に重要となる。
// TypeScript での Continue-As-New
import { continueAsNew, sleep } from '@temporalio/workflow';
export async function subscriptionWorkflow(
customerId: string,
billingCycle: number = 0
): Promise<void> {
// 月次の請求処理
await chargeMontlyFee(customerId);
await sendBillingNotification(customerId, billingCycle);
// 1ヶ月間待機
await sleep('30 days');
// 新しい実行として再起動(Event Historyをリセット)
await continueAsNew<typeof subscriptionWorkflow>(
customerId,
billingCycle + 1
);
}
7.3 Timer と Sleep
Temporalのタイマーは通常のsleepとは異なり、永続的である。Workerがクラッシュしても、指定された時間後に正確に再開される。
// TypeScript での永続的タイマー
import { sleep } from '@temporalio/workflow';
export async function trialWorkflow(userId: string): Promise<void> {
// トライアル開始通知
await sendWelcomeEmail(userId);
// 7日後にリマインダー(Workerクラッシュしても確実に実行)
await sleep('7 days');
await sendTrialReminderEmail(userId);
// さらに7日後にトライアル終了
await sleep('7 days');
await endTrial(userId);
await sendTrialEndedEmail(userId);
}
7.4 Saga パターン
Temporalは分散トランザクションのSagaパターンを自然に実装できる:
// TypeScript での Saga パターン
export async function bookTripWorkflow(trip: TripBooking): Promise<BookingResult> {
const compensations: Array<() => Promise<void>> = [];
try {
// 1. フライト予約
const flightBooking = await bookFlight(trip.flight);
compensations.push(() => cancelFlight(flightBooking.id));
// 2. ホテル予約
const hotelBooking = await bookHotel(trip.hotel);
compensations.push(() => cancelHotel(hotelBooking.id));
// 3. レンタカー予約
const carBooking = await bookCar(trip.car);
compensations.push(() => cancelCar(carBooking.id));
return {
status: 'confirmed',
flightId: flightBooking.id,
hotelId: hotelBooking.id,
carId: carBooking.id,
};
} catch (error) {
// 補償トランザクションの実行(逆順)
for (const compensate of compensations.reverse()) {
try {
await compensate();
} catch (compensationError) {
// 補償失敗のログ記録
console.error('補償トランザクション失敗:', compensationError);
}
}
throw error;
}
}
7.5 Schedule(スケジュール)
Temporalのスケジュール機能は、従来のcronジョブの代替として使用できる。耐障害性と可観測性を備えたスケジュール実行を提供する。
# Temporal CLI でのスケジュール作成
temporal schedule create \
--schedule-id "daily-report" \
--cron "0 9 * * *" \
--workflow-type "generateDailyReport" \
--task-queue "reporting" \
--namespace "analytics"
# スケジュールの一覧表示
temporal schedule list --namespace analytics
# スケジュールの一時停止
temporal schedule toggle \
--schedule-id "daily-report" \
--pause \
--reason "メンテナンス中"
8. Visibility と監視
8.1 Visibility の概要
Temporalの Visibility サブシステムは、ワークフロー実行の検索・フィルタリング・監視機能を提供する。オペレータはList Filter(カスタムSQL風クエリ言語)を使って、Temporal Service内のワークフロー実行を詳細に検索できる。
8.2 Search Attributes(検索属性)
Search Attributesには、システムが自動的に設定するデフォルト属性と、ユーザーが定義するカスタム属性がある。
デフォルトの Search Attributes
| 属性名 | 型 | 説明 |
|---|---|---|
| WorkflowType | Keyword | ワークフローの種類 |
| WorkflowId | Keyword | ワークフローID |
| RunId | Keyword | 実行ID |
| ExecutionStatus | Keyword | 実行状態(Running, Completed, Failed等) |
| StartTime | Datetime | 開始時刻 |
| CloseTime | Datetime | 終了時刻 |
| ExecutionDuration | Int | 実行時間(ナノ秒) |
| TaskQueue | Keyword | タスクキュー名 |
カスタム Search Attributes の定義
# カスタム検索属性の追加
temporal operator search-attribute create \
--namespace default \
--name CustomerId \
--type Keyword
temporal operator search-attribute create \
--namespace default \
--name OrderTotal \
--type Double
temporal operator search-attribute create \
--namespace default \
--name IsVIP \
--type Bool
temporal operator search-attribute create \
--namespace default \
--name Region \
--type Keyword
// TypeScript での Search Attribute の使用
import { upsertSearchAttributes } from '@temporalio/workflow';
export async function orderWorkflow(order: Order): Promise<OrderResult> {
// 検索属性の設定
upsertSearchAttributes({
CustomerId: [order.customerId],
OrderTotal: [order.total],
IsVIP: [order.isVipCustomer],
Region: [order.region],
});
// ワークフローロジック...
await processOrder(order);
// 状態変更時に検索属性を更新
upsertSearchAttributes({
OrderStatus: ['shipped'],
});
return { status: 'completed' };
}
8.3 List Filter クエリ
-- 実行中の注文処理ワークフロー
WorkflowType = 'orderWorkflow' AND ExecutionStatus = 'Running'
-- VIP顧客の高額注文
IsVIP = true AND OrderTotal > 10000 ORDER BY StartTime DESC
-- 特定期間に失敗したワークフロー
ExecutionStatus = 'Failed'
AND StartTime > '2024-01-01T00:00:00Z'
AND StartTime < '2024-02-01T00:00:00Z'
-- 特定リージョンの実行中ワークフロー数
WorkflowType = 'orderWorkflow'
AND Region = 'ap-northeast-1'
AND ExecutionStatus = 'Running'
-- ワークフローのカウント(GROUP BY付き)
-- CLIコマンド
temporal workflow count -q "WorkflowType='orderWorkflow' GROUP BY ExecutionStatus"
8.4 Temporal Web UI
Temporal Web UIは、ワークフロー実行の可視化と管理のためのWebインターフェースを提供する:
- ワークフロー実行の一覧表示と検索
- Event Historyの詳細表示
- ワークフローのSignal送信、キャンセル、終了
- Task Queueの監視
- Namespaceの管理
9. セキュリティ
9.1 認証と認可
mTLS(相互TLS)
# Temporal Server の TLS 設定例
tls:
internode:
server:
certFile: /certs/internode/server.crt
keyFile: /certs/internode/server.key
requireClientAuth: true
clientCaFiles:
- /certs/internode/ca.crt
client:
serverName: temporal-internode
rootCaFiles:
- /certs/internode/ca.crt
frontend:
server:
certFile: /certs/frontend/server.crt
keyFile: /certs/frontend/server.key
requireClientAuth: true
clientCaFiles:
- /certs/frontend/ca.crt
client:
serverName: temporal-frontend
rootCaFiles:
- /certs/frontend/ca.crt
Authorizer プラグイン
Temporalは、カスタムAuthorizerプラグインを通じて細粒度のアクセス制御を実装できる:
// Go での Authorizer 実装例
type MyAuthorizer struct{}
func (a *MyAuthorizer) Authorize(
ctx context.Context,
caller *authorization.Claims,
target *authorization.CallTarget,
) (authorization.Result, error) {
// Namespace単位のアクセス制御
if target.Namespace == "production" && !isProductionUser(caller) {
return authorization.Result{Decision: authorization.DecisionDeny}, nil
}
return authorization.Result{Decision: authorization.DecisionAllow}, nil
}
9.2 データの暗号化
Payload Codec
Temporalは、ワークフローのペイロード(入出力データ)をカスタムCodecで暗号化・復号化する機能を提供する:
// TypeScript での Payload Codec の実装
import { PayloadCodec } from '@temporalio/common';
import { encrypt, decrypt } from './crypto';
class EncryptionCodec implements PayloadCodec {
async encode(payloads: Payload[]): Promise<Payload[]> {
return Promise.all(
payloads.map(async (payload) => ({
metadata: {
'encoding': Buffer.from('binary/encrypted'),
'encryption-key-id': Buffer.from('my-key-v1'),
},
data: await encrypt(payload.data!),
}))
);
}
async decode(payloads: Payload[]): Promise<Payload[]> {
return Promise.all(
payloads.map(async (payload) => {
if (payload.metadata?.['encoding']?.toString() !== 'binary/encrypted') {
return payload;
}
return {
metadata: { 'encoding': Buffer.from('json/plain') },
data: await decrypt(payload.data!),
};
})
);
}
}
10. デプロイメントと運用
10.1 デプロイメントオプション
Temporal Cloud(マネージドサービス)
Temporal Cloudは、Temporal Technologies社が提供するフルマネージドサービスである:
- インフラ管理不要
- 自動スケーリング
- マルチリージョンレプリケーション
- SLAに基づく可用性保証
- 組み込みのモニタリングとアラート
// Temporal Cloud への接続
import { Client, Connection } from '@temporalio/client';
import fs from 'fs';
const connection = await Connection.connect({
address: 'your-namespace.tmprl.cloud:7233',
tls: {
clientCertPair: {
crt: fs.readFileSync('/certs/client.pem'),
key: fs.readFileSync('/certs/client.key'),
},
},
});
const client = new Client({
connection,
namespace: 'your-namespace.account-id',
});
セルフホスト
# docker-compose.yml での Temporal セルフホスト構成
version: '3.8'
services:
postgresql:
image: postgres:15
environment:
POSTGRES_USER: temporal
POSTGRES_PASSWORD: temporal
POSTGRES_DB: temporal
ports:
- "5432:5432"
volumes:
- postgresql_data:/var/lib/postgresql/data
elasticsearch:
image: elasticsearch:7.17.16
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- ES_JAVA_OPTS=-Xms256m -Xmx256m
ports:
- "9200:9200"
temporal:
image: temporalio/auto-setup:latest
depends_on:
- postgresql
- elasticsearch
environment:
- DB=postgresql
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=temporal
- POSTGRES_SEEDS=postgresql
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
- ENABLE_ES=true
- ES_SEEDS=elasticsearch
- ES_VERSION=v7
ports:
- "7233:7233"
temporal-ui:
image: temporalio/ui:latest
depends_on:
- temporal
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CORS_ORIGINS=http://localhost:3000
ports:
- "8080:8080"
temporal-admin-tools:
image: temporalio/admin-tools:latest
depends_on:
- temporal
environment:
- TEMPORAL_ADDRESS=temporal:7233
stdin_open: true
tty: true
volumes:
postgresql_data:
Kubernetes でのデプロイ
# Helm Chart を使用した Kubernetes デプロイ
# values.yaml
server:
replicaCount: 3
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 4Gi
config:
persistence:
default:
driver: sql
sql:
driver: postgres
host: temporal-postgresql
port: 5432
database: temporal
user: temporal
password: temporal
visibility:
driver: sql
sql:
driver: postgres
host: temporal-postgresql
port: 5432
database: temporal_visibility
user: temporal
password: temporal
frontend:
replicaCount: 2
history:
replicaCount: 3
matching:
replicaCount: 2
worker:
replicaCount: 1
web:
replicaCount: 1
elasticsearch:
enabled: true
replicas: 3
postgresql:
enabled: true
# Helm Chart のインストール
helm repo add temporal https://charts.temporal.io
helm repo update
helm install temporal temporal/temporal \
--namespace temporal \
--create-namespace \
-f values.yaml
10.2 本番運用のベストプラクティス
Worker のデプロイ
# Worker の Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-worker
spec:
replicas: 3
selector:
matchLabels:
app: order-worker
template:
metadata:
labels:
app: order-worker
spec:
containers:
- name: worker
image: myapp/order-worker:latest
resources:
requests:
cpu: 250m
memory: 512Mi
limits:
cpu: 1
memory: 1Gi
env:
- name: TEMPORAL_ADDRESS
value: "temporal-frontend:7233"
- name: TASK_QUEUE
value: "order-processing"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 15
periodSeconds: 30
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
動的設定(Dynamic Configuration)
# dynamicconfig/production.yaml
# フロントエンドの設定
frontend.rps:
- value: 2000
constraints: {}
frontend.namespaceCount:
- value: 100
constraints: {}
# ヒストリーの設定
history.maximumSignalsPerExecution:
- value: 10000
constraints: {}
history.defaultActivityRetryPolicy:
- value:
InitialIntervalInSeconds: 1
BackoffCoefficient: 2.0
MaximumIntervalInSeconds: 100
MaximumAttempts: 0
constraints: {}
# マッチングの設定
matching.numTaskqueueReadPartitions:
- value: 5
constraints: {}
matching.numTaskqueueWritePartitions:
- value: 5
constraints: {}
# ワークフロー実行のリテンション
system.retentionDays:
- value: 30
constraints:
namespace: production
- value: 7
constraints:
namespace: development
11. テスト
11.1 ワークフローテスト
Temporalは、ワークフローとActivityの単体テスト・統合テストのための専用テストフレームワークを提供している。
// TypeScript でのワークフローテスト
import { TestWorkflowEnvironment } from '@temporalio/testing';
import { Worker } from '@temporalio/worker';
import { orderProcessingWorkflow } from './workflows';
import * as activities from './activities';
describe('OrderProcessingWorkflow', () => {
let testEnv: TestWorkflowEnvironment;
beforeAll(async () => {
testEnv = await TestWorkflowEnvironment.createLocal();
});
afterAll(async () => {
await testEnv?.teardown();
});
it('正常な注文処理が完了すること', async () => {
const { client, nativeConnection } = testEnv;
const worker = await Worker.create({
connection: nativeConnection,
taskQueue: 'test-order',
workflowsPath: require.resolve('./workflows'),
activities: {
...activities,
// Activityのモック
sendEmail: async () => {},
processPayment: async () => ({ transactionId: 'test-123' }),
updateInventory: async () => {},
},
});
const result = await worker.runUntil(
client.workflow.execute(orderProcessingWorkflow, {
taskQueue: 'test-order',
workflowId: 'test-order-1',
args: ['order-1', 'customer-1', [{ productId: 'prod-1', quantity: 1 }]],
})
);
expect(result.status).toBe('completed');
expect(result.paymentId).toBe('test-123');
});
it('時間経過のテスト(タイマースキップ)', async () => {
const { client, nativeConnection } = testEnv;
const worker = await Worker.create({
connection: nativeConnection,
taskQueue: 'test-timer',
workflowsPath: require.resolve('./workflows'),
activities,
});
// 時間をスキップしてテストを高速化
const handle = await client.workflow.start(trialWorkflow, {
taskQueue: 'test-timer',
workflowId: 'test-trial-1',
args: ['user-1'],
});
// 7日分の時間をスキップ
await testEnv.sleep('7 days');
// ワークフローの状態を確認
const status = await handle.query(getStatusQuery);
expect(status).toBe('reminder_sent');
});
});
// Go でのワークフローテスト
package workflows_test
import (
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.temporal.io/sdk/testsuite"
)
type OrderWorkflowTestSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
env *testsuite.TestWorkflowEnvironment
}
func (s *OrderWorkflowTestSuite) SetupTest() {
s.env = s.NewTestWorkflowEnvironment()
}
func (s *OrderWorkflowTestSuite) TestSuccessfulOrder() {
// Activity のモック設定
s.env.OnActivity(UpdateInventory, mock.Anything, "order-1", mock.Anything).
Return(nil)
s.env.OnActivity(ProcessPayment, mock.Anything, "order-1", "customer-1").
Return(PaymentResult{TransactionID: "test-123"}, nil)
s.env.OnActivity(SendEmail, mock.Anything, "customer-1", mock.Anything).
Return(nil)
// ワークフローの実行
s.env.ExecuteWorkflow(OrderProcessingWorkflow, Order{
ID: "order-1",
CustomerID: "customer-1",
Items: []OrderItem{{ProductID: "prod-1", Quantity: 1}},
})
s.True(s.env.IsWorkflowCompleted())
s.NoError(s.env.GetWorkflowError())
var result OrderResult
s.NoError(s.env.GetWorkflowResult(&result))
s.Equal("completed", result.Status)
s.Equal("test-123", result.PaymentID)
}
func TestOrderWorkflow(t *testing.T) {
suite.Run(t, new(OrderWorkflowTestSuite))
}
12. ワークフローのバージョニング
12.1 決定的制約とバージョニングの必要性
ワークフローコードを変更する際、実行中のワークフローはリプレイ時に変更前のコードパスを再現する必要がある。そのため、ワークフローの変更にはバージョニングが不可欠である。
12.2 Worker Versioning
// TypeScript でのワークフロー バージョニング
import { patched } from '@temporalio/workflow';
export async function myWorkflow(): Promise<string> {
if (patched('v2-new-step')) {
// v2: 新しいステップを追加
await newActivity();
await existingActivity();
} else {
// v1: 元のロジック
await existingActivity();
}
return 'done';
}
// Go でのワークフロー バージョニング
func MyWorkflow(ctx workflow.Context) (string, error) {
version := workflow.GetVersion(ctx, "v2-change", workflow.DefaultVersion, 1)
if version == 1 {
// v2: 新しいロジック
err := workflow.ExecuteActivity(ctx, NewActivity).Get(ctx, nil)
if err != nil {
return "", err
}
}
err := workflow.ExecuteActivity(ctx, ExistingActivity).Get(ctx, nil)
if err != nil {
return "", err
}
return "done", nil
}
13. Temporal Nexus
13.1 概要
Temporal Nexusは、Temporalの比較的新しい機能で、異なるNamespace間やチーム間でのサービス連携を実現するための仕組みである。マイクロサービスアーキテクチャにおいて、サービス間の境界を明確に定義しつつ、信頼性の高い連携を可能にする。
Nexusにより、以下のような連携パターンが実現される:
- 異なるNamespace間のワークフロー呼び出し
- チーム間のサービスインターフェース定義
- 契約ベースのサービス連携
14. SDKサポート
14.1 対応言語
Temporalは以下のプログラミング言語のSDKを提供している:
| SDK | 成熟度 | 主な用途 |
|---|---|---|
| Go | GA(本番利用可) | バックエンドサービス、インフラ自動化 |
| Java | GA(本番利用可) | エンタープライズアプリケーション |
| TypeScript | GA(本番利用可) | Node.jsベースのサービス |
| Python | GA(本番利用可) | データパイプライン、ML/AI |
| PHP | GA(本番利用可) | Webアプリケーション |
| .NET | GA(本番利用可) | エンタープライズ .NET アプリケーション |
| Ruby | 開発中 | Webアプリケーション |
14.2 SDK の共通アーキテクチャ
すべてのTemporal SDKは、Rust/Core SDKをベースとした共通のアーキテクチャを採用している(Go SDKを除く)。これにより、各言語SDKは以下の共通機能を一貫して提供する:
- Temporal Serviceとの通信(gRPC)
- Event Historyのリプレイ
- Workflow/Activityの実行管理
- Retry Policyの適用
- Heartbeatの管理
15. まとめ
15.1 Temporalの核心的価値
Temporalは、分散アプリケーション開発における以下の根本的な課題を解決するプラットフォームである:
- Durable Execution:Event Sourcingに基づく永続的実行により、あらゆる障害からの自動復旧を実現
- 開発者体験の向上:複雑な障害処理ロジックをインフラレベルで抽象化し、ビジネスロジックへの集中を可能に
- スケーラビリティ:ステートレスなWorkerアーキテクチャとTask Queueによる分散処理で、大規模ワークロードに対応
- 可観測性:VisibilityサブシステムとSearch Attributeによるワークフロー実行の詳細な監視
- 多言語サポート:Go、Java、TypeScript、Python、PHP、.NETなど主要言語のSDKを提供
15.2 導入判断の指針
Temporalは以下のケースで特に効果的である:
- 長時間実行プロセスを確実に完了させる必要がある場合
- マイクロサービス間の複雑なオーケストレーションが必要な場合
- 分散トランザクション(Sagaパターン)の実装が必要な場合
- 既存のキューシステムやステートマシンの複雑さが管理困難になっている場合
- ビジネスプロセスの可視性と監視が重要な場合
一方、以下のケースではオーバースペックとなる可能性がある:
- 単純なリクエスト/レスポンスパターンのみのアプリケーション
- 非常に短時間で完了する処理のみの場合
- インフラの管理コストが許容できない小規模プロジェクト
Temporalは、現代の分散システム開発において、信頼性と開発効率の両方を大幅に向上させるプラットフォームである。そのEvent Sourcingベースのアーキテクチャと洗練されたSDKにより、複雑なビジネスプロセスを直感的なコードで表現しながら、本番環境レベルの耐障害性を実現できる。