Ray

Ray 完全ガイド — 分散コンピューティングと機械学習フレームワークの全容


1. はじめに

1.1 Ray とは何か

Ray は、UC Berkeley の RISELab(現 Anyscale)が開発した汎用分散コンピューティングフレームワークである。Python や Java のアプリケーションを、最小限のコード変更で分散環境にスケールさせることを目的として設計されている。2017 年にオープンソースとして公開されて以来、機械学習(ML)ワークロードを中心に急速に採用が広がり、OpenAI、Uber、Netflix、Spotify、Ant Group、LinkedIn など多くの企業で本番環境に導入されている。

Ray の核心的な思想は「開発者がインフラの複雑さを意識することなく、分散アプリケーションを構築できるようにする」ことにある。従来、分散システムの構築には、メッセージパッシング、プロセス管理、フォールトトレランス、リソーススケジューリングなどの複雑な基盤技術への深い理解が求められた。Ray はこれらの複雑さを抽象化し、開発者がビジネスロジックに集中できる環境を提供する。

1.2 Ray が解決する課題

現代の ML ワークフローは、以下のような多様なコンピューティングパターンを必要とする:

ワークフロー段階コンピューティングパターン典型的なツール
データ前処理バッチ並列処理Spark, Dask
モデル学習データ並列・モデル並列Horovod, PyTorch DDP
ハイパーパラメータ調整非同期並列探索Optuna, HyperOpt
モデルサービングオンラインリクエスト処理TFServing, TorchServe
強化学習シミュレーション並列化専用フレームワーク

従来、これらの段階ごとに異なるフレームワークを使い分ける必要があり、以下のような問題が生じていた:

  1. ツール間のデータ移動コスト: 各ツール間でデータをシリアライズ・デシリアライズする必要がある
  2. 運用複雑性の増大: 複数のクラスタを管理し、それぞれ異なる設定やモニタリングが必要
  3. 開発者体験の断片化: 各ツールの API やプログラミングモデルが異なるため、学習コストが高い
  4. リソースの非効率な利用: 各ツールが独自にリソースを確保するため、全体最適化が困難

Ray はこれらの問題を単一の統合プラットフォームで解決する。すべてのワークロードが同一のクラスタ上で実行され、共通のリソースマネージャーにより効率的にスケジューリングされる。

1.3 Ray のエコシステム概観

Ray のエコシステムは、以下の階層構造で構成される:

┌─────────────────────────────────────────────────────┐
│              Ray AIR (AI Runtime)                     │
│  ┌──────────┬──────────┬──────────┬──────────┐       │
│  │ Ray Data │Ray Train │ Ray Tune │Ray Serve │       │
│  └──────────┴──────────┴──────────┴──────────┘       │
│  ┌──────────┬──────────┐                              │
│  │  RLlib   │Workflows │                              │
│  └──────────┴──────────┘                              │
├─────────────────────────────────────────────────────┤
│              Ray Core                                 │
│  ┌──────────┬──────────┬──────────┐                   │
│  │  Tasks   │  Actors  │ Objects  │                   │
│  └──────────┴──────────┴──────────┘                   │
├─────────────────────────────────────────────────────┤
│           Ray Cluster                                 │
│  ┌──────────┬──────────┬──────────┐                   │
│  │  GCS     │Raylet    │Scheduler │                   │
│  └──────────┴──────────┴──────────┘                   │
└─────────────────────────────────────────────────────┘
  • Ray Core: タスク、アクター、オブジェクトストアの 3 つのプリミティブを提供する基盤層
  • Ray ライブラリ群: ML ワークフローの各段階に特化した高レベルライブラリ
  • Ray AIR(AI Runtime): ライブラリ群を統合し、エンドツーエンドの ML パイプラインを実現する統合レイヤー

1.4 バージョンと互換性

本ドキュメントは Ray 2.x 系(特に 2.9 〜 2.44)を基準に記述している。Ray 2.x は Ray 1.x から大幅なアーキテクチャ改善が行われており、主な変更点は以下のとおりである:

  • Global Control Store (GCS) のフォールトトレランス: GCS の障害時にもクラスタが継続動作可能に
  • Ray AIR の導入: 各ライブラリを統合する一貫した API レイヤー
  • 自動スケーリングの改善: Kubernetes (KubeRay) との統合が大幅に強化
  • 型ヒント・静的解析対応の向上: Python の型システムとの親和性が改善

2. Ray Core アーキテクチャ

2.1 システムアーキテクチャ概要

Ray クラスタは、ヘッドノードと 1 つ以上のワーカーノードから構成される。各ノードは以下のプロセス群を実行する:

┌─────────────── Head Node ───────────────┐
│  ┌────────────────────────────────────┐  │
│  │     Global Control Service (GCS)   │  │
│  │  - Actor Registry                  │  │
│  │  - Placement Group Table           │  │
│  │  - Node Manager                    │  │
│  │  - Resource Manager                │  │
│  └────────────────────────────────────┘  │
│  ┌────────────────────────────────────┐  │
│  │           Raylet                   │  │
│  │  ┌─────────────┬────────────────┐  │  │
│  │  │ Node Manager│ Object Manager │  │  │
│  │  └─────────────┴────────────────┘  │  │
│  │  ┌──────────────────────────────┐  │  │
│  │  │    Object Store (Plasma)     │  │  │
│  │  └──────────────────────────────┘  │  │
│  └────────────────────────────────────┘  │
│  ┌──────────┐ ┌──────────┐               │
│  │ Worker 1 │ │ Worker 2 │ ...           │
│  └──────────┘ └──────────┘               │
│  ┌────────────────────────────────────┐  │
│  │         Ray Dashboard              │  │
│  └────────────────────────────────────┘  │
│  ┌────────────────────────────────────┐  │
│  │         Autoscaler                 │  │
│  └────────────────────────────────────┘  │
└──────────────────────────────────────────┘

┌─────────────── Worker Node ──────────────┐
│  ┌────────────────────────────────────┐  │
│  │           Raylet                   │  │
│  │  ┌─────────────┬────────────────┐  │  │
│  │  │ Node Manager│ Object Manager │  │  │
│  │  └─────────────┴────────────────┘  │  │
│  │  ┌──────────────────────────────┐  │  │
│  │  │    Object Store (Plasma)     │  │  │
│  │  └──────────────────────────────┘  │  │
│  └────────────────────────────────────┘  │
│  ┌──────────┐ ┌──────────┐               │
│  │ Worker 1 │ │ Worker 2 │ ...           │
│  └──────────┘ └──────────┘               │
└──────────────────────────────────────────┘

2.2 Global Control Service (GCS)

GCS は Ray クラスタのメタデータ管理サービスであり、ヘッドノード上で動作する。主な責務は以下のとおり:

クラスタメンバーシップ管理

  • ノードの登録・離脱・障害検知
  • ハートビートベースの生存監視

リソースメタデータ管理

  • 各ノードの利用可能リソース(CPU、GPU、メモリ、カスタムリソース)の把握
  • リソース使用状況の集約

アクターディレクトリ

  • Named Actor の名前解決
  • アクターの配置情報の管理
  • Detached Actor のライフサイクル管理

Placement Group 管理

  • Placement Group(後述)の作成・配置決定

GCS のフォールトトレランス(Ray 2.x)

Ray 2.x では、GCS に外部ストレージ(Redis)を接続することで、GCS 障害時の復旧が可能になった:

# GCS フォールトトレランスの有効化(クラスタ起動時)
ray start --head \
    --port=6379 \
    --redis-password="your_password" \
    --storage="/path/to/external/storage"
# KubeRay での GCS FT 設定例
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: ft-cluster
spec:
  headGroupSpec:
    rayStartParams:
      redis-password: "your_password"
      storage: "s3://my-bucket/ray-storage"
    template:
      spec:
        containers:
        - name: ray-head
          env:
          - name: RAY_REDIS_ADDRESS
            value: "redis-service:6379"
          - name: RAY_external_storage_namespace
            value: "my-cluster"

2.3 Raylet

Raylet は各ノード上で動作する C++ デーモンプロセスであり、ノードローカルなリソース管理とタスクスケジューリングを担う。Raylet は以下の 2 つの主要コンポーネントから構成される:

Node Manager

  • ローカルワーカープロセスの管理(起動・停止・リサイクル)
  • ローカルリソースの管理
  • GCS との通信(ノード状態のレポート)

Object Manager

  • ローカルオブジェクトストア(Plasma)の管理
  • ノード間オブジェクト転送の調整
  • オブジェクトのスピル(メモリ溢れ時のディスク退避)

2.4 ワーカープロセス

ワーカープロセスは、実際にユーザーコード(タスクやアクターメソッド)を実行するプロセスである。各ワーカーは以下の特徴を持つ:

  • 言語ランタイム: Python または Java のプロセスとして起動
  • Raylet との通信: gRPC を介してタスクの受信・結果の返却を行う
  • オブジェクトストアアクセス: 共有メモリ経由で Plasma ストアにアクセス
  • アイドル管理: 一定時間アイドル状態のワーカーは自動的に終了
# ワーカープロセス数の制御
import ray

ray.init(
    num_cpus=8,               # 利用可能な CPU 数(=最大同時タスク数の目安)
    num_gpus=2,               # 利用可能な GPU 数
    _memory=8 * 1024**3,      # オブジェクトストアメモリ (8GB)
    _worker_maximum_startup_concurrency=10,  # 同時起動可能なワーカー数
)

2.5 オブジェクトストア (Plasma)

Ray のオブジェクトストアは、Apache Arrow の Plasma をベースとした共有メモリ型の分散オブジェクトストアである。

設計原則

  • イミュータブル: 一度書き込まれたオブジェクトは変更不可
  • 共有メモリ: 同一ノード上のプロセス間でゼロコピーアクセス
  • 参照カウント: 参照がなくなったオブジェクトは自動的に回収
  • スピル機能: メモリ不足時にディスクや外部ストレージに退避
# オブジェクトストアの設定
ray.init(
    object_store_memory=10 * 1024**3,  # オブジェクトストアに 10GB を割り当て
    _system_config={
        "object_spilling_config": {
            "type": "filesystem",
            "params": {
                "directory_path": ["/tmp/ray_spill", "/mnt/data/ray_spill"],
                "buffer_size": 1048576,  # 1MB バッファ
            }
        }
    }
)

オブジェクトのライフサイクル

作成 (ray.put / タスク返り値)
    ↓
ローカル Plasma ストアに格納
    ↓
他ノードから参照 → Object Manager が転送を調整
    ↓
参照カウントが 0 → GC による回収
    ↓
メモリ不足 → スピル(ディスク退避)

2.6 分散スケジューリング

Ray のスケジューリングは分散型ボトムアップ方式を採用している。これは、中央集権的なスケジューラがボトルネックになることを避けるための設計判断である。

スケジューリングの流れ

  1. ドライバーまたはワーカーがタスクを submit
  2. ローカル Raylet がリソース要件を評価
  3. ローカルリソースが十分 → ローカル実行
  4. ローカルリソースが不十分 → 他ノードの Raylet にスピル(転送)
  5. 受信した Raylet がリソースを確保し、ワーカーにタスクを割り当て

スケジューリングポリシー

Ray は以下のスケジューリング戦略をサポートする:

ポリシー説明ユースケース
DEFAULTデータローカリティを考慮した分散スケジューリング一般的なワークロード
SPREADノード間で均等に分散メモリ集約型タスク
NODE_AFFINITY特定ノードへのバインドデータローカリティが重要
import ray

@ray.remote(scheduling_strategy="SPREAD")
def memory_intensive_task(data):
    """メモリ集約型タスク — ノード間に分散して実行"""
    return process(data)

# 特定ノードへのアフィニティ
@ray.remote(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(),
        soft=True,  # ベストエフォート(False の場合は厳密)
    )
)
def local_task():
    return "executed on preferred node"

3. Ray Core API — プリミティブの詳細

3.1 Tasks(リモート関数)

Task は Ray の最も基本的なプリミティブであり、ステートレスな関数をリモートで非同期実行する仕組みである。

基本的な使い方

import ray
import time

ray.init()

# @ray.remote デコレータでリモート関数を定義
@ray.remote
def compute_square(x: int) -> int:
    time.sleep(1)  # 何らかの重い計算をシミュレート
    return x * x

# .remote() でタスクを submit(即座に ObjectRef が返る)
future = compute_square.remote(42)

# ray.get() で結果を取得(ブロッキング)
result = ray.get(future)
print(result)  # 1764

# 複数タスクの並列実行
futures = [compute_square.remote(i) for i in range(100)]
results = ray.get(futures)  # 全タスクの完了を待機

リソース要件の指定

# CPU リソースの指定
@ray.remote(num_cpus=2)
def cpu_intensive():
    """2 CPU コアを使用するタスク"""
    return heavy_computation()

# GPU リソースの指定
@ray.remote(num_gpus=1)
def gpu_training(model, data):
    """1 GPU を使用するタスク"""
    return train_on_gpu(model, data)

# 分数リソースの指定(リソースの細分化)
@ray.remote(num_cpus=0.5, num_gpus=0.25)
def lightweight_inference(input_data):
    """軽量な推論タスク — リソースを節約"""
    return model.predict(input_data)

# カスタムリソースの指定
@ray.remote(resources={"special_hardware": 1, "memory_gb": 16})
def specialized_task():
    """特殊ハードウェアを必要とするタスク"""
    return process_with_special_hw()

# 実行時にリソース要件をオーバーライド
future = cpu_intensive.options(num_cpus=4, num_gpus=1).remote()

タスクのオプション

@ray.remote(
    num_cpus=1,
    num_gpus=0,
    max_retries=3,                    # 失敗時の最大リトライ回数
    retry_exceptions=[ConnectionError, TimeoutError],  # リトライ対象の例外
    max_calls=100,                    # ワーカー再起動までの最大呼び出し回数
    max_task_retries=5,               # タスクレベルの最大リトライ
    runtime_env={                     # タスク固有のランタイム環境
        "pip": ["numpy==1.24.0"],
        "env_vars": {"CUDA_VISIBLE_DEVICES": "0"},
    },
)
def robust_task(data):
    """フォールトトレラントなタスク"""
    return process(data)

タスクの依存関係(DAG パターン)

@ray.remote
def load_data(path: str):
    return pd.read_parquet(path)

@ray.remote
def preprocess(data):
    return clean_and_transform(data)

@ray.remote
def train_model(preprocessed_data, config):
    return fit(preprocessed_data, config)

@ray.remote
def evaluate(model, test_data):
    return compute_metrics(model, test_data)

# 自動的に DAG が構成される(ObjectRef を渡すだけ)
raw_data = load_data.remote("/data/train.parquet")
clean_data = preprocess.remote(raw_data)          # raw_data の完了を自動的に待機
model = train_model.remote(clean_data, config)     # clean_data の完了を自動的に待機
test_data = load_data.remote("/data/test.parquet") # 並列で実行可能
metrics = evaluate.remote(model, test_data)        # model と test_data の両方を待機

print(ray.get(metrics))

ray.wait() による逐次処理

# 100 個のタスクを起動し、完了したものから順に処理
futures = [slow_task.remote(i) for i in range(100)]

completed_results = []
while futures:
    # 1 つ以上が完了するまで待機、タイムアウト 10 秒
    done, futures = ray.wait(futures, num_returns=1, timeout=10.0)
    for ref in done:
        try:
            result = ray.get(ref)
            completed_results.append(result)
            print(f"Completed: {len(completed_results)}/100")
        except ray.exceptions.RayTaskError as e:
            print(f"Task failed: {e}")

3.2 Actors(ステートフルワーカー)

Actor は Ray の 2 つ目のプリミティブであり、ステートフルなオブジェクトをリモートプロセスとして実行する仕組みである。Actor は Python のクラスを分散環境で実行可能にする。

基本的な使い方

@ray.remote
class Counter:
    def __init__(self, initial_value: int = 0):
        self.value = initial_value
    
    def increment(self, delta: int = 1) -> int:
        self.value += delta
        return self.value
    
    def get_value(self) -> int:
        return self.value

# Actor のインスタンス化(リモートプロセスとして起動)
counter = Counter.remote(initial_value=10)

# メソッド呼び出し(リモート実行)
ref1 = counter.increment.remote(5)
ref2 = counter.increment.remote(3)  # ref1 の完了後に順次実行される

print(ray.get(ref2))  # 18(10 + 5 + 3)

Actor のリソース指定

@ray.remote(num_cpus=2, num_gpus=1)
class ModelServer:
    def __init__(self, model_path: str):
        import torch
        self.device = torch.device("cuda")
        self.model = torch.load(model_path).to(self.device)
        self.model.eval()
    
    def predict(self, input_tensor):
        with torch.no_grad():
            return self.model(input_tensor.to(self.device)).cpu()

# GPU を持つノードに自動配置される
server = ModelServer.remote("/models/resnet50.pt")
prediction = ray.get(server.predict.remote(sample_input))

Named Actor(名前付きアクター)

# Named Actor の作成(クラスタ内で一意の名前を付与)
counter = Counter.options(
    name="global_counter",
    namespace="my_app",
    lifetime="detached",       # 作成元プロセスが終了しても存続
    max_restarts=-1,           # 無限リスタート
    max_task_retries=-1,       # 無限タスクリトライ
).remote(initial_value=0)

# 別のプロセスから名前で取得
counter = ray.get_actor("global_counter", namespace="my_app")
ray.get(counter.increment.remote(1))

AsyncIO Actor

@ray.remote
class AsyncActor:
    """非同期 I/O 対応アクター — 同時に複数リクエストを処理可能"""
    
    async def fetch_url(self, url: str) -> str:
        import aiohttp
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()
    
    async def batch_fetch(self, urls: list[str]) -> list[str]:
        import asyncio
        tasks = [self.fetch_url(url) for url in urls]
        return await asyncio.gather(*tasks)

actor = AsyncActor.options(
    max_concurrency=100,  # 同時処理数の上限
).remote()

Threaded Actor

@ray.remote
class ThreadedActor:
    """スレッドベースの同時実行アクター"""
    
    def __init__(self):
        self.lock = threading.Lock()
        self.data = {}
    
    def process(self, key, value):
        """max_concurrency の数だけスレッドで同時実行される"""
        result = expensive_computation(value)
        with self.lock:
            self.data[key] = result
        return result

actor = ThreadedActor.options(
    max_concurrency=10,        # 10 スレッドで同時実行
    max_pending_calls=100,     # キューに入れられる最大呼び出し数
).remote()

Actor Pool パターン

from ray.util.actor_pool import ActorPool

@ray.remote
class Predictor:
    def __init__(self, model_path: str):
        self.model = load_model(model_path)
    
    def predict(self, batch):
        return self.model(batch)

# Actor プールの作成(8 つの Predictor アクター)
predictors = [Predictor.remote("/models/model.pt") for _ in range(8)]
pool = ActorPool(predictors)

# map() で自動的にアクター間で作業を分散
data_batches = [batch for batch in split_data(dataset, batch_size=64)]
results = list(pool.map(lambda actor, batch: actor.predict.remote(batch), data_batches))

# submit/get_next() で逐次処理
for batch in data_batches:
    pool.submit(lambda actor, b: actor.predict.remote(b), batch)

while pool.has_next():
    result = pool.get_next()
    process_result(result)

3.3 Objects(分散オブジェクト)

Object は Ray の 3 つ目のプリミティブであり、イミュータブルな分散オブジェクトを表現する。

# 明示的なオブジェクト作成
large_array = np.random.rand(10000, 10000)
ref = ray.put(large_array)  # オブジェクトストアに格納、ObjectRef が返る

# ObjectRef を複数のタスクに渡す(ゼロコピー共有)
futures = [process_chunk.remote(ref, i) for i in range(10)]

# オブジェクトの取得
data = ray.get(ref)  # ローカルの場合はゼロコピー

# 複数オブジェクトの一括取得
refs = [ray.put(i) for i in range(100)]
values = ray.get(refs)

# オブジェクトの寿命管理
# Ray は参照カウントベースの GC を使用
# ObjectRef への参照がなくなると自動的に回収される

オブジェクト転送の最適化

# 大きなオブジェクトは ray.put() で事前にオブジェクトストアに格納
# BAD: 各タスク呼び出し時にシリアライズが発生
large_data = load_large_dataset()
futures = [process.remote(large_data) for _ in range(100)]  # 100 回シリアライズ

# GOOD: 一度だけシリアライズし、ObjectRef を共有
large_data_ref = ray.put(load_large_dataset())
futures = [process.remote(large_data_ref) for _ in range(100)]  # 1 回だけシリアライズ

4. Placement Group とリソース管理

4.1 Placement Group の概念

Placement Group は、複数のリソースバンドルを物理的な配置制約に従ってまとめて確保する仕組みである。分散学習やアンサンブル推論など、リソース間の物理的な近接性が性能に影響するワークロードに重要である。

配置戦略

戦略説明ユースケース
STRICT_PACK全バンドルを同一ノードに配置GPU 間通信が頻繁
PACK可能な限り少数ノードに集約データローカリティ重視
STRICT_SPREAD各バンドルを異なるノードに分散耐障害性重視
SPREADベストエフォートで分散負荷分散
from ray.util.placement_group import (
    placement_group,
    placement_group_table,
    remove_placement_group,
)
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# 4 GPU を 2 ノードに分散して確保
pg = placement_group(
    bundles=[
        {"GPU": 2, "CPU": 8},   # ノード 1
        {"GPU": 2, "CPU": 8},   # ノード 2
    ],
    strategy="STRICT_SPREAD",
    name="training_group",
    lifetime="detached",        # 作成元のタスク終了後も存続
)

# Placement Group の準備完了を待機
ray.get(pg.ready())

# Placement Group 内でタスクを実行
@ray.remote(num_gpus=2, num_cpus=8)
def train_shard(data_shard, rank):
    return distributed_train(data_shard, rank)

futures = []
for i in range(2):
    future = train_shard.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=i,
        )
    ).remote(data_shards[i], rank=i)
    futures.append(future)

results = ray.get(futures)

# 使用後は解放
remove_placement_group(pg)

4.2 カスタムリソースの定義

# クラスタ起動時にカスタムリソースを定義
ray.init(
    resources={
        "special_gpu": 2,        # 特殊 GPU を 2 つ利用可能
        "high_memory": 128,      # 高メモリノードであることを示す
        "tpu": 4,                # TPU が 4 基
    }
)

# カスタムリソースを要求するタスク
@ray.remote(resources={"special_gpu": 1, "high_memory": 32})
def specialized_training():
    return train_with_special_hardware()

4.3 リソースの動的管理

# 利用可能なリソースの確認
print(ray.available_resources())
# {'CPU': 32.0, 'GPU': 4.0, 'memory': 68719476736.0, ...}

print(ray.cluster_resources())
# {'CPU': 64.0, 'GPU': 8.0, 'memory': 137438953472.0, ...}

# ノード情報の取得
for node in ray.nodes():
    print(f"Node: {node['NodeID']}")
    print(f"  Alive: {node['Alive']}")
    print(f"  Resources: {node['Resources']}")
    print(f"  Labels: {node.get('Labels', {})}")

5. Runtime Environment(ランタイム環境)

5.1 概要

Runtime Environment は、タスクやアクターが実行される際のソフトウェア環境を動的に指定する仕組みである。これにより、同一クラスタ上で異なるライブラリバージョンや環境変数を持つワークロードを共存させることができる。

# ジョブレベルの Runtime Environment
ray.init(runtime_env={
    "pip": [
        "torch==2.1.0",
        "transformers==4.35.0",
        "accelerate>=0.24.0",
    ],
    "env_vars": {
        "TOKENIZERS_PARALLELISM": "false",
        "CUDA_VISIBLE_DEVICES": "0,1",
    },
    "working_dir": "/path/to/project",
    "excludes": ["*.pyc", "__pycache__/", "data/"],
})

# タスクレベルの Runtime Environment(ジョブレベルをオーバーライド)
@ray.remote(
    runtime_env={
        "pip": ["scikit-learn==1.3.0"],
        "conda": {
            "dependencies": ["python=3.10", "numpy=1.24"],
        },
    }
)
def sklearn_task(data):
    from sklearn.ensemble import RandomForestClassifier
    clf = RandomForestClassifier()
    return clf.fit(data.X, data.y)

# コンテナイメージの指定
@ray.remote(
    runtime_env={
        "container": {
            "image": "my-registry/ml-image:v2.0",
            "worker_path": "/opt/ray/python/ray/_private/workers",
            "run_options": ["--gpus=all", "--shm-size=16g"],
        }
    }
)
def containerized_task():
    return run_in_container()

5.2 Runtime Environment の詳細オプション

runtime_env = {
    # Python パッケージの指定
    "pip": {
        "packages": ["torch>=2.0", "numpy"],
        "pip_check": False,            # 依存関係チェックを無効化
        "pip_version": ">=23.0",       # pip バージョンの指定
    },
    
    # Conda 環境の指定
    "conda": "environment.yml",         # ファイルパス
    # または辞書形式:
    # "conda": {"dependencies": ["python=3.10", "pytorch"]}
    
    # 作業ディレクトリ(クラスタにアップロードされる)
    "working_dir": "s3://my-bucket/project.zip",  # S3, GCS, ローカルパス
    
    # Python モジュール(作業ディレクトリとは別にアップロード)
    "py_modules": [
        "/local/path/to/my_module",
        "s3://my-bucket/shared_lib.zip",
    ],
    
    # 環境変数
    "env_vars": {
        "MY_ENV_VAR": "value",
        "OMP_NUM_THREADS": "4",
    },
    
    # Eager Install(ワーカー起動時にインストール)
    "eager_install": True,
    
    # 設定
    "config": {
        "setup_timeout_seconds": 600,  # セットアップのタイムアウト
    },
}

6. Ray Data — 分散データ処理

6.1 概要

Ray Data は、ML ワークフロー向けのスケーラブルなデータ処理ライブラリである。Spark や Dask に似た分散データ処理を提供するが、ML パイプラインとの統合に特化している点が特徴である。

設計原則

  • ストリーミングファースト: データを全てメモリに載せるのではなく、ストリーミング処理
  • ML ネイティブ: テンソル、バッチ処理、GPU アクセラレーションを第一級サポート
  • Ray Core 統合: Task と Actor の上に構築され、Ray のスケジューリングを活用

6.2 Dataset の基本操作

import ray

# さまざまなソースからの読み込み
ds = ray.data.read_parquet("s3://my-bucket/data/")
ds = ray.data.read_csv("/local/path/to/data.csv")
ds = ray.data.read_json("gs://my-bucket/data.json")
ds = ray.data.read_images("s3://my-bucket/images/")
ds = ray.data.read_tfrecords("s3://my-bucket/tfrecords/")

# Python オブジェクトから作成
ds = ray.data.from_items([{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}])
ds = ray.data.from_pandas(pandas_df)
ds = ray.data.from_numpy(numpy_array)
ds = ray.data.from_arrow(arrow_table)

# SQL データベースからの読み込み
ds = ray.data.read_sql(
    "SELECT * FROM users WHERE active = true",
    lambda: create_connection(),
)

# データセット情報
print(ds.schema())        # スキーマ
print(ds.count())         # 行数
print(ds.num_blocks())    # ブロック数
print(ds.size_bytes())    # サイズ
ds.show(5)                # 最初の 5 行を表示

6.3 データ変換

# map: 行単位の変換
ds = ds.map(lambda row: {"feature": row["value"] * 2, "label": row["label"]})

# map_batches: バッチ単位の変換(GPU アクセラレーション可能)
def preprocess_batch(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
    """バッチ単位で前処理(NumPy 配列として受け取る)"""
    batch["normalized"] = (batch["feature"] - batch["feature"].mean()) / batch["feature"].std()
    return batch

ds = ds.map_batches(
    preprocess_batch,
    batch_size=1024,
    batch_format="numpy",       # "pandas", "numpy", "pyarrow" から選択
    num_cpus=2,                 # バッチ処理に使用する CPU
)

# GPU を使用したバッチ処理
class GPUPreprocessor:
    def __init__(self):
        import torch
        self.device = torch.device("cuda")
        self.model = load_preprocessing_model().to(self.device)
    
    def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        import torch
        tensor = torch.from_numpy(batch["image"]).to(self.device)
        with torch.no_grad():
            embeddings = self.model(tensor).cpu().numpy()
        batch["embedding"] = embeddings
        return batch

ds = ds.map_batches(
    GPUPreprocessor,
    batch_size=256,
    num_gpus=1,                 # 各ワーカーに 1 GPU を割り当て
    concurrency=4,              # 4 つの GPU ワーカーで並列処理
    batch_format="numpy",
)

# filter: 条件に基づくフィルタリング
ds = ds.filter(lambda row: row["score"] > 0.8)

# flat_map: 1 行を複数行に展開
ds = ds.flat_map(lambda row: [
    {"token": token, "doc_id": row["id"]} 
    for token in row["text"].split()
])

# sort: ソート
ds = ds.sort("timestamp")

# groupby + aggregate: 集約
import ray.data.aggregate as agg
ds.groupby("category").aggregate(
    agg.Count(),
    agg.Mean("price"),
    agg.Max("rating"),
    agg.Std("score"),
)

6.4 ストリーミング処理

# ストリーミングモードでのデータ処理(メモリ効率が高い)
ds = ray.data.read_parquet("s3://my-bucket/huge_dataset/")

# iter_batches: バッチ単位でイテレーション
for batch in ds.iter_batches(batch_size=512, batch_format="numpy"):
    train_step(batch)

# iter_torch_batches: PyTorch テンソルとしてイテレーション
for batch in ds.iter_torch_batches(
    batch_size=256,
    dtypes=torch.float32,
    device="cuda:0",
):
    loss = model(batch["features"], batch["labels"])
    loss.backward()

# iter_tf_batches: TensorFlow テンソルとしてイテレーション
for batch in ds.iter_tf_batches(batch_size=256):
    model.train_on_batch(batch["features"], batch["labels"])

# streaming_split: 複数のワーカーにデータを分配
train_ds, val_ds = ds.train_test_split(test_size=0.2)
shards = train_ds.streaming_split(num_splits=4, equal=True)

6.5 データの書き出し

# さまざまなフォーマットへの書き出し
ds.write_parquet("s3://my-bucket/output/")
ds.write_csv("/local/path/output/")
ds.write_json("gs://my-bucket/output/")
ds.write_tfrecords("s3://my-bucket/tfrecords/")

# カスタム書き出し
ds.write_datasource(
    my_custom_datasource,
    path="s3://my-bucket/custom/",
)

7. Ray Train — 分散学習

7.1 概要

Ray Train は、機械学習モデルの分散学習を簡素化するためのライブラリである。PyTorch、TensorFlow、Horovod、XGBoost、LightGBM などの主要な ML フレームワークをサポートする。

7.2 PyTorch 分散学習

import ray.train
from ray.train import ScalingConfig, RunConfig, CheckpointConfig
from ray.train.torch import TorchTrainer, TorchCheckpoint

def train_func(config: dict):
    """各ワーカーで実行される学習関数"""
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader
    
    # Ray Train が自動的に分散環境をセットアップ
    # - torch.distributed の初期化
    # - 適切な GPU デバイスの設定
    # - DistributedDataParallel (DDP) の設定
    
    # モデルの作成
    model = nn.Sequential(
        nn.Linear(784, 256),
        nn.ReLU(),
        nn.Linear(256, 10),
    )
    
    # Ray Train が DDP ラッパーを適用
    model = ray.train.torch.prepare_model(model)
    
    # データローダーの準備
    dataset = ray.train.get_dataset_shard("train")
    dataloader = dataset.iter_torch_batches(
        batch_size=config["batch_size"],
        dtypes=torch.float32,
    )
    
    # Ray Train が DistributedSampler を自動適用
    # dataloader = ray.train.torch.prepare_data_loader(dataloader)
    
    optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(config["epochs"]):
        total_loss = 0
        num_batches = 0
        
        for batch in dataloader:
            optimizer.zero_grad()
            outputs = model(batch["features"])
            loss = criterion(outputs, batch["labels"].long())
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            num_batches += 1
        
        avg_loss = total_loss / num_batches
        
        # メトリクスのレポートとチェックポイント
        with ray.train.report(
            metrics={"loss": avg_loss, "epoch": epoch},
            checkpoint=TorchCheckpoint.from_model(model.module),
        ) as checkpoint:
            pass

# データの準備
train_ds = ray.data.read_parquet("s3://my-bucket/train/")
val_ds = ray.data.read_parquet("s3://my-bucket/val/")

# Trainer の設定と実行
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={
        "lr": 1e-3,
        "batch_size": 64,
        "epochs": 10,
    },
    scaling_config=ScalingConfig(
        num_workers=4,            # 4 GPU ワーカー
        use_gpu=True,
        resources_per_worker={
            "CPU": 4,
            "GPU": 1,
        },
    ),
    run_config=RunConfig(
        name="pytorch_training",
        storage_path="s3://my-bucket/ray_results/",
        checkpoint_config=CheckpointConfig(
            num_to_keep=3,                          # 最新 3 つのチェックポイントを保持
            checkpoint_score_attribute="loss",
            checkpoint_score_order="min",
        ),
        failure_config=ray.train.FailureConfig(
            max_failures=3,       # 最大 3 回のリトライ
        ),
    ),
    datasets={
        "train": train_ds,
        "validation": val_ds,
    },
)

result = trainer.fit()
print(f"Best checkpoint: {result.best_checkpoints}")
print(f"Final metrics: {result.metrics}")

7.3 DeepSpeed / FSDP 統合

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

# DeepSpeed ZeRO-3 設定
deepspeed_config = {
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": 4,
    "gradient_accumulation_steps": "auto",
    "fp16": {"enabled": True},
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": True,
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": True,
        },
        "overlap_comm": True,
        "contiguous_gradients": True,
        "sub_group_size": 1e9,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9,
    },
}

def train_func_deepspeed(config):
    import deepspeed
    
    model = create_large_model()
    
    model, optimizer, _, _ = deepspeed.initialize(
        model=model,
        config=deepspeed_config,
        model_parameters=model.parameters(),
    )
    
    dataset = ray.train.get_dataset_shard("train")
    
    for epoch in range(config["epochs"]):
        for batch in dataset.iter_torch_batches(batch_size=4):
            loss = model(batch["input_ids"], labels=batch["labels"]).loss
            model.backward(loss)
            model.step()
        
        ray.train.report(metrics={"loss": loss.item(), "epoch": epoch})

trainer = TorchTrainer(
    train_loop_per_worker=train_func_deepspeed,
    train_loop_config={"epochs": 3},
    scaling_config=ScalingConfig(
        num_workers=8,
        use_gpu=True,
        resources_per_worker={"CPU": 8, "GPU": 1},
    ),
)

result = trainer.fit()

7.4 XGBoost / LightGBM 分散学習

from ray.train.xgboost import XGBoostTrainer

trainer = XGBoostTrainer(
    label_column="target",
    params={
        "objective": "binary:logistic",
        "eval_metric": "auc",
        "max_depth": 6,
        "learning_rate": 0.1,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "tree_method": "gpu_hist",
    },
    num_boost_round=100,
    scaling_config=ScalingConfig(
        num_workers=4,
        use_gpu=True,
    ),
    datasets={
        "train": ray.data.read_parquet("s3://data/train/"),
        "validation": ray.data.read_parquet("s3://data/val/"),
    },
)

result = trainer.fit()

8. Ray Tune — ハイパーパラメータ最適化

8.1 概要

Ray Tune は、大規模なハイパーパラメータ探索と実験管理のためのライブラリである。Grid Search、Random Search に加え、ベイズ最適化やバンディットベースのアルゴリズムをサポートする。

8.2 基本的な使い方

from ray import tune
from ray.tune import TuneConfig, RunConfig
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch

def training_function(config: dict):
    """各トライアルで実行される学習関数"""
    import torch
    import torch.nn as nn
    
    model = nn.Sequential(
        nn.Linear(784, config["hidden_size"]),
        nn.ReLU(),
        nn.Dropout(config["dropout"]),
        nn.Linear(config["hidden_size"], 10),
    )
    
    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=config["lr"],
        weight_decay=config["weight_decay"],
    )
    
    for epoch in range(100):
        train_loss = train_epoch(model, optimizer, train_loader)
        val_loss, val_acc = evaluate(model, val_loader)
        
        # Tune にメトリクスをレポート
        tune.report(
            train_loss=train_loss,
            val_loss=val_loss,
            val_accuracy=val_acc,
        )

# 探索空間の定義
search_space = {
    "lr": tune.loguniform(1e-5, 1e-1),
    "hidden_size": tune.choice([64, 128, 256, 512]),
    "dropout": tune.uniform(0.1, 0.5),
    "weight_decay": tune.loguniform(1e-6, 1e-2),
    "batch_size": tune.choice([32, 64, 128]),
}

# スケジューラの設定(早期停止)
scheduler = ASHAScheduler(
    time_attr="training_iteration",
    max_t=100,                    # 最大エポック数
    grace_period=10,              # 最低 10 エポックは実行
    reduction_factor=3,           # トップ 1/3 のみ続行
    brackets=1,
)

# 検索アルゴリズムの設定
search_alg = OptunaSearch(
    metric="val_loss",
    mode="min",
    n_startup_trials=10,          # 最初の 10 試行はランダム
)

# Tuner の作成と実行
tuner = tune.Tuner(
    trainable=training_function,
    param_space=search_space,
    tune_config=TuneConfig(
        num_samples=100,              # 100 トライアル
        scheduler=scheduler,
        search_alg=search_alg,
        metric="val_loss",
        mode="min",
        max_concurrent_trials=8,      # 同時実行数
    ),
    run_config=RunConfig(
        name="hparam_search",
        storage_path="s3://my-bucket/ray_results/",
        stop={"val_accuracy": 0.99},  # 目標精度に達したら停止
        checkpoint_config=ray.train.CheckpointConfig(
            checkpoint_frequency=5,
            num_to_keep=2,
        ),
    ),
)

results = tuner.fit()

# 結果の分析
best_result = results.get_best_result(metric="val_loss", mode="min")
print(f"Best config: {best_result.config}")
print(f"Best val_loss: {best_result.metrics['val_loss']}")
print(f"Best val_accuracy: {best_result.metrics['val_accuracy']}")

# 結果の DataFrame 取得
results_df = results.get_dataframe()
print(results_df.sort_values("val_loss").head(10))

8.3 検索アルゴリズム

# ベイズ最適化(BayesOpt)
from ray.tune.search.bayesopt import BayesOptSearch
search_alg = BayesOptSearch(
    utility_kwargs={"kind": "ucb", "kappa": 2.5, "xi": 0.0}
)

# HyperOpt (Tree-structured Parzen Estimator)
from ray.tune.search.hyperopt import HyperOptSearch
search_alg = HyperOptSearch(
    metric="val_loss",
    mode="min",
    n_initial_points=20,
)

# Optuna
from ray.tune.search.optuna import OptunaSearch
search_alg = OptunaSearch(
    metric="val_loss",
    mode="min",
)

# BOHB (Bayesian Optimization + HyperBand)
from ray.tune.search.bohb import TuneBOHB
from ray.tune.schedulers import HyperBandForBOHB

bohb_search = TuneBOHB(metric="val_loss", mode="min")
bohb_scheduler = HyperBandForBOHB(
    time_attr="training_iteration",
    max_t=100,
    reduction_factor=4,
)

8.4 スケジューラ

# ASHA (Asynchronous Successive Halving Algorithm)
from ray.tune.schedulers import ASHAScheduler
scheduler = ASHAScheduler(
    time_attr="training_iteration",
    max_t=100,
    grace_period=10,
    reduction_factor=3,
)

# Population Based Training (PBT)
from ray.tune.schedulers import PopulationBasedTraining
scheduler = PopulationBasedTraining(
    time_attr="training_iteration",
    perturbation_interval=5,       # 5 エポックごとにハイパーパラメータを変異
    hyperparam_mutations={
        "lr": tune.loguniform(1e-5, 1e-1),
        "weight_decay": tune.loguniform(1e-6, 1e-2),
        "batch_size": [32, 64, 128],
    },
    quantile_fraction=0.25,        # 下位 25% を変異対象
    resample_probability=0.25,     # 25% の確率で探索空間から再サンプリング
)

# MedianStoppingRule
from ray.tune.schedulers import MedianStoppingRule
scheduler = MedianStoppingRule(
    time_attr="training_iteration",
    grace_period=10,
    min_samples_required=3,
)

8.5 Ray Train との統合

from ray.train.torch import TorchTrainer
from ray import tune
from ray.tune import TuneConfig

def train_func(config):
    model = create_model(config["hidden_size"])
    model = ray.train.torch.prepare_model(model)
    
    optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
    
    dataset = ray.train.get_dataset_shard("train")
    
    for epoch in range(config["epochs"]):
        for batch in dataset.iter_torch_batches(batch_size=config["batch_size"]):
            loss = train_step(model, optimizer, batch)
        
        val_metrics = evaluate(model, val_loader)
        ray.train.report(metrics=val_metrics)

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
    datasets={"train": train_ds},
)

tuner = tune.Tuner(
    trainer,
    param_space={
        "train_loop_config": {
            "lr": tune.loguniform(1e-5, 1e-2),
            "hidden_size": tune.choice([128, 256, 512]),
            "batch_size": tune.choice([32, 64]),
            "epochs": 20,
        }
    },
    tune_config=TuneConfig(
        num_samples=50,
        scheduler=ASHAScheduler(max_t=20, grace_period=5),
        metric="val_loss",
        mode="min",
    ),
)

results = tuner.fit()

9. Ray Serve — モデルサービング

9.1 概要

Ray Serve は、ML モデルやビジネスロジックをスケーラブルな HTTP/gRPC エンドポイントとしてデプロイするためのフレームワークである。

主な特徴

  • フレームワーク非依存(PyTorch、TensorFlow、scikit-learn、任意のPython)
  • 動的リクエストバッチング
  • 複数モデルの合成(Model Composition)
  • オートスケーリング
  • カナリアデプロイ / A-B テスト

9.2 基本的なデプロイメント

from ray import serve
from ray.serve import Application
import ray

@serve.deployment(
    num_replicas=2,                    # レプリカ数
    ray_actor_options={
        "num_cpus": 2,
        "num_gpus": 1,
    },
    max_ongoing_requests=100,          # レプリカあたりの最大同時リクエスト数
)
class SentimentAnalyzer:
    def __init__(self, model_name: str = "distilbert-base-uncased"):
        from transformers import pipeline
        self.pipeline = pipeline(
            "sentiment-analysis",
            model=model_name,
            device=0,  # GPU
        )
    
    async def __call__(self, request) -> dict:
        data = await request.json()
        text = data["text"]
        result = self.pipeline(text)
        return {"sentiment": result[0]["label"], "score": result[0]["score"]}

# デプロイ
app = SentimentAnalyzer.bind(model_name="distilbert-base-uncased")
serve.run(app, host="0.0.0.0", port=8000)

9.3 リクエストバッチング

@serve.deployment(
    ray_actor_options={"num_gpus": 1},
)
class BatchPredictor:
    def __init__(self):
        self.model = load_model()
    
    @serve.batch(max_batch_size=32, batch_wait_timeout_s=0.1)
    async def predict(self, inputs: list[np.ndarray]) -> list[dict]:
        """
        複数のリクエストを自動的にバッチ化して GPU 推論を効率化。
        max_batch_size に達するか、batch_wait_timeout_s が経過するとバッチ処理される。
        """
        # inputs は自動的にリストとして集約される
        batch = np.stack(inputs)
        predictions = self.model.predict(batch)
        
        # 各リクエストに対応する結果をリストで返す
        return [
            {"prediction": pred.tolist(), "confidence": conf}
            for pred, conf in zip(predictions, confidences)
        ]
    
    async def __call__(self, request):
        data = await request.json()
        input_array = np.array(data["input"])
        return await self.predict(input_array)

9.4 モデル合成(Model Composition)

@serve.deployment(num_replicas=1)
class ImageClassifier:
    def __init__(self):
        self.model = load_image_classifier()
    
    def classify(self, image: np.ndarray) -> dict:
        return self.model.predict(image)

@serve.deployment(num_replicas=1)
class ObjectDetector:
    def __init__(self):
        self.model = load_object_detector()
    
    def detect(self, image: np.ndarray) -> list[dict]:
        return self.model.detect(image)

@serve.deployment(num_replicas=2)
class ImagePipeline:
    def __init__(self, classifier, detector):
        self.classifier = classifier
        self.detector = detector
    
    async def __call__(self, request):
        image_data = await request.body()
        image = decode_image(image_data)
        
        # 複数モデルの並列呼び出し
        import asyncio
        classification_ref = self.classifier.classify.remote(image)
        detection_ref = self.detector.detect.remote(image)
        
        classification, detections = await asyncio.gather(
            classification_ref,
            detection_ref,
        )
        
        return {
            "classification": classification,
            "detections": detections,
        }

# デプロイメントグラフの構築
classifier = ImageClassifier.bind()
detector = ObjectDetector.bind()
pipeline = ImagePipeline.bind(classifier, detector)

serve.run(pipeline, host="0.0.0.0", port=8000)

9.5 オートスケーリング設定

from ray.serve.config import AutoscalingConfig

@serve.deployment(
    autoscaling_config=AutoscalingConfig(
        min_replicas=1,                  # 最小レプリカ数
        max_replicas=10,                 # 最大レプリカ数
        initial_replicas=2,              # 初期レプリカ数
        target_ongoing_requests=5,       # レプリカあたりの目標同時リクエスト数
        upscale_delay_s=30,             # スケールアップまでの遅延
        downscale_delay_s=300,          # スケールダウンまでの遅延
        upscaling_factor=1.0,           # スケールアップの倍率
        downscaling_factor=0.5,         # スケールダウンの倍率
        metrics_interval_s=10,          # メトリクス収集間隔
        look_back_period_s=60,          # メトリクス評価期間
    ),
    ray_actor_options={"num_cpus": 2, "num_gpus": 0.5},
    max_ongoing_requests=20,
    graceful_shutdown_timeout_s=30,     # グレースフルシャットダウンの待機時間
    health_check_period_s=10,           # ヘルスチェック間隔
    health_check_timeout_s=30,          # ヘルスチェックタイムアウト
)
class AutoScaledModel:
    def __init__(self):
        self.model = load_model()
    
    async def __call__(self, request):
        data = await request.json()
        return self.model.predict(data["input"])

9.6 Serve の設定ファイル(YAML)

# serve_config.yaml
proxy_location: EveryNode

http_options:
  host: 0.0.0.0
  port: 8000
  root_path: /api/v1

grpc_options:
  port: 9000
  grpc_servicer_functions:
    - my_service.add_servicer

logging_config:
  encoding: JSON
  log_level: INFO
  logs_dir: /var/log/ray/serve

applications:
  - name: sentiment-api
    route_prefix: /sentiment
    import_path: my_app.sentiment:app
    runtime_env:
      pip:
        - transformers==4.35.0
        - torch==2.1.0
    deployments:
      - name: SentimentAnalyzer
        num_replicas: auto
        autoscaling_config:
          min_replicas: 1
          max_replicas: 10
          target_ongoing_requests: 5
        ray_actor_options:
          num_cpus: 2
          num_gpus: 1
        user_config:
          model_name: distilbert-base-uncased
          max_length: 512
          
  - name: image-api
    route_prefix: /image
    import_path: my_app.image:app
    deployments:
      - name: ImageClassifier
        num_replicas: 3
        ray_actor_options:
          num_gpus: 1
# 設定ファイルからデプロイ
serve deploy serve_config.yaml

# デプロイ状態の確認
serve status

# アプリケーション一覧
serve list

10. RLlib — 強化学習

10.1 概要

RLlib は、Ray 上に構築されたスケーラブルな強化学習ライブラリである。多数のアルゴリズムを提供し、シングルエージェントからマルチエージェントまで幅広い RL ワークロードをサポートする。

10.2 基本的な使い方

from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.algorithms.dqn import DQNConfig
import gymnasium as gym

# PPO アルゴリズムの設定
config = (
    PPOConfig()
    .environment(
        env="CartPole-v1",
        env_config={},
        render_env=False,
    )
    .framework("torch")  # "tf2", "torch"
    .env_runners(
        num_env_runners=8,           # 並列環境実行ワーカー数
        num_envs_per_env_runner=2,   # ワーカーあたりの環境数
        rollout_fragment_length=200,
        sample_timeout_s=60,
    )
    .training(
        lr=5e-5,
        gamma=0.99,
        lambda_=0.95,
        kl_coeff=0.2,
        clip_param=0.3,
        vf_clip_param=10.0,
        entropy_coeff=0.01,
        train_batch_size=4000,
        sgd_minibatch_size=128,
        num_sgd_iter=30,
        model={
            "fcnet_hiddens": [256, 256],
            "fcnet_activation": "relu",
            "use_lstm": False,
        },
    )
    .resources(
        num_gpus=1,
        num_cpus_per_env_runner=1,
    )
    .evaluation(
        evaluation_interval=10,
        evaluation_duration=20,
        evaluation_num_env_runners=2,
        evaluation_config={
            "explore": False,
        },
    )
)

# アルゴリズムの構築と学習
algo = config.build()

for iteration in range(100):
    result = algo.train()
    print(
        f"Iteration {iteration}: "
        f"reward_mean={result['env_runners']['episode_reward_mean']:.2f}, "
        f"episode_len_mean={result['env_runners']['episode_len_mean']:.1f}"
    )
    
    # 10 イテレーションごとにチェックポイント保存
    if (iteration + 1) % 10 == 0:
        checkpoint = algo.save()
        print(f"Checkpoint saved: {checkpoint}")

# チェックポイントからの復元
algo.restore(checkpoint)

# 推論
env = gym.make("CartPole-v1")
obs, info = env.reset()
total_reward = 0

while True:
    action = algo.compute_single_action(obs)
    obs, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
    if terminated or truncated:
        break

print(f"Total reward: {total_reward}")
algo.stop()

10.3 カスタム環境

import gymnasium as gym
from gymnasium import spaces
import numpy as np

class CustomTradingEnv(gym.Env):
    """カスタム取引環境の例"""
    
    metadata = {"render_modes": ["human"]}
    
    def __init__(self, config: dict):
        super().__init__()
        self.max_steps = config.get("max_steps", 1000)
        self.initial_balance = config.get("initial_balance", 10000)
        
        # 観測空間: [残高, ポジション, 価格履歴(20日分)]
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(22,), dtype=np.float32
        )
        
        # 行動空間: 0=保持, 1=買い, 2=売り
        self.action_space = spaces.Discrete(3)
    
    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        self.balance = self.initial_balance
        self.position = 0
        self.step_count = 0
        self.prices = generate_price_series()
        return self._get_obs(), {}
    
    def step(self, action):
        self.step_count += 1
        reward = self._execute_action(action)
        terminated = self.step_count >= self.max_steps
        truncated = self.balance <= 0
        return self._get_obs(), reward, terminated, truncated, {}
    
    def _get_obs(self):
        return np.array([
            self.balance,
            self.position,
            *self.prices[self.step_count:self.step_count + 20],
        ], dtype=np.float32)
    
    def _execute_action(self, action):
        # 取引ロジック
        ...
        return reward

# カスタム環境の登録と使用
from ray.tune.registry import register_env

register_env("trading", lambda config: CustomTradingEnv(config))

config = (
    PPOConfig()
    .environment(
        env="trading",
        env_config={
            "max_steps": 500,
            "initial_balance": 100000,
        },
    )
    .training(
        lr=3e-4,
        train_batch_size=8000,
    )
)

10.4 マルチエージェント強化学習

from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.policy.policy import PolicySpec

config = (
    PPOConfig()
    .environment(env="multi_agent_env")
    .multi_agent(
        policies={
            "policy_attacker": PolicySpec(
                config={"gamma": 0.95},
            ),
            "policy_defender": PolicySpec(
                config={"gamma": 0.99},
            ),
        },
        policy_mapping_fn=lambda agent_id, episode, **kwargs: (
            "policy_attacker" if agent_id.startswith("attacker") else "policy_defender"
        ),
        policies_to_train=["policy_attacker", "policy_defender"],
    )
)

11. Ray Workflows

11.1 概要

Ray Workflows は、耐障害性のあるステップベースのワークフローを実行するためのライブラリである。長時間実行されるタスクや、障害時に途中から再開が必要なパイプラインに適している。

11.2 基本的な使い方

from ray import workflow

@ray.remote
def extract_data(source: str) -> dict:
    """データ抽出ステップ"""
    return load_from_source(source)

@ray.remote
def transform_data(data: dict, config: dict) -> dict:
    """データ変換ステップ"""
    return apply_transformations(data, config)

@ray.remote
def load_data(transformed: dict, target: str) -> str:
    """データロードステップ"""
    write_to_target(transformed, target)
    return f"Loaded to {target}"

@ray.remote
def notify(result: str) -> None:
    """通知ステップ"""
    send_notification(result)

# ワークフローの定義と実行
# 各 .bind() 呼び出しがワークフローステップを定義
data = extract_data.bind("s3://raw-data/")
transformed = transform_data.bind(data, {"normalize": True})
loaded = load_data.bind(transformed, "s3://processed-data/")
notification = notify.bind(loaded)

# ワークフローの実行(永続化 ID 付き)
workflow.run(notification, workflow_id="etl_pipeline_20240101")

# ワークフローの状態確認
status = workflow.get_status("etl_pipeline_20240101")
print(f"Status: {status}")  # RUNNING, SUCCESSFUL, FAILED, etc.

# 障害時の再開
workflow.resume("etl_pipeline_20240101")

# ワークフロー一覧
all_workflows = workflow.list_all()
for wf_id, wf_status in all_workflows:
    print(f"{wf_id}: {wf_status}")

11.3 動的ワークフロー

@ray.remote
def dynamic_fanout(items: list) -> list:
    """動的に並列ステップを生成"""
    sub_workflows = []
    for item in items:
        sub_wf = process_item.bind(item)
        sub_workflows.append(sub_wf)
    
    # 全サブワークフローの完了を待機
    return workflow.continuation(aggregate.bind(sub_workflows))

@ray.remote
def process_item(item) -> dict:
    return {"item": item, "result": compute(item)}

@ray.remote
def aggregate(results: list[dict]) -> dict:
    return {"total": sum(r["result"] for r in results)}

12. フォールトトレランス

12.1 タスクのフォールトトレランス

# タスクレベルのリトライ設定
@ray.remote(
    max_retries=5,                                 # 最大リトライ回数
    retry_exceptions=[ConnectionError, TimeoutError, OSError],  # リトライ対象例外
)
def unreliable_task(data):
    """ネットワークアクセスなど、失敗する可能性があるタスク"""
    response = fetch_from_api(data)
    return response

# タスク実行時のオプションでオーバーライド
ref = unreliable_task.options(
    max_retries=10,
    retry_exceptions=True,  # True = すべての例外でリトライ
).remote(data)

12.2 Actor のフォールトトレランス

@ray.remote(
    max_restarts=3,                # アクターの最大再起動回数(-1 で無限)
    max_task_retries=5,            # メソッド呼び出しの最大リトライ回数
)
class ResilientActor:
    def __init__(self):
        # チェックポイントからの復元を試みる
        self.state = self._load_checkpoint()
    
    def process(self, data):
        result = compute(data)
        self.state.update(result)
        self._save_checkpoint()  # 定期的にチェックポイント保存
        return result
    
    def _save_checkpoint(self):
        with open("/tmp/actor_checkpoint.pkl", "wb") as f:
            pickle.dump(self.state, f)
    
    def _load_checkpoint(self):
        try:
            with open("/tmp/actor_checkpoint.pkl", "rb") as f:
                return pickle.load(f)
        except FileNotFoundError:
            return {}

# Detached Actor(作成元プロセスの終了後も存続)
actor = ResilientActor.options(
    name="resilient_worker",
    lifetime="detached",
    namespace="production",
    max_restarts=-1,        # 無限再起動
).remote()

12.3 Object のフォールトトレランス

# オブジェクト再構築(Lineage-based Recovery)
# Ray はオブジェクトの作成方法(リネージ)を追跡しており、
# ノード障害でオブジェクトが失われた場合、自動的に再計算する

@ray.remote
def create_dataset():
    """このタスクの結果が失われた場合、自動的に再実行される"""
    return load_and_process_data()

ref = create_dataset.remote()
# ノード障害が発生しても、ref を ray.get() すると
# 自動的にタスクが再実行される

# オブジェクトスピル設定(メモリ不足対策)
ray.init(
    _system_config={
        "object_spilling_config": {
            "type": "filesystem",
            "params": {
                "directory_path": ["/mnt/ssd/ray_spill"],
                "buffer_size": 1048576,
            },
        },
        "max_io_workers": 4,
        "min_spilling_size": 100 * 1024 * 1024,  # 100MB 以上でスピル
    }
)

12.4 クラスタレベルのフォールトトレランス

# GCS フォールトトレランス(Redis バックエンド)
# ヘッドノード障害時に GCS を復旧可能

# 設定方法 1: 環境変数
import os
os.environ["RAY_REDIS_ADDRESS"] = "redis-ha:6379"
os.environ["RAY_external_storage_namespace"] = "production_cluster"

# 設定方法 2: ray start コマンド
# ray start --head \
#     --redis-password="secure_password" \
#     --storage="s3://my-bucket/ray-gcs/"

13. Ray クラスタのデプロイメント

13.1 ローカル開発

# シングルノード(ローカル開発用)
import ray
ray.init()  # ローカルマシンのリソースを自動検出

# リソースを明示的に指定
ray.init(
    num_cpus=4,
    num_gpus=1,
    object_store_memory=2 * 1024**3,  # 2GB
    dashboard_host="0.0.0.0",
    dashboard_port=8265,
    logging_level="info",
    log_to_driver=True,
)

13.2 手動クラスタ構築

# ヘッドノード起動
ray start --head \
    --port=6379 \
    --dashboard-host=0.0.0.0 \
    --dashboard-port=8265 \
    --num-cpus=8 \
    --num-gpus=2 \
    --object-store-memory=10000000000 \
    --metrics-export-port=8080 \
    --temp-dir=/tmp/ray

# ワーカーノード接続
ray start \
    --address="head-node-ip:6379" \
    --num-cpus=16 \
    --num-gpus=4 \
    --object-store-memory=20000000000 \
    --resources='{"special_gpu": 2}'

# クラスタ状態の確認
ray status

# クラスタ停止
ray stop

13.3 クラスタ設定ファイル

# cluster_config.yaml(Ray Cluster Launcher 用)
cluster_name: ml-cluster

max_workers: 10
upscaling_speed: 1.0

idle_timeout_minutes: 5

docker:
  image: "rayproject/ray-ml:2.9.0-gpu"
  container_name: "ray_container"
  pull_before_run: true
  run_options:
    - --gpus all
    - --shm-size=16g

provider:
  type: aws
  region: us-west-2
  availability_zone: us-west-2a
  cache_stopped_nodes: true

auth:
  ssh_user: ubuntu
  ssh_private_key: ~/.ssh/ray-cluster.pem

available_node_types:
  head_node:
    node_config:
      InstanceType: m5.4xlarge
      ImageId: ami-0123456789abcdef0
      BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
            VolumeSize: 200
            VolumeType: gp3
    resources:
      CPU: 16
    min_workers: 0
    max_workers: 0
  
  gpu_worker:
    node_config:
      InstanceType: p3.2xlarge
      ImageId: ami-0123456789abcdef0
      BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
            VolumeSize: 500
            VolumeType: gp3
    resources:
      CPU: 8
      GPU: 1
    min_workers: 2
    max_workers: 8

  cpu_worker:
    node_config:
      InstanceType: m5.8xlarge
    resources:
      CPU: 32
    min_workers: 0
    max_workers: 20

head_node_type: head_node

file_mounts:
  "/home/ubuntu/data": "/local/data"

setup_commands:
  - pip install -U ray[default] torch transformers
  - pip install -r requirements.txt

head_setup_commands:
  - pip install -U ray[serve]

head_start_ray_commands:
  - ray stop
  - >-
    ray start --head
    --port=6379
    --dashboard-host=0.0.0.0
    --autoscaling-config=~/ray_bootstrap_config.yaml

worker_start_ray_commands:
  - ray stop
  - >-
    ray start
    --address=$RAY_HEAD_IP:6379
# クラスタの起動
ray up cluster_config.yaml

# クラスタにジョブを submit
ray job submit --address http://head-node:8265 -- python train.py

# クラスタに SSH 接続
ray attach cluster_config.yaml

# クラスタの停止
ray down cluster_config.yaml

13.4 KubeRay — Kubernetes 上のデプロイメント

KubeRay は、Kubernetes 上で Ray クラスタを管理するためのKubernetes Operatorである。

# RayCluster リソースの定義
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: ml-cluster
  namespace: ray
spec:
  rayVersion: "2.9.0"
  enableInTreeAutoscaling: true
  autoscalerOptions:
    upscalingMode: Default
    idleTimeoutSeconds: 300
    resources:
      limits:
        cpu: "500m"
        memory: "512Mi"

  headGroupSpec:
    serviceType: ClusterIP
    rayStartParams:
      dashboard-host: "0.0.0.0"
      num-cpus: "0"                 # ヘッドノードではタスクを実行しない
      block: "true"
    template:
      metadata:
        labels:
          ray-node-type: head
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray-ml:2.9.0-gpu
          ports:
          - containerPort: 6379
            name: gcs-server
          - containerPort: 8265
            name: dashboard
          - containerPort: 10001
            name: client
          - containerPort: 8000
            name: serve
          resources:
            limits:
              cpu: "4"
              memory: "16Gi"
            requests:
              cpu: "2"
              memory: "8Gi"
          volumeMounts:
          - name: shared-storage
            mountPath: /mnt/shared
          env:
          - name: RAY_GRAFANA_HOST
            value: "http://grafana.monitoring:3000"
          - name: RAY_PROMETHEUS_HOST
            value: "http://prometheus.monitoring:9090"
        volumes:
        - name: shared-storage
          persistentVolumeClaim:
            claimName: ray-shared-pvc

  workerGroupSpecs:
  - replicas: 2
    minReplicas: 1
    maxReplicas: 10
    groupName: gpu-workers
    rayStartParams:
      num-gpus: "1"
      block: "true"
    template:
      metadata:
        labels:
          ray-node-type: worker
          worker-type: gpu
      spec:
        tolerations:
        - key: "nvidia.com/gpu"
          operator: "Exists"
          effect: "NoSchedule"
        containers:
        - name: ray-worker
          image: rayproject/ray-ml:2.9.0-gpu
          resources:
            limits:
              cpu: "8"
              memory: "32Gi"
              nvidia.com/gpu: "1"
            requests:
              cpu: "4"
              memory: "16Gi"
              nvidia.com/gpu: "1"
          volumeMounts:
          - name: shared-storage
            mountPath: /mnt/shared
          - name: dshm
            mountPath: /dev/shm
        volumes:
        - name: shared-storage
          persistentVolumeClaim:
            claimName: ray-shared-pvc
        - name: dshm
          emptyDir:
            medium: Memory
            sizeLimit: "16Gi"
  
  - replicas: 4
    minReplicas: 0
    maxReplicas: 20
    groupName: cpu-workers
    rayStartParams:
      num-cpus: "8"
      block: "true"
    template:
      spec:
        containers:
        - name: ray-worker
          image: rayproject/ray-ml:2.9.0
          resources:
            limits:
              cpu: "8"
              memory: "32Gi"
            requests:
              cpu: "4"
              memory: "16Gi"

13.5 RayJob と RayService

# RayJob: バッチジョブの実行
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: training-job
  namespace: ray
spec:
  entrypoint: "python /app/train.py --epochs 100 --lr 0.001"
  shutdownAfterJobFinishes: true
  ttlSecondsAfterFinished: 3600
  runtimeEnvYAML: |
    pip:
      - torch==2.1.0
      - transformers==4.35.0
    working_dir: "s3://my-bucket/project/"
    env_vars:
      WANDB_API_KEY: "xxxxx"
  rayClusterSpec:
    rayVersion: "2.9.0"
    headGroupSpec:
      rayStartParams:
        dashboard-host: "0.0.0.0"
      template:
        spec:
          containers:
          - name: ray-head
            image: rayproject/ray-ml:2.9.0-gpu
            resources:
              limits:
                cpu: "4"
                memory: "16Gi"
    workerGroupSpecs:
    - replicas: 4
      groupName: gpu-workers
      rayStartParams:
        num-gpus: "1"
      template:
        spec:
          containers:
          - name: ray-worker
            image: rayproject/ray-ml:2.9.0-gpu
            resources:
              limits:
                cpu: "8"
                memory: "32Gi"
                nvidia.com/gpu: "1"

---
# RayService: サービングのデプロイ
apiVersion: ray.io/v1
kind: RayService
metadata:
  name: ml-serving
  namespace: ray
spec:
  serviceUnhealthySecondThreshold: 900
  deploymentUnhealthySecondThreshold: 300
  serveConfigV2: |
    applications:
      - name: sentiment-api
        route_prefix: /sentiment
        import_path: my_app.sentiment:app
        runtime_env:
          pip:
            - transformers==4.35.0
            - torch==2.1.0
        deployments:
          - name: SentimentAnalyzer
            autoscaling_config:
              min_replicas: 1
              max_replicas: 5
              target_ongoing_requests: 10
            ray_actor_options:
              num_gpus: 0.5
  rayClusterConfig:
    rayVersion: "2.9.0"
    headGroupSpec:
      rayStartParams:
        dashboard-host: "0.0.0.0"
      template:
        spec:
          containers:
          - name: ray-head
            image: rayproject/ray-ml:2.9.0-gpu
            ports:
            - containerPort: 8000
              name: serve
            resources:
              limits:
                cpu: "4"
                memory: "8Gi"
    workerGroupSpecs:
    - replicas: 2
      maxReplicas: 8
      groupName: gpu-workers
      template:
        spec:
          containers:
          - name: ray-worker
            image: rayproject/ray-ml:2.9.0-gpu
            resources:
              limits:
                cpu: "4"
                memory: "16Gi"
                nvidia.com/gpu: "1"

14. モニタリングとオブザーバビリティ

14.1 Ray Dashboard

Ray Dashboard は、クラスタの状態を可視化するための Web UI であり、デフォルトでポート 8265 でアクセス可能である。

主な機能

  • Jobs: 実行中・完了済みジョブの一覧と詳細
  • Cluster: ノードのリソース使用状況
  • Actors: アクターの状態、メモリ使用量
  • Metrics: Prometheus / Grafana 統合のメトリクス
  • Logs: ワーカーログのストリーミング表示
  • Events: クラスタイベントの履歴

14.2 Prometheus / Grafana 統合

# Ray のメトリクスエクスポート設定
# ray_metrics.yaml
prometheus:
  port: 8080

# Prometheus の scrape 設定
# prometheus.yml
scrape_configs:
  - job_name: 'ray'
    scrape_interval: 5s
    metrics_path: '/metrics'
    static_configs:
      - targets:
        - 'ray-head:8080'
    # KubeRay の場合は ServiceMonitor を使用
    # kubernetes_sd_configs:
    #   - role: pod
    #     selectors:
    #       - role: pod
    #         label: "ray.io/node-type"

主要なメトリクス

メトリクス説明
ray_node_cpu_utilizationノードの CPU 使用率
ray_node_gpu_utilizationノードの GPU 使用率
ray_node_mem_usedノードのメモリ使用量
ray_node_disk_usageノードのディスク使用量
ray_object_store_memoryオブジェクトストアのメモリ使用量
ray_tasksタスクの状態別カウント
ray_actorsアクターの状態別カウント
ray_serve_num_requestsServe のリクエスト数
ray_serve_request_latency_msServe のレイテンシ
ray_serve_num_replicasServe のレプリカ数

14.3 ログ管理

import logging

# Ray ロガーの設定
ray.init(
    logging_level=logging.INFO,
    log_to_driver=True,              # ドライバーにログを出力
    _system_config={
        "log_rotation_max_bytes": 100 * 1024 * 1024,   # 100MB
        "log_rotation_backup_count": 5,
    },
)

# タスク/アクター内でのロギング
@ray.remote
def my_task():
    logger = logging.getLogger("my_task")
    logger.info("Processing started")
    # ログは Ray のログディレクトリに保存される
    # デフォルト: /tmp/ray/session_latest/logs/
    return result

# ログの場所
# /tmp/ray/session_latest/logs/
# ├── dashboard.log
# ├── gcs_server.log
# ├── monitor.log
# ├── raylet.log
# ├── worker-{worker_id}-{job_id}-{pid}.log
# └── ...

14.4 Ray State API

from ray.experimental.state.api import (
    list_actors,
    list_tasks,
    list_objects,
    list_nodes,
    list_jobs,
    list_placement_groups,
    get_actor,
    get_task,
    get_node,
)

# アクター一覧
actors = list_actors(filters=[("state", "=", "ALIVE")])
for actor in actors:
    print(f"Actor: {actor['name']}, State: {actor['state']}, "
          f"PID: {actor['pid']}, Node: {actor['node_id']}")

# タスク一覧
tasks = list_tasks(filters=[("state", "=", "RUNNING")], limit=100)

# ノード一覧
nodes = list_nodes()
for node in nodes:
    print(f"Node: {node['node_id']}, "
          f"Alive: {node['state']}, "
          f"Resources: CPU={node['resources_total'].get('CPU', 0)}, "
          f"GPU={node['resources_total'].get('GPU', 0)}")

# オブジェクト一覧
objects = list_objects(filters=[("reference_type", "=", "PINNED_IN_MEMORY")])

# ジョブ一覧
jobs = list_jobs()
# CLI からの状態確認
ray list actors --filter "state=ALIVE" --format json
ray list tasks --filter "state=RUNNING" --limit 50
ray list nodes
ray list objects --filter "reference_type=PINNED_IN_MEMORY"

# 詳細情報の取得
ray get actors <actor_id>
ray get tasks <task_id>

# クラスタの概要
ray status
ray summary tasks
ray summary actors
ray summary objects

15. Ray Jobs API

15.1 概要

Ray Jobs API は、Ray クラスタにジョブを submit し、ライフサイクルを管理するための API である。ローカル開発環境からプロダクションクラスタへのジョブ投入を標準化する。

15.2 CLI からのジョブ管理

# ジョブの submit
ray job submit \
    --address http://ray-head:8265 \
    --runtime-env-json '{
        "pip": ["torch==2.1.0", "transformers"],
        "working_dir": "s3://my-bucket/project/",
        "env_vars": {"WANDB_PROJECT": "my_project"}
    }' \
    --entrypoint-num-cpus 2 \
    --entrypoint-num-gpus 1 \
    -- python train.py --epochs 50 --lr 0.001

# ジョブの状態確認
ray job status <job_id> --address http://ray-head:8265

# ジョブのログ取得
ray job logs <job_id> --address http://ray-head:8265 --follow

# ジョブの停止
ray job stop <job_id> --address http://ray-head:8265

# ジョブ一覧
ray job list --address http://ray-head:8265

15.3 Python SDK からのジョブ管理

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://ray-head:8265")

# ジョブの submit
job_id = client.submit_job(
    entrypoint="python train.py --epochs 50",
    runtime_env={
        "pip": ["torch==2.1.0", "transformers"],
        "working_dir": "s3://my-bucket/project/",
        "env_vars": {"CUDA_VISIBLE_DEVICES": "0,1"},
    },
    entrypoint_num_cpus=2,
    entrypoint_num_gpus=1,
    metadata={
        "job_name": "training_v2",
        "team": "ml-platform",
    },
)

print(f"Submitted job: {job_id}")

# ジョブの状態をポーリング
import time
from ray.job_submission import JobStatus

while True:
    status = client.get_job_status(job_id)
    print(f"Status: {status}")
    
    if status in {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.STOPPED}:
        break
    time.sleep(10)

# ジョブのログ取得
logs = client.get_job_logs(job_id)
print(logs)

# 非同期ログストリーミング
async for lines in client.tail_job_logs(job_id):
    print(lines, end="")

16. パフォーマンスチューニング

16.1 メモリ管理

# メモリ関連の設定
ray.init(
    object_store_memory=20 * 1024**3,     # オブジェクトストアサイズ
    _system_config={
        # オブジェクトスピル設定
        "object_spilling_config": {
            "type": "filesystem",
            "params": {
                "directory_path": [
                    "/mnt/nvme/ray_spill",   # 高速 SSD を優先
                    "/mnt/hdd/ray_spill",    # フォールバック
                ],
                "buffer_size": 1048576,
            },
        },
        # メモリ管理
        "max_io_workers": 8,                 # I/O ワーカー数
        "min_spilling_size": 100 * 1024 * 1024,  # 最小スピルサイズ
        "object_store_full_delay_ms": 100,   # オブジェクトストア満杯時の遅延
    },
)

メモリリークの防止

# BAD: 大量の ObjectRef を保持
refs = []
for i in range(1000000):
    ref = compute.remote(i)
    refs.append(ref)  # 全ての参照を保持 → メモリ逼迫

# GOOD: ray.wait() で逐次処理
refs = [compute.remote(i) for i in range(1000000)]
results = []
while refs:
    done, refs = ray.wait(refs, num_returns=min(100, len(refs)))
    results.extend(ray.get(done))

# GOOD: ジェネレータパターン
def process_in_batches(items, batch_size=100):
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        futures = [compute.remote(item) for item in batch]
        yield ray.get(futures)  # バッチ単位で結果を取得・解放

16.2 シリアライゼーション最適化

# カスタムシリアライザの登録
import ray
from ray import cloudpickle

# 大きなオブジェクトは ray.put() で事前共有
# BAD
large_model = load_model()
futures = [predict.remote(large_model, x) for x in inputs]  # 毎回シリアライズ

# GOOD
model_ref = ray.put(load_model())
futures = [predict.remote(model_ref, x) for x in inputs]  # ObjectRef のみ渡す

# NumPy 配列は Arrow シリアライゼーションで高速
import numpy as np
array = np.random.rand(10000, 10000)  # Arrow で高速シリアライズ
ref = ray.put(array)

# Pandas DataFrame も Arrow ベースで効率的
import pandas as pd
df = pd.DataFrame({"a": range(1000000), "b": range(1000000)})
ref = ray.put(df)  # Arrow 変換で高速

# カスタムクラスのシリアライゼーション最適化
class MyModel:
    def __reduce__(self):
        """pickle プロトコルのカスタマイズ"""
        return (self.__class__._from_bytes, (self._serialize(),))
    
    def _serialize(self):
        return self.state_dict()  # 最小限のデータのみ
    
    @classmethod
    def _from_bytes(cls, data):
        model = cls()
        model.load_state_dict(data)
        return model

16.3 タスクの粒度最適化

# BAD: 粒度が細かすぎる(オーバーヘッドが大きい)
@ray.remote
def add_one(x):
    return x + 1

# 100万回のリモート呼び出し → 大量のオーバーヘッド
refs = [add_one.remote(i) for i in range(1000000)]

# GOOD: バッチ化して粒度を適切に
@ray.remote
def add_one_batch(xs):
    return [x + 1 for x in xs]

# 1000 個のバッチ × 1000 要素 → 適切なオーバーヘッド
batch_size = 1000
batches = [list(range(i, min(i + batch_size, 1000000))) 
           for i in range(0, 1000000, batch_size)]
refs = [add_one_batch.remote(batch) for batch in batches]

16.4 ネットワーク最適化

# データローカリティを活用
# 処理するデータと同じノードでタスクを実行
@ray.remote
def process_local(data_ref):
    """data_ref が格納されているノードで実行されるよう、
    スケジューラがデータローカリティを考慮する"""
    data = ray.get(data_ref)  # ローカルならゼロコピー
    return transform(data)

# 明示的なノードアフィニティ
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

data_ref = ray.put(large_data)
node_id = ray.get(ray.runtime_context.get_runtime_context().get_node_id.remote())

ref = process_local.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(
        node_id=node_id,
        soft=True,
    )
).remote(data_ref)

16.5 GPU 最適化

# 分数 GPU の活用(小さなモデルで GPU を共有)
@ray.remote(num_gpus=0.25)
def light_inference(model_ref, batch):
    """4 つのタスクで 1 GPU を共有"""
    model = ray.get(model_ref)
    return model.predict(batch)

# GPU メモリの管理
@ray.remote(num_gpus=1)
class GPUWorker:
    def __init__(self):
        import torch
        # GPU メモリの使用量を制限
        torch.cuda.set_per_process_memory_fraction(0.8)
        self.model = load_model().cuda()
    
    def predict(self, batch):
        import torch
        with torch.no_grad():
            with torch.cuda.amp.autocast():  # 混合精度推論
                return self.model(batch)

# マルチ GPU タスク
@ray.remote(num_gpus=4)
def multi_gpu_training(config):
    """4 GPU を使用する学習タスク"""
    import torch
    import torch.distributed as dist
    
    # Ray が CUDA_VISIBLE_DEVICES を自動設定
    model = create_model()
    model = torch.nn.DataParallel(model)  # または DistributedDataParallel
    return train(model, config)

17. セキュリティ

17.1 TLS 暗号化

# TLS の有効化
ray.init(
    _system_config={
        "tls_cert_file": "/certs/server.crt",
        "tls_key_file": "/certs/server.key",
        "tls_ca_cert_file": "/certs/ca.crt",
    }
)
# 環境変数での設定
export RAY_USE_TLS=1
export RAY_TLS_SERVER_CERT=/certs/server.crt
export RAY_TLS_SERVER_KEY=/certs/server.key
export RAY_TLS_CA_CERT=/certs/ca.crt

ray start --head

17.2 認証

# Ray Client の認証(トークンベース)
ray.init(
    "ray://ray-head:10001",
    runtime_env={"env_vars": {"RAY_JOB_TOKEN": "secure_token"}},
)

# KubeRay での認証(Kubernetes RBAC)
# ServiceAccount と RoleBinding で制御
# KubeRay RBAC 設定例
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: ray-user
  namespace: ray
rules:
- apiGroups: ["ray.io"]
  resources: ["rayjobs"]
  verbs: ["create", "get", "list", "delete"]
- apiGroups: ["ray.io"]
  resources: ["rayclusters"]
  verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: ray-user-binding
  namespace: ray
subjects:
- kind: User
  name: ml-engineer
  apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: Role
  name: ray-user
  apiGroup: rbac.authorization.k8s.io

18. ベストプラクティスとデザインパターン

18.1 アンチパターンと対策

アンチパターン 1: 過剰な ray.get() 呼び出し

# BAD: 各タスクの結果を逐次取得(並列性が失われる)
results = []
for i in range(100):
    ref = compute.remote(i)
    result = ray.get(ref)      # ブロッキング!
    results.append(result)

# GOOD: 全タスクを起動してからまとめて取得
refs = [compute.remote(i) for i in range(100)]
results = ray.get(refs)        # 並列実行後にまとめて取得

# BETTER: ray.wait() で逐次処理(メモリ効率)
refs = [compute.remote(i) for i in range(100)]
while refs:
    done, refs = ray.wait(refs, num_returns=1)
    process(ray.get(done[0]))

アンチパターン 2: グローバル変数への依存

# BAD: グローバル変数はワーカーにシリアライズされる
global_model = load_large_model()  # 毎回シリアライズ

@ray.remote
def predict(x):
    return global_model.predict(x)

# GOOD: Actor を使用してステートを管理
@ray.remote
class ModelServer:
    def __init__(self):
        self.model = load_large_model()
    
    def predict(self, x):
        return self.model.predict(x)

server = ModelServer.remote()
results = ray.get([server.predict.remote(x) for x in inputs])

# GOOD: ray.put() で明示的にオブジェクトストアに格納
model_ref = ray.put(load_large_model())

@ray.remote
def predict(model_ref, x):
    model = ray.get(model_ref)  # ローカルならゼロコピー
    return model.predict(x)

アンチパターン 3: タスク内でのタスク起動の過剰ネスト

# BAD: 深いネストは追跡困難でデバッグしにくい
@ray.remote
def level1():
    refs = [level2.remote(i) for i in range(100)]
    return ray.get(refs)

@ray.remote
def level2(x):
    refs = [level3.remote(x, j) for j in range(100)]
    return ray.get(refs)

# GOOD: フラットな構造を維持
@ray.remote
def process_item(x, j):
    return compute(x, j)

all_futures = []
for i in range(100):
    for j in range(100):
        all_futures.append(process_item.remote(i, j))
results = ray.get(all_futures)

18.2 プロダクション運用チェックリスト

## デプロイメント前チェック
- [ ] GCS フォールトトレランスが有効化されている
- [ ] オブジェクトスピルが設定されている
- [ ] メモリ制限が適切に設定されている
- [ ] Prometheus/Grafana モニタリングが設定されている
- [ ] ログ収集パイプラインが設定されている
- [ ] TLS 暗号化が有効化されている
- [ ] オートスケーリングパラメータが調整されている
- [ ] ヘルスチェックが設定されている

## パフォーマンス最適化
- [ ] タスクの粒度が適切である(100ms〜10s が目安)
- [ ] 大きなオブジェクトは ray.put() で事前共有されている
- [ ] 不要な ray.get() が排除されている
- [ ] GPU 利用率が最適化されている
- [ ] ネットワーク転送が最小化されている

## フォールトトレランス
- [ ] タスクのリトライが設定されている
- [ ] 重要なアクターに max_restarts が設定されている
- [ ] チェックポイントが定期的に保存されている
- [ ] ワーカーノード障害時のリカバリが検証されている

19. Ray のユースケースと事例

19.1 大規模言語モデル (LLM) の学習・推論

# vLLM + Ray Serve による LLM サービング
from ray import serve

@serve.deployment(
    ray_actor_options={"num_gpus": 1},
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 4,
        "target_ongoing_requests": 5,
    },
)
class LLMDeployment:
    def __init__(self):
        from vllm import LLM, SamplingParams
        self.llm = LLM(
            model="meta-llama/Llama-2-7b-chat-hf",
            tensor_parallel_size=1,
            gpu_memory_utilization=0.9,
        )
        self.sampling_params = SamplingParams(
            temperature=0.7,
            top_p=0.95,
            max_tokens=512,
        )
    
    async def __call__(self, request):
        data = await request.json()
        prompt = data["prompt"]
        outputs = self.llm.generate([prompt], self.sampling_params)
        return {"response": outputs[0].outputs[0].text}

app = LLMDeployment.bind()
serve.run(app)

19.2 大規模データ処理パイプライン

# ETL パイプラインの例
import ray

# 1. データ読み込み(並列)
ds = ray.data.read_parquet(
    "s3://datalake/raw/2024/01/*/",
    parallelism=200,
)

# 2. データクレンジング
ds = ds.map_batches(
    CleansingProcessor,
    batch_size=4096,
    num_cpus=2,
    concurrency=16,
)

# 3. 特徴量エンジニアリング(GPU アクセラレーション)
ds = ds.map_batches(
    FeatureEngineer,
    batch_size=1024,
    num_gpus=0.5,
    concurrency=8,
)

# 4. 結果の書き出し
ds.write_parquet(
    "s3://datalake/processed/2024/01/",
    num_rows_per_file=1000000,
)

19.3 ハイパーパラメータ探索 + 分散学習

# 大規模な HPO + 分散学習のパイプライン
from ray.train.torch import TorchTrainer
from ray import tune

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(
        num_workers=4,
        use_gpu=True,
        resources_per_worker={"CPU": 4, "GPU": 1},
    ),
    datasets={"train": train_ds, "val": val_ds},
)

tuner = tune.Tuner(
    trainer,
    param_space={
        "train_loop_config": {
            "lr": tune.loguniform(1e-5, 1e-2),
            "batch_size": tune.choice([16, 32, 64]),
            "hidden_dim": tune.choice([256, 512, 1024]),
            "num_layers": tune.choice([2, 4, 6]),
            "dropout": tune.uniform(0.0, 0.5),
        }
    },
    tune_config=TuneConfig(
        num_samples=200,
        scheduler=ASHAScheduler(max_t=50, grace_period=5),
        search_alg=OptunaSearch(metric="val_loss", mode="min"),
    ),
    run_config=RunConfig(
        storage_path="s3://experiments/hpo/",
        name="large_scale_hpo",
    ),
)

results = tuner.fit()
best = results.get_best_result("val_loss", "min")
print(f"Best config: {best.config}")

20. まとめ

20.1 Ray の主要な利点

  1. 統一プラットフォーム: データ処理、学習、チューニング、サービングを単一のフレームワークで実現
  2. 簡潔な API: @ray.remote デコレータで既存の Python コードを最小限の変更で分散化
  3. スケーラビリティ: 単一マシンから数千ノードのクラスタまでシームレスにスケール
  4. エコシステムの充実: PyTorch、TensorFlow、XGBoost、Hugging Face など主要な ML フレームワークとの統合
  5. プロダクション対応: フォールトトレランス、オートスケーリング、モニタリングを標準装備
  6. Kubernetes ネイティブ: KubeRay による Kubernetes 上のネイティブデプロイメント

20.2 Ray の選定基準

条件Ray が適切Ray 以外の検討
ML ワークロード全般-
分散学習 (>1 GPU)Horovod(既存環境)
ハイパーパラメータ探索Optuna(小規模)
モデルサービングTorchServe(単一モデル)
バッチデータ処理✅(ML パイプライン内)Spark(大規模 ETL)
ストリーミング処理Flink, Kafka Streams
汎用分散処理Dask(NumPy/Pandas 互換性)

20.3 今後の展望

Ray は急速に進化を続けており、以下の領域での発展が期待される:

  • LLM ネイティブサポート: vLLM との統合による大規模言語モデルの効率的なサービング
  • Ray Data の進化: ストリーミング処理の強化とデータソース対応の拡大
  • マルチクラウド対応: クラウドプロバイダー横断的なクラスタ管理
  • エッジコンピューティング: エッジデバイスでの Ray 軽量実行
  • セキュリティ強化: エンタープライズ向けの認証・認可機能の充実

Ray は、ML エンジニアリングにおける**「インフラの複雑さからの解放」**というビジョンのもと、分散コンピューティングの民主化を推進し続けている。