Delta Lake

Delta Lake 徹底解説:アーキテクチャ・機能・設定の全容

目次

  1. はじめに
  2. Delta Lake の概要と歴史
  3. アーキテクチャ全体像
  4. トランザクションログの仕組み
  5. ACIDトランザクション
  6. タイムトラベル
  7. スキーマ管理
  8. DML操作
  9. Change Data Feed(CDF)
  10. Liquid Clustering
  11. Deletion Vectors
  12. UniForm(Universal Format)
  13. テーブルメンテナンス
  14. エンジン連携
  15. Delta Sharing
  16. 設定とチューニング
  17. パフォーマンス最適化
  18. セキュリティ
  19. 監視と運用
  20. 他フォーマットとの比較
  21. ユースケースとベストプラクティス
  22. 最新動向と将来展望
  23. まとめ

1. はじめに

Delta Lake は、データレイク上に信頼性、品質、パフォーマンスをもたらすオープンソースのストレージレイヤーである。Apache Parquet ファイルの上にトランザクションログを追加することで、ACID トランザクション、スキーマ強制、タイムトラベル等の機能を実現する。

Databricks 社が開発し、2019年にオープンソース化された Delta Lake は、レイクハウス(Lakehouse)アーキテクチャの中核技術として、データレイクの柔軟性とデータウェアハウスの信頼性を統合する。

1.1 レイクハウスアーキテクチャ

従来のアーキテクチャ:
┌──────────┐     ETL      ┌──────────────┐
│ Data Lake │ ──────────► │ Data Warehouse│
│ (安価だが │             │ (信頼性高いが │
│  品質低い)│             │  高コスト)    │
└──────────┘             └──────────────┘

レイクハウスアーキテクチャ:
┌────────────────────────────────────────┐
│          Lakehouse (Delta Lake)         │
│                                        │
│  ┌──────────────────────────────────┐  │
│  │   ACID Transactions + Schema     │  │
│  │   Enforcement + Time Travel      │  │
│  ├──────────────────────────────────┤  │
│  │        Delta Lake Format          │  │
│  │   (Parquet + Transaction Log)     │  │
│  ├──────────────────────────────────┤  │
│  │      Cloud Object Storage         │  │
│  │     (S3 / GCS / ADLS / HDFS)     │  │
│  └──────────────────────────────────┘  │
│                                        │
│  データレイクのコスト + DWHの信頼性      │
└────────────────────────────────────────┘

1.2 対象読者

  • データレイク・レイクハウスの構築を担当するデータエンジニア
  • ETL/ELTパイプラインを設計するアーキテクト
  • Delta Lake テーブルを運用する SRE / プラットフォームエンジニア
  • テーブルフォーマットの技術選定を行う意思決定者

2. Delta Lake の概要と歴史

2.1 誕生の背景

Delta Lake は Databricks 社のエンジニアが、データレイクの信頼性問題(データ破損、不整合、スキーマ不一致等)を解決するために開発した。

2.2 バージョンの変遷

バージョンリリース年主な特徴
0.1.02019初のOSS版、ACID、スキーマ強制
0.3.02019MERGE操作、Python API
0.5.02019パーティションカラムの上書き
0.7.02020DESCRIBE HISTORY、Change Data Feed
0.8.02020制約(CHECK/NOT NULL)、Generated Columns
1.0.02021初のメジャーリリース
1.2.02022Z-ORDER最適化の改善
2.0.02022DROP COLUMN、RENAME COLUMN
2.3.02023Deletion Vectors、Change Data Feed改善
2.4.02023Coordinated Commits(プレビュー)
3.0.02024Liquid Clustering GA、UniForm、Row Tracking
3.1.02024Type Widening、Variant型サポート
3.2.02024Collation サポート、Clustering改善
4.0.02025Delta Kernel GA、In-Commit Timestamps

2.3 ガバナンスとコミュニティ

  • 開発主体: Databricks 社が主導、Linux Foundation Delta Lake プロジェクト
  • ライセンス: Apache License 2.0
  • プロトコル仕様: Delta Lake Protocol(オープン仕様)
  • 主要コントリビューター: Databricks, Apple, Microsoft, Alibaba 等

3. アーキテクチャ全体像

3.1 Delta テーブルの構造

delta-table/
├── _delta_log/                    # トランザクションログ
│   ├── 00000000000000000000.json  # バージョン0のコミット
│   ├── 00000000000000000001.json  # バージョン1のコミット
│   ├── 00000000000000000002.json  # バージョン2のコミット
│   ├── ...
│   ├── 00000000000000000010.json  # バージョン10のコミット
│   ├── 00000000000000000010.checkpoint.parquet  # チェックポイント
│   ├── ...
│   ├── 00000000000000000020.json
│   ├── 00000000000000000020.checkpoint.parquet
│   └── _last_checkpoint          # 最新チェックポイントへのポインタ
├── year=2024/                     # パーティションディレクトリ
│   ├── month=01/
│   │   ├── part-00000-xxx.parquet # データファイル
│   │   ├── part-00001-xxx.parquet
│   │   └── part-00002-xxx.parquet
│   └── month=02/
│       ├── part-00000-xxx.parquet
│       └── part-00001-xxx.parquet
└── year=2023/
    └── ...

3.2 読み取りフロー

クエリ実行
    │
    ▼
_last_checkpoint を読む
    │
    ▼
最新チェックポイント(Parquet)を読む
    │  (テーブルの現在のスナップショット情報)
    ▼
チェックポイント以降のJSONログを読む
    │  (増分変更を適用)
    ▼
有効なファイルリストを構築
    │  (Add アクション - Remove アクション)
    ▼
データスキッピング(統計情報)
    │  (min/max値でファイルをフィルタ)
    ▼
対象Parquetファイルのみ読み取り

4. トランザクションログの仕組み

4.1 アクションの種類

各JSONコミットファイルは、1つ以上のアクションで構成される:

アクション説明
add新しいデータファイルの追加
removeデータファイルの論理削除
metadataテーブルメタデータの変更(スキーマ等)
protocolテーブルプロトコルバージョンの変更
commitInfoコミットの付加情報(誰が、いつ、何を)
txnアプリケーション固有のトランザクションID
domainMetadataドメイン固有のメタデータ(Delta 3.0+)

4.2 トランザクションログの例

// 00000000000000000001.json
{"commitInfo":{"timestamp":1705300800000,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"year\",\"month\"]"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true}}
{"add":{"path":"year=2024/month=01/part-00000-abc.snappy.parquet","partitionValues":{"year":"2024","month":"01"},"size":54321,"modificationTime":1705300800000,"dataChange":true,"stats":"{\"numRecords\":10000,\"minValues\":{\"id\":1,\"amount\":0.01},\"maxValues\":{\"id\":10000,\"amount\":9999.99},\"nullCount\":{\"id\":0,\"amount\":5}}"}}
{"add":{"path":"year=2024/month=01/part-00001-def.snappy.parquet","partitionValues":{"year":"2024","month":"01"},"size":54000,"modificationTime":1705300800000,"dataChange":true,"stats":"{\"numRecords\":10000,\"minValues\":{\"id\":10001,\"amount\":0.05},\"maxValues\":{\"id\":20000,\"amount\":9999.50},\"nullCount\":{\"id\":0,\"amount\":3}}"}}

4.3 チェックポイント

10コミットごと(デフォルト)にチェックポイントが作成される。チェックポイントはParquet形式で、テーブルの現在の状態(全有効アクション)をまとめたものである。

# チェックポイント間隔の設定
spark.conf.set("spark.databricks.delta.properties.defaults.checkpointInterval", "10")

# テーブルレベルで設定
ALTER TABLE delta_table SET TBLPROPERTIES ('delta.checkpointInterval' = '20');

4.4 ログコンパクション(Delta 3.0+)

-- ログコンパクション: 古いJSONログをまとめる
ALTER TABLE delta_table SET TBLPROPERTIES (
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.checkpointInterval' = '10'
);

5. ACIDトランザクション

5.1 楽観的同時実行制御

Delta Lake は楽観的同時実行制御(Optimistic Concurrency Control)を使用する。

Writer A:  read v0 ──► compute ──► try commit v1 ──► SUCCESS (v1)
Writer B:  read v0 ──► compute ──────────────────► try commit v1
                                                      │
                                                      ▼
                                                   CONFLICT!
                                                      │
                                                      ▼
                                            re-read v1 ──► re-compute
                                                              │
                                                              ▼
                                                   try commit v2 ──► SUCCESS (v2)

5.2 分離レベル

分離レベル説明デフォルト
WriteSerializable書き込みは直列化可能✓(デフォルト)
Serializable完全な直列化可能分離明示的に設定
-- テーブルの分離レベルを変更
ALTER TABLE delta_table SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable');

5.3 コンフリクト解決

from delta.tables import DeltaTable

# 自動リトライの設定
spark.conf.set("spark.databricks.delta.retryWriteConflict.enabled", "true")
spark.conf.set("spark.databricks.delta.retryWriteConflict.limit", "3")

6. タイムトラベル

6.1 バージョンベースのタイムトラベル

# Python API
from delta.tables import DeltaTable

# バージョンを指定して読み取り
df_v5 = spark.read.format("delta").option("versionAsOf", 5).load("/path/to/delta-table")

# タイムスタンプを指定して読み取り
df_ts = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15T00:00:00.000Z") \
    .load("/path/to/delta-table")
-- SQLでのタイムトラベル
SELECT * FROM delta_table VERSION AS OF 5;
SELECT * FROM delta_table TIMESTAMP AS OF '2024-01-15';

-- 履歴の確認
DESCRIBE HISTORY delta_table;
DESCRIBE HISTORY delta_table LIMIT 10;

-- バージョン間の差分
SELECT * FROM delta_table VERSION AS OF 10
EXCEPT
SELECT * FROM delta_table VERSION AS OF 5;

6.2 テーブルの復元

-- 特定バージョンに復元
RESTORE TABLE delta_table TO VERSION AS OF 5;

-- 特定タイムスタンプに復元
RESTORE TABLE delta_table TO TIMESTAMP AS OF '2024-01-15';

7. スキーマ管理

7.1 スキーマ強制(Schema Enforcement)

Delta Lake はデフォルトで書き込み時にスキーマの一致を強制する。

# スキーマが一致しない書き込みはエラーになる
df_wrong_schema.write.format("delta").mode("append").save("/path/to/table")
# → AnalysisException: A schema mismatch detected when writing to the Delta table

# スキーママージ(自動スキーマ進化)を有効にする
df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/path/to/table")

7.2 スキーマ進化

-- カラムの追加
ALTER TABLE delta_table ADD COLUMNS (
    new_col STRING COMMENT 'New column',
    nested.new_field INT
);

-- カラムの削除(Delta 2.0+)
ALTER TABLE delta_table DROP COLUMN old_column;

-- カラムのリネーム(Delta 2.0+)
ALTER TABLE delta_table RENAME COLUMN old_name TO new_name;

-- カラムの型変更(Type Widening, Delta 3.1+)
ALTER TABLE delta_table ALTER COLUMN amount TYPE DOUBLE;  -- INT → DOUBLE

-- カラムの順序変更
ALTER TABLE delta_table ALTER COLUMN col_name FIRST;
ALTER TABLE delta_table ALTER COLUMN col_name AFTER other_col;

-- NOT NULL制約
ALTER TABLE delta_table ALTER COLUMN user_id SET NOT NULL;

-- CHECK制約
ALTER TABLE delta_table ADD CONSTRAINT valid_amount CHECK (amount >= 0);

-- Generated Columns
ALTER TABLE delta_table ADD COLUMNS (
    event_date DATE GENERATED ALWAYS AS (CAST(event_timestamp AS DATE))
);

8. DML操作

8.1 INSERT

-- 標準INSERT
INSERT INTO delta_table VALUES (1, 'Alice', 100.00, CURRENT_TIMESTAMP);

-- INSERT OVERWRITE(パーティション上書き)
INSERT OVERWRITE delta_table
PARTITION (year = 2024, month = 1)
SELECT * FROM staging_table;

-- COPY INTO(冪等性のあるファイル取り込み)
COPY INTO delta_table
FROM 's3://raw-data/events/'
FILEFORMAT = PARQUET
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

8.2 UPDATE / DELETE

-- UPDATE
UPDATE delta_table
SET status = 'inactive', updated_at = CURRENT_TIMESTAMP
WHERE last_login < '2023-01-01';

-- DELETE
DELETE FROM delta_table
WHERE created_at < '2022-01-01';

8.3 MERGE(Upsert)

-- MERGE: 存在すれば更新、なければ挿入
MERGE INTO target_table AS target
USING source_table AS source
ON target.id = source.id
WHEN MATCHED AND source.is_deleted = true THEN
    DELETE
WHEN MATCHED THEN
    UPDATE SET
        target.name = source.name,
        target.amount = source.amount,
        target.updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
    INSERT (id, name, amount, created_at, updated_at)
    VALUES (source.id, source.name, source.amount, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);
# Python API でのMERGE
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/path/to/target")

delta_table.alias("target").merge(
    source_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "name": "source.name",
    "amount": "source.amount",
    "updated_at": "current_timestamp()"
}).whenNotMatchedInsert(values={
    "id": "source.id",
    "name": "source.name",
    "amount": "source.amount",
    "created_at": "current_timestamp()",
    "updated_at": "current_timestamp()"
}).whenMatchedDelete(
    condition="source.is_deleted = true"
).execute()

8.4 SCD Type 2(緩やかに変化するディメンション)

# SCD Type 2 の実装
from delta.tables import DeltaTable
from pyspark.sql.functions import *

target = DeltaTable.forPath(spark, "/path/to/dim_customer")

# 変更があったレコードを特定
changes = source_df.alias("s").join(
    target.toDF().alias("t"),
    (col("s.customer_id") == col("t.customer_id")) & (col("t.is_current") == True),
    "left"
).where(
    col("t.customer_id").isNull() |
    (col("s.name") != col("t.name")) |
    (col("s.email") != col("t.email"))
)

# 既存レコードを非アクティブに
target.alias("t").merge(
    changes.alias("s"),
    "t.customer_id = s.customer_id AND t.is_current = true"
).whenMatchedUpdate(set={
    "is_current": "false",
    "end_date": "current_date()"
}).execute()

# 新バージョンを挿入
new_records = changes.select(
    col("s.customer_id"),
    col("s.name"),
    col("s.email"),
    current_date().alias("start_date"),
    lit(None).cast("date").alias("end_date"),
    lit(True).alias("is_current")
)

new_records.write.format("delta").mode("append").save("/path/to/dim_customer")

9. Change Data Feed(CDF)

9.1 CDFの有効化と使用

Change Data Feed は、テーブルへの変更(INSERT, UPDATE, DELETE)を追跡し、下流のパイプラインで消費可能にする機能である。

-- CDFの有効化
ALTER TABLE delta_table SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

-- テーブル作成時に有効化
CREATE TABLE events (
    id BIGINT,
    name STRING,
    amount DECIMAL(10, 2)
) USING DELTA
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
# CDFの読み取り(バージョン指定)
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 5) \
    .option("endingVersion", 10) \
    .table("delta_table")

# CDFの読み取り(タイムスタンプ指定)
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingTimestamp", "2024-01-15") \
    .option("endingTimestamp", "2024-01-16") \
    .table("delta_table")

# CDFのカラム:
# _change_type: insert, update_preimage, update_postimage, delete
# _commit_version: コミットバージョン
# _commit_timestamp: コミットタイムスタンプ

changes.show()
# +---+-----+------+----------------+----------------+-------------------+
# | id| name|amount|   _change_type |_commit_version |_commit_timestamp  |
# +---+-----+------+----------------+----------------+-------------------+
# |  1|Alice|100.00|update_preimage |               5|2024-01-15 10:00:00|
# |  1|Alice|150.00|update_postimage|               5|2024-01-15 10:00:00|
# |  5|  Eve| 50.00|          insert|               5|2024-01-15 10:00:00|
# |  3|Carol| 75.00|          delete|               6|2024-01-15 11:00:00|
# +---+-----+------+----------------+----------------+-------------------+

# ストリーミングでのCDF消費
cdf_stream = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table("delta_table")

10. Liquid Clustering

10.1 概要

Liquid Clustering(Delta Lake 3.0+)は、従来のパーティショニングとZ-ORDERを置き換える新しいデータレイアウト最適化機能である。

従来:
├── パーティショニング: 静的、変更困難、カーディナリティ制約あり
├── Z-ORDER: 手動実行、全データ再書き込み
└── 問題: パーティション進化不可、小ファイル問題

Liquid Clustering:
├── 動的クラスタリングキーの変更が可能
├── インクリメンタルクラスタリング(変更部分のみ)
├── パーティショニング不要
└── ZCube(Space-filling curves)による効率的なレイアウト
-- Liquid Clusteringの有効化(テーブル作成時)
CREATE TABLE events (
    event_id BIGINT,
    user_id BIGINT,
    event_type STRING,
    event_timestamp TIMESTAMP,
    amount DECIMAL(10, 2)
) USING DELTA
CLUSTER BY (user_id, event_type);

-- 既存テーブルへの適用
ALTER TABLE events CLUSTER BY (user_id, event_type);

-- クラスタリングキーの変更(データ再書き込み不要!)
ALTER TABLE events CLUSTER BY (event_timestamp, user_id);

-- クラスタリングの削除
ALTER TABLE events CLUSTER BY NONE;

-- OPTIMIZEでクラスタリングを実行
OPTIMIZE events;

-- 自動OPTIMIZE(Databricksの場合)
ALTER TABLE events SET TBLPROPERTIES (
    'delta.autoOptimize.autoCompact' = 'true',
    'delta.autoOptimize.optimizeWrite' = 'true'
);

10.2 Liquid Clustering vs パーティショニング vs Z-ORDER

特徴パーティショニングZ-ORDERLiquid Clustering
キー変更テーブル再作成必要設定変更可能ALTER文で即座に変更
データ再書き込み全データ全データ変更部分のみ
小ファイル問題ありなしなし
カーディナリティ低い必要あり任意任意
推奨レガシーレガシー新規テーブルで推奨

11. Deletion Vectors

11.1 概要

Deletion Vectors(DV)は、行レベルの削除を効率化する機能である。従来のCopy-on-Write方式では、1行の削除でもファイル全体を書き直す必要があったが、DVはどの行が削除されたかをビットマップで記録する。

従来(Copy-on-Write):
┌────────────┐     DELETE     ┌────────────┐
│ File A     │  ──────────►  │ File A'    │  (ファイル全体を再書き込み)
│ 100万行    │   1行削除      │ 999,999行  │
└────────────┘               └────────────┘

Deletion Vectors:
┌────────────┐     DELETE     ┌────────────┐  ┌──────────┐
│ File A     │  ──────────►  │ File A     │  │   DV     │
│ 100万行    │   1行削除      │ (変更なし)  │  │ row 42   │
└────────────┘               └────────────┘  │ deleted  │
                                             └──────────┘
                             (元ファイル保持 + 削除ビットマップ追加)
-- Deletion Vectorsの有効化
ALTER TABLE delta_table SET TBLPROPERTIES (
    'delta.enableDeletionVectors' = 'true'
);

-- テーブル作成時に有効化
CREATE TABLE events (
    id BIGINT,
    data STRING
) USING DELTA
TBLPROPERTIES ('delta.enableDeletionVectors' = 'true');

11.2 利点

  • 高速なDELETE/UPDATE: ファイル書き換え不要
  • 低い書き込みアンプリフィケーション: 変更が最小限
  • MERGE性能の向上: 特に小規模な変更で大きな効果

12. UniForm(Universal Format)

12.1 概要

UniForm(Delta Lake 3.0+)は、Delta Lake テーブルを Apache Iceberg および Apache Hudi 形式で同時に読み取り可能にする機能である。

┌────────────────────────────────────────┐
│           Delta Lake Table              │
│                                        │
│  ┌────────────────────────────────┐    │
│  │      Parquet Data Files        │    │
│  └────────────────────────────────┘    │
│                                        │
│  ┌──────┐  ┌──────┐  ┌──────┐        │
│  │Delta │  │Iceberg│  │ Hudi │        │
│  │ Log  │  │Metadata│ │Meta- │        │
│  │      │  │(自動生成)│ │data  │        │
│  └──────┘  └──────┘  └──────┘        │
│                                        │
│  書き込み: Delta API                     │
│  読み取り: Delta / Iceberg / Hudi       │
└────────────────────────────────────────┘
-- UniFormの有効化(Iceberg互換)
ALTER TABLE delta_table SET TBLPROPERTIES (
    'delta.universalFormat.enabledFormats' = 'iceberg'
);

-- テーブル作成時
CREATE TABLE events (
    id BIGINT,
    data STRING,
    ts TIMESTAMP
) USING DELTA
TBLPROPERTIES (
    'delta.universalFormat.enabledFormats' = 'iceberg',
    'delta.enableIcebergCompatV2' = 'true'
);

-- Icebergとして読み取り(Trino等から)
-- catalog: iceberg_catalog
-- SELECT * FROM iceberg_catalog.db.events;

12.2 UniForm の制約

  • 書き込みは Delta API からのみ
  • 一部の Delta 機能は Iceberg/Hudi メタデータに反映されない場合がある
  • Iceberg REST Catalog との連携が推奨

13. テーブルメンテナンス

13.1 OPTIMIZE(コンパクション)

-- テーブル全体の最適化
OPTIMIZE delta_table;

-- 条件付きOPTIMIZE
OPTIMIZE delta_table WHERE year = 2024 AND month = 1;

-- Z-ORDER(Liquid Clusteringが利用できない場合)
OPTIMIZE delta_table ZORDER BY (user_id, event_type);

-- 小ファイルの自動コンパクション
ALTER TABLE delta_table SET TBLPROPERTIES (
    'delta.autoOptimize.autoCompact' = 'true',
    'delta.autoOptimize.optimizeWrite' = 'true'
);

13.2 VACUUM(不要ファイルの削除)

-- 7日以上前のファイルを削除(デフォルト)
VACUUM delta_table;

-- 保持期間を指定
VACUUM delta_table RETAIN 168 HOURS;  -- 7日

-- ドライラン(削除されるファイルの確認)
VACUUM delta_table DRY RUN;
# Python API
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/path/to/table")
delta_table.vacuum(168)  # 168時間 = 7日

# 短い保持期間の設定(注意: タイムトラベルに影響)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_table.vacuum(0)  # 即座に削除(危険!)

13.3 メンテナンスの自動化

# Airflow DAG でのメンテナンス自動化
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG('delta_table_maintenance', schedule_interval='0 3 * * *') as dag:
    
    optimize = SparkSubmitOperator(
        task_id='optimize_table',
        application='maintenance.py',
        application_args=['--action', 'optimize', '--table', 'events'],
    )
    
    vacuum = SparkSubmitOperator(
        task_id='vacuum_table',
        application='maintenance.py',
        application_args=['--action', 'vacuum', '--table', 'events', '--hours', '168'],
    )
    
    optimize >> vacuum

14. エンジン連携

14.1 Apache Spark(ネイティブ)

# pip install delta-spark
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("DeltaApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Delta テーブルの読み書き
df.write.format("delta").save("/path/to/table")
df = spark.read.format("delta").load("/path/to/table")

14.2 Apache Flink

-- Flink SQL でのDelta Lake連携
CREATE CATALOG delta_catalog WITH (
    'type' = 'delta-catalog',
    'catalog-type' = 'in-memory'
);

CREATE TABLE delta_catalog.db.events (
    id BIGINT,
    data STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'delta',
    'table-path' = 's3://bucket/delta/events'
);

-- ストリーミング読み取り
SELECT * FROM delta_catalog.db.events /*+ OPTIONS('mode' = 'streaming') */;

14.3 Trino / Presto

# etc/catalog/delta.properties (Trino)
connector.name=delta_lake
hive.metastore.uri=thrift://hive-metastore:9083
delta.enable-non-concurrent-writes=true
-- Trinoからの読み取り
SELECT * FROM delta.schema_name.events WHERE year = 2024;

-- Trinoからの書き込み(制限あり)
INSERT INTO delta.schema_name.events SELECT * FROM staging;

14.4 Delta Kernel(Delta 4.0+)

Delta Kernel は、任意のエンジンから Delta Lake テーブルを読み書きするための低レベルライブラリである。

// Delta Kernel API
import io.delta.kernel.*;
import io.delta.kernel.defaults.*;

Engine engine = DefaultEngine.create(hadoopConf);
Table table = Table.forPath(engine, "/path/to/delta-table");
Snapshot snapshot = table.getLatestSnapshot(engine);

// スキーマの取得
StructType schema = snapshot.getSchema(engine);

// データの読み取り
ScanBuilder scanBuilder = snapshot.getScanBuilder(engine);
Scan scan = scanBuilder.build();
CloseableIterator<FilteredColumnarBatch> data = scan.getScanFiles(engine);

15. Delta Sharing

Delta Sharing は、組織間でデータを安全に共有するためのオープンプロトコルである。

# Delta Sharing サーバー設定
# sharing-server-config.yaml
shares:
  - name: "marketing_share"
    schemas:
      - name: "analytics"
        tables:
          - name: "customer_events"
            location: "s3://data-lake/delta/customer_events"

# Delta Sharing クライアント(Python)
import delta_sharing

# プロファイルを使用してテーブルにアクセス
profile = "config.share"
client = delta_sharing.SharingClient(profile)

# 利用可能なテーブルの一覧
tables = client.list_all_tables()

# Pandas DataFrameとして読み取り
df = delta_sharing.load_as_pandas(f"{profile}#marketing_share.analytics.customer_events")

# Spark DataFrameとして読み取り
df = delta_sharing.load_as_spark(f"{profile}#marketing_share.analytics.customer_events")

16. 設定とチューニング

16.1 テーブルプロパティ

-- 主要なテーブルプロパティ
ALTER TABLE delta_table SET TBLPROPERTIES (
    -- ログ保持
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.deletedFileRetentionDuration' = 'interval 7 days',
    
    -- チェックポイント
    'delta.checkpointInterval' = '10',
    
    -- 自動最適化
    'delta.autoOptimize.autoCompact' = 'true',
    'delta.autoOptimize.optimizeWrite' = 'true',
    
    -- ターゲットファイルサイズ
    'delta.targetFileSize' = '134217728',  -- 128MB
    'delta.tuneFileSizesForRewrites' = 'true',
    
    -- データスキッピング
    'delta.dataSkippingNumIndexedCols' = '32',
    'delta.dataSkippingStatsColumns' = 'id,user_id,event_timestamp',
    
    -- Deletion Vectors
    'delta.enableDeletionVectors' = 'true',
    
    -- Change Data Feed
    'delta.enableChangeDataFeed' = 'true',
    
    -- Row Tracking
    'delta.enableRowTracking' = 'true'
);

16.2 Spark 設定

# Delta Lake Spark設定
spark.conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# 書き込み最適化
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

# マージ最適化
spark.conf.set("spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled", "true")
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")

# ファイルサイズ
spark.conf.set("spark.databricks.delta.properties.defaults.targetFileSize", "134217728")

# 統計情報
spark.conf.set("spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols", "32")

# 並列度
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.databricks.delta.snapshotPartitions", "8")

17. パフォーマンス最適化

17.1 データスキッピング

Delta Lake はファイルレベルの統計情報(min/max/nullCount)を活用して、クエリに無関係なファイルの読み取りをスキップする。

-- 統計収集カラムの指定
ALTER TABLE delta_table SET TBLPROPERTIES (
    'delta.dataSkippingStatsColumns' = 'id,user_id,event_timestamp,country'
);

-- 効果的なクエリ(データスキッピングが適用される)
SELECT * FROM events WHERE user_id = 12345;  -- min/maxでフィルタ
SELECT * FROM events WHERE event_timestamp >= '2024-01-01';  -- 範囲フィルタ

-- 効果が低いクエリ
SELECT * FROM events WHERE description LIKE '%error%';  -- 文字列パターンは統計に含まれない

17.2 最適化テクニック

-- 1. Liquid Clustering(推奨)
CREATE TABLE events (...) USING DELTA CLUSTER BY (user_id, event_date);
OPTIMIZE events;

-- 2. Z-ORDER(Liquid Clustering未使用時)
OPTIMIZE events ZORDER BY (user_id, event_type);

-- 3. パーティションプルーニング
SELECT * FROM events WHERE year = 2024 AND month = 1;  -- パーティションカラム

-- 4. ファイルサイズの最適化
ALTER TABLE events SET TBLPROPERTIES ('delta.targetFileSize' = '134217728');

-- 5. キャッシュ
CACHE SELECT * FROM events WHERE event_date = CURRENT_DATE;

-- 6. Photon エンジン(Databricks)
-- 自動的にベクトル化実行で高速化

17.3 書き込みパフォーマンス

# Optimized Write: 自動的に最適なファイルサイズに調整
df.write.format("delta") \
    .option("optimizeWrite", "true") \
    .mode("append") \
    .save("/path/to/table")

# Auto Compact: 小ファイルの自動コンパクション
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", "50")

# パーティションの上書き(動的)
df.write.format("delta") \
    .mode("overwrite") \
    .option("partitionOverwriteMode", "dynamic") \
    .partitionBy("date") \
    .save("/path/to/table")

18. セキュリティ

18.1 アクセス制御

-- テーブルレベルの権限(Unity Catalog / Databricks)
GRANT SELECT ON TABLE delta_table TO `analysts`;
GRANT MODIFY ON TABLE delta_table TO `etl_user`;
GRANT ALL PRIVILEGES ON TABLE delta_table TO `admin`;

-- カラムマスキング
CREATE FUNCTION mask_email(email STRING) RETURNS STRING
RETURN CONCAT(LEFT(email, 2), '***@***');

ALTER TABLE users ALTER COLUMN email SET MASK mask_email;

-- 行フィルター
CREATE FUNCTION region_filter(region STRING) RETURNS BOOLEAN
RETURN region = CURRENT_USER_REGION();

ALTER TABLE sales SET ROW FILTER region_filter ON (region);

18.2 暗号化

# サーバーサイド暗号化(S3)
spark.conf.set("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
spark.conf.set("spark.hadoop.fs.s3a.server-side-encryption.key", "arn:aws:kms:...")

# クライアントサイド暗号化
spark.conf.set("spark.hadoop.fs.s3a.encryption.algorithm", "CSE-KMS")
spark.conf.set("spark.hadoop.fs.s3a.encryption.key", "arn:aws:kms:...")

19. 監視と運用

19.1 テーブル情報の確認

-- テーブル詳細
DESCRIBE DETAIL delta_table;
-- numFiles, sizeInBytes, numPartitions, etc.

-- テーブル履歴
DESCRIBE HISTORY delta_table;

-- テーブルプロパティ
SHOW TBLPROPERTIES delta_table;

-- カラム統計
DESCRIBE EXTENDED delta_table column_name;

19.2 監視メトリクス

# Delta テーブルの健全性チェック
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "/path/to/table")

# テーブル詳細
detail = dt.detail().collect()[0]
print(f"ファイル数: {detail['numFiles']}")
print(f"サイズ: {detail['sizeInBytes'] / (1024**3):.2f} GB")
print(f"パーティション数: {detail['numPartitions']}")

# 履歴
history = dt.history(10).select("version", "timestamp", "operation", "operationMetrics")
history.show(truncate=False)

# 小ファイル検出
file_stats = spark.read.format("delta").load("/path/to/table") \
    .inputFiles()
small_files = [f for f in file_stats if get_file_size(f) < 10 * 1024 * 1024]  # 10MB未満
print(f"小ファイル数: {len(small_files)} / 全ファイル: {len(file_stats)}")

20. 他フォーマットとの比較

20.1 Delta Lake vs Apache Iceberg vs Apache Hudi

特徴Delta LakeApache IcebergApache Hudi
開発元DatabricksNetflix → ASFUber → ASF
ガバナンスLinux FoundationASFASF
メタデータトランザクションログ(JSON)マニフェスト階層タイムライン
ACID
タイムトラベル
スキーマ進化✓(より柔軟)
パーティション進化Liquid Clustering✓(ネイティブ)限定的
行レベル操作Deletion VectorsCoW / MoRCoW / MoR
ストリーミングSpark Structured StreamingFlink / SparkFlink / Spark
CDCChange Data FeedIncremental ReadIncremental Query
マルチエンジン△(Spark優位)◎(最も広い)
相互運用性UniFormXTableXTable
Databricks統合◎(ネイティブ)

20.2 選択の指針

  • Delta Lake: Databricks/Spark中心のエコシステム、Liquid Clustering活用、UniFormで互換性確保
  • Iceberg: マルチエンジン要件、パーティション進化、オープンスタンダード重視
  • Hudi: 高頻度Upsert、CDC、インクリメンタル処理

21. ユースケースとベストプラクティス

21.1 代表的なユースケース

メダリオンアーキテクチャ

┌──────────┐     ┌──────────┐     ┌──────────┐
│  Bronze  │────►│  Silver  │────►│   Gold   │
│  (Raw)   │     │(Cleansed)│     │(Curated) │
│          │     │          │     │          │
│ APPEND   │     │ MERGE    │     │ AGGREGATE│
│ only     │     │ dedupe   │     │ KPIs     │
│          │     │ validate │     │ reports  │
└──────────┘     └──────────┘     └──────────┘
  Delta Table      Delta Table      Delta Table
# Bronze: 生データの取り込み
raw_df = spark.readStream \
    .format("kafka") \
    .option("subscribe", "events") \
    .load()

raw_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/bronze") \
    .toTable("bronze.events")

# Silver: クレンジングと重複排除
bronze_df = spark.readStream.table("bronze.events")

silver_df = bronze_df \
    .dropDuplicates(["event_id"]) \
    .filter(col("event_type").isNotNull()) \
    .withColumn("processed_at", current_timestamp())

silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/silver") \
    .toTable("silver.events")

# Gold: 集約
gold_query = """
    MERGE INTO gold.daily_metrics AS target
    USING (
        SELECT 
            date_trunc('day', event_timestamp) as event_date,
            event_type,
            COUNT(*) as event_count,
            COUNT(DISTINCT user_id) as unique_users
        FROM silver.events
        WHERE event_timestamp >= current_date() - INTERVAL 1 DAY
        GROUP BY 1, 2
    ) AS source
    ON target.event_date = source.event_date AND target.event_type = source.event_type
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(gold_query)

21.2 ベストプラクティス

  1. Liquid Clustering を使用する: 新規テーブルではパーティショニングよりLiquid Clusteringを推奨
  2. Deletion Vectors を有効にする: UPDATE/DELETE/MERGE性能の大幅向上
  3. Change Data Feed を活用する: 下流パイプラインのインクリメンタル処理
  4. 定期的にOPTIMIZEを実行する: 小ファイル問題の防止
  5. VACUUMを定期実行する: 不要ファイルの削除でストレージコスト削減
  6. 適切な保持期間を設定する: タイムトラベルとストレージコストのバランス
  7. UniFormを検討する: マルチエンジン環境での互換性確保
  8. ANALYZE TABLEで統計を更新する: CBO性能の向上
  9. テーブルプロパティを適切に設定する: ターゲットファイルサイズ、統計カラム数
  10. メダリオンアーキテクチャを採用する: Bronze → Silver → Gold のレイヤード設計

22. 最新動向と将来展望

22.1 最近の主要機能

機能バージョン説明
Liquid Clustering3.0動的データレイアウト最適化
UniForm3.0Iceberg/Hudi互換メタデータ
Deletion Vectors2.3+効率的な行レベル削除
Row Tracking3.0行の追跡とリネージ
Type Widening3.1安全な型変更
Variant型3.1半構造化データ対応
Delta Kernel4.0エンジン非依存の読み書きライブラリ
Coordinated Commits3.x+マルチクラスター書き込み
In-Commit Timestamps4.0正確なコミットタイムスタンプ

22.2 将来の方向性

  • Delta Kernel の成熟: あらゆるエンジンからのネイティブアクセス
  • UniForm の拡張: より深い Iceberg/Hudi 互換性
  • サーバーレス最適化: クラウドネイティブな自動チューニング
  • AI/ML統合: Feature Store、モデルバージョニングとの連携強化
  • リアルタイム処理の強化: Structured Streamingとの更なる統合

23. まとめ

Delta Lake はデータレイクに信頼性とパフォーマンスをもたらすストレージレイヤーであり、レイクハウスアーキテクチャの中核技術である。

技術的要点

カテゴリ要点
アーキテクチャParquet + トランザクションログ(_delta_log)
トランザクション楽観的同時実行制御、ACID保証
スキーマ強制(Enforcement)+ 進化(Evolution)
データレイアウトLiquid Clustering(推奨)/ Z-ORDER / パーティション
行レベル操作Deletion Vectors による高効率化
変更追跡Change Data Feed
互換性UniForm(Iceberg/Hudi読み取り互換)
メンテナンスOPTIMIZE + VACUUM
エンジンSpark(ネイティブ), Flink, Trino, Delta Kernel

選定の指針

Delta Lake が適しているケース:

  • Databricks / Spark 中心のエコシステム: ネイティブ統合の利点を最大活用
  • メダリオンアーキテクチャ: Bronze/Silver/Gold のレイヤード設計
  • 高頻度MERGE/Upsert: Deletion Vectorsによる高性能
  • レイクハウス構築: ACID + タイムトラベル + スキーマ管理

他のツールを検討すべきケース:

  • マルチエンジン重視: Apache Iceberg(より広いエンジン対応)
  • CDC / インクリメンタル処理重視: Apache Hudi
  • Databricks非使用環境: Iceberg の方がエコシステムが広い

参考文献: