Apache Spark
Apache Spark 徹底解説:アーキテクチャ・機能・設定の全容
目次
- はじめに
- Apache Spark の概要と歴史
- アーキテクチャ全体像
- コアコンポーネント
- RDD(Resilient Distributed Dataset)
- DataFrame と Dataset API
- Spark SQL
- Catalyst オプティマイザと Tungsten 実行エンジン
- Structured Streaming
- MLlib(機械学習ライブラリ)
- GraphX / GraphFrames
- デプロイメントモード
- 設定とチューニング
- メモリ管理
- シャッフルとパーティショニング
- パフォーマンス最適化
- セキュリティ
- 監視とデバッグ
- エコシステム連携
- ユースケースとベストプラクティス
- Apache Spark 3.x / 4.x の新機能
- まとめ
1. はじめに
Apache Spark は、大規模データ処理のための統合分析エンジンであり、バッチ処理、ストリーミング処理、機械学習、グラフ処理を単一のフレームワークで実現する。2009年にUC BerkeleyのAMPLabで誕生し、2014年にApache Software Foundationのトップレベルプロジェクトとなって以来、ビッグデータ処理のデファクトスタンダードとして世界中の企業で採用されている。
本記事では、Apache Sparkのアーキテクチャから各コンポーネントの詳細、設定例、パフォーマンスチューニングまでを包括的に解説する。
1.1 本記事の対象読者
- ビッグデータ基盤の設計・構築を担当するデータエンジニア
- Sparkを用いた分析パイプラインを構築するデータサイエンティスト
- Sparkクラスターの運用・監視を担当するSREエンジニア
- 分散処理基盤の技術選定を行うアーキテクト
1.2 前提知識
- 分散システムの基本概念(ノード、クラスター、レプリケーション)
- Hadoop エコシステムの基礎知識
- SQL および Python / Scala / Java のいずれかのプログラミング経験
2. Apache Spark の概要と歴史
2.1 誕生の背景
Apache Sparkは、MapReduceの課題を解決するために生まれた。MapReduceは各ステップの中間結果をディスクに書き出す必要があり、反復処理(機械学習アルゴリズムなど)において深刻なパフォーマンスボトルネックとなっていた。
Sparkは「インメモリ処理」というコンセプトを中心に設計され、中間データをメモリ上に保持することで、MapReduceと比較して最大100倍の高速化を実現した。
2.2 バージョンの変遷
| バージョン | リリース年 | 主な特徴 |
|---|---|---|
| 0.x | 2012 | 初期リリース、RDD API |
| 1.0 | 2014 | Spark SQL、MLlib、GraphX の導入 |
| 1.3 | 2015 | DataFrame API の導入 |
| 1.6 | 2016 | Dataset API の導入 |
| 2.0 | 2016 | Structured Streaming、Catalyst/Tungsten の改善 |
| 2.3 | 2018 | Kubernetes サポート |
| 2.4 | 2019 | Barrier Execution Mode、高階関数 |
| 3.0 | 2020 | Adaptive Query Execution(AQE)、Dynamic Partition Pruning |
| 3.1 | 2021 | ANSI SQL 準拠の改善 |
| 3.2 | 2022 | RocksDB StateStore、Push-based Shuffle |
| 3.3 | 2022 | 多数のパフォーマンス改善 |
| 3.4 | 2023 | Spark Connect の導入 |
| 3.5 | 2023 | English SDK for Apache Spark、Arrow-optimized Python UDF |
| 4.0 | 2025 | ANSI mode デフォルト化、Variant 型、Collation サポート |
2.3 Spark のエコシステム上の位置づけ
┌─────────────────────────────────────────────────────────────────┐
│ Applications │
│ (Data Analytics, ML Pipelines, ETL, Real-time Processing) │
├──────────┬──────────┬──────────┬──────────┬────────────────────┤
│ Spark │ Spark │Structured│ MLlib │ GraphX / │
│ SQL │ Core │Streaming │ │ GraphFrames │
├──────────┴──────────┴──────────┴──────────┴────────────────────┤
│ Spark Core Engine │
│ (Task Scheduling, Memory Management, │
│ Fault Recovery, Storage Interaction) │
├─────────────────────────────────────────────────────────────────┤
│ Cluster Manager │
│ (Standalone / YARN / Mesos / Kubernetes) │
├─────────────────────────────────────────────────────────────────┤
│ Storage Layer │
│ (HDFS / S3 / GCS / Azure Blob / Local FS / Delta Lake / │
│ Apache Iceberg / Apache Hudi) │
└─────────────────────────────────────────────────────────────────┘
3. アーキテクチャ全体像
3.1 マスター・ワーカーアーキテクチャ
Apache Sparkはマスター・ワーカー(Master-Worker)アーキテクチャを採用している。
┌──────────────────────────────────────────────────────────┐
│ Driver Program │
│ ┌─────────────┐ ┌──────────────┐ ┌────────────────┐ │
│ │ SparkContext │ │ DAG Scheduler│ │ Task Scheduler │ │
│ └──────┬──────┘ └──────┬───────┘ └───────┬────────┘ │
│ │ │ │ │
└─────────┼────────────────┼────────────────────┼───────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────┐
│ Cluster Manager │
│ (YARN / K8s / Standalone / Mesos) │
└────────┬──────────────────┬──────────────────┬──────────┘
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker N │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │Executor 1│ │ │ │Executor 2│ │ │ │Executor N│ │
│ │┌────┐┌──┐│ │ │ │┌────┐┌──┐│ │ │ │┌────┐┌──┐│ │
│ ││Task││Ca││ │ │ ││Task││Ca││ │ │ ││Task││Ca││ │
│ ││ ││ch││ │ │ ││ ││ch││ │ │ ││ ││ch││ │
│ │└────┘└──┘│ │ │ │└────┘└──┘│ │ │ │└────┘└──┘│ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
└──────────────┘ └──────────────┘ └──────────────┘
Driver Program
Driverは Spark アプリケーションのメインプロセスであり、以下の責務を持つ:
- SparkContext の生成: アプリケーションのエントリーポイントとなるオブジェクトを生成
- DAG(有向非巡回グラフ)の構築: ユーザーコードから論理実行計画を構築
- ステージへの分割: シャッフル境界でDAGをステージに分割
- タスクのスケジューリング: 各ステージをタスクに変換し、Executorに配布
- 結果の集約: Executorからの結果を収集し、最終結果を生成
Executor
各Executor は以下の特性を持つ:
- Worker ノード上で動作するJVMプロセス
- 割り当てられたタスクを実行
- データをメモリまたはディスクにキャッシュ
- 結果をDriverに返送
3.2 ジョブ実行フロー
User Code
│
▼
SparkSession / SparkContext
│
▼
Logical Plan (論理計画)
│
▼
Catalyst Optimizer (最適化)
│
▼
Physical Plan (物理計画)
│
▼
DAG Scheduler
│ ステージ分割(シャッフル境界で分割)
▼
Task Scheduler
│ タスクをExecutorに割り当て
▼
Executors (タスク実行)
│
▼
Result (結果の返却)
3.3 アプリケーション → ジョブ → ステージ → タスクの階層
Application (アプリケーション)
├── Job 1 (アクションごとに1ジョブ)
│ ├── Stage 1 (シャッフル境界で分割)
│ │ ├── Task 1.1 (パーティションごとに1タスク)
│ │ ├── Task 1.2
│ │ └── Task 1.N
│ └── Stage 2
│ ├── Task 2.1
│ └── Task 2.M
└── Job 2
└── Stage 3
├── Task 3.1
└── Task 3.K
Narrow Dependency(狭い依存関係): 親パーティションが1つの子パーティションにのみマッピングされる(例:map, filter)。同一ステージ内で処理される。
Wide Dependency(広い依存関係): 親パーティションが複数の子パーティションにマッピングされる(例:groupBy, join)。シャッフルが発生し、新しいステージの境界となる。
4. コアコンポーネント
4.1 SparkSession
Spark 2.0 以降、SparkSession が全てのAPI機能への統合エントリーポイントとなった。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyApplication") \
.master("yarn") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", "4") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.enableHiveSupport() \
.getOrCreate()
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("MyApplication")
.master("yarn")
.config("spark.executor.memory", "8g")
.config("spark.executor.cores", "4")
.config("spark.sql.shuffle.partitions", "200")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
4.2 SparkContext
SparkContext はSpark機能へのメインエントリーポイントであり、クラスターへの接続を表す。SparkSession 内部に含まれており、spark.sparkContext でアクセスできる。
sc = spark.sparkContext
# RDDの作成
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=4)
# テキストファイルの読み込み
text_rdd = sc.textFile("hdfs:///data/logs/*.log", minPartitions=10)
# ブロードキャスト変数
lookup_table = {"US": "United States", "JP": "Japan", "UK": "United Kingdom"}
broadcast_var = sc.broadcast(lookup_table)
# アキュムレータ
error_count = sc.accumulator(0)
4.3 ブロードキャスト変数
ブロードキャスト変数は、各Executorに読み取り専用のデータを効率的に配布するためのメカニズムである。
# 大きなルックアップテーブルをブロードキャスト
country_codes = spark.sparkContext.broadcast(
{"US": "United States", "JP": "Japan", "DE": "Germany", "FR": "France"}
)
# UDF内でブロードキャスト変数を使用
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
@udf(StringType())
def get_country_name(code):
return country_codes.value.get(code, "Unknown")
df = df.withColumn("country_name", get_country_name(df["country_code"]))
ブロードキャスト変数のベストプラクティス:
- サイズが数MB〜数百MBのルックアップデータに適用
- 全Executorで共通して参照されるデータに使用
spark.sql.autoBroadcastJoinThreshold(デフォルト10MB)でBroadcast Joinの閾値を制御
# Broadcast Joinの閾値設定
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m") # 50MBに拡大
4.4 アキュムレータ
アキュムレータは、Executor側で値を「加算」し、Driver側で集約結果を取得できる共有変数である。
from pyspark.accumulators import AccumulatorParam
# 標準的なアキュムレータ
error_counter = spark.sparkContext.accumulator(0)
warning_counter = spark.sparkContext.accumulator(0)
def process_log(line):
global error_counter, warning_counter
if "ERROR" in line:
error_counter.add(1)
elif "WARN" in line:
warning_counter.add(1)
return line
rdd = spark.sparkContext.textFile("hdfs:///logs/application.log")
processed_rdd = rdd.map(process_log)
processed_rdd.count() # アクションをトリガーして処理を実行
print(f"Errors: {error_counter.value}")
print(f"Warnings: {warning_counter.value}")
注意点: アキュムレータの更新は「少なくとも1回(at-least-once)」のセマンティクスで実行される。タスクの再実行により値が重複カウントされる可能性があるため、正確なカウントが必要な場合は foreachPartition 内でのみ更新するか、アクション内で使用する。
5. RDD(Resilient Distributed Dataset)
5.1 RDD の基本概念
RDD(Resilient Distributed Dataset)は Spark の最も基本的なデータ抽象化であり、以下の特性を持つ:
- 不変性(Immutable): 一度作成されたRDDは変更できない
- 分散性(Distributed): データは複数ノードにパーティション分割されて格納
- 耐障害性(Resilient): Lineage(系譜)情報により、障害時にデータを再計算可能
- 遅延評価(Lazy Evaluation): 変換はアクションが呼び出されるまで実行されない
5.2 RDD の作成
# 1. コレクションから作成(parallelize)
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, numSlices=4)
# 2. 外部データソースから作成
text_rdd = sc.textFile("hdfs:///data/input.txt")
whole_text_rdd = sc.wholeTextFiles("hdfs:///data/dir/")
sequence_rdd = sc.sequenceFile("hdfs:///data/seq_file")
# 3. 既存RDDからの変換
filtered_rdd = rdd.filter(lambda x: x > 5)
5.3 RDD の操作(Transformations と Actions)
Transformations(変換 - 遅延評価)
# map: 各要素に関数を適用
squared = rdd.map(lambda x: x ** 2)
# flatMap: 各要素を0個以上の要素にマッピング
words = text_rdd.flatMap(lambda line: line.split(" "))
# filter: 条件を満たす要素を抽出
evens = rdd.filter(lambda x: x % 2 == 0)
# mapPartitions: パーティション単位で処理(効率的)
def process_partition(iterator):
# DB接続などのセットアップを1回だけ実行
import json
results = []
for item in iterator:
results.append(json.loads(item))
return iter(results)
processed = text_rdd.mapPartitions(process_partition)
# reduceByKey: キーごとに値を集約(shuffle前にcombine)
word_counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
# groupByKey: キーごとに値をグループ化(非推奨 - メモリ効率が悪い)
grouped = pairs_rdd.groupByKey()
# join: 2つのRDDをキーで結合
joined = rdd1.join(rdd2) # Inner Join
left_joined = rdd1.leftOuterJoin(rdd2)
# coalesce / repartition: パーティション数の変更
reduced = rdd.coalesce(2) # パーティション削減(シャッフルなし)
expanded = rdd.repartition(10) # パーティション変更(シャッフルあり)
# sortByKey: キーでソート
sorted_rdd = pairs_rdd.sortByKey(ascending=False)
Actions(アクション - 即座に実行)
# collect: 全要素をDriverに収集(小データセットのみ)
result = rdd.collect()
# count: 要素数をカウント
total = rdd.count()
# first / take: 先頭要素を取得
first_elem = rdd.first()
top_5 = rdd.take(5)
# reduce: 全要素を集約
total_sum = rdd.reduce(lambda a, b: a + b)
# foreach: 各要素に副作用のある処理を適用
rdd.foreach(lambda x: print(x))
# saveAsTextFile: テキストファイルとして保存
rdd.saveAsTextFile("hdfs:///output/result")
# countByKey: キーごとの出現回数
counts = pairs_rdd.countByKey()
# takeSample: ランダムサンプリング
sample = rdd.takeSample(withReplacement=False, num=10, seed=42)
5.4 RDD の永続化(Persistence)
from pyspark import StorageLevel
# キャッシュ(メモリのみ、デフォルト)
rdd.cache() # persist(StorageLevel.MEMORY_ONLY) と同等
# 各ストレージレベル
rdd.persist(StorageLevel.MEMORY_ONLY) # メモリのみ、溢れたら再計算
rdd.persist(StorageLevel.MEMORY_AND_DISK) # メモリ優先、溢れたらディスク
rdd.persist(StorageLevel.DISK_ONLY) # ディスクのみ
rdd.persist(StorageLevel.MEMORY_ONLY_SER) # メモリのみ、シリアライズ形式
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) # シリアライズ形式 + ディスク
rdd.persist(StorageLevel.OFF_HEAP) # オフヒープメモリ
# キャッシュの解除
rdd.unpersist()
| ストレージレベル | メモリ使用 | ディスク使用 | シリアライズ | 備考 |
|---|---|---|---|---|
| MEMORY_ONLY | 高 | なし | なし | 最速、メモリ不足時に再計算 |
| MEMORY_AND_DISK | 高 | フォールバック | なし | メモリ不足時にディスクへスピル |
| MEMORY_ONLY_SER | 中 | なし | あり | メモリ効率とCPUのトレードオフ |
| DISK_ONLY | なし | 高 | あり | メモリを使わない |
| OFF_HEAP | オフヒープ | なし | あり | GCの影響を受けない |
5.5 Lineage と障害回復
RDDの耐障害性は Lineage(系譜) によって実現される。各RDDは自身がどの親RDDからどのような変換で生成されたかの情報を保持しており、パーティションが失われた場合はLineageを辿って再計算できる。
# RDDのLineageを確認
print(rdd.toDebugString())
出力例:
(4) MapPartitionsRDD[3] at filter at <stdin>:1 []
| MapPartitionsRDD[2] at map at <stdin>:1 []
| ParallelCollectionRDD[0] at parallelize at <stdin>:1 []
チェックポイント: Lineageが長くなりすぎた場合、チェックポイントを設定してLineageを切断し、データをHDFS等に永続化する。
sc.setCheckpointDir("hdfs:///checkpoints/")
rdd.checkpoint()
rdd.count() # チェックポイントを実行するためにアクションが必要
6. DataFrame と Dataset API
6.1 DataFrame の概要
DataFrameは名前付きカラムで構成される分散データコレクションであり、リレーショナルデータベースのテーブルやPandasのDataFrameに類似している。RDDに比べて以下の利点がある:
- スキーマ情報: 各カラムに名前と型が付与される
- Catalyst オプティマイザ: SQLエンジンによる自動最適化
- Tungsten エンジン: オフヒープメモリ管理とコード生成による高速実行
- 言語間の統一的なパフォーマンス: Python、Scala、Java、Rのどの言語からも同等の性能
6.2 DataFrame の作成
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
# 1. スキーマ定義によるDataFrame作成
schema = StructType([
StructField("user_id", IntegerType(), nullable=False),
StructField("name", StringType(), nullable=False),
StructField("email", StringType(), nullable=True),
StructField("age", IntegerType(), nullable=True),
StructField("salary", DoubleType(), nullable=True)
])
data = [
(1, "Alice", "alice@example.com", 30, 75000.0),
(2, "Bob", "bob@example.com", 25, 65000.0),
(3, "Charlie", None, 35, 85000.0)
]
df = spark.createDataFrame(data, schema=schema)
df.show()
df.printSchema()
# 2. CSVファイルからの読み込み
df_csv = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("dateFormat", "yyyy-MM-dd") \
.option("nullValue", "NA") \
.csv("hdfs:///data/users.csv")
# 3. Parquetファイルからの読み込み
df_parquet = spark.read.parquet("hdfs:///data/users.parquet")
# 4. JSONファイルからの読み込み
df_json = spark.read \
.option("multiLine", "true") \
.json("hdfs:///data/users.json")
# 5. JDBCからの読み込み
df_jdbc = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://db-host:5432/mydb") \
.option("dbtable", "public.users") \
.option("user", "dbuser") \
.option("password", "dbpass") \
.option("numPartitions", "10") \
.option("partitionColumn", "user_id") \
.option("lowerBound", "1") \
.option("upperBound", "1000000") \
.load()
# 6. Delta Lakeからの読み込み
df_delta = spark.read.format("delta").load("s3://bucket/delta-table")
# 7. Apache Icebergからの読み込み
df_iceberg = spark.read.format("iceberg").load("catalog.db.table_name")
6.3 DataFrame の操作
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# カラム選択
df.select("name", "age", "salary").show()
# カラムの追加・変換
df = df.withColumn("salary_monthly", F.col("salary") / 12)
df = df.withColumn("age_group",
F.when(F.col("age") < 30, "young")
.when(F.col("age") < 40, "middle")
.otherwise("senior")
)
# フィルタリング
df.filter(F.col("age") > 25).show()
df.filter((F.col("age") > 25) & (F.col("salary") > 70000)).show()
df.filter(F.col("name").like("A%")).show()
df.filter(F.col("email").isNotNull()).show()
# 集約
df.groupBy("age_group") \
.agg(
F.count("*").alias("count"),
F.avg("salary").alias("avg_salary"),
F.max("salary").alias("max_salary"),
F.min("salary").alias("min_salary"),
F.sum("salary").alias("total_salary"),
F.stddev("salary").alias("stddev_salary")
).show()
# ウィンドウ関数
window_spec = Window.partitionBy("age_group").orderBy(F.desc("salary"))
df = df.withColumn("rank", F.rank().over(window_spec))
df = df.withColumn("dense_rank", F.dense_rank().over(window_spec))
df = df.withColumn("row_number", F.row_number().over(window_spec))
df = df.withColumn("salary_percentile",
F.percent_rank().over(window_spec)
)
# 累積ウィンドウ
cumulative_window = Window.partitionBy("age_group") \
.orderBy("salary") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("cumulative_salary", F.sum("salary").over(cumulative_window))
# ソート
df.orderBy(F.desc("salary")).show()
df.orderBy(F.col("age_group").asc(), F.col("salary").desc()).show()
# 結合
df_orders = spark.read.parquet("hdfs:///data/orders.parquet")
# Inner Join
df_joined = df.join(df_orders, df["user_id"] == df_orders["customer_id"], "inner")
# Left Outer Join
df_left = df.join(df_orders, df["user_id"] == df_orders["customer_id"], "left")
# Broadcast Join(小さいテーブルをブロードキャスト)
from pyspark.sql.functions import broadcast
df_broadcast = df.join(broadcast(small_df), "key_column")
# 重複排除
df.dropDuplicates(["email"]).show()
# UDF(ユーザー定義関数)
@udf(StringType())
def format_name(name):
return name.upper() if name else "UNKNOWN"
df = df.withColumn("formatted_name", format_name(df["name"]))
# Pandas UDF(Arrow ベースの高速UDF)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(DoubleType())
def normalize_salary(salary: pd.Series) -> pd.Series:
return (salary - salary.mean()) / salary.std()
df = df.withColumn("normalized_salary", normalize_salary(df["salary"]))
6.4 DataFrame の書き出し
# Parquet形式で書き出し(推奨)
df.write \
.mode("overwrite") \
.partitionBy("age_group") \
.option("compression", "snappy") \
.parquet("hdfs:///output/users_parquet")
# CSV形式で書き出し
df.write \
.mode("overwrite") \
.option("header", "true") \
.option("delimiter", ",") \
.csv("hdfs:///output/users_csv")
# Delta Lake形式で書き出し
df.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("s3://bucket/delta-table")
# Hiveテーブルに書き出し
df.write \
.mode("overwrite") \
.saveAsTable("default.users")
# JDBCに書き出し
df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://db-host:5432/mydb") \
.option("dbtable", "public.users_output") \
.option("user", "dbuser") \
.option("password", "dbpass") \
.option("batchsize", "10000") \
.mode("append") \
.save()
# 書き出しモード
# "overwrite": 既存データを上書き
# "append": 既存データに追加
# "ignore": テーブルが存在する場合は何もしない
# "error" / "errorifexists": テーブルが存在する場合はエラー(デフォルト)
6.5 Dataset API(Scala / Java)
Dataset APIはScalaとJavaで利用可能な型安全なAPIである。コンパイル時に型チェックが行われるため、ランタイムエラーを事前に検出できる。
// Case Classの定義
case class User(userId: Int, name: String, email: Option[String], age: Int, salary: Double)
// DatasetのからDFへの変換
val ds: Dataset[User] = df.as[User]
// 型安全な操作
val highSalaryUsers = ds.filter(_.salary > 70000)
val nameAndSalary = ds.map(u => (u.name, u.salary))
// DataFrameとDatasetの相互変換
val df: DataFrame = ds.toDF()
val ds2: Dataset[User] = df.as[User]
注意: PySpark では Dataset API は利用できない。Python は動的型付け言語のため、DataFrameが主要なAPIとなる。
7. Spark SQL
7.1 概要
Spark SQL は構造化データを処理するための Spark モジュールであり、SQL クエリと DataFrame API の両方からアクセスできる。ANSI SQL 標準に準拠しており、Hive との互換性も備えている。
7.2 SQL クエリの実行
# DataFrameをTemporary Viewとして登録
df.createOrReplaceTempView("users")
# SQLクエリの実行
result = spark.sql("""
SELECT
age_group,
COUNT(*) as user_count,
AVG(salary) as avg_salary,
PERCENTILE_APPROX(salary, 0.5) as median_salary
FROM users
WHERE age > 20
GROUP BY age_group
HAVING COUNT(*) > 10
ORDER BY avg_salary DESC
""")
result.show()
# グローバルTemporary View(SparkSession間で共有)
df.createOrReplaceGlobalTempView("global_users")
spark.sql("SELECT * FROM global_temp.global_users").show()
# CTEを使った複雑なクエリ
result = spark.sql("""
WITH ranked_users AS (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY age_group ORDER BY salary DESC) as rn
FROM users
)
SELECT * FROM ranked_users WHERE rn <= 3
""")
7.3 Hive メタストアとの連携
# HiveサポートをenableしてSparkSessionを作成
spark = SparkSession.builder \
.appName("HiveIntegration") \
.config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
.config("hive.metastore.uris", "thrift://hive-metastore:9083") \
.enableHiveSupport() \
.getOrCreate()
# Hiveテーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS analytics.user_events (
event_id BIGINT,
user_id INT,
event_type STRING,
event_data STRING,
event_timestamp TIMESTAMP
)
PARTITIONED BY (event_date DATE)
STORED AS PARQUET
TBLPROPERTIES (
'parquet.compression'='SNAPPY'
)
""")
# パーティションの動的追加
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.sql("""
INSERT OVERWRITE TABLE analytics.user_events
PARTITION (event_date)
SELECT event_id, user_id, event_type, event_data,
event_timestamp, CAST(event_timestamp AS DATE) as event_date
FROM raw_events
""")
# テーブル情報の確認
spark.sql("DESCRIBE FORMATTED analytics.user_events").show(truncate=False)
spark.sql("SHOW PARTITIONS analytics.user_events").show()
7.4 カタログAPI
# データベース一覧
spark.catalog.listDatabases()
# テーブル一覧
spark.catalog.listTables("analytics")
# カラム一覧
spark.catalog.listColumns("analytics", "user_events")
# テーブルの存在確認
spark.catalog.tableExists("analytics.user_events")
# キャッシュ管理
spark.catalog.cacheTable("users")
spark.catalog.isCached("users")
spark.catalog.uncacheTable("users")
spark.catalog.clearCache()
8. Catalyst オプティマイザと Tungsten 実行エンジン
8.1 Catalyst オプティマイザ
Catalystは Spark SQL の高度なクエリ最適化エンジンであり、以下の4フェーズで最適化を行う:
Unresolved Resolved Optimized Physical
Logical Plan --> Logical Plan --> Logical Plan --> Plans --> Selected Physical Plan
--> RDD (実行)
(1) (2) (3) (4)
解析 カタログ参照 ルールベース コスト
型解決 最適化 ベース
最適化
フェーズ1: 解析(Analysis)
- テーブル名、カラム名の解決
- カタログからのメタデータ取得
- 型の推論と検証
フェーズ2: 論理最適化(Logical Optimization)
主要な最適化ルール:
| 最適化ルール | 説明 |
|---|---|
| Predicate Pushdown | フィルタ条件をデータソースに近い場所に移動 |
| Column Pruning | 必要なカラムのみを読み込み |
| Constant Folding | 定数式をコンパイル時に評価 |
| Combine Filters | 複数のフィルタを結合 |
| Null Propagation | NULL値の伝播を最適化 |
| Boolean Simplification | ブール式の簡素化 |
フェーズ3: 物理計画(Physical Planning)
- 複数の物理計画候補を生成
- コストモデルに基づいて最適な計画を選択
- Join戦略の決定(BroadcastHashJoin, SortMergeJoin, ShuffledHashJoin)
# 実行計画の確認
df_result = df.join(df_orders, "user_id").filter(F.col("age") > 25)
# 論理計画と物理計画を表示
df_result.explain(mode="extended")
# フォーマットされた実行計画
df_result.explain(mode="formatted")
# コスト情報付き
df_result.explain(mode="cost")
# コード生成の確認
df_result.explain(mode="codegen")
8.2 Adaptive Query Execution(AQE)
Spark 3.0で導入されたAQEは、実行時の統計情報に基づいて動的にクエリプランを最適化する。
# AQEの有効化(Spark 3.2以降はデフォルトで有効)
spark.conf.set("spark.sql.adaptive.enabled", "true")
# AQEの主要設定
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1m")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
AQEの主要機能:
- 動的パーティション結合(Coalescing Post-Shuffle Partitions): シャッフル後の小さすぎるパーティションを統合
- 動的Join戦略切り替え: ランタイムデータサイズに基づいてSortMergeJoinからBroadcastHashJoinへ切り替え
- スキューJoin最適化: データスキューのあるパーティションを自動的に分割
8.3 Tungsten 実行エンジン
Tungsten はSpark のメモリ・CPU最適化エンジンであり、以下の技術を実装:
- オフヒープメモリ管理: JVMヒープの外にデータを配置し、GCオーバーヘッドを削減
- Whole-Stage Code Generation: 複数の演算子を単一のJavaメソッドにコンパイル
- カスタムメモリレイアウト: Java オブジェクトのオーバーヘッドを排除するバイナリ形式
# Whole-Stage Code Generationの有効化(デフォルトで有効)
spark.conf.set("spark.sql.codegen.wholeStage", "true")
# コード生成のフォールバック閾値
spark.conf.set("spark.sql.codegen.fallback", "true")
spark.conf.set("spark.sql.codegen.maxFields", "100")
9. Structured Streaming
9.1 概要
Structured Streaming は Spark のスケーラブルかつ耐障害性のあるストリーム処理エンジンであり、静的データと同じ DataFrame/Dataset API でストリーミングデータを処理できる。
9.2 基本的なストリーミング処理
# Kafkaからのストリーム読み込み
df_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \
.option("maxOffsetsPerTrigger", "100000") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.load()
# メッセージのパース
from pyspark.sql.types import StructType, StructField, StringType, LongType
event_schema = StructType([
StructField("user_id", LongType()),
StructField("event_type", StringType()),
StructField("timestamp", LongType()),
StructField("properties", StringType())
])
parsed_stream = df_stream \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp") \
.select(
F.col("key"),
F.from_json(F.col("value"), event_schema).alias("event"),
F.col("timestamp").alias("kafka_timestamp")
) \
.select("key", "event.*", "kafka_timestamp")
# ウィンドウ集約
windowed_counts = parsed_stream \
.withWatermark("kafka_timestamp", "10 minutes") \
.groupBy(
F.window("kafka_timestamp", "5 minutes", "1 minute"),
"event_type"
) \
.agg(
F.count("*").alias("event_count"),
F.approx_count_distinct("user_id").alias("unique_users")
)
# 出力
query = windowed_counts.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", "false") \
.trigger(processingTime="30 seconds") \
.start()
query.awaitTermination()
9.3 出力モード
| 出力モード | 説明 | 用途 |
|---|---|---|
append | 新しい行のみ出力 | フィルタリング、マッピング |
update | 更新された行のみ出力 | 集約クエリ |
complete | 全結果を毎回出力 | 集約結果の完全出力 |
9.4 出力シンク
# Kafkaへの出力
query = result_stream.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker1:9092") \
.option("topic", "processed-events") \
.option("checkpointLocation", "hdfs:///checkpoints/kafka-sink") \
.trigger(processingTime="1 minute") \
.start()
# Delta Lakeへの出力
query = result_stream.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "s3://bucket/checkpoints/delta-sink") \
.option("mergeSchema", "true") \
.partitionBy("event_date") \
.start("s3://bucket/delta-table")
# foreachBatch(カスタム処理)
def write_to_multiple_sinks(batch_df, batch_id):
# PostgreSQLに書き出し
batch_df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://db:5432/analytics") \
.option("dbtable", "events") \
.mode("append") \
.save()
# Parquetにも書き出し
batch_df.write \
.mode("append") \
.parquet(f"hdfs:///archive/events/batch_{batch_id}")
query = result_stream.writeStream \
.foreachBatch(write_to_multiple_sinks) \
.option("checkpointLocation", "hdfs:///checkpoints/multi-sink") \
.start()
9.5 ウォーターマークと遅延データ処理
# ウォーターマーク: イベント時刻ベースの遅延データの許容範囲を定義
windowed_agg = parsed_stream \
.withWatermark("event_timestamp", "30 minutes") \
.groupBy(
F.window("event_timestamp", "10 minutes"),
"event_type"
) \
.count()
ウォーターマークの仕組み:
- ウォーターマーク = 最大イベント時刻 - 閾値
- ウォーターマークより古いイベントは破棄される
- State(状態)のクリーンアップに使用される
9.6 トリガーモード
# Processing Time トリガー(定期実行)
.trigger(processingTime="30 seconds")
# Once トリガー(1回だけ実行して停止)
.trigger(once=True)
# Available Now トリガー(利用可能なデータを全て処理して停止)
.trigger(availableNow=True)
# Continuous トリガー(低レイテンシ、実験的)
.trigger(continuous="1 second")
10. MLlib(機械学習ライブラリ)
10.1 概要
MLlibは Spark の分散機械学習ライブラリであり、大規模データセット上での機械学習パイプラインの構築を支援する。Spark 2.0以降、DataFrame ベースの spark.ml パッケージが主要APIとなっている。
10.2 ML パイプライン
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
StringIndexer, VectorAssembler, StandardScaler,
OneHotEncoder, Imputer, Bucketizer
)
from pyspark.ml.classification import (
LogisticRegression, RandomForestClassifier, GBTClassifier
)
from pyspark.ml.evaluation import (
BinaryClassificationEvaluator, MulticlassClassificationEvaluator
)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# データの準備
training_data = spark.read.parquet("hdfs:///data/ml/training.parquet")
# 特徴量エンジニアリング
# カテゴリ変数のエンコーディング
category_indexer = StringIndexer(
inputCol="category", outputCol="category_index"
)
category_encoder = OneHotEncoder(
inputCol="category_index", outputCol="category_vec"
)
# 欠損値の補完
imputer = Imputer(
inputCols=["age", "income"],
outputCols=["age_imputed", "income_imputed"],
strategy="median"
)
# 特徴量の結合
assembler = VectorAssembler(
inputCols=["age_imputed", "income_imputed", "category_vec"],
outputCol="features_raw"
)
# スケーリング
scaler = StandardScaler(
inputCol="features_raw",
outputCol="features",
withStd=True,
withMean=True
)
# モデル
lr = LogisticRegression(
featuresCol="features",
labelCol="label",
maxIter=100,
regParam=0.01,
elasticNetParam=0.8
)
# パイプラインの構築
pipeline = Pipeline(stages=[
category_indexer,
category_encoder,
imputer,
assembler,
scaler,
lr
])
# ハイパーパラメータチューニング
param_grid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.001, 0.01, 0.1]) \
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
.addGrid(lr.maxIter, [50, 100, 200]) \
.build()
evaluator = BinaryClassificationEvaluator(
labelCol="label",
metricName="areaUnderROC"
)
cross_validator = CrossValidator(
estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=5,
parallelism=4
)
# モデルのトレーニング
cv_model = cross_validator.fit(training_data)
# 最良モデルの取得
best_model = cv_model.bestModel
# 予測
test_data = spark.read.parquet("hdfs:///data/ml/test.parquet")
predictions = cv_model.transform(test_data)
# 評価
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# モデルの保存
cv_model.write().overwrite().save("hdfs:///models/lr_model")
# モデルの読み込み
from pyspark.ml.tuning import CrossValidatorModel
loaded_model = CrossValidatorModel.load("hdfs:///models/lr_model")
10.3 サポートされるアルゴリズム
| カテゴリ | アルゴリズム |
|---|---|
| 分類 | LogisticRegression, RandomForest, GBT, SVM, NaiveBayes, MultilayerPerceptron |
| 回帰 | LinearRegression, RandomForest, GBT, DecisionTree, IsotonicRegression |
| クラスタリング | KMeans, BisectingKMeans, GaussianMixture, LDA |
| 協調フィルタリング | ALS (Alternating Least Squares) |
| 特徴量 | TF-IDF, Word2Vec, PCA, ChiSqSelector, VectorAssembler |
| 評価 | BinaryClassification, MulticlassClassification, Regression, Ranking |
11. GraphX / GraphFrames
11.1 GraphX の概要
GraphXはSpark上のグラフ処理APIであり、RDDベースで実装されている。頂点(Vertex)とエッジ(Edge)からなるグラフ構造を表現し、グラフアルゴリズムを分散実行できる。
import org.apache.spark.graphx._
// 頂点RDDの作成
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
(1L, "Alice"),
(2L, "Bob"),
(3L, "Charlie"),
(4L, "Diana")
))
// エッジRDDの作成
val edges: RDD[Edge[String]] = sc.parallelize(Seq(
Edge(1L, 2L, "friend"),
Edge(2L, 3L, "colleague"),
Edge(3L, 4L, "friend"),
Edge(1L, 4L, "family")
))
// グラフの作成
val graph = Graph(vertices, edges)
// PageRank
val pageRank = graph.pageRank(0.001)
pageRank.vertices.sortBy(_._2, ascending = false).take(5)
// Connected Components
val cc = graph.connectedComponents()
// Triangle Counting
val triangles = graph.triangleCount()
11.2 GraphFrames(DataFrameベース)
GraphFrames は DataFrame ベースのグラフ処理ライブラリであり、PySpark からも利用可能である。
from graphframes import GraphFrame
# 頂点DataFrame
vertices = spark.createDataFrame([
("1", "Alice", 34),
("2", "Bob", 36),
("3", "Charlie", 30),
("4", "Diana", 29)
], ["id", "name", "age"])
# エッジDataFrame
edges = spark.createDataFrame([
("1", "2", "friend"),
("2", "3", "colleague"),
("3", "4", "friend"),
("1", "4", "family")
], ["src", "dst", "relationship"])
# GraphFrameの作成
g = GraphFrame(vertices, edges)
# 基本クエリ
g.vertices.show()
g.edges.show()
g.degrees.show()
# PageRank
results = g.pageRank(resetProbability=0.15, maxIter=20)
results.vertices.select("id", "name", "pagerank") \
.orderBy(F.desc("pagerank")).show()
# 最短経路
shortest_paths = g.shortestPaths(landmarks=["1", "4"])
shortest_paths.show()
# モチーフ検索(パターンマッチング)
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(c)")
motifs.filter("a.id != c.id").show()
12. デプロイメントモード
12.1 クラスターマネージャーの比較
| 特徴 | Standalone | YARN | Kubernetes | Mesos |
|---|---|---|---|---|
| セットアップ | 簡単 | Hadoop必須 | K8s必須 | やや複雑 |
| リソース共有 | 限定的 | 高度 | 高度 | 高度 |
| スケーリング | 静的 | 動的 | 動的 | 動的 |
| コンテナ化 | なし | なし | ネイティブ | Docker対応 |
| 監視 | Web UI | YARN UI | K8s Dashboard | Mesos UI |
| 推奨環境 | 開発/テスト | Hadoopクラスター | クラウドネイティブ | レガシー |
12.2 Standalone モード
# マスターの起動
$SPARK_HOME/sbin/start-master.sh
# ワーカーの起動
$SPARK_HOME/sbin/start-worker.sh spark://master-host:7077
# アプリケーション投入
spark-submit \
--master spark://master-host:7077 \
--deploy-mode cluster \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--conf spark.driver.memory=4g \
my_app.py
12.3 YARN モード
# Client モード(Driverがsubmit元マシンで動作)
spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--queue production \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.yarn.am.memory=2g \
--conf spark.yarn.executor.memoryOverhead=2g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=5 \
--conf spark.dynamicAllocation.maxExecutors=50 \
--conf spark.dynamicAllocation.executorIdleTimeout=60s \
--files hdfs:///config/app.conf \
--py-files dependencies.zip \
my_app.py
# Cluster モード(DriverがYARNコンテナ内で動作)
spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
my_app.py
12.4 Kubernetes モード
# Kubernetesへの投入
spark-submit \
--master k8s://https://k8s-api-server:6443 \
--deploy-mode cluster \
--name spark-app \
--conf spark.executor.instances=10 \
--conf spark.kubernetes.container.image=spark:3.5.0 \
--conf spark.kubernetes.namespace=spark-jobs \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.executor.request.cores=2 \
--conf spark.kubernetes.executor.limit.cores=4 \
--conf spark.kubernetes.driver.request.cores=1 \
--conf spark.kubernetes.driver.limit.cores=2 \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-data.mount.path=/data \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-data.options.claimName=spark-pvc \
--conf spark.kubernetes.driver.pod.name=spark-driver \
--conf spark.kubernetes.node.selector.pool=spark-pool \
local:///opt/spark/app/my_app.py
Kubernetes用 Pod Template の例:
# spark-driver-template.yaml
apiVersion: v1
kind: Pod
spec:
containers:
- name: spark-driver
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
volumeMounts:
- name: spark-config
mountPath: /opt/spark/conf
volumes:
- name: spark-config
configMap:
name: spark-config
tolerations:
- key: "spark"
operator: "Equal"
value: "true"
effect: "NoSchedule"
nodeSelector:
node-pool: spark-driver
12.5 Spark Connect(Spark 3.4+)
Spark Connectは、クライアント-サーバーアーキテクチャによりSparkクラスターとの薄い接続を実現する新しいインターフェースである。
# Spark Connectサーバーの起動
# $SPARK_HOME/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.0
# クライアントからの接続
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.remote("sc://spark-server:15002") \
.getOrCreate()
# 通常通りDataFrame APIを使用
df = spark.read.parquet("hdfs:///data/users.parquet")
df.filter(F.col("age") > 25).show()
13. 設定とチューニング
13.1 主要な設定パラメータ
アプリケーション設定
# spark-defaults.conf
# ===== アプリケーション基本設定 =====
spark.app.name MySparkApplication
spark.master yarn
# ===== Driver設定 =====
spark.driver.memory 4g
spark.driver.cores 2
spark.driver.maxResultSize 2g
spark.driver.memoryOverhead 1g
# ===== Executor設定 =====
spark.executor.memory 8g
spark.executor.cores 4
spark.executor.instances 10
spark.executor.memoryOverhead 2g
spark.executor.heartbeatInterval 30s
# ===== Dynamic Allocation =====
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 5
spark.dynamicAllocation.maxExecutors 100
spark.dynamicAllocation.initialExecutors 10
spark.dynamicAllocation.executorIdleTimeout 60s
spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
spark.shuffle.service.enabled true
# ===== メモリ設定 =====
spark.memory.fraction 0.6
spark.memory.storageFraction 0.5
spark.memory.offHeap.enabled true
spark.memory.offHeap.size 4g
# ===== シリアライゼーション =====
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 256m
spark.kryo.registrationRequired false
# ===== ネットワーク =====
spark.network.timeout 600s
spark.rpc.message.maxSize 256
spark.rpc.askTimeout 300s
# ===== SQL設定 =====
spark.sql.shuffle.partitions 200
spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.skewJoin.enabled true
spark.sql.autoBroadcastJoinThreshold 50m
spark.sql.broadcastTimeout 300
spark.sql.parquet.compression.codec snappy
spark.sql.parquet.mergeSchema false
spark.sql.hive.convertMetastoreParquet true
spark.sql.files.maxPartitionBytes 128m
spark.sql.files.openCostInBytes 4m
# ===== シャッフル設定 =====
spark.shuffle.compress true
spark.shuffle.spill.compress true
spark.shuffle.file.buffer 64k
spark.shuffle.io.maxRetries 10
spark.shuffle.io.retryWait 5s
spark.reducer.maxSizeInFlight 96m
# ===== 圧縮 =====
spark.io.compression.codec lz4
spark.io.compression.lz4.blockSize 64k
# ===== 推測実行 =====
spark.speculation true
spark.speculation.interval 100ms
spark.speculation.multiplier 1.5
spark.speculation.quantile 0.9
13.2 環境別設定例
開発環境
spark = SparkSession.builder \
.appName("DevApp") \
.master("local[*]") \
.config("spark.driver.memory", "4g") \
.config("spark.sql.shuffle.partitions", "8") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.ui.showConsoleProgress", "true") \
.config("spark.log.level", "WARN") \
.getOrCreate()
本番環境(大規模ETL)
spark = SparkSession.builder \
.appName("ProductionETL") \
.master("yarn") \
.config("spark.executor.memory", "16g") \
.config("spark.executor.cores", "4") \
.config("spark.executor.memoryOverhead", "4g") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", "10") \
.config("spark.dynamicAllocation.maxExecutors", "200") \
.config("spark.sql.shuffle.partitions", "1000") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.parquet.compression.codec", "zstd") \
.config("spark.sql.files.maxPartitionBytes", "256m") \
.config("spark.network.timeout", "800s") \
.config("spark.speculation", "true") \
.enableHiveSupport() \
.getOrCreate()
本番環境(ストリーミング)
spark = SparkSession.builder \
.appName("ProductionStreaming") \
.master("yarn") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", "2") \
.config("spark.executor.instances", "20") \
.config("spark.streaming.backpressure.enabled", "true") \
.config("spark.streaming.kafka.maxRatePerPartition", "5000") \
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
.config("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "true") \
.config("spark.sql.shuffle.partitions", "50") \
.config("spark.sql.adaptive.enabled", "false") \
.getOrCreate()
14. メモリ管理
14.1 Spark のメモリ構造
┌─────────────────────────────────────────────────────────┐
│ Executor Memory │
│ (spark.executor.memory) │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Unified Memory (60%) │ │
│ │ (spark.memory.fraction = 0.6) │ │
│ │ │ │
│ │ ┌──────────────────┐ ┌──────────────────────┐ │ │
│ │ │ Execution Memory│ │ Storage Memory │ │ │
│ │ │ (シャッフル、 │ │ (キャッシュ、 │ │ │
│ │ │ ソート、結合) │◄─►│ ブロードキャスト) │ │ │
│ │ │ │ │ │ │ │
│ │ └──────────────────┘ └──────────────────────┘ │ │
│ │ 動的に境界が移動(相互に貸借可能) │ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ User Memory (40%) │ │
│ │ (データ構造、UDFの変数、メタデータ) │ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Reserved Memory (300MB固定) │ │
│ │ (Spark内部オブジェクト) │ │
│ └────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Memory Overhead │
│ (spark.executor.memoryOverhead) │
│ (オフヒープ、NIO、JVMオーバーヘッド) │
│ デフォルト: MAX(384MB, executor.memory * 0.10) │
└─────────────────────────────────────────────────────────┘
14.2 メモリ関連の設定
# 統合メモリの割合(実行 + ストレージ)
spark.conf.set("spark.memory.fraction", "0.6")
# ストレージメモリの初期割合(統合メモリ内)
spark.conf.set("spark.memory.storageFraction", "0.5")
# オフヒープメモリ
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")
# Executor メモリオーバーヘッド
spark.conf.set("spark.executor.memoryOverhead", "2g")
# PySpark用メモリ(Python workerプロセス)
spark.conf.set("spark.executor.pyspark.memory", "2g")
14.3 メモリチューニングの指針
Executorのメモリサイズ設計:
Total Node Memory: 64GB
OS Reserved: 4GB
Remaining: 60GB
Number of Executors per Node: 3 (各5コア)
Per Executor:
spark.executor.memory: 16g
spark.executor.memoryOverhead: 4g (16g * 0.25)
Total per Executor: 20g
3 Executors × 20g = 60g ✓
大きすぎるExecutorの問題:
- GCの停止時間が長くなる(>64GBは推奨しない)
- HDFS I/Oの並列度が下がる
小さすぎるExecutorの問題:
- ブロードキャスト変数のメモリが不足
- 複数のタスクがメモリを共有できない
推奨値:
spark.executor.cores: 4〜5コアspark.executor.memory: 16〜32GBspark.executor.memoryOverhead: executor.memory の 10〜25%
15. シャッフルとパーティショニング
15.1 シャッフルの仕組み
シャッフルは、データを複数のノード間で再配置する操作であり、Sparkで最もコストの高い処理の一つである。
Stage 1 (Map Side) Stage 2 (Reduce Side)
┌─────────────┐ ┌─────────────┐
│ Partition 0 │──┐ ┌──│ Partition 0 │
│ (Sort+Write) │ │ │ │ (Merge+Read) │
├─────────────┤ │ Shuffle │ ├─────────────┤
│ Partition 1 │──┼──────────┼──│ Partition 1 │
│ (Sort+Write) │ │ Service │ │ (Merge+Read) │
├─────────────┤ │ │ ├─────────────┤
│ Partition 2 │──┘ └──│ Partition 2 │
│ (Sort+Write) │ │ (Merge+Read) │
└─────────────┘ └─────────────┘
シャッフルを引き起こす操作:
groupByKey,reduceByKey,aggregateByKeyjoin,cogrouprepartitiondistinctsortByKey
15.2 パーティション数の最適化
# シャッフルパーティション数の設定
spark.conf.set("spark.sql.shuffle.partitions", "200") # デフォルト200
# 推奨: データサイズに応じて設定
# パーティション数 = データサイズ / ターゲットパーティションサイズ
# ターゲットパーティションサイズ: 128MB〜256MB
# AQEを使用した自動最適化(推奨)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64m")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
# 手動でパーティション数を確認・変更
print(f"パーティション数: {df.rdd.getNumPartitions()}")
# repartition: ハッシュベースの再分割(シャッフルあり)
df = df.repartition(100)
df = df.repartition(100, "user_id") # カラムベースの再分割
# coalesce: パーティション削減(シャッフルなし、効率的)
df = df.coalesce(10)
15.3 データスキューへの対処
データスキュー(特定のキーにデータが偏る)は、ジョブの遅延の主要原因である。
# 方法1: AQEスキューJoin最適化(Spark 3.0+、推奨)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
# 方法2: Salting(手動スキュー解消)
import random
# スキューキーにソルト値を付加
num_salts = 10
df_skewed = df_skewed.withColumn(
"salted_key",
F.concat(F.col("join_key"), F.lit("_"), F.lit(random.randint(0, num_salts - 1)))
)
# 結合先もソルト値で展開
df_lookup_exploded = df_lookup.crossJoin(
spark.range(num_salts).withColumnRenamed("id", "salt")
).withColumn(
"salted_key",
F.concat(F.col("join_key"), F.lit("_"), F.col("salt"))
)
# ソルト付きキーで結合
result = df_skewed.join(df_lookup_exploded, "salted_key")
# 方法3: Broadcast Joinの活用(小テーブルの場合)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")
result = df_large.join(F.broadcast(df_small), "key")
16. パフォーマンス最適化
16.1 データフォーマットの選択
| フォーマット | 圧縮 | カラムナー | スキーマ | 分割可能 | 推奨用途 |
|---|---|---|---|---|---|
| Parquet | 高 | ✓ | 埋め込み | ✓ | 分析・DWH |
| ORC | 高 | ✓ | 埋め込み | ✓ | Hive連携 |
| Avro | 中 | ✗ | 埋め込み | ✓ | ストリーミング |
| JSON | 低 | ✗ | なし | ✓ | データ交換 |
| CSV | なし | ✗ | なし | ✓ | 互換性重視 |
| Delta Lake | 高 | ✓ | メタデータ | ✓ | ACID対応 |
# Parquet(推奨)
df.write \
.mode("overwrite") \
.option("compression", "zstd") \
.partitionBy("year", "month") \
.bucketBy(256, "user_id") \
.sortBy("event_timestamp") \
.saveAsTable("analytics.user_events")
16.2 パーティショニング戦略
# ファイルパーティショニング(Hive-style)
df.write \
.partitionBy("country", "date") \
.parquet("hdfs:///data/events")
# ディレクトリ構造:
# hdfs:///data/events/country=US/date=2024-01-01/part-00000.parquet
# hdfs:///data/events/country=JP/date=2024-01-01/part-00000.parquet
# Bucket分割(Join最適化)
df.write \
.bucketBy(256, "user_id") \
.sortBy("user_id") \
.saveAsTable("user_events_bucketed")
# Bucket Joinの実行(シャッフル不要)
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")
df1 = spark.table("user_events_bucketed")
df2 = spark.table("user_profiles_bucketed") # 同じbucket数
result = df1.join(df2, "user_id") # シャッフルが発生しない
16.3 よくあるアンチパターンと対策
| アンチパターン | 問題 | 対策 |
|---|---|---|
collect() の乱用 | Driver のOOM | take(), show(), ファイル出力を使用 |
groupByKey() の使用 | 全データをシャッフル | reduceByKey(), aggregateByKey() を使用 |
| 小ファイル問題 | メタデータオーバーヘッド | coalesce(), repartition() で適切なサイズに |
| UDFの過剰使用 | Catalyst最適化が効かない | 組み込み関数を優先使用 |
| キャッシュの忘れ | 同一データの再計算 | 繰り返し使うDataFrameは cache() |
| 不要なキャッシュ | メモリ浪費 | 使い終わったら unpersist() |
count() によるデバッグ | 不要なジョブ実行 | 本番コードから削除 |
| Cartesian Join | データ爆発 | 条件付きJoinに変更 |
17. セキュリティ
17.1 認証・認可
# Kerberos認証
spark.kerberos.keytab /etc/security/keytabs/spark.keytab
spark.kerberos.principal spark/hostname@REALM.COM
spark.yarn.keytab /etc/security/keytabs/spark.keytab
spark.yarn.principal spark/hostname@REALM.COM
# 暗号化
spark.authenticate true
spark.authenticate.secret my-shared-secret
spark.network.crypto.enabled true
spark.network.crypto.keyLength 256
spark.network.crypto.keyFactoryAlgorithm PBKDF2WithHmacSHA256
# SSL/TLS
spark.ssl.enabled true
spark.ssl.keyStore /path/to/keystore.jks
spark.ssl.keyStorePassword keystore-password
spark.ssl.trustStore /path/to/truststore.jks
spark.ssl.trustStorePassword truststore-password
spark.ssl.protocol TLSv1.3
# ACL(アクセス制御リスト)
spark.acls.enable true
spark.admin.acls admin1,admin2
spark.modify.acls user1,user2
spark.ui.view.acls *
17.2 データ保護
# カラムレベルの暗号化
from pyspark.sql.functions import sha2, aes_encrypt, aes_decrypt
# ハッシュ化(不可逆)
df = df.withColumn("email_hash", sha2(F.col("email"), 256))
# AES暗号化(可逆)
key = "0123456789abcdef" # 16バイトキー
df_encrypted = df.withColumn(
"ssn_encrypted",
F.base64(aes_encrypt(F.col("ssn"), F.lit(key)))
)
# 復号
df_decrypted = df_encrypted.withColumn(
"ssn_decrypted",
aes_decrypt(F.unbase64(F.col("ssn_encrypted")), F.lit(key)).cast("string")
)
# データマスキング
df = df.withColumn(
"phone_masked",
F.concat(F.lit("***-***-"), F.substring(F.col("phone"), -4, 4))
)
18. 監視とデバッグ
18.1 Spark Web UI
Spark Web UIはポート4040(デフォルト)で提供され、以下の情報を確認できる:
| タブ | 確認可能な情報 |
|---|---|
| Jobs | ジョブ一覧、各ジョブのステージ構成、実行時間 |
| Stages | ステージ一覧、タスク分布、シャッフルサイズ |
| Storage | キャッシュされたRDD/DataFrame、メモリ使用状況 |
| Environment | 設定値一覧、JVM情報 |
| Executors | Executor一覧、メモリ・ディスク使用状況、GC時間 |
| SQL | SQL実行計画、クエリ統計情報 |
| Streaming | ストリーミングクエリの状態、処理レート |
18.2 メトリクス設定
# metrics.properties
# CSVレポーター
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/var/log/spark/metrics
# Prometheusレポーター(Spark 3.0+)
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
# Graphiteレポーター
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite-host
*.sink.graphite.port=2003
*.sink.graphite.prefix=spark
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
# JMX
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
18.3 ログ設定
# log4j2.properties
rootLogger.level = WARN
rootLogger.appenderRef.stdout.ref = console
# Sparkのログレベル
logger.spark.name = org.apache.spark
logger.spark.level = WARN
# SQLのログ
logger.sql.name = org.apache.spark.sql
logger.sql.level = WARN
# カスタムアプリケーションのログ
logger.myapp.name = com.mycompany.myapp
logger.myapp.level = INFO
# イベントログ(History Server用)
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs:///spark-history
# spark.eventLog.compress true
18.4 デバッグテクニック
# 実行計画の確認
df.explain(mode="formatted")
# パーティション数の確認
print(f"パーティション数: {df.rdd.getNumPartitions()}")
# パーティションサイズの確認
partition_sizes = df.rdd.mapPartitions(
lambda it: [sum(1 for _ in it)]
).collect()
print(f"パーティションサイズ分布: {partition_sizes}")
# データスキューの検出
df.groupBy("key_column") \
.count() \
.orderBy(F.desc("count")) \
.show(20)
# Spark UIへのジョブ説明追加
spark.sparkContext.setJobDescription("Loading user data")
df = spark.read.parquet("hdfs:///data/users")
spark.sparkContext.setJobDescription("Aggregating sales by region")
result = df.groupBy("region").agg(F.sum("amount"))
18.5 Spark History Server
# History Serverの起動
$SPARK_HOME/sbin/start-history-server.sh
# 設定
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.ui.port 18080
spark.history.retainedApplications 50
spark.history.fs.cleaner.enabled true
spark.history.fs.cleaner.interval 1d
spark.history.fs.cleaner.maxAge 7d
19. エコシステム連携
19.1 主要なエコシステム連携
┌─────────────────────────────────────────────────────────┐
│ Apache Spark │
├──────────┬──────────┬──────────┬──────────┬─────────────┤
│ストレージ │メッセージ │メタデータ │ BI/可視化 │ ワークフロー │
├──────────┼──────────┼──────────┼──────────┼─────────────┤
│ HDFS │ Kafka │ Hive MS │ Tableau │ Airflow │
│ S3 │ Kinesis │ AWS Glue │ Superset │ Dagster │
│ GCS │ Pulsar │ Unity │ Metabase │ Prefect │
│ ADLS │ Event │ Catalog │ Looker │ Oozie │
│ Delta │ Hubs │ Nessie │ Redash │ Luigi │
│ Iceberg │ │ Polaris │ │ │
│ Hudi │ │ │ │ │
└──────────┴──────────┴──────────┴──────────┴─────────────┘
19.2 Apache Kafka との連携
# Kafka -> Spark -> Kafka パイプライン
df_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
.option("subscribe", "input-topic") \
.option("startingOffsets", "latest") \
.option("kafka.group.id", "spark-consumer-group") \
.option("failOnDataLoss", "false") \
.load()
# 処理
processed = df_stream \
.selectExpr("CAST(value AS STRING) as json_str") \
.select(F.from_json("json_str", schema).alias("data")) \
.select("data.*") \
.filter(F.col("status") == "active") \
.withColumn("processed_at", F.current_timestamp())
# Kafkaへ出力
query = processed \
.selectExpr("CAST(user_id AS STRING) AS key",
"to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092") \
.option("topic", "output-topic") \
.option("checkpointLocation", "hdfs:///checkpoints/kafka-pipeline") \
.start()
19.3 Delta Lake との連携
# Delta Lakeの依存関係追加
# --packages io.delta:delta-spark_2.12:3.1.0
from delta.tables import DeltaTable
# Delta テーブルの作成
df.write.format("delta") \
.mode("overwrite") \
.partitionBy("date") \
.save("s3://bucket/delta/events")
# MERGE(Upsert)操作
delta_table = DeltaTable.forPath(spark, "s3://bucket/delta/events")
delta_table.alias("target").merge(
updates_df.alias("source"),
"target.id = source.id"
).whenMatchedUpdate(set={
"name": "source.name",
"updated_at": "source.updated_at"
}).whenNotMatchedInsertAll() \
.execute()
# タイムトラベル
df_yesterday = spark.read \
.format("delta") \
.option("timestampAsOf", "2024-01-15") \
.load("s3://bucket/delta/events")
# バキューム(古いファイルの削除)
delta_table.vacuum(168) # 7日より古いファイルを削除
19.4 Apache Airflow との連携
# Airflow DAG定義例
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'spark_etl_pipeline',
default_args=default_args,
schedule_interval='0 2 * * *', # 毎日02:00
catchup=False,
)
spark_job = SparkSubmitOperator(
task_id='run_spark_etl',
application='/opt/spark/jobs/etl_job.py',
conn_id='spark_default',
conf={
'spark.executor.memory': '8g',
'spark.executor.cores': '4',
'spark.dynamicAllocation.enabled': 'true',
},
application_args=['--date', '{{ ds }}'],
dag=dag,
)
20. ユースケースとベストプラクティス
20.1 代表的なユースケース
ETL パイプライン
def run_etl(date: str):
"""日次ETLジョブ"""
# 1. Extract(抽出)
raw_events = spark.read \
.format("json") \
.schema(event_schema) \
.load(f"s3://raw-data/events/date={date}/")
# 2. Transform(変換)
transformed = raw_events \
.filter(F.col("event_type").isNotNull()) \
.withColumn("event_hour", F.hour("event_timestamp")) \
.withColumn("country", F.upper(F.col("country_code"))) \
.dropDuplicates(["event_id"]) \
.withColumn("processed_at", F.current_timestamp())
# 3. データ品質チェック
total_count = transformed.count()
null_count = transformed.filter(F.col("user_id").isNull()).count()
null_ratio = null_count / total_count if total_count > 0 else 0
if null_ratio > 0.05:
raise ValueError(f"NULL率が閾値超過: {null_ratio:.2%}")
# 4. Load(ロード)
transformed.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", f"date = '{date}'") \
.partitionBy("date", "country") \
.save("s3://processed-data/events")
print(f"ETL完了: {total_count}件処理")
リアルタイムダッシュボード
def streaming_aggregation():
"""リアルタイム集計ストリーミング"""
events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "page-views") \
.load()
parsed = events \
.select(F.from_json(
F.col("value").cast("string"),
page_view_schema
).alias("data")) \
.select("data.*")
# 5分ウィンドウでの集計
aggregated = parsed \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
F.window("timestamp", "5 minutes"),
"page_url"
) \
.agg(
F.count("*").alias("view_count"),
F.approx_count_distinct("user_id").alias("unique_visitors"),
F.avg("load_time_ms").alias("avg_load_time")
)
# JDBCシンク
def write_to_db(batch_df, batch_id):
batch_df.select(
F.col("window.start").alias("window_start"),
"page_url", "view_count", "unique_visitors", "avg_load_time"
).write \
.format("jdbc") \
.option("url", "jdbc:postgresql://db:5432/analytics") \
.option("dbtable", "realtime_page_stats") \
.mode("append") \
.save()
query = aggregated.writeStream \
.foreachBatch(write_to_db) \
.option("checkpointLocation", "hdfs:///cp/dashboard") \
.trigger(processingTime="1 minute") \
.start()
return query
20.2 ベストプラクティスまとめ
- DataFrame API を RDD API より優先使用する: Catalyst 最適化の恩恵を受けられる
- 組み込み関数を UDF より優先使用する: UDFはCatalyst最適化をバイパスする
- AQE を有効にする: Spark 3.0+ では
spark.sql.adaptive.enabled=true - 適切なパーティション数を設定する: 各パーティション 128MB〜256MB が目安
- Broadcast Join を活用する: 小テーブルとの結合はBroadcastが最適
- Parquet / Delta Lake 形式を使用する: カラムナーフォーマットの利点を活用
- 不要なシャッフルを避ける:
coalesce>repartition(パーティション削減時) - キャッシュを適切に管理する: 再利用するDataFrameのみキャッシュし、不要になったら解放
- データスキューに対処する: AQE または Salting テクニックを使用
- 推測実行を検討する:
spark.speculation=true(ストラグラータスク対策)
21. Apache Spark 3.x / 4.x の新機能
21.1 Spark 3.x の主要機能
| 機能 | バージョン | 説明 |
|---|---|---|
| Adaptive Query Execution | 3.0 | ランタイム統計に基づく動的最適化 |
| Dynamic Partition Pruning | 3.0 | スタースキーマクエリの高速化 |
| Kubernetes GA | 3.1 | Kubernetesサポートの正式版 |
| RocksDB StateStore | 3.2 | ストリーミング状態管理の改善 |
| Push-based Shuffle | 3.2 | シャッフル性能の向上 |
| Spark Connect | 3.4 | クライアント-サーバーアーキテクチャ |
| Arrow-optimized Python UDF | 3.5 | PySpark UDFの大幅高速化 |
21.2 Spark 4.0 の主要機能
# 1. ANSI Mode がデフォルトで有効
# 型変換の厳密化(暗黙的な型変換がエラーに)
spark.conf.set("spark.sql.ansi.enabled", "true") # Spark 4.0でデフォルト
# 2. VARIANT 型のサポート
# 半構造化データの効率的な処理
df = spark.sql("""
SELECT
parse_json('{"name": "Alice", "age": 30}') as data
""")
# VARIANT型へのアクセス
df.select(
F.col("data:name").alias("name"),
F.col("data:age").cast("int").alias("age")
).show()
# 3. Collation サポート
# 文字列比較のロケール対応
spark.sql("""
CREATE TABLE users (
name STRING COLLATE 'UNICODE_CI', -- 大文字小文字を区別しない
email STRING
)
""")
# 4. Python Data Source API V2
# Pythonでカスタムデータソースを実装
class MyDataSource:
@classmethod
def schema(cls):
return "id INT, name STRING, value DOUBLE"
@classmethod
def reader(cls, schema):
return MyDataSourceReader()
# 5. Spark Connect の改善
# リモートセッションのセキュリティ強化、カスタムプラグイン対応
22. まとめ
Apache Spark は、バッチ処理、ストリーミング、機械学習、グラフ処理を統合的に扱える強力な分散処理エンジンである。本記事で解説した内容を以下にまとめる:
技術的要点
| カテゴリ | 要点 |
|---|---|
| アーキテクチャ | Driver-Executor モデル、DAGベースの実行計画 |
| データ抽象化 | RDD → DataFrame → Dataset の進化 |
| SQL処理 | Catalyst オプティマイザ + Tungsten エンジン |
| ストリーミング | Structured Streaming によるマイクロバッチ/連続処理 |
| 機械学習 | MLlib Pipeline API による分散ML |
| デプロイ | Standalone / YARN / Kubernetes |
| 最適化 | AQE、Broadcast Join、パーティショニング |
| 最新動向 | Spark Connect、VARIANT型、ANSI準拠 |
選定の指針
Apache Spark は以下の要件がある場合に特に効果的である:
- 大規模バッチETL: TB〜PBスケールのデータ変換
- インタラクティブ分析: Spark SQL によるアドホッククエリ
- ストリーミング処理: Kafka等からのニアリアルタイム処理
- 機械学習パイプライン: 大規模データでのモデル訓練
- 統一基盤: 上記すべてを単一フレームワークで実現
一方、以下のケースでは他の選択肢も検討すべきである:
- 超低レイテンシ(ミリ秒単位): Apache Flink の方が適している
- 小規模データ: Pandas / DuckDB の方がシンプル
- リアルタイムOLAP: Apache Druid / ClickHouse の方が適している
- グラフDB用途: Neo4j / Amazon Neptune の方が適している
Apache Spark は継続的に進化しており、Spark 4.0 では ANSI 準拠の強化、VARIANT 型、Collation サポートなど、より現代的なデータ処理要件に対応する機能が追加されている。データエンジニアリングの基盤として、今後も重要な役割を果たし続けるだろう。
参考文献: