Apache Trino

Apache Trino 徹底解説:アーキテクチャ・機能・設定の全容

目次

  1. はじめに
  2. Apache Trino の概要と歴史
  3. アーキテクチャ全体像
  4. コーディネーターとワーカー
  5. クエリ実行エンジン
  6. コネクターアーキテクチャ
  7. 主要コネクターと設定
  8. SQL機能
  9. データ型とスキーマ管理
  10. フェデレーテッドクエリ
  11. セキュリティ
  12. パフォーマンスチューニング
  13. メモリ管理
  14. Fault Tolerance とスピル
  15. デプロイメント
  16. 監視と運用
  17. エコシステム連携
  18. ユースケースとベストプラクティス
  19. Trino vs 他エンジン比較
  20. 最新動向と将来展望
  21. まとめ

1. はじめに

Apache Trino(旧称 Presto SQL / PrestoSQL)は、大規模データに対するインタラクティブな分析クエリを実行するための高速分散SQLクエリエンジンである。Facebookで開発され、現在はTrino Software Foundationの下で管理されている。

Trinoの最大の特徴はフェデレーテッドクエリであり、HDFS、S3、リレーショナルデータベース、NoSQL、Kafka など多様なデータソースに対して、単一のSQLクエリでデータを横断的に参照・結合できる。

1.1 本記事の対象読者

  • データレイクやデータウェアハウスの構築・運用を担当するデータエンジニア
  • アドホッククエリや探索的分析を行うデータアナリスト
  • Trinoクラスターの設計・運用を行うインフラエンジニア / SRE
  • クエリエンジンの技術選定を行うアーキテクト

1.2 Trino の位置づけ

┌────────────────────────────────────────────────────────────┐
│                    BI / Analytics Tools                      │
│       (Tableau, Superset, Metabase, Redash, dbt)           │
├────────────────────────────────────────────────────────────┤
│                      Trino (SQL Engine)                     │
│           Federated Query / Interactive Analytics           │
├──────┬──────┬──────┬──────┬──────┬──────┬────────────────┤
│ Hive │ Ice- │Delta │ Post │MySQL │Kafka │ Elasticsearch  │
│ HDFS │ berg │ Lake │ gres │      │      │                │
│      │      │      │      │      │      │                │
├──────┴──────┴──────┴──────┴──────┴──────┴────────────────┤
│                    Storage Layer                            │
│     (HDFS / S3 / GCS / ADLS / RDBMS / NoSQL / MQ)        │
└────────────────────────────────────────────────────────────┘

2. Apache Trino の概要と歴史

2.1 誕生と分岐の経緯

出来事
2012FacebookでPrestoプロジェクトが開始
2013Prestoがオープンソースとして公開
2018Prestoの主要開発者がFacebookを離れStarburst社を設立
2019コミュニティが分裂:PrestoSQL(現Trino)と PrestoDB(Facebook主導)
2020PrestoSQL が Trino にリブランド
2021Trino Software Foundation 設立
2024-2025Fault-tolerant execution、Polymorphic table functions、多数の改善

2.2 Trino と PrestoDB の違い

項目Trino (旧 PrestoSQL)PrestoDB (Facebook)
ガバナンスTrino Software FoundationPresto Foundation (Linux Foundation)
開発主体コミュニティ + StarburstMeta + コミュニティ
リリース頻度非常に高い(隔週)中程度
コネクター数40+20+
Fault ToleranceProject TardigradePresto on Spark
特徴インタラクティブ重視バッチ対応強化

2.3 主要な特徴

  • インメモリパイプライン実行: データをディスクに書き出さずにメモリ上でパイプライン処理
  • フェデレーテッドクエリ: 異なるデータソースを単一SQLで横断
  • ANSI SQL準拠: 標準SQLに準拠した豊富なSQL機能
  • スケーラブル: 数ノードから数千ノードまでスケール
  • プラグインアーキテクチャ: コネクター、型、関数、セキュリティの拡張が可能
  • 低レイテンシ: ミリ秒〜秒レベルのクエリ応答

3. アーキテクチャ全体像

3.1 MPP(Massively Parallel Processing)アーキテクチャ

Trinoはシェアードナッシング(Shared-Nothing)のMPPアーキテクチャを採用している。

┌────────────────────────────────────────────────────────────┐
│                         Client                              │
│              (JDBC / CLI / REST API / BI Tool)              │
└──────────────────────┬─────────────────────────────────────┘
                       │
                       ▼
┌────────────────────────────────────────────────────────────┐
│                     Coordinator                             │
│  ┌─────────────┐ ┌──────────────┐ ┌────────────────────┐  │
│  │   Parser &   │ │   Planner &  │ │    Scheduler &     │  │
│  │   Analyzer   │ │  Optimizer   │ │  Task Manager      │  │
│  └─────────────┘ └──────────────┘ └────────────────────┘  │
│  ┌─────────────┐ ┌──────────────┐ ┌────────────────────┐  │
│  │  Metadata   │ │    Cost-     │ │   Resource         │  │
│  │  Manager    │ │  based Opt.  │ │   Manager          │  │
│  └─────────────┘ └──────────────┘ └────────────────────┘  │
└──────────────────────┬─────────────────────────────────────┘
                       │
          ┌────────────┼────────────┐
          ▼            ▼            ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│   Worker 1   │ │   Worker 2   │ │   Worker N   │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │  Task 1  │ │ │ │  Task 3  │ │ │ │  Task 5  │ │
│ │  Task 2  │ │ │ │  Task 4  │ │ │ │  Task 6  │ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │Connector │ │ │ │Connector │ │ │ │Connector │ │
│ │ Instances│ │ │ │ Instances│ │ │ │ Instances│ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
└──────────────┘ └──────────────┘ └──────────────┘
        │                │                │
        ▼                ▼                ▼
┌──────────────────────────────────────────────────┐
│            Data Sources                           │
│   (HDFS, S3, PostgreSQL, MySQL, Kafka, etc.)     │
└──────────────────────────────────────────────────┘

3.2 クエリ実行の流れ

  1. クライアント: SQL クエリをCoordinatorに送信
  2. パーシング: SQL文を抽象構文木(AST)に変換
  3. 分析: テーブル名、カラム名の解決、型チェック
  4. 論理計画: 論理的なクエリ計画を生成
  5. 最適化: コストベース最適化(CBO)を適用
  6. 物理計画: 分散実行計画に変換
  7. スケジューリング: タスクをWorkerに割り当て
  8. 実行: 各Workerがタスクを並列実行
  9. 結果返却: パイプラインで結果をクライアントに返却

3.3 パイプライン実行モデル

Trinoの特徴的な点はパイプライン実行モデルである。MapReduceやSparkと異なり、中間データをディスクに書き出すことなく、データが生成されると同時に次のステージに流す。

Stage 0 (Source)    Stage 1 (Exchange)    Stage 2 (Output)
┌────────────┐      ┌────────────────┐    ┌──────────────┐
│  Table Scan │ ───► │  Hash Join     │ ─► │  Aggregation │
│  + Filter   │      │  + Project     │    │  + Output    │
└────────────┘      └────────────────┘    └──────────────┘
     │                    ▲    │                  │
     │   Pipeline         │    │    Pipeline      │
     └────────────────────┘    └──────────────────┘
   データが即座に次ステージへ      結果をストリーミング返却

4. コーディネーターとワーカー

4.1 Coordinator の役割

Coordinator は Trino クラスターの中核であり、以下の責務を持つ:

  • SQL の受信、パース、分析
  • クエリ計画の最適化
  • ステージとタスクへの分割
  • ワーカーへのタスク配布
  • クエリ状態の管理
  • リソースグループ管理
  • Web UI の提供

Coordinator 設定(config.properties)

# config.properties (Coordinator)
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
discovery.uri=http://coordinator-host:8080

# クエリ管理
query.max-memory=50GB
query.max-memory-per-node=8GB
query.max-total-memory-per-node=10GB
query.max-execution-time=30m
query.max-run-time=60m
query.max-stage-count=150
query.max-history=1000
query.min-expire-age=30m

4.2 Worker の役割

Worker は実際のデータ処理を行うノードであり:

  • データソースからのデータ読み取り
  • タスクの実行(フィルタリング、結合、集約等)
  • 中間データの交換(Exchange)
  • 結果のCoordinatorへの返却

Worker 設定(config.properties)

# config.properties (Worker)
coordinator=false
http-server.http.port=8080
discovery.uri=http://coordinator-host:8080

# タスク設定
task.max-worker-threads=16
task.min-drivers=32
task.max-drivers-per-task=64
task.concurrency=16

4.3 ノード設定(node.properties)

# node.properties
node.environment=production
node.id=<unique-uuid>
node.data-dir=/var/trino/data

4.4 JVM 設定(jvm.config)

# jvm.config
-server
-Xmx16G
-Xms16G
-XX:InitialRAMPercentage=80
-XX:MaxRAMPercentage=80
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=512M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
-Dfile.encoding=UTF-8
-XX:+UnlockDiagnosticVMOptions
-XX:+UseAESCTRIntrinsics

5. クエリ実行エンジン

5.1 クエリ計画の最適化

コストベース最適化(CBO)

Trinoは統計情報に基づくコストベース最適化を実装している。

-- テーブル統計の収集
ANALYZE table_name;

-- カラム統計の確認
SHOW STATS FOR table_name;

-- 特定パーティションの統計
SHOW STATS FOR (SELECT * FROM table_name WHERE date = '2024-01-15');

主要な最適化:

最適化説明
Predicate Pushdownフィルタ条件をコネクターに押し下げ
Projection Pushdown必要なカラムのみを読み込み
Join Reordering結合順序の最適化(テーブル統計に基づく)
Join DistributionBroadcast vs Partitioned の選択
Partial AggregationWorker側での部分集約
Limit PushdownLIMIT句のプッシュダウン
TopN PushdownORDER BY + LIMIT の最適化

実行計画の確認

-- 論理計画
EXPLAIN SELECT * FROM orders JOIN customers ON orders.customer_id = customers.id;

-- 分散実行計画
EXPLAIN (TYPE DISTRIBUTED)
SELECT customer_id, SUM(total_amount)
FROM orders
GROUP BY customer_id;

-- I/O統計付き
EXPLAIN ANALYZE
SELECT customer_id, COUNT(*) as order_count
FROM orders
WHERE order_date >= DATE '2024-01-01'
GROUP BY customer_id
HAVING COUNT(*) > 10;

-- JSON形式の実行計画
EXPLAIN (FORMAT JSON) SELECT ...;

-- Graphviz形式
EXPLAIN (FORMAT GRAPHVIZ) SELECT ...;

5.2 結合戦略

Trinoは以下の結合戦略をサポートする:

-- Broadcast Join(小テーブルを全Workerに配布)
-- 自動選択または手動指定
SELECT /*+ BROADCAST */ *
FROM large_table l
JOIN small_table s ON l.key = s.key;

-- Partitioned Join(両テーブルをハッシュ分割)
SELECT /*+ PARTITION */ *
FROM table_a a
JOIN table_b b ON a.key = b.key;
結合タイプ使用条件メモリ要件
Broadcast一方が小さい(< broadcast threshold)小テーブルがメモリに収まる
Partitioned (Hash)両テーブルが大きい各パーティションがメモリに収まる
Cross Join条件なし結合右側がメモリに収まる

5.3 動的フィルタリング

Trino は Join 時に動的フィルタを生成し、プローブ側のテーブルスキャンを削減する。

# 動的フィルタリング設定
enable-dynamic-filtering=true
dynamic-filtering-max-per-driver-row-count=1000000
dynamic-filtering-max-per-driver-size=10MB
dynamic-filtering-range-row-limit-per-driver=10000
-- 動的フィルタリングの例
-- dimension テーブルのフィルタが fact テーブルに適用される
SELECT f.*, d.category_name
FROM fact_sales f
JOIN dim_product d ON f.product_id = d.product_id
WHERE d.category = 'Electronics';

-- fact_sales のスキャン時に、product_idが'Electronics'カテゴリの
-- 製品IDに限定される(動的フィルタ)

6. コネクターアーキテクチャ

6.1 コネクターの仕組み

Trinoのコネクターは**SPI(Service Provider Interface)**に基づくプラグインアーキテクチャで実装されている。各コネクターは以下のインターフェースを実装する:

┌────────────────────────────────────────────────────────┐
│                  Trino Connector SPI                     │
├──────────────┬────────────────┬────────────────────────┤
│  Metadata    │  Data Source   │  Access Control        │
│  Provider    │  Provider      │  Provider              │
│              │                │                        │
│  - Schemas   │  - Page Source │  - Table permissions   │
│  - Tables    │  - Record Set  │  - Column masking      │
│  - Columns   │  - Page Sink   │  - Row filtering       │
│  - Stats     │  - Record Sink │                        │
├──────────────┼────────────────┼────────────────────────┤
│  Split       │  Transaction   │  Session Property      │
│  Manager     │  Manager       │  Provider              │
└──────────────┴────────────────┴────────────────────────┘

6.2 カタログの概念

Trinoでは「カタログ」がデータソースへの接続を表す。1つのコネクタータイプに対して複数のカタログを作成できる。

Trino
├── catalog: hive_prod        (connector: hive)
│   ├── schema: analytics
│   │   ├── table: users
│   │   └── table: events
│   └── schema: raw
│       └── table: logs
├── catalog: postgresql_prod   (connector: postgresql)
│   └── schema: public
│       ├── table: customers
│       └── table: orders
├── catalog: iceberg_lake      (connector: iceberg)
│   └── schema: warehouse
│       └── table: transactions
└── catalog: kafka_stream      (connector: kafka)
    └── schema: default
        └── table: click_events

SQL でのアクセス:

-- 完全修飾名
SELECT * FROM hive_prod.analytics.users;

-- カタログの切り替え
USE hive_prod.analytics;
SELECT * FROM users;

-- クロスカタログJoin(フェデレーテッドクエリ)
SELECT u.name, o.total_amount
FROM hive_prod.analytics.users u
JOIN postgresql_prod.public.orders o
ON u.user_id = o.customer_id;

7. 主要コネクターと設定

7.1 Hive コネクター

Hive メタストアを使用してHDFS / S3 上のデータにアクセスする最も一般的なコネクター。

# etc/catalog/hive.properties
connector.name=hive
hive.metastore.uri=thrift://hive-metastore:9083

# S3設定
hive.s3.aws-access-key=AKIAIOSFODNN7EXAMPLE
hive.s3.aws-secret-key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
hive.s3.endpoint=s3.amazonaws.com
hive.s3.path-style-access=false
hive.s3.ssl.enabled=true

# パフォーマンス設定
hive.max-partitions-per-scan=1000000
hive.max-splits-per-second=1000
hive.max-initial-splits=200
hive.max-initial-split-size=64MB
hive.max-split-size=128MB

# 書き込み設定
hive.compression-codec=SNAPPY
hive.target-max-file-size=1GB
hive.immutable-partitions=false

# キャッシュ設定
hive.metastore-cache-ttl=2m
hive.metastore-refresh-interval=1m

# パーティションプロジェクション
hive.partition-projection-enabled=true

7.2 Iceberg コネクター

Apache Iceberg テーブルフォーマットに対する高性能なアクセスを提供する。

# etc/catalog/iceberg.properties
connector.name=iceberg

# Hive Metastore カタログ
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://hive-metastore:9083

# REST カタログ
# iceberg.catalog.type=rest
# iceberg.rest-catalog.uri=http://iceberg-rest:8181
# iceberg.rest-catalog.warehouse=s3://warehouse/

# Nessie カタログ
# iceberg.catalog.type=nessie
# iceberg.nessie-catalog.uri=http://nessie:19120/api/v2
# iceberg.nessie-catalog.default-warehouse-dir=s3://warehouse/

# AWS Glue カタログ
# iceberg.catalog.type=glue
# hive.metastore.glue.region=us-east-1

# S3設定
hive.s3.aws-access-key=AKIAIOSFODNN7EXAMPLE
hive.s3.aws-secret-key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

# パフォーマンス
iceberg.max-partitions-per-writer=1000
iceberg.target-max-file-size=1GB
iceberg.compression-codec=ZSTD

# 高度な設定
iceberg.expire-snapshots.min-retention=7d
iceberg.delete-schema-locations-fallback=false
-- Icebergテーブルの作成
CREATE TABLE iceberg_lake.warehouse.events (
    event_id BIGINT,
    user_id BIGINT,
    event_type VARCHAR,
    event_data VARCHAR,
    event_timestamp TIMESTAMP(6) WITH TIME ZONE
)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['day(event_timestamp)', 'bucket(user_id, 16)'],
    sorted_by = ARRAY['event_timestamp'],
    location = 's3://data-lake/warehouse/events'
);

-- タイムトラベル
SELECT * FROM iceberg_lake.warehouse.events FOR TIMESTAMP AS OF TIMESTAMP '2024-01-15 00:00:00 UTC';
SELECT * FROM iceberg_lake.warehouse.events FOR VERSION AS OF 1234567890;

-- スナップショット情報
SELECT * FROM iceberg_lake.warehouse."events$snapshots";
SELECT * FROM iceberg_lake.warehouse."events$history";
SELECT * FROM iceberg_lake.warehouse."events$manifests";
SELECT * FROM iceberg_lake.warehouse."events$partitions";
SELECT * FROM iceberg_lake.warehouse."events$files";

-- テーブルメンテナンス
ALTER TABLE iceberg_lake.warehouse.events EXECUTE expire_snapshots(retention_threshold => '7d');
ALTER TABLE iceberg_lake.warehouse.events EXECUTE remove_orphan_files(retention_threshold => '7d');
ALTER TABLE iceberg_lake.warehouse.events EXECUTE optimize WHERE event_timestamp >= TIMESTAMP '2024-01-01';

7.3 Delta Lake コネクター

# etc/catalog/delta.properties
connector.name=delta_lake
hive.metastore.uri=thrift://hive-metastore:9083

# S3設定
hive.s3.aws-access-key=AKIAIOSFODNN7EXAMPLE
hive.s3.aws-secret-key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

# パフォーマンス
delta.enable-non-concurrent-writes=true
delta.max-partitions-per-writer=1000
delta.target-max-file-size=1GB
delta.compression-codec=SNAPPY

# Change Data Feed
delta.enable-change-data-feed=true

# Vacuum
delta.vacuum.min-retention=168h

7.4 PostgreSQL / MySQL コネクター

# etc/catalog/postgresql.properties
connector.name=postgresql
connection-url=jdbc:postgresql://db-host:5432/mydb
connection-user=trino_user
connection-password=trino_password

# コネクションプール
connection-pool.max-size=30
connection-pool.min-size=1

# プッシュダウン設定
postgresql.experimental.enable-string-pushdown-with-collate=true

# 集約プッシュダウン
jdbc.aggregation-pushdown.enabled=true
jdbc.join-pushdown.enabled=true
jdbc.topn-pushdown.enabled=true
# etc/catalog/mysql.properties
connector.name=mysql
connection-url=jdbc:mysql://db-host:3306/mydb
connection-user=trino_user
connection-password=trino_password

7.5 Kafka コネクター

# etc/catalog/kafka.properties
connector.name=kafka
kafka.nodes=kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092
kafka.table-names=click_events,page_views,user_actions
kafka.hide-internal-columns=false
kafka.default-schema=default

# セキュリティ
kafka.security-protocol=SASL_SSL
kafka.sasl-mechanism=PLAIN
// etc/kafka/click_events.json (テーブル定義)
{
  "tableName": "click_events",
  "schemaName": "default",
  "topicName": "click-events",
  "key": {
    "dataFormat": "raw",
    "fields": [
      {
        "name": "kafka_key",
        "type": "VARCHAR",
        "dataFormat": "raw"
      }
    ]
  },
  "message": {
    "dataFormat": "json",
    "fields": [
      {"name": "user_id", "type": "BIGINT", "mapping": "user_id"},
      {"name": "page_url", "type": "VARCHAR", "mapping": "page_url"},
      {"name": "timestamp", "type": "TIMESTAMP", "mapping": "timestamp"}
    ]
  }
}

7.6 Elasticsearch コネクター

# etc/catalog/elasticsearch.properties
connector.name=elasticsearch
elasticsearch.host=es-host
elasticsearch.port=9200
elasticsearch.default-schema-name=default
elasticsearch.scroll-size=1000
elasticsearch.scroll-timeout=1m
elasticsearch.request-timeout=10s
elasticsearch.tls.enabled=true

7.7 MongoDB コネクター

# etc/catalog/mongodb.properties
connector.name=mongodb
mongodb.connection-url=mongodb://mongo-host:27017/
mongodb.schema-collection=_schema
mongodb.case-insensitive-name-matching=true

8. SQL機能

8.1 基本的なSQL操作

-- DDL操作
CREATE SCHEMA hive_prod.analytics;

CREATE TABLE hive_prod.analytics.users (
    user_id BIGINT,
    name VARCHAR,
    email VARCHAR,
    created_at TIMESTAMP(3) WITH TIME ZONE
)
WITH (
    format = 'PARQUET',
    partitioned_by = ARRAY['date(created_at)'],
    external_location = 's3://data-lake/analytics/users'
);

-- INSERT
INSERT INTO hive_prod.analytics.users
SELECT user_id, name, email, created_at
FROM raw.user_registrations;

-- CREATE TABLE AS SELECT (CTAS)
CREATE TABLE hive_prod.analytics.active_users
WITH (format = 'PARQUET', partitioned_by = ARRAY['country'])
AS
SELECT user_id, name, country, last_login
FROM users
WHERE last_login >= CURRENT_DATE - INTERVAL '30' DAY;

-- MERGE (Iceberg / Delta Lake)
MERGE INTO iceberg_lake.warehouse.customers AS target
USING staging.new_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN
    UPDATE SET name = source.name, email = source.email, updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
    INSERT (customer_id, name, email, created_at, updated_at)
    VALUES (source.customer_id, source.name, source.email, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);

-- DELETE
DELETE FROM iceberg_lake.warehouse.events
WHERE event_timestamp < TIMESTAMP '2023-01-01 00:00:00 UTC';

-- UPDATE
UPDATE iceberg_lake.warehouse.customers
SET status = 'inactive'
WHERE last_login < CURRENT_DATE - INTERVAL '365' DAY;

8.2 高度なSQL機能

-- ウィンドウ関数
SELECT
    user_id,
    event_type,
    event_timestamp,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_timestamp DESC) as rn,
    LAG(event_timestamp) OVER (PARTITION BY user_id ORDER BY event_timestamp) as prev_event,
    LEAD(event_timestamp) OVER (PARTITION BY user_id ORDER BY event_timestamp) as next_event,
    SUM(amount) OVER (
        PARTITION BY user_id 
        ORDER BY event_timestamp 
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as cumulative_amount,
    AVG(amount) OVER (
        PARTITION BY user_id 
        ORDER BY event_timestamp 
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as moving_avg_7
FROM events;

-- GROUPING SETS / CUBE / ROLLUP
SELECT
    COALESCE(country, 'ALL') as country,
    COALESCE(category, 'ALL') as category,
    SUM(revenue) as total_revenue,
    COUNT(*) as order_count
FROM orders
GROUP BY GROUPING SETS (
    (country, category),
    (country),
    (category),
    ()
);

-- UNNEST(配列/マップの展開)
SELECT
    user_id,
    tag
FROM users
CROSS JOIN UNNEST(tags) AS t(tag);

-- JSON関数
SELECT
    json_extract_scalar(event_data, '$.action') as action,
    json_extract_scalar(event_data, '$.properties.source') as source,
    CAST(json_extract(event_data, '$.items') AS ARRAY(JSON)) as items
FROM events;

-- 配列・マップ操作
SELECT
    user_id,
    ARRAY_AGG(DISTINCT event_type) as event_types,
    MAP_AGG(event_type, event_count) as event_count_map,
    ARRAY_JOIN(ARRAY_AGG(DISTINCT tag), ', ') as tag_list,
    CARDINALITY(ARRAY_AGG(DISTINCT event_type)) as unique_event_count
FROM user_events
GROUP BY user_id;

-- WITH RECURSIVE(再帰CTE)
WITH RECURSIVE org_tree AS (
    SELECT employee_id, name, manager_id, 1 as level
    FROM employees
    WHERE manager_id IS NULL
    
    UNION ALL
    
    SELECT e.employee_id, e.name, e.manager_id, ot.level + 1
    FROM employees e
    JOIN org_tree ot ON e.manager_id = ot.employee_id
)
SELECT * FROM org_tree ORDER BY level, name;

-- MATCH_RECOGNIZE(パターンマッチング)
SELECT *
FROM stock_prices
MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY trade_date
    MEASURES
        FIRST(DOWN.trade_date) AS start_date,
        LAST(UP.trade_date) AS end_date,
        FIRST(DOWN.price) AS start_price,
        LAST(UP.price) AS end_price
    ONE ROW PER MATCH
    PATTERN (DOWN+ UP+)
    DEFINE
        DOWN AS price < PREV(price),
        UP AS price > PREV(price)
);

-- Polymorphic Table Functions
SELECT *
FROM TABLE(exclude_columns(
    input => TABLE(orders),
    columns => DESCRIPTOR(internal_notes, debug_info)
));

8.3 PreparedStatement

-- Prepared Statementの使用
PREPARE my_query FROM
SELECT * FROM orders WHERE customer_id = ? AND order_date >= ?;

EXECUTE my_query USING 12345, DATE '2024-01-01';

DEALLOCATE PREPARE my_query;

9. データ型とスキーマ管理

9.1 サポートされるデータ型

カテゴリデータ型説明
論理値BOOLEANtrue / false
整数TINYINT, SMALLINT, INTEGER, BIGINT8/16/32/64ビット整数
浮動小数点REAL, DOUBLE32/64ビット浮動小数点
固定精度DECIMAL(p, s)任意精度の固定小数点
文字列VARCHAR, CHAR, VARBINARY可変長/固定長文字列、バイナリ
日時DATE, TIME, TIMESTAMP, INTERVAL日付・時刻・タイムスタンプ
構造化ARRAY, MAP, ROW配列、マップ、構造体
その他JSON, UUID, IPADDRESS, HyperLogLog特殊型
-- 型変換
SELECT CAST('2024-01-15' AS DATE);
SELECT CAST(123 AS VARCHAR);
SELECT TRY_CAST('invalid' AS INTEGER);  -- NULLを返す(エラーにならない)

-- ROW型
SELECT CAST(ROW('Alice', 30) AS ROW(name VARCHAR, age INTEGER));

-- ARRAY型
SELECT ARRAY[1, 2, 3, 4, 5];
SELECT ARRAY_AGG(name ORDER BY name) FROM users;

-- MAP型
SELECT MAP(ARRAY['a', 'b'], ARRAY[1, 2]);

10. フェデレーテッドクエリ

10.1 クロスカタログJoin

Trinoの最大の強みは、異なるデータソースを透過的にJoinできることである。

-- S3上のHiveテーブルとPostgreSQLテーブルの結合
SELECT
    h.event_type,
    h.event_count,
    p.category_name,
    p.description
FROM hive_prod.analytics.event_summary h
JOIN postgresql_prod.public.event_categories p
    ON h.event_type = p.event_code
WHERE h.event_date = CURRENT_DATE;

-- Kafkaストリーム + S3データレイク + RDBMSの3way結合
SELECT
    k.user_id,
    k.action,
    k.timestamp as event_time,
    u.name as user_name,
    u.country,
    p.product_name,
    p.price
FROM kafka_stream.default.click_events k
JOIN hive_prod.analytics.users u
    ON k.user_id = u.user_id
JOIN postgresql_prod.public.products p
    ON k.product_id = p.product_id
WHERE k.timestamp >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR;

-- Iceberg + Elasticsearch
SELECT
    i.order_id,
    i.total_amount,
    e.search_query,
    e.click_position
FROM iceberg_lake.warehouse.orders i
JOIN elasticsearch.default.search_logs e
    ON i.session_id = e.session_id;

10.2 フェデレーテッドクエリのパフォーマンス考慮

-- ✓ 良い例:各データソースでフィルタリングしてからJoin
WITH filtered_orders AS (
    SELECT order_id, customer_id, total_amount
    FROM postgresql_prod.public.orders
    WHERE order_date >= DATE '2024-01-01'  -- PostgreSQLでフィルタ
),
filtered_events AS (
    SELECT user_id, event_type, event_count
    FROM hive_prod.analytics.events
    WHERE event_date >= DATE '2024-01-01'  -- Hive(S3)でフィルタ
)
SELECT * FROM filtered_orders o
JOIN filtered_events e ON o.customer_id = e.user_id;

-- ✗ 悪い例:大量データを全てTrinoに読み込んでからフィルタ
SELECT * FROM postgresql_prod.public.orders o
JOIN hive_prod.analytics.events e ON o.customer_id = e.user_id
WHERE o.order_date >= DATE '2024-01-01';
-- (上記でもオプティマイザがPredicate Pushdownするが、明示的に分離する方が確実)

11. セキュリティ

11.1 認証

# config.properties - Password認証
http-server.authentication.type=PASSWORD

# etc/password-authenticator.properties (LDAP)
password-authenticator.name=ldap
ldap.url=ldaps://ldap-server:636
ldap.user-bind-pattern=${USER}@company.com
ldap.group-auth-pattern=(&(objectClass=group)(member=${USER}))

# etc/password-authenticator.properties (File)
password-authenticator.name=file
file.password-file=etc/password.db
# OAuth2 / OpenID Connect 認証
http-server.authentication.type=OAUTH2
http-server.authentication.oauth2.issuer=https://auth.company.com/
http-server.authentication.oauth2.client-id=trino-client
http-server.authentication.oauth2.client-secret=client-secret
http-server.authentication.oauth2.scopes=openid,profile
# Kerberos認証
http-server.authentication.type=KERBEROS
http.authentication.krb5.config=/etc/krb5.conf
http.server.authentication.krb5.keytab=/etc/trino/trino.keytab
http.server.authentication.krb5.service-name=trino

11.2 認可(Access Control)

# config.properties
access-control.name=file
security.config-file=etc/access-control.json
security.refresh-period=10s
// etc/access-control.json
{
  "catalogs": [
    {
      "user": "admin",
      "catalog": ".*",
      "allow": "all"
    },
    {
      "group": "analysts",
      "catalog": "hive_prod|iceberg_lake",
      "allow": "read-only"
    },
    {
      "user": "etl_user",
      "catalog": "hive_prod",
      "allow": "all"
    }
  ],
  "schemas": [
    {
      "user": ".*",
      "schema": "information_schema",
      "owner": false
    }
  ],
  "tables": [
    {
      "user": "analyst_.*",
      "catalog": "hive_prod",
      "schema": "analytics",
      "table": ".*",
      "privileges": ["SELECT"]
    }
  ],
  "impersonation": [
    {
      "original_user": "airflow",
      "new_user": "etl_user",
      "allow": true
    }
  ]
}

11.3 通信の暗号化

# 内部通信の暗号化
internal-communication.https.required=true
internal-communication.shared-secret=<shared-secret>

# HTTPS設定
http-server.https.enabled=true
http-server.https.port=8443
http-server.https.keystore.path=/etc/trino/tls/keystore.jks
http-server.https.keystore.key=keystore-password

12. パフォーマンスチューニング

12.1 リソースグループ

リソースグループは、クエリのリソース使用量を制御するためのメカニズムである。

// etc/resource-groups.json
{
  "rootGroups": [
    {
      "name": "global",
      "softMemoryLimit": "90%",
      "hardConcurrencyLimit": 500,
      "maxQueued": 5000,
      "subGroups": [
        {
          "name": "admin",
          "softMemoryLimit": "30%",
          "hardConcurrencyLimit": 50,
          "maxQueued": 100,
          "schedulingWeight": 10,
          "schedulingPolicy": "weighted_fair"
        },
        {
          "name": "etl",
          "softMemoryLimit": "40%",
          "hardConcurrencyLimit": 20,
          "maxQueued": 200,
          "schedulingWeight": 5,
          "schedulingPolicy": "fair"
        },
        {
          "name": "interactive",
          "softMemoryLimit": "20%",
          "hardConcurrencyLimit": 200,
          "maxQueued": 1000,
          "schedulingWeight": 3,
          "schedulingPolicy": "weighted_fair"
        },
        {
          "name": "dashboards",
          "softMemoryLimit": "10%",
          "hardConcurrencyLimit": 100,
          "maxQueued": 500,
          "schedulingWeight": 2,
          "schedulingPolicy": "query_priority"
        }
      ]
    }
  ],
  "selectors": [
    {
      "user": "admin",
      "group": "global.admin"
    },
    {
      "source": "airflow|dagster",
      "group": "global.etl"
    },
    {
      "source": "tableau|superset|metabase",
      "group": "global.dashboards"
    },
    {
      "group": "global.interactive"
    }
  ]
}

12.2 セッションプロパティ

-- クエリ最適化
SET SESSION join_reordering_strategy = 'AUTOMATIC';
SET SESSION join_distribution_type = 'AUTOMATIC';
SET SESSION task_concurrency = 16;
SET SESSION hash_partition_count = 100;
SET SESSION optimize_hash_generation = true;

-- 結合最適化
SET SESSION join_max_broadcast_table_size = '100MB';
SET SESSION enable_dynamic_filtering = true;

-- メモリ制限
SET SESSION query_max_memory = '10GB';
SET SESSION query_max_memory_per_node = '2GB';

-- スピル設定
SET SESSION spill_enabled = true;
SET SESSION aggregation_operator_unspill_memory_limit = '128MB';

-- Writer設定
SET SESSION scale_writers = true;
SET SESSION writer_scaling_min_data_processed = '100MB';
SET SESSION task_writer_count = 4;
SET SESSION task_partitioned_writer_count = 64;

12.3 パフォーマンス最適化テクニック

-- 1. パーティションプルーニング(必ずパーティションカラムでフィルタ)
SELECT * FROM events WHERE event_date = DATE '2024-01-15';  -- ✓ Good
SELECT * FROM events WHERE CAST(event_timestamp AS DATE) = DATE '2024-01-15';  -- ✗ Bad

-- 2. Predicate Pushdownの活用
SELECT * FROM pg_table WHERE status = 'active';  -- PostgreSQLでフィルタされる

-- 3. LIMIT の活用(探索的クエリ)
SELECT * FROM large_table LIMIT 100;  -- 全データスキャンを回避

-- 4. approx_ 関数の活用
SELECT approx_distinct(user_id) FROM events;  -- COUNT(DISTINCT)より高速
SELECT approx_percentile(latency, 0.95) FROM requests;  -- P95近似値

-- 5. 大きな IN リストの回避
-- ✗ Bad
SELECT * FROM orders WHERE customer_id IN (1, 2, 3, ..., 10000);
-- ✓ Good
CREATE TABLE tmp_customer_ids AS SELECT * FROM UNNEST(ARRAY[1, 2, 3]) AS t(id);
SELECT o.* FROM orders o JOIN tmp_customer_ids t ON o.customer_id = t.id;

-- 6. TABLESAMPLE の活用
SELECT * FROM large_table TABLESAMPLE BERNOULLI(1);  -- 1%サンプリング

13. メモリ管理

13.1 メモリ構造

┌──────────────────────────────────────────┐
│            JVM Heap Memory               │
│                                          │
│  ┌────────────────────────────────────┐  │
│  │         Query Memory Pool          │  │
│  │    (query.max-memory-per-node)     │  │
│  │                                    │  │
│  │  ┌────────────────┐               │  │
│  │  │  User Memory   │               │  │
│  │  │  (Hash tables, │               │  │
│  │  │  Aggregations) │               │  │
│  │  └────────────────┘               │  │
│  │  ┌────────────────┐               │  │
│  │  │ System Memory  │               │  │
│  │  │ (Buffers,      │               │  │
│  │  │  Overhead)     │               │  │
│  │  └────────────────┘               │  │
│  └────────────────────────────────────┘  │
│                                          │
│  ┌────────────────────────────────────┐  │
│  │      General / Reserved Pool       │  │
│  │    (Internal, Metadata, etc.)      │  │
│  └────────────────────────────────────┘  │
└──────────────────────────────────────────┘

13.2 メモリ設定

# config.properties

# クエリあたりの最大メモリ(クラスター全体)
query.max-memory=50GB

# ノードあたりのクエリ最大メモリ(ユーザーメモリ)
query.max-memory-per-node=8GB

# ノードあたりの合計メモリ(ユーザー + システム)
query.max-total-memory-per-node=10GB

# メモリ不足時のキル設定
query.low-memory-killer.delay=5m
query.low-memory-killer.policy=total-reservation-on-blocked-nodes

14. Fault Tolerance とスピル

14.1 スピル(ディスクへの退避)

メモリ不足時にデータをディスクに退避する機能。

# config.properties
spill-enabled=true
spiller-spill-path=/var/trino/spill
spiller-max-used-space-threshold=0.9
spiller-threads=4
max-spill-per-node=100GB

# 集約スピル
aggregation-operator-unspill-memory-limit=128MB

# Joinスピル
join-spill-enabled=true

# ソートスピル
order-by-spill-enabled=true

# ウィンドウ関数スピル
window-spill-enabled=true

14.2 Fault-Tolerant Execution(Project Tardigrade)

Spark 3.4+で導入された耐障害実行モードで、ワーカー障害時にクエリ全体を再実行するのではなく、失敗したタスクのみを再実行する。

# config.properties
retry-policy=TASK

# Exchange Manager (中間データの永続化)
exchange.base-directories=s3://trino-exchange/
exchange.sink-max-file-size=1GB
exchange.source-max-error-duration=5m

# リトライ設定
task-retry-attempts-per-task=4
retry-initial-delay=10s
retry-max-delay=1m
retry-delay-scale-factor=2.0
fault-tolerant-execution-target-task-input-size=4GB

15. デプロイメント

15.1 ベアメタル / VM デプロイ

# Trinoのインストール
wget https://repo1.maven.org/maven2/io/trino/trino-server/439/trino-server-439.tar.gz
tar xzf trino-server-439.tar.gz
cd trino-server-439

# ディレクトリ構造
# trino-server-439/
# ├── bin/
# │   └── launcher         # 起動スクリプト
# ├── etc/
# │   ├── config.properties # メイン設定
# │   ├── node.properties   # ノード設定
# │   ├── jvm.config        # JVM設定
# │   ├── log.properties    # ログ設定
# │   └── catalog/          # カタログ設定
# │       ├── hive.properties
# │       ├── iceberg.properties
# │       └── postgresql.properties
# └── plugin/               # プラグイン

# 起動
bin/launcher start
bin/launcher stop
bin/launcher restart
bin/launcher status

15.2 Docker デプロイ

# docker-compose.yml
version: '3.8'

services:
  trino-coordinator:
    image: trinodb/trino:439
    container_name: trino-coordinator
    ports:
      - "8080:8080"
    volumes:
      - ./etc/coordinator/config.properties:/etc/trino/config.properties
      - ./etc/coordinator/node.properties:/etc/trino/node.properties
      - ./etc/jvm.config:/etc/trino/jvm.config
      - ./etc/catalog:/etc/trino/catalog
    environment:
      - JAVA_TOOL_OPTIONS=-Xmx16G
    deploy:
      resources:
        limits:
          memory: 20G
          cpus: "8"

  trino-worker:
    image: trinodb/trino:439
    volumes:
      - ./etc/worker/config.properties:/etc/trino/config.properties
      - ./etc/worker/node.properties:/etc/trino/node.properties
      - ./etc/jvm.config:/etc/trino/jvm.config
      - ./etc/catalog:/etc/trino/catalog
    environment:
      - JAVA_TOOL_OPTIONS=-Xmx16G
    deploy:
      mode: replicated
      replicas: 5
      resources:
        limits:
          memory: 20G
          cpus: "8"

15.3 Kubernetes デプロイ(Helm Chart)

# Helm チャートの追加
helm repo add trino https://trinodb.github.io/charts
helm repo update

# インストール
helm install trino trino/trino \
    --namespace trino \
    --create-namespace \
    --values values.yaml
# values.yaml
image:
  repository: trinodb/trino
  tag: "439"

server:
  workers: 10
  coordinatorExtraConfig: |
    query.max-memory=100GB
    query.max-memory-per-node=16GB
  workerExtraConfig: |
    query.max-memory-per-node=16GB

coordinator:
  jvm:
    maxHeapSize: "24G"
  resources:
    requests:
      memory: "28Gi"
      cpu: "8"
    limits:
      memory: "28Gi"
      cpu: "16"

worker:
  jvm:
    maxHeapSize: "24G"
  resources:
    requests:
      memory: "28Gi"
      cpu: "8"
    limits:
      memory: "28Gi"
      cpu: "16"

additionalCatalogs:
  hive: |
    connector.name=hive
    hive.metastore.uri=thrift://hive-metastore:9083
    hive.s3.aws-access-key=${ENV:AWS_ACCESS_KEY_ID}
    hive.s3.aws-secret-key=${ENV:AWS_SECRET_ACCESS_KEY}
  
  iceberg: |
    connector.name=iceberg
    iceberg.catalog.type=hive_metastore
    hive.metastore.uri=thrift://hive-metastore:9083

  postgresql: |
    connector.name=postgresql
    connection-url=jdbc:postgresql://pg-host:5432/mydb
    connection-user=trino
    connection-password=${ENV:PG_PASSWORD}

autoscaling:
  enabled: true
  minReplicas: 5
  maxReplicas: 50
  targetCPUUtilizationPercentage: 70

16. 監視と運用

16.1 Trino Web UI

Trino Web UI(デフォルト: ポート8080)は以下の情報を提供する:

ページ情報
Query List実行中/完了/失敗クエリの一覧
Query Detail各クエリの実行計画、ステージ、タスク詳細
Resource Groupsリソースグループの使用状況
Worker Listワーカーの一覧と状態

16.2 JMX メトリクス

# etc/catalog/jmx.properties
connector.name=jmx
-- JMXメトリクスの取得
SELECT * FROM jmx.current."trino.execution:name=QueryManager";
SELECT * FROM jmx.current."trino.memory:name=ClusterMemoryManager";
SELECT * FROM jmx.current."trino.execution:name=TaskManager";

-- クエリ統計
SELECT
    node_id,
    queued_queries,
    running_queries,
    blocked_queries
FROM jmx.current."trino.execution:name=QueryManager";

16.3 Prometheus / Grafana 連携

# config.properties に追加
jmx.base-uri=http://localhost:8080
# prometheus.yml
scrape_configs:
  - job_name: 'trino'
    metrics_path: '/v1/jmx/mbean'
    static_configs:
      - targets: ['trino-coordinator:8080']
    params:
      objectName: ['trino.execution:name=QueryManager']

16.4 運用コマンド

-- 実行中のクエリ確認
SELECT query_id, state, user, query, created
FROM system.runtime.queries
WHERE state = 'RUNNING'
ORDER BY created DESC;

-- クエリのキル
CALL system.runtime.kill_query(query_id => '20240115_123456_00001_abcde', message => 'Cancelled by admin');

-- ノード情報
SELECT * FROM system.runtime.nodes;

-- タスク情報
SELECT * FROM system.runtime.tasks
WHERE query_id = '20240115_123456_00001_abcde';

-- テーブル統計の収集
ANALYZE hive_prod.analytics.events;

-- テーブルのプロパティ確認
SHOW CREATE TABLE hive_prod.analytics.events;

16.5 ログ設定

# etc/log.properties
io.trino=INFO
io.trino.execution=WARN
io.trino.plugin.hive=WARN
io.trino.plugin.iceberg=INFO
io.trino.server=INFO
io.trino.security=INFO

17. エコシステム連携

17.1 dbt との連携

# profiles.yml (dbt)
my_project:
  target: prod
  outputs:
    prod:
      type: trino
      method: none  # or ldap, kerberos, oauth
      host: trino-coordinator
      port: 8080
      user: dbt_user
      database: hive_prod
      schema: analytics
      catalog: hive_prod
      threads: 8
      http_scheme: http
      session_properties:
        query_max_run_time: 30m
        query_max_memory: 10GB

17.2 Apache Superset との連携

# Superset Database Connection
# SQLAlchemy URI:
# trino://user@trino-coordinator:8080/hive_prod/analytics

# 認証あり:
# trino://user:password@trino-coordinator:8443/hive_prod/analytics?protocol=https

17.3 Airflow との連携

from airflow.providers.trino.operators.trino import TrinoOperator

trino_task = TrinoOperator(
    task_id='run_trino_query',
    trino_conn_id='trino_default',
    sql="""
        INSERT INTO hive_prod.analytics.daily_summary
        SELECT
            event_date,
            COUNT(*) as event_count,
            COUNT(DISTINCT user_id) as unique_users
        FROM hive_prod.raw.events
        WHERE event_date = DATE '{{ ds }}'
        GROUP BY event_date
    """,
    dag=dag,
)

17.4 JDBC / ODBC 接続

// JDBC接続例
String url = "jdbc:trino://coordinator:8080/hive_prod/analytics";
Properties properties = new Properties();
properties.setProperty("user", "analyst");
properties.setProperty("SSL", "true");

Connection connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery("SELECT * FROM users LIMIT 10");
# Python (trino-python-client)
from trino.dbapi import connect
from trino.auth import BasicAuthentication

conn = connect(
    host="trino-coordinator",
    port=8080,
    user="analyst",
    catalog="hive_prod",
    schema="analytics",
    http_scheme="https",
    auth=BasicAuthentication("analyst", "password"),
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM users LIMIT 10")
rows = cursor.fetchall()

18. ユースケースとベストプラクティス

18.1 代表的なユースケース

データレイク分析

-- S3上のデータレイクに対するアドホック分析
SELECT
    date_trunc('month', event_timestamp) as month,
    event_type,
    COUNT(*) as event_count,
    COUNT(DISTINCT user_id) as unique_users,
    approx_percentile(response_time_ms, 0.95) as p95_response_time
FROM iceberg_lake.analytics.api_events
WHERE event_timestamp >= TIMESTAMP '2024-01-01'
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC;

データメッシュ / フェデレーテッドクエリ

-- 複数チームのデータソースを横断した分析
WITH customer_360 AS (
    SELECT
        c.customer_id,
        c.name,
        c.segment,
        o.total_orders,
        o.total_revenue,
        s.support_tickets,
        s.avg_satisfaction_score
    FROM postgresql_prod.crm.customers c
    LEFT JOIN iceberg_lake.sales.customer_orders o
        ON c.customer_id = o.customer_id
    LEFT JOIN postgresql_support.public.customer_metrics s
        ON c.customer_id = s.customer_id
)
SELECT
    segment,
    COUNT(*) as customer_count,
    AVG(total_revenue) as avg_revenue,
    AVG(avg_satisfaction_score) as avg_satisfaction
FROM customer_360
GROUP BY segment
ORDER BY avg_revenue DESC;

18.2 ベストプラクティス

  1. パーティションプルーニングを常に意識する: パーティションカラムをWHERE句に含める
  2. Predicate Pushdownを活用する: フィルタ条件をサブクエリ/CTEで先に適用
  3. 適切なデータフォーマットを選択: Parquet / ORC のカラムナー形式を推奨
  4. ANALYZE でテーブル統計を最新に保つ: CBO の精度向上
  5. リソースグループで優先度管理: インタラクティブクエリとETLを分離
  6. approx_ 関数を活用: 正確性が不要な場合はapprox_distinct, approx_percentile
  7. クエリのEXPLAIN確認: 大規模クエリの前に実行計画を確認
  8. Broadcast Joinの閾値を適切に設定: 小テーブルのBroadcast化
  9. スピルの有効化: メモリ不足時の安全弁として設定
  10. Dynamic Filteringの活用: スタースキーマクエリの高速化

19. Trino vs 他エンジン比較

19.1 比較表

項目TrinoSpark SQLPresto (Meta)HiveImpala
実行モデルパイプラインステージパイプラインMapReduceMPP
レイテンシ低〜中中〜高低〜中
フェデレーション強力限定的中程度弱い弱い
コネクター数40+20+20+10+10+
ストリーミング限定的強力限定的なしなし
MLなしMLlibなしなしなし
Fault ToleranceTask LevelStage Level限定的Job Levelなし
ETL向き中程度高い中程度高い低い
アドホック分析高い中程度高い低い高い

19.2 使い分けの指針

  • Trino: フェデレーテッドクエリ、インタラクティブ分析、データレイク探索
  • Spark: 大規模バッチETL、ML、ストリーミング処理
  • Hive: 超大規模バッチ、レガシーHadoopパイプライン
  • Impala: CDH/CDPクラスターでの低レイテンシクエリ

20. 最新動向と将来展望

20.1 最近のバージョンの主要機能

機能バージョン説明
Fault-Tolerant Execution400+タスクレベルのリトライ
Polymorphic Table Functions400+テーブル関数の型パラメトリック化
MERGE Statement391+Iceberg/Delta Lake での Upsert
Dynamic Catalog406+カタログの動的追加/削除
Project Hummingbird最新Rust ベースのワーカー(実験的)
Improved CBO最新コストモデルの継続的改善
Warp Speed最新Velox エンジン統合(実験的)

20.2 将来の方向性

  • Lakehouse との深い統合: Iceberg / Delta Lake / Hudi のファーストクラスサポート強化
  • AI/ML連携: 推論機能の組み込み、ベクトル検索対応
  • ネイティブRustワーカー: パフォーマンスのさらなる向上
  • サーバーレス実行: オンデマンドスケーリング
  • 統一カタログ: Unity Catalog、Polaris との連携強化

21. まとめ

Apache Trinoは、フェデレーテッドクエリ機能と高速なインタラクティブ分析を特徴とする分散SQLクエリエンジンである。

技術的要点

カテゴリ要点
アーキテクチャCoordinator-Worker MPP モデル
実行モデルパイプライン実行(インメモリ)
コネクター40+のデータソース対応
フェデレーションクロスカタログJoinの強力なサポート
SQLANSI SQL 準拠、高度な分析関数
最適化CBO、Predicate Pushdown、動的フィルタリング
デプロイベアメタル / Docker / Kubernetes
セキュリティLDAP / OAuth2 / Kerberos / ACL
耐障害性Fault-Tolerant Execution(タスクレベルリトライ)

選定の指針

Apache Trino は以下の要件がある場合に特に効果的である:

  • フェデレーテッドクエリ: 複数データソースを単一SQLで横断的に分析
  • インタラクティブ分析: 秒〜分レベルの応答時間が求められるアドホッククエリ
  • データレイク分析: S3/HDFS上のParquet/ORC/Iceberg/Deltaデータの高速参照
  • データメッシュ: 分散したデータプロダクトへの統一的なアクセスレイヤー
  • BIツール連携: Tableau/Superset等からの直接クエリ

一方、以下のケースでは他のツールを検討すべきである:

  • 大規模バッチETL: Apache Spark の方が適している
  • ストリーミング処理: Apache Flink / Spark Structured Streaming
  • 超低レイテンシOLAP: ClickHouse / Apache Druid
  • 機械学習: Apache Spark MLlib / 専用MLプラットフォーム

参考文献: