Apache Hadoop

Apache Hadoop 包括的技術ガイド

本記事は AI によって生成された技術資料です。 作成日: 2026-04-11


目次

  1. はじめに・歴史
  2. アーキテクチャ概要
  3. HDFS 詳解
  4. YARN 詳解
  5. MapReduce プログラミングモデル
  6. 設定ファイル
  7. セキュリティ
  8. パフォーマンスチューニング
  9. メモリ管理
  10. モニタリング
  11. エコシステム
  12. Hadoop 3.x の新機能
  13. モダンな代替技術との比較
  14. ユースケースとベストプラクティス
  15. まとめ

1. はじめに・歴史

1.1 Apache Hadoop とは

Apache Hadoop は、コモディティハードウェアのクラスタ上で大規模データセットの 分散処理を可能にするオープンソースフレームワークである。数ペタバイト規模のデータを 数千台のノードにわたって保存・処理する能力を持ち、ビッグデータ処理の基盤技術として 広く利用されている。

Hadoop の名前は、プロジェクト創設者 Doug Cutting の息子が持っていた 黄色いぬいぐるみの象に由来する。技術的な意味はないが、この名前は ビッグデータ業界の象徴的存在となった。

1.2 起源: Google の研究論文

Hadoop の技術的基盤は、Google が発表した 2 つの画期的な論文に遡る。

Google File System (GFS) - 2003年

論文: "The Google File System"
著者: Sanjay Ghemawat, Howard Gobioff, Shun-Tak Leung
発表: SOSP 2003 (Symposium on Operating Systems Principles)

GFS は以下の設計思想を持っていた:

  • コンポーネント障害は例外ではなく常態である
  • ファイルは通常のファイルシステムの基準から見て非常に大きい
  • ほとんどのファイルはランダム書き込みではなく追記で更新される
  • アプリケーションとファイルシステム API の協調設計が柔軟性を高める

MapReduce - 2004年

論文: "MapReduce: Simplified Data Processing on Large Clusters"
著者: Jeffrey Dean, Sanjay Ghemawat
発表: OSDI 2004 (Operating Systems Design and Implementation)

MapReduce は、大規模データ処理を 2 つの単純な関数に抽象化した:

  • Map: 入力データをキーバリューペアに変換
  • Reduce: 同一キーの値を集約して最終結果を生成

1.3 Yahoo! での発展と Apache Software Foundation

2004年 - プロジェクトの開始

Doug Cutting と Mike Cafarella は、オープンソース Web 検索エンジン プロジェクト「Nutch」の一部として、Google の論文に基づいた分散ファイルシステムと MapReduce 処理フレームワークの実装を開始した。

2006年 - Yahoo! の貢献と ASF への移管

Doug Cutting が Yahoo! に入社し、Nutch の分散処理部分が独立プロジェクト として分離された。これが Apache Hadoop となり、Apache Software Foundation (ASF) のトップレベルプロジェクトとして正式に採用された。

Yahoo! は Hadoop の開発に大きく貢献し、数千台のノードからなる 本番クラスタを運用した。

主要なマイルストーン:

年表:
--------------------------------------------------------------
2003年  Google GFS 論文発表
2004年  Google MapReduce 論文発表
2004年  Nutch で分散処理の実装開始
2006年  Yahoo! が開発を支援、Hadoop が独立プロジェクトに
2006年  Apache Software Foundation のトップレベルプロジェクトに
2008年  Yahoo! が 10,000 ノードの Hadoop クラスタを運用
2009年  Hadoop が 1 分間で 1TB のデータをソート (世界記録)
2011年  Hadoop 1.0 リリース
2012年  YARN の導入 (Hadoop 2.0)
2013年  Hadoop 2.2.0 GA リリース
2017年  Hadoop 3.0 リリース (erasure coding, YARN Timeline v2)
2022年  Hadoop 3.3.x 安定版
--------------------------------------------------------------

1.4 バージョンの進化

Hadoop 1.x

Hadoop の初期バージョンは 2 つの主要コンポーネントで構成されていた:

+------------------------------------------+
|            Hadoop 1.x Architecture       |
+------------------------------------------+
|                                          |
|   +----------------+  +--------------+   |
|   |     HDFS       |  |  MapReduce   |   |
|   | (Storage Layer)|  | (Processing) |   |
|   +----------------+  +--------------+   |
|                                          |
|   - Single NameNode (SPOF)               |
|   - JobTracker / TaskTracker             |
|   - MapReduce のみサポート                 |
|   - スケーラビリティ制限 (~4,000 ノード)     |
+------------------------------------------+

Hadoop 1.x の課題:

  • NameNode が Single Point of Failure (SPOF) であった
  • JobTracker がリソース管理とジョブスケジューリングの両方を担当
  • MapReduce 以外の処理モデルをサポートできなかった
  • クラスタサイズが約 4,000 ノードに制限されていた

Hadoop 2.x

YARN (Yet Another Resource Negotiator) の導入により、アーキテクチャが 大幅に改善された:

+--------------------------------------------------+
|            Hadoop 2.x Architecture               |
+--------------------------------------------------+
|                                                  |
|   +--------------------------------------------+|
|   |              Applications                   ||
|   | MapReduce | Spark | Tez | HBase | Storm    ||
|   +--------------------------------------------+|
|   |                 YARN                        ||
|   |    (Resource Management Layer)              ||
|   +--------------------------------------------+|
|   |                 HDFS                        ||
|   |    (Storage Layer - HA Support)             ||
|   +--------------------------------------------+|
|                                                  |
|   - HDFS HA (NameNode の冗長化)                    |
|   - YARN によるリソース管理の分離                    |
|   - 複数の処理モデルをサポート                       |
|   - HDFS Federation                              |
|   - 10,000+ ノードへのスケーラビリティ               |
+--------------------------------------------------+

Hadoop 3.x

さらなる機能強化と効率改善が実現された:

+--------------------------------------------------+
|            Hadoop 3.x Architecture               |
+--------------------------------------------------+
|                                                  |
|   主要な改善点:                                    |
|   - Erasure Coding (ストレージ効率の向上)           |
|   - YARN Timeline Service v2                     |
|   - 2つ以上の NameNode サポート (HA強化)            |
|   - GPU/FPGA スケジューリング                      |
|   - Java 8+ サポート (Java 11+ 推奨)              |
|   - S3A コネクタの改善                              |
|   - Intra-DataNode バランサー                      |
|   - ポート番号の変更 (非特権ポートへ)                 |
|   - コンテナの Docker サポート                      |
+--------------------------------------------------+

2. アーキテクチャ概要

2.1 Master-Slave アーキテクチャ

Hadoop は典型的な Master-Slave (マスター・スレーブ) アーキテクチャを採用している。 マスターノードがメタデータ管理やリソーススケジューリングを担当し、 スレーブノードが実際のデータ保存と処理を行う。

                    Hadoop クラスタ アーキテクチャ
                    ==============================

    +------------------+         +--------------------+
    |   Master Nodes   |         |    Client Nodes    |
    +------------------+         +--------------------+
    |                  |         |                    |
    |  +-----------+   |         |  +-------------+  |
    |  | NameNode  |   |         |  | hadoop fs   |  |
    |  | (Active)  |   |<------->|  | hadoop jar  |  |
    |  +-----------+   |         |  | yarn app    |  |
    |                  |         |  +-------------+  |
    |  +-----------+   |         +--------------------+
    |  | NameNode  |   |
    |  | (Standby) |   |
    |  +-----------+   |
    |                  |
    |  +------------+  |
    |  | Resource   |  |
    |  | Manager    |  |
    |  +------------+  |
    |                  |
    +------------------+
            |
            | Heartbeat / Block Reports
            |
    +-------+-------+-------+-------+
    |       |       |       |       |
    v       v       v       v       v
  +----+  +----+  +----+  +----+  +----+
  | DN |  | DN |  | DN |  | DN |  | DN |
  | NM |  | NM |  | NM |  | NM |  | NM |
  +----+  +----+  +----+  +----+  +----+

  DN = DataNode (HDFS ストレージ)
  NM = NodeManager (YARN 計算リソース)

2.2 主要コンポーネント

コンポーネント役割ノードタイプ
NameNodeHDFS メタデータ管理Master
Secondary NameNodeチェックポイント作成 (HA ではない)Master
DataNodeブロックデータの保存Slave
ResourceManagerクラスタリソースの管理Master
NodeManagerノード上のコンテナ管理Slave
ApplicationMasterアプリケーション固有のリソース管理Slave (動的)
JournalNodeHA 用の編集ログ共有専用/共有
ZooKeeper分散協調サービス専用

2.3 データフローの概要

書き込みフロー:

Client                NameNode              DataNode1   DataNode2   DataNode3
  |                      |                     |           |           |
  |-- create file ------>|                     |           |           |
  |<-- block locations --|                     |           |           |
  |                      |                     |           |           |
  |-- write block 1 ---------------------------->|           |           |
  |                      |                     |-- replicate -->|       |
  |                      |                     |           |-- replicate -->|
  |                      |                     |           |           |
  |<-- ack ----------------------------------------|           |           |
  |                      |                     |           |           |
  |-- close file ------->|                     |           |           |
  |                      |-- block report <----|           |           |

読み取りフロー:

Client                NameNode              DataNode
  |                      |                     |
  |-- open file -------->|                     |
  |<-- block locations --|                     |
  |                      |                     |
  |-- read block 1 -------------------------------->|
  |<-- data ----------------------------------------|
  |                      |                     |
  |-- read block 2 -------------------------------->|  (closest replica)
  |<-- data ----------------------------------------|

2.4 ネットワークトポロジー

Hadoop はラックを意識したネットワークトポロジーを使用する:

                    +-------------------+
                    |   Network Switch  |
                    |    (Top-of-Rack)  |
                    +-------------------+
                   /         |          \
         +--------+    +--------+    +--------+
         | Rack 1 |    | Rack 2 |    | Rack 3 |
         +--------+    +--------+    +--------+
         | Node A |    | Node D |    | Node G |
         | Node B |    | Node E |    | Node H |
         | Node C |    | Node F |    | Node I |
         +--------+    +--------+    +--------+

    距離の定義:
    - 同一ノード:     distance = 0
    - 同一ラック:     distance = 2
    - 異なるラック:   distance = 4
    - 異なるデータセンター: distance = 6

3. HDFS 詳解

3.1 HDFS の設計思想

HDFS (Hadoop Distributed File System) は以下の設計原則に基づいている:

  1. ハードウェア障害の許容: 障害は例外ではなく通常の運用状態
  2. ストリーミングデータアクセス: バッチ処理に最適化、低レイテンシーよりスループット重視
  3. 大規模データセット: ギガバイト〜テラバイト単位のファイルを想定
  4. シンプルな一貫性モデル: Write-Once-Read-Many (WORM) アクセスパターン
  5. データの移動よりも計算の移動: データローカリティの活用
  6. ポータビリティ: 異種ハードウェア・ソフトウェアプラットフォーム間の互換性

3.2 NameNode

NameNode は HDFS のマスターサーバーであり、ファイルシステムの名前空間全体を管理する。

NameNode の主要な役割:

  • ファイルシステムの名前空間管理 (ディレクトリツリー)
  • ファイルとブロックのマッピング管理
  • ブロックとDataNode のマッピング管理 (メモリ内のみ)
  • クライアントからのファイル操作リクエストの処理
  • ブロックレプリケーション戦略の決定

メタデータの構成:

NameNode メタデータ
=====================

1. FsImage (ファイルシステムイメージ)
   - ファイルシステムの完全なスナップショット
   - ファイル、ディレクトリ、パーミッション
   - ブロックとファイルのマッピング
   - ディスクに永続化される

2. EditLog (編集ログ)
   - FsImage 以降の全変更を記録
   - すべてのメタデータ操作をシーケンシャルに記録
   - ディスクに永続化される

3. ブロックマッピング (メモリ内)
   - ブロック → DataNode のマッピング
   - DataNode からの BlockReport で構築
   - ディスクには永続化されない (起動時に再構築)

メモリ使用量の目安:
- 各ファイル: 約 150 バイト
- 各ブロック: 約 150 バイト
- 1億ファイルの場合: 約 30GB のヒープメモリが必要

チェックポイントプロセス:

            チェックポイントの仕組み
            ========================

  +----------+                    +-------------------+
  | NameNode |                    | Secondary NameNode|
  |          |                    | or Standby NN     |
  +----------+                    +-------------------+
       |                                   |
       |  1. EditLog を転送                  |
       |---------------------------------->|
       |                                   |
       |                            2. FsImage + EditLog
       |                               をマージ
       |                                   |
       |  3. 新しい FsImage を転送            |
       |<----------------------------------|
       |                                   |
  4. 新しい FsImage で                       |
     古い FsImage を置換                     |
       |                                   |

3.3 DataNode

DataNode は実際のデータブロックを保存するワーカーノードである。

DataNode の主要な役割:

  • ブロックデータの保存と取得
  • 定期的な Heartbeat の送信 (デフォルト: 3秒)
  • Block Report の送信 (デフォルト: 6時間ごと)
  • クライアントからの読み書きリクエストの処理
  • ブロックのレプリケーション
DataNode のディスクレイアウト
==============================

${dfs.datanode.data.dir}/
├── current/
│   ├── VERSION
│   ├── BP-<pool-id>/
│   │   ├── current/
│   │   │   ├── VERSION
│   │   │   ├── finalized/
│   │   │   │   ├── subdir0/
│   │   │   │   │   ├── subdir0/
│   │   │   │   │   │   ├── blk_1073741825
│   │   │   │   │   │   ├── blk_1073741825.meta
│   │   │   │   │   │   ├── blk_1073741826
│   │   │   │   │   │   └── blk_1073741826.meta
│   │   │   │   │   └── subdir1/
│   │   │   │   └── subdir1/
│   │   │   └── rbw/  (Replica Being Written)
│   │   └── tmp/
│   └── in_use.lock
└── in_use.lock

3.4 ブロックレプリケーション

HDFS はデータの信頼性を確保するためにブロックレプリケーションを使用する。

デフォルトのレプリケーション戦略 (replication factor = 3):

レプリカ配置ポリシー
======================

   +------ Rack 1 ------+     +------ Rack 2 ------+
   |                     |     |                     |
   |  +------+           |     |  +------+           |
   |  |Node A| Replica 1 |     |  |Node C| Replica 3 |
   |  |      | ●         |     |  |      | ●         |
   |  +------+           |     |  +------+           |
   |                     |     |                     |
   |  +------+           |     |  +------+           |
   |  |Node B| Replica 2 |     |  |Node D|           |
   |  |      | ●         |     |  |      |           |
   |  +------+           |     |  +------+           |
   |                     |     |                     |
   +---------------------+     +---------------------+

   配置ルール:
   1. Replica 1: クライアントと同じノード (またはランダム)
   2. Replica 2: 同じラックの異なるノード
   3. Replica 3: 異なるラックのノード
   
   理由:
   - 1ノード障害: 2つのレプリカが残る
   - 1ラック障害: 少なくとも1つのレプリカが残る
   - 書き込み帯域幅: ラック間通信を最小化

3.5 Rack Awareness (ラック認識)

ラック認識は、Hadoop がネットワークトポロジーを理解してデータ配置と 読み取りの最適化を行う機能である。

ラックトポロジースクリプトの例:

#!/bin/bash
# /etc/hadoop/conf/topology.sh
# ラックトポロジースクリプト

HADOOP_CONF=/etc/hadoop/conf

while [ $# -gt 0 ] ; do
  nodeArg=$1
  exec< ${HADOOP_CONF}/topology.data
  result=""
  while read line ; do
    ar=( $line )
    if [ "${ar[0]}" = "$nodeArg" ] ; then
      result="${ar[1]}"
    fi
  done
  shift
  if [ -z "$result" ] ; then
    echo -n "/default-rack "
  else
    echo -n "$result "
  fi
done

topology.data の例:

# IP/Hostname    Rack
192.168.1.1      /dc1/rack1
192.168.1.2      /dc1/rack1
192.168.1.3      /dc1/rack1
192.168.1.4      /dc1/rack2
192.168.1.5      /dc1/rack2
192.168.1.6      /dc1/rack2
192.168.2.1      /dc2/rack1
192.168.2.2      /dc2/rack1

core-site.xml での設定:

<property>
  <name>net.topology.script.file.name</name>
  <value>/etc/hadoop/conf/topology.sh</value>
</property>
<property>
  <name>net.topology.script.number.args</name>
  <value>100</value>
</property>

3.6 HDFS High Availability (HA)

JournalNode / QJM (Quorum Journal Manager)

Hadoop 2.x から、NameNode の SPOF 問題を解決するために HA が導入された。

           HDFS HA アーキテクチャ (QJM)
           =============================

  +----------------+              +----------------+
  |   Active       |              |   Standby      |
  |   NameNode     |              |   NameNode     |
  |                |              |                |
  |  FsImage       |              |  FsImage       |
  |  EditLog (mem) |              |  EditLog (mem) |
  +-------+--------+              +--------+-------+
          |                                |
          | Write Edits         Read Edits |
          |                                |
  +-------v--------+--------+--------v-------+
  |  JournalNode 1 | JournalNode 2 | JournalNode 3 |
  |  (EditLog)     | (EditLog)     | (EditLog)      |
  +---------+------+-------+-------+------+---------+
                                     
  +---------------------------------------------------+
  |                  ZooKeeper Ensemble                |
  |  (Active NameNode の選出とフェイルオーバー管理)       |
  |  ZK Node 1  |  ZK Node 2  |  ZK Node 3           |
  +---------------------------------------------------+
                          |
                +---------+---------+
                |  ZKFC    |  ZKFC  |
                | (Active) | (Stby) |
                +----------+--------+
                
  ZKFC = ZooKeeper Failover Controller
  
  動作フロー:
  1. Active NN が EditLog を JournalNode に書き込み
  2. Standby NN が JournalNode から EditLog を読み取り
  3. Standby NN が常に最新のメタデータを保持
  4. ZKFC が NameNode の状態を監視
  5. Active NN 障害時、ZKFC が ZooKeeper 経由でフェイルオーバー開始
  6. Standby NN が Active に昇格

HA 設定の重要パラメータ:

<!-- hdfs-site.xml -->
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>

<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>

<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>namenode1.example.com:8020</value>
</property>

<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>namenode2.example.com:8020</value>
</property>

<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/mycluster</value>
</property>

<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>

<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence</value>
</property>

<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/hadoop/.ssh/id_rsa</value>
</property>

3.7 HDFS Federation

HDFS Federation により、複数の独立した NameNode/Namespace を使用して 水平スケーラビリティを実現する。

           HDFS Federation アーキテクチャ
           ===============================

  +---------------+  +---------------+  +---------------+
  |  NameNode 1   |  |  NameNode 2   |  |  NameNode 3   |
  |  Namespace A  |  |  Namespace B  |  |  Namespace C  |
  |  /user        |  |  /data        |  |  /tmp         |
  |  Block Pool 1 |  |  Block Pool 2 |  |  Block Pool 3 |
  +-------+-------+  +-------+-------+  +-------+-------+
          |                   |                   |
          +-------------------+-------------------+
          |                   |                   |
  +-------v-------------------v-------------------v-------+
  |                    DataNode Cluster                    |
  |  +----------+  +----------+  +----------+              |
  |  | DN 1     |  | DN 2     |  | DN 3     |  ...        |
  |  | BP1,BP2  |  | BP1,BP3  |  | BP2,BP3  |             |
  |  | BP3      |  | BP2      |  | BP1      |             |
  |  +----------+  +----------+  +----------+              |
  +--------------------------------------------------------+

  利点:
  - 名前空間の水平スケーリング
  - パフォーマンスの分離
  - 名前空間ごとの独立した管理
  
  注意点:
  - DataNode は全 Block Pool のデータを保存
  - 名前空間間の参照は不可

3.8 Erasure Coding (消失符号化)

Hadoop 3.x で導入された Erasure Coding は、レプリケーションの代替として ストレージ効率を大幅に向上させる。

レプリケーション vs Erasure Coding
===================================

【3x レプリケーション】
  ストレージオーバーヘッド: 200%
  
  元データ: [Block A] [Block B] [Block C]
  
  保存データ:
  [Block A] [Block A] [Block A]   (3 copies)
  [Block B] [Block B] [Block B]   (3 copies)
  [Block C] [Block C] [Block C]   (3 copies)
  
  合計: 9 ブロック (元データの 3 倍)

【RS-6-3 Erasure Coding】
  ストレージオーバーヘッド: 50%
  
  元データ: [D1] [D2] [D3] [D4] [D5] [D6]
  パリティ:                 [P1] [P2] [P3]
  
  合計: 9 ブロック (元データの 1.5 倍)
  
  → 任意の 3 ブロック障害を許容
  → ストレージ使用量が約 50% 削減

Erasure Coding の設定:

# Erasure Coding ポリシーの確認
hdfs ec -listPolicies

# ディレクトリに EC ポリシーを設定
hdfs ec -setPolicy -path /data/cold -policy RS-6-3-1024k

# EC ポリシーの確認
hdfs ec -getPolicy -path /data/cold

# EC ポリシーの無効化
hdfs ec -unsetPolicy -path /data/cold

# 利用可能なポリシー:
# RS-3-2-1024k  : 3 データ + 2 パリティ (オーバーヘッド 67%)
# RS-6-3-1024k  : 6 データ + 3 パリティ (オーバーヘッド 50%)
# RS-10-4-1024k : 10 データ + 4 パリティ (オーバーヘッド 40%)
# XOR-2-1-1024k : 2 データ + 1 パリティ (オーバーヘッド 50%)

3.9 HDFS コマンドリファレンス

# ============================================
# HDFS シェルコマンド
# ============================================

# --- ファイル/ディレクトリ操作 ---

# ディレクトリ一覧表示
hdfs dfs -ls /user/hadoop/
hdfs dfs -ls -R /user/hadoop/    # 再帰的

# ディレクトリ作成
hdfs dfs -mkdir /user/hadoop/data
hdfs dfs -mkdir -p /user/hadoop/data/2026/04   # 親ディレクトリも作成

# ファイルのアップロード
hdfs dfs -put localfile.txt /user/hadoop/
hdfs dfs -copyFromLocal localfile.txt /user/hadoop/
hdfs dfs -moveFromLocal localfile.txt /user/hadoop/  # ローカルから削除

# ファイルのダウンロード
hdfs dfs -get /user/hadoop/file.txt ./
hdfs dfs -copyToLocal /user/hadoop/file.txt ./
hdfs dfs -getmerge /user/hadoop/output/ merged.txt  # 複数ファイルをマージ

# ファイル内容の表示
hdfs dfs -cat /user/hadoop/file.txt
hdfs dfs -text /user/hadoop/file.txt   # 圧縮ファイルも展開表示
hdfs dfs -head /user/hadoop/file.txt   # 先頭 1KB
hdfs dfs -tail /user/hadoop/file.txt   # 末尾 1KB

# ファイルのコピー/移動/削除
hdfs dfs -cp /user/hadoop/src /user/hadoop/dst
hdfs dfs -mv /user/hadoop/old /user/hadoop/new
hdfs dfs -rm /user/hadoop/file.txt
hdfs dfs -rm -r /user/hadoop/directory/    # 再帰削除
hdfs dfs -rm -r -skipTrash /user/hadoop/tmp/  # ゴミ箱をスキップ

# パーミッション
hdfs dfs -chmod 755 /user/hadoop/dir
hdfs dfs -chmod -R 644 /user/hadoop/data/
hdfs dfs -chown hadoop:hadoop /user/hadoop/
hdfs dfs -chgrp analytics /user/hadoop/shared/

# ディスク使用量
hdfs dfs -du -s -h /user/hadoop/     # サマリー (人間が読みやすい形式)
hdfs dfs -df -h                       # ファイルシステム全体の使用量
hdfs dfs -count /user/hadoop/         # ファイル/ディレクトリ数

# --- 管理コマンド ---

# NameNode 操作
hdfs namenode -format                 # フォーマット (初回のみ)
hdfs namenode -rollEdits              # EditLog のロール
hdfs namenode -metaSave filename      # メタデータの保存

# HDFS 管理
hdfs dfsadmin -report                 # クラスタレポート
hdfs dfsadmin -safemode enter         # セーフモード開始
hdfs dfsadmin -safemode leave         # セーフモード解除
hdfs dfsadmin -safemode get           # セーフモード状態確認
hdfs dfsadmin -refreshNodes           # ノードリストの更新
hdfs dfsadmin -printTopology          # トポロジーの表示
hdfs dfsadmin -setBalancerBandwidth 104857600  # バランサー帯域幅設定

# ファイルシステムチェック
hdfs fsck / -files -blocks -locations  # 全ブロックの場所を表示
hdfs fsck /user/hadoop/ -delete        # 壊れたファイルの削除
hdfs fsck /user/hadoop/ -move          # 壊れたファイルの移動

# バランサー
hdfs balancer -threshold 10            # 10% の閾値でバランシング
hdfs balancer -policy datanode         # DataNode ベースのポリシー

# スナップショット
hdfs dfsadmin -allowSnapshot /user/hadoop/data
hdfs dfs -createSnapshot /user/hadoop/data snap1
hdfs snapshotDiff /user/hadoop/data snap1 snap2
hdfs dfs -deleteSnapshot /user/hadoop/data snap1

4. YARN 詳解

4.1 YARN の概要

YARN (Yet Another Resource Negotiator) は Hadoop 2.x で導入された リソース管理フレームワークである。MapReduce に限定されていた Hadoop 1.x の 制約を解消し、様々な分散処理フレームワークが同一クラスタ上で動作可能になった。

             YARN アーキテクチャ
             ===================

  +--------------------------------------------------+
  |              ResourceManager (RM)                 |
  |  +-------------------+  +--------------------+   |
  |  |    Scheduler      |  | ApplicationManager |   |
  |  | (Capacity/Fair)   |  | (AM管理)           |   |
  |  +-------------------+  +--------------------+   |
  +---+------------------------+------------------+--+
      |                        |                  |
      |   Container割当         |   Heartbeat      |
      |                        |                  |
  +---v------+  +---------v---+  +------------v--+
  |NodeMgr 1 |  |NodeMgr 2   |  |NodeMgr 3     |
  |          |  |            |  |              |
  | +------+ |  | +------+  |  | +------+     |
  | | AM   | |  | |Cont. |  |  | |Cont. |     |
  | |(App1)| |  | |(App1)|  |  | |(App1)|     |
  | +------+ |  | +------+  |  | +------+     |
  |          |  |            |  |              |
  | +------+ |  | +------+  |  | +------+     |
  | |Cont. | |  | | AM   |  |  | |Cont. |     |
  | |(App2)| |  | |(App2)|  |  | |(App2)|     |
  | +------+ |  | +------+  |  | +------+     |
  +----------+  +------------+  +--------------+

  RM  = ResourceManager (クラスタ全体のリソース管理)
  NM  = NodeManager (各ノードのリソース管理)
  AM  = ApplicationMaster (アプリケーション固有の管理)
  Cont = Container (リソースの割当単位)

4.2 ResourceManager (RM)

ResourceManager はクラスタ全体のリソースを管理するマスターデーモンである。

主要コンポーネント:

ResourceManager の内部構造
============================

+----------------------------------------------------------+
|                    ResourceManager                        |
|                                                          |
|  +--------------------+   +------------------------+     |
|  |     Scheduler      |   | ApplicationManager     |     |
|  |                    |   |                        |     |
|  | - リソース割当      |   | - アプリケーション受付   |     |
|  | - キュー管理        |   | - AM のライフサイクル管理 |     |
|  | - キャパシティ管理   |   | - AM の障害回復         |     |
|  +--------------------+   +------------------------+     |
|                                                          |
|  +--------------------+   +------------------------+     |
|  | ResourceTracker    |   | AdminService           |     |
|  |                    |   |                        |     |
|  | - NM のハートビート |   | - 管理操作              |     |
|  | - NM の登録/解除   |   | - キュー設定変更         |     |
|  +--------------------+   +------------------------+     |
|                                                          |
|  +--------------------+   +------------------------+     |
|  | ClientRMService    |   | NMTokenSecretManager   |     |
|  |                    |   |                        |     |
|  | - クライアント通信   |   | - セキュリティトークン   |     |
|  +--------------------+   +------------------------+     |
+----------------------------------------------------------+

ResourceManager HA:

<!-- yarn-site.xml -->
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>

<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>yarn-cluster</value>
</property>

<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>

<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>rm-host1.example.com</value>
</property>

<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>rm-host2.example.com</value>
</property>

<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>

<property>
  <name>yarn.resourcemanager.recovery.enabled</name>
  <value>true</value>
</property>

<property>
  <name>yarn.resourcemanager.store.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

4.3 NodeManager (NM)

NodeManager は各ワーカーノード上で動作するエージェントである。

NodeManager の責務:

  • ノード上のリソース (CPU、メモリ) の監視
  • コンテナのライフサイクル管理 (起動、監視、終了)
  • ResourceManager への Heartbeat 送信
  • ログの集約と管理
  • ヘルスチェックの実行
NodeManager のリソース管理
===========================

ノード: 128GB RAM, 32 CPU コア

+---------------------------------------------------+
|              NodeManager                          |
|                                                   |
|  予約リソース (OS + NM 自体):                       |
|  - メモリ: 8GB                                     |
|  - CPU: 2 コア                                     |
|                                                   |
|  YARN 管理リソース:                                 |
|  - メモリ: 120GB                                   |
|  - CPU: 30 vコア                                   |
|                                                   |
|  +--------+ +--------+ +--------+ +--------+     |
|  |Cont. 1 | |Cont. 2 | |Cont. 3 | |Cont. 4 |     |
|  |4GB     | |8GB     | |2GB     | |16GB    |     |
|  |2 vcore | |4 vcore | |1 vcore | |8 vcore |     |
|  +--------+ +--------+ +--------+ +--------+     |
|                                                   |
|  空きリソース: 90GB メモリ, 15 vコア                 |
+---------------------------------------------------+

4.4 ApplicationMaster (AM)

ApplicationMaster はアプリケーション固有のマスターコンポーネントであり、 ResourceManager からリソースをネゴシエートし、NodeManager と連携して タスクの実行を管理する。

ApplicationMaster のライフサイクル:

アプリケーション実行フロー
============================

Client          ResourceManager      NodeManager     ApplicationMaster
  |                  |                    |                |
  |  1. Submit App   |                    |                |
  |----------------->|                    |                |
  |                  |                    |                |
  |                  |  2. Allocate AM    |                |
  |                  |    Container       |                |
  |                  |------------------->|                |
  |                  |                    |                |
  |                  |                    | 3. Launch AM   |
  |                  |                    |--------------->|
  |                  |                    |                |
  |                  |  4. Register AM    |                |
  |                  |<-----------------------------------|
  |                  |                    |                |
  |                  |  5. Request        |                |
  |                  |    Containers      |                |
  |                  |<-----------------------------------|
  |                  |                    |                |
  |                  |  6. Allocate       |                |
  |                  |    Containers      |                |
  |                  |----------------------------------->|
  |                  |                    |                |
  |                  |                    | 7. Launch      |
  |                  |                    |    Tasks       |
  |                  |                    |<---------------|
  |                  |                    |                |
  |  8. Status       |                    |                |
  |<----------------------------------------------------|
  |                  |                    |                |
  |                  | 9. Unregister AM   |                |
  |                  |<-----------------------------------|

4.5 Container

Container は YARN におけるリソースの割当単位である。

Container の構成要素
=====================

Container {
    ContainerID:    container_1234567890_0001_01_000001
    NodeID:         node1.example.com:45454
    Resources: {
        Memory:     4096 MB
        VCores:     2
        GPU:        0 (Hadoop 3.x)
    }
    Priority:       1
    ContainerToken: <encrypted-token>
    LocalResources: {
        app.jar:    hdfs://nn/user/app/app.jar
        config.xml: hdfs://nn/user/app/config.xml
    }
    Environment: {
        JAVA_HOME:  /usr/lib/jvm/java-11
        CLASSPATH:  ...
    }
    Command:        java -Xmx3072m ...
}

4.6 スケジューラ

Capacity Scheduler

Capacity Scheduler は Apache Hadoop のデフォルトスケジューラであり、 複数のテナントが計算リソースを共有するためのマルチテナント対応スケジューラである。

Capacity Scheduler のキュー階層
=================================

                    root (100%)
                   /     |      \
                  /      |       \
            prod (60%) dev (30%) staging (10%)
           /     \        |
          /       \       |
   batch (40%) streaming (20%)  research (30%)
   
   特徴:
   - 階層的なキュー構造
   - 最小容量保証
   - 弾力的なリソース共有 (余剰リソースの借用)
   - ACL によるアクセス制御
   - ユーザー/アプリケーション制限

Capacity Scheduler の設定例:

<!-- capacity-scheduler.xml -->
<configuration>
  <!-- キュー定義 -->
  <property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>prod,dev,staging</value>
  </property>

  <!-- prod キューの設定 -->
  <property>
    <name>yarn.scheduler.capacity.root.prod.capacity</name>
    <value>60</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.prod.maximum-capacity</name>
    <value>80</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.prod.queues</name>
    <value>batch,streaming</value>
  </property>

  <!-- prod.batch サブキュー -->
  <property>
    <name>yarn.scheduler.capacity.root.prod.batch.capacity</name>
    <value>67</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.prod.batch.maximum-capacity</name>
    <value>100</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.prod.batch.user-limit-factor</name>
    <value>1</value>
  </property>

  <!-- prod.streaming サブキュー -->
  <property>
    <name>yarn.scheduler.capacity.root.prod.streaming.capacity</name>
    <value>33</value>
  </property>

  <!-- dev キューの設定 -->
  <property>
    <name>yarn.scheduler.capacity.root.dev.capacity</name>
    <value>30</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
    <value>50</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.maximum-applications</name>
    <value>50</value>
  </property>

  <!-- staging キューの設定 -->
  <property>
    <name>yarn.scheduler.capacity.root.staging.capacity</name>
    <value>10</value>
  </property>

  <!-- ACL 設定 -->
  <property>
    <name>yarn.scheduler.capacity.root.prod.acl_submit_applications</name>
    <value>prod-team</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.acl_submit_applications</name>
    <value>*</value>
  </property>
</configuration>

Fair Scheduler

Fair Scheduler は全アプリケーションに均等にリソースを分配するスケジューラである。

<!-- fair-scheduler.xml -->
<allocations>
  <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>

  <queue name="prod">
    <minResources>40000 mb, 20 vcores</minResources>
    <maxResources>80000 mb, 40 vcores</maxResources>
    <weight>3.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
    <aclSubmitApps>prod-group</aclSubmitApps>
    
    <queue name="batch">
      <minResources>20000 mb, 10 vcores</minResources>
      <weight>2.0</weight>
    </queue>
    
    <queue name="streaming">
      <minResources>10000 mb, 5 vcores</minResources>
      <weight>1.0</weight>
    </queue>
  </queue>

  <queue name="dev">
    <minResources>20000 mb, 10 vcores</minResources>
    <maxResources>40000 mb, 20 vcores</maxResources>
    <weight>1.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
    <aclSubmitApps>*</aclSubmitApps>
    <maxRunningApps>10</maxRunningApps>
  </queue>

  <!-- ユーザーごとのデフォルト制限 -->
  <userMaxAppsDefault>5</userMaxAppsDefault>

  <!-- キュー配置ルール -->
  <queuePlacementPolicy>
    <rule name="specified" />
    <rule name="primaryGroup" create="false" />
    <rule name="default" queue="dev" />
  </queuePlacementPolicy>
</allocations>

4.7 YARN コマンドリファレンス

# ============================================
# YARN コマンド
# ============================================

# アプリケーション管理
yarn application -list                          # 実行中アプリ一覧
yarn application -list -appStates ALL           # 全状態のアプリ一覧
yarn application -list -appTypes MAPREDUCE      # MapReduce アプリのみ
yarn application -status application_123_0001   # アプリ状態確認
yarn application -kill application_123_0001     # アプリ強制終了

# ノード管理
yarn node -list                                 # アクティブノード一覧
yarn node -list -all                           # 全ノード一覧
yarn node -status node1.example.com:45454      # ノード状態確認

# キュー管理
yarn queue -status prod                         # キュー状態確認

# ログ確認
yarn logs -applicationId application_123_0001   # アプリログ取得
yarn logs -applicationId application_123_0001 \
  -containerId container_123_0001_01_000001     # コンテナログ

# クラスタ情報
yarn cluster --loglevel                         # ログレベル確認
yarn top                                        # リアルタイムクラスタ状態

# ResourceManager 管理
yarn rmadmin -refreshQueues                     # キュー設定リロード
yarn rmadmin -refreshNodes                      # ノードリスト更新
yarn rmadmin -getServiceState rm1               # RM 状態確認 (HA)
yarn rmadmin -transitionToActive rm1            # Active に切替 (HA)

5. MapReduce プログラミングモデル

5.1 MapReduce の概要

MapReduce は大規模データの並列分散処理を行うプログラミングモデルである。 入力データを分割し、Map フェーズで変換、Shuffle/Sort フェーズで中間データを 整理し、Reduce フェーズで集約する。

MapReduce 処理フロー
=====================

Input          Split        Map        Shuffle &     Reduce      Output
Data           Phase        Phase      Sort Phase    Phase       Data
                                                     
+--------+   +------+   +--------+                +--------+   +--------+
|Record 1|-->|Split1|-->|Mapper 1|--+          +-->|Reducer1|-->|Part-0  |
|Record 2|   |      |   |        |  |   Sort   |   |        |   |        |
|Record 3|   +------+   +--------+  +-->+------+-->+--------+   +--------+
|Record 4|                          |   |Merge |
|Record 5|   +------+   +--------+  +-->|& Sort|   +--------+   +--------+
|Record 6|-->|Split2|-->|Mapper 2|--+   +------+-->|Reducer2|-->|Part-1  |
|Record 7|   |      |   |        |  |          |   |        |   |        |
|Record 8|   +------+   +--------+  +--------->+-->+--------+   +--------+
|Record 9|                          |
|Record10|   +------+   +--------+  |
|Record11|-->|Split3|-->|Mapper 3|--+
|Record12|   |      |   |        |
+--------+   +------+   +--------+

5.2 各コンポーネントの詳細

InputFormat

InputFormat はデータの読み込み方法を定義する。

// InputFormat の主要な実装
//
// TextInputFormat       - テキストファイル (行単位)
// KeyValueTextInputFormat - キーバリュー形式テキスト
// NLineInputFormat      - N行ごとに分割
// SequenceFileInputFormat - SequenceFile 形式
// CombineTextInputFormat - 小さなファイルを結合

// カスタム InputFormat の例
public class CustomInputFormat extends FileInputFormat<LongWritable, Text> {
    
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new CustomRecordReader();
    }
    
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // 圧縮形式によって分割可否を判断
        CompressionCodec codec = 
            new CompressionCodecFactory(context.getConfiguration())
                .getCodec(file);
        if (codec == null) {
            return true;  // 非圧縮は分割可能
        }
        return codec instanceof SplittableCompressionCodec;
    }
}

Mapper

Mapper は入力データをキーバリューペアに変換する。

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * WordCount の Mapper 実装例
 * 入力: (行オフセット, 行テキスト)
 * 出力: (単語, 1)
 */
public class WordCountMapper 
    extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    private final static IntWritable ONE = new IntWritable(1);
    private Text word = new Text();
    
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        // 初期化処理 (各 Mapper タスク起動時に1回実行)
        // 例: 分散キャッシュからファイルを読み込み
    }
    
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key: 行のバイトオフセット
        // value: 行のテキスト
        String line = value.toString();
        String[] tokens = line.split("\\s+");
        
        for (String token : tokens) {
            word.set(token.toLowerCase().trim());
            context.write(word, ONE);
        }
    }
    
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // 終了処理 (各 Mapper タスク完了時に1回実行)
    }
}

Combiner

Combiner はローカルでの事前集約を行い、Shuffle フェーズのデータ転送量を削減する。

/**
 * Combiner の例 (Reducer と同じロジックを使用可能な場合)
 * 
 * Combiner 適用前:
 *   Mapper 1: (apple, 1), (banana, 1), (apple, 1), (apple, 1)
 *   → Shuffle で 4 レコード転送
 * 
 * Combiner 適用後:
 *   Mapper 1: (apple, 3), (banana, 1)
 *   → Shuffle で 2 レコード転送 (50% 削減)
 */
public class WordCountCombiner
    extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

// 注意: Combiner は最適化であり、実行回数は保証されない
// - 0回実行される場合がある
// - 複数回実行される場合がある
// - 結果は Combiner なしの場合と同一でなければならない
// - 平均値計算には直接使用不可 (合計+カウントで対応)

Partitioner

Partitioner は Mapper の出力をどの Reducer に送るか決定する。

/**
 * デフォルトの HashPartitioner の動作:
 * partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
 */

/**
 * カスタム Partitioner の例
 * 日付キーの年ごとに異なる Reducer に分配
 */
public class DatePartitioner extends Partitioner<Text, IntWritable> {
    
    @Override
    public int getPartition(Text key, IntWritable value, 
            int numReduceTasks) {
        // key format: "2026-04-11"
        String date = key.toString();
        int year = Integer.parseInt(date.substring(0, 4));
        
        // 年に基づいてパーティションを決定
        return (year % numReduceTasks);
    }
}

Shuffle / Sort

Shuffle/Sort フェーズは MapReduce の核心であり、Map 出力を Reducer に 効率的に転送する。

Shuffle / Sort フェーズの詳細
================================

Mapper 側 (Map Side):
+----------------------------------------------------------+
|                                                          |
|  Map Output  --> Serialization --> Circular Buffer        |
|                                   (100MB default)        |
|                                        |                 |
|                                   80% threshold          |
|                                        |                 |
|                                   Sort (QuickSort)       |
|                                   + Partition            |
|                                   + Combiner (optional)  |
|                                        |                 |
|                                   Spill to Disk          |
|                                   (spill.0, spill.1...)  |
|                                        |                 |
|                                   Merge Spills           |
|                                   + Combiner (optional)  |
|                                        |                 |
|                                   Single Sorted File     |
|                                   (per partition)        |
+----------------------------------------------------------+

Reducer 側 (Reduce Side):
+----------------------------------------------------------+
|                                                          |
|  HTTP GET -----> Copy Phase (Shuffle)                    |
|  (from Mappers)  (fetch map output partitions)           |
|                        |                                 |
|                   In-Memory Buffer                       |
|                   or Disk (threshold)                    |
|                        |                                 |
|                   Sort Phase (Merge Sort)                |
|                   (merge multiple map outputs)           |
|                        |                                 |
|                   Reduce Phase                           |
|                   (user-defined reduce function)         |
|                        |                                 |
|                   Output to HDFS                         |
+----------------------------------------------------------+

設定パラメータ:
- mapreduce.task.io.sort.mb = 100          (ソートバッファサイズ)
- mapreduce.map.sort.spill.percent = 0.80  (スピル閾値)
- mapreduce.task.io.sort.factor = 10       (マージファクタ)
- mapreduce.reduce.shuffle.parallelcopies = 5  (並列コピー数)

Reducer

Reducer は同一キーの値を集約して最終結果を生成する。

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * WordCount の Reducer 実装例
 * 入力: (単語, [1, 1, 1, ...])
 * 出力: (単語, 合計)
 */
public class WordCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    private IntWritable result = new IntWritable();
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

OutputFormat

OutputFormat は結果データの書き込み方法を定義する。

// OutputFormat の主要な実装
//
// TextOutputFormat        - テキストファイル (TSV)
// SequenceFileOutputFormat - SequenceFile 形式
// MapFileOutputFormat     - MapFile 形式
// MultipleOutputs        - 複数出力ファイル
// LazyOutputFormat        - レコードがある場合のみ出力
// NullOutputFormat        - 出力なし (副作用のみ)

5.3 完全な MapReduce ジョブの例

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 完全な WordCount ジョブ設定
 */
public class WordCount extends Configured implements Tool {
    
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCount <input> <output>");
            return 1;
        }
        
        Configuration conf = getConf();
        
        // ジョブの設定
        Job job = Job.getInstance(conf, "Word Count");
        job.setJarByClass(WordCount.class);
        
        // 入力設定
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        
        // Mapper 設定
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        // Combiner 設定 (オプション)
        job.setCombinerClass(WordCountCombiner.class);
        
        // Partitioner 設定 (カスタムの場合)
        // job.setPartitionerClass(CustomPartitioner.class);
        
        // Reducer 設定
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(3);  // Reducer 数
        
        // 出力設定
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        // 圧縮設定 (オプション)
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(
            job, org.apache.hadoop.io.compress.GzipCodec.class);
        
        // ジョブ実行
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), 
            new WordCount(), args);
        System.exit(exitCode);
    }
}

ジョブの実行:

# コンパイルとパッケージング
mvn clean package

# ジョブの実行
hadoop jar wordcount-1.0.jar com.example.WordCount \
  /user/hadoop/input/ \
  /user/hadoop/output/

# Hadoop Streaming (Python の例)
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input /user/hadoop/input/ \
  -output /user/hadoop/output/ \
  -mapper mapper.py \
  -reducer reducer.py \
  -file mapper.py \
  -file reducer.py

Python Streaming Mapper の例:

#!/usr/bin/env python3
# mapper.py
import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print(f"{word.lower()}\t1")

Python Streaming Reducer の例:

#!/usr/bin/env python3
# reducer.py
import sys
from collections import defaultdict

current_word = None
current_count = 0

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    count = int(count)
    
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print(f"{current_word}\t{current_count}")
        current_word = word
        current_count = count

if current_word:
    print(f"{current_word}\t{current_count}")

6. 設定ファイル

6.1 core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  core-site.xml - Hadoop コア設定
  全コンポーネント (HDFS, YARN, MapReduce) に共通の設定
-->
<configuration>

  <!-- ============================================ -->
  <!-- ファイルシステム設定                           -->
  <!-- ============================================ -->
  
  <!-- デフォルトファイルシステム URI -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://mycluster</value>
    <description>
      HDFS HA の場合は nameservice ID を指定。
      非 HA の場合は hdfs://namenode-host:8020
    </description>
  </property>

  <!-- ============================================ -->
  <!-- I/O 設定                                      -->
  <!-- ============================================ -->
  
  <!-- I/O バッファサイズ -->
  <property>
    <name>io.file.buffer.size</name>
    <value>131072</value>
    <description>
      SequenceFile の読み書きバッファサイズ (128KB)。
      大規模クラスタでは 131072 以上を推奨。
    </description>
  </property>

  <!-- 圧縮コーデック -->
  <property>
    <name>io.compression.codecs</name>
    <value>
      org.apache.hadoop.io.compress.GzipCodec,
      org.apache.hadoop.io.compress.DefaultCodec,
      org.apache.hadoop.io.compress.BZip2Codec,
      org.apache.hadoop.io.compress.SnappyCodec,
      org.apache.hadoop.io.compress.Lz4Codec,
      org.apache.hadoop.io.compress.ZStandardCodec
    </value>
  </property>

  <!-- ============================================ -->
  <!-- HA / ZooKeeper 設定                           -->
  <!-- ============================================ -->
  
  <property>
    <name>ha.zookeeper.quorum</name>
    <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
  </property>

  <property>
    <name>ha.zookeeper.session-timeout.ms</name>
    <value>5000</value>
  </property>

  <!-- ============================================ -->
  <!-- プロキシユーザー設定                           -->
  <!-- ============================================ -->
  
  <property>
    <name>hadoop.proxyuser.hive.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hive.groups</name>
    <value>*</value>
  </property>

  <!-- ============================================ -->
  <!-- ゴミ箱設定                                     -->
  <!-- ============================================ -->
  
  <property>
    <name>fs.trash.interval</name>
    <value>1440</value>
    <description>
      ゴミ箱のチェックポイント間隔 (分)。
      1440 = 24時間。0 = ゴミ箱無効。
    </description>
  </property>

  <property>
    <name>fs.trash.checkpoint.interval</name>
    <value>0</value>
    <description>
      ゴミ箱チェックポイントの間隔。
      0 の場合 fs.trash.interval と同じ値が使用される。
    </description>
  </property>

  <!-- ============================================ -->
  <!-- セキュリティ設定 (Kerberos)                    -->
  <!-- ============================================ -->
  
  <property>
    <name>hadoop.security.authentication</name>
    <value>kerberos</value>
  </property>
  
  <property>
    <name>hadoop.security.authorization</name>
    <value>true</value>
  </property>

  <!-- ============================================ -->
  <!-- S3A コネクタ設定 (クラウド連携)                 -->
  <!-- ============================================ -->
  
  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3.us-east-1.amazonaws.com</value>
  </property>
  
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  
  <property>
    <name>fs.s3a.connection.maximum</name>
    <value>100</value>
  </property>
  
  <property>
    <name>fs.s3a.fast.upload</name>
    <value>true</value>
  </property>

</configuration>

6.2 hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  hdfs-site.xml - HDFS 設定
-->
<configuration>

  <!-- ============================================ -->
  <!-- NameNode 設定                                 -->
  <!-- ============================================ -->
  
  <!-- NameNode データディレクトリ -->
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:///data/hadoop/hdfs/namenode</value>
    <description>
      NameNode の FsImage と EditLog を保存するディレクトリ。
      複数のディレクトリをカンマ区切りで指定可能 (冗長化)。
    </description>
  </property>

  <!-- NameNode RPC アドレス -->
  <property>
    <name>dfs.namenode.rpc-address</name>
    <value>namenode.example.com:8020</value>
  </property>

  <!-- NameNode HTTP アドレス -->
  <property>
    <name>dfs.namenode.http-address</name>
    <value>namenode.example.com:9870</value>
  </property>

  <!-- NameNode ハンドラスレッド数 -->
  <property>
    <name>dfs.namenode.handler.count</name>
    <value>100</value>
    <description>
      NameNode の RPC ハンドラスレッド数。
      推奨値: 20 * ln(クラスタノード数)
      100ノードの場合: 20 * ln(100) ≈ 92
    </description>
  </property>

  <!-- ============================================ -->
  <!-- DataNode 設定                                 -->
  <!-- ============================================ -->
  
  <!-- DataNode データディレクトリ -->
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>
      [DISK]file:///data1/hadoop/hdfs/datanode,
      [DISK]file:///data2/hadoop/hdfs/datanode,
      [DISK]file:///data3/hadoop/hdfs/datanode,
      [DISK]file:///data4/hadoop/hdfs/datanode,
      [SSD]file:///ssd1/hadoop/hdfs/datanode
    </value>
    <description>
      DataNode がブロックデータを保存するディレクトリ。
      複数ディスクに分散配置。ストレージタイプの指定も可能。
    </description>
  </property>

  <!-- DataNode ハンドラスレッド数 -->
  <property>
    <name>dfs.datanode.handler.count</name>
    <value>10</value>
  </property>

  <!-- DataNode 最大転送スレッド数 -->
  <property>
    <name>dfs.datanode.max.transfer.threads</name>
    <value>4096</value>
  </property>

  <!-- ============================================ -->
  <!-- ブロック設定                                   -->
  <!-- ============================================ -->
  
  <!-- ブロックサイズ (デフォルト 128MB) -->
  <property>
    <name>dfs.blocksize</name>
    <value>134217728</value>
    <description>
      HDFS ブロックサイズ (バイト)。
      134217728 = 128MB (デフォルト)
      268435456 = 256MB (大規模データ向け)
    </description>
  </property>

  <!-- レプリケーションファクタ -->
  <property>
    <name>dfs.replication</name>
    <value>3</value>
    <description>
      デフォルトのブロックレプリケーション数。
      3 = 標準 (推奨)
      1 = 開発環境
    </description>
  </property>

  <!-- ============================================ -->
  <!-- HDFS HA 設定                                  -->
  <!-- ============================================ -->
  
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>

  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
  </property>

  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>namenode1.example.com:8020</value>
  </property>

  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>namenode2.example.com:8020</value>
  </property>

  <property>
    <name>dfs.namenode.http-address.mycluster.nn1</name>
    <value>namenode1.example.com:9870</value>
  </property>

  <property>
    <name>dfs.namenode.http-address.mycluster.nn2</name>
    <value>namenode2.example.com:9870</value>
  </property>

  <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/mycluster</value>
  </property>

  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>

  <property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
  </property>

  <property>
    <name>dfs.ha.fencing.methods</name>
    <value>
      sshfence
      shell(/bin/true)
    </value>
  </property>

  <property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>/home/hadoop/.ssh/id_rsa</value>
  </property>

  <!-- JournalNode 設定 -->
  <property>
    <name>dfs.journalnode.edits.dir</name>
    <value>/data/hadoop/hdfs/journalnode</value>
  </property>

  <!-- ============================================ -->
  <!-- パフォーマンス設定                             -->
  <!-- ============================================ -->
  
  <!-- Short-circuit Local Reads -->
  <property>
    <name>dfs.client.read.shortcircuit</name>
    <value>true</value>
  </property>
  
  <property>
    <name>dfs.domain.socket.path</name>
    <value>/var/lib/hadoop-hdfs/dn_socket</value>
  </property>

  <!-- ============================================ -->
  <!-- パーミッション設定                             -->
  <!-- ============================================ -->
  
  <property>
    <name>dfs.permissions.enabled</name>
    <value>true</value>
  </property>

  <property>
    <name>dfs.permissions.superusergroup</name>
    <value>hadoop</value>
  </property>

  <!-- WebHDFS 有効化 -->
  <property>
    <name>dfs.webhdfs.enabled</name>
    <value>true</value>
  </property>

</configuration>

6.3 yarn-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  yarn-site.xml - YARN 設定
-->
<configuration>

  <!-- ============================================ -->
  <!-- ResourceManager 設定                          -->
  <!-- ============================================ -->
  
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>rm.example.com</value>
  </property>

  <property>
    <name>yarn.resourcemanager.webapp.address</name>
    <value>rm.example.com:8088</value>
  </property>

  <!-- スケジューラ設定 -->
  <property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    <description>
      使用可能なスケジューラ:
      - CapacityScheduler (デフォルト)
      - FairScheduler
    </description>
  </property>

  <!-- クラスタの最大メモリ/CPU -->
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>65536</value>
  </property>
  
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>1024</value>
  </property>

  <property>
    <name>yarn.scheduler.maximum-allocation-vcores</name>
    <value>32</value>
  </property>

  <property>
    <name>yarn.scheduler.minimum-allocation-vcores</name>
    <value>1</value>
  </property>

  <!-- ============================================ -->
  <!-- NodeManager 設定                              -->
  <!-- ============================================ -->
  
  <!-- ノード利用可能リソース -->
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>122880</value>
    <description>
      NodeManager が YARN に提供するメモリ量 (MB)。
      物理メモリから OS + サービス用メモリを差し引いた値。
      例: 128GB RAM - 8GB (OS等) = 120GB = 122880MB
    </description>
  </property>

  <property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>30</value>
    <description>
      NodeManager が YARN に提供する仮想 CPU コア数。
      物理コアから OS 用コアを差し引いた値。
      例: 32 物理コア - 2 (OS等) = 30 vコア
    </description>
  </property>

  <!-- ============================================ -->
  <!-- コンテナ設定                                   -->
  <!-- ============================================ -->
  
  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    <description>
      仮想メモリのチェック。Java 8+ では false 推奨。
    </description>
  </property>

  <property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>2.1</value>
    <description>
      仮想メモリ/物理メモリ比率。
      コンテナの仮想メモリ使用量が
      (割当メモリ * この比率) を超えると kill される。
    </description>
  </property>

  <!-- ============================================ -->
  <!-- ログ集約設定                                   -->
  <!-- ============================================ -->
  
  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>

  <property>
    <name>yarn.nodemanager.remote-app-log-dir</name>
    <value>/app-logs</value>
  </property>

  <property>
    <name>yarn.nodemanager.remote-app-log-dir-suffix</name>
    <value>logs</value>
  </property>

  <property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>604800</value>
    <description>ログ保持期間: 7日 (604800秒)</description>
  </property>

  <!-- ============================================ -->
  <!-- ResourceManager HA 設定                       -->
  <!-- ============================================ -->
  
  <property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
  </property>

  <property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>yarn-cluster</value>
  </property>

  <property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
  </property>

  <property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>rm1.example.com</value>
  </property>

  <property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>rm2.example.com</value>
  </property>

  <property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
  </property>

  <property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
  </property>

  <property>
    <name>yarn.resourcemanager.store.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
  </property>

  <!-- Auxiliary Services (MapReduce shuffle) -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>

  <property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>

</configuration>

6.4 mapred-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  mapred-site.xml - MapReduce 設定
-->
<configuration>

  <!-- 実行フレームワーク -->
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
    <description>
      MapReduce 実行フレームワーク。
      yarn = YARN 上で実行 (本番)
      local = ローカルモード (デバッグ)
    </description>
  </property>

  <!-- JobHistory Server -->
  <property>
    <name>mapreduce.jobhistory.address</name>
    <value>historyserver.example.com:10020</value>
  </property>

  <property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>historyserver.example.com:19888</value>
  </property>

  <!-- ============================================ -->
  <!-- Map タスク設定                                 -->
  <!-- ============================================ -->
  
  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>4096</value>
    <description>各 Map タスクのコンテナメモリ (MB)</description>
  </property>

  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx3276m</value>
    <description>
      Map タスクの JVM ヒープサイズ。
      コンテナメモリの約80%に設定。
      4096 * 0.8 = 3276m
    </description>
  </property>

  <property>
    <name>mapreduce.map.cpu.vcores</name>
    <value>1</value>
  </property>

  <!-- ============================================ -->
  <!-- Reduce タスク設定                              -->
  <!-- ============================================ -->
  
  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>8192</value>
    <description>各 Reduce タスクのコンテナメモリ (MB)</description>
  </property>

  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx6553m</value>
    <description>
      Reduce タスクの JVM ヒープサイズ。
      コンテナメモリの約80%に設定。
    </description>
  </property>

  <property>
    <name>mapreduce.reduce.cpu.vcores</name>
    <value>1</value>
  </property>

  <!-- ============================================ -->
  <!-- ApplicationMaster 設定                        -->
  <!-- ============================================ -->
  
  <property>
    <name>yarn.app.mapreduce.am.resource.mb</name>
    <value>4096</value>
  </property>

  <property>
    <name>yarn.app.mapreduce.am.command-opts</name>
    <value>-Xmx3276m</value>
  </property>

  <!-- ============================================ -->
  <!-- Shuffle / Sort 設定                           -->
  <!-- ============================================ -->
  
  <property>
    <name>mapreduce.task.io.sort.mb</name>
    <value>256</value>
    <description>Map 出力のソートバッファサイズ (MB)</description>
  </property>

  <property>
    <name>mapreduce.map.sort.spill.percent</name>
    <value>0.80</value>
  </property>

  <property>
    <name>mapreduce.task.io.sort.factor</name>
    <value>64</value>
    <description>マージファクタ (同時にマージするストリーム数)</description>
  </property>

  <property>
    <name>mapreduce.reduce.shuffle.parallelcopies</name>
    <value>10</value>
  </property>

  <!-- ============================================ -->
  <!-- 圧縮設定                                      -->
  <!-- ============================================ -->
  
  <!-- Map 出力の圧縮 -->
  <property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
  </property>

  <property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  </property>

  <!-- 最終出力の圧縮 -->
  <property>
    <name>mapreduce.output.fileoutputformat.compress</name>
    <value>true</value>
  </property>

  <property>
    <name>mapreduce.output.fileoutputformat.compress.codec</name>
    <value>org.apache.hadoop.io.compress.GzipCodec</value>
  </property>

  <!-- ============================================ -->
  <!-- Speculative Execution                        -->
  <!-- ============================================ -->
  
  <property>
    <name>mapreduce.map.speculative</name>
    <value>true</value>
  </property>

  <property>
    <name>mapreduce.reduce.speculative</name>
    <value>false</value>
    <description>
      Reduce の投機実行はデフォルトで無効。
      Reduce はシャッフルフェーズがあるため、
      投機実行のコストが高い。
    </description>
  </property>

</configuration>

7. セキュリティ

7.1 Kerberos 認証

Hadoop のセキュリティモデルは Kerberos ベースの認証を基盤としている。

Kerberos 認証フロー
=====================

  +--------+           +----------+           +-----------+
  | Client |           |   KDC    |           |  Hadoop   |
  | (User) |           |(Kerberos)|           |  Service  |
  +---+----+           +----+-----+           +-----+-----+
      |                     |                       |
      | 1. kinit (AS-REQ)   |                       |
      |-------------------->|                       |
      |                     |                       |
      | 2. TGT (AS-REP)    |                       |
      |<--------------------|                       |
      |                     |                       |
      | 3. Service Ticket   |                       |
      |    Request (TGS-REQ)|                       |
      |-------------------->|                       |
      |                     |                       |
      | 4. Service Ticket   |                       |
      |    (TGS-REP)        |                       |
      |<--------------------|                       |
      |                     |                       |
      | 5. Authenticate with Service Ticket         |
      |-------------------------------------------->|
      |                     |                       |
      | 6. Service Response |                       |
      |<--------------------------------------------|
      
  KDC = Key Distribution Center
  TGT = Ticket Granting Ticket
  AS  = Authentication Service
  TGS = Ticket Granting Service

Kerberos の設定:

# /etc/krb5.conf の例
[libdefaults]
    default_realm = EXAMPLE.COM
    dns_lookup_realm = false
    dns_lookup_kdc = false
    ticket_lifetime = 24h
    renew_lifetime = 7d
    forwardable = true

[realms]
    EXAMPLE.COM = {
        kdc = kdc.example.com
        admin_server = kdc.example.com
    }

[domain_realm]
    .example.com = EXAMPLE.COM
    example.com = EXAMPLE.COM

Hadoop サービスプリンシパルの作成:

# Kerberos プリンシパルの作成
kadmin.local -q "addprinc -randkey nn/namenode1.example.com@EXAMPLE.COM"
kadmin.local -q "addprinc -randkey nn/namenode2.example.com@EXAMPLE.COM"
kadmin.local -q "addprinc -randkey dn/datanode1.example.com@EXAMPLE.COM"
kadmin.local -q "addprinc -randkey rm/rm.example.com@EXAMPLE.COM"
kadmin.local -q "addprinc -randkey nm/nodemanager1.example.com@EXAMPLE.COM"
kadmin.local -q "addprinc -randkey HTTP/namenode1.example.com@EXAMPLE.COM"

# Keytab ファイルの作成
kadmin.local -q "xst -k /etc/hadoop/conf/nn.keytab nn/namenode1.example.com@EXAMPLE.COM"
kadmin.local -q "xst -k /etc/hadoop/conf/dn.keytab dn/datanode1.example.com@EXAMPLE.COM"

# Keytab の確認
klist -kt /etc/hadoop/conf/nn.keytab

hdfs-site.xml セキュリティ設定:

<!-- NameNode Kerberos 設定 -->
<property>
  <name>dfs.namenode.kerberos.principal</name>
  <value>nn/_HOST@EXAMPLE.COM</value>
</property>
<property>
  <name>dfs.namenode.keytab.file</name>
  <value>/etc/hadoop/conf/nn.keytab</value>
</property>
<property>
  <name>dfs.namenode.kerberos.internal.spnego.principal</name>
  <value>HTTP/_HOST@EXAMPLE.COM</value>
</property>

<!-- DataNode Kerberos 設定 -->
<property>
  <name>dfs.datanode.kerberos.principal</name>
  <value>dn/_HOST@EXAMPLE.COM</value>
</property>
<property>
  <name>dfs.datanode.keytab.file</name>
  <value>/etc/hadoop/conf/dn.keytab</value>
</property>

<!-- ブロックアクセストークン -->
<property>
  <name>dfs.block.access.token.enable</name>
  <value>true</value>
</property>

7.2 HDFS ACLs (アクセス制御リスト)

POSIX ACL を拡張した細粒度のアクセス制御:

# ACL の有効化 (hdfs-site.xml)
# dfs.namenode.acls.enabled = true

# ACL の設定
hdfs dfs -setfacl -m user:alice:rwx /data/project
hdfs dfs -setfacl -m group:analytics:r-x /data/project
hdfs dfs -setfacl -m other::--- /data/project
hdfs dfs -setfacl -m default:user:alice:rwx /data/project  # デフォルト ACL

# ACL の確認
hdfs dfs -getfacl /data/project

# 出力例:
# file: /data/project
# owner: hadoop
# group: hadoop
# user::rwx
# user:alice:rwx
# group::r-x
# group:analytics:r-x
# mask::rwx
# other::---
# default:user::rwx
# default:user:alice:rwx
# default:group::r-x
# default:mask::rwx
# default:other::---

# ACL の削除
hdfs dfs -setfacl -b /data/project              # 全 ACL 削除
hdfs dfs -setfacl -x user:alice /data/project    # 特定 ACL 削除

7.3 Apache Ranger

Apache Ranger は Hadoop エコシステム全体にわたる包括的なセキュリティ 管理フレームワークである。

Apache Ranger アーキテクチャ
=============================

+------------------------------------------------------------+
|                    Ranger Admin Portal                     |
|  +--------------------+  +--------------------+            |
|  | Policy Management  |  | Audit Management   |            |
|  |  - HDFS Policies   |  |  - Access Logs     |            |
|  |  - Hive Policies   |  |  - Admin Logs      |            |
|  |  - HBase Policies  |  |  - Plugin Logs     |            |
|  |  - YARN Policies   |  +--------------------+            |
|  |  - Kafka Policies  |                                    |
|  +--------------------+                                    |
+------+-----------------------------------+--------+--------+
       |                                   |        |
       | Policy Sync                       |        |
       |                                   |        |
+------v-------+  +--------v--------+  +--v-----------+
| HDFS Plugin  |  |  Hive Plugin    |  | HBase Plugin |
| (NameNode)   |  |  (HiveServer2) |  | (RegionSvr)  |
+--------------+  +-----------------+  +---------------+

機能:
- 集中的なポリシー管理
- 細粒度のアクセス制御 (行・列レベル)
- データマスキング
- 監査ログの集約
- タグベースのポリシー (Apache Atlas 連携)

7.4 暗号化ゾーン (Encryption Zones)

HDFS Transparent Data Encryption (TDE) により、保存データを暗号化する:

# KMS (Key Management Server) の設定
# kms-site.xml で暗号化キープロバイダを設定

# 暗号化キーの作成
hadoop key create mykey -size 128

# 暗号化キーの一覧
hadoop key list -metadata

# 暗号化ゾーンの作成
hdfs crypto -createZone -keyName mykey -path /data/encrypted

# 暗号化ゾーンの一覧
hdfs crypto -listZones

# 出力例:
# /data/encrypted  mykey

# 暗号化ゾーン内のファイル操作は通常と同じ
hdfs dfs -put sensitive-data.csv /data/encrypted/
hdfs dfs -cat /data/encrypted/sensitive-data.csv  # 透過的に復号
暗号化フロー
=============

書き込み:
  Client --> HDFS (暗号化) --> DataNode (暗号化データを保存)
           DEK を使用して暗号化
           
読み取り:
  DataNode (暗号化データ) --> Client (復号) --> アプリケーション
                              DEK を使用して復号

DEK (Data Encryption Key):
  - 各ファイルに固有の暗号化キー
  - EDEK (Encrypted DEK) として NameNode に保存
  - KMS が DEK の暗号化/復号を管理

EZ (Encryption Zone):
  - ディレクトリ単位で暗号化を適用
  - ゾーン内のすべてのファイルが自動暗号化
  - 暗号化はクライアント側で実行 (end-to-end)

8. パフォーマンスチューニング

8.1 ブロックサイズの最適化

ブロックサイズとパフォーマンスの関係
======================================

小さいブロック (64MB 以下):
  利点:
  - Map タスクの並列度が高い
  - 小さなファイルへのアクセスが効率的
  欠点:
  - NameNode のメモリ消費が増加
  - Map タスクの起動オーバーヘッドが増加
  - ネットワーク接続数が増加

大きいブロック (256MB 以上):
  利点:
  - NameNode のメモリ消費が減少
  - シーケンシャル I/O のスループットが向上
  - Map タスク数が減少 (オーバーヘッド削減)
  欠点:
  - 並列度が低下する可能性
  - 小さなファイルに対するストレージ効率が悪い

推奨値:
  - 汎用: 128MB (デフォルト)
  - 大規模 ETL: 256MB
  - ストリーミング処理: 128MB
  - ログ分析: 256MB

8.2 レプリケーション設定

レプリケーションファクタの選択
================================

+--------+----------+------------------+-----------------+
| Factor | 耐障害性 | ストレージコスト    | 読取スループット  |
+--------+----------+------------------+-----------------+
|   1    | 低       | 1x (最小)         | 低              |
|   2    | 中       | 2x               | 中              |
|   3    | 高       | 3x (デフォルト)    | 高              |
|   4+   | 非常に高  | 4x+              | 非常に高         |
+--------+----------+------------------+-----------------+

推奨:
- 開発環境: 1
- 本番環境 (一般): 3
- ホットデータ: 3-5
- コールドデータ: Erasure Coding (RS-6-3) で 1.5x

8.3 YARN スケジューラ設定の最適化

コンテナサイズの計算
=====================

前提: ノードスペック
  - RAM: 256GB
  - CPU: 64 コア
  - ディスク: 12 x 4TB HDD

1. OS + サービス用メモリの確保
   OS + HBase RegionServer 等: 32GB
   YARN 利用可能メモリ: 256 - 32 = 224GB

2. コンテナ数の決定
   ディスク数に基づく: min(2 * ディスク数, 利用可能vコア)
   = min(2 * 12, 62) = 24 コンテナ

3. コンテナメモリサイズ
   224GB / 24 = 約 9.3GB → 9216MB (9GB)

4. Map/Reduce タスクメモリ
   Map コンテナ: 9216MB
   Map Java ヒープ: 9216 * 0.8 = 7372MB
   
   Reduce コンテナ: 18432MB (Map の 2倍)
   Reduce Java ヒープ: 18432 * 0.8 = 14745MB

設定値:
  yarn.nodemanager.resource.memory-mb = 229376
  yarn.nodemanager.resource.cpu-vcores = 62
  mapreduce.map.memory.mb = 9216
  mapreduce.map.java.opts = -Xmx7372m
  mapreduce.reduce.memory.mb = 18432
  mapreduce.reduce.java.opts = -Xmx14745m

8.4 Speculative Execution (投機的実行)

投機的実行の仕組み
===================

通常実行:
  Task A: ████████████████████████████████ 100% (30秒)
  Task B: ████████████████░░░░░░░░░░░░░░░░  50% (既に60秒経過)
  Task C: ████████████████████████████████ 100% (25秒)
  
  → Task B がストラグラー (遅延タスク)

投機的実行:
  Task B:     ████████████████░░░░░░░░░░░░░░ 50% (元のタスク)
  Task B':    ████████████████████████████████ 100% (投機コピー, 35秒)
  
  → Task B' が先に完了 → Task B' の結果を採用, Task B をkill

設定:
  mapreduce.map.speculative = true       # Map の投機実行
  mapreduce.reduce.speculative = false   # Reduce の投機実行 (通常 false)
  
  yarn.app.mapreduce.am.job.speculator.class =
    org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator

注意:
  - 投機実行は追加リソースを消費する
  - 副作用のあるタスクでは注意が必要
  - クラスタの負荷が高い場合は無効化を検討

8.5 その他のチューニングポイント

# ============================================
# パフォーマンスチューニング チェックリスト
# ============================================

# 1. 圧縮の活用
#    Map 出力: Snappy (高速) or LZ4
#    最終出力: Gzip (高圧縮率) or Zstandard
#    中間データ量を 60-80% 削減可能

# 2. JVM 再利用
mapreduce.job.jvm.numtasks = -1  # JVM をタスク間で再利用 (-1 = 無制限)

# 3. Map 側の結合 (Map-side Join)
#    小さなテーブルを Distributed Cache に配置
#    Reduce フェーズを不要にする

# 4. 入力分割の最適化
mapreduce.input.fileinputformat.split.minsize = 268435456  # 256MB
mapreduce.input.fileinputformat.split.maxsize = 536870912  # 512MB

# 5. 出力のバッファリング
mapreduce.reduce.shuffle.input.buffer.percent = 0.70
mapreduce.reduce.shuffle.merge.percent = 0.66

# 6. Uber モード (小さなジョブの最適化)
mapreduce.job.ubertask.enable = true
mapreduce.job.ubertask.maxmaps = 9
mapreduce.job.ubertask.maxreduces = 1
mapreduce.job.ubertask.maxbytes = 536870912  # 512MB

9. メモリ管理

9.1 NameNode のメモリ管理

NameNode メモリ計算
=====================

メタデータのメモリ使用量:
  - 各ファイル/ディレクトリ: 約 150 バイト
  - 各ブロック: 約 150 バイト
  - 各レプリカ参照: 約 16 バイト

例: 1億ファイル、2億ブロック (レプリケーション 3)

  ファイル/ディレクトリ: 100,000,000 * 150B = 15 GB
  ブロック情報:          200,000,000 * 150B = 30 GB
  レプリカ参照:          600,000,000 * 16B  =  9.6 GB
  その他のオーバーヘッド:                    = 10 GB
  -----------------------------------------------
  合計推定:                                  ≈ 65 GB

NameNode JVM 設定:
  HADOOP_NAMENODE_OPTS="-Xms65g -Xmx65g -XX:+UseG1GC
    -XX:MaxGCPauseMillis=200
    -XX:InitiatingHeapOccupancyPercent=45
    -XX:G1HeapRegionSize=32m
    -XX:+ParallelRefProcEnabled
    -XX:+UnlockExperimentalVMOptions
    -XX:G1NewSizePercent=5
    -XX:G1MaxNewSizePercent=10"

hadoop-env.sh の設定例:

# NameNode JVM 設定
export HDFS_NAMENODE_OPTS="-Xms65g -Xmx65g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=200 \
  -XX:InitiatingHeapOccupancyPercent=45 \
  -XX:G1HeapRegionSize=32m \
  -verbose:gc \
  -XX:+PrintGCDetails \
  -XX:+PrintGCTimeStamps \
  -XX:+PrintGCDateStamps \
  -Xloggc:/var/log/hadoop-hdfs/namenode-gc.log \
  -XX:+UseGCLogFileRotation \
  -XX:NumberOfGCLogFiles=10 \
  -XX:GCLogFileSize=64M"

# DataNode JVM 設定
export HDFS_DATANODE_OPTS="-Xms4g -Xmx4g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=100"

# ResourceManager JVM 設定
export YARN_RESOURCEMANAGER_OPTS="-Xms8g -Xmx8g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=200"

# NodeManager JVM 設定
export YARN_NODEMANAGER_OPTS="-Xms4g -Xmx4g \
  -XX:+UseG1GC"

9.2 MapReduce タスクのメモリ管理

MapReduce メモリ構成
======================

+--------------------------------------------------+
|              YARN Container (4096 MB)             |
|                                                  |
|  +--------------------------------------------+  |
|  |         JVM Heap (-Xmx3276m)               |  |
|  |                                            |  |
|  |  +----------+  +----------+  +----------+  |  |
|  |  | Map/     |  | Sort     |  | Framework|  |  |
|  |  | Reduce   |  | Buffer   |  | Overhead |  |  |
|  |  | User Code|  | (100MB)  |  |          |  |  |
|  |  +----------+  +----------+  +----------+  |  |
|  +--------------------------------------------+  |
|                                                  |
|  +--------------------------------------------+  |
|  |    Non-Heap (820 MB)                       |  |
|  |    - Direct Buffers                        |  |
|  |    - Stack Memory                          |  |
|  |    - Metaspace                             |  |
|  |    - Native Libraries                      |  |
|  +--------------------------------------------+  |
+--------------------------------------------------+

メモリ計算:
  Container Memory = JVM Heap + Non-Heap Overhead
  4096 MB         = 3276 MB  + 820 MB

ルール:
  JVM Heap = Container Memory * 0.8
  Non-Heap = Container Memory * 0.2

9.3 YARN コンテナメモリ管理

YARN メモリ管理階層
=====================

クラスタレベル:
  yarn.scheduler.maximum-allocation-mb = 65536  (最大コンテナサイズ)
  yarn.scheduler.minimum-allocation-mb = 1024   (最小コンテナサイズ)

ノードレベル:
  yarn.nodemanager.resource.memory-mb = 122880  (ノード全体のYARNメモリ)
  yarn.nodemanager.vmem-pmem-ratio = 2.1        (仮想/物理メモリ比)
  yarn.nodemanager.vmem-check-enabled = false   (仮想メモリチェック)
  yarn.nodemanager.pmem-check-enabled = true    (物理メモリチェック)

コンテナレベル:
  mapreduce.map.memory.mb = 4096
  mapreduce.reduce.memory.mb = 8192
  
メモリ超過時の動作:
  1. 物理メモリ超過 → コンテナが kill される
  2. 仮想メモリ超過 → vmem-check-enabled が true の場合 kill
  3. GC による回復が間に合わない場合 → OOM エラー

10. モニタリング

10.1 JMX メトリクス

# NameNode JMX メトリクスの取得
curl -s http://namenode:9870/jmx

# 特定の MBean の取得
curl -s "http://namenode:9870/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem"

# 主要なメトリクス:
# FSNamesystem:
#   - CapacityTotal         : 総ストレージ容量
#   - CapacityUsed          : 使用済み容量
#   - CapacityRemaining     : 残り容量
#   - BlocksTotal           : 総ブロック数
#   - FilesTotal            : 総ファイル数
#   - MissingBlocks         : 欠損ブロック数
#   - UnderReplicatedBlocks : レプリケーション不足ブロック数
#   - CorruptBlocks         : 破損ブロック数
#   - NumLiveDataNodes      : 稼働中 DataNode 数
#   - NumDeadDataNodes      : 死亡 DataNode 数

# DataNode メトリクス
curl -s http://datanode:9864/jmx

# ResourceManager メトリクス
curl -s http://resourcemanager:8088/jmx

10.2 Web UI

Hadoop Web UI 一覧
===================

+------------------+-------+----------------------------------+
| サービス          | ポート | URL                              |
+------------------+-------+----------------------------------+
| NameNode         | 9870  | http://namenode:9870             |
| DataNode         | 9864  | http://datanode:9864             |
| ResourceManager  | 8088  | http://rm:8088                   |
| NodeManager      | 8042  | http://nm:8042                   |
| JobHistory       | 19888 | http://historyserver:19888       |
| Timeline Service | 8188  | http://timeline:8188             |
| JournalNode      | 8480  | http://jn:8480                   |
+------------------+-------+----------------------------------+

注: Hadoop 3.x ではポート番号が変更された
     (すべて非特権ポート > 1024 に変更)
     
     Hadoop 2.x → 3.x のポート変更:
     NameNode:       50070 → 9870
     DataNode:       50075 → 9864
     DataNode IPC:   50020 → 9867
     Secondary NN:   50090 → 9868

10.3 メトリクス収集の構成

hadoop-metrics2.properties の例:

# ============================================
# Hadoop Metrics2 設定
# ============================================

# -- NameNode メトリクス --
namenode.sink.graphite.class=org.apache.hadoop.metrics2.sink.GraphiteSink
namenode.sink.graphite.server_host=graphite.example.com
namenode.sink.graphite.server_port=2003
namenode.sink.graphite.metrics_prefix=hadoop.namenode

# -- DataNode メトリクス --
datanode.sink.graphite.class=org.apache.hadoop.metrics2.sink.GraphiteSink
datanode.sink.graphite.server_host=graphite.example.com
datanode.sink.graphite.server_port=2003
datanode.sink.graphite.metrics_prefix=hadoop.datanode

# -- ResourceManager メトリクス --
resourcemanager.sink.graphite.class=org.apache.hadoop.metrics2.sink.GraphiteSink
resourcemanager.sink.graphite.server_host=graphite.example.com
resourcemanager.sink.graphite.server_port=2003
resourcemanager.sink.graphite.metrics_prefix=hadoop.resourcemanager

# ファイル出力 (デバッグ用)
# *.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
# namenode.sink.file.filename=/tmp/namenode-metrics.out

# JMX 出力
*.source.filter.class=org.apache.hadoop.metrics2.filter.GlobFilter
*.sink.ganglia.tagsForPrefix.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31

10.4 監視すべき重要メトリクス

クリティカルメトリクス チェックリスト
======================================

【HDFS - 即時対応が必要】
  □ MissingBlocks > 0        → データ損失の可能性
  □ CorruptBlocks > 0        → データ破損
  □ NumDeadDataNodes > 0     → ノード障害
  □ CapacityRemaining < 20%  → ディスク容量不足
  □ SafeMode = ON            → クラスタが読み取り専用

【HDFS - 要監視】
  □ UnderReplicatedBlocks    → レプリケーション不足
  □ PendingReplicationBlocks → レプリケーション待ち
  □ VolumeFailures           → ディスク障害
  □ StaleDataNodes           → 応答遅延ノード

【YARN - 即時対応が必要】
  □ UnhealthyNodes > 0       → 異常ノード
  □ LostNodes > 0            → ノード喪失
  □ AvailableMB < 10%        → メモリ不足
  □ AvailableVCores < 10%    → CPU 不足

【YARN - 要監視】
  □ PendingContainers        → 待機コンテナ数
  □ AppsRunning              → 実行中アプリ数
  □ AppsPending              → 待機中アプリ数
  □ QueueCapacity            → キュー使用率

【MapReduce - 要監視】
  □ FailedMaps               → 失敗 Map タスク数
  □ FailedReduces            → 失敗 Reduce タスク数
  □ KilledMaps               → kill された Map 数
  □ GCTimeMillis             → GC 時間

10.5 Prometheus / Grafana 連携

# Prometheus - hadoop_jmx_exporter 設定例
# jmx_prometheus_javaagent を使用

# namenode-jmx-config.yaml
---
startDelaySeconds: 0
hostPort: namenode:9870
ssl: false
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  - pattern: "Hadoop<service=NameNode, name=FSNamesystem><>(\\w+)"
    name: "hadoop_namenode_fsnamesystem_$1"
    type: GAUGE
  - pattern: "Hadoop<service=NameNode, name=FSNamesystemState><>(\\w+)"
    name: "hadoop_namenode_fsnamesystem_state_$1"
    type: GAUGE
  - pattern: "Hadoop<service=NameNode, name=NameNodeActivity><>(\\w+)"
    name: "hadoop_namenode_activity_$1"
    type: GAUGE

11. エコシステム

11.1 Hadoop エコシステム全体図

                    Hadoop エコシステム
                    ==================

+-------------------------------------------------------------------+
|                        ワークフロー / 管理                          |
|  +--------+  +--------+  +----------+  +-----------+              |
|  | Oozie  |  | Airflow|  |Ambari/   |  |ZooKeeper  |              |
|  |(ワーク  |  |(ワーク  |  |Cloudera  |  |(分散協調  |              |
|  | フロー) |  | フロー) |  |Manager   |  | サービス) |              |
|  +--------+  +--------+  +----------+  +-----------+              |
+-------------------------------------------------------------------+
|                        データアクセス                               |
|  +------+ +------+ +--------+ +------+ +-------+ +--------+      |
|  | Hive | | Pig  | | Spark  | | Tez  | |Impala | |Presto/ |      |
|  | (SQL)| |(Scrpt)| |(Unified| |(DAG) | |(MPP  | |Trino   |      |
|  |      | |      | | Engine)| |      | | Query)| |(SQL)   |      |
|  +------+ +------+ +--------+ +------+ +-------+ +--------+      |
+-------------------------------------------------------------------+
|                     データ取込 / メッセージング                      |
|  +-------+ +--------+ +--------+ +--------+ +---------+          |
|  | Sqoop | | Flume  | | Kafka  | | NiFi   | |Kafka    |          |
|  |(RDBMS | |(ログ   | |(Stream | |(Data   | |Connect  |          |
|  | 連携) | | 収集)  | | Plat.) | | Flow)  | |         |          |
|  +-------+ +--------+ +--------+ +--------+ +---------+          |
+-------------------------------------------------------------------+
|                        NoSQL / データベース                         |
|  +--------+ +----------+ +----------+ +----------+               |
|  | HBase  | | Cassandra| | Phoenix  | | Kudu     |               |
|  |(Column | |(Column   | |(SQL on   | |(Columnar |               |
|  | Store) | | Store)   | | HBase)   | | Storage) |               |
|  +--------+ +----------+ +----------+ +----------+               |
+-------------------------------------------------------------------+
|                        リソース管理                                 |
|  +----------------------------------------------------+          |
|  |                    YARN                             |          |
|  +----------------------------------------------------+          |
+-------------------------------------------------------------------+
|                        ストレージ                                   |
|  +----------------------------------------------------+          |
|  |                    HDFS                             |          |
|  +----------------------------------------------------+          |
+-------------------------------------------------------------------+

11.2 主要コンポーネント

Apache Hive

-- Hive: SQL ライクなインターフェースで HDFS データを操作

-- テーブル作成
CREATE TABLE IF NOT EXISTS sales (
    transaction_id STRING,
    customer_id    INT,
    product_name   STRING,
    amount         DECIMAL(10,2),
    transaction_dt DATE
)
PARTITIONED BY (year INT, month INT)
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');

-- データ投入
INSERT INTO sales PARTITION (year=2026, month=4)
SELECT transaction_id, customer_id, product_name, amount, transaction_dt
FROM staging_sales
WHERE year(transaction_dt) = 2026 AND month(transaction_dt) = 4;

-- 集計クエリ
SELECT 
    product_name,
    COUNT(*) as transaction_count,
    SUM(amount) as total_amount,
    AVG(amount) as avg_amount
FROM sales
WHERE year = 2026
GROUP BY product_name
ORDER BY total_amount DESC
LIMIT 10;

Apache Spark

# Spark: 統合的なビッグデータ処理エンジン

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, avg, count

# SparkSession の作成
spark = SparkSession.builder \
    .appName("SalesAnalysis") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# データの読み込み
df = spark.read.parquet("hdfs:///data/sales/")

# データ処理
result = df.filter(col("year") == 2026) \
    .groupBy("product_name") \
    .agg(
        count("*").alias("transaction_count"),
        spark_sum("amount").alias("total_amount"),
        avg("amount").alias("avg_amount")
    ) \
    .orderBy(col("total_amount").desc())

# 結果の保存
result.write \
    .mode("overwrite") \
    .parquet("hdfs:///data/results/sales_summary/")

spark.stop()

Apache HBase

// HBase: HDFS 上のカラム指向 NoSQL データベース

// テーブル作成
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();

TableDescriptorBuilder tableDescriptor = 
    TableDescriptorBuilder.newBuilder(TableName.valueOf("users"));
tableDescriptor.setColumnFamily(
    ColumnFamilyDescriptorBuilder.newBuilder("info".getBytes()).build());
tableDescriptor.setColumnFamily(
    ColumnFamilyDescriptorBuilder.newBuilder("activity".getBytes()).build());
admin.createTable(tableDescriptor.build());

// データの書き込み
Table table = connection.getTable(TableName.valueOf("users"));
Put put = new Put("user001".getBytes());
put.addColumn("info".getBytes(), "name".getBytes(), "Alice".getBytes());
put.addColumn("info".getBytes(), "email".getBytes(), "alice@example.com".getBytes());
put.addColumn("activity".getBytes(), "last_login".getBytes(), "2026-04-11".getBytes());
table.put(put);

// データの読み取り
Get get = new Get("user001".getBytes());
Result result = table.get(get);
String name = Bytes.toString(result.getValue("info".getBytes(), "name".getBytes()));

Apache Kafka

Kafka + Hadoop 連携パターン
=============================

  +----------+     +---------+     +----------+
  | Producer |---->|  Kafka  |---->| Consumer |
  | (App/    |     | Cluster |     | (Spark   |
  |  Sensor) |     |         |     | Streaming|
  +----------+     +---------+     | /Flink)  |
                        |          +----+-----+
                        |               |
                   +----v-----+    +----v-----+
                   | Kafka    |    |   HDFS   |
                   | Connect  |    | (結果保存) |
                   | (HDFS    |    +----------+
                   |  Sink)   |
                   +----+-----+
                        |
                   +----v-----+
                   |   HDFS   |
                   | (生データ |
                   |  アーカイブ)|
                   +----------+

Apache ZooKeeper

ZooKeeper の役割
==================

+-----------------------------------------------------------+
|                    ZooKeeper Ensemble                     |
|  (3 or 5 ノード構成 - 奇数が必要)                          |
|                                                          |
|  +--------+     +--------+     +--------+                |
|  | ZK #1  |<--->| ZK #2  |<--->| ZK #3  |                |
|  |(Leader) |     |(Follower)    |(Follower)|                |
|  +--------+     +--------+     +--------+                |
+-----------------------------------------------------------+
         |             |              |
    +----+----+   +----+----+   +----+----+
    |HDFS HA  |   |YARN HA  |   |HBase   |
    |NameNode |   |Resource |   |Master  |
    |Failover |   |Manager  |   |Election|
    +---------+   |Failover |   +---------+
                  +---------+

Hadoop での ZooKeeper の用途:
  1. HDFS NameNode の自動フェイルオーバー
  2. YARN ResourceManager の自動フェイルオーバー
  3. HBase RegionServer の管理
  4. Kafka のブローカー管理 (旧バージョン)
  5. 分散ロック、リーダー選出

Apache Sqoop / Apache Oozie

# Sqoop: RDBMS ↔ HDFS のデータ転送

# RDBMSからHDFSへインポート
sqoop import \
  --connect jdbc:mysql://db.example.com:3306/mydb \
  --username dbuser \
  --password-file hdfs:///user/hadoop/sqoop.password \
  --table customers \
  --target-dir /data/customers/ \
  --split-by customer_id \
  --num-mappers 4 \
  --as-parquetfile \
  --compress \
  --compression-codec snappy \
  --incremental lastmodified \
  --check-column updated_at \
  --last-value "2026-04-10 00:00:00"

# HDFSからRDBMSへエクスポート
sqoop export \
  --connect jdbc:mysql://db.example.com:3306/mydb \
  --table results \
  --export-dir /data/results/ \
  --input-fields-terminated-by '\t' \
  --num-mappers 4
<!-- Oozie: ワークフロースケジューラ -->
<!-- workflow.xml -->
<workflow-app name="etl-pipeline" xmlns="uri:oozie:workflow:0.5">
    
    <start to="ingest-data"/>
    
    <action name="ingest-data">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <command>import --connect jdbc:mysql://db:3306/mydb --table sales --target-dir /data/raw/sales</command>
        </sqoop>
        <ok to="transform-data"/>
        <error to="fail"/>
    </action>
    
    <action name="transform-data">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>ETL Transform</name>
            <class>com.example.EtlJob</class>
            <jar>/user/hadoop/apps/etl.jar</jar>
            <spark-opts>--executor-memory 4g --num-executors 10</spark-opts>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    
    <kill name="fail">
        <message>ETL Pipeline failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>
    
    <end name="end"/>
</workflow-app>

12. Hadoop 3.x の新機能

12.1 主要な変更点一覧

Hadoop 3.x 新機能サマリー
===========================

+-------------------------------+-------------------------------------------+
| 機能                          | 説明                                       |
+-------------------------------+-------------------------------------------+
| Erasure Coding                | レプリケーションの代替、ストレージ50%削減      |
| 2+ NameNode HA               | Standby NameNode を複数台構成可能            |
| YARN Timeline Service v2     | スケーラブルなタイムラインサービス              |
| GPU/FPGA スケジューリング       | GPU リソースの管理と割当                      |
| Docker コンテナサポート         | YARN 上での Docker コンテナ実行               |
| S3A コネクタ改善               | クラウドストレージ連携の強化                   |
| Intra-DataNode バランサー      | DataNode 内のディスク間バランシング            |
| ポート番号変更                  | 非特権ポート (>1024) に変更                  |
| Java 8+                      | Java 8 以降が必須 (Java 11+ 推奨)           |
| シェルスクリプトの書き換え       | bash ベースから POSIX 準拠へ                  |
| オーバーコミット                | YARN のオポチュニスティックコンテナ             |
+-------------------------------+-------------------------------------------+

12.2 YARN Timeline Service v2

Timeline Service v1 vs v2
============================

v1 の課題:
- スケーラビリティの制限 (単一サーバー)
- 書き込みボトルネック
- 大規模クラスタでのパフォーマンス問題

v2 のアーキテクチャ:

+---------------------------------------------------+
|          YARN Timeline Service v2                 |
|                                                   |
|  +--------+  +--------+  +--------+              |
|  |Collector|  |Collector|  |Collector|  (分散)     |
|  | (NM内) |  | (NM内) |  | (NM内) |              |
|  +----+---+  +----+---+  +----+---+              |
|       |           |           |                   |
|       +-----+-----+-----+----+                   |
|             |                                     |
|       +-----v------+                              |
|       |   HBase    |  (バックエンドストレージ)       |
|       | (Timeline  |                              |
|       |  Store)    |                              |
|       +-----+------+                              |
|             |                                     |
|       +-----v------+                              |
|       |   Reader   |  (クエリサービス)              |
|       |  Service   |                              |
|       +------------+                              |
+---------------------------------------------------+

改善点:
- 分散コレクターによるスケーラビリティ向上
- HBase バックエンドによる高スループット書き込み
- フロー (アプリケーション群) レベルの集約
- サブアプリケーションレベルのメトリクス

12.3 GPU スケジューリング

<!-- resource-types.xml -->
<configuration>
  <property>
    <name>yarn.resource-types</name>
    <value>yarn.io/gpu</value>
  </property>
</configuration>

<!-- yarn-site.xml -->
<property>
  <name>yarn.nodemanager.resource-plugins</name>
  <value>yarn.io/gpu</value>
</property>

<property>
  <name>yarn.nodemanager.resource-plugins.gpu.allowed-gpu-devices</name>
  <value>auto</value>
  <!-- auto = 自動検出, または明示的に 0,1,2,3 と指定 -->
</property>

<property>
  <name>yarn.nodemanager.resource-plugins.gpu.path-to-discovery-executables</name>
  <value>/usr/bin</value>
</property>
# GPU を使用するアプリケーションの実行
yarn jar my-ml-app.jar \
  -Dyarn.app.mapreduce.am.resource.gpu=1 \
  -Dmapreduce.map.resource.gpu=1 \
  input output

# Spark での GPU 利用
spark-submit \
  --master yarn \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.executor.resource.gpu.discoveryScript=/opt/spark/getGpusResources.sh \
  --conf spark.task.resource.gpu.amount=0.5 \
  my-gpu-app.py

12.4 S3A コネクタの改善

<!-- core-site.xml - S3A 設定 (Hadoop 3.x 改善版) -->

<!-- 認証 -->
<property>
  <name>fs.s3a.aws.credentials.provider</name>
  <value>
    com.amazonaws.auth.InstanceProfileCredentialsProvider,
    com.amazonaws.auth.EnvironmentVariableCredentialsProvider
  </value>
</property>

<!-- パフォーマンス設定 -->
<property>
  <name>fs.s3a.fast.upload</name>
  <value>true</value>
</property>

<property>
  <name>fs.s3a.fast.upload.buffer</name>
  <value>disk</value>
  <!-- array = バイト配列 (メモリ), disk = ローカルディスク, bytebuffer = Direct ByteBuffer -->
</property>

<property>
  <name>fs.s3a.multipart.size</name>
  <value>67108864</value>
  <!-- 64MB マルチパートアップロードサイズ -->
</property>

<property>
  <name>fs.s3a.connection.maximum</name>
  <value>200</value>
</property>

<!-- S3Guard (メタデータ一貫性) - Hadoop 3.x -->
<property>
  <name>fs.s3a.metadatastore.impl</name>
  <value>org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore</value>
</property>

<!-- S3A Committers (効率的な出力コミット) -->
<property>
  <name>fs.s3a.committer.name</name>
  <value>magic</value>
  <!-- directory, partitioned, magic から選択 -->
</property>

13. モダンな代替技術との比較

13.1 Hadoop MapReduce vs Apache Spark

比較表: MapReduce vs Spark
============================

+------------------+------------------------+------------------------+
| 観点             | Hadoop MapReduce       | Apache Spark           |
+------------------+------------------------+------------------------+
| 処理モデル        | バッチ (ディスクベース)   | バッチ + ストリーミング   |
|                  |                        | (メモリベース)           |
+------------------+------------------------+------------------------+
| 速度             | 遅い (ディスク I/O)      | 10-100x 高速            |
|                  |                        | (インメモリ処理)         |
+------------------+------------------------+------------------------+
| プログラミング    | Java (主要)             | Scala, Java, Python,   |
| 言語             | Python/Ruby (Streaming) | R, SQL                 |
+------------------+------------------------+------------------------+
| 障害耐性          | 高い (ディスク永続化)     | 高い (RDD リネージ)      |
+------------------+------------------------+------------------------+
| メモリ使用量      | 少ない                  | 多い                    |
+------------------+------------------------+------------------------+
| リアルタイム処理   | 不向き                  | Spark Streaming /       |
|                  |                        | Structured Streaming   |
+------------------+------------------------+------------------------+
| 機械学習          | Mahout (非推奨)         | MLlib (充実)            |
+------------------+------------------------+------------------------+
| グラフ処理        | 限定的                  | GraphX                 |
+------------------+------------------------+------------------------+
| コスト           | 低い (ディスクベース)    | 高い (大量メモリ必要)    |
+------------------+------------------------+------------------------+
| 適用領域          | 大規模バッチ ETL        | 汎用 (バッチ、SQL、      |
|                  | (コスト重視)            | ストリーム、ML)          |
+------------------+------------------------+------------------------+

推奨選択基準:
  MapReduce を選ぶ場合:
    - 非常に大規模なデータの単純なバッチ処理
    - メモリリソースが限られている環境
    - コスト最適化が最重要
  
  Spark を選ぶ場合:
    - 反復的な処理 (機械学習など)
    - インタラクティブな分析
    - ストリーミング処理が必要
    - 複雑なデータパイプライン

13.2 HDFS vs クラウドストレージ

比較表: HDFS vs クラウドストレージ
===================================

+------------------+------------------------+------------------------+
| 観点             | HDFS                   | Cloud Storage          |
|                  |                        | (S3/GCS/ADLS)         |
+------------------+------------------------+------------------------+
| 運用コスト        | 高い (ハードウェア+運用) | 低い (従量課金)         |
+------------------+------------------------+------------------------+
| スケーラビリティ   | 制限あり (ノード追加)    | 事実上無制限            |
+------------------+------------------------+------------------------+
| データローカリティ | 高い (計算とストレージ   | 低い (ネットワーク       |
|                  | が同一ノード)           | 経由でアクセス)          |
+------------------+------------------------+------------------------+
| スループット      | 非常に高い              | ネットワーク帯域に依存    |
+------------------+------------------------+------------------------+
| レイテンシー      | 低い                    | 高い                   |
+------------------+------------------------+------------------------+
| 可用性           | セルフ管理              | SLA保証 (99.99%+)      |
+------------------+------------------------+------------------------+
| Compute/Storage  | 結合 (coupled)          | 分離 (decoupled)       |
| の分離           |                        |                        |
+------------------+------------------------+------------------------+
| ストレージ       | メタデータ管理が必要     | マネージドサービス        |
| 管理             | (NameNode)             |                        |
+------------------+------------------------+------------------------+

現代のトレンド:
  - コンピューティングとストレージの分離が主流
  - HDFS は高スループットが必要なワークロードに使用
  - クラウドストレージは汎用的なデータレイクに使用
  - ハイブリッド構成 (ホットデータ=HDFS, コールドデータ=S3) も一般的

14. ユースケースとベストプラクティス

14.1 代表的なユースケース

Hadoop の主要ユースケース
=========================

1. ETL / データウェアハウス
   - 大規模なデータ変換パイプライン
   - データレイクからデータウェアハウスへの投入
   - 日次/時次のバッチ処理

2. ログ分析
   - Web サーバーログの集約と分析
   - アプリケーションログの異常検知
   - セキュリティログの監査

3. 推薦システム
   - ユーザー行動データの分析
   - 協調フィルタリングの計算
   - A/B テストデータの処理

4. 機械学習 (特徴量エンジニアリング)
   - 大規模データからの特徴量生成
   - モデルトレーニングデータの前処理
   - バッチ推論

5. データアーカイブ
   - コールドデータの長期保存
   - コンプライアンス要件への対応
   - Erasure Coding によるコスト効率的な保存

6. IoT データ処理
   - センサーデータの集約
   - 時系列データの分析
   - デバイスログの保存と分析

14.2 ベストプラクティス

運用ベストプラクティス
======================

【クラスタ設計】
  1. NameNode は専用ハードウェアに配置 (十分なメモリ)
  2. JournalNode は 3 台以上 (奇数) で構成
  3. ZooKeeper は 3 台以上 (奇数) で構成
  4. DataNode/NodeManager は同一マシンにコロケーション
  5. マスターノードとワーカーノードは分離

【データ管理】
  1. 小さなファイル問題を回避 (CombineFileInputFormat、HAR、Hive の小ファイルマージ)
  2. 適切な圧縮コーデックを選択
     - 中間データ: Snappy/LZ4 (高速)
     - 最終保存: Gzip/Zstandard (高圧縮)
  3. 適切なファイルフォーマットを使用
     - カラムナー: ORC, Parquet
     - 行指向: Avro, SequenceFile
  4. パーティショニング戦略を活用 (日付/地域等)
  5. HDFS のゴミ箱を有効化

【セキュリティ】
  1. Kerberos 認証を有効化
  2. HDFS ACL で最小権限の原則を適用
  3. Ranger/Sentry でポリシーベースのアクセス制御
  4. 暗号化ゾーンで機密データを保護
  5. ネットワークの暗号化 (TLS/SSL)
  6. 定期的な監査ログのレビュー

【監視・運用】
  1. 全コンポーネントのメトリクスを収集
  2. MissingBlocks, CorruptBlocks のアラート設定
  3. NameNode ヒープ使用率の監視
  4. HDFS 容量の使用率監視 (80% でアラート)
  5. 定期的な fsck の実行
  6. バランサーの定期実行

14.3 小さなファイル問題への対処

小さなファイル問題
==================

問題:
  - HDFS の各ファイル/ブロックは NameNode で約 150 バイトのメモリを消費
  - 100万個の 1KB ファイル → 1GB のストレージ + 150MB のメモリ
  - 100万個の 128MB ファイル → 128TB のストレージ + 150MB のメモリ
  - ファイル数が増えると NameNode のメモリが枯渇

対策:

  1. CombineFileInputFormat
     - 複数の小さなファイルを 1 つの Map タスクで処理
     
  2. Hadoop Archive (HAR)
     hdfs archive -archiveName data.har -p /data/small-files/ /data/archives/
     
  3. SequenceFile
     - 小さなファイルを key-value ペアとしてまとめて保存
     
  4. Hive の小ファイルマージ
     SET hive.merge.mapfiles = true;
     SET hive.merge.mapredfiles = true;
     SET hive.merge.size.per.task = 256000000;
     SET hive.merge.smallfiles.avgsize = 16000000;
     
  5. コンパクション
     - 定期的に小さなファイルを大きなファイルにマージするジョブを実行

15. まとめ

15.1 Apache Hadoop の現在地

Apache Hadoop は 2006 年の誕生以来、ビッグデータ処理の基盤技術として 業界に大きな影響を与えてきた。Google の GFS と MapReduce 論文から着想を得た このプロジェクトは、オープンソースのエコシステムとして大きく発展し、 現代のデータインフラストラクチャの礎となっている。

15.2 主要な技術的貢献

Hadoop が業界にもたらした主要な貢献
=====================================

1. 分散ストレージの民主化
   - コモディティハードウェアでのペタバイト規模のストレージ
   - 高可用性と障害耐性の標準化

2. 分散処理の簡素化
   - MapReduce による並列処理の抽象化
   - YARN による汎用リソース管理

3. エコシステムの形成
   - SQL (Hive), NoSQL (HBase), Stream (Kafka) の統合基盤
   - Spark, Flink 等の次世代フレームワークの基盤

4. データレイク概念の確立
   - スキーマオンリード
   - 構造化/半構造化/非構造化データの統合保存

15.3 今後の方向性

Hadoop の進化と今後のトレンド
==============================

現在のトレンド:
  1. コンピューティングとストレージの分離
     - HDFS → クラウドオブジェクトストレージ (S3, GCS, ADLS)
     - オンデマンドの計算リソース
  
  2. コンテナネイティブ化
     - YARN 上の Docker コンテナ
     - Kubernetes 上での Spark/Flink 実行
  
  3. リアルタイム処理への移行
     - バッチ処理からストリーミング処理へ
     - Lambda/Kappa アーキテクチャ
  
  4. データレイクハウス
     - Delta Lake, Apache Iceberg, Apache Hudi
     - ACID トランザクション + スキーマ進化

Hadoop が引き続き重要な領域:
  - 大規模バッチ ETL
  - コスト効率的なデータアーカイブ (Erasure Coding)
  - オンプレミスのデータレイク
  - 規制要件によるオンプレミス必須環境
  - 既存の大規模 Hadoop クラスタの運用

15.4 学習リソース

推奨学習リソース
=================

公式ドキュメント:
  - Apache Hadoop: https://hadoop.apache.org/docs/current/
  - HDFS Architecture: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
  - YARN Architecture: https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

書籍:
  - "Hadoop: The Definitive Guide" (Tom White, O'Reilly)
  - "Hadoop Operations" (Eric Sammer, O'Reilly)
  - "Programming Hive" (Edward Capriolo et al., O'Reilly)

認定資格:
  - Cloudera Certified Administrator (CCA)
  - Hortonworks Certified Administrator (HCA)
  - Cloudera Certified Developer (CCD)

本記事は AI によって生成された技術資料です。 内容の正確性については、公式ドキュメントと照合して確認することを推奨します。 作成日: 2026-04-11