ZooKeeper

Apache ZooKeeper:分散調整サービスの完全ガイド

1. はじめに

Apache ZooKeeperは、分散システムにおける調整(coordination)を実現する中核的なサービスです。複数のサーバーが協調して動作する必要がある分散環境では、各ノードの状態管理、リーダー選出、設定の一元管理、サービスディスカバリーなど、様々な課題に直面します。ZooKeeperはこれらの課題を統一的に解決するための基盤となります。

本記事では、ZooKeeperの基本的な概念から実装の詳細、実運用での応用例まで、包括的に解説します。


2. ZooKeeperの概要と役割

2.1 ZooKeeperとは

ZooKeeperは、Hadoopエコシステムから生まれた分散コーディネーションサービスです。Google Chubby(論文:The Chubby lock service for loosely-coupled distributed systems)にインスパイアされ、大規模分散システムにおける信頼性の高い調整機構を提供します。

主な特徴:

  • 高可用性:複数ノードからなるアンサンブル(クラスタ)で動作し、ノード障害に対応
  • 強い一貫性:各操作はACID特性を満たし、データ一貫性を保証
  • シンプルなデータモデル:ファイルシステムに似た階層構造のz-nodes
  • ウォッチ機構:リアルタイム変更通知により、イベント駆動設計が可能
  • 高パフォーマンス:読み取り負荷に対して線形スケーリング

2.2 ZooKeeperが解決する問題

分散システムにおいては、以下のような課題が発生します:

  1. リーダー選出:複数のプロセスの中から1つをリーダーとして決定
  2. 設定管理:複数ノードに共通の設定を一元管理
  3. サービスディスカバリー:利用可能なサービスの動的な検出
  4. 分散ロック:複数プロセス間の排他制御
  5. グループメンバーシップ:アクティブなメンバーの追跡
  6. キュー管理:分散環境でのタスク配信

ZooKeeperはこれらすべての問題に対して、統一的で信頼性の高いソリューションを提供します。


3. ZooKeeperのアーキテクチャ

3.1 システム構成

ZooKeeperは分散アンサンブル(クラスタ)として動作します:

┌─────────────────────────────────────────────────┐
│         ZooKeeper Ensemble (Cluster)            │
├─────────────────────────────────────────────────┤
│  ┌──────────┐  ┌──────────┐  ┌──────────┐      │
│  │ Server 1 │  │ Server 2 │  │ Server 3 │      │
│  │(Leader)  │  │(Follower)│  │(Follower)│      │
│  └──────────┘  └──────────┘  └──────────┘      │
│       ↑              ↑              ↑           │
│       └──────────────┼──────────────┘           │
│                    ZAB Protocol                 │
│          (ZooKeeper Atomic Broadcast)           │
└─────────────────────────────────────────────────┘

重要な概念:

  1. Leader:書き取り要求を処理し、他のサーバーに変更を伝播
  2. Follower:リーダーに従い、読み取り要求に応答
  3. Observer:Followerと同様だが、投票に参加しない(オプション)
  4. Quorum:過半数以上のサーバーが正常に機能すればシステムは動作継続

3.2 データモデル

ZooKeeperはz-nodesと呼ばれるノードで構成された階層的なファイルシステムに似たデータ構造を提供します:

/
├── zookeeper/
│   ├── config
│   └── quota
├── kafka/
│   ├── brokers/
│   │   ├── ids/
│   │   │   ├── 0
│   │   │   ├── 1
│   │   │   └── 2
│   │   └── topics/
│   ├── controller
│   ├── controller_epoch
│   └── isr_change_notification/
├── hadoop/
│   ├── activeNameNode
│   ├── standbyNameNodes
│   └── namenode1
└── appconfig/
    ├── database/
    │   ├── host
    │   ├── port
    │   └── credentials
    └── service/
        ├── timeout
        └── retries

z-nodeの特性:

  • パス/app/database/host のような絶対パス
  • データ:各ノードは最大1MBのバイナリデータを保持
  • メタデータ:バージョン情報、タイムスタンプ、ACL
  • 子ノード:各ノードは複数の子ノードを持つ可能

3.3 z-nodeの種類

a) Persistent Node(永続ノード)

明示的に削除されるまで存在し続けます。設定情報やサービス登録に適しています。

/kafka/brokers/ids/0  ← 削除されるまで保持

b) Ephemeral Node(一時ノード)

クライアントがセッションを切断すると自動的に削除されます。ハートビート機構として利用されます。

/kafka/brokers/alive/1  ← セッション終了時に自動削除

c) Sequential Node(連番ノード)

作成時に自動的に連番サフィックスが追加されます。キュー実装やタスク分配に適しています。

/queue/task-0000000000  
/queue/task-0000000001
/queue/task-0000000002

d) Ephemeral Sequential Node(一時連番ノード)

一時ノードと連番ノードの特性を組み合わせたもの。分散ロック実装で活用されます。

3.4 ウォッチ(Watch)機構

ZooKeeperの最も強力な機能の1つがウォッチです。クライアントが特定のz-nodeの変更を監視し、イベント発生時にコールバックを受け取ることができます。

ウォッチの種類:

// データ変更の監視
watch.exists()        // ノードの作成/削除を監視
watch.getData()       // ノードのデータ変更を監視
watch.getChildren()   // 子ノードの追加/削除を監視

ウォッチの特性:

  • ワンショット:イベント発火後、再設定が必要
  • 順序保証:イベントとz-nodeの変更は順序が保証される
  • 原子性:ウォッチコールバックと状態変更は原子的に処理される

3.5 ZAB(ZooKeeper Atomic Broadcast)プロトコル

ZooKeeperの信頼性の中核を担うのがZABプロトコルです。

ZABの特性:

  1. 原子性:すべてのサーバーがすべての更新を同じ順序で受け取る
  2. 一貫性:クライアントが見るデータは常に一貫している
  3. 障害対応:リーダー障害時に自動的に新リーダーが選出される

ZABの2つのフェーズ:

Discovery フェーズ
├─ 新リーダーが過去の提案を検出
└─ フォロワーと同期点を確立

Synchronization フェーズ
├─ リーダーがすべてのフォロワーをキャッチアップ
└─ Broadcast開始前にすべてのノードが同期状態に

4. ZooKeeperの機能詳解

4.1 リーダー選出

複数のプロセスの中からリーダーを選出する手法を実装します。

// リーダー選出の実装例
public class LeaderElection {
    private ZooKeeper zk;
    private String nodePath;
    private byte[] metadata;
    
    public void joinElection(String electionPath, byte[] data) 
            throws KeeperException, InterruptedException {
        metadata = data;
        // 連番のエフェメラルノードを作成
        nodePath = zk.create(
            electionPath + "/candidate_",
            data,
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL
        );
    }
    
    public void watchLeadership(String electionPath, 
            LeadershipCallback callback) 
            throws KeeperException, InterruptedException {
        List<String> candidates = 
            zk.getChildren(electionPath, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    try {
                        watchLeadership(electionPath, callback);
                    } catch (KeeperException | InterruptedException e) {
                        callback.onError(e);
                    }
                }
            });
        
        Collections.sort(candidates);
        
        if (nodePath.endsWith(candidates.get(0))) {
            // 自分がリーダー
            callback.onLeadershipGained();
        } else {
            // 前のノードを監視
            int idx = candidates.indexOf(
                nodePath.substring(nodePath.lastIndexOf('/') + 1)
            );
            String prevNodePath = electionPath + "/" + candidates.get(idx - 1);
            
            if (zk.exists(prevNodePath, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    try {
                        watchLeadership(electionPath, callback);
                    } catch (KeeperException | InterruptedException e) {
                        callback.onError(e);
                    }
                }
            }) == null) {
                watchLeadership(electionPath, callback);
            }
        }
    }
}

4.2 分散ロック

ZooKeeperを使用した排他ロックと読み取り/書き込みロックの実装:

// 排他ロック(Exclusive Lock)
public class ExclusiveLock {
    private ZooKeeper zk;
    private String lockPath;
    private String nodePath;
    
    public boolean acquire(String lockName, long timeoutMs) 
            throws KeeperException, InterruptedException {
        lockPath = "/locks/" + lockName;
        
        // ロック用ディレクトリが存在しなければ作成
        if (zk.exists(lockPath, false) == null) {
            try {
                zk.create(lockPath, new byte[0], 
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                    CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // 競争状態で既に作成されている場合
            }
        }
        
        // エフェメラル連番ノードを作成
        nodePath = zk.create(
            lockPath + "/lock_",
            new byte[0],
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL
        );
        
        // ロック取得待機
        long startTime = System.currentTimeMillis();
        while (true) {
            List<String> children = zk.getChildren(lockPath, false);
            Collections.sort(children);
            
            String myName = nodePath.substring(nodePath.lastIndexOf('/') + 1);
            
            if (myName.equals(children.get(0))) {
                // ロック取得成功
                return true;
            }
            
            // 前のノードの削除を待機
            int idx = children.indexOf(myName);
            String prevNode = lockPath + "/" + children.get(idx - 1);
            
            if (zk.exists(prevNode, event -> {}) != null) {
                // 削除イベント待機
                long elapsed = System.currentTimeMillis() - startTime;
                if (elapsed > timeoutMs) {
                    return false;
                }
                Thread.sleep(Math.min(100, timeoutMs - elapsed));
            }
        }
    }
    
    public void release() throws KeeperException, InterruptedException {
        if (nodePath != null) {
            zk.delete(nodePath, -1);
        }
    }
}

// 読み取り/書き込みロック(Read/Write Lock)
public class ReadWriteLock {
    private ZooKeeper zk;
    private String lockPath;
    private String nodePath;
    private LockType lockType;
    
    public enum LockType {
        READ, WRITE
    }
    
    public boolean acquireReadLock(String lockName, long timeoutMs) 
            throws KeeperException, InterruptedException {
        lockPath = "/rwlocks/" + lockName;
        lockType = LockType.READ;
        
        // ディレクトリ作成
        ensurePathExists(lockPath);
        
        // 読み取りロック用のエフェメラルノードを作成
        nodePath = zk.create(
            lockPath + "/read_",
            new byte[0],
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL
        );
        
        return waitForLock(lockPath, lockType, timeoutMs);
    }
    
    public boolean acquireWriteLock(String lockName, long timeoutMs) 
            throws KeeperException, InterruptedException {
        lockPath = "/rwlocks/" + lockName;
        lockType = LockType.WRITE;
        
        ensurePathExists(lockPath);
        
        nodePath = zk.create(
            lockPath + "/write_",
            new byte[0],
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL
        );
        
        return waitForLock(lockPath, lockType, timeoutMs);
    }
    
    private boolean waitForLock(String lockPath, LockType type, 
            long timeoutMs) 
            throws KeeperException, InterruptedException {
        long startTime = System.currentTimeMillis();
        
        while (true) {
            List<String> children = zk.getChildren(lockPath, false);
            Collections.sort(children);
            
            String myName = nodePath.substring(nodePath.lastIndexOf('/') + 1);
            int myIndex = children.indexOf(myName);
            
            boolean canAcquire = false;
            
            if (type == LockType.WRITE) {
                // 書き込みロック:最初のノードでなければならない
                canAcquire = (myIndex == 0);
            } else {
                // 読み取りロック:前に書き込みロックがなければ良い
                canAcquire = true;
                for (int i = 0; i < myIndex; i++) {
                    if (children.get(i).startsWith("write_")) {
                        canAcquire = false;
                        break;
                    }
                }
            }
            
            if (canAcquire) {
                return true;
            }
            
            long elapsed = System.currentTimeMillis() - startTime;
            if (elapsed > timeoutMs) {
                return false;
            }
            
            Thread.sleep(Math.min(100, timeoutMs - elapsed));
        }
    }
    
    private void ensurePathExists(String path) 
            throws KeeperException, InterruptedException {
        if (zk.exists(path, false) == null) {
            try {
                zk.create(path, new byte[0], 
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                    CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // 既存
            }
        }
    }
}

4.3 設定管理(Configuration Management)

分散システムにおける設定の一元管理:

public class ConfigurationManager {
    private ZooKeeper zk;
    private String configPath;
    private Map<String, String> config = new ConcurrentHashMap<>();
    private List<ConfigChangeListener> listeners = 
        new CopyOnWriteArrayList<>();
    
    public ConfigurationManager(ZooKeeper zk, String configPath) {
        this.zk = zk;
        this.configPath = configPath;
    }
    
    public void initialize() 
            throws KeeperException, InterruptedException {
        // 設定ディレクトリが存在しなければ作成
        if (zk.exists(configPath, false) == null) {
            zk.create(configPath, new byte[0], 
                ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                CreateMode.PERSISTENT);
        }
        
        // 既存の設定を読み込み
        loadConfiguration();
        
        // 変更を監視
        watchConfiguration();
    }
    
    private void loadConfiguration() 
            throws KeeperException, InterruptedException {
        List<String> keys = zk.getChildren(configPath, false);
        
        for (String key : keys) {
            String nodePath = configPath + "/" + key;
            byte[] data = zk.getData(nodePath, false, null);
            config.put(key, new String(data, StandardCharsets.UTF_8));
        }
    }
    
    private void watchConfiguration() 
            throws KeeperException, InterruptedException {
        zk.getChildren(configPath, event -> {
            if (event.getType() == 
                    Watcher.Event.EventType.NodeChildrenChanged) {
                try {
                    loadConfiguration();
                    watchConfiguration();
                    notifyListeners();
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    
    public void setConfig(String key, String value) 
            throws KeeperException, InterruptedException {
        String nodePath = configPath + "/" + key;
        
        if (zk.exists(nodePath, false) != null) {
            // 既存ノードを更新
            zk.setData(nodePath, 
                value.getBytes(StandardCharsets.UTF_8), -1);
        } else {
            // 新規ノードを作成
            zk.create(nodePath, 
                value.getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                CreateMode.PERSISTENT);
        }
    }
    
    public String getConfig(String key) {
        return config.get(key);
    }
    
    public Map<String, String> getAllConfig() {
        return new HashMap<>(config);
    }
    
    public void addListener(ConfigChangeListener listener) {
        listeners.add(listener);
    }
    
    private void notifyListeners() {
        for (ConfigChangeListener listener : listeners) {
            listener.onConfigChanged(new HashMap<>(config));
        }
    }
    
    public interface ConfigChangeListener {
        void onConfigChanged(Map<String, String> newConfig);
    }
}

4.4 サービスディスカバリー

動的にサービスエンドポイントを検出するメカニズム:

public class ServiceRegistry {
    private ZooKeeper zk;
    private String basePath;
    private String serviceName;
    private String nodeId;
    private String ephemeralNodePath;
    
    public ServiceRegistry(ZooKeeper zk, String serviceName) {
        this.zk = zk;
        this.basePath = "/services";
        this.serviceName = serviceName;
    }
    
    public void registerService(String nodeId, ServiceEndpoint endpoint) 
            throws KeeperException, InterruptedException {
        this.nodeId = nodeId;
        
        String servicePath = basePath + "/" + serviceName;
        if (zk.exists(servicePath, false) == null) {
            try {
                zk.create(servicePath, new byte[0], 
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                    CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // 既存
            }
        }
        
        // エフェメラルノードでサービスを登録
        String endpointData = endpoint.toJson();
        ephemeralNodePath = zk.create(
            servicePath + "/" + nodeId,
            endpointData.getBytes(StandardCharsets.UTF_8),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL
        );
    }
    
    public List<ServiceEndpoint> discoverServices() 
            throws KeeperException, InterruptedException {
        String servicePath = basePath + "/" + serviceName;
        List<ServiceEndpoint> endpoints = new ArrayList<>();
        
        try {
            List<String> nodeIds = zk.getChildren(servicePath, false);
            
            for (String id : nodeIds) {
                byte[] data = zk.getData(
                    servicePath + "/" + id, false, null);
                ServiceEndpoint endpoint = 
                    ServiceEndpoint.fromJson(
                        new String(data, StandardCharsets.UTF_8));
                endpoints.add(endpoint);
            }
        } catch (KeeperException.NoNodeException e) {
            // サービスは存在しない
        }
        
        return endpoints;
    }
    
    public void watchServiceChanges(ServiceChangeListener listener) 
            throws KeeperException, InterruptedException {
        String servicePath = basePath + "/" + serviceName;
        
        zk.getChildren(servicePath, event -> {
            if (event.getType() == 
                    Watcher.Event.EventType.NodeChildrenChanged) {
                try {
                    List<ServiceEndpoint> endpoints = 
                        discoverServices();
                    listener.onServiceListChanged(endpoints);
                    watchServiceChanges(listener);
                } catch (KeeperException | InterruptedException e) {
                    listener.onError(e);
                }
            }
        });
    }
    
    public void unregisterService() 
            throws KeeperException, InterruptedException {
        if (ephemeralNodePath != null) {
            zk.delete(ephemeralNodePath, -1);
        }
    }
    
    public static class ServiceEndpoint {
        public String host;
        public int port;
        public Map<String, String> metadata;
        
        public String toJson() {
            // JSON形式でシリアライズ
            JSONObject obj = new JSONObject();
            obj.put("host", host);
            obj.put("port", port);
            obj.put("metadata", metadata);
            return obj.toString();
        }
        
        public static ServiceEndpoint fromJson(String json) {
            // JSONからデシリアライズ
            JSONObject obj = new JSONObject(json);
            ServiceEndpoint ep = new ServiceEndpoint();
            ep.host = obj.getString("host");
            ep.port = obj.getInt("port");
            ep.metadata = obj.getJSONObject("metadata").toMap();
            return ep;
        }
    }
    
    public interface ServiceChangeListener {
        void onServiceListChanged(List<ServiceEndpoint> endpoints);
        void onError(Exception e);
    }
}

5. ZooKeeperの設定と運用

5.1 基本設定(zoo.cfg)

ZooKeeperの動作を制御する主要な設定パラメータ:

# zoo.cfg - ZooKeeper設定ファイル

# 基本設定
tickTime=2000                           # ハートビート間隔(ミリ秒)
dataDir=/var/lib/zookeeper/data        # スナップショット保存ディレクトリ
clientPort=2181                         # クライアント接続ポート

# サーバー設定(3ノードアンサンブル)
server.1=zk-server-1.example.com:2888:3888
server.2=zk-server-2.example.com:2888:3888
server.3=zk-server-3.example.com:2888:3888

# 説明:
# server.{id}={hostname}:{ピアポート}:{リーダー選出ポート}
# - id: サーバーの一意識別子(myidファイルで指定)
# - ピアポート: サーバー間通信ポート(デフォルト2888)
# - リーダー選出ポート: リーダー選出プロトコル用ポート(デフォルト3888)

# パフォーマンスチューニング
initLimit=10                            # リーダー選出時の同期タイムアウト倍数
syncLimit=5                             # サーバー間同期タイムアウト倍数
snapCount=100000                        # スナップショット作成までのトランザクション数

# ネットワーク設定
serverCnxnSocket=org.apache.zookeeper.server.NettyServerCnxnFactory
serverCnxnSocket.portUnification=false

# セッション管理
sessionTimeout.min=4000                 # 最小セッションタイムアウト
sessionTimeout.max=40000                # 最大セッションタイムアウト

# 監視設定
autopurge.snapRetainCount=3             # 保持するスナップショット数
autopurge.purgeInterval=24              # パージ間隔(時間単位)

# ログレベル
4lw.commands.whitelist=srvr,stat,mntr   # 4文字コマンドのホワイトリスト

# Advanced設定
globalOutstandingLimit=1000              # 処理待ちのリクエスト最大数
preAllocSize=65536                      # トランザクションログ事前割り当てサイズ
snapCount=100000                         # スナップショット作成頻度
forceSync=yes                            # ディスクへの強制フラッシュ
standaloneEnabled=false                  # スタンドアロンモード無効化

5.2 各ノードのmyidファイル

各ZooKeeperサーバーは固有のIDを持つ必要があります:

# Server 1: /var/lib/zookeeper/data/myid
echo "1" > /var/lib/zookeeper/data/myid
chmod 600 /var/lib/zookeeper/data/myid

# Server 2: /var/lib/zookeeper/data/myid
echo "2" > /var/lib/zookeeper/data/myid

# Server 3: /var/lib/zookeeper/data/myid
echo "3" > /var/lib/zookeeper/data/myid

5.3 本番環境の設定例

大規模分散システム向けの推奨設定:

# zoo.cfg - 本番環境設定

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
secure.clientPort=2281

# 5ノードアンサンブル(高可用性)
server.1=zk-node-1.internal:2888:3888
server.2=zk-node-2.internal:2888:3888
server.3=zk-node-3.internal:2888:3888
server.4=zk-node-4.internal:2888:3888
server.5=zk-node-5.internal:2888:3888

# Observer設定(投票に参加しないノード)
server.6=zk-observer-1.internal:2888:3888:observer

# 同期設定
initLimit=20
syncLimit=10

# パフォーマンス
snapCount=100000
autopurge.snapRetainCount=5
autopurge.purgeInterval=24

# ネットワーク
serverCnxnSocket=org.apache.zookeeper.server.NettyServerCnxnFactory
portUnification=false

# ログ設定
standaloneEnabled=false
serverCnxnSocket=org.apache.zookeeper.server.NettyServerCnxnFactory

# セキュリティ(SASL認証)
server.authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
serverCnxnSocket.portUnification=false

# 4文字コマンド
4lw.commands.whitelist=srvr,stat,mntr,cons

6. 実装パターン

6.1 Kafkaにおける活用

Apache KafkaはZooKeeperに大きく依存しており、ブローカー管理、トピック管理、リーダー選出に利用しています。

Kafka ZooKeeper トポロジ例:
/kafka/
├── brokers/
│   ├── ids/                          # ブローカーID登録
│   │   ├── 0: {ブローカーメタデータ}
│   │   ├── 1: {ブローカーメタデータ}
│   │   └── 2: {ブローカーメタデータ}
│   └── topics/                       # トピック情報
│       ├── user-events/
│       │   ├── partitions/           # パーティション情報
│       │   └── leader_epoch
│       └── logs/
├── controller/                        # 現在のコントローラー(リーダー)
├── controller_epoch/                  # コントローラーエポック
├── isr_change_notification/           # ISR(In-Sync Replicas)変更通知
├── admin/
│   ├── delete_topics/                 # 削除予定トピック
│   └── reassign_partitions/           # パーティション再割り当て
└── config/
    ├── topics/                        # トピック設定
    ├── brokers/                       # ブローカー設定
    └── clients/                       # クライアント設定

6.2 Hadoopにおける活用

HDFS High Availabilityでは、ZooKeeperがNameNodeのフェイルオーバーを管理します。

HDFS HA ZooKeeper トポロジ例:
/hadoop-ha/mycluster/
├── activeNameNode/                   # 現在のアクティブNameNode
├── standbyNameNodes/                 # スタンバイNameNodeのリスト
├── namenode1                          # NameNode 1の状態
├── namenode2                          # NameNode 2の状態
└── election/                          # リーダー選出ロック
    ├── namenode1
    └── namenode2

設定例:

<!-- hdfs-site.xml -->
<configuration>
    <!-- HA設定 -->
    <property>
        <name>dfs.ha.enabled</name>
        <value>true</value>
    </property>
    
    <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>namenode1,namenode2</value>
    </property>
    
    <!-- NameNode 1設定 -->
    <property>
        <name>dfs.namenode.rpc-address.mycluster.namenode1</name>
        <value>nn1.example.com:8020</value>
    </property>
    
    <!-- NameNode 2設定 -->
    <property>
        <name>dfs.namenode.rpc-address.mycluster.namenode2</name>
        <value>nn2.example.com:8020</value>
    </property>
    
    <!-- ZooKeeper設定 -->
    <property>
        <name>ha.zookeeper.quorum</name>
        <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
    </property>
    
    <!-- Automatic Failover設定 -->
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
</configuration>

6.3 マイクロサービスアーキテクチャでの活用

マイクロサービス間の調整にZooKeeperを活用する例:

マイクロサービス構成例:
/microservices/
├── payment-service/                  # 支払いサービス
│   ├── instances/                    # インスタンス登録
│   │   ├── instance-1
│   │   ├── instance-2
│   │   └── instance-3
│   ├── config/
│   │   ├── timeout
│   │   ├── retries
│   │   └── circuitBreaker
│   └── leader/                       # 支払いサービスリーダー
├── inventory-service/
│   ├── instances/
│   ├── config/
│   └── leader/
└── notification-service/
    ├── instances/
    ├── config/
    └── leader/

7. ベストプラクティス

7.1 ZooKeeperの選択基準

ZooKeeperが適切な場合:

  • 強い一貫性が必須
  • リーダー選出やロックが必要
  • ノード数が小〜中規模(数百ノード程度)
  • 複雑な調整ロジックが必要

ZooKeeperが不適切な場合:

  • 超高スケール(数千以上のノード)
  • 高スループットデータストア
  • 最終的一貫性で許容可能

7.2 パフォーマンス最適化

# パフォーマンスチューニング設定

# 1. ネットワーク性能
serverCnxnSocket=org.apache.zookeeper.server.NettyServerCnxnFactory
Xmx4G                                   # JVMヒープサイズ

# 2. ディスク性能
forceSync=no                           # スループット重視時はno(durability低下)
snapCount=100000                       # 大きめに設定

# 3. タイムアウト設定
tickTime=2000
initLimit=20                           # ネットワーク遅延が大きい場合は増加
syncLimit=10

# 4. 接続数制限
maxClientCnxns=300                     # クライアント接続数上限

7.3 障害対応とリカバリ

# ZooKeeperクラスタの健全性確認
echo srvr | nc localhost 2181          # サーバー情報取得

# 出力例:
# Zookeeper version: 3.8.0
# Latency min/avg/max: 0/1/100
# Received: 1000
# Sent: 1000
# Mode: follower
# Node count: 150

# リーダー確認
echo stat | nc localhost 2181

# クライアント接続確認
echo cons | nc localhost 2181

# トランザクションログの確認
zkDumpLog.sh /var/lib/zookeeper/data

# スナップショットの復旧
zkServer.sh stop
rm -rf /var/lib/zookeeper/data/*
# スナップショットから復旧
zkServer.sh start

7.4 セキュリティ設定

# SASL認証の有効化
server.authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
serverCnxnSocket.portUnification=false

# JAAS設定
# /etc/zookeeper/conf/jaas.conf
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_root=test_root_passwd
    user_zk_client=test_client_passwd;
};

# クライアント側
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="zk_client"
    password="test_client_passwd";
};

8. トラブルシューティング

8.1 一般的な問題と解決法

問題原因解決法
クライアント接続失敗ファイアウォール設定ポート2181開放確認
クオーラム喪失ノード障害過半数以上のノードが起動確認
データ不整合ZABプロトコル障害トランザクションログ確認
高レイテンシディスク遅延SSD使用、fsync設定確認
メモリ不足ヒープサイズ小Xmxパラメータ増加

8.2 監視項目

# Javaプロセス監視
ps aux | grep zookeeper

# ポート監視
netstat -tlnp | grep 2181
netstat -tlnp | grep 2888

# ディスク使用量
du -sh /var/lib/zookeeper/data

# ログレベル設定(log4j.properties)
log4j.rootLogger=INFO, ROLLINGFILE
log4j.logger.org.apache.zookeeper.server.quorum=INFO
log4j.logger.org.apache.zookeeper.server.persistence=INFO

9. ZooKeeper 4.0への進化

9.1 Raft プロトコルへの移行

ZooKeeper 4.0では、ZABからよりシンプルなRaftプロトコルへの移行を計画しています。これにより:

  • 実装の簡潔化:Raftはより理解しやすい
  • 相互運用性の向上:Raftは業界標準
  • 保守性向上:バグ修正とテストが容易

9.2 新機能

  • 動的リコンフィグレーション:クラスタ構成のホットアップデート
  • 容量計画:大規模クラスタへのスケーリング改善
  • パフォーマンス向上:新しいネットワークI/O機構

10. 他の分散調整サービスとの比較

項目ZooKeeperetcdConsul
一貫性モデル強い一貫性強い一貫性最終的一貫性
言語JavaGoGo
データモデル階層的(ツリー)フラット(キー)キー・バリュー
サービスディスカバリー限定的限定的統合
APIJava/CLIREST/gRPCREST/DNS
学習曲線中程度容易容易
本番運用実績非常に高い高い高い

11. まとめ

Apache ZooKeeperは、分散システムの複雑な調整問題を解決するための基盤となるサービスです。その強力なウォッチ機構、原子性を保証するZABプロトコル、シンプルで効果的なデータモデルは、Hadoop、Kafka、HBase、その他多くの分散システムに信頼されています。

本記事では、ZooKeeperの基本概念からアーキテクチャ、実装パターン、運用までを包括的に解説しました。これらの知識を基に、堅牢で拡張性の高い分散システムの構築を実現できるでしょう。

要点:

  1. ZooKeeperはクォーラムベースの高可用性設計
  2. z-nodesと階層構造による直感的なデータモデル
  3. ウォッチ機構による効率的なイベント駆動
  4. ZABプロトコルによる強い一貫性保証
  5. 適切な設定と監視による本番環境での安定運用

12. 参考資料