Apache Iceberg
Apache Iceberg 包括的技術ガイド
本ドキュメントについて: Apache Iceberg のアーキテクチャ、機能、運用ベストプラクティスを包括的に解説する技術記事です。 データエンジニア、プラットフォームエンジニア、SRE を主な対象読者としています。 最終更新: 2026年4月
目次
- はじめに - テーブルフォーマットとは何か
- 歴史と進化
- アーキテクチャ概要
- テーブル仕様の詳細
- メタデータ構造
- タイムトラベルとスナップショット管理
- スキーマ進化
- パーティション進化
- ACIDトランザクションと同時実行制御
- ファイルフォーマット
- クエリエンジン統合
- カタログ
- メンテナンス操作
- 行レベル更新
- ブランチングとタグ付け
- 設定例
- パフォーマンス最適化
- セキュリティ
- Hudi・Delta Lake との比較
- ベストプラクティス
- 最新機能と今後の方向性
- まとめ
1. はじめに
1.1 テーブルフォーマットとは何か
データレイクにおける「テーブルフォーマット」とは、ファイルシステム上に保存された多数のデータファイルを、リレーショナルデータベースのテーブルのように扱うためのメタデータの仕組みです。
従来のデータレイクでは、Hive テーブルのようにディレクトリベースでデータを管理していました。この方式には以下の課題がありました:
従来の Hive テーブルの課題:
+------------------------------------------+
| 1. ディレクトリリスティングの遅延 |
| - S3 等のオブジェクトストレージでは |
| ディレクトリ一覧の取得が遅い |
| |
| 2. パーティション変更の困難さ |
| - パーティション構造を変えるには |
| 全データの書き換えが必要 |
| |
| 3. スキーマ進化の制限 |
| - カラムの追加・削除が安全にできない |
| |
| 4. ACID トランザクションの欠如 |
| - 同時書き込みでデータ破損のリスク |
| |
| 5. タイムトラベル不可 |
| - 過去の状態に遡ることができない |
+------------------------------------------+
オープンテーブルフォーマット(Open Table Format: OTF)は、これらの課題を解決するために生まれた新しいレイヤーです。現在、主に3つのオープンテーブルフォーマットが存在します:
| フォーマット | 起源 | 初回リリース | ライセンス |
|---|---|---|---|
| Apache Iceberg | Netflix | 2018年 | Apache 2.0 |
| Apache Hudi | Uber | 2017年 | Apache 2.0 |
| Delta Lake | Databricks | 2019年 | Apache 2.0 |
1.2 なぜ Apache Iceberg なのか
Apache Iceberg は、大規模データレイクのテーブルフォーマットとして、以下の特徴により注目を集めています:
エンジン非依存 (Engine-Agnostic)
- 特定のクエリエンジンに依存しない設計
- Spark, Trino, Flink, Hive, Dremio, Athena, BigQuery 等で利用可能
真のスキーマ進化
- カラムの追加・削除・リネーム・並び替えをデータ書き換えなしで実現
- カラムID ベースの追跡により、リネーム後も正しくデータを読める
隠蔽パーティショニング (Hidden Partitioning)
- ユーザーがクエリ時にパーティション構造を意識する必要がない
- パーティション進化によりデータ書き換えなしで構造変更が可能
ACID トランザクション
- スナップショットベースの楽観的同時実行制御
- 読み取り一貫性の保証
タイムトラベル
- 過去の任意のスナップショットにクエリ可能
- 監査やデバッグに有用
Apache Iceberg の位置づけ:
+-------------------+ +-------------------+ +-------------------+
| Apache Spark | | Trino | | Apache Flink |
+-------------------+ +-------------------+ +-------------------+
| | |
+-------------------------+-------------------------+
|
+---------------------+
| Apache Iceberg | <-- テーブルフォーマット層
| (Table Format) |
+---------------------+
|
+-------------------------+-------------------------+
| | |
+-------------------+ +-------------------+ +-------------------+
| Parquet | | ORC | | Avro |
+-------------------+ +-------------------+ +-------------------+
| | |
+-------------------------+-------------------------+
|
+-------------------------+-------------------------+
| | |
+-------------------+ +-------------------+ +-------------------+
| S3 | | HDFS | | GCS |
+-------------------+ +-------------------+ +-------------------+
2. 歴史と進化
2.1 Netflix での誕生
Apache Iceberg は、Netflix のデータエンジニアリングチームで Ryan Blue 氏を中心に開発されました。Netflix は当時、ペタバイト規模のデータレイクを運用しており、Hive テーブルフォーマットの限界に直面していました。
Netflix データレイクの課題 (2017年頃):
問題1: S3 リスティングの遅延
- 数千パーティションのテーブルでクエリ計画に数分
- S3 の eventual consistency による不整合
問題2: パーティション管理の破綻
- event_date=2017-01-01/event_hour=00/ 形式の硬直的な構造
- 細かすぎるパーティションで small files 問題
問題3: unsafe な操作
- ALTER TABLE でスキーマ変更時にデータ破損
- OVERWRITE 中の読み取りで不完全なデータを返す
2.2 Apache Software Foundation への寄贈
タイムライン:
2017年 Netflix 社内で開発開始
|
2018年 Apache Incubator プロジェクトとして提案
11月 Apache Incubator に正式受理
|
2020年 Apache Iceberg 0.9.0 リリース
05月 Apache トップレベルプロジェクト (TLP) に昇格
|
2021年 Iceberg 0.12.x: Row-level deletes サポート
|
2022年 Iceberg 0.14.x: REST Catalog の導入
Q3 Iceberg 1.0.0 リリース (テーブル仕様 v2 安定化)
|
2023年 Iceberg 1.3.x: Views サポート
Iceberg 1.4.x: パフォーマンス改善
|
2024年 Iceberg 1.5.x / 1.6.x: Variant 型等の新機能
Polaris Catalog のオープンソース化
|
2025年 Iceberg 1.7.x / 1.8.x: さらなる最適化
Table Format v3 の検討
|
2026年 エコシステムの成熟
クラウドネイティブ統合の深化
2.3 コミュニティとエコシステムの成長
Apache Iceberg は、以下の主要企業がコントリビュータとして参加するオープンソースプロジェクトに成長しました:
- Netflix: 創始者、継続的なコントリビューション
- Apple: 大規模な Iceberg テーブルの運用、REST Catalog の推進
- AWS: Athena, EMR, Glue での Iceberg サポート
- Google: BigQuery との統合
- Snowflake: Iceberg テーブルのネイティブサポート
- Databricks: Unity Catalog での Iceberg 互換性 (UniForm)
- Cloudera: CDP での Iceberg サポート
- Dremio: Nessie カタログと Iceberg の深い統合
- Tabular: Ryan Blue 氏が設立した Iceberg 専門企業 (2024年に Databricks が買収)
3. アーキテクチャ概要
3.1 全体構造
Apache Iceberg のアーキテクチャは、大きく3つのレイヤーで構成されます:
┌─────────────────────────────────────────────────────────────────┐
│ Iceberg カタログ │
│ (Hive Metastore / REST Catalog / AWS Glue / Nessie / etc.) │
│ │
│ テーブル名 → 現在のメタデータファイルのパスを管理 │
└────────────────────────────────┬─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ メタデータレイヤー │
│ │
│ ┌──────────────┐ │
│ │ Metadata File │ ← テーブルスキーマ、パーティション仕様、 │
│ │ (JSON) │ スナップショット一覧、プロパティ │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │Manifest List │ ← スナップショットごとのマニフェスト一覧 │
│ │ (Avro) │ 各マニフェストのパーティション範囲サマリ │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │Manifest File │ ← データファイルのリスト │
│ │ (Avro) │ 各ファイルのカラム統計情報 │
│ └──────────────┘ │
│ │
└────────────────────────────────┬─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ データレイヤー │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Parquet │ │ Parquet │ │ ORC │ │ Avro │ │
│ │ File 1 │ │ File 2 │ │ File 3 │ │ File 4 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ ストレージ: S3, HDFS, GCS, Azure Blob Storage, MinIO │
└─────────────────────────────────────────────────────────────────┘
3.2 カタログ (Catalog)
カタログは Iceberg テーブルの「電話帳」です。テーブル名から現在のメタデータファイルの場所を解決する役割を持ちます。
カタログの役割:
クライアント: "db.orders テーブルを読みたい"
│
▼
┌──────────────────┐
│ カタログ │
│ │
│ db.orders → │
│ s3://bucket/ │──→ メタデータファイルへ
│ warehouse/db/ │
│ orders/metadata/ │
│ v42.metadata.json│
└──────────────────┘
カタログは原子的なメタデータポインタの更新を保証する必要があります。これにより、テーブルの状態が一貫性を保ちます。
3.3 メタデータファイル (Metadata File)
メタデータファイル(JSON形式)は、テーブルの完全な定義を含みます:
{
"format-version": 2,
"table-uuid": "bf289591-dcc0-4234-ad4f-5c3abb2f4848",
"location": "s3://my-bucket/warehouse/db/orders",
"last-sequence-number": 42,
"last-updated-ms": 1713456789000,
"last-column-id": 8,
"current-schema-id": 1,
"schemas": [
{
"schema-id": 0,
"type": "struct",
"fields": [
{"id": 1, "name": "order_id", "required": true, "type": "long"},
{"id": 2, "name": "customer_id", "required": true, "type": "long"},
{"id": 3, "name": "order_date", "required": true, "type": "date"},
{"id": 4, "name": "amount", "required": false, "type": "decimal(10,2)"}
]
},
{
"schema-id": 1,
"type": "struct",
"fields": [
{"id": 1, "name": "order_id", "required": true, "type": "long"},
{"id": 2, "name": "customer_id", "required": true, "type": "long"},
{"id": 3, "name": "order_date", "required": true, "type": "date"},
{"id": 4, "name": "amount", "required": false, "type": "decimal(10,2)"},
{"id": 5, "name": "status", "required": false, "type": "string"},
{"id": 6, "name": "region", "required": false, "type": "string"}
]
}
],
"current-snapshot-id": 1001,
"snapshots": [
{
"snapshot-id": 1000,
"timestamp-ms": 1713400000000,
"manifest-list": "s3://my-bucket/.../snap-1000-manifest-list.avro"
},
{
"snapshot-id": 1001,
"parent-snapshot-id": 1000,
"timestamp-ms": 1713456789000,
"manifest-list": "s3://my-bucket/.../snap-1001-manifest-list.avro",
"summary": {
"operation": "append",
"added-data-files": "5",
"added-records": "125000",
"total-records": "1250000",
"total-data-files": "50"
}
}
],
"default-spec-id": 1,
"partition-specs": [
{
"spec-id": 0,
"fields": [
{"source-id": 3, "field-id": 1000, "name": "order_date", "transform": "identity"}
]
},
{
"spec-id": 1,
"fields": [
{"source-id": 3, "field-id": 1001, "name": "order_month", "transform": "month"}
]
}
],
"default-sort-order-id": 0,
"sort-orders": [
{
"order-id": 0,
"fields": [
{"source-id": 3, "direction": "asc", "null-order": "nulls-first", "transform": "identity"}
]
}
],
"properties": {
"write.format.default": "parquet",
"write.parquet.compression-codec": "zstd"
}
}
3.4 マニフェストリスト (Manifest List)
マニフェストリストは Avro 形式のファイルで、スナップショットに含まれる全マニフェストファイルへの参照を保持します:
マニフェストリスト (snap-1001-manifest-list.avro):
┌─────────────────────────────────────────────────────────────────┐
│ Manifest Path │ Content │ Partitions Summary │
├─────────────────────────────┼─────────┼─────────────────────────┤
│ s3://.../manifest-a.avro │ DATA │ order_month: [2024-01, │
│ │ │ 2024-03] │
├─────────────────────────────┼─────────┼─────────────────────────┤
│ s3://.../manifest-b.avro │ DATA │ order_month: [2024-04, │
│ │ │ 2024-06] │
├─────────────────────────────┼─────────┼─────────────────────────┤
│ s3://.../manifest-c.avro │ DELETE │ order_month: [2024-01, │
│ │ │ 2024-02] │
└─────────────────────────────┴─────────┴─────────────────────────┘
パーティションサマリの重要性: クエリが order_month = '2024-05' の条件を持つ場合、マニフェストリストレベルで manifest-a と manifest-c をスキップできます。これにより、不要なマニフェストファイルの読み込みを回避できます。
3.5 マニフェストファイル (Manifest File)
マニフェストファイルは Avro 形式で、個々のデータファイルのメタデータを保持します:
マニフェストファイル (manifest-b.avro):
┌────────────────────────────────────────────────────────────────────┐
│ File Path │ Format │ Records │ Size │ Column Stats │
├────────────────────┼─────────┼─────────┼────────┼──────────────────┤
│ s3://.../data- │ PARQUET │ 25,000 │ 12 MB │ order_id: │
│ 001.parquet │ │ │ │ min=1, max=25000│
│ │ │ │ │ amount: │
│ │ │ │ │ min=1.0, │
│ │ │ │ │ max=9999.99 │
│ │ │ │ │ null_count=5 │
├────────────────────┼─────────┼─────────┼────────┼──────────────────┤
│ s3://.../data- │ PARQUET │ 25,000 │ 11 MB │ order_id: │
│ 002.parquet │ │ │ │ min=25001, │
│ │ │ │ │ max=50000 │
│ │ │ │ │ amount: │
│ │ │ │ │ min=0.5, │
│ │ │ │ │ max=8500.00 │
│ │ │ │ │ null_count=2 │
├────────────────────┼─────────┼─────────┼────────┼──────────────────┤
│ s3://.../data- │ PARQUET │ 25,000 │ 13 MB │ order_id: │
│ 003.parquet │ │ │ │ min=50001, │
│ │ │ │ │ max=75000 │
│ │ │ │ │ amount: │
│ │ │ │ │ min=2.0, │
│ │ │ │ │ max=15000.00 │
│ │ │ │ │ null_count=0 │
└────────────────────┴─────────┴─────────┴────────┴──────────────────┘
3.6 データファイル (Data Files)
データファイルは実際のレコードを格納するファイルです。Iceberg は以下のフォーマットをサポートします:
- Apache Parquet: 最も一般的。列指向フォーマットで分析ワークロードに最適
- Apache ORC: Hive エコシステムで広く使用される列指向フォーマット
- Apache Avro: 行指向フォーマット。ストリーミングワークロードに適する
データファイルの構造例 (Parquet):
data-001.parquet
┌─────────────────────────────┐
│ Row Group 1 │
│ ┌───────┬───────┬────────┐│
│ │order_ │custom │amount ││
│ │id │er_id │ ││
│ ├───────┼───────┼────────┤│
│ │ 1 │ 101 │ 29.99 ││
│ │ 2 │ 102 │ 149.50 ││
│ │ ... │ ... │ ... ││
│ └───────┴───────┴────────┘│
│ │
│ Row Group 2 │
│ ┌───────┬───────┬────────┐│
│ │order_ │custom │amount ││
│ │id │er_id │ ││
│ ├───────┼───────┼────────┤│
│ │ 5001 │ 501 │ 75.00 ││
│ │ 5002 │ 502 │ 200.00 ││
│ │ ... │ ... │ ... ││
│ └───────┴───────┴────────┘│
│ │
│ Footer (schema, stats) │
└─────────────────────────────┘
4. テーブル仕様の詳細
4.1 テーブル仕様バージョン
Iceberg には2つのテーブル仕様バージョンがあります:
| 仕様 | バージョン | 主な特徴 |
|---|---|---|
| v1 | Format Version 1 | 基本機能。append-only の操作。Position/Equality delete なし |
| v2 | Format Version 2 | Row-level delete サポート。Sequence number 導入。推奨 |
-- テーブル作成時にフォーマットバージョンを指定
CREATE TABLE catalog.db.orders (
order_id BIGINT,
customer_id BIGINT,
order_date DATE,
amount DECIMAL(10,2),
status STRING
)
USING iceberg
TBLPROPERTIES (
'format-version' = '2'
);
4.2 スキーマ (Schema)
Iceberg のスキーマは、各カラムに一意の ID を割り当てます。これにより、カラム名の変更後もデータファイルとの対応関係が維持されます。
スキーマとカラム ID:
Schema ID: 0 Schema ID: 1 (進化後)
┌────┬─────────────┬──────────┐ ┌────┬─────────────┬──────────┐
│ ID │ Name │ Type │ │ ID │ Name │ Type │
├────┼─────────────┼──────────┤ ├────┼─────────────┼──────────┤
│ 1 │ order_id │ long │ │ 1 │ order_id │ long │
│ 2 │ customer_id │ long │ │ 2 │ customer_id │ long │
│ 3 │ order_date │ date │ │ 3 │ order_date │ date │
│ 4 │ amount │ decimal │ │ 4 │ total_amount│ decimal │ ← リネーム
│ │ │ │ │ 5 │ status │ string │ ← 追加
│ │ │ │ │ 6 │ region │ string │ ← 追加
└────┴─────────────┴──────────┘ └────┴─────────────┴──────────┘
カラム ID=4 は "amount" → "total_amount" にリネームされたが、
データファイル内の ID=4 のカラムは引き続き正しく読める
サポートされるデータ型:
| カテゴリ | 型 |
|---|---|
| プリミティブ | boolean, int, long, float, double, decimal(p,s), date, time, timestamp, timestamptz, string, uuid, fixed(n), binary |
| 複合型 | struct, list, map |
4.3 パーティション仕様 (Partition Spec)
Iceberg のパーティショニングは、ソースカラムに変換関数(Transform)を適用して定義します:
パーティション Transform 一覧:
┌──────────────┬───────────────────────────────────────────────┐
│ Transform │ 説明 │
├──────────────┼───────────────────────────────────────────────┤
│ identity │ 値をそのまま使用 │
│ bucket[N] │ ハッシュ値を N 個のバケットに分割 │
│ truncate[W] │ 値を幅 W で切り詰め │
│ year │ タイムスタンプから年を抽出 │
│ month │ タイムスタンプから年月を抽出 │
│ day │ タイムスタンプから年月日を抽出 │
│ hour │ タイムスタンプから年月日時を抽出 │
│ void │ パーティションなし (常に null) │
└──────────────┴───────────────────────────────────────────────┘
-- パーティション定義の例
CREATE TABLE catalog.db.events (
event_id BIGINT,
event_ts TIMESTAMP,
user_id BIGINT,
event_type STRING,
payload STRING
)
USING iceberg
PARTITIONED BY (
days(event_ts), -- 日単位パーティション
bucket(16, user_id) -- user_id を16バケットに分割
);
4.4 隠蔽パーティショニング (Hidden Partitioning)
Iceberg の最も重要な特徴の一つが隠蔽パーティショニングです。ユーザーはクエリ時にパーティション構造を意識する必要がありません:
Hive テーブル (従来):
-- ユーザーがパーティションカラムを明示的に指定する必要がある
SELECT * FROM events
WHERE event_date = '2024-05-15'; -- パーティションカラムを知っている必要がある
-- 以下のクエリはフルスキャンになってしまう!
SELECT * FROM events
WHERE event_ts = '2024-05-15 10:30:00'; -- event_ts と event_date の関係を知らない
Iceberg テーブル:
-- パーティション構造を意識せずにクエリできる
SELECT * FROM events
WHERE event_ts > '2024-05-15 00:00:00'
AND event_ts < '2024-05-16 00:00:00';
-- Iceberg が自動的に days(event_ts) パーティションを使ってプルーニング!
-- ユーザーはパーティションの存在すら知らなくてよい
隠蔽パーティショニングの仕組み:
クエリ条件: event_ts BETWEEN '2024-05-15' AND '2024-05-16'
│
▼
┌──────────────────────────────────────┐
│ Iceberg プランニング │
│ │
│ パーティション仕様: │
│ days(event_ts) │
│ │
│ 条件を変換: │
│ days(event_ts) IN [2024-05-15, │
│ 2024-05-16] │
│ │
│ → 該当パーティションのファイルのみ │
│ スキャン対象とする │
└──────────────────────────────────────┘
4.5 ソートオーダー (Sort Orders)
Iceberg テーブルにはデフォルトのソートオーダーを定義できます。書き込みエンジンはこのオーダーに従ってデータをソートします:
-- ソートオーダーの定義
ALTER TABLE catalog.db.events
WRITE ORDERED BY (event_ts ASC NULLS FIRST, user_id ASC);
-- ソートオーダー付きテーブル作成
CREATE TABLE catalog.db.events (
event_id BIGINT,
event_ts TIMESTAMP,
user_id BIGINT,
event_type STRING
)
USING iceberg
PARTITIONED BY (days(event_ts))
TBLPROPERTIES (
'write.distribution-mode' = 'hash',
'write.sort-order' = 'event_ts ASC NULLS FIRST, user_id ASC'
);
ソートの効果:
ソートなし: ソートあり:
┌────────────────────────┐ ┌────────────────────────┐
│ File 1 │ │ File 1 │
│ event_ts: [scattered] │ │ event_ts: [10:00-10:30]│
│ user_id: [scattered] │ │ user_id: [1-100] │
│ min/max が広い範囲 │ │ min/max が狭い範囲 │
│ → プルーニング効果低い │ │ → プルーニング効果高い │
└────────────────────────┘ └────────────────────────┘
5. メタデータ構造
5.1 スナップショットの全体像
Iceberg のメタデータは、不変(イミュータブル)なファイルのツリー構造で管理されます。各操作は新しいスナップショットを生成し、以前のスナップショットは変更されません。
スナップショットの時系列:
Snapshot 1000 Snapshot 1001 Snapshot 1002
(Initial Load) (Append) (Delete + Append)
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Manifest │ │ Manifest │ │ Manifest │
│ List A │ │ List B │ │ List C │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┬──────────┐ ┌──────────┬──────────┬──────────┐
│Manifest 1│ │Manifest 1│Manifest 2│ │Manifest 1│Manifest 2│Manifest 3│
│(既存) │ │(既存) │(新規) │ │(既存) │(新規) │(削除) │
└────┬─────┘ └────┬─────┴────┬─────┘ └────┬─────┴────┬─────┴────┬─────┘
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
┌─────────┐ ┌─────────┐┌─────────┐ ┌─────────┐┌─────────┐┌─────────┐
│Data │ │Data ││Data │ │Data ││Data ││Delete │
│Files │ │Files ││Files │ │Files ││Files ││Files │
│A1,A2,A3 │ │A1,A2,A3 ││B1,B2 │ │A1,A2,A3 ││C1,C2 ││D1 │
└─────────┘ └─────────┘└─────────┘ └─────────┘└─────────┘└─────────┘
重要: Manifest 1 は全スナップショットで再利用される (不変なので)
5.2 メタデータファイルの詳細構造
メタデータファイル詳細 (v42.metadata.json):
┌──────────────────────────────────────────────────────┐
│ Table Metadata │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ format-version: 2 │ │
│ │ table-uuid: "bf289591-..." │ │
│ │ location: "s3://bucket/warehouse/db/orders" │ │
│ │ last-sequence-number: 42 │ │
│ │ last-updated-ms: 1713456789000 │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Schemas (list) │ │
│ │ - Schema 0: {order_id, customer_id, ...} │ │
│ │ - Schema 1: {order_id, ..., status, region} │ │
│ │ current-schema-id: 1 │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Partition Specs (list) │ │
│ │ - Spec 0: [identity(order_date)] │ │
│ │ - Spec 1: [month(order_date)] │ │
│ │ default-spec-id: 1 │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Sort Orders (list) │ │
│ │ - Order 0: [order_date ASC] │ │
│ │ default-sort-order-id: 0 │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Snapshots (list) │ │
│ │ - Snap 1000: manifest-list → snap-1000.avro │ │
│ │ - Snap 1001: manifest-list → snap-1001.avro │ │
│ │ current-snapshot-id: 1001 │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Snapshot Log (変更履歴) │ │
│ │ - {ts: ..., snapshot-id: 1000} │ │
│ │ - {ts: ..., snapshot-id: 1001} │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Refs (ブランチ / タグ) │ │
│ │ - "main": {snapshot-id: 1001, type: branch} │ │
│ │ - "audit-2024": {snapshot-id: 1000, type: tag}│ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Properties (テーブルプロパティ) │ │
│ │ - write.format.default: parquet │ │
│ │ - write.parquet.compression-codec: zstd │ │
│ │ - commit.retry.num-retries: 4 │ │
│ └────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────┘
5.3 クエリ計画の流れ
Iceberg がクエリを計画する際のメタデータの読み取り順序を理解することは重要です:
クエリ: SELECT * FROM orders WHERE order_date = '2024-05-15' AND amount > 100
Step 1: カタログ
┌──────────────┐
│ catalog.db. │ → s3://.../v42.metadata.json
│ orders │
└──────┬───────┘
│
Step 2: メタデータファイル読み取り
┌──────┴───────────────────────────────────────┐
│ v42.metadata.json │
│ current-snapshot-id: 1001 │
│ → snap-1001-manifest-list.avro │
└──────┬───────────────────────────────────────┘
│
Step 3: マニフェストリスト読み取り + パーティションプルーニング
┌──────┴───────────────────────────────────────┐
│ snap-1001-manifest-list.avro │
│ │
│ Manifest A: order_month [2024-01, 2024-03] │
│ → order_date='2024-05-15' は範囲外 │
│ → スキップ! │
│ │
│ Manifest B: order_month [2024-04, 2024-06] │
│ → order_date='2024-05-15' は範囲内! │
│ → 読み取り対象 │
│ │
│ Manifest C: order_month [2024-07, 2024-09] │
│ → order_date='2024-05-15' は範囲外 │
│ → スキップ! │
└──────┬───────────────────────────────────────┘
│
Step 4: マニフェストファイル読み取り + ファイルプルーニング
┌──────┴───────────────────────────────────────┐
│ manifest-b.avro │
│ │
│ data-010.parquet: │
│ order_date: [2024-04-01, 2024-04-30] │
│ → 日付が範囲外 → スキップ! │
│ │
│ data-011.parquet: │
│ order_date: [2024-05-01, 2024-05-31] │
│ amount: min=10.00, max=5000.00 │
│ → 日付が範囲内、amount > 100 も可能 │
│ → 読み取り対象! │
│ │
│ data-012.parquet: │
│ order_date: [2024-05-01, 2024-05-31] │
│ amount: min=0.50, max=50.00 │
│ → amount max=50 < 100 │
│ → スキップ! (min/max フィルタリング) │
└──────┬───────────────────────────────────────┘
│
Step 5: データファイル読み取り
┌──────┴───────────────────────────────────────┐
│ data-011.parquet のみを読み取り │
│ (Parquet 内部の Row Group 統計でさらに │
│ プルーニング可能) │
└──────────────────────────────────────────────┘
6. タイムトラベルとスナップショット管理
6.1 タイムトラベルクエリ
Iceberg はスナップショットを保持しているため、過去の任意の時点のデータにアクセスできます:
-- スナップショット ID を指定してクエリ
SELECT * FROM catalog.db.orders VERSION AS OF 1000;
-- タイムスタンプを指定してクエリ
SELECT * FROM catalog.db.orders TIMESTAMP AS OF '2024-05-15 10:00:00';
-- Spark DataFrame API
val df = spark.read
.option("snapshot-id", 1000L)
.format("iceberg")
.load("catalog.db.orders")
val df2 = spark.read
.option("as-of-timestamp", "1713200000000")
.format("iceberg")
.load("catalog.db.orders")
6.2 スナップショット間の差分
-- 2つのスナップショット間の変更を確認 (Incremental Read)
-- Spark SQL
SELECT * FROM catalog.db.orders.changes
WHERE _change_type IN ('insert', 'delete', 'update_before', 'update_after');
-- Spark DataFrame API
val changes = spark.read
.format("iceberg")
.option("start-snapshot-id", 1000L)
.option("end-snapshot-id", 1001L)
.load("catalog.db.orders")
6.3 スナップショットの一覧確認
-- スナップショット一覧
SELECT * FROM catalog.db.orders.snapshots;
-- 結果:
-- +-------------+-------------------+-----------+----------+---------+
-- | committed_at| snapshot_id | parent_id | operation| summary |
-- +-------------+-------------------+-----------+----------+---------+
-- | 2024-05-01 | 1000 | null | append | {...} |
-- | 2024-05-15 | 1001 | 1000 | append | {...} |
-- | 2024-05-20 | 1002 | 1001 | overwrite| {...} |
-- +-------------+-------------------+-----------+----------+---------+
-- メタデータテーブル: 履歴
SELECT * FROM catalog.db.orders.history;
-- メタデータテーブル: マニフェスト
SELECT * FROM catalog.db.orders.manifests;
-- メタデータテーブル: データファイル
SELECT * FROM catalog.db.orders.files;
-- メタデータテーブル: パーティション
SELECT * FROM catalog.db.orders.partitions;
6.4 ロールバック
-- 特定のスナップショットにロールバック
CALL catalog.system.rollback_to_snapshot('db.orders', 1000);
-- 特定のタイムスタンプにロールバック
CALL catalog.system.rollback_to_timestamp('db.orders', TIMESTAMP '2024-05-01 00:00:00');
ロールバックの仕組み:
Before rollback:
current-snapshot-id: 1002
Snap 1000 → Snap 1001 → Snap 1002 (current)
After ROLLBACK TO SNAPSHOT 1000:
current-snapshot-id: 1000
Snap 1000 (current) → Snap 1001 → Snap 1002
※ スナップショット 1001, 1002 は削除されない
※ メタデータファイルが新しく作成され、
current-snapshot-id が 1000 に変更される
※ expire_snapshots で後からクリーンアップ可能
7. スキーマ進化
7.1 スキーマ進化の基本原則
Iceberg のスキーマ進化は以下の原則に基づいています:
- カラムID ベース: 各カラムは一意の整数 ID を持つ
- メタデータのみの変更: データファイルの書き換えは不要
- 後方互換性: 古いデータファイルは新しいスキーマで正しく読める
- 完全な進化: 追加、削除、リネーム、並び替え、型昇格をサポート
7.2 カラムの追加
-- カラムの追加
ALTER TABLE catalog.db.orders ADD COLUMNS (
status STRING COMMENT 'Order status',
region STRING COMMENT 'Geographic region'
);
-- ネストされた構造体へのカラム追加
ALTER TABLE catalog.db.orders ADD COLUMNS (
shipping STRUCT<
address: STRING,
city: STRING,
zip_code: STRING
>
);
-- 既存の構造体にフィールド追加
ALTER TABLE catalog.db.orders ADD COLUMNS (
shipping.country STRING AFTER shipping.zip_code
);
カラム追加の内部動作:
古いデータファイル (Schema 0):
┌──────────┬─────────────┬──────────┐
│ order_id │ customer_id │ amount │
│ (id=1) │ (id=2) │ (id=4) │
├──────────┼─────────────┼──────────┤
│ 1 │ 101 │ 29.99 │
│ 2 │ 102 │ 149.50 │
└──────────┴─────────────┴──────────┘
新しいスキーマ (Schema 1) で読み取り:
┌──────────┬─────────────┬──────────┬────────┬────────┐
│ order_id │ customer_id │ amount │ status │ region │
│ (id=1) │ (id=2) │ (id=4) │ (id=5) │ (id=6) │
├──────────┼─────────────┼──────────┼────────┼────────┤
│ 1 │ 101 │ 29.99 │ null │ null │
│ 2 │ 102 │ 149.50 │ null │ null │
└──────────┴─────────────┴──────────┴────────┴────────┘
※ 古いデータファイルでは status, region は null として返される
7.3 カラムの削除
-- カラムの削除
ALTER TABLE catalog.db.orders DROP COLUMN region;
-- ネストされたフィールドの削除
ALTER TABLE catalog.db.orders DROP COLUMN shipping.zip_code;
重要: カラムを削除しても、既存のデータファイルは変更されません。削除されたカラムのデータはファイルに残りますが、読み取り時に無視されます。
7.4 カラムのリネーム
-- カラムのリネーム
ALTER TABLE catalog.db.orders RENAME COLUMN amount TO total_amount;
-- ネストされたフィールドのリネーム
ALTER TABLE catalog.db.orders RENAME COLUMN shipping.address TO shipping.street_address;
リネームの内部動作:
データファイル内: スキーマ定義:
カラム ID=4 のデータ ←→ ID=4, name="total_amount"
(ファイル内の名前は (メタデータ上の名前が変更)
変更不要)
7.5 カラムの並び替え
-- カラムの並び替え
ALTER TABLE catalog.db.orders ALTER COLUMN status FIRST;
ALTER TABLE catalog.db.orders ALTER COLUMN region AFTER customer_id;
7.6 型昇格 (Type Promotion)
Iceberg は安全な型昇格をサポートします:
サポートされる型昇格:
int → long
float → double
decimal(P, S) → decimal(P', S) (P' > P, 精度の拡大)
-- 型昇格
ALTER TABLE catalog.db.orders ALTER COLUMN order_id TYPE bigint;
-- decimal の精度拡大
ALTER TABLE catalog.db.orders ALTER COLUMN amount TYPE decimal(12,2);
7.7 Java / Python API によるスキーマ進化
// Java API
Table table = catalog.loadTable(TableIdentifier.of("db", "orders"));
table.updateSchema()
.addColumn("status", Types.StringType.get(), "Order status")
.addColumn("region", Types.StringType.get(), "Geographic region")
.renameColumn("amount", "total_amount")
.moveAfter("region", "customer_id")
.commit();
# Python (PyIceberg) API
from pyiceberg.catalog import load_catalog
catalog = load_catalog("my_catalog")
table = catalog.load_table("db.orders")
with table.update_schema() as update:
update.add_column("status", StringType(), doc="Order status")
update.add_column("region", StringType(), doc="Geographic region")
update.rename_column("amount", "total_amount")
8. パーティション進化
8.1 パーティション進化とは
Iceberg の最も革新的な機能の一つが、データの書き換えなしにパーティション構造を変更できる「パーティション進化」です。
従来の Hive テーブル:
パーティション変更 = 全データの書き換え (非常にコストが高い)
例: daily → monthly に変更
BEFORE:
/events/date=2024-01-01/
/events/date=2024-01-02/
...
/events/date=2024-12-31/ (365個のパーティション)
→ 全365パーティションのデータを読み取り、
12個の月次パーティションに書き直す必要がある
Iceberg テーブル:
パーティション変更 = メタデータの更新のみ (即座に完了)
8.2 パーティション進化の実行
-- 初期パーティション: 日単位
CREATE TABLE catalog.db.events (
event_id BIGINT,
event_ts TIMESTAMP,
user_id BIGINT,
event_type STRING
)
USING iceberg
PARTITIONED BY (days(event_ts));
-- データ量が増えたので月単位に変更
ALTER TABLE catalog.db.events REPLACE PARTITION FIELD days(event_ts) WITH months(event_ts);
-- または、パーティションフィールドの追加
ALTER TABLE catalog.db.events ADD PARTITION FIELD bucket(16, user_id);
-- パーティションフィールドの削除
ALTER TABLE catalog.db.events DROP PARTITION FIELD bucket(16, user_id);
8.3 パーティション進化の内部動作
パーティション進化の内部動作:
Time ──────────────────────────────────────────────────→
Spec 0: days(event_ts) Spec 1: months(event_ts)
[データファイル群 A] [データファイル群 B]
┌─────────────────────┐ ┌─────────────────────┐
│ Partition: │ │ Partition: │
│ event_ts_day=2024- │ │ event_ts_month= │
│ 05-01 │ │ 2024-07 │
│ ┌────────┐ │ │ ┌────────┐ │
│ │file1.pq│ │ │ │file5.pq│ │
│ └────────┘ │ │ └────────┘ │
│ ┌────────┐ │ │ ┌────────┐ │
│ │file2.pq│ │ │ │file6.pq│ │
│ └────────┘ │ │ └────────┘ │
└─────────────────────┘ └─────────────────────┘
※ 古いデータファイル (Spec 0) はそのまま残る
※ 新しいデータ (Spec 1) は新しいパーティション仕様で書き込まれる
※ クエリ時は両方のパーティション仕様を考慮して
プルーニングが行われる
8.4 Void Transform による非パーティション化
-- パーティションを完全に無効化
ALTER TABLE catalog.db.events REPLACE PARTITION FIELD months(event_ts) WITH void(event_ts);
-- これにより、新しいデータはパーティションなしで書き込まれる
-- 古いデータは以前のパーティション構造のまま残る
9. ACID トランザクションと同時実行制御
9.1 楽観的同時実行制御 (Optimistic Concurrency Control)
Iceberg は楽観的同時実行制御を採用しています。複数のライターが同時にテーブルに書き込む場合、コミット時に競合チェックが行われます。
楽観的同時実行制御の流れ:
Writer A Writer B
│ │
│ Read metadata v5 │ Read metadata v5
│ (base snapshot: 100) │ (base snapshot: 100)
│ │
│ Write data files │ Write data files
│ Create new manifest │ Create new manifest
│ │
│ Commit: v5 → v6 │
│ (snapshot 100 → 101) │
│ ✓ Success │
│ │
│ │ Commit: v5 → v6
│ │ ✗ Conflict! (v5 は既に v6)
│ │
│ │ Retry:
│ │ Read metadata v6
│ │ Validate no conflict
│ │ Commit: v6 → v7
│ │ (snapshot 101 → 102)
│ │ ✓ Success
9.2 コミットの原子性
カタログの実装によって原子的なコミットの方法が異なります:
カタログ別のコミット方式:
┌──────────────────┬────────────────────────────────────┐
│ カタログ │ 原子性の保証方法 │
├──────────────────┼────────────────────────────────────┤
│ Hive Metastore │ メタストアのロック機構を使用 │
│ │ (LOCK TABLE) │
├──────────────────┼────────────────────────────────────┤
│ Hadoop Catalog │ ファイルシステムの rename 操作 │
│ │ (atomic rename) │
├──────────────────┼────────────────────────────────────┤
│ REST Catalog │ サーバーサイドの CAS (Compare-And- │
│ │ Swap) 操作 │
├──────────────────┼────────────────────────────────────┤
│ AWS Glue │ Glue API の条件付き更新 │
│ │ (updateTable with VersionId) │
├──────────────────┼────────────────────────────────────┤
│ Nessie │ Git-like な CAS 操作 │
│ │ (ブランチの参照更新) │
└──────────────────┴────────────────────────────────────┘
9.3 競合解決 (Conflict Resolution)
Iceberg は操作の種類に応じて自動的な競合解決を試みます:
競合解決マトリクス:
後のコミット
Append | Overwrite | Delete
先の Append | OK | OK | OK
コミット Overwrite| OK(*) | CONFLICT | CONFLICT
Delete | OK | CONFLICT | CONFLICT
OK: 自動的に解決可能 (リトライで成功)
OK(*): 書き込み先のパーティションが異なれば解決可能
CONFLICT: 手動介入が必要な場合がある
9.4 分離レベル
Iceberg の分離レベル:
Snapshot Isolation (スナップショット分離):
┌─────────────────────────────────────────────────┐
│ - 読み取りは常にコミット済みスナップショットを参照 │
│ - 書き込み中の未コミットデータは見えない │
│ - 読み取りが書き込みをブロックしない │
│ - 書き込みが読み取りをブロックしない │
└─────────────────────────────────────────────────┘
Serializable (直列化可能):
┌─────────────────────────────────────────────────┐
│ - MERGE INTO 等の read-modify-write 操作で │
│ Serializable 分離レベルが適用される │
│ - 読み取った行が他のトランザクションで変更された │
│ 場合、コミットが拒否される │
└─────────────────────────────────────────────────┘
9.5 リトライ設定
// コミットリトライの設定
table.updateProperties()
.set("commit.retry.num-retries", "4")
.set("commit.retry.min-wait-ms", "100")
.set("commit.retry.max-wait-ms", "60000")
.set("commit.retry.total-timeout-ms", "1800000")
.commit();
-- テーブルプロパティで設定
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
'commit.retry.num-retries' = '10',
'commit.retry.min-wait-ms' = '100',
'commit.retry.max-wait-ms' = '60000'
);
10. ファイルフォーマット
10.1 Apache Parquet
Parquet は Iceberg で最も広く使用されるファイルフォーマットです。列指向のストレージで、分析ワークロードに最適化されています。
Parquet ファイルの内部構造:
┌─────────────────────────────────┐
│ Parquet File │
│ │
│ ┌───────────────────────┐ │
│ │ Row Group 1 │ │
│ │ ┌─────┬─────┬─────┐ │ │
│ │ │Col A│Col B│Col C│ │ │
│ │ │Page │Page │Page │ │ │
│ │ │ 1 │ 1 │ 1 │ │ │
│ │ └─────┴─────┴─────┘ │ │
│ │ ┌─────┬─────┬─────┐ │ │
│ │ │Col A│Col B│Col C│ │ │
│ │ │Page │Page │Page │ │ │
│ │ │ 2 │ 2 │ 2 │ │ │
│ │ └─────┴─────┴─────┘ │ │
│ └───────────────────────┘ │
│ │
│ ┌───────────────────────┐ │
│ │ Row Group 2 │ │
│ │ ... │ │
│ └───────────────────────┘ │
│ │
│ ┌───────────────────────────┐ │
│ │ Footer │ │
│ │ - Schema │ │
│ │ - Row Group metadata │ │
│ │ - Column min/max stats │ │
│ │ - Bloom filters │ │
│ └───────────────────────────┘ │
└─────────────────────────────────┘
-- Parquet フォーマットの設定
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd',
'write.parquet.compression-level' = '3',
'write.parquet.row-group-size-bytes' = '134217728', -- 128MB
'write.parquet.page-size-bytes' = '1048576', -- 1MB
'write.parquet.dict-size-bytes' = '2097152', -- 2MB
'write.parquet.bloom-filter-enabled.column.user_id' = 'true',
'write.parquet.bloom-filter-max-bytes' = '1048576'
);
10.2 Apache ORC
-- ORC フォーマットの設定
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
'write.format.default' = 'orc',
'write.orc.compression-codec' = 'zstd',
'write.orc.stripe-size-bytes' = '67108864', -- 64MB
'write.orc.block-size-bytes' = '268435456', -- 256MB
'write.orc.bloom.filter.columns' = 'user_id,order_id'
);
10.3 Apache Avro
-- Avro フォーマットの設定 (主にストリーミング用途)
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
'write.format.default' = 'avro',
'write.avro.compression-codec' = 'snappy'
);
10.4 フォーマット比較
┌───────────────┬──────────┬──────────┬──────────┐
│ 特性 │ Parquet │ ORC │ Avro │
├───────────────┼──────────┼──────────┼──────────┤
│ ストレージ型 │ 列指向 │ 列指向 │ 行指向 │
│ 圧縮効率 │ 高い │ 高い │ 中程度 │
│ 読み取り性能 │ 優秀 │ 優秀 │ 良い │
│ 書き込み性能 │ 良い │ 良い │ 優秀 │
│ スキーマ進化 │ サポート │ サポート │ サポート │
│ 主な用途 │ 分析 │ 分析 │ CDC/ │
│ │ │ │ ストリーム│
│ 推奨度 │ ★★★★★ │ ★★★★☆ │ ★★★☆☆ │
└───────────────┴──────────┴──────────┴──────────┘
11. クエリエンジン統合
11.1 Apache Spark
Spark は Iceberg の最もフル機能なエンジン統合を提供します:
// Spark セッションの設定
val spark = SparkSession.builder()
.appName("IcebergApp")
.config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.my_catalog.type", "rest")
.config("spark.sql.catalog.my_catalog.uri", "http://localhost:8181")
.config("spark.sql.catalog.my_catalog.warehouse", "s3://my-bucket/warehouse")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate()
-- Spark SQL での Iceberg 操作
USE my_catalog;
-- テーブル作成
CREATE TABLE db.orders (
order_id BIGINT,
customer_id BIGINT,
order_date DATE,
amount DECIMAL(10,2),
status STRING
)
USING iceberg
PARTITIONED BY (months(order_date))
TBLPROPERTIES (
'format-version' = '2',
'write.format.default' = 'parquet'
);
-- データ挿入
INSERT INTO db.orders VALUES
(1, 101, DATE '2024-05-15', 29.99, 'completed'),
(2, 102, DATE '2024-05-16', 149.50, 'pending');
-- MERGE INTO (Upsert)
MERGE INTO db.orders t
USING updates s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- DELETE
DELETE FROM db.orders WHERE status = 'cancelled';
-- UPDATE
UPDATE db.orders SET status = 'shipped' WHERE order_id = 1;
11.2 Trino (旧 Presto)
-- Trino の Iceberg コネクタ設定 (catalog properties file)
-- etc/catalog/iceberg.properties:
-- connector.name=iceberg
-- iceberg.catalog.type=rest
-- iceberg.rest-catalog.uri=http://localhost:8181
-- iceberg.rest-catalog.warehouse=s3://my-bucket/warehouse
-- Trino SQL
USE iceberg.db;
-- テーブル作成
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
order_date DATE,
amount DECIMAL(10,2),
status VARCHAR
)
WITH (
format = 'PARQUET',
partitioning = ARRAY['month(order_date)'],
format_version = 2
);
-- クエリ
SELECT * FROM orders WHERE order_date = DATE '2024-05-15';
-- タイムトラベル
SELECT * FROM orders FOR VERSION AS OF 1000;
SELECT * FROM orders FOR TIMESTAMP AS OF TIMESTAMP '2024-05-15 10:00:00 UTC';
-- メタデータテーブル
SELECT * FROM "orders$snapshots";
SELECT * FROM "orders$manifests";
SELECT * FROM "orders$files";
SELECT * FROM "orders$history";
SELECT * FROM "orders$partitions";
11.3 Apache Flink
// Flink Table API での Iceberg 設定
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE CATALOG iceberg_catalog WITH (" +
"'type' = 'iceberg'," +
"'catalog-type' = 'rest'," +
"'uri' = 'http://localhost:8181'," +
"'warehouse' = 's3://my-bucket/warehouse'" +
")");
tEnv.useCatalog("iceberg_catalog");
-- Flink SQL
USE CATALOG iceberg_catalog;
USE db;
-- ストリーミング書き込み
INSERT INTO orders
SELECT * FROM kafka_source;
-- ストリーミング読み取り (Incremental)
SELECT * FROM orders /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */;
Flink ストリーミング書き込みの動作:
Kafka Source → Flink Job → Iceberg Sink
│ │ │
│ │ ▼
│ │ ┌──────────────┐
│ │ │ Checkpoint 1 │
│ │ │ → Snapshot N │
│ │ └──────────────┘
│ │ │
│ │ ▼
│ │ ┌──────────────┐
│ │ │ Checkpoint 2 │
│ │ │ → Snapshot N+1│
│ │ └──────────────┘
Flink のチェックポイント = Iceberg のコミット
→ Exactly-once セマンティクスを保証
11.4 その他のエンジン
エンジン別サポート状況:
┌──────────────┬────────┬────────┬────────┬────────┬────────┐
│ 機能 │ Spark │ Trino │ Flink │ Hive │ Dremio │
├──────────────┼────────┼────────┼────────┼────────┼────────┤
│ Read │ ✓ │ ✓ │ ✓ │ ✓ │ ✓ │
│ Write/Append │ ✓ │ ✓ │ ✓ │ ✓ │ ✓ │
│ Overwrite │ ✓ │ ✓ │ ✓ │ ✓ │ ✓ │
│ MERGE INTO │ ✓ │ ✓ │ ✓(*) │ ✗ │ ✓ │
│ DELETE │ ✓ │ ✓ │ ✗ │ ✗ │ ✓ │
│ UPDATE │ ✓ │ ✓ │ ✗ │ ✗ │ ✓ │
│ Time Travel │ ✓ │ ✓ │ ✗ │ ✓ │ ✓ │
│ Streaming │ ✓ │ ✗ │ ✓ │ ✗ │ ✗ │
│ Schema Evol. │ ✓ │ ✓ │ ✓ │ ✓ │ ✓ │
│ Partition Ev.│ ✓ │ ✓ │ ✗ │ ✗ │ ✓ │
│ Branching │ ✓ │ ✓ │ ✗ │ ✗ │ ✓ │
└──────────────┴────────┴────────┴────────┴────────┴────────┘
(*) Flink の MERGE INTO は一部制限あり
クラウドサービス:
┌──────────────┬──────────────────────────────────────────┐
│ AWS Athena │ Iceberg テーブルのネイティブサポート │
│ AWS EMR │ Spark/Flink/Trino で Iceberg をサポート │
│ BigQuery │ BigLake 経由で Iceberg テーブルを読み取り│
│ Snowflake │ Iceberg テーブル (外部テーブル) サポート │
│ Databricks │ UniForm で Iceberg 互換性を提供 │
└──────────────┴──────────────────────────────────────────┘
12. カタログ
12.1 カタログの概要
カタログは Iceberg テーブルの名前空間管理とメタデータポインタの解決を行う中核コンポーネントです。
カタログの責務:
1. テーブルの名前解決
"db.orders" → s3://bucket/.../metadata/v42.metadata.json
2. 原子的なメタデータ更新
v42.metadata.json → v43.metadata.json (CAS操作)
3. 名前空間の管理
CREATE NAMESPACE, DROP NAMESPACE, LIST NAMESPACES
4. テーブルのライフサイクル管理
CREATE TABLE, DROP TABLE, RENAME TABLE
12.2 Hive Metastore カタログ
# Spark 設定
spark.sql.catalog.hive_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_catalog.type=hive
spark.sql.catalog.hive_catalog.uri=thrift://hive-metastore:9083
spark.sql.catalog.hive_catalog.warehouse=s3://my-bucket/warehouse
Hive Metastore の動作:
┌────────────────────┐
│ Hive Metastore │
│ │
│ TABLE: db.orders │
│ LOCATION: s3://... │
│ PARAMETERS: │────→ metadata_location = s3://.../v42.metadata.json
│ metadata_location│
└────────────────────┘
コミット時:
1. 新しい metadata ファイルを書き込み (v43.metadata.json)
2. Hive Metastore の metadata_location を v43 に更新 (ロック使用)
12.3 REST カタログ
REST カタログは Apache Iceberg コミュニティが推奨する標準的なカタログインターフェースです:
REST Catalog API:
GET /v1/config # カタログ設定
GET /v1/namespaces # 名前空間一覧
POST /v1/namespaces # 名前空間作成
GET /v1/namespaces/{ns} # 名前空間詳細
DELETE /v1/namespaces/{ns} # 名前空間削除
GET /v1/namespaces/{ns}/tables # テーブル一覧
POST /v1/namespaces/{ns}/tables # テーブル作成
GET /v1/namespaces/{ns}/tables/{table} # テーブル詳細
POST /v1/namespaces/{ns}/tables/{table} # テーブル更新 (コミット)
DELETE /v1/namespaces/{ns}/tables/{table} # テーブル削除
POST /v1/namespaces/{ns}/tables/{table}/metrics # メトリクス送信
# Spark 設定 (REST Catalog)
spark.sql.catalog.rest_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest_catalog.type=rest
spark.sql.catalog.rest_catalog.uri=http://catalog-server:8181
spark.sql.catalog.rest_catalog.warehouse=s3://my-bucket/warehouse
spark.sql.catalog.rest_catalog.credential=client-id:client-secret
spark.sql.catalog.rest_catalog.token=<oauth-token>
12.4 AWS Glue カタログ
# Spark 設定 (AWS Glue)
spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
spark.sql.catalog.glue_catalog.warehouse=s3://my-bucket/warehouse
spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
12.5 Nessie カタログ
Nessie は Git-like なバージョン管理をカタログレベルで提供します:
# Spark 設定 (Nessie)
spark.sql.catalog.nessie_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.nessie_catalog.catalog-impl=org.apache.iceberg.nessie.NessieCatalog
spark.sql.catalog.nessie_catalog.uri=http://nessie-server:19120/api/v1
spark.sql.catalog.nessie_catalog.ref=main
spark.sql.catalog.nessie_catalog.warehouse=s3://my-bucket/warehouse
Nessie のブランチ管理:
main ───●───●───●───●───●───●───●
│ ▲
│ │ merge
▼ │
dev ───────●───●───●───●──────┘
│
▼
feature ────────●───●
複数テーブルにまたがるトランザクション:
- dev ブランチで orders と shipments を同時に更新
- テスト完了後に main にマージ
- 不整合のリスクなし
12.6 Polaris Catalog
Polaris は Snowflake がオープンソース化した Iceberg REST カタログの実装です:
# Polaris の設定例 (docker-compose.yml)
services:
polaris:
image: polarisio/polaris:latest
ports:
- "8181:8181"
environment:
- POLARIS_PERSISTENCE=in-memory # 本番では external-db を使用
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=us-east-1
カタログ選択ガイド:
┌──────────────────┬─────────────────────────────────────┐
│ カタログ │ 適したユースケース │
├──────────────────┼─────────────────────────────────────┤
│ Hive Metastore │ 既存の Hive エコシステムとの統合 │
│ │ 小〜中規模環境 │
├──────────────────┼─────────────────────────────────────┤
│ REST Catalog │ マルチエンジン環境 │
│ │ カスタム認証・認可が必要な場合 │
│ │ 推奨される標準選択 │
├──────────────────┼─────────────────────────────────────┤
│ AWS Glue │ AWS ネイティブ環境 │
│ │ Athena, EMR との統合 │
├──────────────────┼─────────────────────────────────────┤
│ Nessie │ Git-like バージョン管理が必要 │
│ │ マルチテーブルトランザクション │
├──────────────────┼─────────────────────────────────────┤
│ Polaris │ Snowflake 連携 │
│ │ OSS REST Catalog が必要な場合 │
├──────────────────┼─────────────────────────────────────┤
│ Unity Catalog │ Databricks エコシステム │
│ │ UniForm (Delta + Iceberg) │
└──────────────────┴─────────────────────────────────────┘
13. メンテナンス操作
13.1 メンテナンスの重要性
Iceberg テーブルは時間の経過とともにメタデータやデータファイルが蓄積されます。定期的なメンテナンスは、パフォーマンスとストレージコストの最適化に不可欠です。
メンテナンスが必要な理由:
Time ──────────────────────────────────────────────→
スナップショット蓄積:
S1 → S2 → S3 → S4 → S5 → ... → S1000
問題:
1. メタデータファイルのサイズ増大 (全スナップショットを参照)
2. 不要なデータファイルがストレージを占有
3. マニフェストファイルの断片化
4. 小さなデータファイルの蓄積 (small files problem)
13.2 スナップショットの期限切れ (Expire Snapshots)
-- Spark SQL でスナップショットを期限切れにする
CALL catalog.system.expire_snapshots(
table => 'db.orders',
older_than => TIMESTAMP '2024-04-01 00:00:00',
retain_last => 10,
max_concurrent_deletes => 100,
stream_results => true
);
// Java API
Table table = catalog.loadTable(TableIdentifier.of("db", "orders"));
table.expireSnapshots()
.expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
.retainLast(10)
.commit();
# PyIceberg API
from pyiceberg.catalog import load_catalog
import time
catalog = load_catalog("my_catalog")
table = catalog.load_table("db.orders")
# 7日より古いスナップショットを期限切れ
seven_days_ago_ms = int((time.time() - 7 * 24 * 3600) * 1000)
table.manage_snapshots().expire_snapshots_older_than(seven_days_ago_ms).commit()
スナップショット期限切れの動作:
Before:
S1 → S2 → S3 → S4 → S5 (current)
expire_snapshots(retain_last=2):
S1, S2, S3 が削除対象
┌────────────────────────────────────────────┐
│ 1. S1, S2, S3 のみで参照されるデータファイル │
│ を特定して削除 │
│ 2. S1, S2, S3 のマニフェストリストを削除 │
│ 3. 不要になったマニフェストファイルを削除 │
│ 4. メタデータファイルからスナップショット │
│ エントリを削除 │
└────────────────────────────────────────────┘
After:
S4 → S5 (current)
※ S4, S5 が参照するデータファイルは保持される
13.3 孤立ファイルの削除 (Remove Orphan Files)
孤立ファイルは、コミットに失敗した書き込みやその他の理由で、どのスナップショットからも参照されなくなったファイルです。
-- 孤立ファイルの削除
CALL catalog.system.remove_orphan_files(
table => 'db.orders',
older_than => TIMESTAMP '2024-05-01 00:00:00',
dry_run => true -- まずドライランで確認
);
-- 実際に削除
CALL catalog.system.remove_orphan_files(
table => 'db.orders',
older_than => TIMESTAMP '2024-05-01 00:00:00',
dry_run => false,
max_concurrent_deletes => 100
);
孤立ファイルが生まれる状況:
Writer A:
1. データファイルを書き込み (data-orphan.parquet)
2. マニフェストファイルを作成
3. コミット → 失敗! (競合)
4. リトライ → 新しいデータファイルで成功
結果: data-orphan.parquet がどのスナップショットからも
参照されない「孤立ファイル」になる
13.4 データファイルの書き換え (Rewrite Data Files / Compaction)
小さなファイルの統合(コンパクション)やソート順の最適化のために、データファイルを書き換えます。
-- データファイルの書き換え (コンパクション)
CALL catalog.system.rewrite_data_files(
table => 'db.orders',
strategy => 'binpack',
options => map(
'target-file-size-bytes', '536870912', -- 512MB
'min-file-size-bytes', '67108864', -- 64MB (最小)
'max-file-size-bytes', '1073741824', -- 1GB (最大)
'min-input-files', '5', -- 最低5ファイルから統合
'max-concurrent-file-group-rewrites', '100',
'partial-progress.enabled', 'true',
'partial-progress.max-commits', '10'
)
);
-- ソート順を適用した書き換え
CALL catalog.system.rewrite_data_files(
table => 'db.orders',
strategy => 'sort',
sort_order => 'order_date ASC NULLS FIRST, customer_id ASC',
options => map(
'target-file-size-bytes', '536870912',
'min-input-files', '5'
)
);
-- Z-Order ソートによる書き換え (複数カラムの範囲クエリに最適)
CALL catalog.system.rewrite_data_files(
table => 'db.orders',
strategy => 'sort',
sort_order => 'zorder(order_date, customer_id)',
options => map(
'target-file-size-bytes', '536870912'
)
);
コンパクション (Bin-Pack) の動作:
Before:
┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐
│2MB │ │5MB │ │1MB │ │3MB │ │8MB │ │2MB │ │4MB │ │1MB │
└────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘
8 files, total ~26MB
After (target=128MB):
┌──────────────────────────────────────────┐
│ 26MB │
└──────────────────────────────────────────┘
1 file, total ~26MB
Sort の動作:
Before (unsorted):
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ date: mixed │ │ date: mixed │ │ date: mixed │
│ min/max wide │ │ min/max wide │ │ min/max wide │
└──────────────┘ └──────────────┘ └──────────────┘
After (sorted by date):
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ date: Jan │ │ date: Feb │ │ date: Mar │
│ min/max tight│ │ min/max tight│ │ min/max tight│
└──────────────┘ └──────────────┘ └──────────────┘
→ min/max フィルタリングの効果が大幅に向上
13.5 マニフェストの書き換え (Rewrite Manifests)
-- マニフェストの書き換え
CALL catalog.system.rewrite_manifests(
table => 'db.orders',
use_caching => true
);
マニフェスト書き換えの効果:
Before:
┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
│ Manifest 1 │ │ Manifest 2 │ │ Manifest 3 │ │ Manifest 4 │
│ 10 files │ │ 3 files │ │ 2 files │ │ 1 file │
│ Mixed │ │ Mixed │ │ Mixed │ │ 2024-05 │
│ partitions │ │ partitions │ │ partitions │ │ only │
└────────────┘ └────────────┘ └────────────┘ └────────────┘
4 manifests, パーティション境界が曖昧
After:
┌──────────────────┐ ┌──────────────────┐
│ Manifest A │ │ Manifest B │
│ 8 files │ │ 8 files │
│ 2024-01 ~ 03 │ │ 2024-04 ~ 06 │
│ パーティション │ │ パーティション │
│ 境界が明確 │ │ 境界が明確 │
└──────────────────┘ └──────────────────┘
2 manifests, パーティション境界が明確
→ マニフェストレベルのプルーニング効果向上
13.6 メンテナンス自動化の例
# Airflow DAG によるメンテナンス自動化
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-platform',
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'iceberg_maintenance',
default_args=default_args,
description='Iceberg table maintenance',
schedule_interval='0 2 * * *', # 毎日 2:00 AM
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
expire_snapshots = SparkSubmitOperator(
task_id='expire_snapshots',
application='s3://scripts/iceberg_maintenance.py',
application_args=['expire-snapshots', '--table', 'db.orders', '--days', '7'],
conf={
'spark.sql.catalog.catalog': 'org.apache.iceberg.spark.SparkCatalog',
},
)
compact_data_files = SparkSubmitOperator(
task_id='compact_data_files',
application='s3://scripts/iceberg_maintenance.py',
application_args=['compact', '--table', 'db.orders', '--strategy', 'binpack'],
conf={
'spark.sql.catalog.catalog': 'org.apache.iceberg.spark.SparkCatalog',
},
)
remove_orphans = SparkSubmitOperator(
task_id='remove_orphan_files',
application='s3://scripts/iceberg_maintenance.py',
application_args=['remove-orphans', '--table', 'db.orders', '--days', '3'],
conf={
'spark.sql.catalog.catalog': 'org.apache.iceberg.spark.SparkCatalog',
},
)
rewrite_manifests = SparkSubmitOperator(
task_id='rewrite_manifests',
application='s3://scripts/iceberg_maintenance.py',
application_args=['rewrite-manifests', '--table', 'db.orders'],
conf={
'spark.sql.catalog.catalog': 'org.apache.iceberg.spark.SparkCatalog',
},
)
# 実行順序: スナップショット期限切れ → コンパクション → 孤立ファイル削除 → マニフェスト書き換え
expire_snapshots >> compact_data_files >> remove_orphans >> rewrite_manifests
14. 行レベル更新
14.1 Copy-on-Write (CoW) vs Merge-on-Read (MoR)
Iceberg は行レベルの更新(UPDATE, DELETE, MERGE)に対して2つのアプローチを提供します:
Copy-on-Write (CoW):
┌─────────────────────────────────────────────────────────┐
│ 書き込み時に影響を受けるファイルを完全に書き換える │
│ │
│ DELETE WHERE id = 5: │
│ │
│ Before: After: │
│ ┌──────────┐ ┌──────────┐ │
│ │ File A │ │ File A' │ (新規作成) │
│ │ id=1,2,3 │ ──書き換え──→ │ id=1,2,3 │ │
│ │ id=4,5,6 │ │ id=4,6 │ (id=5 除外) │
│ └──────────┘ └──────────┘ │
│ │
│ 長所: 読み取り時のオーバーヘッドなし │
│ 短所: 書き込みコストが高い (ファイル全体を書き換え) │
└─────────────────────────────────────────────────────────┘
Merge-on-Read (MoR):
┌─────────────────────────────────────────────────────────┐
│ 削除情報を別ファイルに記録し、読み取り時にマージ │
│ │
│ DELETE WHERE id = 5: │
│ │
│ ┌──────────┐ ┌──────────────┐ │
│ │ File A │ │ Delete File │ │
│ │ id=1,2,3 │ │ (position │ │
│ │ id=4,5,6 │ │ deletes) │ │
│ └──────────┘ │ file=A,pos=4 │ (id=5 の位置を記録) │
│ └──────────────┘ │
│ │
│ 読み取り時: File A の内容 - Delete File = 結果 │
│ │
│ 長所: 書き込みが高速 (小さな delete file のみ作成) │
│ 短所: 読み取り時にマージ処理が必要 │
└─────────────────────────────────────────────────────────┘
14.2 Delete ファイルの種類
Position Delete (位置ベース削除):
Position Delete ファイル:
┌────────────────────────────────┐
│ file_path │ pos │
├────────────────────┼───────────┤
│ s3://.../data-a.pq│ 4 │
│ s3://.../data-a.pq│ 7 │
│ s3://.../data-b.pq│ 12 │
└────────────────────┴───────────┘
データファイル data-a.pq 内のレコード:
pos 0: {id=1, ...}
pos 1: {id=2, ...}
...
pos 4: {id=5, ...} ← 削除対象
...
pos 7: {id=8, ...} ← 削除対象
Equality Delete (等価ベース削除):
Equality Delete ファイル:
┌────────────────┐
│ id │ status │
├────────┼────────┤
│ 5 │ null │
│ 8 │ null │
└────────┴────────┘
等価条件: id = 5 OR id = 8 に一致するレコードを削除
※ Equality Delete は Position Delete より読み取りコストが高い
(全レコードとの突合が必要なため)
14.3 CoW / MoR の設定
-- Copy-on-Write を使用 (デフォルト)
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
'write.delete.mode' = 'copy-on-write',
'write.update.mode' = 'copy-on-write',
'write.merge.mode' = 'copy-on-write'
);
-- Merge-on-Read を使用
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
'write.delete.mode' = 'merge-on-read',
'write.update.mode' = 'merge-on-read',
'write.merge.mode' = 'merge-on-read'
);
-- 操作ごとに異なるモードを設定
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
'write.delete.mode' = 'merge-on-read', -- DELETE は MoR
'write.update.mode' = 'copy-on-write', -- UPDATE は CoW
'write.merge.mode' = 'merge-on-read' -- MERGE は MoR
);
14.4 CoW / MoR の選択ガイド
選択ガイド:
┌────────────────────────────┬───────────┬───────────┐
│ シナリオ │ CoW │ MoR │
├────────────────────────────┼───────────┼───────────┤
│ 更新頻度が低い │ ★★★★★ │ ★★★☆☆ │
│ 更新頻度が高い │ ★★☆☆☆ │ ★★★★★ │
│ 読み取り性能重視 │ ★★★★★ │ ★★★☆☆ │
│ 書き込みレイテンシ重視 │ ★★☆☆☆ │ ★★★★★ │
│ ストリーミング更新 │ ★★☆☆☆ │ ★★★★★ │
│ バッチ更新 (大量) │ ★★★★☆ │ ★★★☆☆ │
│ GDPR 削除 (散在的) │ ★★★☆☆ │ ★★★★★ │
└────────────────────────────┴───────────┴───────────┘
一般的な推奨:
- デフォルトは CoW (読み取り性能が優先される場合が多い)
- ストリーミングやCDC ワークロードでは MoR
- MoR を使う場合はコンパクションの自動化が重要
15. ブランチングとタグ付け
15.1 概要
Iceberg はテーブルレベルでの Git-like なブランチとタグをサポートします。これにより、Write-Audit-Publish (WAP) パターンや A/B テスト等の高度なワークフローが実現できます。
ブランチとタグの概念:
main (branch)
●───●───●───●───●───●───●
│ ▲
│ │ cherry-pick / fast-forward
▼ │
audit (branch)
●───●───●───●───●──────┘
v1.0 (tag)
●───●───●───●
↑
tag: "v1.0" (不変の参照点)
15.2 ブランチの操作
-- ブランチの作成
ALTER TABLE catalog.db.orders CREATE BRANCH audit_branch;
-- 特定のスナップショットからブランチを作成
ALTER TABLE catalog.db.orders CREATE BRANCH audit_branch
AS OF VERSION 1000;
-- ブランチの保持期限設定
ALTER TABLE catalog.db.orders CREATE BRANCH audit_branch
RETAIN 7 DAYS;
-- ブランチにデータを書き込み
INSERT INTO catalog.db.orders.branch_audit_branch
VALUES (100, 999, DATE '2024-06-01', 50.00, 'test');
-- ブランチからデータを読み取り
SELECT * FROM catalog.db.orders VERSION AS OF 'audit_branch';
-- ブランチの削除
ALTER TABLE catalog.db.orders DROP BRANCH audit_branch;
15.3 タグの操作
-- タグの作成
ALTER TABLE catalog.db.orders CREATE TAG release_v1;
-- 特定のスナップショットにタグを付ける
ALTER TABLE catalog.db.orders CREATE TAG release_v1
AS OF VERSION 1000;
-- タグの保持期限設定
ALTER TABLE catalog.db.orders CREATE TAG monthly_backup_202405
AS OF VERSION 1001
RETAIN 365 DAYS;
-- タグからデータを読み取り
SELECT * FROM catalog.db.orders VERSION AS OF 'release_v1';
-- タグの削除
ALTER TABLE catalog.db.orders DROP TAG release_v1;
15.4 Write-Audit-Publish (WAP) パターン
WAP パターンは、データ品質チェックをコミット前に行うワークフローです:
WAP パターンの流れ:
Step 1: audit ブランチを作成
┌──────────────────────────────┐
│ main: S1 → S2 → S3 │
│ │ │
│ audit: └→ S3 │ (ブランチ分岐点)
└──────────────────────────────┘
Step 2: audit ブランチにデータを書き込み
┌──────────────────────────────┐
│ main: S1 → S2 → S3 │
│ │
│ audit: S3 → S4a │ (新データ)
└──────────────────────────────┘
Step 3: audit ブランチでデータ品質チェック
┌──────────────────────────────┐
│ SELECT COUNT(*) FROM │
│ orders.branch_audit │
│ WHERE amount < 0; │
│ │
│ → 0 件: OK! │
│ → N 件: NG! (ブランチを削除) │
└──────────────────────────────┘
Step 4: 品質チェック通過後に main にマージ
┌──────────────────────────────────┐
│ main: S1 → S2 → S3 → S4 │
│ ▲ │
│ audit: S3 → S4a ─┘ │ (fast-forward)
└──────────────────────────────────┘
-- WAP パターンの実装例
-- 1. audit ブランチを作成
ALTER TABLE catalog.db.orders CREATE BRANCH audit_branch;
-- 2. audit ブランチにデータを書き込み
-- Spark SQL で wap.branch を指定
SET spark.wap.branch = audit_branch;
INSERT INTO catalog.db.orders VALUES (...);
-- 3. データ品質チェック
SELECT COUNT(*) AS invalid_records
FROM catalog.db.orders VERSION AS OF 'audit_branch'
WHERE amount < 0 OR customer_id IS NULL;
-- 4-a. 品質OK → main にマージ (fast-forward)
CALL catalog.system.fast_forward('db.orders', 'main', 'audit_branch');
-- 4-b. 品質NG → ブランチを削除
ALTER TABLE catalog.db.orders DROP BRANCH audit_branch;
15.5 A/B テスト
-- A/B テスト用の2つのブランチを作成
ALTER TABLE catalog.db.ml_features CREATE BRANCH model_v1 AS OF VERSION 1000;
ALTER TABLE catalog.db.ml_features CREATE BRANCH model_v2 AS OF VERSION 1000;
-- 各ブランチに異なる特徴量を書き込み
INSERT INTO catalog.db.ml_features.branch_model_v1
SELECT *, feature_set_v1(...) FROM raw_data;
INSERT INTO catalog.db.ml_features.branch_model_v2
SELECT *, feature_set_v2(...) FROM raw_data;
-- A/B テストで性能比較
SELECT * FROM catalog.db.ml_features VERSION AS OF 'model_v1';
SELECT * FROM catalog.db.ml_features VERSION AS OF 'model_v2';
-- 勝者を main にマージ
CALL catalog.system.fast_forward('db.ml_features', 'main', 'model_v2');
16. 設定例
16.1 Spark 設定 (完全版)
# spark-defaults.conf
# === Iceberg カタログ設定 ===
spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type=rest
spark.sql.catalog.iceberg.uri=http://catalog-server:8181
spark.sql.catalog.iceberg.warehouse=s3://my-datalake/warehouse
spark.sql.catalog.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
# Iceberg Spark Extensions (DDL, DML サポート)
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
# === S3 設定 ===
spark.sql.catalog.iceberg.s3.endpoint=https://s3.amazonaws.com
spark.sql.catalog.iceberg.s3.path-style-access=false
spark.sql.catalog.iceberg.s3.sse.type=s3
# === 書き込み設定 ===
spark.sql.catalog.iceberg.default-table-properties.write.format.default=parquet
spark.sql.catalog.iceberg.default-table-properties.write.parquet.compression-codec=zstd
spark.sql.catalog.iceberg.default-table-properties.write.target-file-size-bytes=536870912
spark.sql.catalog.iceberg.default-table-properties.write.distribution-mode=hash
# === 読み取り設定 ===
spark.sql.catalog.iceberg.default-table-properties.read.split.target-size=134217728
spark.sql.catalog.iceberg.default-table-properties.read.split.metadata-target-size=33554432
# === パフォーマンス設定 ===
spark.sql.iceberg.planning.preserve-data-grouping=true
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
<!-- pom.xml (Maven 依存関係) -->
<dependencies>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.5_2.12</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
<version>1.7.1</version>
</dependency>
</dependencies>
16.2 Trino 設定 (完全版)
# etc/catalog/iceberg.properties
# === コネクタ設定 ===
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://catalog-server:8181
iceberg.rest-catalog.warehouse=s3://my-datalake/warehouse
# === ファイルシステム設定 ===
iceberg.file-format=PARQUET
iceberg.compression-codec=ZSTD
# === S3 設定 ===
fs.native-s3.enabled=true
s3.endpoint=https://s3.amazonaws.com
s3.region=us-east-1
s3.path-style-access=false
# === パフォーマンス設定 ===
iceberg.max-partitions-per-writer=100
iceberg.target-max-file-size=512MB
iceberg.split-size=128MB
# === メタデータキャッシュ ===
iceberg.metadata.cache-ttl=5m
iceberg.metadata.cache-max-size=1000
# === 書き込み設定 ===
iceberg.unique-table-location=true
iceberg.dynamic-filtering.wait-timeout=30s
# === 統計情報 ===
iceberg.extended-statistics.enabled=true
iceberg.projection-pushdown-enabled=true
16.3 Flink 設定 (完全版)
# flink-conf.yaml
# === Iceberg カタログ設定 (SQL Client) ===
# SQL Client で CREATE CATALOG を使用
# === チェックポイント設定 (ストリーミング書き込みに必須) ===
execution.checkpointing.interval: 60000 # 60秒
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600000 # 10分
execution.checkpointing.min-pause: 30000 # 最小間隔 30秒
state.backend: rocksdb
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
-- Flink SQL Client での設定
CREATE CATALOG iceberg WITH (
'type' = 'iceberg',
'catalog-type' = 'rest',
'uri' = 'http://catalog-server:8181',
'warehouse' = 's3://my-datalake/warehouse',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'
);
USE CATALOG iceberg;
-- ストリーミングテーブルの作成
CREATE TABLE db.streaming_events (
event_id BIGINT,
event_ts TIMESTAMP(6),
user_id BIGINT,
event_type STRING,
payload STRING
)
PARTITIONED BY (days(`event_ts`))
WITH (
'format-version' = '2',
'write.format.default' = 'parquet',
'write.target-file-size-bytes' = '134217728',
'write.upsert.enabled' = 'true',
'write.distribution-mode' = 'hash'
);
-- Kafka からのストリーミング書き込み
INSERT INTO db.streaming_events
SELECT
event_id,
event_ts,
user_id,
event_type,
payload
FROM kafka_events;
16.4 PyIceberg 設定
# ~/.pyiceberg.yaml
catalog:
default:
type: rest
uri: http://catalog-server:8181
warehouse: s3://my-datalake/warehouse
s3.endpoint: https://s3.amazonaws.com
s3.region: us-east-1
s3.access-key-id: ${AWS_ACCESS_KEY_ID}
s3.secret-access-key: ${AWS_SECRET_ACCESS_KEY}
glue:
type: glue
warehouse: s3://my-datalake/warehouse
hive:
type: hive
uri: thrift://hive-metastore:9083
# PyIceberg の使用例
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
NestedField, LongType, StringType, DateType, DecimalType
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import MonthTransform
# カタログの読み込み
catalog = load_catalog("default")
# テーブルの作成
schema = Schema(
NestedField(1, "order_id", LongType(), required=True),
NestedField(2, "customer_id", LongType(), required=True),
NestedField(3, "order_date", DateType(), required=True),
NestedField(4, "amount", DecimalType(10, 2)),
NestedField(5, "status", StringType()),
)
partition_spec = PartitionSpec(
PartitionField(
source_id=3, field_id=1000,
transform=MonthTransform(), name="order_month"
)
)
table = catalog.create_table(
identifier="db.orders",
schema=schema,
partition_spec=partition_spec,
properties={
"format-version": "2",
"write.format.default": "parquet",
}
)
# テーブルの読み取り (Arrow)
table = catalog.load_table("db.orders")
arrow_table = table.scan(
row_filter="order_date >= '2024-05-01'",
selected_fields=("order_id", "amount", "status"),
).to_arrow()
# Pandas DataFrame に変換
df = arrow_table.to_pandas()
print(df.head())
17. パフォーマンス最適化
17.1 パーティションプルーニング
パーティションプルーニングの効果:
テーブル: 1年分のデータ, 月次パーティション (12パーティション)
クエリ: WHERE order_date BETWEEN '2024-05-01' AND '2024-05-31'
パーティションプルーニングなし:
┌─────┐┌─────┐┌─────┐┌─────┐┌─────┐┌─────┐
│ Jan ││ Feb ││ Mar ││ Apr ││ May ││ Jun │...
│SCAN ││SCAN ││SCAN ││SCAN ││SCAN ││SCAN │
└─────┘└─────┘└─────┘└─────┘└─────┘└─────┘
→ 全12パーティションをスキャン
パーティションプルーニングあり:
┌─────┐┌─────┐┌─────┐┌─────┐┌─────┐┌─────┐
│ Jan ││ Feb ││ Mar ││ Apr ││ May ││ Jun │...
│SKIP ││SKIP ││SKIP ││SKIP ││SCAN ││SKIP │
└─────┘└─────┘└─────┘└─────┘└─────┘└─────┘
→ May パーティションのみスキャン (1/12)
17.2 Min/Max フィルタリング
Min/Max フィルタリングの効果:
クエリ: WHERE customer_id = 12345
Manifest File 内のカラム統計:
┌──────────────────────────────────────────────────┐
│ data-001.pq: customer_id min=1, max=10000 │ → SKIP
│ data-002.pq: customer_id min=10001, max=20000 │ → SCAN (12345 含む)
│ data-003.pq: customer_id min=20001, max=30000 │ → SKIP
│ data-004.pq: customer_id min=30001, max=40000 │ → SKIP
└──────────────────────────────────────────────────┘
→ 4ファイル中1ファイルのみスキャン (75%削減)
17.3 ファイルプルーニング
-- Bloom フィルタの有効化 (高カーディナリティカラムに効果的)
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
'write.parquet.bloom-filter-enabled.column.order_id' = 'true',
'write.parquet.bloom-filter-enabled.column.customer_id' = 'true',
'write.parquet.bloom-filter-max-bytes' = '1048576'
);
17.4 述語プッシュダウン (Predicate Pushdown)
述語プッシュダウンの階層:
Level 1: パーティションプルーニング (Manifest List)
┌─────────────────────────────────────────┐
│ WHERE order_month = '2024-05' │
│ → 該当パーティションのマニフェストのみ │
└─────────────────────────────────────────┘
│
Level 2: ファイルプルーニング (Manifest File)
┌─────────────────────────────────────────┐
│ WHERE customer_id = 12345 │
│ → min/max 統計で該当ファイルのみ │
└─────────────────────────────────────────┘
│
Level 3: Row Group プルーニング (Parquet内)
┌─────────────────────────────────────────┐
│ Parquet Row Group の min/max 統計で │
│ 該当 Row Group のみ読み取り │
└─────────────────────────────────────────┘
│
Level 4: Page プルーニング (Parquet内)
┌─────────────────────────────────────────┐
│ Parquet Page Index でさらに細かい │
│ プルーニング │
└─────────────────────────────────────────┘
17.5 パフォーマンスチューニングの推奨設定
-- === ファイルサイズの最適化 ===
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
-- ターゲットファイルサイズ: 128MB-512MB (ワークロードに応じて調整)
'write.target-file-size-bytes' = '536870912', -- 512MB
-- 書き込み分散モード
-- 'none': 分散なし
-- 'hash': パーティションキーでハッシュ分散
-- 'range': ソートオーダーで範囲分散
'write.distribution-mode' = 'hash'
);
-- === メタデータの最適化 ===
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
-- マニフェストのターゲットサイズ
'write.manifest.target-size-bytes' = '8388608', -- 8MB
-- マニフェストのマージ有効化
'commit.manifest-merge.enabled' = 'true',
-- メタデータの前のバージョン保持数
'write.metadata.previous-versions-max' = '100',
-- メタデータの削除後のGC有効化
'write.metadata.delete-after-commit.enabled' = 'true'
);
-- === 読み取りの最適化 ===
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
-- スプリットサイズ
'read.split.target-size' = '134217728', -- 128MB
-- プランニングの並列度
'read.split.planning-lookback' = '10',
-- ベクトル化読み取り (Spark)
'read.parquet.vectorization.enabled' = 'true',
'read.parquet.vectorization.batch-size' = '5000'
);
17.6 Small Files Problem の対策
Small Files Problem:
ストリーミング書き込みで発生しやすい問題:
Flink (60秒チェックポイント) → 毎分新しいファイルが作成
1日 = 1,440 ファイル × パーティション数
対策:
┌────────────────────────────────────────────────────────┐
│ 1. 定期的なコンパクション │
│ CALL system.rewrite_data_files(...) │
│ - 毎時間 or 毎日のバッチジョブで小ファイルを統合 │
│ │
│ 2. チェックポイント間隔の調整 │
│ execution.checkpointing.interval: 300000 (5分) │
│ - 間隔を長くすることでファイル数を削減 │
│ - ただしレイテンシとのトレードオフ │
│ │
│ 3. Flink の自動コンパクション │
│ 'flink.auto-compact.enabled' = 'true' │
│ 'flink.auto-compact.target-file-size' = '128MB' │
│ │
│ 4. 書き込み分散モードの最適化 │
│ 'write.distribution-mode' = 'hash' │
│ - パーティション内のファイルをまとめる │
└────────────────────────────────────────────────────────┘
18. セキュリティ
18.1 アクセス制御の層
Iceberg のセキュリティアーキテクチャ:
Layer 1: カタログレベルのアクセス制御
┌──────────────────────────────────────────┐
│ REST Catalog: │
│ OAuth 2.0 / OIDC 認証 │
│ テーブルレベルの認可 │
│ │
│ AWS Glue: │
│ IAM ポリシー │
│ Lake Formation による細粒度制御 │
│ │
│ Nessie: │
│ OpenID Connect 認証 │
│ ブランチレベルのアクセス制御 │
└──────────────────────────────────────────┘
Layer 2: ストレージレベルのアクセス制御
┌──────────────────────────────────────────┐
│ S3: IAM ポリシー, バケットポリシー │
│ HDFS: POSIX パーミッション, ACL │
│ GCS: IAM, ACL │
│ ADLS: Azure RBAC, ACL │
└──────────────────────────────────────────┘
Layer 3: データ暗号化
┌──────────────────────────────────────────┐
│ 転送時: TLS/SSL │
│ 保存時: サーバーサイド暗号化 (SSE) │
│ - SSE-S3 │
│ - SSE-KMS │
│ - SSE-C (カスタムキー) │
│ Parquet カラムレベル暗号化 │
└──────────────────────────────────────────┘
18.2 REST Catalog の OAuth 2.0 設定
# Spark 設定 (OAuth 2.0)
spark.sql.catalog.secure_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.secure_catalog.type=rest
spark.sql.catalog.secure_catalog.uri=https://catalog-server:8181
spark.sql.catalog.secure_catalog.credential=client-id:client-secret
spark.sql.catalog.secure_catalog.scope=catalog
spark.sql.catalog.secure_catalog.oauth2-server-uri=https://auth-server/oauth/token
18.3 AWS Lake Formation との統合
// Lake Formation テーブルレベルのアクセス制御
{
"CatalogId": "123456789012",
"DataLakePrincipal": {
"DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:role/DataAnalystRole"
},
"Resource": {
"Table": {
"DatabaseName": "iceberg_db",
"Name": "orders"
}
},
"Permissions": ["SELECT", "DESCRIBE"],
"PermissionsWithGrantOption": []
}
18.4 暗号化設定
-- S3 サーバーサイド暗号化 (SSE-KMS)
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
'write.object-storage.s3.sse.type' = 'kms',
'write.object-storage.s3.sse.key' = 'arn:aws:kms:us-east-1:123456789012:key/xxxx'
);
-- Parquet カラムレベル暗号化
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
'write.parquet.encryption.column.keys' = 'key1:ssn,credit_card;key2:email',
'write.parquet.encryption.footer.key' = 'footer_key'
);
19. Hudi・Delta Lake との比較
19.1 概要比較
┌────────────────────┬──────────────┬──────────────┬──────────────┐
│ 特性 │ Iceberg │ Hudi │ Delta Lake │
├────────────────────┼──────────────┼──────────────┼──────────────┤
│ 起源 │ Netflix │ Uber │ Databricks │
│ ライセンス │ Apache 2.0 │ Apache 2.0 │ Apache 2.0 │
│ テーブル仕様 │ 独自 (公開) │ 独自 (公開) │ 独自 (公開) │
│ メタデータ形式 │ JSON + Avro │ Avro/Parquet │ JSON (Delta │
│ │ │ │ Log) │
│ エンジン非依存 │ ★★★★★ │ ★★★☆☆ │ ★★★☆☆ │
│ 隠蔽パーティション │ ✓ │ ✗ │ ✗ (※1) │
│ パーティション進化 │ ✓ │ ✗ │ ✗ │
│ スキーマ進化 │ ✓ (完全) │ ✓ (部分的) │ ✓ (部分的) │
│ ACID │ ✓ │ ✓ │ ✓ │
│ タイムトラベル │ ✓ │ ✓ │ ✓ │
│ CDC サポート │ ✓ (MoR) │ ✓ (ネイティブ)│ ✓ (CDF) │
│ ストリーミング │ ✓ (Flink) │ ✓ (ネイティブ)│ ✓ (Spark SS) │
│ コンパクション │ 手動/自動 │ 自動 (強力) │ 手動/自動 │
│ ブランチ/タグ │ ✓ │ ✗ │ ✗ (※2) │
│ Views │ ✓ │ ✗ │ ✗ │
│ REST Catalog │ ✓ (標準) │ ✗ │ ✓ (Unity) │
└────────────────────┴──────────────┴──────────────┴──────────────┘
(※1) Delta Lake は Liquid Clustering で類似機能を提供
(※2) Delta Lake は Shallow Clone で類似機能を提供
19.2 ユースケース別の推奨
ユースケース別の推奨:
マルチエンジン環境 (Spark + Trino + Flink):
→ Apache Iceberg (エンジン非依存設計が最も優れている)
Uber-style のリアルタイム CDC:
→ Apache Hudi (CDC 最適化が最も進んでいる)
Databricks エコシステム:
→ Delta Lake (ネイティブ統合、UniForm で Iceberg 互換も)
データレイクのモダナイゼーション:
→ Apache Iceberg (隠蔽パーティション、パーティション進化)
大規模バッチ分析:
→ Apache Iceberg or Delta Lake (どちらも優秀)
19.3 相互運用性
相互運用性のトレンド:
Apache XTable (旧 OneTable):
┌──────────────────────────────────────────┐
│ Iceberg ←→ Hudi ←→ Delta Lake │
│ │
│ メタデータの相互変換により、 │
│ 同一データを異なるフォーマットで読み取り │
└──────────────────────────────────────────┘
Databricks UniForm:
┌──────────────────────────────────────────┐
│ Delta Lake テーブルを自動的に │
│ Iceberg 互換メタデータで公開 │
│ → Iceberg 対応エンジンで読み取り可能 │
└──────────────────────────────────────────┘
20. ベストプラクティス
20.1 テーブル設計
テーブル設計のベストプラクティス:
1. パーティション設計:
┌──────────────────────────────────────────────────────┐
│ ✓ パーティションサイズは 100MB - 1GB を目安に │
│ ✓ クエリパターンに合わせたパーティションキーを選択 │
│ ✓ 高カーディナリティなキーには bucket を使用 │
│ ✓ タイムスタンプには month/day を適切に選択 │
│ │
│ ✗ パーティション数が多すぎる (> 10,000) │
│ ✗ パーティションサイズが小さすぎる (< 10MB) │
│ ✗ パーティションキーが頻繁に変わるカラム │
└──────────────────────────────────────────────────────┘
2. ファイルサイズ:
┌──────────────────────────────────────────────────────┐
│ ✓ ターゲットファイルサイズ: 128MB - 512MB │
│ ✓ ストリーミングワークロード: 128MB │
│ ✓ バッチワークロード: 256MB - 512MB │
│ │
│ ✗ 10MB 未満のファイル (small files problem) │
│ ✗ 1GB 超のファイル (並列度が低下) │
└──────────────────────────────────────────────────────┘
3. スキーマ設計:
┌──────────────────────────────────────────────────────┐
│ ✓ 必要なカラムは required: true で定義 │
│ ✓ ネストされた構造体で関連フィールドをグループ化 │
│ ✓ コメント (doc) を全カラムに付与 │
│ │
│ ✗ ID カラムの削除 (一度割り当てたIDは再利用不可) │
│ ✗ 安全でない型変更 (long → int 等) │
└──────────────────────────────────────────────────────┘
20.2 運用ベストプラクティス
4. メンテナンス:
┌──────────────────────────────────────────────────────┐
│ ✓ expire_snapshots を毎日実行 (保持期間: 3-7日) │
│ ✓ remove_orphan_files を週次で実行 │
│ ✓ rewrite_data_files を必要に応じて実行 │
│ ✓ rewrite_manifests を月次で実行 │
│ ✓ メンテナンスジョブを Airflow 等で自動化 │
│ │
│ ✗ メンテナンスを実行せずに放置 │
│ ✗ 保持期間を短くしすぎる (タイムトラベルが使えなくなる)│
│ ✗ ピーク時間帯にコンパクションを実行 │
└──────────────────────────────────────────────────────┘
5. 監視:
┌──────────────────────────────────────────────────────┐
│ ✓ テーブルのスナップショット数を監視 │
│ ✓ データファイル数とサイズの分布を監視 │
│ ✓ マニフェストファイル数を監視 │
│ ✓ コミット失敗率 (リトライ回数) を監視 │
│ ✓ クエリの計画時間 (planning time) を監視 │
└──────────────────────────────────────────────────────┘
-- 監視クエリの例
-- スナップショット数の確認
SELECT COUNT(*) AS snapshot_count FROM catalog.db.orders.snapshots;
-- データファイルのサイズ分布
SELECT
CASE
WHEN file_size_in_bytes < 10485760 THEN 'tiny (<10MB)'
WHEN file_size_in_bytes < 67108864 THEN 'small (10-64MB)'
WHEN file_size_in_bytes < 134217728 THEN 'medium (64-128MB)'
WHEN file_size_in_bytes < 536870912 THEN 'target (128-512MB)'
ELSE 'large (>512MB)'
END AS size_category,
COUNT(*) AS file_count,
SUM(file_size_in_bytes) / 1073741824 AS total_size_gb
FROM catalog.db.orders.files
GROUP BY 1
ORDER BY 1;
-- マニフェスト数の確認
SELECT COUNT(*) AS manifest_count FROM catalog.db.orders.manifests;
-- パーティションのスキュー確認
SELECT
partition,
file_count,
total_record_count,
total_data_file_size_in_bytes / 1073741824 AS total_size_gb
FROM catalog.db.orders.partitions
ORDER BY total_data_file_size_in_bytes DESC
LIMIT 20;
20.3 コスト最適化
6. ストレージコスト最適化:
┌──────────────────────────────────────────────────────┐
│ ✓ ZSTD 圧縮を使用 (圧縮率と速度のバランスが良い) │
│ ✓ 不要なスナップショットを定期的に削除 │
│ ✓ S3 Intelligent-Tiering を活用 │
│ ✓ 古いパーティションを S3 Glacier に移行 │
│ │
│ 圧縮コーデック比較: │
│ ┌──────────┬──────────┬──────────┬──────────┐ │
│ │ Codec │ 圧縮率 │ 書き込み │ 読み取り │ │
│ ├──────────┼──────────┼──────────┼──────────┤ │
│ │ zstd │ ★★★★★ │ ★★★★☆ │ ★★★★★ │ 推奨 │
│ │ snappy │ ★★★☆☆ │ ★★★★★ │ ★★★★★ │ │
│ │ gzip │ ★★★★☆ │ ★★☆☆☆ │ ★★★☆☆ │ │
│ │ lz4 │ ★★☆☆☆ │ ★★★★★ │ ★★★★★ │ │
│ └──────────┴──────────┴──────────┴──────────┘ │
└──────────────────────────────────────────────────────┘
21. 最新機能と今後の方向性
21.1 最近の主な機能追加
2024-2026 の主な機能追加:
Iceberg 1.5.x - 1.8.x:
┌──────────────────────────────────────────────────────┐
│ ● Variant 型 (半構造化データの効率的な格納) │
│ - JSON データを型情報付きバイナリで保存 │
│ - Parquet 内で効率的にクエリ可能 │
│ │
│ ● Puffin 統計ファイル │
│ - NDV (Number of Distinct Values) の保存 │
│ - クエリオプティマイザの改善に寄与 │
│ │
│ ● REST Catalog 仕様の成熟 │
│ - OAuth 2.0 サポートの強化 │
│ - Multi-table transaction API │
│ - View management API │
│ │
│ ● Views サポートの安定化 │
│ - カタログ管理の View 定義 │
│ - エンジン間で共有可能な View │
│ │
│ ● Object Storage Layout の改善 │
│ - S3 のスロットリング回避のためのレイアウト最適化 │
│ - ランダムプレフィックスによる分散 │
└──────────────────────────────────────────────────────┘
21.2 今後の方向性
今後の方向性 (2026年以降):
Table Format v3:
┌──────────────────────────────────────────────────────┐
│ ● Geospatial 型のネイティブサポート │
│ ● Multi-arg Transform (複合パーティション変換) │
│ ● Row lineage (行レベルの系譜追跡) │
│ ● Default values (カラムのデフォルト値) │
└──────────────────────────────────────────────────────┘
エコシステム:
┌──────────────────────────────────────────────────────┐
│ ● Polaris Catalog の成熟 │
│ ● クラウド間のフェデレーション │
│ ● リアルタイム分析の強化 │
│ ● AI/ML ワークロードとの統合深化 │
│ ● Lakehouse アーキテクチャの標準化 │
└──────────────────────────────────────────────────────┘
22. まとめ
22.1 Apache Iceberg の価値
Apache Iceberg は、データレイクの課題を根本的に解決するオープンテーブルフォーマットです。
Apache Iceberg が解決する課題:
┌─────────────────────────┬─────────────────────────────────┐
│ 課題 │ Iceberg の解決策 │
├─────────────────────────┼─────────────────────────────────┤
│ スキーマの硬直性 │ 完全なスキーマ進化 │
│ パーティションの固定化 │ パーティション進化 + 隠蔽 │
│ データの不整合 │ ACID トランザクション │
│ 過去データへのアクセス不可 │ タイムトラベル │
│ エンジンロックイン │ エンジン非依存の設計 │
│ 遅いクエリ計画 │ 階層的メタデータ + プルーニング │
│ 小ファイル問題 │ コンパクション + ソート │
│ データ品質の保証不足 │ WAP パターン + ブランチ │
└─────────────────────────┴─────────────────────────────────┘
22.2 導入のステップ
Apache Iceberg 導入のステップ:
Step 1: カタログの選択
┌─────────────────────────────────────────┐
│ - AWS 環境: Glue Catalog │
│ - マルチクラウド: REST Catalog │
│ - 既存 Hive: Hive Metastore │
│ - 高度な管理: Nessie / Polaris │
└─────────────────────────────────────────┘
│
Step 2: 既存テーブルの移行
┌─────────────────────────────────────────┐
│ - Hive テーブルからの in-place 移行 │
│ - CTAS (CREATE TABLE AS SELECT) での移行│
│ - 段階的な移行計画の策定 │
└─────────────────────────────────────────┘
│
Step 3: メンテナンス自動化の構築
┌─────────────────────────────────────────┐
│ - expire_snapshots の自動化 │
│ - コンパクションジョブの設定 │
│ - 監視ダッシュボードの構築 │
└─────────────────────────────────────────┘
│
Step 4: 運用の成熟
┌─────────────────────────────────────────┐
│ - パフォーマンスチューニング │
│ - セキュリティの強化 │
│ - データガバナンスの整備 │
└─────────────────────────────────────────┘
22.3 参考リンク
- Apache Iceberg 公式サイト: https://iceberg.apache.org/
- Iceberg GitHub: https://github.com/apache/iceberg
- Iceberg テーブル仕様: https://iceberg.apache.org/spec/
- PyIceberg: https://py.iceberg.apache.org/
- Iceberg REST Catalog 仕様: https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml
免責事項: 本記事は 2026年4月時点の情報に基づいています。Apache Iceberg は活発に開発が進んでいるプロジェクトであり、最新の情報については公式ドキュメントを参照してください。