Apache Hudi
Apache Hudi 包括的技術ガイド
Apache Hudi (Hadoop Upserts Deletes and Incrementals) は、大規模データレイク上でトランザクショナルなデータ管理を実現するオープンソースのデータレイクハウスプラットフォームである。本稿では、Hudi のアーキテクチャ、テーブルタイプ、書き込み操作、クエリタイプ、インデックス、コンパクション、クラスタリング、スキーマ進化、ストリーミング取り込み、エコシステム統合、パフォーマンスチューニング、そして最新機能に至るまで、包括的に解説する。
目次
- はじめに
- 歴史と背景
- アーキテクチャ概要
- テーブルタイプ
- タイムラインとメタデータ管理
- インデックスタイプ
- 書き込み操作
- クエリタイプ
- 同時実行制御
- コンパクション
- クラスタリング
- クリーニングとアーカイブ
- スキーマ進化
- Hudi Streamer (DeltaStreamer)
- エコシステム統合
- 設定例
- テーブルサービス
- メタデータテーブル
- マルチモーダルインデックス
- パフォーマンスチューニング
- モニタリング
- セキュリティ
- Iceberg・Delta Lake との比較
- ユースケースとベストプラクティス
- 最新機能と今後の方向性
- まとめ
1. はじめに
1.1 Apache Hudi とは何か
Apache Hudi(Hadoop Upserts Deletes and Incrementals)は、データレイク上にトランザクショナルなレイヤーを提供するオープンソースのデータ管理フレームワークである。従来のデータレイクが抱えていた「一度書き込んだら変更できない」という制約を克服し、RDBMS のような UPSERT(更新+挿入)、DELETE(削除)、そして増分処理(Incremental Processing)を大規模分散ストレージ上で実現する。
+------------------------------------------------------------------+
| データレイクハウス |
| |
| +--------------------+ +--------------------+ +--------------+ |
| | 分析エンジン | | ストリーミング | | BI ツール | |
| | (Spark, Trino, | | (Flink, Kafka) | | (Tableau等) | |
| | Presto, Hive) | | | | | |
| +--------+-----------+ +--------+-----------+ +------+-------+ |
| | | | |
| +--------v-----------------------v----------------------v------+ |
| | Apache Hudi レイヤー | |
| | +----------+ +----------+ +------------+ +---------------+ | |
| | | Timeline | | Metadata | | Index | | Table | | |
| | | 管理 | | Table | | (Bloom等) | | Services | | |
| | +----------+ +----------+ +------------+ +---------------+ | |
| +--------------------------------------------------------------+ |
| | |
| +---------------------------v----------------------------------+ |
| | ストレージレイヤー | |
| | (HDFS / S3 / GCS / Azure Blob) | |
| | Parquet / Avro / ORC ファイル形式 | |
| +--------------------------------------------------------------+ |
+------------------------------------------------------------------+
1.2 なぜ Hudi が必要なのか
従来のデータレイクアーキテクチャでは、以下のような課題が存在していた:
-
変更データの処理が困難: Parquet や ORC などの不変ファイル形式では、レコードの更新・削除が直接的にはサポートされない。テーブル全体の再作成が必要となり、膨大な計算コストが発生する。
-
増分処理の欠如: バッチ処理では毎回全データを再処理する必要があり、処理時間とコストが線形に増大する。
-
データ鮮度の問題: バッチ処理のレイテンシにより、分析データが数時間〜数日遅れることが一般的であった。
-
ACID トランザクションの不在: 同時読み書きにおけるデータ整合性の保証が困難であった。
-
コンプライアンス対応: GDPR や CCPA などの規制により、個人データの更新・削除要件が発生したが、データレイクでの対応が困難であった。
Hudi はこれらすべての課題に対する統合的なソリューションを提供する。
1.3 Hudi の基本概念
Hudi の名前に含まれる3つの主要概念を理解することが重要である:
- Upserts(アップサート): レコードが存在すれば更新、存在しなければ挿入する操作。データベースの MERGE 文に相当する。
- Deletes(デリート): 特定のレコードを論理的または物理的に削除する操作。GDPR の忘れられる権利への対応などに必須。
- Incrementals(インクリメンタル): 前回の処理以降に変更されたレコードのみを取得する増分クエリ。ETL パイプラインの効率化に不可欠。
従来のデータレイク:
バッチ1: [全データ書き込み] ──> パーティション全体を再作成
バッチ2: [全データ書き込み] ──> パーティション全体を再作成
バッチ3: [全データ書き込み] ──> パーティション全体を再作成
問題: 毎回全データを処理 → コスト大、レイテンシ大
Apache Hudi:
バッチ1: [全データ書き込み] ──> 初期ロード
バッチ2: [変更分のみ] ──> Upsert(高速)
バッチ3: [変更分のみ] ──> Upsert(高速)
利点: 変更分のみ処理 → コスト小、レイテンシ小
2. 歴史と背景
2.1 Uber における誕生
Apache Hudi は 2016 年に Uber 社内で誕生した。当時、Uber は急速に成長するビジネスを支えるために、毎日数ペタバイト規模のデータを処理する必要があった。
Uber のデータプラットフォームチームは以下の課題に直面していた:
- ライドデータの遅延更新: ライドの開始・進行・完了に伴い、同一レコードが複数回更新される
- GDPR コンプライアンス: ヨーロッパでのサービス展開に伴い、ユーザーデータの削除要件への対応
- リアルタイム分析: ドライバーの供給と乗客の需要のリアルタイムマッチング
- 膨大なデータ量: 1日あたり数百億件のイベント処理
これらの課題を解決するため、Uber のエンジニアリングチーム(Vinoth Chandar 氏をリード)が「Hoodie」(後の Hudi)プロジェクトを開始した。
2.2 年表
2016年 Uber 社内で Hoodie プロジェクト開始
2017年 Uber がオープンソースとして GitHub に公開
2018年 大規模な本番稼働実績を積み重ねる
2019年1月 Apache Software Foundation (ASF) のインキュベーターに提案
2019年5月 ASF インキュベーションプロジェクトとして正式承認
2020年1月 ASF トップレベルプロジェクト (TLP) に昇格
2020年 Hudi 0.6.0 リリース(Flink 統合の初期サポート)
2021年 Hudi 0.9.0 リリース(Spark 3.x サポート)
2021年 Hudi 0.10.0 リリース(Flink 1.13 サポート)
2022年 Hudi 0.11.0 リリース(マルチモーダルインデックス)
2022年 Hudi 0.12.0 リリース(メタデータテーブル改善)
2023年 Hudi 0.13.0 リリース(Record-level Index)
2023年 Hudi 0.14.0 リリース(パフォーマンス改善)
2024年 Hudi 1.0.0 リリース候補(大規模リアーキテクチャ)
2025年 Hudi 1.x 系の安定化と機能拡充
2.3 コミュニティの成長
Apache Hudi は ASF トップレベルプロジェクトとして、活発なコミュニティを持つ:
- コントリビュータ数: 800名以上
- GitHub スター: 5,000以上
- 導入企業: Uber, Amazon, Disney+, Robinhood, ByteDance, Alibaba, Tencent など
- クラウドサービスでの統合: Amazon EMR, Google Dataproc, Azure HDInsight, Databricks
2.4 名前の由来
当初は「Hoodie」というプロジェクト名であったが、ASF インキュベーション時に「Hudi」に変更された。「Hudi」は Hadoop Upserts Deletes and Incrementals の頭文字を取ったバクロニムである。この名前はプロジェクトの核心的機能を端的に表現している。
3. アーキテクチャ概要
3.1 全体アーキテクチャ
Apache Hudi のアーキテクチャは、以下の主要コンポーネントで構成される:
+===========================================================================+
| Apache Hudi アーキテクチャ |
+===========================================================================+
| |
| +-----------------------+ +------------------------+ |
| | Writer クライアント | | Reader クライアント | |
| | (Spark/Flink/Java) | | (Spark/Trino/Hive | |
| | | | /Presto/Flink) | |
| +----------+------------+ +-----------+------------+ |
| | | |
| v v |
| +----------+------------------------------+------------+ |
| | Hudi Table レイヤー | |
| | | |
| | +-------------+ +-------------+ +---------------+ | |
| | | Timeline | | File | | Index | | |
| | | (タイムライン)| | Groups | | (インデックス) | | |
| | | | | (ファイル | | | | |
| | | .hoodie/ | | グループ) | | Bloom/ | | |
| | | | | | | Bucket/ | | |
| | | instant | | Base files | | HBase/ | | |
| | | actions | | Log files | | Record-level | | |
| | +-------------+ +-------------+ +---------------+ | |
| | | |
| | +-------------+ +-------------+ +---------------+ | |
| | | Metadata | | Table | | Concurrency | | |
| | | Table | | Services | | Control | | |
| | | | | | | | | |
| | | files | | Compaction | | OCC / MVCC | | |
| | | column_ | | Cleaning | | Lock | | |
| | | stats | | Clustering | | Provider | | |
| | | bloom_ | | Indexing | | | | |
| | | filter | | | | | | |
| | +-------------+ +-------------+ +---------------+ | |
| +-------------------------------------------------------+ |
| | |
| +---------------------------v------------------------------------------+ |
| | ストレージ抽象化レイヤー | |
| | (HadoopFileSystem / S3 / GCS / ADLS) | |
| +----------------------------------------------------------------------+ |
+===========================================================================+
3.2 ストレージレイアウト
Hudi テーブルは、ストレージ上で以下のディレクトリ構造を持つ:
/data/hudi/my_table/
├── .hoodie/ # メタデータディレクトリ
│ ├── hoodie.properties # テーブルプロパティ
│ ├── 20240101080000000.commit # コミット完了メタデータ
│ ├── 20240101090000000.commit # コミット完了メタデータ
│ ├── 20240101100000000.inflight # 進行中のコミット
│ ├── 20240101100000000.requested # リクエスト済みアクション
│ ├── archived/ # アーカイブ済みタイムライン
│ │ └── .commits_.archive.1 # アーカイブファイル
│ └── .aux/ # 補助メタデータ
│ └── .compaction/ # コンパクション計画
│
├── 2024/01/01/ # パーティション (日付ベース)
│ ├── file_group_1.parquet # ベースファイル (COW)
│ ├── file_group_2.parquet # ベースファイル (COW)
│ ├── .file_group_3_base.parquet # ベースファイル (MOR)
│ ├── .file_group_3_log.1 # ログファイル (MOR)
│ └── .file_group_3_log.2 # ログファイル (MOR)
│
├── 2024/01/02/ # パーティション
│ ├── file_group_4.parquet
│ └── file_group_5.parquet
│
└── .hoodie_metadata/ # メタデータテーブル (Hudi 0.11+)
├── files/ # ファイルリスティング
├── column_stats/ # カラム統計
└── bloom_filters/ # ブルームフィルター
3.3 ファイルグループの概念
Hudi の最も重要な概念の一つが**ファイルグループ(File Group)**である。各ファイルグループは一意の FileGroupId を持ち、以下で構成される:
ファイルグループ構造:
+--------------------------------------------------+
| File Group (ID: fg-001) |
| |
| +----------------------------------------------+ |
| | File Slice (最新) | |
| | | |
| | Base File: fg-001_1-0-1_20240102.parquet | |
| | Log Files: | |
| | .fg-001_20240102.log.1 | |
| | .fg-001_20240102.log.2 | |
| +----------------------------------------------+ |
| |
| +----------------------------------------------+ |
| | File Slice (1つ前) | |
| | | |
| | Base File: fg-001_1-0-1_20240101.parquet | |
| | Log Files: | |
| | .fg-001_20240101.log.1 | |
| +----------------------------------------------+ |
+--------------------------------------------------+
- ベースファイル (Base File): Parquet 形式の列指向ファイル。圧縮された完全なデータを含む。
- ログファイル (Log File): Avro 形式の行指向ファイル。差分変更データ(デルタ)を含む。MOR テーブルでのみ使用。
- ファイルスライス (File Slice): 特定の時点におけるベースファイルとそれに関連するログファイルのセット。
3.4 レコードキーとパーティション
Hudi テーブルの各レコードは以下の3つの要素で一意に識別される:
レコードの一意識別子:
+------------------+-------------------+-----------------+
| Record Key | Partition Path | File Group ID |
| (レコードキー) | (パーティション) | (ファイルGrp) |
+------------------+-------------------+-----------------+
| order_id=12345 | 2024/01/15 | fg-001 |
+------------------+-------------------+-----------------+
Record Key: テーブル内でレコードを一意に識別するキー
Partition Path: レコードが属するパーティション
File Group ID: レコードが格納されるファイルグループ
-- Hudi テーブル作成時のキー指定例
CREATE TABLE orders (
order_id STRING,
customer_id STRING,
amount DECIMAL(10,2),
order_date DATE,
status STRING,
updated_at TIMESTAMP
) USING hudi
TBLPROPERTIES (
'primaryKey' = 'order_id', -- レコードキー
'preCombineField' = 'updated_at', -- 重複排除フィールド
'hoodie.table.name' = 'orders'
)
PARTITIONED BY (order_date); -- パーティションキー
3.5 preCombineField の役割
preCombineField は、同一レコードキーのレコードが複数存在する場合に、どちらを採用するかを決定するフィールドである。通常はタイムスタンプや更新日時フィールドを指定する。
preCombineField の動作:
レコード1: {order_id: "123", status: "pending", updated_at: "2024-01-01 10:00"}
レコード2: {order_id: "123", status: "shipped", updated_at: "2024-01-01 14:00"}
↓ preCombineField = "updated_at" により比較
結果: レコード2 が採用される (updated_at が新しい)
4. テーブルタイプ
Apache Hudi は2種類のテーブルタイプを提供する。用途に応じた適切な選択が、システムの性能を大きく左右する。
4.1 Copy-on-Write (COW) テーブル
Copy-on-Write テーブルでは、書き込み時にベースファイル全体を新しいバージョンとして再作成する。
COW テーブルの書き込み動作:
[初期状態]
fg-001.parquet (v1): {id:1, name:"Alice"}, {id:2, name:"Bob"}, {id:3, name:"Carol"}
[Upsert: id:2 → name:"Bobby"]
fg-001.parquet (v2): {id:1, name:"Alice"}, {id:2, name:"Bobby"}, {id:3, name:"Carol"}
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ファイル全体を新しいバージョンとして書き出す(古いファイルは後でクリーニング)
タイムライン:
──────────────────────────────────────────>
commit_1 (v1作成) commit_2 (v2作成)
COW テーブルの特徴:
| 項目 | 詳細 |
|---|---|
| 書き込みコスト | 高い - ファイル全体の再作成 |
| 読み取りコスト | 低い - Parquet を直接読むだけ |
| ストレージ形式 | Parquet のみ(ログファイルなし) |
| 読み取りレイテンシ | 低い - マージ処理不要 |
| 書き込みレイテンシ | 高い - ファイル再作成のオーバーヘッド |
| 適用場面 | 読み取り頻度 >> 書き込み頻度 |
# COW テーブルの Spark 作成例
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("HudiCOWExample") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.getOrCreate()
# COW テーブルへの書き込み
hudi_options = {
'hoodie.table.name': 'orders_cow',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.upsert.shuffle.parallelism': '200',
}
df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save("/data/hudi/orders_cow")
4.2 Merge-on-Read (MOR) テーブル
Merge-on-Read テーブルでは、変更データをログファイルに追記し、読み取り時にベースファイルとログファイルをマージする。
MOR テーブルの書き込み動作:
[初期状態]
fg-001_base.parquet (v1): {id:1, name:"Alice"}, {id:2, name:"Bob"}, {id:3, name:"Carol"}
[Upsert: id:2 → name:"Bobby"]
fg-001_base.parquet (v1): {id:1, name:"Alice"}, {id:2, name:"Bob"}, {id:3, name:"Carol"}
fg-001.log.1: {id:2, name:"Bobby"} ← ログファイルに追記のみ!
[さらに Upsert: id:1 → name:"Alicia"]
fg-001_base.parquet (v1): {id:1, name:"Alice"}, {id:2, name:"Bob"}, {id:3, name:"Carol"}
fg-001.log.1: {id:2, name:"Bobby"}
fg-001.log.2: {id:1, name:"Alicia"} ← さらにログ追記
[コンパクション実行]
fg-001_base.parquet (v2): {id:1, name:"Alicia"}, {id:2, name:"Bobby"}, {id:3, name:"Carol"}
↑ ベースファイルとログファイルがマージされ、新しいベースファイルが生成
MOR テーブルの特徴:
| 項目 | 詳細 |
|---|---|
| 書き込みコスト | 低い - ログファイルへの追記のみ |
| 読み取りコスト | 高い - ベース + ログのマージが必要 |
| ストレージ形式 | Parquet (ベース) + Avro (ログ) |
| 読み取りレイテンシ | クエリタイプに依存 |
| 書き込みレイテンシ | 低い - ログ追記は高速 |
| 適用場面 | 書き込み頻度 >> 読み取り頻度、ニアリアルタイム |
# MOR テーブルの Spark 作成例
hudi_options = {
'hoodie.table.name': 'orders_mor',
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.compact.inline': 'true',
'hoodie.compact.inline.max.delta.commits': '5',
}
df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save("/data/hudi/orders_mor")
4.3 COW vs MOR 比較表
+--------------------+------------------------+------------------------+
| 比較項目 | Copy-on-Write (COW) | Merge-on-Read (MOR) |
+--------------------+------------------------+------------------------+
| 書き込み速度 | 遅い (ファイル再作成) | 速い (ログ追記) |
| 読み取り速度 | 速い (直接読み取り) | 普通〜遅い (マージ) |
| ストレージ効率 | 低い (重複バージョン) | 高い (デルタのみ) |
| データ鮮度 | コミット完了後 | ほぼリアルタイム |
| コンパクション | 不要 | 必要 |
| Snapshot クエリ | 高速 | マージコストあり |
| Read Optimized | N/A (Snapshot=RO) | 高速(最新ベースのみ) |
| 増分クエリ | サポート | サポート |
| 適切なユースケース | 分析ワークロード | ストリーミング取り込み |
| | 読み取り中心 | 書き込み中心 |
| | バッチ更新 | CDC パイプライン |
+--------------------+------------------------+------------------------+
4.4 テーブルタイプ選択ガイド
テーブルタイプ選択フローチャート:
開始
|
v
書き込み頻度は高いか?
/ \
Yes No
| |
v v
ニアリアルタイム 読み取り性能が
の鮮度が必要か? 最重要か?
/ \ / \
Yes No Yes No
| | | |
v v v v
MOR MOR COW COW
(推奨) (推奨) (推奨) (推奨)
具体的な推奨シナリオ:
-
COW を選択すべき場合:
- 日次・時間単位のバッチ更新
- BI ダッシュボードのバックエンド
- 読み取りレイテンシが最重要
- コンパクション管理の複雑さを避けたい
-
MOR を選択すべき場合:
- CDC(Change Data Capture)パイプライン
- ストリーミング取り込み(Kafka → Hudi)
- 高頻度の書き込み(分単位以下)
- 書き込みレイテンシが重要
5. タイムラインとメタデータ管理
5.1 タイムラインの概念
Hudi のタイムラインは、テーブルに対するすべての操作(アクション)を時系列で管理するコアメカニズムである。これにより、テーブルの完全な履歴が追跡可能となる。
Hudi タイムライン:
時間 ──────────────────────────────────────────────────────>
[commit_1] [commit_2] [deltacommit_3] [compaction_4] [commit_5]
20240101 20240101 20240102 20240102 20240103
080000 120000 060000 120000 080000
| | | | |
v v v v v
初期ロード バッチ更新 ストリーミング コンパクション バッチ更新
(bulk_insert) (upsert) 取り込み 実行 (upsert)
(MOR log追記) (log→base)
5.2 アクションタイプ
タイムライン上の各エントリは「インスタント(Instant)」と呼ばれ、以下の要素で構成される:
インスタント = アクションタイプ + タイムスタンプ + 状態
アクションタイプ:
+-------------------+------------------------------------------+
| アクション | 説明 |
+-------------------+------------------------------------------+
| commit | COW テーブルへの書き込みコミット |
| deltacommit | MOR テーブルへの書き込みコミット |
| compaction | MOR テーブルのコンパクション |
| clean | 古いファイルバージョンのクリーニング |
| rollback | 失敗したコミットのロールバック |
| savepoint | セーブポイント(復旧用のマーカー) |
| replace | クラスタリングやINSERT_OVERWRITEの結果 |
| indexing | インデックス更新 |
| restore | セーブポイントからの復旧 |
+-------------------+------------------------------------------+
状態遷移:
REQUESTED ──> INFLIGHT ──> COMPLETED
│ │
└────────────┴──> (失敗時は rollback)
5.3 タイムラインの物理構造
.hoodie/
├── 20240101080000000.commit.requested # コミットリクエスト
├── 20240101080000000.commit.inflight # コミット進行中
├── 20240101080000000.commit # コミット完了
│
├── 20240101120000000.deltacommit.requested
├── 20240101120000000.deltacommit.inflight
├── 20240101120000000.deltacommit # デルタコミット完了
│
├── 20240102060000000.compaction.requested # コンパクションリクエスト
├── 20240102060000000.compaction.inflight # コンパクション進行中
├── 20240102060000000.commit # コンパクション完了 (commitとして)
│
├── 20240102120000000.clean.requested # クリーニングリクエスト
├── 20240102120000000.clean.inflight
├── 20240102120000000.clean # クリーニング完了
│
├── hoodie.properties # テーブルプロパティ
│
└── archived/ # アーカイブ済みインスタント
├── .commits_.archive.1
└── .commits_.archive.2
5.4 タイムラインを利用したクエリ
# タイムラインの確認(Spark Shell)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Hudi テーブルのタイムラインを取得
timeline_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", "20240101080000000") \
.option("hoodie.datasource.read.end.instanttime", "20240102080000000") \
.load("/data/hudi/orders")
# タイムラインの各コミットの詳細確認
# Hudi CLI を使用
# hudi-> connect --path /data/hudi/orders
# hudi:orders-> timeline show --limit 20
# Hudi CLI でのタイムライン操作
$ hudi-cli
hudi-> connect --path s3://my-bucket/hudi/orders
hudi:orders-> timeline show
╔══════════════════════╦═══════════════╦═══════════╦════════════╗
║ Instant Time ║ Action ║ State ║ Partition ║
╠══════════════════════╬═══════════════╬═══════════╬════════════╣
║ 20240101080000000 ║ commit ║ COMPLETED ║ 5 ║
║ 20240101120000000 ║ deltacommit ║ COMPLETED ║ 3 ║
║ 20240102060000000 ║ compaction ║ COMPLETED ║ 3 ║
║ 20240102120000000 ║ clean ║ COMPLETED ║ - ║
║ 20240103080000000 ║ commit ║ COMPLETED ║ 7 ║
╚══════════════════════╩═══════════════╩═══════════╩════════════╝
hudi:orders-> commits show --limit 5
hudi:orders-> compactions show all
6. インデックスタイプ
6.1 インデックスの役割
Hudi のインデックスは、Upsert 操作時にレコードキーからファイルグループへのマッピングを効率的に行う仕組みである。インデックスがなければ、すべてのファイルをスキャンして該当レコードを探す必要があり、パフォーマンスが著しく低下する。
インデックスの役割:
入力レコード: {order_id: "12345", ...}
|
v
+-----------------------------------+
| Hudi Index |
| order_id: "12345" |
| → partition: "2024/01/15" |
| → file_group: "fg-001" |
+-----------------------------------+
|
v
fg-001 のみを読み込んでマージ
(他のファイルグループは読み込み不要)
6.2 Bloom Filter インデックス
Bloom Filter インデックスは Hudi のデフォルトインデックスである。各 Parquet ファイルのフッターにブルームフィルターを格納し、レコードキーの存在確率を高速に判定する。
Bloom Filter インデックスの動作:
Step 1: レコードキー "12345" のブルームフィルターチェック
fg-001.parquet → Bloom Filter: "12345" が存在する可能性あり ✓
fg-002.parquet → Bloom Filter: "12345" は確実に存在しない ✗
fg-003.parquet → Bloom Filter: "12345" は確実に存在しない ✗
Step 2: fg-001 のみを実際に読み込んで確認
特徴:
- False Positive あり(実際には存在しないのに「存在可能」と判定)
- False Negative なし(存在するものを「不在」と誤判定しない)
- メタデータテーブル有効時はさらに高速
# Bloom Filter インデックスの設定
hudi_options = {
'hoodie.index.type': 'BLOOM',
# Bloom Filter パラメータ
'hoodie.bloom.index.filter.type': 'DYNAMIC_V0',
'hoodie.index.bloom.num.entries': '60000', # エントリ数
'hoodie.index.bloom.fpp': '0.000000001', # 偽陽性率
'hoodie.bloom.index.parallelism': '200', # 並列度
'hoodie.bloom.index.prune.by.ranges': 'true', # 範囲プルーニング
'hoodie.bloom.index.use.caching': 'true', # キャッシング有効
'hoodie.bloom.index.use.metadata': 'true', # メタデータテーブル使用
}
6.3 Simple インデックス
Simple インデックスは、ブルームフィルターを使用せず、対象パーティション内の全ファイルを直接読み込んでレコードキーを照合する。
# Simple インデックスの設定
hudi_options = {
'hoodie.index.type': 'SIMPLE',
'hoodie.simple.index.parallelism': '200',
}
# 適用場面:
# - 小規模データセット
# - レコードキーの分布が均一でない場合
# - ブルームフィルターの偽陽性が問題になる場合
6.4 HBase インデックス
外部の Apache HBase をインデックスストアとして使用する。大規模データセットでの高速ルックアップに適している。
# HBase インデックスの設定
hudi_options = {
'hoodie.index.type': 'HBASE',
'hoodie.index.hbase.zkquorum': 'zk1:2181,zk2:2181,zk3:2181',
'hoodie.index.hbase.zkport': '2181',
'hoodie.index.hbase.zknode.path': '/hbase',
'hoodie.index.hbase.table': 'hudi_index_orders',
'hoodie.index.hbase.get.batch.size': '1000',
'hoodie.index.hbase.put.batch.size': '1000',
'hoodie.index.hbase.qps.fraction': '0.5',
}
HBase インデックスのアーキテクチャ:
Hudi Writer ──────> HBase Cluster
┌─────────────────────────┐
│ hudi_index_orders │
│ ┌─────────┬───────────┐ │
│ │ RowKey │ FileGroup │ │
│ ├─────────┼───────────┤ │
│ │ 12345 │ fg-001 │ │
│ │ 12346 │ fg-002 │ │
│ │ 12347 │ fg-001 │ │
│ └─────────┴───────────┘ │
└─────────────────────────┘
利点: O(1) ルックアップ
欠点: 外部依存性、運用コスト
6.5 Bucket インデックス
Bucket インデックスは、レコードキーのハッシュ値に基づいてファイルグループを決定する。インデックスの参照が不要で、計算のみで対象ファイルグループが決まるため、大規模データセットで非常に効率的である。
Bucket インデックスの動作:
レコードキー "order_12345"
|
v
hash("order_12345") % num_buckets = bucket_id
|
v
bucket_id = 3 → file_group_3 に書き込み
バケット数: 256 の場合
+----------+----------+----------+-----+----------+
| Bucket 0 | Bucket 1 | Bucket 2 | ... | Bucket 255|
| fg-000 | fg-001 | fg-002 | | fg-255 |
+----------+----------+----------+-----+----------+
# Bucket インデックスの設定
hudi_options = {
'hoodie.index.type': 'BUCKET',
'hoodie.bucket.index.num.buckets': '256', # バケット数
'hoodie.bucket.index.hash.field': 'order_id', # ハッシュフィールド
'hoodie.storage.layout.type': 'BUCKET',
'hoodie.storage.layout.partitioner.class':
'org.apache.hudi.table.action.commit.BucketIndexPartitioner',
}
Bucket インデックスの注意点:
- バケット数は作成後に変更できない(変更にはテーブルの再作成が必要)
- 適切なバケット数の見積もりが重要(ファイルサイズが均一になるように)
- Hudi 0.14+ では Consistent Hashing Bucket Index により動的なバケット数変更が可能
6.6 Record-level Index
Hudi 0.13+ で導入された Record-level Index は、レコードキーからファイルグループへのマッピングを Hudi のメタデータテーブル内に格納する。
Record-level Index:
メタデータテーブル内の record_index パーティション:
+──────────────+──────────────+──────────────+
│ Record Key │ Partition │ File Group │
+──────────────+──────────────+──────────────+
│ order_12345 │ 2024/01/15 │ fg-001 │
│ order_12346 │ 2024/01/15 │ fg-002 │
│ order_12347 │ 2024/01/16 │ fg-003 │
+──────────────+──────────────+──────────────+
利点:
- 外部依存なし(メタデータテーブル内に格納)
- グローバルインデックス(パーティション横断)
- 高速ルックアップ
# Record-level インデックスの設定
hudi_options = {
'hoodie.index.type': 'RECORD_INDEX',
'hoodie.metadata.index.record.index.enable': 'true',
'hoodie.metadata.enable': 'true',
}
6.7 インデックスタイプ比較表
+-------------------+----------+----------+----------+----------+
| | Bloom | Simple | HBase | Bucket |
| | Filter | | | |
+-------------------+----------+----------+----------+----------+
| ルックアップ速度 | 中 | 遅い | 速い | 非常に速い|
| ストレージ | テーブル内 | テーブル内 | 外部 | テーブル内|
| 外部依存 | なし | なし | HBase | なし |
| グローバル対応 | 制限あり | 可能 | 可能 | 不可 |
| スケーラビリティ | 中 | 低い | 高い | 高い |
| 設定複雑度 | 低い | 低い | 高い | 低い |
| 推奨データ規模 | 中規模 | 小規模 | 大規模 | 大規模 |
+-------------------+----------+----------+----------+----------+
+-------------------+----------+
| | Record- |
| | level |
+-------------------+----------+
| ルックアップ速度 | 速い |
| ストレージ | MDT内 |
| 外部依存 | なし |
| グローバル対応 | 可能 |
| スケーラビリティ | 高い |
| 設定複雑度 | 中 |
| 推奨データ規模 | 大規模 |
+-------------------+----------+
7. 書き込み操作
7.1 Upsert
Upsert は Hudi の最も基本的な書き込み操作で、レコードが存在すれば更新、存在しなければ挿入する。
# Upsert 操作
from pyspark.sql.types import *
schema = StructType([
StructField("order_id", StringType()),
StructField("customer_id", StringType()),
StructField("amount", DoubleType()),
StructField("order_date", StringType()),
StructField("status", StringType()),
StructField("updated_at", LongType()),
])
data = [
("ord_001", "cust_100", 150.00, "2024-01-15", "completed", 1705300000),
("ord_002", "cust_101", 250.00, "2024-01-15", "pending", 1705300100),
("ord_003", "cust_102", 350.00, "2024-01-16", "shipped", 1705300200),
]
df = spark.createDataFrame(data, schema)
hudi_options = {
'hoodie.table.name': 'orders',
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.upsert.shuffle.parallelism': '200',
}
df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save("s3://my-bucket/hudi/orders")
Upsert の内部動作:
入力レコード:
[ord_001, ord_002, ord_003]
|
v
┌────────────────────────┐
│ Step 1: Index Lookup │ レコードキーでインデックス検索
│ (インデックス参照) │
└───────────┬────────────┘
|
v
┌────────────────────────┐
│ Step 2: Tag Records │ 既存レコード / 新規レコードをタグ付け
│ (レコードタグ付け) │
│ │
│ ord_001 → UPDATE (fg-001 に既存)
│ ord_002 → UPDATE (fg-002 に既存)
│ ord_003 → INSERT (新規)
└───────────┬────────────┘
|
v
┌────────────────────────┐
│ Step 3: Write │
│ (書き込み) │
│ │
│ COW: fg-001, fg-002 を再作成、fg-new に ord_003 を挿入
│ MOR: fg-001, fg-002 のログに追記、fg-new に ord_003 を挿入
└────────────────────────┘
7.2 Insert
Insert 操作は、すべてのレコードを新規レコードとして挿入する。インデックスの参照を行わないため、Upsert より高速だが、重複レコードが発生する可能性がある。
# Insert 操作
hudi_options = {
'hoodie.table.name': 'events',
'hoodie.datasource.write.recordkey.field': 'event_id',
'hoodie.datasource.write.partitionpath.field': 'event_date',
'hoodie.datasource.write.precombine.field': 'event_time',
'hoodie.datasource.write.operation': 'insert',
'hoodie.insert.shuffle.parallelism': '200',
}
events_df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save("s3://my-bucket/hudi/events")
7.3 Bulk Insert
Bulk Insert は大量データの初期ロードに最適化された操作で、インデックスの作成やタグ付けをスキップして高速に書き込む。
# Bulk Insert 操作(初期ロード)
hudi_options = {
'hoodie.table.name': 'orders_history',
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'bulk_insert',
'hoodie.bulkinsert.shuffle.parallelism': '500',
'hoodie.bulkinsert.sort.mode': 'PARTITION_SORT',
# ファイルサイズ制御
'hoodie.parquet.max.file.size': '134217728', # 128MB
'hoodie.parquet.small.file.limit': '104857600', # 100MB
}
historical_df.write.format("hudi") \
.options(**hudi_options) \
.mode("overwrite") \
.save("s3://my-bucket/hudi/orders_history")
Bulk Insert のソートモード:
GLOBAL_SORT: 全データをグローバルにソート → 最適なファイルサイズ
PARTITION_SORT: パーティション内でソート → バランスの良い選択
NONE: ソートなし → 最速だがファイルサイズがバラつく
推奨: 初期ロードには PARTITION_SORT を使用
7.4 Delete
Delete 操作は、指定されたレコードを削除する。Hudi はソフトデリート(論理削除)とハードデリート(物理削除)の両方をサポートする。
# ハードデリート(物理削除)
delete_df = spark.createDataFrame(
[("ord_001",), ("ord_002",)],
["order_id"]
)
hudi_options = {
'hoodie.table.name': 'orders',
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'delete',
}
delete_df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save("s3://my-bucket/hudi/orders")
# ソフトデリート(論理削除)
# _hoodie_is_deleted カラムを true に設定
soft_delete_df = spark.createDataFrame(
[("ord_003", True)],
["order_id", "_hoodie_is_deleted"]
)
hudi_options_soft = {
'hoodie.table.name': 'orders',
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.payload.class':
'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
}
soft_delete_df.write.format("hudi") \
.options(**hudi_options_soft) \
.mode("append") \
.save("s3://my-bucket/hudi/orders")
7.5 Insert Overwrite
パーティション単位またはテーブル全体を上書きする操作。特定パーティションの完全な再作成に使用する。
# Insert Overwrite(パーティション上書き)
hudi_options = {
'hoodie.table.name': 'orders',
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'insert_overwrite',
}
# order_date = "2024-01-15" のパーティションを完全に上書き
partition_df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save("s3://my-bucket/hudi/orders")
7.6 書き込み操作の比較
+-------------------+-----------+-----------+-----------+
| | Upsert | Insert | Bulk |
| | | | Insert |
+-------------------+-----------+-----------+-----------+
| インデックス参照 | あり | なし | なし |
| 重複排除 | あり | なし | オプション |
| 速度 | 中 | 速い | 最速 |
| 用途 | 増分更新 | 追記のみ | 初期ロード |
| COW 対応 | ○ | ○ | ○ |
| MOR 対応 | ○ | ○ | ○ |
+-------------------+-----------+-----------+-----------+
8. クエリタイプ
8.1 Snapshot クエリ
Snapshot クエリは、テーブルの最新の完全な状態を返す。COW テーブルでは最新のベースファイルを読み取り、MOR テーブルではベースファイルとログファイルをマージして最新の状態を返す。
# Snapshot クエリ(デフォルト)
snapshot_df = spark.read.format("hudi") \
.load("s3://my-bucket/hudi/orders")
# SparkSQL での Snapshot クエリ
spark.sql("""
SELECT order_id, customer_id, amount, status
FROM hudi_orders
WHERE order_date = '2024-01-15'
AND status = 'completed'
""")
Snapshot クエリの動作:
COW テーブル:
┌───────────────────────┐
│ fg-001.parquet (v3) │ ← 最新バージョンのみ読み取り
│ fg-002.parquet (v2) │
│ fg-003.parquet (v1) │
└───────────────────────┘
MOR テーブル:
┌───────────────────────┐
│ fg-001_base.parquet │ ← ベースファイル
│ + fg-001.log.1 │ ← ログファイル1
│ + fg-001.log.2 │ ← ログファイル2
│ │
│ = マージ結果 │ ← 読み取り時にマージ
└───────────────────────┘
8.2 Incremental クエリ
Incremental クエリは、指定した時刻以降に変更されたレコードのみを返す。ETL パイプラインで増分処理を実現するための核心的な機能である。
# Incremental クエリ
incremental_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", "20240101080000000") \
.option("hoodie.datasource.read.end.instanttime", "20240102080000000") \
.load("s3://my-bucket/hudi/orders")
print(f"変更されたレコード数: {incremental_df.count()}")
# 増分 ETL パイプラインの例
# 前回の処理時刻以降の変更を取得
last_checkpoint = get_last_checkpoint() # 前回のコミット時刻を取得
new_changes = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", last_checkpoint) \
.load("s3://my-bucket/hudi/orders")
# 変更されたレコードを下流テーブルに反映
new_changes.write.format("hudi") \
.options(**downstream_options) \
.mode("append") \
.save("s3://my-bucket/hudi/orders_aggregated")
# チェックポイントを更新
save_checkpoint(get_latest_commit_time())
Incremental クエリの動作:
タイムライン:
───[commit_1]────[commit_2]────[commit_3]────[commit_4]──>
10:00 12:00 14:00 16:00
^ ^
| |
begin.instanttime end.instanttime
返されるデータ: commit_2 と commit_3 で変更されたレコードのみ
8.3 Read Optimized クエリ
Read Optimized クエリは MOR テーブルでのみ意味を持ち、ベースファイルのみを読み取る(ログファイルを無視する)。最新のコンパクション時点のデータが返されるため、データ鮮度は低いが、読み取り性能が最も高い。
# Read Optimized クエリ(MOR テーブル用)
read_optimized_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "read_optimized") \
.load("s3://my-bucket/hudi/orders_mor")
# Trino/Presto では "_ro" サフィックスのテーブル名を使用
# SELECT * FROM hudi.default.orders_mor_ro
Read Optimized クエリの動作 (MOR テーブル):
┌───────────────────────┐
│ fg-001_base.parquet │ ← ベースファイルのみ読み取り
│ │
│ (fg-001.log.1 は無視) │ ← ログファイルは無視
│ (fg-001.log.2 は無視) │
└───────────────────────┘
ベースファイルの時点のデータが返される(最新ではない可能性あり)
トレードオフ:
- 利点: Parquet 直接読み取りで高速
- 欠点: 最新のログファイルの変更が含まれない
8.4 Time Travel クエリ
Hudi は特定の時刻のデータ状態を照会する Time Travel クエリをサポートする。
# Time Travel クエリ(Spark)
# 方法1: オプションで指定
historical_df = spark.read.format("hudi") \
.option("as.of.instant", "20240115120000000") \
.load("s3://my-bucket/hudi/orders")
# 方法2: SparkSQL で指定
spark.sql("""
SELECT * FROM hudi_orders
TIMESTAMP AS OF '2024-01-15 12:00:00'
WHERE order_date = '2024-01-15'
""")
# 方法3: バージョン指定
spark.sql("""
SELECT * FROM hudi_orders
VERSION AS OF 3
""")
8.5 クエリタイプ比較表
+--------------------+-------------+-------------+-------------+
| | Snapshot | Incremental | Read |
| | | | Optimized |
+--------------------+-------------+-------------+-------------+
| 対応テーブル | COW & MOR | COW & MOR | MOR のみ |
| データ鮮度 | 最新 | N/A(差分) | コンパクション|
| | | | 時点 |
| 読み取り性能 | COW:高速 | 中 | 高速 |
| | MOR:中 | | |
| ログマージ | MOR:あり | MOR:あり | なし |
| 用途 | 分析 | 増分ETL | BI/レポート |
+--------------------+-------------+-------------+-------------+
9. 同時実行制御
9.1 楽観的同時実行制御 (OCC)
Hudi は楽観的同時実行制御(Optimistic Concurrency Control, OCC)を採用しており、複数のライターが同時にテーブルに書き込むことを許可する。
OCC の動作フロー:
Writer A Writer B
| |
v v
[1. Write Begin] [1. Write Begin]
ロックを取得せずに ロックを取得せずに
書き込み開始 書き込み開始
| |
v v
[2. Write Data] [2. Write Data]
データを一時領域に データを一時領域に
書き込み 書き込み
| |
v |
[3. Conflict Check] |
コミット時に競合チェック |
→ 競合なし → コミット成功 |
| v
| [3. Conflict Check]
| コミット時に競合チェック
| → ファイルグループの
| 重複がなければ成功
| → 重複があれば失敗
v (リトライ)
[COMPLETED]
# OCC の設定
hudi_options = {
# 同時実行制御の有効化
'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
'hoodie.cleaner.policy.failed.writes': 'LAZY',
# ロックプロバイダの設定
'hoodie.write.lock.provider':
'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
'hoodie.write.lock.zookeeper.url': 'zk1:2181,zk2:2181,zk3:2181',
'hoodie.write.lock.zookeeper.port': '2181',
'hoodie.write.lock.zookeeper.lock_key': 'orders_table_lock',
'hoodie.write.lock.zookeeper.base_path': '/hudi/locks',
# リトライ設定
'hoodie.write.lock.wait_time_ms': '60000',
'hoodie.write.lock.num_retries': '3',
'hoodie.write.lock.conflict.resolution.strategy':
'org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy',
}
9.2 ロックプロバイダ
Hudi は複数のロックプロバイダをサポートする:
ロックプロバイダの種類:
+----------------------------+----------------------------------+
| プロバイダ | 説明 |
+----------------------------+----------------------------------+
| ZookeeperBasedLockProvider | Apache ZooKeeper を使用 |
| HiveMetastoreBasedLock | Hive Metastore を使用 |
| Provider | |
| DynamoDBBasedLockProvider | AWS DynamoDB を使用 (S3 向け) |
| FileSystemBasedLock | ファイルシステムベース |
| Provider | (テスト・開発用) |
| InProcessLockProvider | JVM 内プロセス間ロック |
| | (単一ライターの場合) |
+----------------------------+----------------------------------+
# DynamoDB ロックプロバイダ(AWS 環境向け)
hudi_options = {
'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
'hoodie.write.lock.provider':
'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
'hoodie.write.lock.dynamodb.table': 'hudi_locks',
'hoodie.write.lock.dynamodb.region': 'us-east-1',
'hoodie.write.lock.dynamodb.partition_key': 'orders_table',
'hoodie.write.lock.dynamodb.billing_mode': 'PAY_PER_REQUEST',
}
9.3 マルチライターサポート
Hudi 0.12+ では、複数の独立したライターが同一テーブルに書き込むマルチライター構成をサポートする。
マルチライターアーキテクチャ:
+----------------+ +----------------+ +----------------+
| Spark Writer | | Flink Writer | | Spark Writer |
| (バッチ更新) | | (ストリーミング) | | (コンパクション) |
+-------+--------+ +-------+--------+ +-------+--------+
| | |
v v v
+----------------------------------------------------+
| Lock Provider (ZK / DynamoDB) |
+----------------------------------------------------+
| | |
v v v
+----------------------------------------------------+
| Hudi Table (共有ストレージ) |
| |
| Writer A: partition=2024/01/15 の fg-001 を更新 |
| Writer B: partition=2024/01/16 の fg-005 を更新 |
| Writer C: compaction を実行 |
| |
| → ファイルグループが重複しなければ並行実行可能 |
+----------------------------------------------------+
10. コンパクション
10.1 コンパクションの概要
コンパクションは MOR テーブルにおいて、ログファイルとベースファイルをマージして新しいベースファイルを生成するプロセスである。COW テーブルではコンパクションは不要(書き込み時にベースファイルが更新されるため)。
コンパクションの動作:
Before:
┌──────────────────────────────────────┐
│ fg-001_base.parquet (100MB) │
│ fg-001.log.1 (5MB) │
│ fg-001.log.2 (3MB) │
│ fg-001.log.3 (7MB) │
│ │
│ 読み取り時: base + 3つのログをマージ │
│ → 読み取りコスト: 高い │
└──────────────────────────────────────┘
Compaction 実行
After:
┌──────────────────────────────────────┐
│ fg-001_base.parquet (102MB) ← NEW │
│ │
│ 読み取り時: base のみ読み取り │
│ → 読み取りコスト: 低い │
└──────────────────────────────────────┘
(古い base + log ファイルは cleaning で削除)
10.2 インラインコンパクション
インラインコンパクションは、書き込み操作と同一のジョブ内でコンパクションを実行する方式。
# インラインコンパクションの設定
hudi_options = {
'hoodie.compact.inline': 'true',
'hoodie.compact.inline.max.delta.commits': '5', # 5コミットごとにコンパクション
'hoodie.compact.inline.trigger.strategy':
'NUM_COMMITS', # NUM_COMMITS, NUM_AND_TIME, NUM_OR_TIME, TIME_ELAPSED
}
10.3 非同期コンパクション
非同期コンパクションは、書き込みジョブとは別のプロセスでコンパクションを実行する方式。本番環境での推奨アプローチ。
# 非同期コンパクション(別の Spark ジョブとして実行)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("HudiAsyncCompaction") \
.getOrCreate()
# コンパクションの実行
hudi_options = {
'hoodie.compact.inline': 'false',
'hoodie.compact.schedule.inline': 'true', # スケジュールのみインライン
}
# Hudi CLI でのコンパクション実行
# hudi:orders_mor-> compaction schedule
# hudi:orders_mor-> compaction run --compactionInstant 20240115120000
# spark-submit でのコンパクション実行
spark-submit \
--class org.apache.hudi.utilities.HoodieCompactor \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 4g \
hudi-utilities-bundle.jar \
--base-path s3://my-bucket/hudi/orders_mor \
--table-name orders_mor \
--instant-time 20240115120000 \
--parallelism 200 \
--schema-file s3://my-bucket/schemas/orders.avsc \
--strategy org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
10.4 コンパクション戦略
コンパクション戦略:
+------------------------------------------+-----------------------------------+
| 戦略 | 説明 |
+------------------------------------------+-----------------------------------+
| UnBoundedCompactionStrategy | すべてのファイルスライスを |
| | コンパクション(デフォルト) |
+------------------------------------------+-----------------------------------+
| LogFileSizeBasedCompactionStrategy | ログファイルサイズが閾値を |
| | 超えたもののみコンパクション |
+------------------------------------------+-----------------------------------+
| BoundedIOCompactionStrategy | I/O 量の上限を設定して |
| | コンパクション |
+------------------------------------------+-----------------------------------+
| BoundedPartitionAwareCompaction | パーティション数の上限を設定して |
| Strategy | コンパクション |
+------------------------------------------+-----------------------------------+
| DayBasedCompactionStrategy | 日次パーティション単位で |
| | コンパクション |
+------------------------------------------+-----------------------------------+
# LogFileSizeBasedCompactionStrategy の設定例
hudi_options = {
'hoodie.compaction.strategy':
'org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy',
'hoodie.compaction.logfile.size.threshold': '134217728', # 128MB
'hoodie.compaction.target.io': '536870912', # 512MB I/O上限
}
11. クラスタリング
11.1 クラスタリングの概要
クラスタリングは、データのレイアウトを再編成してクエリパフォーマンスを最適化する機能である。小さなファイルの統合や、クエリパターンに合わせたデータのソートを行う。
クラスタリングの動作:
Before (小ファイル問題):
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ file_1 │ │ file_2 │ │ file_3 │ │ file_4 │
│ 10MB │ │ 15MB │ │ 8MB │ │ 12MB │
│ 混在データ │ │ 混在データ │ │ 混在データ │ │ 混在データ │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
Clustering 実行 (region でソート)
After (最適化済み):
┌────────────────────┐ ┌────────────────────┐
│ file_new_1 │ │ file_new_2 │
│ 128MB │ │ 128MB │
│ region=東日本 │ │ region=西日本 │
│ (ソート済み) │ │ (ソート済み) │
└────────────────────┘ └────────────────────┘
利点:
- ファイル数削減 → メタデータ管理コスト低減
- ソートによるデータスキッピング効率向上
- 均一なファイルサイズ
11.2 クラスタリングの設定
# インラインクラスタリングの設定
hudi_options = {
# クラスタリング有効化
'hoodie.clustering.inline': 'true',
'hoodie.clustering.inline.max.commits': '4',
# クラスタリング戦略
'hoodie.clustering.plan.strategy.class':
'org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy',
'hoodie.clustering.execution.strategy.class':
'org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy',
# ソート設定
'hoodie.clustering.plan.strategy.sort.columns': 'region,customer_id',
# ファイルサイズ設定
'hoodie.clustering.plan.strategy.target.file.max.bytes': '134217728', # 128MB
'hoodie.clustering.plan.strategy.small.file.limit': '104857600', # 100MB
'hoodie.clustering.plan.strategy.max.num.groups': '30',
'hoodie.clustering.plan.strategy.max.bytes.per.group': '2147483648', # 2GB
}
11.3 Z-Order / Hilbert Curve クラスタリング
多次元ソートによるクラスタリングは、複数カラムに対するクエリのデータスキッピング効率を向上させる。
# Z-Order クラスタリング
hudi_options = {
'hoodie.clustering.inline': 'true',
'hoodie.clustering.plan.strategy.sort.columns': 'city,category,price',
'hoodie.layout.optimize.strategy': 'z-order', # Z-Order
# または
'hoodie.layout.optimize.strategy': 'hilbert', # Hilbert Curve
}
Z-Order vs Linear Sort:
Linear Sort (city でソート):
┌──────────────────────────┐
│ city=A, category=混在 │ → category での絞り込みが非効率
│ city=B, category=混在 │
│ city=C, category=混在 │
└──────────────────────────┘
Z-Order (city + category):
┌──────────────────────────┐
│ city={A,B}, cat={X,Y} │ → 両方のカラムで
│ city={A,B}, cat={Z,W} │ データスキッピングが効く
│ city={C,D}, cat={X,Y} │
│ city={C,D}, cat={Z,W} │
└──────────────────────────┘
12. クリーニングとアーカイブ
12.1 クリーニング
クリーニングは、不要になった古いファイルバージョンを削除するプロセスである。ストレージコストの管理に不可欠。
# クリーニングの設定
hudi_options = {
# クリーニングポリシー
'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
'hoodie.cleaner.commits.retained': '10', # 保持するコミット数
# または時間ベース
# 'hoodie.cleaner.policy': 'KEEP_LATEST_BY_HOURS',
# 'hoodie.cleaner.hours.retained': '168', # 7日間保持
# または最新ファイルスライスのみ
# 'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
# 'hoodie.cleaner.fileversions.retained': '3', # 3バージョン保持
# インラインクリーニング
'hoodie.clean.automatic': 'true',
'hoodie.clean.async': 'false', # true で非同期
}
クリーニングの動作:
KEEP_LATEST_COMMITS (retained=3) の場合:
タイムライン:
[commit_1] [commit_2] [commit_3] [commit_4] [commit_5]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
最新3コミットを保持
commit_1, commit_2 で使用されていたファイルが削除対象
(ただし commit_3 以降で参照されているファイルは保持)
12.2 アーカイブ
アーカイブは、タイムライン上の古いインスタントを .hoodie/archived/ に移動するプロセスである。タイムラインファイルの肥大化を防ぐ。
# アーカイブの設定
hudi_options = {
'hoodie.archive.automatic': 'true',
'hoodie.keep.min.commits': '20', # 最小保持コミット数
'hoodie.keep.max.commits': '30', # 最大保持コミット数
# max を超えるとアーカイブ開始、min まで削減
'hoodie.archive.merge.enable': 'true', # アーカイブファイルのマージ
'hoodie.archive.merge.small.file.limit.bytes': '20971520', # 20MB
}
アーカイブの動作:
.hoodie/ ディレクトリ:
アクティブタイムライン (最新 20-30 コミット):
├── 20240501.commit
├── 20240502.commit
├── ...
└── 20240530.commit
アーカイブ済み:
└── archived/
├── .commits_.archive.1 (古いコミット群)
├── .commits_.archive.2
└── .commits_.archive.3
13. スキーマ進化
13.1 スキーマ進化の概要
Apache Hudi は、テーブルのスキーマを安全に変更するスキーマ進化(Schema Evolution)をサポートする。基盤となるスキーマ管理には Apache Avro を使用し、後方互換性と前方互換性のルールに基づいたスキーマ変更を許可する。
サポートされるスキーマ変更:
✓ カラムの追加(末尾)
✓ カラムの追加(特定位置)
✓ カラム名の変更
✓ カラム型の拡張(int → long, float → double)
✓ Nullable への変更(NOT NULL → NULLABLE)
✓ ネストされた構造のカラム追加
✗ カラムの削除(Hudi 0.14+ でサポート)
✗ カラム型の縮小(long → int)
✗ Nullable 制約の追加(NULLABLE → NOT NULL)
13.2 Spark SQL によるスキーマ進化
-- カラムの追加
ALTER TABLE hudi_orders ADD COLUMNS (
discount DECIMAL(10,2) COMMENT '割引額',
coupon_code STRING COMMENT 'クーポンコード'
);
-- カラムの追加(ネスト構造)
ALTER TABLE hudi_orders ADD COLUMNS (
shipping_address STRUCT<
street: STRING,
city: STRING,
state: STRING,
zip: STRING
> COMMENT '配送先住所'
);
-- カラム名の変更(Hudi 0.13+)
ALTER TABLE hudi_orders RENAME COLUMN coupon_code TO promo_code;
-- カラム型の変更
ALTER TABLE hudi_orders ALTER COLUMN amount TYPE DECIMAL(12,2);
-- カラムの削除(Hudi 0.14+)
ALTER TABLE hudi_orders DROP COLUMN discount;
13.3 プログラムによるスキーマ進化
# Spark DataFrame API でのスキーマ進化
from pyspark.sql.types import *
# 新しいスキーマのデータを書き込むことでスキーマが自動進化
new_schema_df = spark.createDataFrame([
("ord_100", "cust_200", 500.00, "2024-01-20", "pending", 1705700000, 10.0),
], ["order_id", "customer_id", "amount", "order_date", "status", "updated_at", "discount"])
hudi_options = {
'hoodie.table.name': 'orders',
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'upsert',
# スキーマ進化の有効化
'hoodie.datasource.write.reconcile.schema': 'true',
'hoodie.schema.on.read.enable': 'true',
}
new_schema_df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save("s3://my-bucket/hudi/orders")
13.4 Schema-on-Read
Hudi 0.13+ では Schema-on-Read をサポートし、異なるスキーマバージョンで書き込まれたファイルを読み取り時に統合する。
Schema-on-Read の動作:
File Group 1 (Schema v1):
┌─────────────────────────────────┐
│ order_id | amount | status │
│ ord_001 | 150.0 | completed │
└─────────────────────────────────┘
File Group 2 (Schema v2 - discount カラム追加):
┌──────────────────────────────────────────┐
│ order_id | amount | status | discount │
│ ord_002 | 250.0 | pending | 10.0 │
└──────────────────────────────────────────┘
読み取り時 (Schema v2 で統合):
┌──────────────────────────────────────────┐
│ order_id | amount | status | discount │
│ ord_001 | 150.0 | completed | NULL │ ← 旧スキーマに discount なし
│ ord_002 | 250.0 | pending | 10.0 │
└──────────────────────────────────────────┘
14. Hudi Streamer (DeltaStreamer)
14.1 概要
Hudi Streamer(旧称 DeltaStreamer)は、外部ソースから Hudi テーブルへのストリーミング取り込みを実現するユーティリティツールである。Kafka、DFS(分散ファイルシステム)、JDBC など多様なソースからのデータ取り込みをサポートする。
Hudi Streamer アーキテクチャ:
+-------------+ +------------------+ +------------------+
| Source | | Hudi Streamer | | Hudi Table |
| | | | | |
| Kafka ────┼────>│ Source │ │ |
| DFS ────┼────>│ ↓ │ │ .hoodie/ |
| JDBC ────┼────>│ Schema Provider │────>│ partition_1/ |
| S3 ────┼────>│ ↓ │ │ partition_2/ |
| Custom ────┼────>│ Transformer │ │ ... |
| | │ ↓ │ │ |
| | │ Writer │ │ |
+-------------+ +------------------+ +------------------+
14.2 Kafka からの取り込み
# Kafka → Hudi (Spark Submit)
spark-submit \
--class org.apache.hudi.utilities.streamer.HoodieStreamer \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 4g \
--executor-cores 2 \
--packages org.apache.hudi:hudi-utilities-bundle_2.12:0.14.0 \
-- \
--table-type MERGE_ON_READ \
--target-base-path s3://my-bucket/hudi/orders \
--target-table orders \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field event_timestamp \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--props s3://my-bucket/config/kafka-source.properties \
--continuous \
--min-sync-interval-seconds 60 \
--op UPSERT \
--hoodie-conf hoodie.datasource.write.recordkey.field=order_id \
--hoodie-conf hoodie.datasource.write.partitionpath.field=order_date \
--hoodie-conf hoodie.datasource.write.precombine.field=event_timestamp \
--hoodie-conf hoodie.streamer.source.kafka.topic=orders-topic \
--hoodie-conf hoodie.streamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
# kafka-source.properties
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
schema.registry.url=http://schema-registry:8081
hoodie.streamer.source.kafka.topic=orders-topic
hoodie.streamer.schemaprovider.registry.url=http://schema-registry:8081/subjects/orders-value/versions/latest
auto.offset.reset=earliest
group.id=hudi-orders-consumer
# Hudi 書き込み設定
hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.partitionpath.field=order_date
hoodie.datasource.write.precombine.field=event_timestamp
hoodie.upsert.shuffle.parallelism=200
hoodie.insert.shuffle.parallelism=200
# コンパクション設定
hoodie.compact.inline=false
hoodie.compact.inline.max.delta.commits=5
hoodie.compact.schedule.inline=true
# クリーニング設定
hoodie.clean.automatic=true
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained=10
14.3 CDC (Change Data Capture) パイプライン
CDC パイプライン構成:
+----------+ +-----------+ +-------+ +----------------+
| MySQL / | | Debezium | | Kafka | | Hudi Streamer |
| Postgres |────>| Connector |────>| Topic |────>| (CDC Source) |
| (Source | | | | | | |
| DB) | | | | | +-------+--------+
+----------+ +-----------+ +-------+ |
v
+-------+--------+
| Hudi Table |
| (MOR) |
| CDC 変更が |
| リアルタイム反映 |
+----------------+
# Debezium CDC → Hudi
spark-submit \
--class org.apache.hudi.utilities.streamer.HoodieStreamer \
-- \
--table-type MERGE_ON_READ \
--target-base-path s3://my-bucket/hudi/cdc_orders \
--target-table cdc_orders \
--source-class org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource \
--source-ordering-field _event_origin_ts_ms \
--payload-class org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload \
--continuous \
--props s3://my-bucket/config/debezium-cdc.properties
14.4 Transformer の使用
Hudi Streamer はデータの変換パイプラインもサポートする。
# カスタム Transformer の例
from org.apache.hudi.utilities.transform import Transformer
class OrderEnrichmentTransformer(Transformer):
def apply(self, jsc, spark_session, source_df, props):
"""ソースデータを変換して Hudi テーブルに書き込む"""
from pyspark.sql.functions import current_timestamp, lit, when
return source_df \
.withColumn("ingestion_time", current_timestamp()) \
.withColumn("region",
when(source_df.country == "JP", lit("APAC"))
.when(source_df.country == "US", lit("NA"))
.otherwise(lit("OTHER"))) \
.drop("internal_id") # 不要カラムを削除
# SQL Transformer を使用した変換
spark-submit \
--class org.apache.hudi.utilities.streamer.HoodieStreamer \
-- \
--target-base-path s3://my-bucket/hudi/orders_transformed \
--target-table orders_transformed \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--hoodie-conf hoodie.streamer.transformer.sql="SELECT *, CURRENT_TIMESTAMP() as ingested_at FROM <SRC>" \
--continuous \
--props s3://my-bucket/config/kafka-source.properties
14.5 マルチテーブルストリーマー
# 複数テーブルの同時取り込み
spark-submit \
--class org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer \
-- \
--props s3://my-bucket/config/multi-table.properties \
--config-folder s3://my-bucket/config/tables/ \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--base-path-prefix s3://my-bucket/hudi/ \
--table-type MERGE_ON_READ \
--continuous
15. エコシステム統合
15.1 Apache Spark 統合
Apache Spark は Hudi の最も成熟した統合先であり、読み取り・書き込みの両方を完全にサポートする。
# Spark セッション設定
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("HudiSparkIntegration") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.jars.packages",
"org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0") \
.getOrCreate()
# SparkSQL でのテーブル操作
spark.sql("""
CREATE TABLE IF NOT EXISTS hudi_catalog.default.orders (
order_id STRING,
customer_id STRING,
amount DECIMAL(10,2),
order_date DATE,
status STRING,
updated_at TIMESTAMP
) USING hudi
TBLPROPERTIES (
primaryKey = 'order_id',
preCombineField = 'updated_at',
type = 'cow'
)
PARTITIONED BY (order_date)
LOCATION 's3://my-bucket/hudi/orders'
""")
# MERGE INTO (Spark 3.x)
spark.sql("""
MERGE INTO orders AS target
USING updates AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# UPDATE
spark.sql("""
UPDATE orders
SET status = 'cancelled', updated_at = current_timestamp()
WHERE order_id = 'ord_001'
""")
# DELETE
spark.sql("""
DELETE FROM orders
WHERE order_date < '2023-01-01'
""")
# INSERT INTO
spark.sql("""
INSERT INTO orders
SELECT * FROM staging_orders
""")
15.2 Apache Flink 統合
Apache Flink は Hudi のストリーミング書き込みにおける主要なエンジンである。
-- Flink SQL でのテーブル作成
CREATE TABLE orders_hudi (
order_id STRING PRIMARY KEY NOT ENFORCED,
customer_id STRING,
amount DECIMAL(10,2),
order_date STRING,
status STRING,
updated_at TIMESTAMP(3),
`partition` STRING
) PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 's3://my-bucket/hudi/orders',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field' = 'order_id',
'hoodie.datasource.write.precombine.field' = 'updated_at',
'write.tasks' = '4',
'write.operation' = 'upsert',
'compaction.async.enabled' = 'true',
'compaction.trigger.strategy' = 'num_commits',
'compaction.delta_commits' = '5',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '4'
);
-- Kafka → Hudi ストリーミング
INSERT INTO orders_hudi
SELECT
order_id,
customer_id,
amount,
order_date,
status,
event_time as updated_at,
DATE_FORMAT(event_time, 'yyyy-MM-dd') as `partition`
FROM kafka_orders;
-- ストリーミング読み取り
SELECT * FROM orders_hudi /*+ OPTIONS('read.streaming.enabled'='true') */;
// Flink DataStream API での使用
import org.apache.hudi.sink.HoodieSinkFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka Source → Hudi Sink
DataStream<RowData> kafkaStream = env.addSource(kafkaSource);
Map<String, String> hudiOptions = new HashMap<>();
hudiOptions.put("path", "s3://my-bucket/hudi/orders");
hudiOptions.put("table.type", "MERGE_ON_READ");
hudiOptions.put("hoodie.datasource.write.recordkey.field", "order_id");
hudiOptions.put("hoodie.datasource.write.precombine.field", "updated_at");
hudiOptions.put("write.operation", "upsert");
kafkaStream
.addSink(new HoodieSinkFunction<>(hudiOptions))
.name("Hudi Sink");
env.execute("Flink to Hudi Pipeline");
15.3 Trino (旧 PrestoSQL) 統合
-- Trino での Hudi テーブル読み取り
-- catalog: hudi, schema: default
-- Snapshot クエリ
SELECT order_id, customer_id, amount, status
FROM hudi.default.orders
WHERE order_date = DATE '2024-01-15';
-- Read Optimized クエリ (MOR テーブル)
SELECT *
FROM hudi.default.orders_mor_ro
WHERE order_date = DATE '2024-01-15';
-- Snapshot クエリ (MOR テーブル)
SELECT *
FROM hudi.default.orders_mor_rt
WHERE order_date = DATE '2024-01-15';
# Trino Hudi Connector 設定 (catalog/hudi.properties)
connector.name=hudi
hive.metastore.uri=thrift://hive-metastore:9083
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
hudi.metadata-table.enabled=true
hudi.parquet.use-column-names=true
15.4 Apache Hive 統合
-- Hive での Hudi テーブル登録
-- Hive Sync Tool で自動同期
-- Hive でのクエリ
SELECT * FROM orders
WHERE order_date = '2024-01-15'
AND status = 'completed';
-- MOR テーブルのビュー
-- orders_ro: Read Optimized ビュー
-- orders_rt: Real-Time (Snapshot) ビュー
SELECT * FROM orders_ro WHERE order_date = '2024-01-15';
SELECT * FROM orders_rt WHERE order_date = '2024-01-15';
# Hive Sync の設定
hudi_options = {
# Hive Sync 設定
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.mode': 'hms', # HMS モード
'hoodie.datasource.hive_sync.database': 'default',
'hoodie.datasource.hive_sync.table': 'orders',
'hoodie.datasource.hive_sync.partition_fields': 'order_date',
'hoodie.datasource.hive_sync.partition_extractor_class':
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.metastore.uris': 'thrift://hive-metastore:9083',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
# MOR テーブルの場合、追加ビュー設定
'hoodie.datasource.hive_sync.support_timestamp': 'true',
}
15.5 Presto 統合
-- Presto での読み取り
SELECT order_id, amount, status
FROM hive.default.orders
WHERE order_date = '2024-01-15'
ORDER BY amount DESC
LIMIT 100;
15.6 統合比較表
+------------------+--------+--------+-------+------+--------+
| 機能 | Spark | Flink | Trino | Hive | Presto |
+------------------+--------+--------+-------+------+--------+
| 読み取り | ✓ | ✓ | ✓ | ✓ | ✓ |
| 書き込み | ✓ | ✓ | ✗ | ✗ | ✗ |
| Snapshot Query | ✓ | ✓ | ✓ | ✓ | ✓ |
| Incremental Query| ✓ | ✓ | ✓ | ✓ | ✗ |
| Read Optimized | ✓ | ✓ | ✓ | ✓ | ✓ |
| Time Travel | ✓ | ✓ | ✓ | ✗ | ✗ |
| DDL (CREATE etc) | ✓ | ✓ | ✗ | ✗ | ✗ |
| MERGE INTO | ✓ | ✓ | ✗ | ✗ | ✗ |
+------------------+--------+--------+-------+------+--------+
16. 設定例
16.1 基本設定
# 包括的な Hudi 設定例
# === テーブル基本設定 ===
table_config = {
'hoodie.table.name': 'orders',
'hoodie.table.type': 'MERGE_ON_READ', # COPY_ON_WRITE or MERGE_ON_READ
'hoodie.table.base.file.format': 'PARQUET', # PARQUET, ORC, HFILE
'hoodie.table.log.file.format': 'HOODIE_LOG', # ログファイル形式
'hoodie.table.timeline.layout.version': '1', # タイムラインレイアウト
'hoodie.table.checksum': '0',
}
# === 書き込み設定 ===
write_config = {
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
# 並列度
'hoodie.upsert.shuffle.parallelism': '200',
'hoodie.insert.shuffle.parallelism': '200',
'hoodie.bulkinsert.shuffle.parallelism': '500',
'hoodie.delete.shuffle.parallelism': '200',
# ファイルサイズ
'hoodie.parquet.max.file.size': '134217728', # 128MB
'hoodie.parquet.small.file.limit': '104857600', # 100MB
'hoodie.logfile.max.size': '1073741824', # 1GB
'hoodie.logfile.data.block.max.size': '268435456', # 256MB
# レコードマージ
'hoodie.datasource.write.payload.class':
'org.apache.hudi.common.model.OverwriteWithLatestAvroPayload',
}
# === インデックス設定 ===
index_config = {
'hoodie.index.type': 'BLOOM', # BLOOM, SIMPLE, HBASE, BUCKET
'hoodie.index.bloom.num.entries': '60000',
'hoodie.index.bloom.fpp': '0.000000001',
'hoodie.bloom.index.parallelism': '200',
'hoodie.bloom.index.prune.by.ranges': 'true',
'hoodie.bloom.index.use.caching': 'true',
'hoodie.bloom.index.use.metadata': 'true',
}
# === コンパクション設定 ===
compaction_config = {
'hoodie.compact.inline': 'false',
'hoodie.compact.schedule.inline': 'true',
'hoodie.compact.inline.max.delta.commits': '5',
'hoodie.compaction.strategy':
'org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy',
'hoodie.compaction.target.io': '536870912', # 512MB
'hoodie.compaction.logfile.size.threshold': '134217728', # 128MB
'hoodie.compaction.daybased.target.partitions': '10',
}
# === クリーニング設定 ===
clean_config = {
'hoodie.clean.automatic': 'true',
'hoodie.clean.async': 'true',
'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
'hoodie.cleaner.commits.retained': '10',
'hoodie.cleaner.parallelism': '200',
'hoodie.cleaner.delete.bootstrap.base.file': 'false',
}
# === クラスタリング設定 ===
clustering_config = {
'hoodie.clustering.inline': 'false',
'hoodie.clustering.schedule.inline': 'true',
'hoodie.clustering.inline.max.commits': '4',
'hoodie.clustering.plan.strategy.class':
'org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy',
'hoodie.clustering.execution.strategy.class':
'org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy',
'hoodie.clustering.plan.strategy.sort.columns': 'region,customer_id',
'hoodie.clustering.plan.strategy.target.file.max.bytes': '134217728',
'hoodie.clustering.plan.strategy.small.file.limit': '67108864',
}
# === メタデータテーブル設定 ===
metadata_config = {
'hoodie.metadata.enable': 'true',
'hoodie.metadata.index.bloom.filter.enable': 'true',
'hoodie.metadata.index.column.stats.enable': 'true',
'hoodie.metadata.index.column.stats.column.list': 'order_id,customer_id,order_date',
}
# === アーカイブ設定 ===
archive_config = {
'hoodie.archive.automatic': 'true',
'hoodie.keep.min.commits': '20',
'hoodie.keep.max.commits': '30',
}
# === すべての設定を結合 ===
all_config = {}
all_config.update(table_config)
all_config.update(write_config)
all_config.update(index_config)
all_config.update(compaction_config)
all_config.update(clean_config)
all_config.update(clustering_config)
all_config.update(metadata_config)
all_config.update(archive_config)
# DataFrame の書き込み
df.write.format("hudi") \
.options(**all_config) \
.mode("append") \
.save("s3://my-bucket/hudi/orders")
16.2 本番環境向け推奨設定
# 本番環境推奨設定(大規模 MOR テーブル)
production_config = {
# テーブル基本
'hoodie.table.name': 'prod_orders',
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'upsert',
# パフォーマンスチューニング
'hoodie.upsert.shuffle.parallelism': '400',
'hoodie.parquet.max.file.size': '134217728', # 128MB
'hoodie.parquet.small.file.limit': '104857600', # 100MB
'hoodie.parquet.compression.codec': 'snappy',
'hoodie.logfile.max.size': '1073741824', # 1GB
# インデックス(大規模向け: Bucket Index)
'hoodie.index.type': 'BUCKET',
'hoodie.bucket.index.num.buckets': '512',
# メタデータテーブル
'hoodie.metadata.enable': 'true',
'hoodie.metadata.index.bloom.filter.enable': 'true',
'hoodie.metadata.index.column.stats.enable': 'true',
# コンパクション(非同期推奨)
'hoodie.compact.inline': 'false',
'hoodie.compact.schedule.inline': 'true',
'hoodie.compact.inline.max.delta.commits': '5',
# クリーニング
'hoodie.clean.automatic': 'true',
'hoodie.clean.async': 'true',
'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
'hoodie.cleaner.commits.retained': '24', # 24コミット保持
# 同時実行制御
'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
'hoodie.write.lock.provider':
'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
'hoodie.write.lock.dynamodb.table': 'hudi_locks',
'hoodie.write.lock.dynamodb.region': 'us-east-1',
# Hive Sync
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.mode': 'hms',
'hoodie.datasource.hive_sync.database': 'production',
'hoodie.datasource.hive_sync.table': 'orders',
# アーカイブ
'hoodie.keep.min.commits': '40',
'hoodie.keep.max.commits': '50',
}
17. テーブルサービス
17.1 テーブルサービスの概要
Hudi のテーブルサービスは、テーブルのライフサイクル管理を自動化する一連のバックグラウンド処理である。
テーブルサービスの全体像:
+-----------------------------------------------------------+
| Hudi Table Services |
| |
| ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ |
| │Compaction│ │ Cleaning │ │Clustering│ │ Indexing │ |
| │ │ │ │ │ │ │ │ |
| │ Log→Base │ │ 古いファイル│ │ ファイル │ │ インデックス│ |
| │ マージ │ │ の削除 │ │ レイアウト│ │ の更新 │ |
| │ │ │ │ │ 最適化 │ │ │ |
| └──────────┘ └──────────┘ └──────────┘ └──────────┘ |
| |
| 実行モード: |
| ┌──────────┐ ┌──────────┐ ┌──────────┐ |
| │ Inline │ │ Async │ │ Scheduled│ |
| │ (同期) │ │ (非同期) │ │ (定期) │ |
| └──────────┘ └──────────┘ └──────────┘ |
+-----------------------------------------------------------+
17.2 各サービスの詳細
# テーブルサービスの統合設定
table_services_config = {
# === コンパクション ===
'hoodie.compact.inline': 'false', # インライン無効
'hoodie.compact.schedule.inline': 'true', # スケジュールのみインライン
'hoodie.compact.inline.max.delta.commits': '5', # 5コミットごと
# === クリーニング ===
'hoodie.clean.automatic': 'true', # 自動クリーニング
'hoodie.clean.async': 'true', # 非同期実行
'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
'hoodie.cleaner.commits.retained': '10',
# === クラスタリング ===
'hoodie.clustering.inline': 'false', # インライン無効
'hoodie.clustering.schedule.inline': 'true', # スケジュールのみインライン
'hoodie.clustering.inline.max.commits': '4',
# === インデキシング ===
'hoodie.metadata.enable': 'true',
'hoodie.metadata.index.bloom.filter.enable': 'true',
'hoodie.metadata.index.column.stats.enable': 'true',
}
17.3 非同期テーブルサービス
# 非同期コンパクションジョブ
spark-submit \
--class org.apache.hudi.utilities.HoodieCompactor \
--master yarn \
--deploy-mode cluster \
hudi-utilities-bundle.jar \
--base-path s3://my-bucket/hudi/orders \
--table-name orders \
--instant-time 20240115120000 \
--parallelism 200
# 非同期クラスタリングジョブ
spark-submit \
--class org.apache.hudi.utilities.HoodieClusteringJob \
--master yarn \
--deploy-mode cluster \
hudi-utilities-bundle.jar \
--base-path s3://my-bucket/hudi/orders \
--table-name orders \
--instant-time 20240115120000 \
--parallelism 200 \
--spark-memory 8g
# 非同期クリーニングジョブ
spark-submit \
--class org.apache.hudi.utilities.HoodieCleaner \
--master yarn \
--deploy-mode cluster \
hudi-utilities-bundle.jar \
--base-path s3://my-bucket/hudi/orders \
--table-name orders
18. メタデータテーブル
18.1 メタデータテーブルの概要
Hudi メタデータテーブルは、テーブルのメタデータを効率的に管理するための内部 Hudi テーブルである。ファイルリスティング、ブルームフィルター、カラム統計などを格納し、クエリプランニングの高速化を実現する。
メタデータテーブルの構造:
.hoodie_metadata/
├── files/ # ファイルリスティング
│ └── パーティションごとのファイル一覧
│
├── column_stats/ # カラム統計
│ └── 各ファイルのmin/max/null_count等
│
├── bloom_filters/ # ブルームフィルター
│ └── 各ファイルのブルームフィルター
│
├── record_index/ # レコードインデックス
│ └── レコードキー → ファイルグループ マッピング
│
└── functional_index/ # 機能インデックス (Hudi 1.x)
└── カスタム関数ベースのインデックス
18.2 ファイルリスティングの高速化
従来のファイルリスティング (メタデータテーブルなし):
クエリプランニング時:
1. ストレージ API (S3 LIST) を使用してパーティションを列挙
2. 各パーティションのファイルを列挙
3. 大規模テーブルでは数分かかることもある
問題点:
- S3 LIST は遅い (パーティション数 × API 呼び出し)
- 頻繁な API コールによるコストとレート制限
メタデータテーブルを使用したファイルリスティング:
1. メタデータテーブル (files パーティション) を参照
2. 単一ファイルの読み取りですべてのファイル情報を取得
3. 数秒で完了
改善効果:
- S3 LIST の回避 → 大幅な高速化
- コスト削減 (API 呼び出し削減)
# メタデータテーブルの設定
metadata_config = {
'hoodie.metadata.enable': 'true', # メタデータテーブル有効化
'hoodie.metadata.index.bloom.filter.enable': 'true', # ブルームフィルター
'hoodie.metadata.index.column.stats.enable': 'true', # カラム統計
'hoodie.metadata.index.column.stats.column.list':
'order_id,customer_id,amount,order_date', # 統計対象カラム
'hoodie.metadata.index.record.index.enable': 'true', # レコードインデックス
# メタデータテーブルのコンパクション
'hoodie.metadata.compact.max.delta.commits': '10',
'hoodie.metadata.cleaner.commits.retained': '3',
}
18.3 カラム統計によるデータスキッピング
カラム統計の活用:
クエリ: SELECT * FROM orders WHERE amount > 1000
ファイル1: min_amount=10, max_amount=500 → スキップ ✗
ファイル2: min_amount=100, max_amount=2000 → 読み取り ✓
ファイル3: min_amount=5, max_amount=300 → スキップ ✗
ファイル4: min_amount=800, max_amount=5000 → 読み取り ✓
結果: 4ファイル中2ファイルのみ読み取り → 50% のデータスキッピング
19. マルチモーダルインデックス
19.1 概要
Hudi のマルチモーダルインデックスは、メタデータテーブル内に複数種類のインデックスを格納する統合インデックスフレームワークである。
マルチモーダルインデックスの構成:
+-----------------------------------------------------------+
| Metadata Table (メタデータテーブル) |
| |
| ┌────────────┐ ┌────────────┐ ┌────────────────────┐ |
| │ files │ │ bloom_ │ │ column_stats │ |
| │ partition │ │ filters │ │ │ |
| │ │ │ partition │ │ min/max/null_count │ |
| │ ファイル │ │ │ │ 各カラムの統計情報 │ |
| │ リスティング│ │ ブルーム │ │ │ |
| │ │ │ フィルター │ │ │ |
| └────────────┘ └────────────┘ └────────────────────┘ |
| |
| ┌────────────┐ ┌────────────────────┐ |
| │ record_ │ │ functional_index │ |
| │ index │ │ (Hudi 1.x) │ |
| │ │ │ │ |
| │ レコード │ │ 関数ベース │ |
| │ レベル │ │ インデックス │ |
| │ インデックス│ │ (例: UPPER(name)) │ |
| └────────────┘ └────────────────────┘ |
+-----------------------------------------------------------+
19.2 設定と活用
# マルチモーダルインデックスの包括的設定
multi_modal_index_config = {
# メタデータテーブル
'hoodie.metadata.enable': 'true',
# ブルームフィルターインデックス
'hoodie.metadata.index.bloom.filter.enable': 'true',
'hoodie.metadata.index.bloom.filter.column.list': 'order_id',
# カラム統計インデックス
'hoodie.metadata.index.column.stats.enable': 'true',
'hoodie.metadata.index.column.stats.column.list': 'amount,order_date,region',
# レコードレベルインデックス
'hoodie.metadata.index.record.index.enable': 'true',
}
20. パフォーマンスチューニング
20.1 書き込みパフォーマンス
# 書き込みパフォーマンスの最適化設定
write_perf_config = {
# === 並列度の最適化 ===
# 経験則: executor数 × コア数 × 2-3
'hoodie.upsert.shuffle.parallelism': '400',
'hoodie.insert.shuffle.parallelism': '400',
'hoodie.bulkinsert.shuffle.parallelism': '800',
# === ファイルサイズの最適化 ===
# 大きすぎるとメモリ消費、小さすぎるとファイル数増大
'hoodie.parquet.max.file.size': '134217728', # 128MB(推奨)
'hoodie.parquet.small.file.limit': '104857600', # 100MB
'hoodie.parquet.compression.codec': 'snappy', # snappy or zstd
# === メモリ最適化 ===
'hoodie.memory.merge.fraction': '0.6', # Merge に割り当てるメモリ割合
'hoodie.memory.compaction.fraction': '0.6',
# === 書き込みバッファ ===
'hoodie.write.buffer.limit.bytes': '536870912', # 512MB
# === マーカー最適化 ===
'hoodie.write.markers.type': 'TIMELINE_SERVER_BASED', # マーカータイプ
# === Spark 設定 ===
# spark.executor.memory=8g
# spark.executor.cores=4
# spark.default.parallelism=400
# spark.sql.shuffle.partitions=400
}
20.2 読み取りパフォーマンス
# 読み取りパフォーマンスの最適化設定
read_perf_config = {
# === データスキッピング ===
'hoodie.metadata.enable': 'true',
'hoodie.metadata.index.column.stats.enable': 'true',
'hoodie.enable.data.skipping': 'true',
# === ファイルリスティング最適化 ===
'hoodie.metadata.index.bloom.filter.enable': 'true',
# === Parquet 読み取り最適化 ===
'hoodie.parquet.readoptimized.read.column.names': 'true',
# === 述語プッシュダウン ===
# Spark 設定
# spark.sql.parquet.filterPushdown=true
# spark.sql.parquet.recordLevelFilter.enabled=true
}
20.3 パフォーマンスチェックリスト
パフォーマンスチューニングチェックリスト:
書き込み側:
□ 適切なインデックスタイプを選択しているか
□ 並列度はクラスターリソースに適合しているか
□ ファイルサイズは 128MB 前後か
□ Bucket Index を大規模テーブルで検討したか
□ コンパクションは非同期で実行しているか
□ マーカーは TIMELINE_SERVER_BASED か
読み取り側:
□ メタデータテーブルを有効化しているか
□ カラム統計が有効か
□ クラスタリングでデータを適切にソートしているか
□ パーティションプルーニングが効いているか
□ Read Optimized クエリで十分な場合に使用しているか
インフラ側:
□ Executor メモリは十分か (8GB+)
□ Executor コア数は適切か (4-8)
□ ネットワーク帯域は十分か
□ ストレージ (S3等) の I/O 性能は十分か
21. モニタリング
21.1 メトリクス
Hudi は JMX、Datadog、Prometheus、CloudWatch、Graphite などの複数のメトリクスレポーターをサポートする。
# メトリクス設定
metrics_config = {
# メトリクス有効化
'hoodie.metrics.on': 'true',
# Datadog レポーター
'hoodie.metrics.reporter.type': 'DATADOG',
'hoodie.metrics.datadog.api.site': 'datadoghq.com',
'hoodie.metrics.datadog.api.key': '<DATADOG_API_KEY>',
'hoodie.metrics.datadog.metric.prefix': 'hudi.orders',
'hoodie.metrics.datadog.metric.host': 'hudi-writer-1',
'hoodie.metrics.datadog.report.period.seconds': '60',
# JMX レポーター
# 'hoodie.metrics.reporter.type': 'JMX',
# 'hoodie.metrics.jmx.host': 'localhost',
# 'hoodie.metrics.jmx.port': '9999',
# Prometheus レポーター
# 'hoodie.metrics.reporter.type': 'PROMETHEUS_PUSHGATEWAY',
# 'hoodie.metrics.pushgateway.host': 'prometheus-pushgateway',
# 'hoodie.metrics.pushgateway.port': '9091',
# 'hoodie.metrics.pushgateway.job.name': 'hudi_orders',
# GraphiteReporter
# 'hoodie.metrics.reporter.type': 'GRAPHITE',
# 'hoodie.metrics.graphite.host': 'graphite-host',
# 'hoodie.metrics.graphite.port': '2003',
# 'hoodie.metrics.graphite.metric.prefix': 'hudi',
}
21.2 主要メトリクス
監視すべき主要メトリクス:
書き込みメトリクス:
┌─────────────────────────────────┬────────────────────────┐
│ メトリクス名 │ 説明 │
├─────────────────────────────────┼────────────────────────┤
│ commitTime │ コミット所要時間 │
│ numInserts │ 挿入レコード数 │
│ numUpdates │ 更新レコード数 │
│ numDeletes │ 削除レコード数 │
│ numFilesInserted │ 挿入ファイル数 │
│ numFilesUpdated │ 更新ファイル数 │
│ totalBytesWritten │ 書き込みバイト数 │
│ totalLogFilesSize │ ログファイルサイズ │
└─────────────────────────────────┴────────────────────────┘
コンパクションメトリクス:
┌─────────────────────────────────┬────────────────────────┐
│ compactionTime │ コンパクション所要時間 │
│ numFilesCompacted │ コンパクトファイル数 │
│ totalLogRecords │ マージログレコード数 │
└─────────────────────────────────┴────────────────────────┘
テーブルヘルスメトリクス:
┌─────────────────────────────────┬────────────────────────┐
│ pendingCompactions │ 保留中コンパクション数 │
│ numSmallFiles │ スモールファイル数 │
│ totalPartitions │ パーティション数 │
│ totalFileCount │ 総ファイル数 │
└─────────────────────────────────┴────────────────────────┘
21.3 アラート設定の推奨
推奨アラート:
Critical:
- commitTime > 30分 → 書き込みの遅延
- pendingCompactions > 20 → コンパクションの蓄積
- コミット失敗 → 書き込みエラー
Warning:
- commitTime > 10分 → パフォーマンス低下
- numSmallFiles > 1000 → ファイル数増大
- totalLogFilesSize > 10GB → ログファイルの蓄積
Info:
- 定期的な書き込みサマリー
- テーブルサイズの推移
- パーティション数の推移
22. セキュリティ
22.1 データ暗号化
# ストレージレベルの暗号化(S3 SSE)
spark_config = {
'spark.hadoop.fs.s3a.server-side-encryption-algorithm': 'SSE-KMS',
'spark.hadoop.fs.s3a.server-side-encryption.key': 'arn:aws:kms:us-east-1:123456:key/xxx',
}
# Parquet レベルの暗号化
hudi_config = {
'hoodie.parquet.encryption.footer.key': 'k1',
'hoodie.parquet.encryption.column.keys': 'k2:ssn,credit_card_number',
}
22.2 アクセス制御
アクセス制御のレイヤー:
1. ストレージレベル: IAM ポリシー、S3 バケットポリシー
2. メタストアレベル: Hive Metastore の権限管理
3. クエリエンジンレベル: Trino/Spark の行・列レベルセキュリティ
4. カタログレベル: Unity Catalog, AWS LakeFormation, Ranger
# AWS LakeFormation 統合例
hudi_config = {
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.mode': 'glue', # AWS Glue Catalog
'hoodie.datasource.hive_sync.database': 'production',
'hoodie.datasource.hive_sync.table': 'orders',
}
# Spark で LakeFormation を使用
spark_config = {
'spark.sql.catalog.glue_catalog': 'org.apache.iceberg.spark.SparkCatalog',
'spark.hadoop.fs.s3a.aws.credentials.provider':
'com.amazonaws.auth.DefaultAWSCredentialsProviderChain',
}
22.3 監査ログ
# カスタム監査コールバック
hudi_config = {
'hoodie.write.commit.callback.on': 'true',
'hoodie.write.commit.callback.class':
'org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback',
'hoodie.write.commit.callback.http.url':
'https://audit-service.internal/api/hudi-commits',
'hoodie.write.commit.callback.http.timeout.seconds': '30',
}
23. Iceberg・Delta Lake との比較
23.1 概要比較
Apache Hudi、Apache Iceberg、Delta Lake は、データレイクハウスを実現する3大テーブルフォーマットである。
+---------------------------+-------------------+-------------------+-------------------+
| 項目 | Apache Hudi | Apache Iceberg | Delta Lake |
+---------------------------+-------------------+-------------------+-------------------+
| 開発元 | Uber → ASF | Netflix → ASF | Databricks → LF |
| 初期リリース | 2017 | 2018 | 2019 |
| ライセンス | Apache 2.0 | Apache 2.0 | Apache 2.0 |
| ガバナンス | ASF | ASF | Linux Foundation |
+---------------------------+-------------------+-------------------+-------------------+
| UPSERT | ✓ (ネイティブ) | ✓ (MERGE INTO) | ✓ (MERGE INTO) |
| 増分クエリ | ✓ (ネイティブ) | ✓ (Snapshot Diff) | ✓ (CDF) |
| コンパクション | ✓ (自動/手動) | ✓ (手動) | ✓ (自動/手動) |
| クラスタリング | ✓ | ✓ (Sort Order) | ✓ (Z-Order) |
| スキーマ進化 | ✓ | ✓ | ✓ |
| Time Travel | ✓ | ✓ | ✓ |
| ACID トランザクション | ✓ | ✓ | ✓ |
+---------------------------+-------------------+-------------------+-------------------+
| テーブルタイプ | COW / MOR | なし(統一形式) | なし(ログベース) |
| インデックス | 多様(Bloom等) | 制限的 | 制限的 |
| ストリーミング取り込み | ✓ (ネイティブ) | △ | ✓ (Structured |
| | | | Streaming) |
| CDC サポート | ✓ (ネイティブ) | ✓ (0.14+) | ✓ (CDF) |
+---------------------------+-------------------+-------------------+-------------------+
| Spark | ✓ | ✓ | ✓ |
| Flink | ✓ | ✓ | △ (限定的) |
| Trino | ✓ | ✓ (推奨) | ✓ |
| Hive | ✓ | ✓ | △ |
+---------------------------+-------------------+-------------------+-------------------+
23.2 Hudi の差別化ポイント
Hudi の強み:
1. Upsert のネイティブサポート(最も成熟)
2. 多様なインデックスタイプ(Bloom, Bucket, Record-level等)
3. テーブルサービスの自動化(コンパクション、クリーニング、クラスタリング)
4. ストリーミング取り込みの成熟度(Hudi Streamer)
5. CDC パイプラインのネイティブサポート
6. MOR テーブルによる書き込み最適化
Iceberg の強み:
1. シンプルなアーキテクチャ
2. 広範なエンジンサポート(特に Trino)
3. Hidden Partitioning(パーティション進化)
4. Time Travel の柔軟性
Delta Lake の強み:
1. Databricks との深い統合
2. Spark との親和性
3. Delta Sharing(データ共有プロトコル)
4. Photon エンジンとの最適化
24. ユースケースとベストプラクティス
24.1 主要ユースケース
1. CDC パイプライン(最も一般的なユースケース)
OLTP Database → Debezium → Kafka → Hudi Streamer → Hudi Table (MOR)
|
分析エンジン
(Trino, Spark)
# CDC パイプラインのベストプラクティス
cdc_config = {
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.index.type': 'BUCKET',
'hoodie.bucket.index.num.buckets': '256',
'hoodie.compact.inline': 'false',
'hoodie.compact.schedule.inline': 'true',
'hoodie.compact.inline.max.delta.commits': '5',
'hoodie.clean.automatic': 'true',
'hoodie.cleaner.commits.retained': '24',
}
2. 日次バッチ ETL
# 日次バッチ ETL のベストプラクティス
batch_config = {
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.index.type': 'BLOOM',
'hoodie.clean.automatic': 'true',
'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
'hoodie.cleaner.commits.retained': '7',
}
3. GDPR 対応(データ削除)
# GDPR 削除パイプライン
def gdpr_delete(spark, table_path, user_ids):
"""ユーザーデータの削除"""
delete_df = spark.createDataFrame(
[(uid,) for uid in user_ids],
["user_id"]
)
hudi_options = {
'hoodie.table.name': 'user_data',
'hoodie.datasource.write.recordkey.field': 'user_id',
'hoodie.datasource.write.partitionpath.field': '',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'delete',
}
delete_df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save(table_path)
print(f"Deleted {len(user_ids)} users from {table_path}")
4. イベントログの取り込み
# イベントログ取り込みのベストプラクティス
event_config = {
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.datasource.write.operation': 'insert', # 追記のみ
'hoodie.datasource.write.recordkey.field': 'event_id',
'hoodie.datasource.write.partitionpath.field': 'event_date',
'hoodie.datasource.write.precombine.field': 'event_timestamp',
# 追記のみなのでインデックス不要
'hoodie.index.type': 'SIMPLE',
}
5. データレイクからのマイグレーション
# 既存 Parquet テーブルから Hudi への移行
# Step 1: Bulk Insert で初期ロード
existing_df = spark.read.parquet("s3://old-lake/orders/")
hudi_options = {
'hoodie.table.name': 'orders',
'hoodie.datasource.write.recordkey.field': 'order_id',
'hoodie.datasource.write.partitionpath.field': 'order_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'bulk_insert',
'hoodie.bulkinsert.shuffle.parallelism': '500',
'hoodie.bulkinsert.sort.mode': 'PARTITION_SORT',
}
existing_df.write.format("hudi") \
.options(**hudi_options) \
.mode("overwrite") \
.save("s3://new-lake/hudi/orders")
# Step 2: 以降は増分更新
# hoodie.datasource.write.operation=upsert に変更
24.2 ベストプラクティスまとめ
一般的なベストプラクティス:
テーブル設計:
□ 適切なレコードキーを選択(一意性が保証されるキー)
□ 適切なパーティションキーを選択(カーディナリティが高すぎない)
□ preCombineField にはタイムスタンプを使用
□ COW vs MOR はワークロードに応じて選択
インデックス:
□ 大規模テーブルには Bucket Index を検討
□ グローバルインデックスが必要な場合は Record-level Index
□ メタデータテーブルを有効化
テーブルサービス:
□ コンパクションは非同期実行を推奨
□ クリーニングポリシーを適切に設定
□ クラスタリングでクエリパターンに合わせた最適化
モニタリング:
□ メトリクスを有効化して監視
□ コミット時間のトレンドを追跡
□ 保留コンパクション数のアラート設定
運用:
□ Hudi CLI でテーブルの状態を定期確認
□ セーブポイントを重要な変更前に作成
□ アーカイブ設定でタイムラインの肥大化を防止
25. 最新機能と今後の方向性
25.1 Hudi 1.x の主要変更
Hudi 1.0 は大規模なリアーキテクチャを含むメジャーリリースである。
Hudi 1.x の主な新機能・変更:
1. Non-blocking Concurrency Control (ノンブロッキング同時実行制御)
- MVCC ベースの同時実行制御
- 読み取りが書き込みをブロックしない
2. Functional Index (機能インデックス)
- カスタム関数ベースのインデックス
- 例: UPPER(name), DATE_TRUNC('month', event_date)
3. Record-level Index の安定化
- メタデータテーブル内のレコードインデックス
- グローバルインデックスの高速化
4. Consistent Hashing Bucket Index
- バケット数の動的変更
- テーブル再作成なしでスケーリング
5. Log-based CDC
- 変更データキャプチャの改善
- 効率的な変更ストリーム
6. Table Format 互換性
- Apache XTable (旧 OneTable) との統合
- Iceberg/Delta Lake メタデータの自動生成
7. Secondary Index
- ベースインデックス以外のセカンダリインデックス
- クエリパフォーマンスの向上
8. Partial Updates
- レコードの一部フィールドのみ更新
- ネットワーク/ストレージ効率の向上
25.2 Apache XTable (旧 OneTable) との統合
Apache XTable (OneTable):
Hudi テーブル ─────> XTable ─────> Iceberg メタデータ
└────────> Delta Lake メタデータ
効果: 1つのテーブルを Hudi, Iceberg, Delta Lake の
すべてのフォーマットで読み取り可能
25.3 今後のロードマップ
Hudi の今後の方向性:
短期 (2025-2026):
- Hudi 1.x の安定化
- Functional Index の拡充
- Flink 統合の強化
- クラウドネイティブ最適化
中期 (2026-2027):
- サーバーレスコンパクション
- AI/ML ワークロード統合
- より高度な自動チューニング
- ストリーミングファースト設計の深化
長期ビジョン:
- ユニバーサルデータレイクハウスプラットフォーム
- リアルタイムデータメッシュへの対応
- エッジコンピューティングとの統合
26. まとめ
26.1 Apache Hudi の本質
Apache Hudi は、データレイクにデータベースの能力をもたらすプラットフォームである。単なるテーブルフォーマットを超え、以下の価値を提供する:
- トランザクショナルなデータレイク: ACID トランザクション、Upsert、Delete をデータレイク上で実現
- 増分処理: 変更分のみの効率的な処理で、コストとレイテンシを削減
- ストリーミングとバッチの統合: 同一テーブルに対してストリーミングとバッチの両方のワークロードを処理
- 自動化されたテーブル管理: コンパクション、クリーニング、クラスタリングの自動化
- 柔軟なクエリモデル: Snapshot、Incremental、Read Optimized の3種類のクエリ
26.2 選択指針
Apache Hudi を選ぶべき場合:
✓ CDC パイプラインが主要なユースケース
✓ 高頻度の Upsert が必要
✓ ストリーミング取り込みがメインのワークロード
✓ GDPR 対応でレコード削除が必須
✓ 増分 ETL で処理効率を最大化したい
✓ テーブルサービスの自動化が重要
Apache Hudi 以外を検討すべき場合:
△ 読み取り専用のワークロード(Iceberg が適している可能性)
△ Databricks エコシステムに閉じている(Delta Lake が自然な選択)
△ シンプルなアーキテクチャを重視(Iceberg が適している可能性)
26.3 学習リソース
公式リソース:
- Apache Hudi 公式サイト: https://hudi.apache.org/
- GitHub: https://github.com/apache/hudi
- Hudi RFC: https://github.com/apache/hudi/tree/master/rfc
- Hudi Blog: https://hudi.apache.org/blog
コミュニティ:
- Apache Hudi Slack: https://join.slack.com/t/apache-hudi
- Stack Overflow: apache-hudi タグ
- Hudi Mailing List: dev@hudi.apache.org
本稿は Apache Hudi の包括的な技術ガイドとして、アーキテクチャ、設定、運用のすべての側面を網羅した。データレイクハウスの構築にあたり、本ガイドが実践的な参考となれば幸いである。
最終更新: 2026年4月