Apache Trino
Apache Trino 徹底解説:アーキテクチャ・機能・設定の全容
目次
- はじめに
- Apache Trino の概要と歴史
- アーキテクチャ全体像
- コーディネーターとワーカー
- クエリ実行エンジン
- コネクターアーキテクチャ
- 主要コネクターと設定
- SQL機能
- データ型とスキーマ管理
- フェデレーテッドクエリ
- セキュリティ
- パフォーマンスチューニング
- メモリ管理
- Fault Tolerance とスピル
- デプロイメント
- 監視と運用
- エコシステム連携
- ユースケースとベストプラクティス
- Trino vs 他エンジン比較
- 最新動向と将来展望
- まとめ
1. はじめに
Apache Trino(旧称 Presto SQL / PrestoSQL)は、大規模データに対するインタラクティブな分析クエリを実行するための高速分散SQLクエリエンジンである。Facebookで開発され、現在はTrino Software Foundationの下で管理されている。
Trinoの最大の特徴はフェデレーテッドクエリであり、HDFS、S3、リレーショナルデータベース、NoSQL、Kafka など多様なデータソースに対して、単一のSQLクエリでデータを横断的に参照・結合できる。
1.1 本記事の対象読者
- データレイクやデータウェアハウスの構築・運用を担当するデータエンジニア
- アドホッククエリや探索的分析を行うデータアナリスト
- Trinoクラスターの設計・運用を行うインフラエンジニア / SRE
- クエリエンジンの技術選定を行うアーキテクト
1.2 Trino の位置づけ
┌────────────────────────────────────────────────────────────┐
│ BI / Analytics Tools │
│ (Tableau, Superset, Metabase, Redash, dbt) │
├────────────────────────────────────────────────────────────┤
│ Trino (SQL Engine) │
│ Federated Query / Interactive Analytics │
├──────┬──────┬──────┬──────┬──────┬──────┬────────────────┤
│ Hive │ Ice- │Delta │ Post │MySQL │Kafka │ Elasticsearch │
│ HDFS │ berg │ Lake │ gres │ │ │ │
│ │ │ │ │ │ │ │
├──────┴──────┴──────┴──────┴──────┴──────┴────────────────┤
│ Storage Layer │
│ (HDFS / S3 / GCS / ADLS / RDBMS / NoSQL / MQ) │
└────────────────────────────────────────────────────────────┘
2. Apache Trino の概要と歴史
2.1 誕生と分岐の経緯
| 年 | 出来事 |
|---|---|
| 2012 | FacebookでPrestoプロジェクトが開始 |
| 2013 | Prestoがオープンソースとして公開 |
| 2018 | Prestoの主要開発者がFacebookを離れStarburst社を設立 |
| 2019 | コミュニティが分裂:PrestoSQL(現Trino)と PrestoDB(Facebook主導) |
| 2020 | PrestoSQL が Trino にリブランド |
| 2021 | Trino Software Foundation 設立 |
| 2024-2025 | Fault-tolerant execution、Polymorphic table functions、多数の改善 |
2.2 Trino と PrestoDB の違い
| 項目 | Trino (旧 PrestoSQL) | PrestoDB (Facebook) |
|---|---|---|
| ガバナンス | Trino Software Foundation | Presto Foundation (Linux Foundation) |
| 開発主体 | コミュニティ + Starburst | Meta + コミュニティ |
| リリース頻度 | 非常に高い(隔週) | 中程度 |
| コネクター数 | 40+ | 20+ |
| Fault Tolerance | Project Tardigrade | Presto on Spark |
| 特徴 | インタラクティブ重視 | バッチ対応強化 |
2.3 主要な特徴
- インメモリパイプライン実行: データをディスクに書き出さずにメモリ上でパイプライン処理
- フェデレーテッドクエリ: 異なるデータソースを単一SQLで横断
- ANSI SQL準拠: 標準SQLに準拠した豊富なSQL機能
- スケーラブル: 数ノードから数千ノードまでスケール
- プラグインアーキテクチャ: コネクター、型、関数、セキュリティの拡張が可能
- 低レイテンシ: ミリ秒〜秒レベルのクエリ応答
3. アーキテクチャ全体像
3.1 MPP(Massively Parallel Processing)アーキテクチャ
Trinoはシェアードナッシング(Shared-Nothing)のMPPアーキテクチャを採用している。
┌────────────────────────────────────────────────────────────┐
│ Client │
│ (JDBC / CLI / REST API / BI Tool) │
└──────────────────────┬─────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ Coordinator │
│ ┌─────────────┐ ┌──────────────┐ ┌────────────────────┐ │
│ │ Parser & │ │ Planner & │ │ Scheduler & │ │
│ │ Analyzer │ │ Optimizer │ │ Task Manager │ │
│ └─────────────┘ └──────────────┘ └────────────────────┘ │
│ ┌─────────────┐ ┌──────────────┐ ┌────────────────────┐ │
│ │ Metadata │ │ Cost- │ │ Resource │ │
│ │ Manager │ │ based Opt. │ │ Manager │ │
│ └─────────────┘ └──────────────┘ └────────────────────┘ │
└──────────────────────┬─────────────────────────────────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker N │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │ Task 1 │ │ │ │ Task 3 │ │ │ │ Task 5 │ │
│ │ Task 2 │ │ │ │ Task 4 │ │ │ │ Task 6 │ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │Connector │ │ │ │Connector │ │ │ │Connector │ │
│ │ Instances│ │ │ │ Instances│ │ │ │ Instances│ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────────────────┐
│ Data Sources │
│ (HDFS, S3, PostgreSQL, MySQL, Kafka, etc.) │
└──────────────────────────────────────────────────┘
3.2 クエリ実行の流れ
- クライアント: SQL クエリをCoordinatorに送信
- パーシング: SQL文を抽象構文木(AST)に変換
- 分析: テーブル名、カラム名の解決、型チェック
- 論理計画: 論理的なクエリ計画を生成
- 最適化: コストベース最適化(CBO)を適用
- 物理計画: 分散実行計画に変換
- スケジューリング: タスクをWorkerに割り当て
- 実行: 各Workerがタスクを並列実行
- 結果返却: パイプラインで結果をクライアントに返却
3.3 パイプライン実行モデル
Trinoの特徴的な点はパイプライン実行モデルである。MapReduceやSparkと異なり、中間データをディスクに書き出すことなく、データが生成されると同時に次のステージに流す。
Stage 0 (Source) Stage 1 (Exchange) Stage 2 (Output)
┌────────────┐ ┌────────────────┐ ┌──────────────┐
│ Table Scan │ ───► │ Hash Join │ ─► │ Aggregation │
│ + Filter │ │ + Project │ │ + Output │
└────────────┘ └────────────────┘ └──────────────┘
│ ▲ │ │
│ Pipeline │ │ Pipeline │
└────────────────────┘ └──────────────────┘
データが即座に次ステージへ 結果をストリーミング返却
4. コーディネーターとワーカー
4.1 Coordinator の役割
Coordinator は Trino クラスターの中核であり、以下の責務を持つ:
- SQL の受信、パース、分析
- クエリ計画の最適化
- ステージとタスクへの分割
- ワーカーへのタスク配布
- クエリ状態の管理
- リソースグループ管理
- Web UI の提供
Coordinator 設定(config.properties)
# config.properties (Coordinator)
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
discovery.uri=http://coordinator-host:8080
# クエリ管理
query.max-memory=50GB
query.max-memory-per-node=8GB
query.max-total-memory-per-node=10GB
query.max-execution-time=30m
query.max-run-time=60m
query.max-stage-count=150
query.max-history=1000
query.min-expire-age=30m
4.2 Worker の役割
Worker は実際のデータ処理を行うノードであり:
- データソースからのデータ読み取り
- タスクの実行(フィルタリング、結合、集約等)
- 中間データの交換(Exchange)
- 結果のCoordinatorへの返却
Worker 設定(config.properties)
# config.properties (Worker)
coordinator=false
http-server.http.port=8080
discovery.uri=http://coordinator-host:8080
# タスク設定
task.max-worker-threads=16
task.min-drivers=32
task.max-drivers-per-task=64
task.concurrency=16
4.3 ノード設定(node.properties)
# node.properties
node.environment=production
node.id=<unique-uuid>
node.data-dir=/var/trino/data
4.4 JVM 設定(jvm.config)
# jvm.config
-server
-Xmx16G
-Xms16G
-XX:InitialRAMPercentage=80
-XX:MaxRAMPercentage=80
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=512M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
-Dfile.encoding=UTF-8
-XX:+UnlockDiagnosticVMOptions
-XX:+UseAESCTRIntrinsics
5. クエリ実行エンジン
5.1 クエリ計画の最適化
コストベース最適化(CBO)
Trinoは統計情報に基づくコストベース最適化を実装している。
-- テーブル統計の収集
ANALYZE table_name;
-- カラム統計の確認
SHOW STATS FOR table_name;
-- 特定パーティションの統計
SHOW STATS FOR (SELECT * FROM table_name WHERE date = '2024-01-15');
主要な最適化:
| 最適化 | 説明 |
|---|---|
| Predicate Pushdown | フィルタ条件をコネクターに押し下げ |
| Projection Pushdown | 必要なカラムのみを読み込み |
| Join Reordering | 結合順序の最適化(テーブル統計に基づく) |
| Join Distribution | Broadcast vs Partitioned の選択 |
| Partial Aggregation | Worker側での部分集約 |
| Limit Pushdown | LIMIT句のプッシュダウン |
| TopN Pushdown | ORDER BY + LIMIT の最適化 |
実行計画の確認
-- 論理計画
EXPLAIN SELECT * FROM orders JOIN customers ON orders.customer_id = customers.id;
-- 分散実行計画
EXPLAIN (TYPE DISTRIBUTED)
SELECT customer_id, SUM(total_amount)
FROM orders
GROUP BY customer_id;
-- I/O統計付き
EXPLAIN ANALYZE
SELECT customer_id, COUNT(*) as order_count
FROM orders
WHERE order_date >= DATE '2024-01-01'
GROUP BY customer_id
HAVING COUNT(*) > 10;
-- JSON形式の実行計画
EXPLAIN (FORMAT JSON) SELECT ...;
-- Graphviz形式
EXPLAIN (FORMAT GRAPHVIZ) SELECT ...;
5.2 結合戦略
Trinoは以下の結合戦略をサポートする:
-- Broadcast Join(小テーブルを全Workerに配布)
-- 自動選択または手動指定
SELECT /*+ BROADCAST */ *
FROM large_table l
JOIN small_table s ON l.key = s.key;
-- Partitioned Join(両テーブルをハッシュ分割)
SELECT /*+ PARTITION */ *
FROM table_a a
JOIN table_b b ON a.key = b.key;
| 結合タイプ | 使用条件 | メモリ要件 |
|---|---|---|
| Broadcast | 一方が小さい(< broadcast threshold) | 小テーブルがメモリに収まる |
| Partitioned (Hash) | 両テーブルが大きい | 各パーティションがメモリに収まる |
| Cross Join | 条件なし結合 | 右側がメモリに収まる |
5.3 動的フィルタリング
Trino は Join 時に動的フィルタを生成し、プローブ側のテーブルスキャンを削減する。
# 動的フィルタリング設定
enable-dynamic-filtering=true
dynamic-filtering-max-per-driver-row-count=1000000
dynamic-filtering-max-per-driver-size=10MB
dynamic-filtering-range-row-limit-per-driver=10000
-- 動的フィルタリングの例
-- dimension テーブルのフィルタが fact テーブルに適用される
SELECT f.*, d.category_name
FROM fact_sales f
JOIN dim_product d ON f.product_id = d.product_id
WHERE d.category = 'Electronics';
-- fact_sales のスキャン時に、product_idが'Electronics'カテゴリの
-- 製品IDに限定される(動的フィルタ)
6. コネクターアーキテクチャ
6.1 コネクターの仕組み
Trinoのコネクターは**SPI(Service Provider Interface)**に基づくプラグインアーキテクチャで実装されている。各コネクターは以下のインターフェースを実装する:
┌────────────────────────────────────────────────────────┐
│ Trino Connector SPI │
├──────────────┬────────────────┬────────────────────────┤
│ Metadata │ Data Source │ Access Control │
│ Provider │ Provider │ Provider │
│ │ │ │
│ - Schemas │ - Page Source │ - Table permissions │
│ - Tables │ - Record Set │ - Column masking │
│ - Columns │ - Page Sink │ - Row filtering │
│ - Stats │ - Record Sink │ │
├──────────────┼────────────────┼────────────────────────┤
│ Split │ Transaction │ Session Property │
│ Manager │ Manager │ Provider │
└──────────────┴────────────────┴────────────────────────┘
6.2 カタログの概念
Trinoでは「カタログ」がデータソースへの接続を表す。1つのコネクタータイプに対して複数のカタログを作成できる。
Trino
├── catalog: hive_prod (connector: hive)
│ ├── schema: analytics
│ │ ├── table: users
│ │ └── table: events
│ └── schema: raw
│ └── table: logs
├── catalog: postgresql_prod (connector: postgresql)
│ └── schema: public
│ ├── table: customers
│ └── table: orders
├── catalog: iceberg_lake (connector: iceberg)
│ └── schema: warehouse
│ └── table: transactions
└── catalog: kafka_stream (connector: kafka)
└── schema: default
└── table: click_events
SQL でのアクセス:
-- 完全修飾名
SELECT * FROM hive_prod.analytics.users;
-- カタログの切り替え
USE hive_prod.analytics;
SELECT * FROM users;
-- クロスカタログJoin(フェデレーテッドクエリ)
SELECT u.name, o.total_amount
FROM hive_prod.analytics.users u
JOIN postgresql_prod.public.orders o
ON u.user_id = o.customer_id;
7. 主要コネクターと設定
7.1 Hive コネクター
Hive メタストアを使用してHDFS / S3 上のデータにアクセスする最も一般的なコネクター。
# etc/catalog/hive.properties
connector.name=hive
hive.metastore.uri=thrift://hive-metastore:9083
# S3設定
hive.s3.aws-access-key=AKIAIOSFODNN7EXAMPLE
hive.s3.aws-secret-key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
hive.s3.endpoint=s3.amazonaws.com
hive.s3.path-style-access=false
hive.s3.ssl.enabled=true
# パフォーマンス設定
hive.max-partitions-per-scan=1000000
hive.max-splits-per-second=1000
hive.max-initial-splits=200
hive.max-initial-split-size=64MB
hive.max-split-size=128MB
# 書き込み設定
hive.compression-codec=SNAPPY
hive.target-max-file-size=1GB
hive.immutable-partitions=false
# キャッシュ設定
hive.metastore-cache-ttl=2m
hive.metastore-refresh-interval=1m
# パーティションプロジェクション
hive.partition-projection-enabled=true
7.2 Iceberg コネクター
Apache Iceberg テーブルフォーマットに対する高性能なアクセスを提供する。
# etc/catalog/iceberg.properties
connector.name=iceberg
# Hive Metastore カタログ
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://hive-metastore:9083
# REST カタログ
# iceberg.catalog.type=rest
# iceberg.rest-catalog.uri=http://iceberg-rest:8181
# iceberg.rest-catalog.warehouse=s3://warehouse/
# Nessie カタログ
# iceberg.catalog.type=nessie
# iceberg.nessie-catalog.uri=http://nessie:19120/api/v2
# iceberg.nessie-catalog.default-warehouse-dir=s3://warehouse/
# AWS Glue カタログ
# iceberg.catalog.type=glue
# hive.metastore.glue.region=us-east-1
# S3設定
hive.s3.aws-access-key=AKIAIOSFODNN7EXAMPLE
hive.s3.aws-secret-key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
# パフォーマンス
iceberg.max-partitions-per-writer=1000
iceberg.target-max-file-size=1GB
iceberg.compression-codec=ZSTD
# 高度な設定
iceberg.expire-snapshots.min-retention=7d
iceberg.delete-schema-locations-fallback=false
-- Icebergテーブルの作成
CREATE TABLE iceberg_lake.warehouse.events (
event_id BIGINT,
user_id BIGINT,
event_type VARCHAR,
event_data VARCHAR,
event_timestamp TIMESTAMP(6) WITH TIME ZONE
)
WITH (
format = 'PARQUET',
partitioning = ARRAY['day(event_timestamp)', 'bucket(user_id, 16)'],
sorted_by = ARRAY['event_timestamp'],
location = 's3://data-lake/warehouse/events'
);
-- タイムトラベル
SELECT * FROM iceberg_lake.warehouse.events FOR TIMESTAMP AS OF TIMESTAMP '2024-01-15 00:00:00 UTC';
SELECT * FROM iceberg_lake.warehouse.events FOR VERSION AS OF 1234567890;
-- スナップショット情報
SELECT * FROM iceberg_lake.warehouse."events$snapshots";
SELECT * FROM iceberg_lake.warehouse."events$history";
SELECT * FROM iceberg_lake.warehouse."events$manifests";
SELECT * FROM iceberg_lake.warehouse."events$partitions";
SELECT * FROM iceberg_lake.warehouse."events$files";
-- テーブルメンテナンス
ALTER TABLE iceberg_lake.warehouse.events EXECUTE expire_snapshots(retention_threshold => '7d');
ALTER TABLE iceberg_lake.warehouse.events EXECUTE remove_orphan_files(retention_threshold => '7d');
ALTER TABLE iceberg_lake.warehouse.events EXECUTE optimize WHERE event_timestamp >= TIMESTAMP '2024-01-01';
7.3 Delta Lake コネクター
# etc/catalog/delta.properties
connector.name=delta_lake
hive.metastore.uri=thrift://hive-metastore:9083
# S3設定
hive.s3.aws-access-key=AKIAIOSFODNN7EXAMPLE
hive.s3.aws-secret-key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
# パフォーマンス
delta.enable-non-concurrent-writes=true
delta.max-partitions-per-writer=1000
delta.target-max-file-size=1GB
delta.compression-codec=SNAPPY
# Change Data Feed
delta.enable-change-data-feed=true
# Vacuum
delta.vacuum.min-retention=168h
7.4 PostgreSQL / MySQL コネクター
# etc/catalog/postgresql.properties
connector.name=postgresql
connection-url=jdbc:postgresql://db-host:5432/mydb
connection-user=trino_user
connection-password=trino_password
# コネクションプール
connection-pool.max-size=30
connection-pool.min-size=1
# プッシュダウン設定
postgresql.experimental.enable-string-pushdown-with-collate=true
# 集約プッシュダウン
jdbc.aggregation-pushdown.enabled=true
jdbc.join-pushdown.enabled=true
jdbc.topn-pushdown.enabled=true
# etc/catalog/mysql.properties
connector.name=mysql
connection-url=jdbc:mysql://db-host:3306/mydb
connection-user=trino_user
connection-password=trino_password
7.5 Kafka コネクター
# etc/catalog/kafka.properties
connector.name=kafka
kafka.nodes=kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092
kafka.table-names=click_events,page_views,user_actions
kafka.hide-internal-columns=false
kafka.default-schema=default
# セキュリティ
kafka.security-protocol=SASL_SSL
kafka.sasl-mechanism=PLAIN
// etc/kafka/click_events.json (テーブル定義)
{
"tableName": "click_events",
"schemaName": "default",
"topicName": "click-events",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"type": "VARCHAR",
"dataFormat": "raw"
}
]
},
"message": {
"dataFormat": "json",
"fields": [
{"name": "user_id", "type": "BIGINT", "mapping": "user_id"},
{"name": "page_url", "type": "VARCHAR", "mapping": "page_url"},
{"name": "timestamp", "type": "TIMESTAMP", "mapping": "timestamp"}
]
}
}
7.6 Elasticsearch コネクター
# etc/catalog/elasticsearch.properties
connector.name=elasticsearch
elasticsearch.host=es-host
elasticsearch.port=9200
elasticsearch.default-schema-name=default
elasticsearch.scroll-size=1000
elasticsearch.scroll-timeout=1m
elasticsearch.request-timeout=10s
elasticsearch.tls.enabled=true
7.7 MongoDB コネクター
# etc/catalog/mongodb.properties
connector.name=mongodb
mongodb.connection-url=mongodb://mongo-host:27017/
mongodb.schema-collection=_schema
mongodb.case-insensitive-name-matching=true
8. SQL機能
8.1 基本的なSQL操作
-- DDL操作
CREATE SCHEMA hive_prod.analytics;
CREATE TABLE hive_prod.analytics.users (
user_id BIGINT,
name VARCHAR,
email VARCHAR,
created_at TIMESTAMP(3) WITH TIME ZONE
)
WITH (
format = 'PARQUET',
partitioned_by = ARRAY['date(created_at)'],
external_location = 's3://data-lake/analytics/users'
);
-- INSERT
INSERT INTO hive_prod.analytics.users
SELECT user_id, name, email, created_at
FROM raw.user_registrations;
-- CREATE TABLE AS SELECT (CTAS)
CREATE TABLE hive_prod.analytics.active_users
WITH (format = 'PARQUET', partitioned_by = ARRAY['country'])
AS
SELECT user_id, name, country, last_login
FROM users
WHERE last_login >= CURRENT_DATE - INTERVAL '30' DAY;
-- MERGE (Iceberg / Delta Lake)
MERGE INTO iceberg_lake.warehouse.customers AS target
USING staging.new_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN
UPDATE SET name = source.name, email = source.email, updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
INSERT (customer_id, name, email, created_at, updated_at)
VALUES (source.customer_id, source.name, source.email, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);
-- DELETE
DELETE FROM iceberg_lake.warehouse.events
WHERE event_timestamp < TIMESTAMP '2023-01-01 00:00:00 UTC';
-- UPDATE
UPDATE iceberg_lake.warehouse.customers
SET status = 'inactive'
WHERE last_login < CURRENT_DATE - INTERVAL '365' DAY;
8.2 高度なSQL機能
-- ウィンドウ関数
SELECT
user_id,
event_type,
event_timestamp,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_timestamp DESC) as rn,
LAG(event_timestamp) OVER (PARTITION BY user_id ORDER BY event_timestamp) as prev_event,
LEAD(event_timestamp) OVER (PARTITION BY user_id ORDER BY event_timestamp) as next_event,
SUM(amount) OVER (
PARTITION BY user_id
ORDER BY event_timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as cumulative_amount,
AVG(amount) OVER (
PARTITION BY user_id
ORDER BY event_timestamp
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as moving_avg_7
FROM events;
-- GROUPING SETS / CUBE / ROLLUP
SELECT
COALESCE(country, 'ALL') as country,
COALESCE(category, 'ALL') as category,
SUM(revenue) as total_revenue,
COUNT(*) as order_count
FROM orders
GROUP BY GROUPING SETS (
(country, category),
(country),
(category),
()
);
-- UNNEST(配列/マップの展開)
SELECT
user_id,
tag
FROM users
CROSS JOIN UNNEST(tags) AS t(tag);
-- JSON関数
SELECT
json_extract_scalar(event_data, '$.action') as action,
json_extract_scalar(event_data, '$.properties.source') as source,
CAST(json_extract(event_data, '$.items') AS ARRAY(JSON)) as items
FROM events;
-- 配列・マップ操作
SELECT
user_id,
ARRAY_AGG(DISTINCT event_type) as event_types,
MAP_AGG(event_type, event_count) as event_count_map,
ARRAY_JOIN(ARRAY_AGG(DISTINCT tag), ', ') as tag_list,
CARDINALITY(ARRAY_AGG(DISTINCT event_type)) as unique_event_count
FROM user_events
GROUP BY user_id;
-- WITH RECURSIVE(再帰CTE)
WITH RECURSIVE org_tree AS (
SELECT employee_id, name, manager_id, 1 as level
FROM employees
WHERE manager_id IS NULL
UNION ALL
SELECT e.employee_id, e.name, e.manager_id, ot.level + 1
FROM employees e
JOIN org_tree ot ON e.manager_id = ot.employee_id
)
SELECT * FROM org_tree ORDER BY level, name;
-- MATCH_RECOGNIZE(パターンマッチング)
SELECT *
FROM stock_prices
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY trade_date
MEASURES
FIRST(DOWN.trade_date) AS start_date,
LAST(UP.trade_date) AS end_date,
FIRST(DOWN.price) AS start_price,
LAST(UP.price) AS end_price
ONE ROW PER MATCH
PATTERN (DOWN+ UP+)
DEFINE
DOWN AS price < PREV(price),
UP AS price > PREV(price)
);
-- Polymorphic Table Functions
SELECT *
FROM TABLE(exclude_columns(
input => TABLE(orders),
columns => DESCRIPTOR(internal_notes, debug_info)
));
8.3 PreparedStatement
-- Prepared Statementの使用
PREPARE my_query FROM
SELECT * FROM orders WHERE customer_id = ? AND order_date >= ?;
EXECUTE my_query USING 12345, DATE '2024-01-01';
DEALLOCATE PREPARE my_query;
9. データ型とスキーマ管理
9.1 サポートされるデータ型
| カテゴリ | データ型 | 説明 |
|---|---|---|
| 論理値 | BOOLEAN | true / false |
| 整数 | TINYINT, SMALLINT, INTEGER, BIGINT | 8/16/32/64ビット整数 |
| 浮動小数点 | REAL, DOUBLE | 32/64ビット浮動小数点 |
| 固定精度 | DECIMAL(p, s) | 任意精度の固定小数点 |
| 文字列 | VARCHAR, CHAR, VARBINARY | 可変長/固定長文字列、バイナリ |
| 日時 | DATE, TIME, TIMESTAMP, INTERVAL | 日付・時刻・タイムスタンプ |
| 構造化 | ARRAY, MAP, ROW | 配列、マップ、構造体 |
| その他 | JSON, UUID, IPADDRESS, HyperLogLog | 特殊型 |
-- 型変換
SELECT CAST('2024-01-15' AS DATE);
SELECT CAST(123 AS VARCHAR);
SELECT TRY_CAST('invalid' AS INTEGER); -- NULLを返す(エラーにならない)
-- ROW型
SELECT CAST(ROW('Alice', 30) AS ROW(name VARCHAR, age INTEGER));
-- ARRAY型
SELECT ARRAY[1, 2, 3, 4, 5];
SELECT ARRAY_AGG(name ORDER BY name) FROM users;
-- MAP型
SELECT MAP(ARRAY['a', 'b'], ARRAY[1, 2]);
10. フェデレーテッドクエリ
10.1 クロスカタログJoin
Trinoの最大の強みは、異なるデータソースを透過的にJoinできることである。
-- S3上のHiveテーブルとPostgreSQLテーブルの結合
SELECT
h.event_type,
h.event_count,
p.category_name,
p.description
FROM hive_prod.analytics.event_summary h
JOIN postgresql_prod.public.event_categories p
ON h.event_type = p.event_code
WHERE h.event_date = CURRENT_DATE;
-- Kafkaストリーム + S3データレイク + RDBMSの3way結合
SELECT
k.user_id,
k.action,
k.timestamp as event_time,
u.name as user_name,
u.country,
p.product_name,
p.price
FROM kafka_stream.default.click_events k
JOIN hive_prod.analytics.users u
ON k.user_id = u.user_id
JOIN postgresql_prod.public.products p
ON k.product_id = p.product_id
WHERE k.timestamp >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR;
-- Iceberg + Elasticsearch
SELECT
i.order_id,
i.total_amount,
e.search_query,
e.click_position
FROM iceberg_lake.warehouse.orders i
JOIN elasticsearch.default.search_logs e
ON i.session_id = e.session_id;
10.2 フェデレーテッドクエリのパフォーマンス考慮
-- ✓ 良い例:各データソースでフィルタリングしてからJoin
WITH filtered_orders AS (
SELECT order_id, customer_id, total_amount
FROM postgresql_prod.public.orders
WHERE order_date >= DATE '2024-01-01' -- PostgreSQLでフィルタ
),
filtered_events AS (
SELECT user_id, event_type, event_count
FROM hive_prod.analytics.events
WHERE event_date >= DATE '2024-01-01' -- Hive(S3)でフィルタ
)
SELECT * FROM filtered_orders o
JOIN filtered_events e ON o.customer_id = e.user_id;
-- ✗ 悪い例:大量データを全てTrinoに読み込んでからフィルタ
SELECT * FROM postgresql_prod.public.orders o
JOIN hive_prod.analytics.events e ON o.customer_id = e.user_id
WHERE o.order_date >= DATE '2024-01-01';
-- (上記でもオプティマイザがPredicate Pushdownするが、明示的に分離する方が確実)
11. セキュリティ
11.1 認証
# config.properties - Password認証
http-server.authentication.type=PASSWORD
# etc/password-authenticator.properties (LDAP)
password-authenticator.name=ldap
ldap.url=ldaps://ldap-server:636
ldap.user-bind-pattern=${USER}@company.com
ldap.group-auth-pattern=(&(objectClass=group)(member=${USER}))
# etc/password-authenticator.properties (File)
password-authenticator.name=file
file.password-file=etc/password.db
# OAuth2 / OpenID Connect 認証
http-server.authentication.type=OAUTH2
http-server.authentication.oauth2.issuer=https://auth.company.com/
http-server.authentication.oauth2.client-id=trino-client
http-server.authentication.oauth2.client-secret=client-secret
http-server.authentication.oauth2.scopes=openid,profile
# Kerberos認証
http-server.authentication.type=KERBEROS
http.authentication.krb5.config=/etc/krb5.conf
http.server.authentication.krb5.keytab=/etc/trino/trino.keytab
http.server.authentication.krb5.service-name=trino
11.2 認可(Access Control)
# config.properties
access-control.name=file
security.config-file=etc/access-control.json
security.refresh-period=10s
// etc/access-control.json
{
"catalogs": [
{
"user": "admin",
"catalog": ".*",
"allow": "all"
},
{
"group": "analysts",
"catalog": "hive_prod|iceberg_lake",
"allow": "read-only"
},
{
"user": "etl_user",
"catalog": "hive_prod",
"allow": "all"
}
],
"schemas": [
{
"user": ".*",
"schema": "information_schema",
"owner": false
}
],
"tables": [
{
"user": "analyst_.*",
"catalog": "hive_prod",
"schema": "analytics",
"table": ".*",
"privileges": ["SELECT"]
}
],
"impersonation": [
{
"original_user": "airflow",
"new_user": "etl_user",
"allow": true
}
]
}
11.3 通信の暗号化
# 内部通信の暗号化
internal-communication.https.required=true
internal-communication.shared-secret=<shared-secret>
# HTTPS設定
http-server.https.enabled=true
http-server.https.port=8443
http-server.https.keystore.path=/etc/trino/tls/keystore.jks
http-server.https.keystore.key=keystore-password
12. パフォーマンスチューニング
12.1 リソースグループ
リソースグループは、クエリのリソース使用量を制御するためのメカニズムである。
// etc/resource-groups.json
{
"rootGroups": [
{
"name": "global",
"softMemoryLimit": "90%",
"hardConcurrencyLimit": 500,
"maxQueued": 5000,
"subGroups": [
{
"name": "admin",
"softMemoryLimit": "30%",
"hardConcurrencyLimit": 50,
"maxQueued": 100,
"schedulingWeight": 10,
"schedulingPolicy": "weighted_fair"
},
{
"name": "etl",
"softMemoryLimit": "40%",
"hardConcurrencyLimit": 20,
"maxQueued": 200,
"schedulingWeight": 5,
"schedulingPolicy": "fair"
},
{
"name": "interactive",
"softMemoryLimit": "20%",
"hardConcurrencyLimit": 200,
"maxQueued": 1000,
"schedulingWeight": 3,
"schedulingPolicy": "weighted_fair"
},
{
"name": "dashboards",
"softMemoryLimit": "10%",
"hardConcurrencyLimit": 100,
"maxQueued": 500,
"schedulingWeight": 2,
"schedulingPolicy": "query_priority"
}
]
}
],
"selectors": [
{
"user": "admin",
"group": "global.admin"
},
{
"source": "airflow|dagster",
"group": "global.etl"
},
{
"source": "tableau|superset|metabase",
"group": "global.dashboards"
},
{
"group": "global.interactive"
}
]
}
12.2 セッションプロパティ
-- クエリ最適化
SET SESSION join_reordering_strategy = 'AUTOMATIC';
SET SESSION join_distribution_type = 'AUTOMATIC';
SET SESSION task_concurrency = 16;
SET SESSION hash_partition_count = 100;
SET SESSION optimize_hash_generation = true;
-- 結合最適化
SET SESSION join_max_broadcast_table_size = '100MB';
SET SESSION enable_dynamic_filtering = true;
-- メモリ制限
SET SESSION query_max_memory = '10GB';
SET SESSION query_max_memory_per_node = '2GB';
-- スピル設定
SET SESSION spill_enabled = true;
SET SESSION aggregation_operator_unspill_memory_limit = '128MB';
-- Writer設定
SET SESSION scale_writers = true;
SET SESSION writer_scaling_min_data_processed = '100MB';
SET SESSION task_writer_count = 4;
SET SESSION task_partitioned_writer_count = 64;
12.3 パフォーマンス最適化テクニック
-- 1. パーティションプルーニング(必ずパーティションカラムでフィルタ)
SELECT * FROM events WHERE event_date = DATE '2024-01-15'; -- ✓ Good
SELECT * FROM events WHERE CAST(event_timestamp AS DATE) = DATE '2024-01-15'; -- ✗ Bad
-- 2. Predicate Pushdownの活用
SELECT * FROM pg_table WHERE status = 'active'; -- PostgreSQLでフィルタされる
-- 3. LIMIT の活用(探索的クエリ)
SELECT * FROM large_table LIMIT 100; -- 全データスキャンを回避
-- 4. approx_ 関数の活用
SELECT approx_distinct(user_id) FROM events; -- COUNT(DISTINCT)より高速
SELECT approx_percentile(latency, 0.95) FROM requests; -- P95近似値
-- 5. 大きな IN リストの回避
-- ✗ Bad
SELECT * FROM orders WHERE customer_id IN (1, 2, 3, ..., 10000);
-- ✓ Good
CREATE TABLE tmp_customer_ids AS SELECT * FROM UNNEST(ARRAY[1, 2, 3]) AS t(id);
SELECT o.* FROM orders o JOIN tmp_customer_ids t ON o.customer_id = t.id;
-- 6. TABLESAMPLE の活用
SELECT * FROM large_table TABLESAMPLE BERNOULLI(1); -- 1%サンプリング
13. メモリ管理
13.1 メモリ構造
┌──────────────────────────────────────────┐
│ JVM Heap Memory │
│ │
│ ┌────────────────────────────────────┐ │
│ │ Query Memory Pool │ │
│ │ (query.max-memory-per-node) │ │
│ │ │ │
│ │ ┌────────────────┐ │ │
│ │ │ User Memory │ │ │
│ │ │ (Hash tables, │ │ │
│ │ │ Aggregations) │ │ │
│ │ └────────────────┘ │ │
│ │ ┌────────────────┐ │ │
│ │ │ System Memory │ │ │
│ │ │ (Buffers, │ │ │
│ │ │ Overhead) │ │ │
│ │ └────────────────┘ │ │
│ └────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────┐ │
│ │ General / Reserved Pool │ │
│ │ (Internal, Metadata, etc.) │ │
│ └────────────────────────────────────┘ │
└──────────────────────────────────────────┘
13.2 メモリ設定
# config.properties
# クエリあたりの最大メモリ(クラスター全体)
query.max-memory=50GB
# ノードあたりのクエリ最大メモリ(ユーザーメモリ)
query.max-memory-per-node=8GB
# ノードあたりの合計メモリ(ユーザー + システム)
query.max-total-memory-per-node=10GB
# メモリ不足時のキル設定
query.low-memory-killer.delay=5m
query.low-memory-killer.policy=total-reservation-on-blocked-nodes
14. Fault Tolerance とスピル
14.1 スピル(ディスクへの退避)
メモリ不足時にデータをディスクに退避する機能。
# config.properties
spill-enabled=true
spiller-spill-path=/var/trino/spill
spiller-max-used-space-threshold=0.9
spiller-threads=4
max-spill-per-node=100GB
# 集約スピル
aggregation-operator-unspill-memory-limit=128MB
# Joinスピル
join-spill-enabled=true
# ソートスピル
order-by-spill-enabled=true
# ウィンドウ関数スピル
window-spill-enabled=true
14.2 Fault-Tolerant Execution(Project Tardigrade)
Spark 3.4+で導入された耐障害実行モードで、ワーカー障害時にクエリ全体を再実行するのではなく、失敗したタスクのみを再実行する。
# config.properties
retry-policy=TASK
# Exchange Manager (中間データの永続化)
exchange.base-directories=s3://trino-exchange/
exchange.sink-max-file-size=1GB
exchange.source-max-error-duration=5m
# リトライ設定
task-retry-attempts-per-task=4
retry-initial-delay=10s
retry-max-delay=1m
retry-delay-scale-factor=2.0
fault-tolerant-execution-target-task-input-size=4GB
15. デプロイメント
15.1 ベアメタル / VM デプロイ
# Trinoのインストール
wget https://repo1.maven.org/maven2/io/trino/trino-server/439/trino-server-439.tar.gz
tar xzf trino-server-439.tar.gz
cd trino-server-439
# ディレクトリ構造
# trino-server-439/
# ├── bin/
# │ └── launcher # 起動スクリプト
# ├── etc/
# │ ├── config.properties # メイン設定
# │ ├── node.properties # ノード設定
# │ ├── jvm.config # JVM設定
# │ ├── log.properties # ログ設定
# │ └── catalog/ # カタログ設定
# │ ├── hive.properties
# │ ├── iceberg.properties
# │ └── postgresql.properties
# └── plugin/ # プラグイン
# 起動
bin/launcher start
bin/launcher stop
bin/launcher restart
bin/launcher status
15.2 Docker デプロイ
# docker-compose.yml
version: '3.8'
services:
trino-coordinator:
image: trinodb/trino:439
container_name: trino-coordinator
ports:
- "8080:8080"
volumes:
- ./etc/coordinator/config.properties:/etc/trino/config.properties
- ./etc/coordinator/node.properties:/etc/trino/node.properties
- ./etc/jvm.config:/etc/trino/jvm.config
- ./etc/catalog:/etc/trino/catalog
environment:
- JAVA_TOOL_OPTIONS=-Xmx16G
deploy:
resources:
limits:
memory: 20G
cpus: "8"
trino-worker:
image: trinodb/trino:439
volumes:
- ./etc/worker/config.properties:/etc/trino/config.properties
- ./etc/worker/node.properties:/etc/trino/node.properties
- ./etc/jvm.config:/etc/trino/jvm.config
- ./etc/catalog:/etc/trino/catalog
environment:
- JAVA_TOOL_OPTIONS=-Xmx16G
deploy:
mode: replicated
replicas: 5
resources:
limits:
memory: 20G
cpus: "8"
15.3 Kubernetes デプロイ(Helm Chart)
# Helm チャートの追加
helm repo add trino https://trinodb.github.io/charts
helm repo update
# インストール
helm install trino trino/trino \
--namespace trino \
--create-namespace \
--values values.yaml
# values.yaml
image:
repository: trinodb/trino
tag: "439"
server:
workers: 10
coordinatorExtraConfig: |
query.max-memory=100GB
query.max-memory-per-node=16GB
workerExtraConfig: |
query.max-memory-per-node=16GB
coordinator:
jvm:
maxHeapSize: "24G"
resources:
requests:
memory: "28Gi"
cpu: "8"
limits:
memory: "28Gi"
cpu: "16"
worker:
jvm:
maxHeapSize: "24G"
resources:
requests:
memory: "28Gi"
cpu: "8"
limits:
memory: "28Gi"
cpu: "16"
additionalCatalogs:
hive: |
connector.name=hive
hive.metastore.uri=thrift://hive-metastore:9083
hive.s3.aws-access-key=${ENV:AWS_ACCESS_KEY_ID}
hive.s3.aws-secret-key=${ENV:AWS_SECRET_ACCESS_KEY}
iceberg: |
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://hive-metastore:9083
postgresql: |
connector.name=postgresql
connection-url=jdbc:postgresql://pg-host:5432/mydb
connection-user=trino
connection-password=${ENV:PG_PASSWORD}
autoscaling:
enabled: true
minReplicas: 5
maxReplicas: 50
targetCPUUtilizationPercentage: 70
16. 監視と運用
16.1 Trino Web UI
Trino Web UI(デフォルト: ポート8080)は以下の情報を提供する:
| ページ | 情報 |
|---|---|
| Query List | 実行中/完了/失敗クエリの一覧 |
| Query Detail | 各クエリの実行計画、ステージ、タスク詳細 |
| Resource Groups | リソースグループの使用状況 |
| Worker List | ワーカーの一覧と状態 |
16.2 JMX メトリクス
# etc/catalog/jmx.properties
connector.name=jmx
-- JMXメトリクスの取得
SELECT * FROM jmx.current."trino.execution:name=QueryManager";
SELECT * FROM jmx.current."trino.memory:name=ClusterMemoryManager";
SELECT * FROM jmx.current."trino.execution:name=TaskManager";
-- クエリ統計
SELECT
node_id,
queued_queries,
running_queries,
blocked_queries
FROM jmx.current."trino.execution:name=QueryManager";
16.3 Prometheus / Grafana 連携
# config.properties に追加
jmx.base-uri=http://localhost:8080
# prometheus.yml
scrape_configs:
- job_name: 'trino'
metrics_path: '/v1/jmx/mbean'
static_configs:
- targets: ['trino-coordinator:8080']
params:
objectName: ['trino.execution:name=QueryManager']
16.4 運用コマンド
-- 実行中のクエリ確認
SELECT query_id, state, user, query, created
FROM system.runtime.queries
WHERE state = 'RUNNING'
ORDER BY created DESC;
-- クエリのキル
CALL system.runtime.kill_query(query_id => '20240115_123456_00001_abcde', message => 'Cancelled by admin');
-- ノード情報
SELECT * FROM system.runtime.nodes;
-- タスク情報
SELECT * FROM system.runtime.tasks
WHERE query_id = '20240115_123456_00001_abcde';
-- テーブル統計の収集
ANALYZE hive_prod.analytics.events;
-- テーブルのプロパティ確認
SHOW CREATE TABLE hive_prod.analytics.events;
16.5 ログ設定
# etc/log.properties
io.trino=INFO
io.trino.execution=WARN
io.trino.plugin.hive=WARN
io.trino.plugin.iceberg=INFO
io.trino.server=INFO
io.trino.security=INFO
17. エコシステム連携
17.1 dbt との連携
# profiles.yml (dbt)
my_project:
target: prod
outputs:
prod:
type: trino
method: none # or ldap, kerberos, oauth
host: trino-coordinator
port: 8080
user: dbt_user
database: hive_prod
schema: analytics
catalog: hive_prod
threads: 8
http_scheme: http
session_properties:
query_max_run_time: 30m
query_max_memory: 10GB
17.2 Apache Superset との連携
# Superset Database Connection
# SQLAlchemy URI:
# trino://user@trino-coordinator:8080/hive_prod/analytics
# 認証あり:
# trino://user:password@trino-coordinator:8443/hive_prod/analytics?protocol=https
17.3 Airflow との連携
from airflow.providers.trino.operators.trino import TrinoOperator
trino_task = TrinoOperator(
task_id='run_trino_query',
trino_conn_id='trino_default',
sql="""
INSERT INTO hive_prod.analytics.daily_summary
SELECT
event_date,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users
FROM hive_prod.raw.events
WHERE event_date = DATE '{{ ds }}'
GROUP BY event_date
""",
dag=dag,
)
17.4 JDBC / ODBC 接続
// JDBC接続例
String url = "jdbc:trino://coordinator:8080/hive_prod/analytics";
Properties properties = new Properties();
properties.setProperty("user", "analyst");
properties.setProperty("SSL", "true");
Connection connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery("SELECT * FROM users LIMIT 10");
# Python (trino-python-client)
from trino.dbapi import connect
from trino.auth import BasicAuthentication
conn = connect(
host="trino-coordinator",
port=8080,
user="analyst",
catalog="hive_prod",
schema="analytics",
http_scheme="https",
auth=BasicAuthentication("analyst", "password"),
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users LIMIT 10")
rows = cursor.fetchall()
18. ユースケースとベストプラクティス
18.1 代表的なユースケース
データレイク分析
-- S3上のデータレイクに対するアドホック分析
SELECT
date_trunc('month', event_timestamp) as month,
event_type,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users,
approx_percentile(response_time_ms, 0.95) as p95_response_time
FROM iceberg_lake.analytics.api_events
WHERE event_timestamp >= TIMESTAMP '2024-01-01'
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC;
データメッシュ / フェデレーテッドクエリ
-- 複数チームのデータソースを横断した分析
WITH customer_360 AS (
SELECT
c.customer_id,
c.name,
c.segment,
o.total_orders,
o.total_revenue,
s.support_tickets,
s.avg_satisfaction_score
FROM postgresql_prod.crm.customers c
LEFT JOIN iceberg_lake.sales.customer_orders o
ON c.customer_id = o.customer_id
LEFT JOIN postgresql_support.public.customer_metrics s
ON c.customer_id = s.customer_id
)
SELECT
segment,
COUNT(*) as customer_count,
AVG(total_revenue) as avg_revenue,
AVG(avg_satisfaction_score) as avg_satisfaction
FROM customer_360
GROUP BY segment
ORDER BY avg_revenue DESC;
18.2 ベストプラクティス
- パーティションプルーニングを常に意識する: パーティションカラムをWHERE句に含める
- Predicate Pushdownを活用する: フィルタ条件をサブクエリ/CTEで先に適用
- 適切なデータフォーマットを選択: Parquet / ORC のカラムナー形式を推奨
- ANALYZE でテーブル統計を最新に保つ: CBO の精度向上
- リソースグループで優先度管理: インタラクティブクエリとETLを分離
- approx_ 関数を活用: 正確性が不要な場合はapprox_distinct, approx_percentile
- クエリのEXPLAIN確認: 大規模クエリの前に実行計画を確認
- Broadcast Joinの閾値を適切に設定: 小テーブルのBroadcast化
- スピルの有効化: メモリ不足時の安全弁として設定
- Dynamic Filteringの活用: スタースキーマクエリの高速化
19. Trino vs 他エンジン比較
19.1 比較表
| 項目 | Trino | Spark SQL | Presto (Meta) | Hive | Impala |
|---|---|---|---|---|---|
| 実行モデル | パイプライン | ステージ | パイプライン | MapReduce | MPP |
| レイテンシ | 低〜中 | 中〜高 | 低〜中 | 高 | 低 |
| フェデレーション | 強力 | 限定的 | 中程度 | 弱い | 弱い |
| コネクター数 | 40+ | 20+ | 20+ | 10+ | 10+ |
| ストリーミング | 限定的 | 強力 | 限定的 | なし | なし |
| ML | なし | MLlib | なし | なし | なし |
| Fault Tolerance | Task Level | Stage Level | 限定的 | Job Level | なし |
| ETL向き | 中程度 | 高い | 中程度 | 高い | 低い |
| アドホック分析 | 高い | 中程度 | 高い | 低い | 高い |
19.2 使い分けの指針
- Trino: フェデレーテッドクエリ、インタラクティブ分析、データレイク探索
- Spark: 大規模バッチETL、ML、ストリーミング処理
- Hive: 超大規模バッチ、レガシーHadoopパイプライン
- Impala: CDH/CDPクラスターでの低レイテンシクエリ
20. 最新動向と将来展望
20.1 最近のバージョンの主要機能
| 機能 | バージョン | 説明 |
|---|---|---|
| Fault-Tolerant Execution | 400+ | タスクレベルのリトライ |
| Polymorphic Table Functions | 400+ | テーブル関数の型パラメトリック化 |
| MERGE Statement | 391+ | Iceberg/Delta Lake での Upsert |
| Dynamic Catalog | 406+ | カタログの動的追加/削除 |
| Project Hummingbird | 最新 | Rust ベースのワーカー(実験的) |
| Improved CBO | 最新 | コストモデルの継続的改善 |
| Warp Speed | 最新 | Velox エンジン統合(実験的) |
20.2 将来の方向性
- Lakehouse との深い統合: Iceberg / Delta Lake / Hudi のファーストクラスサポート強化
- AI/ML連携: 推論機能の組み込み、ベクトル検索対応
- ネイティブRustワーカー: パフォーマンスのさらなる向上
- サーバーレス実行: オンデマンドスケーリング
- 統一カタログ: Unity Catalog、Polaris との連携強化
21. まとめ
Apache Trinoは、フェデレーテッドクエリ機能と高速なインタラクティブ分析を特徴とする分散SQLクエリエンジンである。
技術的要点
| カテゴリ | 要点 |
|---|---|
| アーキテクチャ | Coordinator-Worker MPP モデル |
| 実行モデル | パイプライン実行(インメモリ) |
| コネクター | 40+のデータソース対応 |
| フェデレーション | クロスカタログJoinの強力なサポート |
| SQL | ANSI SQL 準拠、高度な分析関数 |
| 最適化 | CBO、Predicate Pushdown、動的フィルタリング |
| デプロイ | ベアメタル / Docker / Kubernetes |
| セキュリティ | LDAP / OAuth2 / Kerberos / ACL |
| 耐障害性 | Fault-Tolerant Execution(タスクレベルリトライ) |
選定の指針
Apache Trino は以下の要件がある場合に特に効果的である:
- フェデレーテッドクエリ: 複数データソースを単一SQLで横断的に分析
- インタラクティブ分析: 秒〜分レベルの応答時間が求められるアドホッククエリ
- データレイク分析: S3/HDFS上のParquet/ORC/Iceberg/Deltaデータの高速参照
- データメッシュ: 分散したデータプロダクトへの統一的なアクセスレイヤー
- BIツール連携: Tableau/Superset等からの直接クエリ
一方、以下のケースでは他のツールを検討すべきである:
- 大規模バッチETL: Apache Spark の方が適している
- ストリーミング処理: Apache Flink / Spark Structured Streaming
- 超低レイテンシOLAP: ClickHouse / Apache Druid
- 機械学習: Apache Spark MLlib / 専用MLプラットフォーム
参考文献: