Delta Lake
Delta Lake 徹底解説:アーキテクチャ・機能・設定の全容
目次
- はじめに
- Delta Lake の概要と歴史
- アーキテクチャ全体像
- トランザクションログの仕組み
- ACIDトランザクション
- タイムトラベル
- スキーマ管理
- DML操作
- Change Data Feed(CDF)
- Liquid Clustering
- Deletion Vectors
- UniForm(Universal Format)
- テーブルメンテナンス
- エンジン連携
- Delta Sharing
- 設定とチューニング
- パフォーマンス最適化
- セキュリティ
- 監視と運用
- 他フォーマットとの比較
- ユースケースとベストプラクティス
- 最新動向と将来展望
- まとめ
1. はじめに
Delta Lake は、データレイク上に信頼性、品質、パフォーマンスをもたらすオープンソースのストレージレイヤーである。Apache Parquet ファイルの上にトランザクションログを追加することで、ACID トランザクション、スキーマ強制、タイムトラベル等の機能を実現する。
Databricks 社が開発し、2019年にオープンソース化された Delta Lake は、レイクハウス(Lakehouse)アーキテクチャの中核技術として、データレイクの柔軟性とデータウェアハウスの信頼性を統合する。
1.1 レイクハウスアーキテクチャ
従来のアーキテクチャ:
┌──────────┐ ETL ┌──────────────┐
│ Data Lake │ ──────────► │ Data Warehouse│
│ (安価だが │ │ (信頼性高いが │
│ 品質低い)│ │ 高コスト) │
└──────────┘ └──────────────┘
レイクハウスアーキテクチャ:
┌────────────────────────────────────────┐
│ Lakehouse (Delta Lake) │
│ │
│ ┌──────────────────────────────────┐ │
│ │ ACID Transactions + Schema │ │
│ │ Enforcement + Time Travel │ │
│ ├──────────────────────────────────┤ │
│ │ Delta Lake Format │ │
│ │ (Parquet + Transaction Log) │ │
│ ├──────────────────────────────────┤ │
│ │ Cloud Object Storage │ │
│ │ (S3 / GCS / ADLS / HDFS) │ │
│ └──────────────────────────────────┘ │
│ │
│ データレイクのコスト + DWHの信頼性 │
└────────────────────────────────────────┘
1.2 対象読者
- データレイク・レイクハウスの構築を担当するデータエンジニア
- ETL/ELTパイプラインを設計するアーキテクト
- Delta Lake テーブルを運用する SRE / プラットフォームエンジニア
- テーブルフォーマットの技術選定を行う意思決定者
2. Delta Lake の概要と歴史
2.1 誕生の背景
Delta Lake は Databricks 社のエンジニアが、データレイクの信頼性問題(データ破損、不整合、スキーマ不一致等)を解決するために開発した。
2.2 バージョンの変遷
| バージョン | リリース年 | 主な特徴 |
|---|---|---|
| 0.1.0 | 2019 | 初のOSS版、ACID、スキーマ強制 |
| 0.3.0 | 2019 | MERGE操作、Python API |
| 0.5.0 | 2019 | パーティションカラムの上書き |
| 0.7.0 | 2020 | DESCRIBE HISTORY、Change Data Feed |
| 0.8.0 | 2020 | 制約(CHECK/NOT NULL)、Generated Columns |
| 1.0.0 | 2021 | 初のメジャーリリース |
| 1.2.0 | 2022 | Z-ORDER最適化の改善 |
| 2.0.0 | 2022 | DROP COLUMN、RENAME COLUMN |
| 2.3.0 | 2023 | Deletion Vectors、Change Data Feed改善 |
| 2.4.0 | 2023 | Coordinated Commits(プレビュー) |
| 3.0.0 | 2024 | Liquid Clustering GA、UniForm、Row Tracking |
| 3.1.0 | 2024 | Type Widening、Variant型サポート |
| 3.2.0 | 2024 | Collation サポート、Clustering改善 |
| 4.0.0 | 2025 | Delta Kernel GA、In-Commit Timestamps |
2.3 ガバナンスとコミュニティ
- 開発主体: Databricks 社が主導、Linux Foundation Delta Lake プロジェクト
- ライセンス: Apache License 2.0
- プロトコル仕様: Delta Lake Protocol(オープン仕様)
- 主要コントリビューター: Databricks, Apple, Microsoft, Alibaba 等
3. アーキテクチャ全体像
3.1 Delta テーブルの構造
delta-table/
├── _delta_log/ # トランザクションログ
│ ├── 00000000000000000000.json # バージョン0のコミット
│ ├── 00000000000000000001.json # バージョン1のコミット
│ ├── 00000000000000000002.json # バージョン2のコミット
│ ├── ...
│ ├── 00000000000000000010.json # バージョン10のコミット
│ ├── 00000000000000000010.checkpoint.parquet # チェックポイント
│ ├── ...
│ ├── 00000000000000000020.json
│ ├── 00000000000000000020.checkpoint.parquet
│ └── _last_checkpoint # 最新チェックポイントへのポインタ
├── year=2024/ # パーティションディレクトリ
│ ├── month=01/
│ │ ├── part-00000-xxx.parquet # データファイル
│ │ ├── part-00001-xxx.parquet
│ │ └── part-00002-xxx.parquet
│ └── month=02/
│ ├── part-00000-xxx.parquet
│ └── part-00001-xxx.parquet
└── year=2023/
└── ...
3.2 読み取りフロー
クエリ実行
│
▼
_last_checkpoint を読む
│
▼
最新チェックポイント(Parquet)を読む
│ (テーブルの現在のスナップショット情報)
▼
チェックポイント以降のJSONログを読む
│ (増分変更を適用)
▼
有効なファイルリストを構築
│ (Add アクション - Remove アクション)
▼
データスキッピング(統計情報)
│ (min/max値でファイルをフィルタ)
▼
対象Parquetファイルのみ読み取り
4. トランザクションログの仕組み
4.1 アクションの種類
各JSONコミットファイルは、1つ以上のアクションで構成される:
| アクション | 説明 |
|---|---|
add | 新しいデータファイルの追加 |
remove | データファイルの論理削除 |
metadata | テーブルメタデータの変更(スキーマ等) |
protocol | テーブルプロトコルバージョンの変更 |
commitInfo | コミットの付加情報(誰が、いつ、何を) |
txn | アプリケーション固有のトランザクションID |
domainMetadata | ドメイン固有のメタデータ(Delta 3.0+) |
4.2 トランザクションログの例
// 00000000000000000001.json
{"commitInfo":{"timestamp":1705300800000,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"year\",\"month\"]"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true}}
{"add":{"path":"year=2024/month=01/part-00000-abc.snappy.parquet","partitionValues":{"year":"2024","month":"01"},"size":54321,"modificationTime":1705300800000,"dataChange":true,"stats":"{\"numRecords\":10000,\"minValues\":{\"id\":1,\"amount\":0.01},\"maxValues\":{\"id\":10000,\"amount\":9999.99},\"nullCount\":{\"id\":0,\"amount\":5}}"}}
{"add":{"path":"year=2024/month=01/part-00001-def.snappy.parquet","partitionValues":{"year":"2024","month":"01"},"size":54000,"modificationTime":1705300800000,"dataChange":true,"stats":"{\"numRecords\":10000,\"minValues\":{\"id\":10001,\"amount\":0.05},\"maxValues\":{\"id\":20000,\"amount\":9999.50},\"nullCount\":{\"id\":0,\"amount\":3}}"}}
4.3 チェックポイント
10コミットごと(デフォルト)にチェックポイントが作成される。チェックポイントはParquet形式で、テーブルの現在の状態(全有効アクション)をまとめたものである。
# チェックポイント間隔の設定
spark.conf.set("spark.databricks.delta.properties.defaults.checkpointInterval", "10")
# テーブルレベルで設定
ALTER TABLE delta_table SET TBLPROPERTIES ('delta.checkpointInterval' = '20');
4.4 ログコンパクション(Delta 3.0+)
-- ログコンパクション: 古いJSONログをまとめる
ALTER TABLE delta_table SET TBLPROPERTIES (
'delta.logRetentionDuration' = 'interval 30 days',
'delta.checkpointInterval' = '10'
);
5. ACIDトランザクション
5.1 楽観的同時実行制御
Delta Lake は楽観的同時実行制御(Optimistic Concurrency Control)を使用する。
Writer A: read v0 ──► compute ──► try commit v1 ──► SUCCESS (v1)
Writer B: read v0 ──► compute ──────────────────► try commit v1
│
▼
CONFLICT!
│
▼
re-read v1 ──► re-compute
│
▼
try commit v2 ──► SUCCESS (v2)
5.2 分離レベル
| 分離レベル | 説明 | デフォルト |
|---|---|---|
| WriteSerializable | 書き込みは直列化可能 | ✓(デフォルト) |
| Serializable | 完全な直列化可能分離 | 明示的に設定 |
-- テーブルの分離レベルを変更
ALTER TABLE delta_table SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable');
5.3 コンフリクト解決
from delta.tables import DeltaTable
# 自動リトライの設定
spark.conf.set("spark.databricks.delta.retryWriteConflict.enabled", "true")
spark.conf.set("spark.databricks.delta.retryWriteConflict.limit", "3")
6. タイムトラベル
6.1 バージョンベースのタイムトラベル
# Python API
from delta.tables import DeltaTable
# バージョンを指定して読み取り
df_v5 = spark.read.format("delta").option("versionAsOf", 5).load("/path/to/delta-table")
# タイムスタンプを指定して読み取り
df_ts = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-15T00:00:00.000Z") \
.load("/path/to/delta-table")
-- SQLでのタイムトラベル
SELECT * FROM delta_table VERSION AS OF 5;
SELECT * FROM delta_table TIMESTAMP AS OF '2024-01-15';
-- 履歴の確認
DESCRIBE HISTORY delta_table;
DESCRIBE HISTORY delta_table LIMIT 10;
-- バージョン間の差分
SELECT * FROM delta_table VERSION AS OF 10
EXCEPT
SELECT * FROM delta_table VERSION AS OF 5;
6.2 テーブルの復元
-- 特定バージョンに復元
RESTORE TABLE delta_table TO VERSION AS OF 5;
-- 特定タイムスタンプに復元
RESTORE TABLE delta_table TO TIMESTAMP AS OF '2024-01-15';
7. スキーマ管理
7.1 スキーマ強制(Schema Enforcement)
Delta Lake はデフォルトで書き込み時にスキーマの一致を強制する。
# スキーマが一致しない書き込みはエラーになる
df_wrong_schema.write.format("delta").mode("append").save("/path/to/table")
# → AnalysisException: A schema mismatch detected when writing to the Delta table
# スキーママージ(自動スキーマ進化)を有効にする
df.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/path/to/table")
7.2 スキーマ進化
-- カラムの追加
ALTER TABLE delta_table ADD COLUMNS (
new_col STRING COMMENT 'New column',
nested.new_field INT
);
-- カラムの削除(Delta 2.0+)
ALTER TABLE delta_table DROP COLUMN old_column;
-- カラムのリネーム(Delta 2.0+)
ALTER TABLE delta_table RENAME COLUMN old_name TO new_name;
-- カラムの型変更(Type Widening, Delta 3.1+)
ALTER TABLE delta_table ALTER COLUMN amount TYPE DOUBLE; -- INT → DOUBLE
-- カラムの順序変更
ALTER TABLE delta_table ALTER COLUMN col_name FIRST;
ALTER TABLE delta_table ALTER COLUMN col_name AFTER other_col;
-- NOT NULL制約
ALTER TABLE delta_table ALTER COLUMN user_id SET NOT NULL;
-- CHECK制約
ALTER TABLE delta_table ADD CONSTRAINT valid_amount CHECK (amount >= 0);
-- Generated Columns
ALTER TABLE delta_table ADD COLUMNS (
event_date DATE GENERATED ALWAYS AS (CAST(event_timestamp AS DATE))
);
8. DML操作
8.1 INSERT
-- 標準INSERT
INSERT INTO delta_table VALUES (1, 'Alice', 100.00, CURRENT_TIMESTAMP);
-- INSERT OVERWRITE(パーティション上書き)
INSERT OVERWRITE delta_table
PARTITION (year = 2024, month = 1)
SELECT * FROM staging_table;
-- COPY INTO(冪等性のあるファイル取り込み)
COPY INTO delta_table
FROM 's3://raw-data/events/'
FILEFORMAT = PARQUET
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
8.2 UPDATE / DELETE
-- UPDATE
UPDATE delta_table
SET status = 'inactive', updated_at = CURRENT_TIMESTAMP
WHERE last_login < '2023-01-01';
-- DELETE
DELETE FROM delta_table
WHERE created_at < '2022-01-01';
8.3 MERGE(Upsert)
-- MERGE: 存在すれば更新、なければ挿入
MERGE INTO target_table AS target
USING source_table AS source
ON target.id = source.id
WHEN MATCHED AND source.is_deleted = true THEN
DELETE
WHEN MATCHED THEN
UPDATE SET
target.name = source.name,
target.amount = source.amount,
target.updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
INSERT (id, name, amount, created_at, updated_at)
VALUES (source.id, source.name, source.amount, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);
# Python API でのMERGE
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/path/to/target")
delta_table.alias("target").merge(
source_df.alias("source"),
"target.id = source.id"
).whenMatchedUpdate(set={
"name": "source.name",
"amount": "source.amount",
"updated_at": "current_timestamp()"
}).whenNotMatchedInsert(values={
"id": "source.id",
"name": "source.name",
"amount": "source.amount",
"created_at": "current_timestamp()",
"updated_at": "current_timestamp()"
}).whenMatchedDelete(
condition="source.is_deleted = true"
).execute()
8.4 SCD Type 2(緩やかに変化するディメンション)
# SCD Type 2 の実装
from delta.tables import DeltaTable
from pyspark.sql.functions import *
target = DeltaTable.forPath(spark, "/path/to/dim_customer")
# 変更があったレコードを特定
changes = source_df.alias("s").join(
target.toDF().alias("t"),
(col("s.customer_id") == col("t.customer_id")) & (col("t.is_current") == True),
"left"
).where(
col("t.customer_id").isNull() |
(col("s.name") != col("t.name")) |
(col("s.email") != col("t.email"))
)
# 既存レコードを非アクティブに
target.alias("t").merge(
changes.alias("s"),
"t.customer_id = s.customer_id AND t.is_current = true"
).whenMatchedUpdate(set={
"is_current": "false",
"end_date": "current_date()"
}).execute()
# 新バージョンを挿入
new_records = changes.select(
col("s.customer_id"),
col("s.name"),
col("s.email"),
current_date().alias("start_date"),
lit(None).cast("date").alias("end_date"),
lit(True).alias("is_current")
)
new_records.write.format("delta").mode("append").save("/path/to/dim_customer")
9. Change Data Feed(CDF)
9.1 CDFの有効化と使用
Change Data Feed は、テーブルへの変更(INSERT, UPDATE, DELETE)を追跡し、下流のパイプラインで消費可能にする機能である。
-- CDFの有効化
ALTER TABLE delta_table SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
-- テーブル作成時に有効化
CREATE TABLE events (
id BIGINT,
name STRING,
amount DECIMAL(10, 2)
) USING DELTA
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
# CDFの読み取り(バージョン指定)
changes = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 5) \
.option("endingVersion", 10) \
.table("delta_table")
# CDFの読み取り(タイムスタンプ指定)
changes = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2024-01-15") \
.option("endingTimestamp", "2024-01-16") \
.table("delta_table")
# CDFのカラム:
# _change_type: insert, update_preimage, update_postimage, delete
# _commit_version: コミットバージョン
# _commit_timestamp: コミットタイムスタンプ
changes.show()
# +---+-----+------+----------------+----------------+-------------------+
# | id| name|amount| _change_type |_commit_version |_commit_timestamp |
# +---+-----+------+----------------+----------------+-------------------+
# | 1|Alice|100.00|update_preimage | 5|2024-01-15 10:00:00|
# | 1|Alice|150.00|update_postimage| 5|2024-01-15 10:00:00|
# | 5| Eve| 50.00| insert| 5|2024-01-15 10:00:00|
# | 3|Carol| 75.00| delete| 6|2024-01-15 11:00:00|
# +---+-----+------+----------------+----------------+-------------------+
# ストリーミングでのCDF消費
cdf_stream = spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("delta_table")
10. Liquid Clustering
10.1 概要
Liquid Clustering(Delta Lake 3.0+)は、従来のパーティショニングとZ-ORDERを置き換える新しいデータレイアウト最適化機能である。
従来:
├── パーティショニング: 静的、変更困難、カーディナリティ制約あり
├── Z-ORDER: 手動実行、全データ再書き込み
└── 問題: パーティション進化不可、小ファイル問題
Liquid Clustering:
├── 動的クラスタリングキーの変更が可能
├── インクリメンタルクラスタリング(変更部分のみ)
├── パーティショニング不要
└── ZCube(Space-filling curves)による効率的なレイアウト
-- Liquid Clusteringの有効化(テーブル作成時)
CREATE TABLE events (
event_id BIGINT,
user_id BIGINT,
event_type STRING,
event_timestamp TIMESTAMP,
amount DECIMAL(10, 2)
) USING DELTA
CLUSTER BY (user_id, event_type);
-- 既存テーブルへの適用
ALTER TABLE events CLUSTER BY (user_id, event_type);
-- クラスタリングキーの変更(データ再書き込み不要!)
ALTER TABLE events CLUSTER BY (event_timestamp, user_id);
-- クラスタリングの削除
ALTER TABLE events CLUSTER BY NONE;
-- OPTIMIZEでクラスタリングを実行
OPTIMIZE events;
-- 自動OPTIMIZE(Databricksの場合)
ALTER TABLE events SET TBLPROPERTIES (
'delta.autoOptimize.autoCompact' = 'true',
'delta.autoOptimize.optimizeWrite' = 'true'
);
10.2 Liquid Clustering vs パーティショニング vs Z-ORDER
| 特徴 | パーティショニング | Z-ORDER | Liquid Clustering |
|---|---|---|---|
| キー変更 | テーブル再作成必要 | 設定変更可能 | ALTER文で即座に変更 |
| データ再書き込み | 全データ | 全データ | 変更部分のみ |
| 小ファイル問題 | あり | なし | なし |
| カーディナリティ | 低い必要あり | 任意 | 任意 |
| 推奨 | レガシー | レガシー | 新規テーブルで推奨 |
11. Deletion Vectors
11.1 概要
Deletion Vectors(DV)は、行レベルの削除を効率化する機能である。従来のCopy-on-Write方式では、1行の削除でもファイル全体を書き直す必要があったが、DVはどの行が削除されたかをビットマップで記録する。
従来(Copy-on-Write):
┌────────────┐ DELETE ┌────────────┐
│ File A │ ──────────► │ File A' │ (ファイル全体を再書き込み)
│ 100万行 │ 1行削除 │ 999,999行 │
└────────────┘ └────────────┘
Deletion Vectors:
┌────────────┐ DELETE ┌────────────┐ ┌──────────┐
│ File A │ ──────────► │ File A │ │ DV │
│ 100万行 │ 1行削除 │ (変更なし) │ │ row 42 │
└────────────┘ └────────────┘ │ deleted │
└──────────┘
(元ファイル保持 + 削除ビットマップ追加)
-- Deletion Vectorsの有効化
ALTER TABLE delta_table SET TBLPROPERTIES (
'delta.enableDeletionVectors' = 'true'
);
-- テーブル作成時に有効化
CREATE TABLE events (
id BIGINT,
data STRING
) USING DELTA
TBLPROPERTIES ('delta.enableDeletionVectors' = 'true');
11.2 利点
- 高速なDELETE/UPDATE: ファイル書き換え不要
- 低い書き込みアンプリフィケーション: 変更が最小限
- MERGE性能の向上: 特に小規模な変更で大きな効果
12. UniForm(Universal Format)
12.1 概要
UniForm(Delta Lake 3.0+)は、Delta Lake テーブルを Apache Iceberg および Apache Hudi 形式で同時に読み取り可能にする機能である。
┌────────────────────────────────────────┐
│ Delta Lake Table │
│ │
│ ┌────────────────────────────────┐ │
│ │ Parquet Data Files │ │
│ └────────────────────────────────┘ │
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Delta │ │Iceberg│ │ Hudi │ │
│ │ Log │ │Metadata│ │Meta- │ │
│ │ │ │(自動生成)│ │data │ │
│ └──────┘ └──────┘ └──────┘ │
│ │
│ 書き込み: Delta API │
│ 読み取り: Delta / Iceberg / Hudi │
└────────────────────────────────────────┘
-- UniFormの有効化(Iceberg互換)
ALTER TABLE delta_table SET TBLPROPERTIES (
'delta.universalFormat.enabledFormats' = 'iceberg'
);
-- テーブル作成時
CREATE TABLE events (
id BIGINT,
data STRING,
ts TIMESTAMP
) USING DELTA
TBLPROPERTIES (
'delta.universalFormat.enabledFormats' = 'iceberg',
'delta.enableIcebergCompatV2' = 'true'
);
-- Icebergとして読み取り(Trino等から)
-- catalog: iceberg_catalog
-- SELECT * FROM iceberg_catalog.db.events;
12.2 UniForm の制約
- 書き込みは Delta API からのみ
- 一部の Delta 機能は Iceberg/Hudi メタデータに反映されない場合がある
- Iceberg REST Catalog との連携が推奨
13. テーブルメンテナンス
13.1 OPTIMIZE(コンパクション)
-- テーブル全体の最適化
OPTIMIZE delta_table;
-- 条件付きOPTIMIZE
OPTIMIZE delta_table WHERE year = 2024 AND month = 1;
-- Z-ORDER(Liquid Clusteringが利用できない場合)
OPTIMIZE delta_table ZORDER BY (user_id, event_type);
-- 小ファイルの自動コンパクション
ALTER TABLE delta_table SET TBLPROPERTIES (
'delta.autoOptimize.autoCompact' = 'true',
'delta.autoOptimize.optimizeWrite' = 'true'
);
13.2 VACUUM(不要ファイルの削除)
-- 7日以上前のファイルを削除(デフォルト)
VACUUM delta_table;
-- 保持期間を指定
VACUUM delta_table RETAIN 168 HOURS; -- 7日
-- ドライラン(削除されるファイルの確認)
VACUUM delta_table DRY RUN;
# Python API
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/path/to/table")
delta_table.vacuum(168) # 168時間 = 7日
# 短い保持期間の設定(注意: タイムトラベルに影響)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_table.vacuum(0) # 即座に削除(危険!)
13.3 メンテナンスの自動化
# Airflow DAG でのメンテナンス自動化
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
with DAG('delta_table_maintenance', schedule_interval='0 3 * * *') as dag:
optimize = SparkSubmitOperator(
task_id='optimize_table',
application='maintenance.py',
application_args=['--action', 'optimize', '--table', 'events'],
)
vacuum = SparkSubmitOperator(
task_id='vacuum_table',
application='maintenance.py',
application_args=['--action', 'vacuum', '--table', 'events', '--hours', '168'],
)
optimize >> vacuum
14. エンジン連携
14.1 Apache Spark(ネイティブ)
# pip install delta-spark
from delta import configure_spark_with_delta_pip
builder = SparkSession.builder \
.appName("DeltaApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Delta テーブルの読み書き
df.write.format("delta").save("/path/to/table")
df = spark.read.format("delta").load("/path/to/table")
14.2 Apache Flink
-- Flink SQL でのDelta Lake連携
CREATE CATALOG delta_catalog WITH (
'type' = 'delta-catalog',
'catalog-type' = 'in-memory'
);
CREATE TABLE delta_catalog.db.events (
id BIGINT,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'delta',
'table-path' = 's3://bucket/delta/events'
);
-- ストリーミング読み取り
SELECT * FROM delta_catalog.db.events /*+ OPTIONS('mode' = 'streaming') */;
14.3 Trino / Presto
# etc/catalog/delta.properties (Trino)
connector.name=delta_lake
hive.metastore.uri=thrift://hive-metastore:9083
delta.enable-non-concurrent-writes=true
-- Trinoからの読み取り
SELECT * FROM delta.schema_name.events WHERE year = 2024;
-- Trinoからの書き込み(制限あり)
INSERT INTO delta.schema_name.events SELECT * FROM staging;
14.4 Delta Kernel(Delta 4.0+)
Delta Kernel は、任意のエンジンから Delta Lake テーブルを読み書きするための低レベルライブラリである。
// Delta Kernel API
import io.delta.kernel.*;
import io.delta.kernel.defaults.*;
Engine engine = DefaultEngine.create(hadoopConf);
Table table = Table.forPath(engine, "/path/to/delta-table");
Snapshot snapshot = table.getLatestSnapshot(engine);
// スキーマの取得
StructType schema = snapshot.getSchema(engine);
// データの読み取り
ScanBuilder scanBuilder = snapshot.getScanBuilder(engine);
Scan scan = scanBuilder.build();
CloseableIterator<FilteredColumnarBatch> data = scan.getScanFiles(engine);
15. Delta Sharing
Delta Sharing は、組織間でデータを安全に共有するためのオープンプロトコルである。
# Delta Sharing サーバー設定
# sharing-server-config.yaml
shares:
- name: "marketing_share"
schemas:
- name: "analytics"
tables:
- name: "customer_events"
location: "s3://data-lake/delta/customer_events"
# Delta Sharing クライアント(Python)
import delta_sharing
# プロファイルを使用してテーブルにアクセス
profile = "config.share"
client = delta_sharing.SharingClient(profile)
# 利用可能なテーブルの一覧
tables = client.list_all_tables()
# Pandas DataFrameとして読み取り
df = delta_sharing.load_as_pandas(f"{profile}#marketing_share.analytics.customer_events")
# Spark DataFrameとして読み取り
df = delta_sharing.load_as_spark(f"{profile}#marketing_share.analytics.customer_events")
16. 設定とチューニング
16.1 テーブルプロパティ
-- 主要なテーブルプロパティ
ALTER TABLE delta_table SET TBLPROPERTIES (
-- ログ保持
'delta.logRetentionDuration' = 'interval 30 days',
'delta.deletedFileRetentionDuration' = 'interval 7 days',
-- チェックポイント
'delta.checkpointInterval' = '10',
-- 自動最適化
'delta.autoOptimize.autoCompact' = 'true',
'delta.autoOptimize.optimizeWrite' = 'true',
-- ターゲットファイルサイズ
'delta.targetFileSize' = '134217728', -- 128MB
'delta.tuneFileSizesForRewrites' = 'true',
-- データスキッピング
'delta.dataSkippingNumIndexedCols' = '32',
'delta.dataSkippingStatsColumns' = 'id,user_id,event_timestamp',
-- Deletion Vectors
'delta.enableDeletionVectors' = 'true',
-- Change Data Feed
'delta.enableChangeDataFeed' = 'true',
-- Row Tracking
'delta.enableRowTracking' = 'true'
);
16.2 Spark 設定
# Delta Lake Spark設定
spark.conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# 書き込み最適化
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
# マージ最適化
spark.conf.set("spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled", "true")
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")
# ファイルサイズ
spark.conf.set("spark.databricks.delta.properties.defaults.targetFileSize", "134217728")
# 統計情報
spark.conf.set("spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols", "32")
# 並列度
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.databricks.delta.snapshotPartitions", "8")
17. パフォーマンス最適化
17.1 データスキッピング
Delta Lake はファイルレベルの統計情報(min/max/nullCount)を活用して、クエリに無関係なファイルの読み取りをスキップする。
-- 統計収集カラムの指定
ALTER TABLE delta_table SET TBLPROPERTIES (
'delta.dataSkippingStatsColumns' = 'id,user_id,event_timestamp,country'
);
-- 効果的なクエリ(データスキッピングが適用される)
SELECT * FROM events WHERE user_id = 12345; -- min/maxでフィルタ
SELECT * FROM events WHERE event_timestamp >= '2024-01-01'; -- 範囲フィルタ
-- 効果が低いクエリ
SELECT * FROM events WHERE description LIKE '%error%'; -- 文字列パターンは統計に含まれない
17.2 最適化テクニック
-- 1. Liquid Clustering(推奨)
CREATE TABLE events (...) USING DELTA CLUSTER BY (user_id, event_date);
OPTIMIZE events;
-- 2. Z-ORDER(Liquid Clustering未使用時)
OPTIMIZE events ZORDER BY (user_id, event_type);
-- 3. パーティションプルーニング
SELECT * FROM events WHERE year = 2024 AND month = 1; -- パーティションカラム
-- 4. ファイルサイズの最適化
ALTER TABLE events SET TBLPROPERTIES ('delta.targetFileSize' = '134217728');
-- 5. キャッシュ
CACHE SELECT * FROM events WHERE event_date = CURRENT_DATE;
-- 6. Photon エンジン(Databricks)
-- 自動的にベクトル化実行で高速化
17.3 書き込みパフォーマンス
# Optimized Write: 自動的に最適なファイルサイズに調整
df.write.format("delta") \
.option("optimizeWrite", "true") \
.mode("append") \
.save("/path/to/table")
# Auto Compact: 小ファイルの自動コンパクション
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", "50")
# パーティションの上書き(動的)
df.write.format("delta") \
.mode("overwrite") \
.option("partitionOverwriteMode", "dynamic") \
.partitionBy("date") \
.save("/path/to/table")
18. セキュリティ
18.1 アクセス制御
-- テーブルレベルの権限(Unity Catalog / Databricks)
GRANT SELECT ON TABLE delta_table TO `analysts`;
GRANT MODIFY ON TABLE delta_table TO `etl_user`;
GRANT ALL PRIVILEGES ON TABLE delta_table TO `admin`;
-- カラムマスキング
CREATE FUNCTION mask_email(email STRING) RETURNS STRING
RETURN CONCAT(LEFT(email, 2), '***@***');
ALTER TABLE users ALTER COLUMN email SET MASK mask_email;
-- 行フィルター
CREATE FUNCTION region_filter(region STRING) RETURNS BOOLEAN
RETURN region = CURRENT_USER_REGION();
ALTER TABLE sales SET ROW FILTER region_filter ON (region);
18.2 暗号化
# サーバーサイド暗号化(S3)
spark.conf.set("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
spark.conf.set("spark.hadoop.fs.s3a.server-side-encryption.key", "arn:aws:kms:...")
# クライアントサイド暗号化
spark.conf.set("spark.hadoop.fs.s3a.encryption.algorithm", "CSE-KMS")
spark.conf.set("spark.hadoop.fs.s3a.encryption.key", "arn:aws:kms:...")
19. 監視と運用
19.1 テーブル情報の確認
-- テーブル詳細
DESCRIBE DETAIL delta_table;
-- numFiles, sizeInBytes, numPartitions, etc.
-- テーブル履歴
DESCRIBE HISTORY delta_table;
-- テーブルプロパティ
SHOW TBLPROPERTIES delta_table;
-- カラム統計
DESCRIBE EXTENDED delta_table column_name;
19.2 監視メトリクス
# Delta テーブルの健全性チェック
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/path/to/table")
# テーブル詳細
detail = dt.detail().collect()[0]
print(f"ファイル数: {detail['numFiles']}")
print(f"サイズ: {detail['sizeInBytes'] / (1024**3):.2f} GB")
print(f"パーティション数: {detail['numPartitions']}")
# 履歴
history = dt.history(10).select("version", "timestamp", "operation", "operationMetrics")
history.show(truncate=False)
# 小ファイル検出
file_stats = spark.read.format("delta").load("/path/to/table") \
.inputFiles()
small_files = [f for f in file_stats if get_file_size(f) < 10 * 1024 * 1024] # 10MB未満
print(f"小ファイル数: {len(small_files)} / 全ファイル: {len(file_stats)}")
20. 他フォーマットとの比較
20.1 Delta Lake vs Apache Iceberg vs Apache Hudi
| 特徴 | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| 開発元 | Databricks | Netflix → ASF | Uber → ASF |
| ガバナンス | Linux Foundation | ASF | ASF |
| メタデータ | トランザクションログ(JSON) | マニフェスト階層 | タイムライン |
| ACID | ✓ | ✓ | ✓ |
| タイムトラベル | ✓ | ✓ | ✓ |
| スキーマ進化 | ✓ | ✓(より柔軟) | ✓ |
| パーティション進化 | Liquid Clustering | ✓(ネイティブ) | 限定的 |
| 行レベル操作 | Deletion Vectors | CoW / MoR | CoW / MoR |
| ストリーミング | Spark Structured Streaming | Flink / Spark | Flink / Spark |
| CDC | Change Data Feed | Incremental Read | Incremental Query |
| マルチエンジン | △(Spark優位) | ◎(最も広い) | ○ |
| 相互運用性 | UniForm | XTable | XTable |
| Databricks統合 | ◎(ネイティブ) | ○ | △ |
20.2 選択の指針
- Delta Lake: Databricks/Spark中心のエコシステム、Liquid Clustering活用、UniFormで互換性確保
- Iceberg: マルチエンジン要件、パーティション進化、オープンスタンダード重視
- Hudi: 高頻度Upsert、CDC、インクリメンタル処理
21. ユースケースとベストプラクティス
21.1 代表的なユースケース
メダリオンアーキテクチャ
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Bronze │────►│ Silver │────►│ Gold │
│ (Raw) │ │(Cleansed)│ │(Curated) │
│ │ │ │ │ │
│ APPEND │ │ MERGE │ │ AGGREGATE│
│ only │ │ dedupe │ │ KPIs │
│ │ │ validate │ │ reports │
└──────────┘ └──────────┘ └──────────┘
Delta Table Delta Table Delta Table
# Bronze: 生データの取り込み
raw_df = spark.readStream \
.format("kafka") \
.option("subscribe", "events") \
.load()
raw_df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/bronze") \
.toTable("bronze.events")
# Silver: クレンジングと重複排除
bronze_df = spark.readStream.table("bronze.events")
silver_df = bronze_df \
.dropDuplicates(["event_id"]) \
.filter(col("event_type").isNotNull()) \
.withColumn("processed_at", current_timestamp())
silver_df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/silver") \
.toTable("silver.events")
# Gold: 集約
gold_query = """
MERGE INTO gold.daily_metrics AS target
USING (
SELECT
date_trunc('day', event_timestamp) as event_date,
event_type,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users
FROM silver.events
WHERE event_timestamp >= current_date() - INTERVAL 1 DAY
GROUP BY 1, 2
) AS source
ON target.event_date = source.event_date AND target.event_type = source.event_type
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(gold_query)
21.2 ベストプラクティス
- Liquid Clustering を使用する: 新規テーブルではパーティショニングよりLiquid Clusteringを推奨
- Deletion Vectors を有効にする: UPDATE/DELETE/MERGE性能の大幅向上
- Change Data Feed を活用する: 下流パイプラインのインクリメンタル処理
- 定期的にOPTIMIZEを実行する: 小ファイル問題の防止
- VACUUMを定期実行する: 不要ファイルの削除でストレージコスト削減
- 適切な保持期間を設定する: タイムトラベルとストレージコストのバランス
- UniFormを検討する: マルチエンジン環境での互換性確保
- ANALYZE TABLEで統計を更新する: CBO性能の向上
- テーブルプロパティを適切に設定する: ターゲットファイルサイズ、統計カラム数
- メダリオンアーキテクチャを採用する: Bronze → Silver → Gold のレイヤード設計
22. 最新動向と将来展望
22.1 最近の主要機能
| 機能 | バージョン | 説明 |
|---|---|---|
| Liquid Clustering | 3.0 | 動的データレイアウト最適化 |
| UniForm | 3.0 | Iceberg/Hudi互換メタデータ |
| Deletion Vectors | 2.3+ | 効率的な行レベル削除 |
| Row Tracking | 3.0 | 行の追跡とリネージ |
| Type Widening | 3.1 | 安全な型変更 |
| Variant型 | 3.1 | 半構造化データ対応 |
| Delta Kernel | 4.0 | エンジン非依存の読み書きライブラリ |
| Coordinated Commits | 3.x+ | マルチクラスター書き込み |
| In-Commit Timestamps | 4.0 | 正確なコミットタイムスタンプ |
22.2 将来の方向性
- Delta Kernel の成熟: あらゆるエンジンからのネイティブアクセス
- UniForm の拡張: より深い Iceberg/Hudi 互換性
- サーバーレス最適化: クラウドネイティブな自動チューニング
- AI/ML統合: Feature Store、モデルバージョニングとの連携強化
- リアルタイム処理の強化: Structured Streamingとの更なる統合
23. まとめ
Delta Lake はデータレイクに信頼性とパフォーマンスをもたらすストレージレイヤーであり、レイクハウスアーキテクチャの中核技術である。
技術的要点
| カテゴリ | 要点 |
|---|---|
| アーキテクチャ | Parquet + トランザクションログ(_delta_log) |
| トランザクション | 楽観的同時実行制御、ACID保証 |
| スキーマ | 強制(Enforcement)+ 進化(Evolution) |
| データレイアウト | Liquid Clustering(推奨)/ Z-ORDER / パーティション |
| 行レベル操作 | Deletion Vectors による高効率化 |
| 変更追跡 | Change Data Feed |
| 互換性 | UniForm(Iceberg/Hudi読み取り互換) |
| メンテナンス | OPTIMIZE + VACUUM |
| エンジン | Spark(ネイティブ), Flink, Trino, Delta Kernel |
選定の指針
Delta Lake が適しているケース:
- Databricks / Spark 中心のエコシステム: ネイティブ統合の利点を最大活用
- メダリオンアーキテクチャ: Bronze/Silver/Gold のレイヤード設計
- 高頻度MERGE/Upsert: Deletion Vectorsによる高性能
- レイクハウス構築: ACID + タイムトラベル + スキーマ管理
他のツールを検討すべきケース:
- マルチエンジン重視: Apache Iceberg(より広いエンジン対応)
- CDC / インクリメンタル処理重視: Apache Hudi
- Databricks非使用環境: Iceberg の方がエコシステムが広い
参考文献: