Apache Spark

Apache Spark 徹底解説:アーキテクチャ・機能・設定の全容

目次

  1. はじめに
  2. Apache Spark の概要と歴史
  3. アーキテクチャ全体像
  4. コアコンポーネント
  5. RDD(Resilient Distributed Dataset)
  6. DataFrame と Dataset API
  7. Spark SQL
  8. Catalyst オプティマイザと Tungsten 実行エンジン
  9. Structured Streaming
  10. MLlib(機械学習ライブラリ)
  11. GraphX / GraphFrames
  12. デプロイメントモード
  13. 設定とチューニング
  14. メモリ管理
  15. シャッフルとパーティショニング
  16. パフォーマンス最適化
  17. セキュリティ
  18. 監視とデバッグ
  19. エコシステム連携
  20. ユースケースとベストプラクティス
  21. Apache Spark 3.x / 4.x の新機能
  22. まとめ

1. はじめに

Apache Spark は、大規模データ処理のための統合分析エンジンであり、バッチ処理、ストリーミング処理、機械学習、グラフ処理を単一のフレームワークで実現する。2009年にUC BerkeleyのAMPLabで誕生し、2014年にApache Software Foundationのトップレベルプロジェクトとなって以来、ビッグデータ処理のデファクトスタンダードとして世界中の企業で採用されている。

本記事では、Apache Sparkのアーキテクチャから各コンポーネントの詳細、設定例、パフォーマンスチューニングまでを包括的に解説する。

1.1 本記事の対象読者

  • ビッグデータ基盤の設計・構築を担当するデータエンジニア
  • Sparkを用いた分析パイプラインを構築するデータサイエンティスト
  • Sparkクラスターの運用・監視を担当するSREエンジニア
  • 分散処理基盤の技術選定を行うアーキテクト

1.2 前提知識

  • 分散システムの基本概念(ノード、クラスター、レプリケーション)
  • Hadoop エコシステムの基礎知識
  • SQL および Python / Scala / Java のいずれかのプログラミング経験

2. Apache Spark の概要と歴史

2.1 誕生の背景

Apache Sparkは、MapReduceの課題を解決するために生まれた。MapReduceは各ステップの中間結果をディスクに書き出す必要があり、反復処理(機械学習アルゴリズムなど)において深刻なパフォーマンスボトルネックとなっていた。

Sparkは「インメモリ処理」というコンセプトを中心に設計され、中間データをメモリ上に保持することで、MapReduceと比較して最大100倍の高速化を実現した。

2.2 バージョンの変遷

バージョンリリース年主な特徴
0.x2012初期リリース、RDD API
1.02014Spark SQL、MLlib、GraphX の導入
1.32015DataFrame API の導入
1.62016Dataset API の導入
2.02016Structured Streaming、Catalyst/Tungsten の改善
2.32018Kubernetes サポート
2.42019Barrier Execution Mode、高階関数
3.02020Adaptive Query Execution(AQE)、Dynamic Partition Pruning
3.12021ANSI SQL 準拠の改善
3.22022RocksDB StateStore、Push-based Shuffle
3.32022多数のパフォーマンス改善
3.42023Spark Connect の導入
3.52023English SDK for Apache Spark、Arrow-optimized Python UDF
4.02025ANSI mode デフォルト化、Variant 型、Collation サポート

2.3 Spark のエコシステム上の位置づけ

┌─────────────────────────────────────────────────────────────────┐
│                     Applications                                 │
│  (Data Analytics, ML Pipelines, ETL, Real-time Processing)      │
├──────────┬──────────┬──────────┬──────────┬────────────────────┤
│ Spark    │ Spark    │Structured│  MLlib   │ GraphX /           │
│ SQL      │ Core     │Streaming │          │ GraphFrames        │
├──────────┴──────────┴──────────┴──────────┴────────────────────┤
│                   Spark Core Engine                              │
│           (Task Scheduling, Memory Management,                   │
│            Fault Recovery, Storage Interaction)                   │
├─────────────────────────────────────────────────────────────────┤
│              Cluster Manager                                     │
│     (Standalone / YARN / Mesos / Kubernetes)                    │
├─────────────────────────────────────────────────────────────────┤
│              Storage Layer                                       │
│    (HDFS / S3 / GCS / Azure Blob / Local FS / Delta Lake /     │
│     Apache Iceberg / Apache Hudi)                               │
└─────────────────────────────────────────────────────────────────┘

3. アーキテクチャ全体像

3.1 マスター・ワーカーアーキテクチャ

Apache Sparkはマスター・ワーカー(Master-Worker)アーキテクチャを採用している。

┌──────────────────────────────────────────────────────────┐
│                    Driver Program                         │
│  ┌─────────────┐  ┌──────────────┐  ┌────────────────┐  │
│  │ SparkContext │  │ DAG Scheduler│  │ Task Scheduler │  │
│  └──────┬──────┘  └──────┬───────┘  └───────┬────────┘  │
│         │                │                    │           │
└─────────┼────────────────┼────────────────────┼───────────┘
          │                │                    │
          ▼                ▼                    ▼
┌─────────────────────────────────────────────────────────┐
│                   Cluster Manager                        │
│            (YARN / K8s / Standalone / Mesos)             │
└────────┬──────────────────┬──────────────────┬──────────┘
         │                  │                  │
         ▼                  ▼                  ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│   Worker 1   │  │   Worker 2   │  │   Worker N   │
│ ┌──────────┐ │  │ ┌──────────┐ │  │ ┌──────────┐ │
│ │Executor 1│ │  │ │Executor 2│ │  │ │Executor N│ │
│ │┌────┐┌──┐│ │  │ │┌────┐┌──┐│ │  │ │┌────┐┌──┐│ │
│ ││Task││Ca││ │  │ ││Task││Ca││ │  │ ││Task││Ca││ │
│ ││    ││ch││ │  │ ││    ││ch││ │  │ ││    ││ch││ │
│ │└────┘└──┘│ │  │ │└────┘└──┘│ │  │ │└────┘└──┘│ │
│ └──────────┘ │  │ └──────────┘ │  │ └──────────┘ │
└──────────────┘  └──────────────┘  └──────────────┘

Driver Program

Driverは Spark アプリケーションのメインプロセスであり、以下の責務を持つ:

  • SparkContext の生成: アプリケーションのエントリーポイントとなるオブジェクトを生成
  • DAG(有向非巡回グラフ)の構築: ユーザーコードから論理実行計画を構築
  • ステージへの分割: シャッフル境界でDAGをステージに分割
  • タスクのスケジューリング: 各ステージをタスクに変換し、Executorに配布
  • 結果の集約: Executorからの結果を収集し、最終結果を生成

Executor

各Executor は以下の特性を持つ:

  • Worker ノード上で動作するJVMプロセス
  • 割り当てられたタスクを実行
  • データをメモリまたはディスクにキャッシュ
  • 結果をDriverに返送

3.2 ジョブ実行フロー

User Code
    │
    ▼
SparkSession / SparkContext
    │
    ▼
Logical Plan (論理計画)
    │
    ▼
Catalyst Optimizer (最適化)
    │
    ▼
Physical Plan (物理計画)
    │
    ▼
DAG Scheduler
    │  ステージ分割(シャッフル境界で分割)
    ▼
Task Scheduler
    │  タスクをExecutorに割り当て
    ▼
Executors (タスク実行)
    │
    ▼
Result (結果の返却)

3.3 アプリケーション → ジョブ → ステージ → タスクの階層

Application (アプリケーション)
├── Job 1 (アクションごとに1ジョブ)
│   ├── Stage 1 (シャッフル境界で分割)
│   │   ├── Task 1.1 (パーティションごとに1タスク)
│   │   ├── Task 1.2
│   │   └── Task 1.N
│   └── Stage 2
│       ├── Task 2.1
│       └── Task 2.M
└── Job 2
    └── Stage 3
        ├── Task 3.1
        └── Task 3.K

Narrow Dependency(狭い依存関係): 親パーティションが1つの子パーティションにのみマッピングされる(例:map, filter)。同一ステージ内で処理される。

Wide Dependency(広い依存関係): 親パーティションが複数の子パーティションにマッピングされる(例:groupBy, join)。シャッフルが発生し、新しいステージの境界となる。

4. コアコンポーネント

4.1 SparkSession

Spark 2.0 以降、SparkSession が全てのAPI機能への統合エントリーポイントとなった。

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApplication") \
    .master("yarn") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .enableHiveSupport() \
    .getOrCreate()
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MyApplication")
  .master("yarn")
  .config("spark.executor.memory", "8g")
  .config("spark.executor.cores", "4")
  .config("spark.sql.shuffle.partitions", "200")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .enableHiveSupport()
  .getOrCreate()

4.2 SparkContext

SparkContext はSpark機能へのメインエントリーポイントであり、クラスターへの接続を表す。SparkSession 内部に含まれており、spark.sparkContext でアクセスできる。

sc = spark.sparkContext

# RDDの作成
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=4)

# テキストファイルの読み込み
text_rdd = sc.textFile("hdfs:///data/logs/*.log", minPartitions=10)

# ブロードキャスト変数
lookup_table = {"US": "United States", "JP": "Japan", "UK": "United Kingdom"}
broadcast_var = sc.broadcast(lookup_table)

# アキュムレータ
error_count = sc.accumulator(0)

4.3 ブロードキャスト変数

ブロードキャスト変数は、各Executorに読み取り専用のデータを効率的に配布するためのメカニズムである。

# 大きなルックアップテーブルをブロードキャスト
country_codes = spark.sparkContext.broadcast(
    {"US": "United States", "JP": "Japan", "DE": "Germany", "FR": "France"}
)

# UDF内でブロードキャスト変数を使用
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def get_country_name(code):
    return country_codes.value.get(code, "Unknown")

df = df.withColumn("country_name", get_country_name(df["country_code"]))

ブロードキャスト変数のベストプラクティス:

  • サイズが数MB〜数百MBのルックアップデータに適用
  • 全Executorで共通して参照されるデータに使用
  • spark.sql.autoBroadcastJoinThreshold(デフォルト10MB)でBroadcast Joinの閾値を制御
# Broadcast Joinの閾値設定
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")  # 50MBに拡大

4.4 アキュムレータ

アキュムレータは、Executor側で値を「加算」し、Driver側で集約結果を取得できる共有変数である。

from pyspark.accumulators import AccumulatorParam

# 標準的なアキュムレータ
error_counter = spark.sparkContext.accumulator(0)
warning_counter = spark.sparkContext.accumulator(0)

def process_log(line):
    global error_counter, warning_counter
    if "ERROR" in line:
        error_counter.add(1)
    elif "WARN" in line:
        warning_counter.add(1)
    return line

rdd = spark.sparkContext.textFile("hdfs:///logs/application.log")
processed_rdd = rdd.map(process_log)
processed_rdd.count()  # アクションをトリガーして処理を実行

print(f"Errors: {error_counter.value}")
print(f"Warnings: {warning_counter.value}")

注意点: アキュムレータの更新は「少なくとも1回(at-least-once)」のセマンティクスで実行される。タスクの再実行により値が重複カウントされる可能性があるため、正確なカウントが必要な場合は foreachPartition 内でのみ更新するか、アクション内で使用する。


5. RDD(Resilient Distributed Dataset)

5.1 RDD の基本概念

RDD(Resilient Distributed Dataset)は Spark の最も基本的なデータ抽象化であり、以下の特性を持つ:

  • 不変性(Immutable): 一度作成されたRDDは変更できない
  • 分散性(Distributed): データは複数ノードにパーティション分割されて格納
  • 耐障害性(Resilient): Lineage(系譜)情報により、障害時にデータを再計算可能
  • 遅延評価(Lazy Evaluation): 変換はアクションが呼び出されるまで実行されない

5.2 RDD の作成

# 1. コレクションから作成(parallelize)
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, numSlices=4)

# 2. 外部データソースから作成
text_rdd = sc.textFile("hdfs:///data/input.txt")
whole_text_rdd = sc.wholeTextFiles("hdfs:///data/dir/")
sequence_rdd = sc.sequenceFile("hdfs:///data/seq_file")

# 3. 既存RDDからの変換
filtered_rdd = rdd.filter(lambda x: x > 5)

5.3 RDD の操作(Transformations と Actions)

Transformations(変換 - 遅延評価)

# map: 各要素に関数を適用
squared = rdd.map(lambda x: x ** 2)

# flatMap: 各要素を0個以上の要素にマッピング
words = text_rdd.flatMap(lambda line: line.split(" "))

# filter: 条件を満たす要素を抽出
evens = rdd.filter(lambda x: x % 2 == 0)

# mapPartitions: パーティション単位で処理(効率的)
def process_partition(iterator):
    # DB接続などのセットアップを1回だけ実行
    import json
    results = []
    for item in iterator:
        results.append(json.loads(item))
    return iter(results)

processed = text_rdd.mapPartitions(process_partition)

# reduceByKey: キーごとに値を集約(shuffle前にcombine)
word_counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)

# groupByKey: キーごとに値をグループ化(非推奨 - メモリ効率が悪い)
grouped = pairs_rdd.groupByKey()

# join: 2つのRDDをキーで結合
joined = rdd1.join(rdd2)  # Inner Join
left_joined = rdd1.leftOuterJoin(rdd2)

# coalesce / repartition: パーティション数の変更
reduced = rdd.coalesce(2)       # パーティション削減(シャッフルなし)
expanded = rdd.repartition(10)  # パーティション変更(シャッフルあり)

# sortByKey: キーでソート
sorted_rdd = pairs_rdd.sortByKey(ascending=False)

Actions(アクション - 即座に実行)

# collect: 全要素をDriverに収集(小データセットのみ)
result = rdd.collect()

# count: 要素数をカウント
total = rdd.count()

# first / take: 先頭要素を取得
first_elem = rdd.first()
top_5 = rdd.take(5)

# reduce: 全要素を集約
total_sum = rdd.reduce(lambda a, b: a + b)

# foreach: 各要素に副作用のある処理を適用
rdd.foreach(lambda x: print(x))

# saveAsTextFile: テキストファイルとして保存
rdd.saveAsTextFile("hdfs:///output/result")

# countByKey: キーごとの出現回数
counts = pairs_rdd.countByKey()

# takeSample: ランダムサンプリング
sample = rdd.takeSample(withReplacement=False, num=10, seed=42)

5.4 RDD の永続化(Persistence)

from pyspark import StorageLevel

# キャッシュ(メモリのみ、デフォルト)
rdd.cache()  # persist(StorageLevel.MEMORY_ONLY) と同等

# 各ストレージレベル
rdd.persist(StorageLevel.MEMORY_ONLY)       # メモリのみ、溢れたら再計算
rdd.persist(StorageLevel.MEMORY_AND_DISK)   # メモリ優先、溢れたらディスク
rdd.persist(StorageLevel.DISK_ONLY)         # ディスクのみ
rdd.persist(StorageLevel.MEMORY_ONLY_SER)   # メモリのみ、シリアライズ形式
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) # シリアライズ形式 + ディスク
rdd.persist(StorageLevel.OFF_HEAP)          # オフヒープメモリ

# キャッシュの解除
rdd.unpersist()
ストレージレベルメモリ使用ディスク使用シリアライズ備考
MEMORY_ONLYなしなし最速、メモリ不足時に再計算
MEMORY_AND_DISKフォールバックなしメモリ不足時にディスクへスピル
MEMORY_ONLY_SERなしありメモリ効率とCPUのトレードオフ
DISK_ONLYなしありメモリを使わない
OFF_HEAPオフヒープなしありGCの影響を受けない

5.5 Lineage と障害回復

RDDの耐障害性は Lineage(系譜) によって実現される。各RDDは自身がどの親RDDからどのような変換で生成されたかの情報を保持しており、パーティションが失われた場合はLineageを辿って再計算できる。

# RDDのLineageを確認
print(rdd.toDebugString())

出力例:

(4) MapPartitionsRDD[3] at filter at <stdin>:1 []
 |  MapPartitionsRDD[2] at map at <stdin>:1 []
 |  ParallelCollectionRDD[0] at parallelize at <stdin>:1 []

チェックポイント: Lineageが長くなりすぎた場合、チェックポイントを設定してLineageを切断し、データをHDFS等に永続化する。

sc.setCheckpointDir("hdfs:///checkpoints/")
rdd.checkpoint()
rdd.count()  # チェックポイントを実行するためにアクションが必要

6. DataFrame と Dataset API

6.1 DataFrame の概要

DataFrameは名前付きカラムで構成される分散データコレクションであり、リレーショナルデータベースのテーブルやPandasのDataFrameに類似している。RDDに比べて以下の利点がある:

  • スキーマ情報: 各カラムに名前と型が付与される
  • Catalyst オプティマイザ: SQLエンジンによる自動最適化
  • Tungsten エンジン: オフヒープメモリ管理とコード生成による高速実行
  • 言語間の統一的なパフォーマンス: Python、Scala、Java、Rのどの言語からも同等の性能

6.2 DataFrame の作成

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# 1. スキーマ定義によるDataFrame作成
schema = StructType([
    StructField("user_id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=False),
    StructField("email", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True),
    StructField("salary", DoubleType(), nullable=True)
])

data = [
    (1, "Alice", "alice@example.com", 30, 75000.0),
    (2, "Bob", "bob@example.com", 25, 65000.0),
    (3, "Charlie", None, 35, 85000.0)
]

df = spark.createDataFrame(data, schema=schema)
df.show()
df.printSchema()

# 2. CSVファイルからの読み込み
df_csv = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("nullValue", "NA") \
    .csv("hdfs:///data/users.csv")

# 3. Parquetファイルからの読み込み
df_parquet = spark.read.parquet("hdfs:///data/users.parquet")

# 4. JSONファイルからの読み込み
df_json = spark.read \
    .option("multiLine", "true") \
    .json("hdfs:///data/users.json")

# 5. JDBCからの読み込み
df_jdbc = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/mydb") \
    .option("dbtable", "public.users") \
    .option("user", "dbuser") \
    .option("password", "dbpass") \
    .option("numPartitions", "10") \
    .option("partitionColumn", "user_id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .load()

# 6. Delta Lakeからの読み込み
df_delta = spark.read.format("delta").load("s3://bucket/delta-table")

# 7. Apache Icebergからの読み込み
df_iceberg = spark.read.format("iceberg").load("catalog.db.table_name")

6.3 DataFrame の操作

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# カラム選択
df.select("name", "age", "salary").show()

# カラムの追加・変換
df = df.withColumn("salary_monthly", F.col("salary") / 12)
df = df.withColumn("age_group",
    F.when(F.col("age") < 30, "young")
     .when(F.col("age") < 40, "middle")
     .otherwise("senior")
)

# フィルタリング
df.filter(F.col("age") > 25).show()
df.filter((F.col("age") > 25) & (F.col("salary") > 70000)).show()
df.filter(F.col("name").like("A%")).show()
df.filter(F.col("email").isNotNull()).show()

# 集約
df.groupBy("age_group") \
    .agg(
        F.count("*").alias("count"),
        F.avg("salary").alias("avg_salary"),
        F.max("salary").alias("max_salary"),
        F.min("salary").alias("min_salary"),
        F.sum("salary").alias("total_salary"),
        F.stddev("salary").alias("stddev_salary")
    ).show()

# ウィンドウ関数
window_spec = Window.partitionBy("age_group").orderBy(F.desc("salary"))

df = df.withColumn("rank", F.rank().over(window_spec))
df = df.withColumn("dense_rank", F.dense_rank().over(window_spec))
df = df.withColumn("row_number", F.row_number().over(window_spec))
df = df.withColumn("salary_percentile",
    F.percent_rank().over(window_spec)
)

# 累積ウィンドウ
cumulative_window = Window.partitionBy("age_group") \
    .orderBy("salary") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = df.withColumn("cumulative_salary", F.sum("salary").over(cumulative_window))

# ソート
df.orderBy(F.desc("salary")).show()
df.orderBy(F.col("age_group").asc(), F.col("salary").desc()).show()

# 結合
df_orders = spark.read.parquet("hdfs:///data/orders.parquet")

# Inner Join
df_joined = df.join(df_orders, df["user_id"] == df_orders["customer_id"], "inner")

# Left Outer Join
df_left = df.join(df_orders, df["user_id"] == df_orders["customer_id"], "left")

# Broadcast Join(小さいテーブルをブロードキャスト)
from pyspark.sql.functions import broadcast
df_broadcast = df.join(broadcast(small_df), "key_column")

# 重複排除
df.dropDuplicates(["email"]).show()

# UDF(ユーザー定義関数)
@udf(StringType())
def format_name(name):
    return name.upper() if name else "UNKNOWN"

df = df.withColumn("formatted_name", format_name(df["name"]))

# Pandas UDF(Arrow ベースの高速UDF)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(DoubleType())
def normalize_salary(salary: pd.Series) -> pd.Series:
    return (salary - salary.mean()) / salary.std()

df = df.withColumn("normalized_salary", normalize_salary(df["salary"]))

6.4 DataFrame の書き出し

# Parquet形式で書き出し(推奨)
df.write \
    .mode("overwrite") \
    .partitionBy("age_group") \
    .option("compression", "snappy") \
    .parquet("hdfs:///output/users_parquet")

# CSV形式で書き出し
df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .csv("hdfs:///output/users_csv")

# Delta Lake形式で書き出し
df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("s3://bucket/delta-table")

# Hiveテーブルに書き出し
df.write \
    .mode("overwrite") \
    .saveAsTable("default.users")

# JDBCに書き出し
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/mydb") \
    .option("dbtable", "public.users_output") \
    .option("user", "dbuser") \
    .option("password", "dbpass") \
    .option("batchsize", "10000") \
    .mode("append") \
    .save()

# 書き出しモード
# "overwrite": 既存データを上書き
# "append": 既存データに追加
# "ignore": テーブルが存在する場合は何もしない
# "error" / "errorifexists": テーブルが存在する場合はエラー(デフォルト)

6.5 Dataset API(Scala / Java)

Dataset APIはScalaとJavaで利用可能な型安全なAPIである。コンパイル時に型チェックが行われるため、ランタイムエラーを事前に検出できる。

// Case Classの定義
case class User(userId: Int, name: String, email: Option[String], age: Int, salary: Double)

// DatasetのからDFへの変換
val ds: Dataset[User] = df.as[User]

// 型安全な操作
val highSalaryUsers = ds.filter(_.salary > 70000)
val nameAndSalary = ds.map(u => (u.name, u.salary))

// DataFrameとDatasetの相互変換
val df: DataFrame = ds.toDF()
val ds2: Dataset[User] = df.as[User]

注意: PySpark では Dataset API は利用できない。Python は動的型付け言語のため、DataFrameが主要なAPIとなる。

7. Spark SQL

7.1 概要

Spark SQL は構造化データを処理するための Spark モジュールであり、SQL クエリと DataFrame API の両方からアクセスできる。ANSI SQL 標準に準拠しており、Hive との互換性も備えている。

7.2 SQL クエリの実行

# DataFrameをTemporary Viewとして登録
df.createOrReplaceTempView("users")

# SQLクエリの実行
result = spark.sql("""
    SELECT 
        age_group,
        COUNT(*) as user_count,
        AVG(salary) as avg_salary,
        PERCENTILE_APPROX(salary, 0.5) as median_salary
    FROM users
    WHERE age > 20
    GROUP BY age_group
    HAVING COUNT(*) > 10
    ORDER BY avg_salary DESC
""")
result.show()

# グローバルTemporary View(SparkSession間で共有)
df.createOrReplaceGlobalTempView("global_users")
spark.sql("SELECT * FROM global_temp.global_users").show()

# CTEを使った複雑なクエリ
result = spark.sql("""
    WITH ranked_users AS (
        SELECT 
            *,
            ROW_NUMBER() OVER (PARTITION BY age_group ORDER BY salary DESC) as rn
        FROM users
    )
    SELECT * FROM ranked_users WHERE rn <= 3
""")

7.3 Hive メタストアとの連携

# HiveサポートをenableしてSparkSessionを作成
spark = SparkSession.builder \
    .appName("HiveIntegration") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .enableHiveSupport() \
    .getOrCreate()

# Hiveテーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.user_events (
        event_id BIGINT,
        user_id INT,
        event_type STRING,
        event_data STRING,
        event_timestamp TIMESTAMP
    )
    PARTITIONED BY (event_date DATE)
    STORED AS PARQUET
    TBLPROPERTIES (
        'parquet.compression'='SNAPPY'
    )
""")

# パーティションの動的追加
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

spark.sql("""
    INSERT OVERWRITE TABLE analytics.user_events
    PARTITION (event_date)
    SELECT event_id, user_id, event_type, event_data, 
           event_timestamp, CAST(event_timestamp AS DATE) as event_date
    FROM raw_events
""")

# テーブル情報の確認
spark.sql("DESCRIBE FORMATTED analytics.user_events").show(truncate=False)
spark.sql("SHOW PARTITIONS analytics.user_events").show()

7.4 カタログAPI

# データベース一覧
spark.catalog.listDatabases()

# テーブル一覧
spark.catalog.listTables("analytics")

# カラム一覧
spark.catalog.listColumns("analytics", "user_events")

# テーブルの存在確認
spark.catalog.tableExists("analytics.user_events")

# キャッシュ管理
spark.catalog.cacheTable("users")
spark.catalog.isCached("users")
spark.catalog.uncacheTable("users")
spark.catalog.clearCache()

8. Catalyst オプティマイザと Tungsten 実行エンジン

8.1 Catalyst オプティマイザ

Catalystは Spark SQL の高度なクエリ最適化エンジンであり、以下の4フェーズで最適化を行う:

Unresolved       Resolved        Optimized       Physical
Logical Plan --> Logical Plan --> Logical Plan --> Plans --> Selected Physical Plan
                                                           --> RDD (実行)
   (1)             (2)              (3)            (4)
  解析          カタログ参照      ルールベース     コスト
               型解決            最適化            ベース
                                                  最適化

フェーズ1: 解析(Analysis)

  • テーブル名、カラム名の解決
  • カタログからのメタデータ取得
  • 型の推論と検証

フェーズ2: 論理最適化(Logical Optimization)

主要な最適化ルール:

最適化ルール説明
Predicate Pushdownフィルタ条件をデータソースに近い場所に移動
Column Pruning必要なカラムのみを読み込み
Constant Folding定数式をコンパイル時に評価
Combine Filters複数のフィルタを結合
Null PropagationNULL値の伝播を最適化
Boolean Simplificationブール式の簡素化

フェーズ3: 物理計画(Physical Planning)

  • 複数の物理計画候補を生成
  • コストモデルに基づいて最適な計画を選択
  • Join戦略の決定(BroadcastHashJoin, SortMergeJoin, ShuffledHashJoin)
# 実行計画の確認
df_result = df.join(df_orders, "user_id").filter(F.col("age") > 25)

# 論理計画と物理計画を表示
df_result.explain(mode="extended")

# フォーマットされた実行計画
df_result.explain(mode="formatted")

# コスト情報付き
df_result.explain(mode="cost")

# コード生成の確認
df_result.explain(mode="codegen")

8.2 Adaptive Query Execution(AQE)

Spark 3.0で導入されたAQEは、実行時の統計情報に基づいて動的にクエリプランを最適化する。

# AQEの有効化(Spark 3.2以降はデフォルトで有効)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# AQEの主要設定
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1m")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")

spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

AQEの主要機能:

  1. 動的パーティション結合(Coalescing Post-Shuffle Partitions): シャッフル後の小さすぎるパーティションを統合
  2. 動的Join戦略切り替え: ランタイムデータサイズに基づいてSortMergeJoinからBroadcastHashJoinへ切り替え
  3. スキューJoin最適化: データスキューのあるパーティションを自動的に分割

8.3 Tungsten 実行エンジン

Tungsten はSpark のメモリ・CPU最適化エンジンであり、以下の技術を実装:

  1. オフヒープメモリ管理: JVMヒープの外にデータを配置し、GCオーバーヘッドを削減
  2. Whole-Stage Code Generation: 複数の演算子を単一のJavaメソッドにコンパイル
  3. カスタムメモリレイアウト: Java オブジェクトのオーバーヘッドを排除するバイナリ形式
# Whole-Stage Code Generationの有効化(デフォルトで有効)
spark.conf.set("spark.sql.codegen.wholeStage", "true")

# コード生成のフォールバック閾値
spark.conf.set("spark.sql.codegen.fallback", "true")
spark.conf.set("spark.sql.codegen.maxFields", "100")

9. Structured Streaming

9.1 概要

Structured Streaming は Spark のスケーラブルかつ耐障害性のあるストリーム処理エンジンであり、静的データと同じ DataFrame/Dataset API でストリーミングデータを処理できる。

9.2 基本的なストリーミング処理

# Kafkaからのストリーム読み込み
df_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", "100000") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .load()

# メッセージのパース
from pyspark.sql.types import StructType, StructField, StringType, LongType

event_schema = StructType([
    StructField("user_id", LongType()),
    StructField("event_type", StringType()),
    StructField("timestamp", LongType()),
    StructField("properties", StringType())
])

parsed_stream = df_stream \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp") \
    .select(
        F.col("key"),
        F.from_json(F.col("value"), event_schema).alias("event"),
        F.col("timestamp").alias("kafka_timestamp")
    ) \
    .select("key", "event.*", "kafka_timestamp")

# ウィンドウ集約
windowed_counts = parsed_stream \
    .withWatermark("kafka_timestamp", "10 minutes") \
    .groupBy(
        F.window("kafka_timestamp", "5 minutes", "1 minute"),
        "event_type"
    ) \
    .agg(
        F.count("*").alias("event_count"),
        F.approx_count_distinct("user_id").alias("unique_users")
    )

# 出力
query = windowed_counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

9.3 出力モード

出力モード説明用途
append新しい行のみ出力フィルタリング、マッピング
update更新された行のみ出力集約クエリ
complete全結果を毎回出力集約結果の完全出力

9.4 出力シンク

# Kafkaへの出力
query = result_stream.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker1:9092") \
    .option("topic", "processed-events") \
    .option("checkpointLocation", "hdfs:///checkpoints/kafka-sink") \
    .trigger(processingTime="1 minute") \
    .start()

# Delta Lakeへの出力
query = result_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://bucket/checkpoints/delta-sink") \
    .option("mergeSchema", "true") \
    .partitionBy("event_date") \
    .start("s3://bucket/delta-table")

# foreachBatch(カスタム処理)
def write_to_multiple_sinks(batch_df, batch_id):
    # PostgreSQLに書き出し
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://db:5432/analytics") \
        .option("dbtable", "events") \
        .mode("append") \
        .save()
    
    # Parquetにも書き出し
    batch_df.write \
        .mode("append") \
        .parquet(f"hdfs:///archive/events/batch_{batch_id}")

query = result_stream.writeStream \
    .foreachBatch(write_to_multiple_sinks) \
    .option("checkpointLocation", "hdfs:///checkpoints/multi-sink") \
    .start()

9.5 ウォーターマークと遅延データ処理

# ウォーターマーク: イベント時刻ベースの遅延データの許容範囲を定義
windowed_agg = parsed_stream \
    .withWatermark("event_timestamp", "30 minutes") \
    .groupBy(
        F.window("event_timestamp", "10 minutes"),
        "event_type"
    ) \
    .count()

ウォーターマークの仕組み:

  • ウォーターマーク = 最大イベント時刻 - 閾値
  • ウォーターマークより古いイベントは破棄される
  • State(状態)のクリーンアップに使用される

9.6 トリガーモード

# Processing Time トリガー(定期実行)
.trigger(processingTime="30 seconds")

# Once トリガー(1回だけ実行して停止)
.trigger(once=True)

# Available Now トリガー(利用可能なデータを全て処理して停止)
.trigger(availableNow=True)

# Continuous トリガー(低レイテンシ、実験的)
.trigger(continuous="1 second")

10. MLlib(機械学習ライブラリ)

10.1 概要

MLlibは Spark の分散機械学習ライブラリであり、大規模データセット上での機械学習パイプラインの構築を支援する。Spark 2.0以降、DataFrame ベースの spark.ml パッケージが主要APIとなっている。

10.2 ML パイプライン

from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer, VectorAssembler, StandardScaler,
    OneHotEncoder, Imputer, Bucketizer
)
from pyspark.ml.classification import (
    LogisticRegression, RandomForestClassifier, GBTClassifier
)
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator, MulticlassClassificationEvaluator
)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# データの準備
training_data = spark.read.parquet("hdfs:///data/ml/training.parquet")

# 特徴量エンジニアリング
# カテゴリ変数のエンコーディング
category_indexer = StringIndexer(
    inputCol="category", outputCol="category_index"
)
category_encoder = OneHotEncoder(
    inputCol="category_index", outputCol="category_vec"
)

# 欠損値の補完
imputer = Imputer(
    inputCols=["age", "income"],
    outputCols=["age_imputed", "income_imputed"],
    strategy="median"
)

# 特徴量の結合
assembler = VectorAssembler(
    inputCols=["age_imputed", "income_imputed", "category_vec"],
    outputCol="features_raw"
)

# スケーリング
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

# モデル
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.8
)

# パイプラインの構築
pipeline = Pipeline(stages=[
    category_indexer,
    category_encoder,
    imputer,
    assembler,
    scaler,
    lr
])

# ハイパーパラメータチューニング
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.001, 0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [50, 100, 200]) \
    .build()

evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    metricName="areaUnderROC"
)

cross_validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4
)

# モデルのトレーニング
cv_model = cross_validator.fit(training_data)

# 最良モデルの取得
best_model = cv_model.bestModel

# 予測
test_data = spark.read.parquet("hdfs:///data/ml/test.parquet")
predictions = cv_model.transform(test_data)

# 評価
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# モデルの保存
cv_model.write().overwrite().save("hdfs:///models/lr_model")

# モデルの読み込み
from pyspark.ml.tuning import CrossValidatorModel
loaded_model = CrossValidatorModel.load("hdfs:///models/lr_model")

10.3 サポートされるアルゴリズム

カテゴリアルゴリズム
分類LogisticRegression, RandomForest, GBT, SVM, NaiveBayes, MultilayerPerceptron
回帰LinearRegression, RandomForest, GBT, DecisionTree, IsotonicRegression
クラスタリングKMeans, BisectingKMeans, GaussianMixture, LDA
協調フィルタリングALS (Alternating Least Squares)
特徴量TF-IDF, Word2Vec, PCA, ChiSqSelector, VectorAssembler
評価BinaryClassification, MulticlassClassification, Regression, Ranking

11. GraphX / GraphFrames

11.1 GraphX の概要

GraphXはSpark上のグラフ処理APIであり、RDDベースで実装されている。頂点(Vertex)とエッジ(Edge)からなるグラフ構造を表現し、グラフアルゴリズムを分散実行できる。

import org.apache.spark.graphx._

// 頂点RDDの作成
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
  (1L, "Alice"),
  (2L, "Bob"),
  (3L, "Charlie"),
  (4L, "Diana")
))

// エッジRDDの作成
val edges: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "colleague"),
  Edge(3L, 4L, "friend"),
  Edge(1L, 4L, "family")
))

// グラフの作成
val graph = Graph(vertices, edges)

// PageRank
val pageRank = graph.pageRank(0.001)
pageRank.vertices.sortBy(_._2, ascending = false).take(5)

// Connected Components
val cc = graph.connectedComponents()

// Triangle Counting
val triangles = graph.triangleCount()

11.2 GraphFrames(DataFrameベース)

GraphFrames は DataFrame ベースのグラフ処理ライブラリであり、PySpark からも利用可能である。

from graphframes import GraphFrame

# 頂点DataFrame
vertices = spark.createDataFrame([
    ("1", "Alice", 34),
    ("2", "Bob", 36),
    ("3", "Charlie", 30),
    ("4", "Diana", 29)
], ["id", "name", "age"])

# エッジDataFrame
edges = spark.createDataFrame([
    ("1", "2", "friend"),
    ("2", "3", "colleague"),
    ("3", "4", "friend"),
    ("1", "4", "family")
], ["src", "dst", "relationship"])

# GraphFrameの作成
g = GraphFrame(vertices, edges)

# 基本クエリ
g.vertices.show()
g.edges.show()
g.degrees.show()

# PageRank
results = g.pageRank(resetProbability=0.15, maxIter=20)
results.vertices.select("id", "name", "pagerank") \
    .orderBy(F.desc("pagerank")).show()

# 最短経路
shortest_paths = g.shortestPaths(landmarks=["1", "4"])
shortest_paths.show()

# モチーフ検索(パターンマッチング)
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(c)")
motifs.filter("a.id != c.id").show()

12. デプロイメントモード

12.1 クラスターマネージャーの比較

特徴StandaloneYARNKubernetesMesos
セットアップ簡単Hadoop必須K8s必須やや複雑
リソース共有限定的高度高度高度
スケーリング静的動的動的動的
コンテナ化なしなしネイティブDocker対応
監視Web UIYARN UIK8s DashboardMesos UI
推奨環境開発/テストHadoopクラスタークラウドネイティブレガシー

12.2 Standalone モード

# マスターの起動
$SPARK_HOME/sbin/start-master.sh

# ワーカーの起動
$SPARK_HOME/sbin/start-worker.sh spark://master-host:7077

# アプリケーション投入
spark-submit \
    --master spark://master-host:7077 \
    --deploy-mode cluster \
    --executor-memory 8g \
    --executor-cores 4 \
    --num-executors 10 \
    --conf spark.driver.memory=4g \
    my_app.py

12.3 YARN モード

# Client モード(Driverがsubmit元マシンで動作)
spark-submit \
    --master yarn \
    --deploy-mode client \
    --executor-memory 8g \
    --executor-cores 4 \
    --num-executors 10 \
    --queue production \
    --conf spark.yarn.maxAppAttempts=2 \
    --conf spark.yarn.am.memory=2g \
    --conf spark.yarn.executor.memoryOverhead=2g \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=5 \
    --conf spark.dynamicAllocation.maxExecutors=50 \
    --conf spark.dynamicAllocation.executorIdleTimeout=60s \
    --files hdfs:///config/app.conf \
    --py-files dependencies.zip \
    my_app.py

# Cluster モード(DriverがYARNコンテナ内で動作)
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 8g \
    --executor-cores 4 \
    --num-executors 10 \
    my_app.py

12.4 Kubernetes モード

# Kubernetesへの投入
spark-submit \
    --master k8s://https://k8s-api-server:6443 \
    --deploy-mode cluster \
    --name spark-app \
    --conf spark.executor.instances=10 \
    --conf spark.kubernetes.container.image=spark:3.5.0 \
    --conf spark.kubernetes.namespace=spark-jobs \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.executor.request.cores=2 \
    --conf spark.kubernetes.executor.limit.cores=4 \
    --conf spark.kubernetes.driver.request.cores=1 \
    --conf spark.kubernetes.driver.limit.cores=2 \
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-data.mount.path=/data \
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-data.options.claimName=spark-pvc \
    --conf spark.kubernetes.driver.pod.name=spark-driver \
    --conf spark.kubernetes.node.selector.pool=spark-pool \
    local:///opt/spark/app/my_app.py

Kubernetes用 Pod Template の例:

# spark-driver-template.yaml
apiVersion: v1
kind: Pod
spec:
  containers:
    - name: spark-driver
      resources:
        requests:
          memory: "4Gi"
          cpu: "2"
        limits:
          memory: "8Gi"
          cpu: "4"
      volumeMounts:
        - name: spark-config
          mountPath: /opt/spark/conf
  volumes:
    - name: spark-config
      configMap:
        name: spark-config
  tolerations:
    - key: "spark"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"
  nodeSelector:
    node-pool: spark-driver

12.5 Spark Connect(Spark 3.4+)

Spark Connectは、クライアント-サーバーアーキテクチャによりSparkクラスターとの薄い接続を実現する新しいインターフェースである。

# Spark Connectサーバーの起動
# $SPARK_HOME/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.0

# クライアントからの接続
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .remote("sc://spark-server:15002") \
    .getOrCreate()

# 通常通りDataFrame APIを使用
df = spark.read.parquet("hdfs:///data/users.parquet")
df.filter(F.col("age") > 25).show()

13. 設定とチューニング

13.1 主要な設定パラメータ

アプリケーション設定

# spark-defaults.conf

# ===== アプリケーション基本設定 =====
spark.app.name                          MySparkApplication
spark.master                            yarn

# ===== Driver設定 =====
spark.driver.memory                     4g
spark.driver.cores                      2
spark.driver.maxResultSize              2g
spark.driver.memoryOverhead             1g

# ===== Executor設定 =====
spark.executor.memory                   8g
spark.executor.cores                    4
spark.executor.instances                10
spark.executor.memoryOverhead           2g
spark.executor.heartbeatInterval        30s

# ===== Dynamic Allocation =====
spark.dynamicAllocation.enabled                     true
spark.dynamicAllocation.minExecutors                5
spark.dynamicAllocation.maxExecutors                100
spark.dynamicAllocation.initialExecutors            10
spark.dynamicAllocation.executorIdleTimeout         60s
spark.dynamicAllocation.schedulerBacklogTimeout     1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
spark.shuffle.service.enabled                       true

# ===== メモリ設定 =====
spark.memory.fraction                   0.6
spark.memory.storageFraction            0.5
spark.memory.offHeap.enabled            true
spark.memory.offHeap.size               4g

# ===== シリアライゼーション =====
spark.serializer                        org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max         256m
spark.kryo.registrationRequired         false

# ===== ネットワーク =====
spark.network.timeout                   600s
spark.rpc.message.maxSize               256
spark.rpc.askTimeout                    300s

# ===== SQL設定 =====
spark.sql.shuffle.partitions            200
spark.sql.adaptive.enabled              true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.skewJoin.enabled     true
spark.sql.autoBroadcastJoinThreshold    50m
spark.sql.broadcastTimeout              300
spark.sql.parquet.compression.codec     snappy
spark.sql.parquet.mergeSchema           false
spark.sql.hive.convertMetastoreParquet  true
spark.sql.files.maxPartitionBytes       128m
spark.sql.files.openCostInBytes         4m

# ===== シャッフル設定 =====
spark.shuffle.compress                  true
spark.shuffle.spill.compress            true
spark.shuffle.file.buffer               64k
spark.shuffle.io.maxRetries             10
spark.shuffle.io.retryWait              5s
spark.reducer.maxSizeInFlight           96m

# ===== 圧縮 =====
spark.io.compression.codec              lz4
spark.io.compression.lz4.blockSize      64k

# ===== 推測実行 =====
spark.speculation                       true
spark.speculation.interval              100ms
spark.speculation.multiplier            1.5
spark.speculation.quantile              0.9

13.2 環境別設定例

開発環境

spark = SparkSession.builder \
    .appName("DevApp") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.ui.showConsoleProgress", "true") \
    .config("spark.log.level", "WARN") \
    .getOrCreate()

本番環境(大規模ETL)

spark = SparkSession.builder \
    .appName("ProductionETL") \
    .master("yarn") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "10") \
    .config("spark.dynamicAllocation.maxExecutors", "200") \
    .config("spark.sql.shuffle.partitions", "1000") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.parquet.compression.codec", "zstd") \
    .config("spark.sql.files.maxPartitionBytes", "256m") \
    .config("spark.network.timeout", "800s") \
    .config("spark.speculation", "true") \
    .enableHiveSupport() \
    .getOrCreate()

本番環境(ストリーミング)

spark = SparkSession.builder \
    .appName("ProductionStreaming") \
    .master("yarn") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.instances", "20") \
    .config("spark.streaming.backpressure.enabled", "true") \
    .config("spark.streaming.kafka.maxRatePerPartition", "5000") \
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .config("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "true") \
    .config("spark.sql.shuffle.partitions", "50") \
    .config("spark.sql.adaptive.enabled", "false") \
    .getOrCreate()

14. メモリ管理

14.1 Spark のメモリ構造

┌─────────────────────────────────────────────────────────┐
│                    Executor Memory                       │
│                  (spark.executor.memory)                  │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │          Unified Memory (60%)                       │  │
│  │        (spark.memory.fraction = 0.6)                │  │
│  │                                                     │  │
│  │  ┌──────────────────┐  ┌──────────────────────┐    │  │
│  │  │  Execution Memory│  │   Storage Memory     │    │  │
│  │  │  (シャッフル、    │  │   (キャッシュ、      │    │  │
│  │  │   ソート、結合)   │◄─►│   ブロードキャスト) │    │  │
│  │  │                  │  │                      │    │  │
│  │  └──────────────────┘  └──────────────────────┘    │  │
│  │       動的に境界が移動(相互に貸借可能)              │  │
│  └────────────────────────────────────────────────────┘  │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │          User Memory (40%)                          │  │
│  │     (データ構造、UDFの変数、メタデータ)               │  │
│  └────────────────────────────────────────────────────┘  │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │          Reserved Memory (300MB固定)                │  │
│  │          (Spark内部オブジェクト)                      │  │
│  └────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│              Memory Overhead                             │
│         (spark.executor.memoryOverhead)                   │
│      (オフヒープ、NIO、JVMオーバーヘッド)                  │
│      デフォルト: MAX(384MB, executor.memory * 0.10)       │
└─────────────────────────────────────────────────────────┘

14.2 メモリ関連の設定

# 統合メモリの割合(実行 + ストレージ)
spark.conf.set("spark.memory.fraction", "0.6")

# ストレージメモリの初期割合(統合メモリ内)
spark.conf.set("spark.memory.storageFraction", "0.5")

# オフヒープメモリ
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")

# Executor メモリオーバーヘッド
spark.conf.set("spark.executor.memoryOverhead", "2g")

# PySpark用メモリ(Python workerプロセス)
spark.conf.set("spark.executor.pyspark.memory", "2g")

14.3 メモリチューニングの指針

Executorのメモリサイズ設計:

Total Node Memory: 64GB
OS Reserved:        4GB
Remaining:         60GB
Number of Executors per Node: 3 (各5コア)

Per Executor:
  spark.executor.memory:          16g
  spark.executor.memoryOverhead:   4g  (16g * 0.25)
  Total per Executor:             20g
  
  3 Executors × 20g = 60g ✓

大きすぎるExecutorの問題:

  • GCの停止時間が長くなる(>64GBは推奨しない)
  • HDFS I/Oの並列度が下がる

小さすぎるExecutorの問題:

  • ブロードキャスト変数のメモリが不足
  • 複数のタスクがメモリを共有できない

推奨値:

  • spark.executor.cores: 4〜5コア
  • spark.executor.memory: 16〜32GB
  • spark.executor.memoryOverhead: executor.memory の 10〜25%

15. シャッフルとパーティショニング

15.1 シャッフルの仕組み

シャッフルは、データを複数のノード間で再配置する操作であり、Sparkで最もコストの高い処理の一つである。

Stage 1 (Map Side)              Stage 2 (Reduce Side)
┌─────────────┐                 ┌─────────────┐
│ Partition 0  │──┐          ┌──│ Partition 0  │
│ (Sort+Write) │  │          │  │ (Merge+Read) │
├─────────────┤  │  Shuffle  │  ├─────────────┤
│ Partition 1  │──┼──────────┼──│ Partition 1  │
│ (Sort+Write) │  │  Service  │  │ (Merge+Read) │
├─────────────┤  │          │  ├─────────────┤
│ Partition 2  │──┘          └──│ Partition 2  │
│ (Sort+Write) │                │ (Merge+Read) │
└─────────────┘                 └─────────────┘

シャッフルを引き起こす操作:

  • groupByKey, reduceByKey, aggregateByKey
  • join, cogroup
  • repartition
  • distinct
  • sortByKey

15.2 パーティション数の最適化

# シャッフルパーティション数の設定
spark.conf.set("spark.sql.shuffle.partitions", "200")  # デフォルト200

# 推奨: データサイズに応じて設定
# パーティション数 = データサイズ / ターゲットパーティションサイズ
# ターゲットパーティションサイズ: 128MB〜256MB

# AQEを使用した自動最適化(推奨)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64m")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")

# 手動でパーティション数を確認・変更
print(f"パーティション数: {df.rdd.getNumPartitions()}")

# repartition: ハッシュベースの再分割(シャッフルあり)
df = df.repartition(100)
df = df.repartition(100, "user_id")  # カラムベースの再分割

# coalesce: パーティション削減(シャッフルなし、効率的)
df = df.coalesce(10)

15.3 データスキューへの対処

データスキュー(特定のキーにデータが偏る)は、ジョブの遅延の主要原因である。

# 方法1: AQEスキューJoin最適化(Spark 3.0+、推奨)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

# 方法2: Salting(手動スキュー解消)
import random

# スキューキーにソルト値を付加
num_salts = 10
df_skewed = df_skewed.withColumn(
    "salted_key",
    F.concat(F.col("join_key"), F.lit("_"), F.lit(random.randint(0, num_salts - 1)))
)

# 結合先もソルト値で展開
df_lookup_exploded = df_lookup.crossJoin(
    spark.range(num_salts).withColumnRenamed("id", "salt")
).withColumn(
    "salted_key",
    F.concat(F.col("join_key"), F.lit("_"), F.col("salt"))
)

# ソルト付きキーで結合
result = df_skewed.join(df_lookup_exploded, "salted_key")

# 方法3: Broadcast Joinの活用(小テーブルの場合)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")
result = df_large.join(F.broadcast(df_small), "key")

16. パフォーマンス最適化

16.1 データフォーマットの選択

フォーマット圧縮カラムナースキーマ分割可能推奨用途
Parquet埋め込み分析・DWH
ORC埋め込みHive連携
Avro埋め込みストリーミング
JSONなしデータ交換
CSVなしなし互換性重視
Delta LakeメタデータACID対応
# Parquet(推奨)
df.write \
    .mode("overwrite") \
    .option("compression", "zstd") \
    .partitionBy("year", "month") \
    .bucketBy(256, "user_id") \
    .sortBy("event_timestamp") \
    .saveAsTable("analytics.user_events")

16.2 パーティショニング戦略

# ファイルパーティショニング(Hive-style)
df.write \
    .partitionBy("country", "date") \
    .parquet("hdfs:///data/events")

# ディレクトリ構造:
# hdfs:///data/events/country=US/date=2024-01-01/part-00000.parquet
# hdfs:///data/events/country=JP/date=2024-01-01/part-00000.parquet

# Bucket分割(Join最適化)
df.write \
    .bucketBy(256, "user_id") \
    .sortBy("user_id") \
    .saveAsTable("user_events_bucketed")

# Bucket Joinの実行(シャッフル不要)
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")

df1 = spark.table("user_events_bucketed")
df2 = spark.table("user_profiles_bucketed")  # 同じbucket数
result = df1.join(df2, "user_id")  # シャッフルが発生しない

16.3 よくあるアンチパターンと対策

アンチパターン問題対策
collect() の乱用Driver のOOMtake(), show(), ファイル出力を使用
groupByKey() の使用全データをシャッフルreduceByKey(), aggregateByKey() を使用
小ファイル問題メタデータオーバーヘッドcoalesce(), repartition() で適切なサイズに
UDFの過剰使用Catalyst最適化が効かない組み込み関数を優先使用
キャッシュの忘れ同一データの再計算繰り返し使うDataFrameは cache()
不要なキャッシュメモリ浪費使い終わったら unpersist()
count() によるデバッグ不要なジョブ実行本番コードから削除
Cartesian Joinデータ爆発条件付きJoinに変更

17. セキュリティ

17.1 認証・認可

# Kerberos認証
spark.kerberos.keytab             /etc/security/keytabs/spark.keytab
spark.kerberos.principal          spark/hostname@REALM.COM
spark.yarn.keytab                 /etc/security/keytabs/spark.keytab
spark.yarn.principal              spark/hostname@REALM.COM

# 暗号化
spark.authenticate                true
spark.authenticate.secret         my-shared-secret
spark.network.crypto.enabled      true
spark.network.crypto.keyLength    256
spark.network.crypto.keyFactoryAlgorithm PBKDF2WithHmacSHA256

# SSL/TLS
spark.ssl.enabled                 true
spark.ssl.keyStore                /path/to/keystore.jks
spark.ssl.keyStorePassword        keystore-password
spark.ssl.trustStore              /path/to/truststore.jks
spark.ssl.trustStorePassword      truststore-password
spark.ssl.protocol                TLSv1.3

# ACL(アクセス制御リスト)
spark.acls.enable                 true
spark.admin.acls                  admin1,admin2
spark.modify.acls                 user1,user2
spark.ui.view.acls                *

17.2 データ保護

# カラムレベルの暗号化
from pyspark.sql.functions import sha2, aes_encrypt, aes_decrypt

# ハッシュ化(不可逆)
df = df.withColumn("email_hash", sha2(F.col("email"), 256))

# AES暗号化(可逆)
key = "0123456789abcdef"  # 16バイトキー
df_encrypted = df.withColumn(
    "ssn_encrypted",
    F.base64(aes_encrypt(F.col("ssn"), F.lit(key)))
)

# 復号
df_decrypted = df_encrypted.withColumn(
    "ssn_decrypted",
    aes_decrypt(F.unbase64(F.col("ssn_encrypted")), F.lit(key)).cast("string")
)

# データマスキング
df = df.withColumn(
    "phone_masked",
    F.concat(F.lit("***-***-"), F.substring(F.col("phone"), -4, 4))
)

18. 監視とデバッグ

18.1 Spark Web UI

Spark Web UIはポート4040(デフォルト)で提供され、以下の情報を確認できる:

タブ確認可能な情報
Jobsジョブ一覧、各ジョブのステージ構成、実行時間
Stagesステージ一覧、タスク分布、シャッフルサイズ
StorageキャッシュされたRDD/DataFrame、メモリ使用状況
Environment設定値一覧、JVM情報
ExecutorsExecutor一覧、メモリ・ディスク使用状況、GC時間
SQLSQL実行計画、クエリ統計情報
Streamingストリーミングクエリの状態、処理レート

18.2 メトリクス設定

# metrics.properties

# CSVレポーター
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/var/log/spark/metrics

# Prometheusレポーター(Spark 3.0+)
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus

# Graphiteレポーター
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite-host
*.sink.graphite.port=2003
*.sink.graphite.prefix=spark
*.sink.graphite.period=10
*.sink.graphite.unit=seconds

# JMX
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

18.3 ログ設定

# log4j2.properties

rootLogger.level = WARN
rootLogger.appenderRef.stdout.ref = console

# Sparkのログレベル
logger.spark.name = org.apache.spark
logger.spark.level = WARN

# SQLのログ
logger.sql.name = org.apache.spark.sql
logger.sql.level = WARN

# カスタムアプリケーションのログ
logger.myapp.name = com.mycompany.myapp
logger.myapp.level = INFO

# イベントログ(History Server用)
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs:///spark-history
# spark.eventLog.compress true

18.4 デバッグテクニック

# 実行計画の確認
df.explain(mode="formatted")

# パーティション数の確認
print(f"パーティション数: {df.rdd.getNumPartitions()}")

# パーティションサイズの確認
partition_sizes = df.rdd.mapPartitions(
    lambda it: [sum(1 for _ in it)]
).collect()
print(f"パーティションサイズ分布: {partition_sizes}")

# データスキューの検出
df.groupBy("key_column") \
    .count() \
    .orderBy(F.desc("count")) \
    .show(20)

# Spark UIへのジョブ説明追加
spark.sparkContext.setJobDescription("Loading user data")
df = spark.read.parquet("hdfs:///data/users")

spark.sparkContext.setJobDescription("Aggregating sales by region")
result = df.groupBy("region").agg(F.sum("amount"))

18.5 Spark History Server

# History Serverの起動
$SPARK_HOME/sbin/start-history-server.sh

# 設定
spark.history.fs.logDirectory       hdfs:///spark-history
spark.history.ui.port               18080
spark.history.retainedApplications  50
spark.history.fs.cleaner.enabled    true
spark.history.fs.cleaner.interval   1d
spark.history.fs.cleaner.maxAge     7d

19. エコシステム連携

19.1 主要なエコシステム連携

┌─────────────────────────────────────────────────────────┐
│                    Apache Spark                          │
├──────────┬──────────┬──────────┬──────────┬─────────────┤
│ストレージ │メッセージ │メタデータ │ BI/可視化  │ ワークフロー  │
├──────────┼──────────┼──────────┼──────────┼─────────────┤
│ HDFS     │ Kafka    │ Hive MS  │ Tableau  │ Airflow     │
│ S3       │ Kinesis  │ AWS Glue │ Superset │ Dagster     │
│ GCS      │ Pulsar   │ Unity    │ Metabase │ Prefect     │
│ ADLS     │ Event    │ Catalog  │ Looker   │ Oozie       │
│ Delta    │  Hubs    │ Nessie   │ Redash   │ Luigi       │
│ Iceberg  │          │ Polaris  │          │             │
│ Hudi     │          │          │          │             │
└──────────┴──────────┴──────────┴──────────┴─────────────┘

19.2 Apache Kafka との連携

# Kafka -> Spark -> Kafka パイプライン
df_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "input-topic") \
    .option("startingOffsets", "latest") \
    .option("kafka.group.id", "spark-consumer-group") \
    .option("failOnDataLoss", "false") \
    .load()

# 処理
processed = df_stream \
    .selectExpr("CAST(value AS STRING) as json_str") \
    .select(F.from_json("json_str", schema).alias("data")) \
    .select("data.*") \
    .filter(F.col("status") == "active") \
    .withColumn("processed_at", F.current_timestamp())

# Kafkaへ出力
query = processed \
    .selectExpr("CAST(user_id AS STRING) AS key",
                "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092") \
    .option("topic", "output-topic") \
    .option("checkpointLocation", "hdfs:///checkpoints/kafka-pipeline") \
    .start()

19.3 Delta Lake との連携

# Delta Lakeの依存関係追加
# --packages io.delta:delta-spark_2.12:3.1.0

from delta.tables import DeltaTable

# Delta テーブルの作成
df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("date") \
    .save("s3://bucket/delta/events")

# MERGE(Upsert)操作
delta_table = DeltaTable.forPath(spark, "s3://bucket/delta/events")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "name": "source.name",
    "updated_at": "source.updated_at"
}).whenNotMatchedInsertAll() \
.execute()

# タイムトラベル
df_yesterday = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2024-01-15") \
    .load("s3://bucket/delta/events")

# バキューム(古いファイルの削除)
delta_table.vacuum(168)  # 7日より古いファイルを削除

19.4 Apache Airflow との連携

# Airflow DAG定義例
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'spark_etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # 毎日02:00
    catchup=False,
)

spark_job = SparkSubmitOperator(
    task_id='run_spark_etl',
    application='/opt/spark/jobs/etl_job.py',
    conn_id='spark_default',
    conf={
        'spark.executor.memory': '8g',
        'spark.executor.cores': '4',
        'spark.dynamicAllocation.enabled': 'true',
    },
    application_args=['--date', '{{ ds }}'],
    dag=dag,
)

20. ユースケースとベストプラクティス

20.1 代表的なユースケース

ETL パイプライン

def run_etl(date: str):
    """日次ETLジョブ"""
    
    # 1. Extract(抽出)
    raw_events = spark.read \
        .format("json") \
        .schema(event_schema) \
        .load(f"s3://raw-data/events/date={date}/")
    
    # 2. Transform(変換)
    transformed = raw_events \
        .filter(F.col("event_type").isNotNull()) \
        .withColumn("event_hour", F.hour("event_timestamp")) \
        .withColumn("country", F.upper(F.col("country_code"))) \
        .dropDuplicates(["event_id"]) \
        .withColumn("processed_at", F.current_timestamp())
    
    # 3. データ品質チェック
    total_count = transformed.count()
    null_count = transformed.filter(F.col("user_id").isNull()).count()
    null_ratio = null_count / total_count if total_count > 0 else 0
    
    if null_ratio > 0.05:
        raise ValueError(f"NULL率が閾値超過: {null_ratio:.2%}")
    
    # 4. Load(ロード)
    transformed.write \
        .format("delta") \
        .mode("overwrite") \
        .option("replaceWhere", f"date = '{date}'") \
        .partitionBy("date", "country") \
        .save("s3://processed-data/events")
    
    print(f"ETL完了: {total_count}件処理")

リアルタイムダッシュボード

def streaming_aggregation():
    """リアルタイム集計ストリーミング"""
    
    events = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "page-views") \
        .load()
    
    parsed = events \
        .select(F.from_json(
            F.col("value").cast("string"), 
            page_view_schema
        ).alias("data")) \
        .select("data.*")
    
    # 5分ウィンドウでの集計
    aggregated = parsed \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            F.window("timestamp", "5 minutes"),
            "page_url"
        ) \
        .agg(
            F.count("*").alias("view_count"),
            F.approx_count_distinct("user_id").alias("unique_visitors"),
            F.avg("load_time_ms").alias("avg_load_time")
        )
    
    # JDBCシンク
    def write_to_db(batch_df, batch_id):
        batch_df.select(
            F.col("window.start").alias("window_start"),
            "page_url", "view_count", "unique_visitors", "avg_load_time"
        ).write \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://db:5432/analytics") \
            .option("dbtable", "realtime_page_stats") \
            .mode("append") \
            .save()
    
    query = aggregated.writeStream \
        .foreachBatch(write_to_db) \
        .option("checkpointLocation", "hdfs:///cp/dashboard") \
        .trigger(processingTime="1 minute") \
        .start()
    
    return query

20.2 ベストプラクティスまとめ

  1. DataFrame API を RDD API より優先使用する: Catalyst 最適化の恩恵を受けられる
  2. 組み込み関数を UDF より優先使用する: UDFはCatalyst最適化をバイパスする
  3. AQE を有効にする: Spark 3.0+ では spark.sql.adaptive.enabled=true
  4. 適切なパーティション数を設定する: 各パーティション 128MB〜256MB が目安
  5. Broadcast Join を活用する: 小テーブルとの結合はBroadcastが最適
  6. Parquet / Delta Lake 形式を使用する: カラムナーフォーマットの利点を活用
  7. 不要なシャッフルを避ける: coalesce > repartition(パーティション削減時)
  8. キャッシュを適切に管理する: 再利用するDataFrameのみキャッシュし、不要になったら解放
  9. データスキューに対処する: AQE または Salting テクニックを使用
  10. 推測実行を検討する: spark.speculation=true(ストラグラータスク対策)

21. Apache Spark 3.x / 4.x の新機能

21.1 Spark 3.x の主要機能

機能バージョン説明
Adaptive Query Execution3.0ランタイム統計に基づく動的最適化
Dynamic Partition Pruning3.0スタースキーマクエリの高速化
Kubernetes GA3.1Kubernetesサポートの正式版
RocksDB StateStore3.2ストリーミング状態管理の改善
Push-based Shuffle3.2シャッフル性能の向上
Spark Connect3.4クライアント-サーバーアーキテクチャ
Arrow-optimized Python UDF3.5PySpark UDFの大幅高速化

21.2 Spark 4.0 の主要機能

# 1. ANSI Mode がデフォルトで有効
# 型変換の厳密化(暗黙的な型変換がエラーに)
spark.conf.set("spark.sql.ansi.enabled", "true")  # Spark 4.0でデフォルト

# 2. VARIANT 型のサポート
# 半構造化データの効率的な処理
df = spark.sql("""
    SELECT 
        parse_json('{"name": "Alice", "age": 30}') as data
""")

# VARIANT型へのアクセス
df.select(
    F.col("data:name").alias("name"),
    F.col("data:age").cast("int").alias("age")
).show()

# 3. Collation サポート
# 文字列比較のロケール対応
spark.sql("""
    CREATE TABLE users (
        name STRING COLLATE 'UNICODE_CI',  -- 大文字小文字を区別しない
        email STRING
    )
""")

# 4. Python Data Source API V2
# Pythonでカスタムデータソースを実装
class MyDataSource:
    @classmethod
    def schema(cls):
        return "id INT, name STRING, value DOUBLE"
    
    @classmethod
    def reader(cls, schema):
        return MyDataSourceReader()

# 5. Spark Connect の改善
# リモートセッションのセキュリティ強化、カスタムプラグイン対応

22. まとめ

Apache Spark は、バッチ処理、ストリーミング、機械学習、グラフ処理を統合的に扱える強力な分散処理エンジンである。本記事で解説した内容を以下にまとめる:

技術的要点

カテゴリ要点
アーキテクチャDriver-Executor モデル、DAGベースの実行計画
データ抽象化RDD → DataFrame → Dataset の進化
SQL処理Catalyst オプティマイザ + Tungsten エンジン
ストリーミングStructured Streaming によるマイクロバッチ/連続処理
機械学習MLlib Pipeline API による分散ML
デプロイStandalone / YARN / Kubernetes
最適化AQE、Broadcast Join、パーティショニング
最新動向Spark Connect、VARIANT型、ANSI準拠

選定の指針

Apache Spark は以下の要件がある場合に特に効果的である:

  • 大規模バッチETL: TB〜PBスケールのデータ変換
  • インタラクティブ分析: Spark SQL によるアドホッククエリ
  • ストリーミング処理: Kafka等からのニアリアルタイム処理
  • 機械学習パイプライン: 大規模データでのモデル訓練
  • 統一基盤: 上記すべてを単一フレームワークで実現

一方、以下のケースでは他の選択肢も検討すべきである:

  • 超低レイテンシ(ミリ秒単位): Apache Flink の方が適している
  • 小規模データ: Pandas / DuckDB の方がシンプル
  • リアルタイムOLAP: Apache Druid / ClickHouse の方が適している
  • グラフDB用途: Neo4j / Amazon Neptune の方が適している

Apache Spark は継続的に進化しており、Spark 4.0 では ANSI 準拠の強化、VARIANT 型、Collation サポートなど、より現代的なデータ処理要件に対応する機能が追加されている。データエンジニアリングの基盤として、今後も重要な役割を果たし続けるだろう。


参考文献: