Splunk

Splunk 包括的技術ガイド - SREエンジニアのための実践リファレンス


目次

  1. Splunkとは
  2. アーキテクチャ
  3. データ取り込み(Data Ingestion)
  4. SPL(Search Processing Language)包括ガイド
  5. ナレッジオブジェクト
  6. ダッシュボードとビジュアライゼーション
  7. アラート
  8. インデックス管理
  9. セキュリティ
  10. デプロイメントとスケーリング
  11. パフォーマンスチューニング
  12. Splunk自体の監視
  13. インテグレーション
  14. SREのためのベストプラクティス

1. Splunkとは

1.1 概要

Splunkは、マシンデータ(ログ、メトリクス、イベント等)を収集・インデックス化・分析するための統合プラットフォームである。元々はログ管理ツールとして誕生したが、現在ではSIEM(Security Information and Event Management)、ITオペレーション分析、アプリケーションパフォーマンス監視(APM)、ビジネスインテリジェンスまで幅広いユースケースをカバーする。

1.2 主要な機能領域

機能領域説明
ログ管理あらゆるソースからのログデータの収集、パース、保存、検索
SIEMセキュリティイベントの相関分析、脅威検出、コンプライアンスレポート
IT運用分析(ITOA)インフラストラクチャの監視、障害検出、根本原因分析
APMアプリケーションのパフォーマンス監視とトレーシング
ビジネス分析ビジネスメトリクスの可視化とレポーティング
データプラットフォーム構造化・非構造化データの統合的な管理と分析

1.3 Splunkの製品ファミリー

  • Splunk Enterprise: オンプレミスで運用するSplunkのコア製品
  • Splunk Cloud Platform: Splunk社が管理するSaaS版
  • Splunk Enterprise Security (ES): SIEM機能を提供するプレミアムアプリ
  • Splunk ITSI: IT Service Intelligenceによる高度なIT運用監視
  • Splunk SOAR: Security Orchestration, Automation and Responseプラットフォーム
  • Splunk Observability Cloud: マイクロサービスベースの監視(旧SignalFx)
  • Splunk APM: 分散トレーシングとアプリケーションパフォーマンス監視

1.4 マシンデータとは

Splunkが扱う「マシンデータ」とは以下を指す。

  • アプリケーションログ: Apache、Nginx、カスタムアプリケーションのログ
  • システムログ: syslog、Windows Event Log、journald
  • セキュリティログ: ファイアウォール、IDS/IPS、認証ログ
  • ネットワークデータ: NetFlow、パケットキャプチャ、DNS
  • メトリクス: CPU使用率、メモリ、ディスクI/O等の時系列データ
  • 設定ファイル: システム設定の変更履歴
  • APIレスポンス: REST API、Webhook等のデータ
  • IoTデータ: センサーデータ、テレメトリ

1.5 他のツールとの比較

機能SplunkElasticsearch/ELKDatadogGrafana/Loki
ログ検索SPL(強力)Lucene/KQL独自クエリLogQL
SIEMEnterprise SecurityOpenSearch SecurityCloud SIEMなし(別途必要)
スケーラビリティ非常に高い高い高い(SaaS)中程度
コスト高い(ライセンス)OSS(運用コスト)中程度(SaaS)低い(OSS)
学習曲線中程度高い低い低い
リアルタイム処理優秀良好優秀良好

2. アーキテクチャ

2.1 コンポーネント概要

Splunkのアーキテクチャは、複数のコンポーネントが協調して動作する分散システムである。

┌─────────────────────────────────────────────────────────────────────┐
│                    Splunk 分散アーキテクチャ                          │
│                                                                     │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                          │
│  │  データ   │  │  データ   │  │  データ   │  データソース             │
│  │  ソース1  │  │  ソース2  │  │  ソース3  │                          │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                          │
│       │              │              │                                │
│       v              v              v                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                          │
│  │Universal  │  │Universal  │  │  Heavy   │  フォワーダー層           │
│  │Forwarder  │  │Forwarder  │  │Forwarder │                          │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                          │
│       │              │              │                                │
│       └──────────────┼──────────────┘                                │
│                      │                                               │
│                      v  (TCP 9997)                                   │
│       ┌──────────────┼──────────────┐                                │
│       │              │              │                                │
│       v              v              v                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                          │
│  │ Indexer 1 │  │ Indexer 2 │  │ Indexer 3 │  インデクサー層          │
│  │(Primary)  │  │(Replica)  │  │(Replica)  │  (Index Cluster)       │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                          │
│       │              │              │                                │
│       └──────────────┼──────────────┘                                │
│                      │                                               │
│                      v                                               │
│       ┌──────────────┼──────────────┐                                │
│       │              │              │                                │
│       v              v              v                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                          │
│  │Search Head│  │Search Head│  │Search Head│  サーチヘッド層          │
│  │    1      │  │    2      │  │    3      │  (SH Cluster)          │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                          │
│       │              │              │                                │
│       └──────────────┼──────────────┘                                │
│                      │                                               │
│                      v                                               │
│              ┌──────────────┐                                        │
│              │  ロードバラ   │                                        │
│              │  ンサー       │                                        │
│              └──────┬───────┘                                        │
│                     │                                                │
│                     v                                                │
│              ┌──────────────┐                                        │
│              │   ユーザー    │                                        │
│              │  (Web UI/API) │                                        │
│              └──────────────┘                                        │
│                                                                     │
│  管理コンポーネント:                                                  │
│  ┌───────────────┐ ┌───────────────┐ ┌───────────────┐              │
│  │ Cluster       │ │ Deployment    │ │ License       │              │
│  │ Manager       │ │ Server        │ │ Master        │              │
│  │ (マスター)    │ │ (設定配布)    │ │ (ライセンス)  │              │
│  └───────────────┘ └───────────────┘ └───────────────┘              │
└─────────────────────────────────────────────────────────────────────┘

2.2 各コンポーネントの詳細

2.2.1 フォワーダー(Forwarder)

フォワーダーはデータソースからデータを収集し、インデクサーに転送するエージェントである。

Universal Forwarder (UF)

軽量なデータ転送エージェント。データのパースやインデックス化は行わない。

# /opt/splunkforwarder/etc/system/local/outputs.conf
[tcpout]
defaultGroup = my_indexers

[tcpout:my_indexers]
server = indexer1.example.com:9997, indexer2.example.com:9997, indexer3.example.com:9997
# 自動ロードバランシング
autoLBFrequency = 30
# ロードバランシング時のボリュームベースの切替
autoLBVolume = 1048576
# SSL/TLS設定
sslCertPath = $SPLUNK_HOME/etc/auth/server.pem
sslRootCAPath = $SPLUNK_HOME/etc/auth/cacert.pem
sslPassword = changeme
sslVerifyServerCert = true
useSSL = true

[tcpout:my_indexers:indexer1]
server = indexer1.example.com:9997

[tcpout:my_indexers:indexer2]
server = indexer2.example.com:9997
# /opt/splunkforwarder/etc/system/local/inputs.conf
[monitor:///var/log/syslog]
disabled = false
sourcetype = syslog
index = os_logs

[monitor:///var/log/apache2/access.log]
disabled = false
sourcetype = access_combined
index = web_logs

[monitor:///opt/app/logs/*.log]
disabled = false
sourcetype = custom_app
index = app_logs
# ファイルの先頭から読み込む
initCrcLength = 1024
# ホワイトリスト/ブラックリスト
whitelist = \.log$
blacklist = \.gz$|\.bak$

Heavy Forwarder (HF)

フル機能のSplunkインスタンスで、データのパース、フィルタリング、ルーティングが可能。

# Heavy Forwarderのtransforms.conf
# データのルーティング例
[route_syslog_to_security]
REGEX = (failed|error|denied|unauthorized)
DEST_KEY = _MetaData:Index
FORMAT = security_logs

[route_default]
REGEX = .
DEST_KEY = _MetaData:Index
FORMAT = os_logs

# データのマスキング例
[mask_credit_card]
REGEX = (\d{4})-(\d{4})-(\d{4})-(\d{4})
FORMAT = $1-XXXX-XXXX-$4
DEST_KEY = _raw

[mask_ssn]
REGEX = \d{3}-\d{2}-\d{4}
FORMAT = XXX-XX-XXXX
DEST_KEY = _raw
# Heavy Forwarderのprops.conf
[source::/var/log/secure]
TRANSFORMS-routing = route_syslog_to_security, route_default

[source::/var/log/app/payment.log]
TRANSFORMS-mask = mask_credit_card, mask_ssn

2.2.2 インデクサー(Indexer)

インデクサーはデータの受信、パース、インデックス化、保存を担当する。Splunkの心臓部である。

インデクサーの処理パイプライン:

受信データ → パーシング → ライン分割 → タイムスタンプ抽出 → 
インデックス化 → バケット書き込み → ストレージ
# /opt/splunk/etc/system/local/indexes.conf
[default]
homePath = $SPLUNK_DB/$_index_name/db
coldPath = $SPLUNK_DB/$_index_name/colddb
thawedPath = $SPLUNK_DB/$_index_name/thaweddb
maxTotalDataSizeMB = 500000
maxDataSize = auto_high_volume
maxHotBuckets = 10
maxWarmDBCount = 300
frozenTimePeriodInSecs = 7776000  # 90日

[web_logs]
homePath = $SPLUNK_DB/web_logs/db
coldPath = $SPLUNK_DB/web_logs/colddb
thawedPath = $SPLUNK_DB/web_logs/thaweddb
maxTotalDataSizeMB = 100000
frozenTimePeriodInSecs = 2592000  # 30日
maxDataSize = auto_high_volume

[security_logs]
homePath = $SPLUNK_DB/security_logs/db
coldPath = $SPLUNK_DB/security_logs/colddb
thawedPath = $SPLUNK_DB/security_logs/thaweddb
maxTotalDataSizeMB = 200000
frozenTimePeriodInSecs = 31536000  # 365日
coldToFrozenDir = /archive/splunk/security_logs

[metrics_index]
homePath = $SPLUNK_DB/metrics_index/db
coldPath = $SPLUNK_DB/metrics_index/colddb
thawedPath = $SPLUNK_DB/metrics_index/thaweddb
datatype = metric
maxTotalDataSizeMB = 50000
frozenTimePeriodInSecs = 7776000  # 90日

2.2.3 サーチヘッド(Search Head)

ユーザーインターフェースを提供し、検索クエリの実行、ダッシュボードの表示、レポートの生成を行う。

# /opt/splunk/etc/system/local/distsearch.conf
[distributedSearch]
servers = https://indexer1.example.com:8089, https://indexer2.example.com:8089, https://indexer3.example.com:8089

[replicationSettings]
replicationFactor = 3

# サーチヘッドクラスタの設定
[shclustering]
disabled = false
mgmt_uri = https://sh1.example.com:8089
replication_factor = 2
replication_port = 9200
conf_deploy_fetch_url = https://deployer.example.com:8089
shcluster_label = shcluster1

2.2.4 クラスターマネージャー(Cluster Manager)

旧称Cluster Master。インデックスクラスタの管理を行い、レプリケーションファクタとサーチファクタを維持する。

# /opt/splunk/etc/system/local/server.conf (Cluster Manager)
[clustering]
mode = manager
replication_factor = 3
search_factor = 2
pass4SymmKey = your_cluster_key
cluster_label = idx_cluster_prod

[general]
site = site1

# マルチサイトクラスタ設定
[clustering]
mode = manager
multisite = true
available_sites = site1, site2
site_replication_factor = origin:2, total:3
site_search_factor = origin:1, total:2
# /opt/splunk/etc/system/local/server.conf (Indexer/Peer Node)
[clustering]
mode = peer
manager_uri = https://cm.example.com:8089
pass4SymmKey = your_cluster_key

[replication_port://9100]
disabled = false

[general]
site = site1

2.2.5 デプロイメントサーバー(Deployment Server)

フォワーダーへの設定配布を一元管理する。

# /opt/splunk/etc/system/local/serverclass.conf
[global]
repositoryLocation = $SPLUNK_HOME/etc/deployment-apps

[serverClass:linux_servers]
whitelist.0 = linux-*
restartSplunkd = true

[serverClass:linux_servers:app:TA-linux-inputs]
restartSplunkd = true

[serverClass:linux_servers:app:TA-custom-app-logs]
restartSplunkd = true

[serverClass:windows_servers]
whitelist.0 = win-*

[serverClass:windows_servers:app:TA-windows-inputs]
restartSplunkd = true

[serverClass:web_servers]
whitelist.0 = web-*
whitelist.1 = nginx-*
blacklist.0 = web-dev-*

[serverClass:web_servers:app:TA-nginx-inputs]
restartSplunkd = true

2.2.6 ライセンスマスター(License Master)

Splunkのライセンスを一元管理し、各インスタンスのライセンス使用量を監視する。

# /opt/splunk/etc/system/local/server.conf (License Slave)
[license]
master_uri = https://license-master.example.com:8089

2.3 通信ポート一覧

ポート用途
8000Splunk Web (HTTP/HTTPS)
8089Management/REST API (HTTPS)
9997データ転送(Forwarder → Indexer)
9100インデックスレプリケーション
9200サーチヘッドクラスタレプリケーション
8088HTTP Event Collector (HEC)
514Syslog受信
8191KVStore (MongoDB)

3. データ取り込み(Data Ingestion)

3.1 データ入力の種類

Splunkは多様なデータソースからデータを取り込むことができる。

3.1.1 ファイルモニタリング

# inputs.conf - ファイルモニタリングの詳細設定
[monitor:///var/log/application/*.log]
disabled = false
sourcetype = custom_app_log
index = app_logs
# ファイルローテーション対応
followTail = 0
# CRCチェックの長さ(バイト)
initCrcLength = 256
# ログファイルの先頭から読み込むか
ignoreOlderThan = 7d
# ホワイトリスト・ブラックリスト
whitelist = access\.log$|error\.log$
blacklist = \.gz$|\.bak$|\.old$
# タイムスタンプ認識
time_before_close = 5
# マルチラインイベントの設定
# props.confで詳細に設定

[monitor:///var/log/containers/*.log]
disabled = false
sourcetype = kube:container:log
index = k8s_logs
# Kubernetes環境用
recursive = true

[monitor:///opt/splunk/var/log/watchdog/*.log]
disabled = false
sourcetype = watchdog
index = _internal

3.1.2 ネットワーク入力(Syslog/TCP/UDP)

# inputs.conf - ネットワーク入力
[tcp://9514]
disabled = false
sourcetype = syslog
index = network_logs
connection_host = dns
# 接続制限
acceptFrom = 10.0.0.0/8, 172.16.0.0/12
queueSize = 500KB

[udp://514]
disabled = false
sourcetype = syslog
index = network_logs
connection_host = ip
no_appending_timestamp = true
no_priority_stripping = false

[tcp-ssl://9515]
disabled = false
sourcetype = syslog_tls
index = secure_logs
# SSL設定
serverCert = $SPLUNK_HOME/etc/auth/server.pem
sslPassword = changeme
requireClientCert = true
sslRootCAPath = $SPLUNK_HOME/etc/auth/cacert.pem

3.1.3 HTTP Event Collector (HEC)

HECはHTTP/HTTPS経由でデータをSplunkに送信するための仕組みである。アプリケーションやCI/CDパイプラインからのデータ送信に最適。

# inputs.conf - HEC設定
[http]
disabled = false
port = 8088
enableSSL = 1
dedicatedIoThreads = 2
maxThreads = 0
maxSockets = 0
useDeploymentServer = false

[http://app_events]
disabled = false
token = xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
index = app_events
sourcetype = _json
allowedIndexes = app_events, app_metrics
useACK = true

HECへのデータ送信例:

# 単一イベントの送信
curl -k https://splunk-hec.example.com:8088/services/collector/event \
  -H "Authorization: Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
  -d '{
    "event": {
      "message": "User login successful",
      "user": "admin",
      "src_ip": "10.0.1.50",
      "action": "login"
    },
    "sourcetype": "app:auth",
    "index": "app_events",
    "host": "web-server-01",
    "time": 1704067200
  }'

# バッチイベントの送信(Raw endpoint)
curl -k https://splunk-hec.example.com:8088/services/collector/raw \
  -H "Authorization: Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
  -H "X-Splunk-Request-Channel: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
  -d '2024-01-01T00:00:00Z host=web01 action=GET uri=/api/health status=200
2024-01-01T00:00:01Z host=web01 action=POST uri=/api/login status=401
2024-01-01T00:00:02Z host=web01 action=GET uri=/api/data status=500'

# メトリクスデータの送信
curl -k https://splunk-hec.example.com:8088/services/collector \
  -H "Authorization: Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
  -d '{
    "time": 1704067200,
    "event": "metric",
    "source": "app_metrics",
    "host": "web-server-01",
    "fields": {
      "metric_name:cpu.usage": 75.5,
      "metric_name:memory.usage": 82.3,
      "metric_name:disk.io_read": 1024,
      "region": "us-east-1",
      "environment": "production"
    }
  }'

# HECヘルスチェック
curl -k https://splunk-hec.example.com:8088/services/collector/health \
  -H "Authorization: Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

# ACK確認(useACK=true時)
curl -k https://splunk-hec.example.com:8088/services/collector/ack \
  -H "Authorization: Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
  -H "X-Splunk-Request-Channel: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
  -d '{"acks": [0, 1, 2]}'

Python SDKでのHEC送信例:

import requests
import json
import time

class SplunkHEC:
    def __init__(self, host, token, port=8088, ssl=True):
        self.url = f"{'https' if ssl else 'http'}://{host}:{port}"
        self.token = token
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Splunk {token}',
            'Content-Type': 'application/json'
        })
        self.session.verify = False  # 本番ではCA証明書を指定

    def send_event(self, event_data, sourcetype=None, index=None, host=None):
        payload = {
            'event': event_data,
            'time': int(time.time())
        }
        if sourcetype:
            payload['sourcetype'] = sourcetype
        if index:
            payload['index'] = index
        if host:
            payload['host'] = host

        response = self.session.post(
            f'{self.url}/services/collector/event',
            data=json.dumps(payload)
        )
        return response.json()

    def send_batch(self, events):
        payload = ''
        for event in events:
            payload += json.dumps(event) + '\n'

        response = self.session.post(
            f'{self.url}/services/collector/event',
            data=payload
        )
        return response.json()

# 使用例
hec = SplunkHEC('splunk-hec.example.com', 'your-token-here')
hec.send_event(
    {'message': 'Deployment completed', 'version': 'v2.1.0', 'status': 'success'},
    sourcetype='deploy:event',
    index='deployment_logs'
)

3.1.4 スクリプト入力

# inputs.conf - スクリプト入力
[script:///opt/splunk/etc/apps/myapp/bin/get_api_metrics.sh]
disabled = false
interval = 300
sourcetype = api_metrics
index = app_metrics
source = api_health_check

[script:///opt/splunk/etc/apps/myapp/bin/collect_db_stats.py]
disabled = false
interval = 60
sourcetype = db_stats
index = db_metrics
passAuth = splunk_system_user
#!/bin/bash
# get_api_metrics.sh - API健全性チェックスクリプト
ENDPOINTS=(
    "https://api.example.com/health"
    "https://api.example.com/v1/status"
    "https://auth.example.com/health"
)

for endpoint in "${ENDPOINTS[@]}"; do
    start_time=$(date +%s%N)
    response=$(curl -s -o /dev/null -w "%{http_code},%{time_total}" "$endpoint" 2>/dev/null)
    end_time=$(date +%s%N)
    
    http_code=$(echo "$response" | cut -d',' -f1)
    response_time=$(echo "$response" | cut -d',' -f2)
    
    echo "$(date -u +%Y-%m-%dT%H:%M:%SZ) endpoint=\"$endpoint\" http_code=$http_code response_time=$response_time"
done

3.1.5 モジュラー入力

#!/usr/bin/env python3
# modular_input_example.py
import sys
import json
import time
import requests

# Splunk SDKのモジュラー入力フレームワーク
import splunklib.modularinput as smi

class CloudWatchInput(smi.Script):
    def get_scheme(self):
        scheme = smi.Scheme("AWS CloudWatch Metrics")
        scheme.description = "Collects metrics from AWS CloudWatch"
        scheme.use_external_validation = True
        scheme.use_single_instance = False

        region_arg = smi.Argument("aws_region")
        region_arg.data_type = smi.Argument.data_type_string
        region_arg.required_on_create = True
        scheme.add_argument(region_arg)

        namespace_arg = smi.Argument("namespace")
        namespace_arg.data_type = smi.Argument.data_type_string
        namespace_arg.required_on_create = True
        scheme.add_argument(namespace_arg)

        return scheme

    def stream_events(self, inputs, ew):
        for input_name, input_item in inputs.inputs.items():
            # CloudWatchからメトリクスを取得してイベントとして出力
            event = smi.Event()
            event.stanza = input_name
            event.data = json.dumps({
                "metric": "CPUUtilization",
                "value": 75.5,
                "namespace": input_item["namespace"],
                "region": input_item["aws_region"]
            })
            ew.write_event(event)

if __name__ == "__main__":
    sys.exit(CloudWatchInput().run(sys.argv))

3.2 ソースタイプ設定

# props.conf - ソースタイプ定義
[custom_app_log]
# タイムスタンプ設定
TIME_FORMAT = %Y-%m-%dT%H:%M:%S.%3N%z
TIME_PREFIX = ^
MAX_TIMESTAMP_LOOKAHEAD = 30
TZ = Asia/Tokyo

# ライン分割設定
SHOULD_LINEMERGE = true
BREAK_ONLY_BEFORE = ^\d{4}-\d{2}-\d{2}T
LINE_BREAKER = ([\r\n]+)\d{4}-\d{2}-\d{2}T
TRUNCATE = 100000

# 文字エンコーディング
CHARSET = utf-8

# イベント処理
ANNOTATE_PUNCT = false
SEDCMD-remove_ansi = s/\x1B\[[0-9;]*[a-zA-Z]//g

# フィールド抽出(自動キーバリュー抽出)
KV_MODE = auto
# 区切り文字ベースの抽出
# KV_MODE = none
# DELIMS = ","
# FIELDS = field1, field2, field3

[json_app_log]
KV_MODE = json
TIME_FORMAT = %Y-%m-%dT%H:%M:%S.%6NZ
TIME_PREFIX = "timestamp"\s*:\s*"
SHOULD_LINEMERGE = false
LINE_BREAKER = ([\r\n]+)

# JSONの自動フィールド抽出設定
JSON_TRIM_BRACES_IN_ARRAY_NAMES = true

[access_combined_custom]
# Apache Combined Log Formatの例
EXTRACT-access = ^(?P<clientip>\S+)\s+\S+\s+(?P<user>\S+)\s+\[(?P<timestamp>[^\]]+)\]\s+"(?P<method>\w+)\s+(?P<uri>\S+)\s+(?P<http_version>\S+)"\s+(?P<status>\d+)\s+(?P<bytes>\d+)\s+"(?P<referer>[^"]+)"\s+"(?P<useragent>[^"]+)"
# transforms.conf - フィールド変換定義
[extract_error_code]
REGEX = error_code=(?P<error_code>\d+)
FORMAT = error_code::$1

[extract_json_fields]
REGEX = "level"\s*:\s*"(?P<log_level>[^"]+)"
FORMAT = log_level::$1

[lookup_geo_ip]
filename = geo_ip.csv
match_type = CIDR(src_ip)

# ルーティング変換
[set_index_by_severity]
REGEX = level=(ERROR|CRITICAL|FATAL)
DEST_KEY = _MetaData:Index
FORMAT = critical_logs

[null_queue_debug]
REGEX = level=DEBUG
DEST_KEY = queue
FORMAT = nullQueue

3.3 データパイプライン

データソース → 入力フェーズ → パーシングフェーズ → インデクシングフェーズ
                  │                │                    │
                  │                │                    │
            inputs.conf       props.conf           indexes.conf
            (データ受信)     transforms.conf       (バケット管理)
                             (パース・変換)
# パイプライン設定 - server.conf
[pipeline:parsing]
maxQueueSize = 2MB

[pipeline:merging]
maxQueueSize = 2MB

[pipeline:typing]
maxQueueSize = 2MB

[pipeline:indexing]
maxQueueSize = 2MB

4. SPL(Search Processing Language)包括ガイド

SPLはSplunkの中核をなすクエリ言語であり、データの検索、変換、分析、可視化を行う。

4.1 基本検索

# 基本的なキーワード検索
index=web_logs sourcetype=access_combined status=500

# 時間範囲指定
index=web_logs sourcetype=access_combined earliest=-24h latest=now

# ワイルドカード検索
index=app_logs error* NOT debug

# フィールド値での検索
index=app_logs log_level=ERROR src_ip="10.0.1.*"

# OR条件
index=web_logs (status=500 OR status=503) method=POST

# NOT条件
index=app_logs log_level=ERROR NOT source="/var/log/app/debug.log"

# 正規表現での検索
index=web_logs | regex uri="^/api/v[12]/users"

4.2 主要コマンド詳解

4.2.1 stats - 統計集計

# 基本的な統計
index=web_logs | stats count by status
index=web_logs | stats count, avg(response_time), max(response_time), min(response_time) by uri

# 複数の集計関数
index=web_logs 
| stats count as total_requests,
        count(eval(status>=500)) as errors,
        count(eval(status>=200 AND status<300)) as success,
        avg(response_time) as avg_resp_time,
        perc95(response_time) as p95_resp_time,
        perc99(response_time) as p99_resp_time,
        dc(clientip) as unique_clients
  by uri

# values/list関数
index=app_logs log_level=ERROR
| stats count, values(error_code) as error_codes, latest(_time) as last_seen by host

# 時間ベースの統計(span指定)
index=web_logs 
| bin _time span=5m
| stats count by _time, status

# first/last
index=app_logs 
| stats first(_time) as first_seen, last(_time) as last_seen, count by session_id

4.2.2 eval - フィールド計算・変換

# 新しいフィールドの作成
index=web_logs 
| eval response_category=case(
    status>=200 AND status<300, "Success",
    status>=300 AND status<400, "Redirect",
    status>=400 AND status<500, "Client Error",
    status>=500, "Server Error",
    true(), "Unknown"
  )
| stats count by response_category

# 数値計算
index=metrics 
| eval memory_usage_pct = round((used_memory / total_memory) * 100, 2)
| eval disk_free_gb = round(disk_free_bytes / 1073741824, 2)

# 文字列操作
index=app_logs 
| eval short_uri = replace(uri, "\?.*", "")
| eval domain = mvindex(split(url, "/"), 2)
| eval log_message = upper(substr(message, 1, 100))

# 条件分岐(if)
index=web_logs 
| eval is_error = if(status >= 400, 1, 0)
| eval latency_category = if(response_time > 1000, "slow", if(response_time > 500, "medium", "fast"))

# 日時操作
index=app_logs 
| eval event_hour = strftime(_time, "%H")
| eval event_day = strftime(_time, "%A")
| eval time_since_event = now() - _time
| eval hours_ago = round(time_since_event / 3600, 1)

# 型変換
index=app_logs 
| eval numeric_status = tonumber(status)
| eval status_str = tostring(status)

# null処理
index=app_logs 
| eval safe_field = coalesce(field1, field2, "default_value")
| eval has_error = if(isnull(error_code), "No", "Yes")

# マルチバリューフィールド操作
index=app_logs 
| eval tags = split(tag_string, ",")
| eval tag_count = mvcount(tags)
| eval has_critical = if(mvfind(tags, "critical") >= 0, "Yes", "No")
| mvexpand tags

4.2.3 where - フィルタリング

# 基本的なフィルタリング
index=web_logs | where status >= 400

# 文字列関数でのフィルタ
index=web_logs | where like(uri, "/api/v%")
index=web_logs | where match(uri, "^/api/v[12]/users")

# null チェック
index=app_logs | where isnotnull(error_code)
index=app_logs | where isnull(response_time)

# 複合条件
index=web_logs 
| where (status >= 500 AND response_time > 5000) OR (status >= 400 AND method="DELETE")

# cidrmatch(IPアドレスマッチ)
index=firewall_logs 
| where cidrmatch("10.0.0.0/8", src_ip) AND NOT cidrmatch("10.0.1.0/24", src_ip)

4.2.4 rex - 正規表現によるフィールド抽出

# 名前付きキャプチャグループでフィールド抽出
index=app_logs 
| rex field=_raw "user=(?P<username>\w+)\s+action=(?P<user_action>\w+)\s+result=(?P<result>\w+)"

# IPアドレスの抽出
index=web_logs 
| rex field=_raw "(?P<src_ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"

# 複数マッチ(max_match)
index=app_logs 
| rex field=_raw "error_code=(?P<error_codes>\d+)" max_match=0

# sedモード(置換)
index=app_logs 
| rex field=message mode=sed "s/password=[^\s]+/password=REDACTED/g"
| rex field=message mode=sed "s/\d{4}-\d{4}-\d{4}-\d{4}/XXXX-XXXX-XXXX-XXXX/g"

# JSONからのフィールド抽出
index=app_logs sourcetype=json_log
| rex field=_raw "\"error_message\"\s*:\s*\"(?P<error_msg>[^\"]+)\""

# URLパス部分の抽出
index=web_logs 
| rex field=uri "^/api/(?P<api_version>v\d+)/(?P<resource>\w+)"

4.2.5 table / fields - 出力フィールドの選択

# table - 指定フィールドの表形式出力
index=web_logs 
| stats count, avg(response_time) as avg_rt by uri, status
| table uri, status, count, avg_rt

# fields - フィールドの選択/除外
index=web_logs 
| fields _time, host, uri, status, response_time
| fields - _raw, _serial

# rename - フィールド名変更
index=web_logs 
| stats count by clientip
| rename clientip as "Client IP", count as "Request Count"
| sort - "Request Count"

4.2.6 dedup - 重複排除

# 基本的な重複排除(各ユーザーの最新イベントのみ)
index=app_logs | dedup user | table _time, user, action

# 複数フィールドでの重複排除
index=web_logs | dedup clientip, uri | table clientip, uri, status

# sortbyで順序を指定
index=app_logs | dedup user sortby -_time | table _time, user, action

# 上位N件を保持
index=app_logs | dedup 3 user sortby -_time

4.2.7 sort - ソート

# 降順ソート
index=web_logs | stats count by uri | sort - count

# 複合ソート
index=web_logs | stats count, avg(response_time) as avg_rt by uri
| sort - count, + avg_rt

# 上位/下位N件
index=web_logs | stats count by uri | sort - count | head 20
index=web_logs | stats avg(response_time) as avg_rt by uri | sort + avg_rt | head 10

4.2.8 top / rare - 頻度分析

# 上位10件
index=web_logs | top 10 uri
index=web_logs | top clientip by status

# レア(まれな値)
index=app_logs | rare error_code
index=app_logs | rare 10 user_agent

# top with options
index=web_logs | top 20 uri showperc=true showcount=true countfield=requests percentfield=pct

4.2.9 timechart - 時系列チャート

# 基本的なタイムチャート
index=web_logs | timechart span=5m count by status

# 複数メトリクスの時系列
index=web_logs 
| timechart span=1h 
    count as total_requests,
    avg(response_time) as avg_response_time,
    perc95(response_time) as p95_response_time

# エラーレートの時系列
index=web_logs 
| timechart span=10m 
    count(eval(status>=500)) as errors,
    count as total
| eval error_rate = round((errors / total) * 100, 2)

# limit で表示系列数を制限
index=web_logs | timechart span=1h count by uri limit=10 useother=true

# fillnull で欠損値を補完
index=web_logs | timechart span=5m count by status | fillnull value=0

4.2.10 chart - ピボットチャート

# 基本的なチャート
index=web_logs | chart count by status, method

# over/by構文
index=web_logs | chart avg(response_time) over uri by method

# 複数集計
index=web_logs 
| chart count, avg(response_time) as avg_rt, max(response_time) as max_rt by uri

4.2.11 eventstats - インラインの統計追加

# 各イベントに全体の統計値を追加
index=web_logs 
| eventstats avg(response_time) as global_avg, stdev(response_time) as global_stdev
| eval is_anomaly = if(response_time > (global_avg + 3 * global_stdev), 1, 0)
| where is_anomaly=1

# グループ別の統計値を追加
index=web_logs 
| eventstats avg(response_time) as avg_by_uri by uri
| eval deviation = response_time - avg_by_uri
| where deviation > 1000

# パーセンタイルベースの異常検出
index=web_logs 
| eventstats perc99(response_time) as p99 by uri
| where response_time > p99

4.2.12 streamstats - ストリーム統計

# 移動平均の計算
index=web_logs 
| timechart span=5m avg(response_time) as avg_rt
| streamstats window=12 avg(avg_rt) as moving_avg
| eval threshold = moving_avg * 1.5
| where avg_rt > threshold

# 累積カウント
index=app_logs log_level=ERROR
| streamstats count as running_error_count
| timechart span=1h max(running_error_count)

# 前のイベントとの時間差
index=app_logs 
| sort _time
| streamstats current=false last(_time) as prev_time by host
| eval time_gap = _time - prev_time
| where time_gap > 300

4.2.13 transaction - トランザクション分析

# セッションベースのトランザクション
index=web_logs 
| transaction clientip maxpause=30m maxspan=2h
| stats avg(duration) as avg_session_duration, avg(eventcount) as avg_pages by clientip

# リクエスト-レスポンスの組み合わせ
index=app_logs 
| transaction request_id startswith="REQUEST_START" endswith="REQUEST_END"
| eval processing_time = duration
| stats avg(processing_time) by service_name

# ユーザージャーニー分析
index=web_logs 
| transaction session_id maxpause=30m
| eval pages_visited = mvcount(uri)
| eval has_purchase = if(like(uri, "%/checkout/complete%"), 1, 0)
| stats count, sum(has_purchase) as purchases by pages_visited

4.2.14 lookup - ルックアップ

# CSVルックアップ
index=web_logs 
| lookup geo_ip_lookup src_ip as clientip OUTPUT country, city, latitude, longitude
| stats count by country | sort - count

# KVStore ルックアップ
index=app_logs 
| lookup kv_user_info user_id OUTPUT department, manager, location

# 自動ルックアップ(定義済み)
# transforms.conf で定義:
# [user_lookup]
# filename = users.csv
# match_type = WILDCARD(user_id)

# inputlookup(ルックアップテーブルの直接検索)
| inputlookup geo_ip_lookup | where country="Japan"

# outputlookup(検索結果をルックアップに書き込み)
index=web_logs 
| stats count by clientip 
| where count > 1000 
| outputlookup suspicious_ips.csv

# ルックアップの定義(transforms.conf)
# [geo_ip_lookup]
# filename = geo_ip.csv
# match_type = CIDR(src_ip)
# min_matches = 1
# default_match = unknown

4.2.15 subsearch - サブサーチ

# サブサーチで動的フィルタリング
index=web_logs 
  [search index=security_logs action=blocked | fields src_ip | rename src_ip as clientip]
| stats count by clientip, uri

# TOP N のユーザーのアクティビティ詳細
index=app_logs 
  [search index=app_logs log_level=ERROR | top 5 user | fields user]
| stats count by user, action

# 時間相関のサブサーチ
index=web_logs status>=500
  [search index=infra_logs "deployment started" | eval earliest=_time-300 | eval latest=_time+300 | fields earliest, latest]

4.2.16 join - テーブル結合

# 内部結合
index=web_logs 
| stats count, avg(response_time) as avg_rt by clientip
| join clientip 
  [search index=user_logs | dedup clientip | fields clientip, username, department]

# 左結合
index=app_logs 
| stats count by user_id
| join type=left user_id 
  [search index=user_db | table user_id, email, role]

# 注意: joinは大量データでパフォーマンスが悪い。
# 代わりにlookupやstatsを使うことを推奨。

# joinの代替: statsを使った擬似結合
index=web_logs OR index=user_logs 
| stats values(username) as username, sum(eval(if(index="web_logs",1,0))) as web_count by clientip

4.2.17 append / appendcols

# append - 結果の追加(UNION相当)
index=web_logs status>=500 | stats count as error_count
| append [search index=web_logs | stats count as total_count]
| stats values(*) as *

# appendcols - 列の追加
index=web_logs | stats count as current_errors by host
| appendcols 
  [search index=web_logs earliest=-7d@d latest=-6d@d | stats count as prev_errors by host]
| eval change_pct = round(((current_errors - prev_errors) / prev_errors) * 100, 2)

4.2.18 tstats - 高速統計(インデックス化フィールド)

# tstatsによる高速カウント
| tstats count where index=web_logs by _time, host, source, sourcetype span=1h

# データモデルを使ったtstats
| tstats count from datamodel=Web where Web.status>=500 by Web.dest, Web.status

# tstats + summariesonly(アクセラレーション必須)
| tstats summariesonly=true count from datamodel=Network_Traffic 
  where Network_Traffic.action=blocked 
  by Network_Traffic.src, Network_Traffic.dest 
  span=1h

# プレフィックス検索
| tstats count where index=web_logs sourcetype=access_combined host=web-* by host span=1h

4.2.19 datamodel

# データモデルの内容確認
| datamodel Web

# データモデルからの検索
| datamodel Web Web.Web_Traffic search
| stats count by Web.Web_Traffic.status

# ピボット(データモデルベース)
| pivot Web Web_Traffic count(Web_Traffic) as request_count 
  SPLITROW _time PERIOD auto 
  SPLITCOL status

4.2.20 マクロ

# macros.conf での定義
# [error_rate(2)]
# args = index_name, threshold
# definition = search index=$index_name$ | timechart span=5m count(eval(status>=500)) as errors, count as total | eval error_rate=round((errors/total)*100,2) | where error_rate > $threshold$

# マクロの使用
`error_rate(web_logs, 5)`

# 引数なしマクロ
# [get_business_hours]
# definition = eval is_business_hours=if(tonumber(strftime(_time,"%H"))>=9 AND tonumber(strftime(_time,"%H"))<18 AND NOT match(strftime(_time,"%A"),"Saturday|Sunday"), 1, 0)

index=app_logs | `get_business_hours` | where is_business_hours=1

# バリデーション付きマクロ
# [filter_by_env(1)]
# args = environment
# definition = search environment="$environment$"
# validation = $environment$ IN ("production", "staging", "development")
# errormsg = Invalid environment. Must be production, staging, or development.

4.3 高度なSPLパターン

異常検出パターン

# Z-scoreベースの異常検出
index=web_logs 
| bin _time span=5m
| stats count as requests by _time, host
| eventstats avg(requests) as mean, stdev(requests) as stdev by host
| eval zscore = (requests - mean) / stdev
| where abs(zscore) > 3
| table _time, host, requests, mean, stdev, zscore

# IQRベースの異常検出
index=web_logs 
| stats avg(response_time) as avg_rt by uri
| eventstats perc25(avg_rt) as q1, perc75(avg_rt) as q3
| eval iqr = q3 - q1
| eval lower_bound = q1 - 1.5 * iqr
| eval upper_bound = q3 + 1.5 * iqr
| where avg_rt < lower_bound OR avg_rt > upper_bound

エラーバジェット計算

# SLOベースのエラーバジェット
index=web_logs 
| stats count(eval(status<500)) as good, count as total
| eval slo = 99.9
| eval actual_availability = round((good / total) * 100, 4)
| eval error_budget_total = round(total * (1 - slo/100))
| eval errors_used = total - good
| eval error_budget_remaining = error_budget_total - errors_used
| eval budget_consumed_pct = round((errors_used / error_budget_total) * 100, 2)
| table slo, actual_availability, total, good, errors_used, error_budget_total, error_budget_remaining, budget_consumed_pct

変化検出パターン

# 前の時間帯との比較
index=web_logs 
| timechart span=1h count as current_count
| appendcols 
  [search index=web_logs earliest=-2d latest=-1d | timechart span=1h count as prev_day_count]
| eval change_pct = round(((current_count - prev_day_count) / prev_day_count) * 100, 2)
| where abs(change_pct) > 50

5. ナレッジオブジェクト

5.1 フィールド抽出

# props.conf - インラインフィールド抽出
[custom_app_log]
# 正規表現ベースの抽出
EXTRACT-user = user=(?P<username>\w+)
EXTRACT-action = action=(?P<user_action>\w+)
EXTRACT-duration = duration=(?P<request_duration>\d+\.?\d*)ms
EXTRACT-error = error_code=(?P<error_code>\d+)\s+error_msg="(?P<error_message>[^"]+)"

# transforms.confを参照する抽出
REPORT-fields = extract_custom_fields, extract_json_payload

# フィールドエイリアス
FIELDALIAS-src = src_ip AS source_ip
FIELDALIAS-dst = dst_ip AS dest_ip
FIELDALIAS-user = user AS username

# 計算済みフィールド(Calculated Fields)
EVAL-response_time_sec = round(response_time / 1000, 3)
EVAL-is_error = if(status >= 400, "true", "false")
EVAL-request_size_kb = round(bytes / 1024, 2)
# transforms.conf - 外部フィールド抽出定義
[extract_custom_fields]
REGEX = (?i)level=(?P<log_level>\w+)\s+component=(?P<component>[\w\-]+)\s+msg="(?P<log_message>[^"]*)"
FORMAT = log_level::$1 component::$2 log_message::$3

[extract_json_payload]
REGEX = payload=(?P<_raw_payload>\{.*\})
FORMAT = _raw_payload::$1

5.2 ルックアップ

# transforms.conf - ルックアップ定義
[asset_lookup]
filename = assets.csv
# フィールドマッチング
match_type = WILDCARD(hostname)
min_matches = 1
default_match = Unknown
max_matches = 1
case_sensitive_match = false

[cidr_lookup]
filename = network_ranges.csv
match_type = CIDR(ip_address)
min_matches = 1

[kv_store_lookup]
collection = user_preferences
external_type = kvstore
fields_list = user_id, theme, language, timezone
filter = active=1

[external_lookup]
external_cmd = lookup_api.py user_id
fields_list = user_id, full_name, department, location
external_type = python
# props.conf - 自動ルックアップの適用
[access_combined]
LOOKUP-asset_info = asset_lookup hostname AS host OUTPUT asset_type, owner, location, criticality
LOOKUP-geo = cidr_lookup ip_address AS clientip OUTPUT country, region, city
# assets.csv の例
hostname,asset_type,owner,location,criticality
web-*,Web Server,Platform Team,US-East,High
db-*,Database,DBA Team,US-East,Critical
app-*,Application Server,App Team,US-West,High

5.3 イベントタイプとタグ

# eventtypes.conf
[web_error]
search = index=web_logs status>=400

[web_server_error]
search = index=web_logs status>=500

[authentication_failure]
search = index=auth_logs (action=login AND result=failure)

[high_latency_request]
search = index=web_logs response_time>5000

[brute_force_attempt]
search = index=auth_logs action=login result=failure | stats count by src_ip | where count > 10
# tags.conf
[eventtype=web_error]
error = enabled
web = enabled

[eventtype=web_server_error]
critical = enabled
web = enabled
server_error = enabled

[eventtype=authentication_failure]
authentication = enabled
failure = enabled
security = enabled

[eventtype=brute_force_attempt]
attack = enabled
brute_force = enabled
security = enabled
# タグを使った検索
tag=security tag=attack | stats count by src_ip | sort - count
tag=web tag=error | timechart count by host

5.4 データモデル

// データモデル定義例(Web Traffic)
{
  "modelName": "Custom_Web_Traffic",
  "displayName": "Custom Web Traffic",
  "description": "Custom web traffic data model for SRE monitoring",
  "objects": [
    {
      "objectName": "Web_Request",
      "displayName": "Web Request",
      "parentName": "BaseEvent",
      "comment": "",
      "fields": [
        {
          "fieldName": "uri",
          "displayName": "URI",
          "type": "string",
          "fieldSearch": "",
          "required": false,
          "multivalue": false
        },
        {
          "fieldName": "status",
          "displayName": "HTTP Status",
          "type": "number",
          "fieldSearch": "",
          "required": false,
          "multivalue": false
        },
        {
          "fieldName": "response_time",
          "displayName": "Response Time (ms)",
          "type": "number",
          "fieldSearch": "",
          "required": false,
          "multivalue": false
        },
        {
          "fieldName": "method",
          "displayName": "HTTP Method",
          "type": "string",
          "fieldSearch": "",
          "required": false,
          "multivalue": false
        }
      ],
      "constraints": [
        {
          "search": "index=web_logs sourcetype=access_combined"
        }
      ],
      "children": [
        {
          "objectName": "Error_Request",
          "displayName": "Error Request",
          "parentName": "Web_Request",
          "constraints": [
            {
              "search": "status>=400"
            }
          ]
        },
        {
          "objectName": "Slow_Request",
          "displayName": "Slow Request",
          "parentName": "Web_Request",
          "constraints": [
            {
              "search": "response_time>3000"
            }
          ]
        }
      ]
    }
  ]
}

5.5 保存済み検索(Saved Searches)

# savedsearches.conf
[Daily Error Summary]
search = index=web_logs status>=500 \
| stats count as error_count, dc(uri) as affected_endpoints, dc(host) as affected_hosts by sourcetype \
| sort - error_count
dispatch.earliest_time = -24h@h
dispatch.latest_time = now
cron_schedule = 0 8 * * *
enableSched = 1
is_visible = true
description = Daily summary of server errors across all web services
action.email.to = sre-team@example.com
action.email.subject = Daily Error Summary Report
action.email = 1
action.email.include.results_link = 1
action.email.format = html

[Hourly Error Rate Check]
search = index=web_logs \
| timechart span=5m count(eval(status>=500)) as errors, count as total \
| eval error_rate = round((errors/total)*100, 2) \
| where error_rate > 5
dispatch.earliest_time = -1h
dispatch.latest_time = now
cron_schedule = 0 * * * *
enableSched = 1
alert.severity = 4
alert.suppress = 1
alert.suppress.period = 30m
alert_type = number of results
alert_comparator = greater than
alert_threshold = 0
action.webhook.enable = 1
action.webhook.param.url = https://hooks.slack.com/services/xxx/yyy/zzz

6. ダッシュボードとビジュアライゼーション

6.1 Simple XML ダッシュボード

<dashboard version="1.1" theme="dark">
  <label>SRE Operations Dashboard</label>
  <description>Real-time infrastructure and application monitoring</description>
  
  <!-- 入力フィルター -->
  <fieldset submitButton="true" autoRun="true">
    <input type="time" token="time_range" searchWhenChanged="true">
      <label>Time Range</label>
      <default>
        <earliest>-4h@m</earliest>
        <latest>now</latest>
      </default>
    </input>
    <input type="dropdown" token="environment" searchWhenChanged="true">
      <label>Environment</label>
      <choice value="*">All</choice>
      <choice value="production">Production</choice>
      <choice value="staging">Staging</choice>
      <default>production</default>
    </input>
    <input type="multiselect" token="hosts" searchWhenChanged="true">
      <label>Hosts</label>
      <search>
        <query>index=web_logs environment=$environment$ | dedup host | sort host | table host</query>
        <earliest>$time_range.earliest$</earliest>
        <latest>$time_range.latest$</latest>
      </search>
      <fieldForLabel>host</fieldForLabel>
      <fieldForValue>host</fieldForValue>
      <delimiter>,</delimiter>
      <prefix>(</prefix>
      <suffix>)</suffix>
      <valuePrefix>host="</valuePrefix>
      <valueSuffix>"</valueSuffix>
      <default>*</default>
    </input>
  </fieldset>

  <!-- Row 1: KPIシングルバリュー -->
  <row>
    <panel>
      <title>Total Requests</title>
      <single>
        <search>
          <query>index=web_logs environment=$environment$ $hosts$
| stats count as total_requests</query>
          <earliest>$time_range.earliest$</earliest>
          <latest>$time_range.latest$</latest>
        </search>
        <option name="colorBy">value</option>
        <option name="rangeColors">["0x53a051","0x0877a6","0xf8be34","0xf1813f","0xdc4e41"]</option>
        <option name="rangeValues">[0,1000,5000,10000]</option>
        <option name="useColors">1</option>
        <option name="drilldown">all</option>
      </single>
    </panel>
    <panel>
      <title>Error Rate (%)</title>
      <single>
        <search>
          <query>index=web_logs environment=$environment$ $hosts$
| stats count(eval(status>=500)) as errors, count as total
| eval error_rate = round((errors/total)*100, 2)
| fields error_rate</query>
          <earliest>$time_range.earliest$</earliest>
          <latest>$time_range.latest$</latest>
        </search>
        <option name="colorBy">value</option>
        <option name="rangeColors">["0x53a051","0xf8be34","0xf1813f","0xdc4e41"]</option>
        <option name="rangeValues">[1,3,5]</option>
        <option name="useColors">1</option>
        <option name="unit">%</option>
      </single>
    </panel>
    <panel>
      <title>P95 Response Time</title>
      <single>
        <search>
          <query>index=web_logs environment=$environment$ $hosts$
| stats perc95(response_time) as p95
| eval p95 = round(p95, 0)</query>
          <earliest>$time_range.earliest$</earliest>
          <latest>$time_range.latest$</latest>
        </search>
        <option name="unit">ms</option>
        <option name="rangeColors">["0x53a051","0xf8be34","0xf1813f","0xdc4e41"]</option>
        <option name="rangeValues">[500,1000,3000]</option>
        <option name="useColors">1</option>
      </single>
    </panel>
    <panel>
      <title>Active Hosts</title>
      <single>
        <search>
          <query>index=web_logs environment=$environment$
| stats dc(host) as active_hosts</query>
          <earliest>$time_range.earliest$</earliest>
          <latest>$time_range.latest$</latest>
        </search>
      </single>
    </panel>
  </row>

  <!-- Row 2: タイムチャート -->
  <row>
    <panel>
      <title>Request Volume &amp; Error Rate</title>
      <chart>
        <search>
          <query>index=web_logs environment=$environment$ $hosts$
| timechart span=5m count as requests, count(eval(status>=500)) as errors
| eval error_rate = round((errors/requests)*100, 2)</query>
          <earliest>$time_range.earliest$</earliest>
          <latest>$time_range.latest$</latest>
        </search>
        <option name="charting.chart">column</option>
        <option name="charting.chart.overlayFields">error_rate</option>
        <option name="charting.axisTitleX.text">Time</option>
        <option name="charting.axisTitleY.text">Requests</option>
        <option name="charting.axisTitleY2.text">Error Rate (%)</option>
        <option name="charting.chart.showDataLabels">none</option>
        <option name="charting.legend.placement">bottom</option>
      </chart>
    </panel>
  </row>

  <!-- Row 3: レスポンスタイムのヒートマップ -->
  <row>
    <panel>
      <title>Response Time by Endpoint</title>
      <chart>
        <search>
          <query>index=web_logs environment=$environment$ $hosts$
| eval uri_path = replace(uri, "\?.*", "")
| stats avg(response_time) as avg_rt, perc95(response_time) as p95_rt, count by uri_path
| sort - count | head 20
| table uri_path, avg_rt, p95_rt, count</query>
          <earliest>$time_range.earliest$</earliest>
          <latest>$time_range.latest$</latest>
        </search>
        <option name="charting.chart">bar</option>
        <option name="charting.axisTitleX.text">Response Time (ms)</option>
      </chart>
    </panel>
  </row>

  <!-- Row 4: テーブルとドリルダウン -->
  <row>
    <panel>
      <title>Top Errors</title>
      <table>
        <search>
          <query>index=web_logs environment=$environment$ $hosts$ status>=500
| stats count, latest(_time) as last_seen, values(host) as hosts by status, uri
| eval last_seen = strftime(last_seen, "%Y-%m-%d %H:%M:%S")
| sort - count | head 20</query>
          <earliest>$time_range.earliest$</earliest>
          <latest>$time_range.latest$</latest>
        </search>
        <option name="drilldown">row</option>
        <option name="count">10</option>
        <drilldown>
          <set token="selected_uri">$row.uri$</set>
          <set token="selected_status">$row.status$</set>
        </drilldown>
      </table>
    </panel>
  </row>

  <!-- 条件付きパネル(ドリルダウン時に表示) -->
  <row depends="$selected_uri$">
    <panel>
      <title>Error Details for $selected_uri$ (Status: $selected_status$)</title>
      <event>
        <search>
          <query>index=web_logs environment=$environment$ uri="$selected_uri$" status=$selected_status$</query>
          <earliest>$time_range.earliest$</earliest>
          <latest>$time_range.latest$</latest>
        </search>
        <option name="count">20</option>
        <option name="list.drilldown">full</option>
      </event>
    </panel>
  </row>
</dashboard>

6.2 Dashboard Studio(JSON形式)

Dashboard Studioは新世代のダッシュボードフレームワークで、より柔軟なレイアウトと高度なビジュアライゼーションを提供する。

{
  "visualizations": {
    "viz_error_trend": {
      "type": "splunk.area",
      "dataSources": {
        "primary": "ds_error_trend"
      },
      "options": {
        "xAxisTitleText": "Time",
        "yAxisTitleText": "Error Count",
        "showOverlayY": true,
        "stackMode": "stacked",
        "nullValueMode": "zero"
      },
      "title": "Error Trend"
    },
    "viz_status_pie": {
      "type": "splunk.pie",
      "dataSources": {
        "primary": "ds_status_dist"
      },
      "options": {
        "showDonutHole": true
      },
      "title": "Status Distribution"
    }
  },
  "dataSources": {
    "ds_error_trend": {
      "type": "ds.search",
      "options": {
        "query": "index=web_logs status>=400 | timechart span=5m count by status",
        "queryParameters": {
          "earliest": "$global_time.earliest$",
          "latest": "$global_time.latest$"
        }
      }
    },
    "ds_status_dist": {
      "type": "ds.search",
      "options": {
        "query": "index=web_logs | eval status_group=case(status<300,\"2xx\",status<400,\"3xx\",status<500,\"4xx\",true(),\"5xx\") | stats count by status_group"
      }
    }
  },
  "inputs": {
    "global_time": {
      "type": "input.timerange",
      "options": {
        "defaultValue": "-4h,now",
        "token": "global_time"
      }
    }
  },
  "layout": {
    "type": "absolute",
    "options": {
      "width": 1440,
      "height": 900
    },
    "structure": [
      {
        "item": "viz_error_trend",
        "position": {"x": 0, "y": 0, "w": 960, "h": 400}
      },
      {
        "item": "viz_status_pie",
        "position": {"x": 960, "y": 0, "w": 480, "h": 400}
      }
    ]
  }
}

6.3 トークンとドリルダウン

<!-- トークンの定義と使用 -->
<set token="my_token">value</set>
<unset token="my_token"/>

<!-- ドリルダウンアクション -->
<drilldown>
  <!-- トークン設定 -->
  <set token="selected_host">$click.value$</set>
  
  <!-- 別のダッシュボードへのリンク -->
  <link target="_blank">/app/search/host_detail?host=$click.value$</link>
  
  <!-- 検索へのリンク -->
  <link>search?q=index=web_logs host=$click.value$&amp;earliest=$time_range.earliest$&amp;latest=$time_range.latest$</link>
  
  <!-- 条件付きドリルダウン -->
  <condition field="host">
    <set token="selected_host">$click.value$</set>
  </condition>
  <condition field="status">
    <set token="selected_status">$click.value$</set>
  </condition>
</drilldown>

7. アラート

7.1 アラートタイプ

# savedsearches.conf - アラート設定

# スケジュールアラート
[High Error Rate Alert]
search = index=web_logs \
| timechart span=5m count(eval(status>=500)) as errors, count as total \
| eval error_rate = round((errors/total)*100, 2) \
| where error_rate > 5
dispatch.earliest_time = -15m
dispatch.latest_time = now
cron_schedule = */5 * * * *
enableSched = 1
is_scheduled = 1

# アラート条件
alert_type = number of results
alert_comparator = greater than
alert_threshold = 0
alert.severity = 4

# スロットリング(同じアラートの抑制)
alert.suppress = 1
alert.suppress.period = 30m
alert.suppress.fields = host
alert.suppress.group_name = error_rate_alert

# アラートアクション
action.email = 1
action.email.to = sre-oncall@example.com
action.email.subject = [ALERT] High Error Rate Detected - $result.host$
action.email.format = html
action.email.inline = 1
action.email.include.results_link = 1
action.email.include.view_link = 1
action.email.priority = 1

# Webhookアクション(Slack/PagerDuty)
action.webhook = 1
action.webhook.param.url = https://hooks.slack.com/services/T00000/B00000/XXXXX

# リアルタイムアラート
[Real-time Security Alert]
search = index=security_logs action=login result=failure \
| stats count by src_ip \
| where count > 5
dispatch.earliest_time = rt-5m
dispatch.latest_time = rt
realtime_schedule = 1
alert_type = number of results
alert_comparator = greater than
alert_threshold = 0
alert.severity = 5

# ローリングウィンドウアラート
[Rolling Window CPU Alert]
search = index=infra_metrics metric_name=cpu.usage \
| stats avg(value) as avg_cpu by host \
| where avg_cpu > 90
dispatch.earliest_time = -30m
dispatch.latest_time = now
cron_schedule = */10 * * * *
enableSched = 1
alert_type = number of results
alert_comparator = greater than
alert_threshold = 0

7.2 アラートアクション

# カスタムアラートアクション - PagerDuty連携
# $SPLUNK_HOME/etc/apps/custom_alert_actions/bin/pagerduty_alert.py

import sys
import json
import requests
import csv
import gzip

def send_pagerduty_alert(settings):
    routing_key = settings.get('routing_key')
    severity = settings.get('severity', 'critical')
    
    # 検索結果の読み取り
    results_file = settings.get('results_file')
    results = []
    if results_file:
        with gzip.open(results_file, 'rt') as f:
            reader = csv.DictReader(f)
            results = list(reader)
    
    # PagerDuty Events API v2
    payload = {
        "routing_key": routing_key,
        "event_action": "trigger",
        "dedup_key": settings.get('search_name', 'splunk_alert'),
        "payload": {
            "summary": f"Splunk Alert: {settings.get('search_name')} - {len(results)} results",
            "severity": severity,
            "source": "Splunk",
            "component": settings.get('app', 'search'),
            "group": settings.get('search_name'),
            "custom_details": {
                "search_name": settings.get('search_name'),
                "results_count": len(results),
                "results_link": settings.get('results_link'),
                "search_uri": settings.get('search_uri'),
                "top_results": results[:5]
            }
        },
        "links": [
            {
                "href": settings.get('results_link', ''),
                "text": "View in Splunk"
            }
        ]
    }
    
    response = requests.post(
        'https://events.pagerduty.com/v2/enqueue',
        json=payload,
        headers={'Content-Type': 'application/json'}
    )
    
    return response.status_code == 202

if __name__ == '__main__':
    payload = json.loads(sys.stdin.read())
    settings = payload.get('configuration', {})
    settings.update(payload.get('result', {}))
    
    if send_pagerduty_alert(settings):
        print("PagerDuty alert sent successfully")
    else:
        print("Failed to send PagerDuty alert", file=sys.stderr)
        sys.exit(1)

7.3 Webhook設定例

// Slack Webhook ペイロード設定
{
  "text": ":rotating_light: *Splunk Alert: $name$*",
  "attachments": [
    {
      "color": "danger",
      "fields": [
        {
          "title": "Search Name",
          "value": "$name$",
          "short": true
        },
        {
          "title": "Results Count",
          "value": "$job.resultCount$",
          "short": true
        },
        {
          "title": "Trigger Time",
          "value": "$trigger_time$",
          "short": true
        },
        {
          "title": "Severity",
          "value": "$alert.severity$",
          "short": true
        }
      ],
      "actions": [
        {
          "type": "button",
          "text": "View Results",
          "url": "$results_link$"
        }
      ]
    }
  ]
}

8. インデックス管理

8.1 インデックスの作成と設定

# indexes.conf - 詳細なインデックス設定
[default]
# デフォルトのストレージパス
homePath = $SPLUNK_DB/$_index_name/db
coldPath = $SPLUNK_DB/$_index_name/colddb
thawedPath = $SPLUNK_DB/$_index_name/thaweddb

# デフォルトの保持期間
frozenTimePeriodInSecs = 7776000  # 90日
maxTotalDataSizeMB = 500000

[web_logs]
homePath = $SPLUNK_DB/web_logs/db
coldPath = /cold_storage/web_logs/colddb
thawedPath = $SPLUNK_DB/web_logs/thaweddb

# データサイズ制限
maxTotalDataSizeMB = 200000
maxDataSize = auto_high_volume

# バケット設定
maxHotBuckets = 10
maxHotSpanSecs = 86400        # 24時間
maxHotIdleSecs = 86400        # 24時間アイドルでウォームへ
maxWarmDBCount = 300

# 保持期間
frozenTimePeriodInSecs = 2592000  # 30日

# Cold→Frozenの動作
coldToFrozenDir = /archive/splunk/web_logs
# または coldToFrozenScript = $SPLUNK_HOME/bin/archiver.sh

# Bloomフィルター
enableOnlineBucketRepair = true

# TSIDXの圧縮
enableTsidxReduction = true
tsidxReductionCheckPeriodInSec = 600

[security_logs]
homePath = $SPLUNK_DB/security_logs/db
coldPath = /cold_storage/security_logs/colddb
thawedPath = $SPLUNK_DB/security_logs/thaweddb
maxTotalDataSizeMB = 500000
frozenTimePeriodInSecs = 31536000  # 365日
maxDataSize = auto_high_volume
maxHotBuckets = 10

# コンプライアンス要件: 削除防止
coldToFrozenDir = /archive/splunk/security_logs

[metrics_index]
homePath = $SPLUNK_DB/metrics_index/db
coldPath = $SPLUNK_DB/metrics_index/colddb
thawedPath = $SPLUNK_DB/metrics_index/thaweddb
datatype = metric
maxTotalDataSizeMB = 100000
frozenTimePeriodInSecs = 7776000  # 90日

[summary_index]
homePath = $SPLUNK_DB/summary_index/db
coldPath = $SPLUNK_DB/summary_index/colddb
thawedPath = $SPLUNK_DB/summary_index/thaweddb
maxTotalDataSizeMB = 50000
frozenTimePeriodInSecs = 31536000  # 365日

8.2 バケットライフサイクル

┌─────────────────────────────────────────────────────────────────┐
│                    バケットライフサイクル                          │
│                                                                 │
│  Hot       →    Warm      →    Cold      →    Frozen            │
│  (読み書き)     (読み取り)     (読み取り)     (アーカイブ/削除)   │
│                                                                 │
│  ・新規データ    ・Hotから     ・Warmから     ・Coldから           │
│    書き込み      ロール       ロール         ロール               │
│  ・最も高速な    ・高速な      ・低速ストレ   ・テープ/S3等       │
│    ストレージ    ストレージ    ージ可能       ・削除も可能         │
│  ・SSD推奨      ・SSD/HDD    ・HDD          ・thawで復元         │
│                                                                 │
│  条件:           条件:         条件:          条件:               │
│  ・maxHotBuckets ・自動        ・時間ベース   ・frozenTimePeriod  │
│  ・maxHotSpanSecs              ・サイズベース  ・maxTotalDataSize │
│  ・maxHotIdleSecs                                                │
└─────────────────────────────────────────────────────────────────┘

8.3 SmartStore

SmartStoreは、ウォーム/コールドバケットをリモートストレージ(S3等)に保存する機能。

# indexes.conf - SmartStore設定
[volume:remote_store]
storageType = remote
path = s3://my-splunk-bucket/smartstore
remote.s3.access_key = AKIAIOSFODNN7EXAMPLE
remote.s3.secret_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
remote.s3.endpoint = https://s3.amazonaws.com
remote.s3.auth_region = us-east-1

[web_logs]
remotePath = volume:remote_store/$_index_name
homePath = $SPLUNK_DB/web_logs/db
coldPath = $SPLUNK_DB/web_logs/colddb
thawedPath = $SPLUNK_DB/web_logs/thaweddb

# SmartStoreキャッシュ設定
[cachemanager]
max_cache_size = 500000  # MB
hotlist_recency_secs = 86400
hotlist_bloom_filter_recency_hours = 360

8.4 インデックス管理のCLIコマンド

# インデックスの一覧表示
splunk list index -auth admin:password

# インデックスの作成
splunk add index my_new_index \
  -maxTotalDataSizeMB 100000 \
  -frozenTimePeriodInSecs 7776000 \
  -auth admin:password

# バケット情報の確認
splunk list clustering-peers -auth admin:password

# インデックスの統計情報
splunk show index-stats web_logs -auth admin:password

# バケットの修復
splunk rebuild web_logs -auth admin:password

# Frozenバケットの復元(thaw)
splunk thaw web_logs -db /archive/splunk/web_logs/db_1704067200_1703462400_0

9. セキュリティ

9.1 認証設定

# authentication.conf - LDAP認証
[authentication]
authType = LDAP
authSettings = my_ldap

[my_ldap]
SSLEnabled = 1
host = ldap.example.com
port = 636
bindDN = cn=splunk_svc,ou=service_accounts,dc=example,dc=com
bindDNpassword = encrypted_password
userBaseDN = ou=users,dc=example,dc=com
userBaseFilter = (objectClass=person)
userNameAttribute = sAMAccountName
realNameAttribute = cn
emailAttribute = mail
groupBaseDN = ou=groups,dc=example,dc=com
groupBaseFilter = (objectClass=group)
groupMemberAttribute = member
groupNameAttribute = cn
nestedGroups = true
charset = utf-8
anonymous_referrals = false
timelimit = 15
network_timeout = 20

# SAML認証設定
[authentication]
authType = SAML
authSettings = my_saml

[my_saml]
idpSSOUrl = https://idp.example.com/saml/sso
idpCertPath = $SPLUNK_HOME/etc/auth/saml/idp_cert.pem
entityId = https://splunk.example.com
signAuthnRequest = true
signedAssertion = true
attributeQuerySoapUrl = https://idp.example.com/saml/aq
attributeQueryRequestSigned = true
redirectPort = 8000
idpSLOUrl = https://idp.example.com/saml/slo
# web.conf - SSO設定
[settings]
SSOMode = strict
trustedIP = 10.0.0.50
remoteUser = X-Remote-User

9.2 認可設定(ロールとケイパビリティ)

# authorize.conf - ロール定義
[role_sre_admin]
importRoles = user
srchFilter = *
srchIndexesAllowed = *
srchIndexesDefault = web_logs;app_logs;infra_metrics
srchMaxTime = 86400
srchDiskQuota = 10000
srchJobsQuota = 50
rtSrchJobsQuota = 10
cumulativeSrchJobsQuota = 100
cumulativeRTSrchJobsQuota = 20

# ケイパビリティの割り当て
schedule_search = enabled
edit_monitor = enabled
edit_tcp = enabled
list_inputs = enabled
rest_properties_get = enabled
rest_properties_set = enabled
admin_all_objects = disabled
change_own_password = enabled
edit_search_schedule_priority = enabled

[role_sre_viewer]
importRoles = user
srchFilter = index=web_logs OR index=app_logs
srchIndexesAllowed = web_logs;app_logs
srchIndexesDefault = web_logs
srchMaxTime = 3600
srchDiskQuota = 1000
srchJobsQuota = 10
rtSrchJobsQuota = 3

schedule_search = enabled
edit_monitor = disabled
list_inputs = enabled

[role_security_analyst]
importRoles = user
srchFilter = *
srchIndexesAllowed = security_logs;auth_logs;firewall_logs;web_logs
srchIndexesDefault = security_logs
srchMaxTime = 86400
srchDiskQuota = 5000
srchJobsQuota = 30

schedule_search = enabled
list_inputs = enabled
edit_saved_search = enabled

[role_developer]
importRoles = user
srchFilter = (index=app_logs OR index=web_logs) environment!="production"
srchIndexesAllowed = app_logs;web_logs
srchIndexesDefault = app_logs
srchMaxTime = 3600
srchDiskQuota = 500
srchJobsQuota = 5

9.3 TLS/SSL設定

# server.conf - SSL設定
[sslConfig]
sslVersions = tls1.2
sslVersionsForClient = tls1.2
cipherSuite = ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256
ecdhCurves = prime256v1, secp384r1, secp521r1
sslRootCAPath = $SPLUNK_HOME/etc/auth/ca/cacert.pem

[general]
serverCert = $SPLUNK_HOME/etc/auth/server.pem
sslPassword = encrypted_password
requireClientCert = false

# web.conf - Web UIのSSL設定
[settings]
enableSplunkWebSSL = true
privKeyPath = etc/auth/splunkweb/privkey.pem
serverCert = etc/auth/splunkweb/cert.pem
caCertPath = etc/auth/splunkweb/cacert.pem
sslPassword = encrypted_password

9.4 監査ログ

# 監査ログの検索
index=_audit sourcetype=audittrail
| stats count by action, user, info
| sort - count

# ユーザーアクティビティの追跡
index=_audit sourcetype=audittrail action=search
| stats count, avg(total_run_time) as avg_runtime, sum(scan_count) as total_scans by user
| sort - total_scans

# 権限変更の追跡
index=_audit sourcetype=audittrail action=edit_roles
| table _time, user, action, info, object

# ログインの監視
index=_audit sourcetype=audittrail action=login*
| stats count by user, action, info
| sort - count

# 設定変更の追跡
index=_internal sourcetype=splunkd_conf
| stats count by user, action, path

10. デプロイメントとスケーリング

10.1 分散デプロイメントアーキテクチャ

┌─────────────────────────────────────────────────────────────────────┐
│              本番環境分散デプロイメントアーキテクチャ                  │
│                                                                     │
│  Site 1 (Primary DC)              Site 2 (DR DC)                    │
│  ┌─────────────────────┐         ┌─────────────────────┐           │
│  │                     │         │                     │           │
│  │  ┌───┐ ┌───┐ ┌───┐ │         │  ┌───┐ ┌───┐ ┌───┐ │           │
│  │  │SH1│ │SH2│ │SH3│ │         │  │SH4│ │SH5│ │SH6│ │           │
│  │  └─┬─┘ └─┬─┘ └─┬─┘ │         │  └─┬─┘ └─┬─┘ └─┬─┘ │           │
│  │    └──┬──┘──┬──┘    │         │    └──┬──┘──┬──┘    │           │
│  │       │  SHC Captain│         │       │  SHC Member │           │
│  │       v             │         │       v             │           │
│  │  ┌───┐ ┌───┐ ┌───┐ │         │  ┌───┐ ┌───┐       │           │
│  │  │IX1│ │IX2│ │IX3│ │ ←Rep→   │  │IX4│ │IX5│       │           │
│  │  └───┘ └───┘ └───┘ │         │  └───┘ └───┘       │           │
│  │   Index Cluster     │         │   Index Cluster     │           │
│  │   (RF=3, SF=2)      │         │   (Multi-site)      │           │
│  │                     │         │                     │           │
│  │  ┌────────────────┐ │         │                     │           │
│  │  │ Cluster Manager│ │         │                     │           │
│  │  └────────────────┘ │         │                     │           │
│  │  ┌────────────────┐ │         │                     │           │
│  │  │ License Master │ │         │                     │           │
│  │  └────────────────┘ │         │                     │           │
│  │  ┌────────────────┐ │         │                     │           │
│  │  │Deployment Server│ │         │                     │           │
│  │  └────────────────┘ │         │                     │           │
│  │  ┌────────────────┐ │         │                     │           │
│  │  │   SHC Deployer │ │         │                     │           │
│  │  └────────────────┘ │         │                     │           │
│  └─────────────────────┘         └─────────────────────┘           │
└─────────────────────────────────────────────────────────────────────┘

10.2 インデックスクラスタリング

# server.conf - Cluster Manager
[clustering]
mode = manager
replication_factor = 3       # データの物理コピー数
search_factor = 2            # 検索可能なコピー数
pass4SymmKey = your_secret_key
cluster_label = production_cluster

# マルチサイト構成
multisite = true
available_sites = site1, site2
site_replication_factor = origin:2, total:3
site_search_factor = origin:1, total:2

# server.conf - Peer Node (Indexer)
[clustering]
mode = peer
manager_uri = https://cm.example.com:8089
pass4SymmKey = your_secret_key

[general]
site = site1

[replication_port://9100]
disabled = false

10.3 サーチヘッドクラスタリング

# server.conf - Search Head Cluster Member
[shclustering]
disabled = false
mgmt_uri = https://sh1.example.com:8089
replication_factor = 2
replication_port = 9200
conf_deploy_fetch_url = https://deployer.example.com:8089
pass4SymmKey = shc_secret_key
shcluster_label = production_shc

# captain選出設定
election = true
captain_is_adhoc_searchhead = false
preferred_captain = true  # このノードをcaptain優先に
# SHCの初期化コマンド
# 各メンバーで実行
splunk init shcluster-config -auth admin:password \
  -mgmt_uri https://sh1.example.com:8089 \
  -replication_port 9200 \
  -replication_factor 2 \
  -conf_deploy_fetch_url https://deployer.example.com:8089 \
  -secret shc_secret_key \
  -shcluster_label production_shc

# captain選出(最初の1回のみ)
splunk bootstrap shcluster-captain -servers_list \
  "https://sh1.example.com:8089,https://sh2.example.com:8089,https://sh3.example.com:8089" \
  -auth admin:password

# SHCステータス確認
splunk show shcluster-status -auth admin:password

# アプリのデプロイ(Deployerで実行)
splunk apply shcluster-bundle -target https://sh1.example.com:8089 \
  -auth admin:password --answer-yes

10.4 Deployment Server によるアプリ配布

# serverclass.conf - 詳細なサーバークラス設定
[global]
repositoryLocation = $SPLUNK_HOME/etc/deployment-apps
# Phoneホーム間隔
phoneHomeIntervalInSecs = 60

[serverClass:production_linux]
whitelist.0 = prod-linux-*
blacklist.0 = prod-linux-dev-*
restartSplunkd = true
stateOnClient = enabled

[serverClass:production_linux:app:base_linux_config]
restartSplunkd = true

[serverClass:production_linux:app:TA-syslog]
restartSplunkd = true

[serverClass:production_linux:app:TA-custom-app]
restartSplunkd = true

[serverClass:production_windows]
whitelist.0 = prod-win-*
restartSplunkd = true

[serverClass:production_windows:app:TA-windows]
restartSplunkd = true

[serverClass:heavy_forwarders]
machineTypesFilter = linux-x86_64
whitelist.0 = hf-*
restartSplunkd = true

[serverClass:heavy_forwarders:app:TA-heavy-forwarder-config]
restartSplunkd = true

11. パフォーマンスチューニング

11.1 検索の最適化

# 悪い例:フィルタリングが遅い
index=web_logs | search status=500 | stats count by host

# 良い例:最初からフィルタリング
index=web_logs status=500 | stats count by host

# 悪い例:不要なフィールドを処理
index=web_logs status=500 | stats count by host, uri, method, clientip, response_time

# 良い例:必要なフィールドのみに限定
index=web_logs status=500 | fields host | stats count by host

# 悪い例:joinの使用(大量データで遅い)
index=web_logs | join clientip [search index=user_db | fields clientip, username]

# 良い例:lookupの使用
index=web_logs | lookup user_lookup clientip OUTPUT username

# 悪い例:ワイルドカードの多用
index=* sourcetype=* error

# 良い例:具体的な指定
index=web_logs sourcetype=access_combined status>=500

# 悪い例:subsearchの乱用
index=web_logs [search index=app_logs log_level=ERROR | stats values(host) as host | mvexpand host | fields host]

# 良い例:tstatsの活用
| tstats count where index=web_logs status>=500 by host span=1h

11.2 サマリインデキシング

# savedsearches.conf - サマリインデキシング
[Hourly Web Stats Summary]
search = index=web_logs \
| stats count as total_requests, \
        count(eval(status>=500)) as server_errors, \
        count(eval(status>=400 AND status<500)) as client_errors, \
        avg(response_time) as avg_response_time, \
        perc95(response_time) as p95_response_time, \
        dc(clientip) as unique_clients, \
        sum(bytes) as total_bytes \
  by host, uri, method \
| eval error_rate = round((server_errors / total_requests) * 100, 4)
dispatch.earliest_time = -65m@m
dispatch.latest_time = -5m@m
cron_schedule = 5 * * * *
enableSched = 1
action.summary_index = 1
action.summary_index._name = summary_index
action.summary_index.force_realtime_schedule = 1
# サマリインデックスからの高速検索
index=summary_index source="Hourly Web Stats Summary" 
| timechart span=1h sum(total_requests) as requests, avg(error_rate) as error_rate by host

11.3 アクセラレーテッドデータモデル

# datamodels.conf
[Web_Traffic]
acceleration = true
acceleration.earliest_time = -1y
acceleration.backfill_time = -30d
acceleration.max_time = 3600
acceleration.cron_schedule = */5 * * * *
acceleration.manual_rebuilds = true
acceleration.max_concurrent = 3
# アクセラレーション済みデータモデルからのtstats検索
| tstats summariesonly=true count, avg(Web_Traffic.response_time) as avg_rt 
  from datamodel=Web_Traffic 
  where Web_Traffic.status>=500 
  by Web_Traffic.host, _time 
  span=1h

11.4 レポートアクセラレーション

# savedsearches.conf
[Accelerated Error Report]
search = index=web_logs status>=400 | stats count by host, status, uri
auto_summarize = 1
auto_summarize.dispatch.earliest_time = -30d@d
auto_summarize.dispatch.latest_time = now
auto_summarize.cron_schedule = */10 * * * *
auto_summarize.timespan = 3600

11.5 tstats vs stats の比較

┌─────────────────────────────────────────────────────────────┐
│               tstats vs stats パフォーマンス比較              │
│                                                             │
│  tstats:                                                    │
│  ・tsidx(Time-Series Index)ファイルを直接読む              │
│  ・rawデータを読まないため非常に高速                          │
│  ・インデックス化されたフィールドのみ使用可能                  │
│  ・データモデルアクセラレーションと組み合わせ可能              │
│  ・使用可能フィールド: _time, host, source, sourcetype,      │
│    index, および明示的にインデックス化したフィールド          │
│                                                             │
│  stats:                                                     │
│  ・rawデータを読んでフィールドを抽出                         │
│  ・全てのフィールドが使用可能                                │
│  ・tstatsより遅いが柔軟                                     │
│                                                             │
│  パフォーマンス:                                             │
│  tstats   ████████████████████  ~100x高速                   │
│  stats    █                     基準                        │
│                                                             │
│  推奨: 可能な限りtstatsを使用し、必要な場合のみstatsを使用   │
└─────────────────────────────────────────────────────────────┘

11.6 Search Job Inspector

# 検索パフォーマンスの分析
# Splunk WebのSearch Job Inspectorで確認可能な項目:

# 1. 実行時間の内訳
#    - command.search: インデックス検索時間
#    - command.stats: 統計処理時間
#    - command.eval: 計算処理時間

# 2. スキャンカウント
#    - scanCount: スキャンされたイベント数
#    - resultCount: 結果イベント数
#    - 比率が低い = フィルタリングの改善余地あり

# 検索ジョブの情報をREST APIで取得
curl -k -u admin:password \
  https://localhost:8089/services/search/jobs/{search_id} \
  -d output_mode=json

# 実行中の検索ジョブ一覧
| rest /services/search/jobs 
| where isRealTimeSearch=0 AND dispatchState="RUNNING"
| table sid, label, runDuration, scanCount, eventCount, author
| sort - runDuration

11.7 Bloomフィルター

# indexes.conf - Bloomフィルター設定
[web_logs]
# Bloomフィルターはデフォルトで有効
# 特定のインデックスで無効化する場合
# bloomHomePath = 

# server.conf
[general]
# Bloomフィルターの設定
bloomfilter_refresh_interval_seconds = 900
# Bloomフィルターの効果を確認
index=_internal sourcetype=splunkd component=DatabaseDirectoryManager 
| stats count(eval(like(message, "%bloom%"))) as bloom_events by host

12. Splunk自体の監視

12.1 _internalインデックス

# インデクサーのパフォーマンス監視
index=_internal sourcetype=splunkd component=Metrics group=pipeline
| timechart span=5m avg(cpu_seconds) by processor

# インデックスのスループット
index=_internal sourcetype=splunkd component=Metrics group=per_index_thruput
| timechart span=5m sum(kb) as throughput_kb by series

# キューの健全性
index=_internal sourcetype=splunkd component=Metrics group=queue
| eval fill_pct = round((current_size / max_size) * 100, 2)
| timechart span=5m avg(fill_pct) by name

# フォワーダーの接続状態
index=_internal sourcetype=splunkd component=Metrics group=tcpin_connections
| stats latest(kb) as kb, latest(ev) as events by hostname, sourceIp
| sort - kb

# 検索のパフォーマンス
index=_audit sourcetype=audittrail action=search info=completed
| stats avg(total_run_time) as avg_runtime, 
        max(total_run_time) as max_runtime,
        avg(scan_count) as avg_scans,
        count as search_count 
  by user, savedsearch_name
| sort - avg_runtime

# ディスク使用量の監視
index=_introspection sourcetype=splunk_disk_objects component=Indexes
| stats latest(sizeOnDiskMB) as disk_mb by title
| sort - disk_mb
| eval disk_gb = round(disk_mb / 1024, 2)
| fields title, disk_gb

12.2 Monitoring Console (DMC)

# ライセンス使用量の監視
index=_internal sourcetype=splunkd component=LicenseUsage type=Usage
| timechart span=1d sum(b) as bytes_indexed
| eval gb_indexed = round(bytes_indexed / 1073741824, 2)

# ライセンス使用量の予測
index=_internal sourcetype=splunkd component=LicenseUsage type=Usage
| timechart span=1d sum(b) as bytes
| predict bytes as predicted_bytes future_timespan=30
| eval predicted_gb = round(predicted_bytes / 1073741824, 2)

# インデクサークラスタの健全性
| rest /services/cluster/manager/peers
| table title, site, status, is_searchable, replication_count, search_count, bucket_count
| sort title

# サーチヘッドクラスタの健全性  
| rest /services/shcluster/status
| table label, captain.mgmt_uri, captain.elected_captain, peers{}.label, peers{}.status

# スケジュール検索の遅延
index=_internal sourcetype=scheduler 
| stats avg(run_time) as avg_runtime, avg(lag_time) as avg_lag by savedsearch_name, app
| where avg_lag > 60
| sort - avg_lag

# KVStoreの監視
| rest /services/kvstore/status
| table title, current, status, replicationStatus, backupRestoreStatus

12.3 SREダッシュボード用の検索クエリ

# Splunkプラットフォーム健全性サマリ
index=_internal sourcetype=splunkd log_level=ERROR earliest=-1h
| stats count by component, host
| sort - count
| head 20

# インジェストパイプラインの遅延
index=_internal sourcetype=splunkd component=Metrics group=queue name=parsingQueue OR name=indexQueue
| eval fill_pct = round((current_size / max_size) * 100, 2)
| where fill_pct > 70
| table _time, host, name, fill_pct

# 検索の同時実行数
index=_internal sourcetype=splunkd component=SearchScheduler
| timechart span=5m dc(search_id) as concurrent_searches by host

13. インテグレーション

13.1 REST API

# 認証トークンの取得
curl -k https://localhost:8089/services/auth/login \
  -d username=admin \
  -d password=changeme

# 検索の実行
curl -k -u admin:password \
  https://localhost:8089/services/search/jobs \
  -d search="search index=web_logs status>=500 | stats count by host" \
  -d earliest_time="-1h" \
  -d latest_time="now" \
  -d output_mode=json

# 検索結果の取得
curl -k -u admin:password \
  "https://localhost:8089/services/search/jobs/{search_id}/results?output_mode=json&count=100"

# エクスポート(ストリーミング)
curl -k -u admin:password \
  "https://localhost:8089/services/search/jobs/export" \
  -d search="search index=web_logs | head 1000" \
  -d earliest_time="-1h" \
  -d latest_time="now" \
  -d output_mode=json

# インデックスの管理
# インデックス一覧
curl -k -u admin:password \
  "https://localhost:8089/services/data/indexes?output_mode=json&count=0"

# インデックスの作成
curl -k -u admin:password \
  https://localhost:8089/services/data/indexes \
  -d name=my_new_index \
  -d maxTotalDataSizeMB=100000 \
  -d frozenTimePeriodInSecs=7776000

# HECトークンの管理
# トークン一覧
curl -k -u admin:password \
  "https://localhost:8089/services/data/inputs/http?output_mode=json"

# トークンの作成
curl -k -u admin:password \
  https://localhost:8089/services/data/inputs/http \
  -d name=my_hec_token \
  -d index=app_events \
  -d sourcetype=_json \
  -d useACK=false

# ユーザー管理
curl -k -u admin:password \
  "https://localhost:8089/services/authentication/users?output_mode=json"

# ユーザーの作成
curl -k -u admin:password \
  https://localhost:8089/services/authentication/users \
  -d name=newuser \
  -d password=securepassword \
  -d roles=sre_viewer \
  -d email=newuser@example.com

# 保存済み検索の管理
curl -k -u admin:password \
  "https://localhost:8089/services/saved/searches?output_mode=json&count=0"

# ナレッジオブジェクトのエクスポート
curl -k -u admin:password \
  "https://localhost:8089/services/data/transforms/lookups?output_mode=json"

# クラスタステータス
curl -k -u admin:password \
  "https://localhost:8089/services/cluster/manager/status?output_mode=json"

# サーバー情報
curl -k -u admin:password \
  "https://localhost:8089/services/server/info?output_mode=json"

# アプリの管理
curl -k -u admin:password \
  "https://localhost:8089/services/apps/local?output_mode=json"

13.2 Python SDK

import splunklib.client as client
import splunklib.results as results
import time
import json

class SplunkClient:
    def __init__(self, host, port, username, password, scheme='https'):
        self.service = client.connect(
            host=host,
            port=port,
            username=username,
            password=password,
            scheme=scheme,
            autologin=True
        )
    
    def search_sync(self, query, earliest='-1h', latest='now', max_count=10000):
        """同期検索(結果を待ってから返す)"""
        kwargs = {
            'earliest_time': earliest,
            'latest_time': latest,
            'search_mode': 'normal',
            'max_count': max_count,
            'exec_mode': 'blocking'  # 完了まで待機
        }
        
        job = self.service.jobs.create(query, **kwargs)
        
        result_list = []
        reader = results.JSONResultsReader(job.results(output_mode='json', count=0))
        for result in reader:
            if isinstance(result, results.Message):
                continue
            result_list.append(dict(result))
        
        return result_list
    
    def search_async(self, query, earliest='-1h', latest='now', callback=None):
        """非同期検索"""
        kwargs = {
            'earliest_time': earliest,
            'latest_time': latest,
            'search_mode': 'normal',
            'exec_mode': 'normal'
        }
        
        job = self.service.jobs.create(query, **kwargs)
        
        while not job.is_done():
            time.sleep(2)
            job.refresh()
            progress = float(job['doneProgress']) * 100
            scan_count = int(job['scanCount'])
            print(f"Progress: {progress:.1f}% - Scanned: {scan_count}")
        
        result_list = []
        offset = 0
        count = 1000
        
        while True:
            reader = results.JSONResultsReader(
                job.results(output_mode='json', count=count, offset=offset)
            )
            batch = [dict(r) for r in reader if isinstance(r, dict)]
            if not batch:
                break
            result_list.extend(batch)
            offset += count
        
        job.cancel()
        
        if callback:
            callback(result_list)
        
        return result_list
    
    def get_index_info(self):
        """インデックス情報の取得"""
        indexes = self.service.indexes
        info = []
        for index in indexes:
            info.append({
                'name': index.name,
                'totalEventCount': index['totalEventCount'],
                'currentDBSizeMB': index['currentDBSizeMB'],
                'maxTotalDataSizeMB': index['maxTotalDataSizeMB'],
                'frozenTimePeriodInSecs': index['frozenTimePeriodInSecs']
            })
        return info
    
    def create_hec_token(self, name, index, sourcetype='_json'):
        """HECトークンの作成"""
        return self.service.inputs.create(
            name, 'http',
            index=index,
            sourcetype=sourcetype
        )
    
    def manage_saved_search(self, name, search_query, cron='0 * * * *', 
                            actions=None):
        """保存済み検索の作成・更新"""
        kwargs = {
            'search': search_query,
            'cron_schedule': cron,
            'is_scheduled': True,
            'dispatch.earliest_time': '-1h',
            'dispatch.latest_time': 'now'
        }
        
        if actions:
            kwargs.update(actions)
        
        try:
            saved_search = self.service.saved_searches[name]
            saved_search.update(**kwargs)
        except KeyError:
            saved_search = self.service.saved_searches.create(name, **kwargs)
        
        return saved_search

# 使用例
splunk = SplunkClient('localhost', 8089, 'admin', 'password')

# エラーの統計を取得
error_stats = splunk.search_sync(
    'search index=web_logs status>=500 | stats count by host, status',
    earliest='-4h'
)
print(json.dumps(error_stats, indent=2))

# インデックス情報の取得
for idx in splunk.get_index_info():
    print(f"{idx['name']}: {idx['currentDBSizeMB']}MB / {idx['maxTotalDataSizeMB']}MB")

13.3 Splunk SOAR連携

# Splunk SOAR (Phantom) プレイブック例
import phantom.rules as phantom
import json

def playbook_main(container=None, **kwargs):
    """セキュリティインシデント自動対応プレイブック"""
    
    # 1. アーティファクトからIPアドレスを抽出
    success, message, artifacts = phantom.get_artifacts(
        container=container,
        artifact_type='ip'
    )
    
    for artifact in artifacts:
        src_ip = artifact['cef']['sourceAddress']
        
        # 2. VirusTotalでIPレピュテーション確認
        phantom.act(
            'ip reputation',
            parameters=[{'ip': src_ip}],
            assets=['virustotal'],
            callback=handle_reputation_result,
            name='vt_check'
        )
        
        # 3. Splunkで関連イベントを検索
        phantom.act(
            'run query',
            parameters=[{
                'query': f'search index=security_logs src_ip="{src_ip}" | stats count by action',
                'display': 'action,count'
            }],
            assets=['splunk'],
            callback=handle_splunk_results,
            name='splunk_search'
        )

def handle_reputation_result(action_results=None, **kwargs):
    """レピュテーション結果に基づく対応"""
    for result in action_results:
        data = result.get_data()[0]
        if data.get('positives', 0) > 5:
            # 悪意のあるIPの場合、ファイアウォールでブロック
            phantom.act(
                'block ip',
                parameters=[{'ip': result.get_param('ip')}],
                assets=['firewall'],
                name='block_malicious_ip'
            )

13.4 Splunk Observability / APM

# OpenTelemetry Collector設定 - Splunk Observability Cloud連携
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
  
  prometheus:
    config:
      scrape_configs:
        - job_name: 'kubernetes-pods'
          kubernetes_sd_configs:
            - role: pod

exporters:
  splunk_hec:
    token: "your-hec-token"
    endpoint: "https://ingest.us0.signalfx.com/v1/log"
    source: "otel"
    sourcetype: "otel:log"
    index: "otel_logs"
    
  signalfx:
    access_token: "your-signalfx-token"
    realm: "us0"
    
  sapm:
    access_token: "your-signalfx-token"
    endpoint: "https://ingest.us0.signalfx.com/v2/trace"

processors:
  batch:
    timeout: 5s
    send_batch_size: 1000
  
  resourcedetection:
    detectors: [system, env, docker]
    timeout: 5s

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, resourcedetection]
      exporters: [sapm]
    metrics:
      receivers: [otlp, prometheus]
      processors: [batch, resourcedetection]
      exporters: [signalfx]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [splunk_hec]

14. SREのためのベストプラクティス

14.1 ログ管理戦略

┌─────────────────────────────────────────────────────────────────┐
│                    ログ管理のベストプラクティス                    │
│                                                                 │
│  1. インデックス設計                                             │
│     ・用途別にインデックスを分離(web, app, security, infra)    │
│     ・保持期間を用途に応じて設定                                 │
│     ・ライセンス使用量を考慮したサイジング                       │
│                                                                 │
│  2. ソースタイプの標準化                                        │
│     ・命名規則: <vendor>:<product>:<type>                       │
│     ・例: apache:access:combined, custom:app:json               │
│     ・タイムスタンプフォーマットの統一                           │
│                                                                 │
│  3. フィールド抽出の最適化                                      │
│     ・search-timeよりindex-timeの抽出を優先(高頻度フィールド)  │
│     ・CIM(Common Information Model)への準拠                   │
│     ・フィールドエイリアスでの標準化                             │
│                                                                 │
│  4. データフィルタリング                                        │
│     ・デバッグログの選択的取り込み                               │
│     ・Heavy ForwarderでのPIIマスキング                          │
│     ・不要データのnullQueue送り                                  │
└─────────────────────────────────────────────────────────────────┘

14.2 アラート設計のベストプラクティス

# 良いアラート設計の例

# 1. エラーバジェットベースのアラート
index=web_logs 
| bin _time span=1h
| stats count(eval(status<500)) as good, count as total by _time
| eval availability = (good / total) * 100
| streamstats window=24 avg(availability) as rolling_24h_availability
| where rolling_24h_availability < 99.9
| eval message = "24h rolling availability dropped below SLO: " . tostring(round(rolling_24h_availability, 4)) . "%"

# 2. 多段階アラート(Warning → Critical)
index=web_logs 
| timechart span=5m count(eval(status>=500)) as errors, count as total
| eval error_rate = (errors / total) * 100
| eval severity = case(
    error_rate > 10, "CRITICAL",
    error_rate > 5, "WARNING",
    error_rate > 2, "INFO",
    true(), "OK"
  )
| where severity IN ("WARNING", "CRITICAL")

# 3. アノマリーベースのアラート(予測との差異)
index=web_logs 
| timechart span=1h count as requests
| predict requests as predicted future_timespan=0
| eval deviation = abs(requests - predicted)
| eval threshold = predicted * 0.5
| where deviation > threshold
| eval message = "Traffic anomaly detected: actual=" . tostring(requests) . " predicted=" . tostring(round(predicted, 0))

# 4. 相関アラート(複数条件の組み合わせ)
index=web_logs status>=500
| stats count as error_count by host
| join host 
  [search index=infra_metrics metric_name=cpu.usage | stats avg(value) as avg_cpu by host | where avg_cpu > 80]
| where error_count > 100
| eval alert_msg = host . ": " . tostring(error_count) . " errors with CPU at " . tostring(round(avg_cpu, 1)) . "%"

14.3 ダッシュボード設計のベストプラクティス

レベル1: エグゼクティブダッシュボード
 └─ SLO/SLA達成率、全体のヘルスステータス

レベル2: サービスダッシュボード
 └─ 各サービスのKPI(リクエスト数、エラー率、レイテンシー)

レベル3: 詳細分析ダッシュボード
 └─ エンドポイント別、ホスト別の詳細メトリクス

レベル4: デバッグダッシュボード
 └─ ログイベントの詳細、トレース情報

推奨事項:
・ドリルダウンで各レベル間を遷移可能にする
・ベースサーチを使って重複クエリを削減する
・トークンフィルターで対話的な分析を可能にする
・自動更新間隔を適切に設定する(リアルタイム vs 定期)

14.4 キャパシティプランニング

# 日次インジェスト量のトレンド
index=_internal sourcetype=splunkd component=LicenseUsage type=Usage
| timechart span=1d sum(b) as daily_bytes
| eval daily_gb = round(daily_bytes / 1073741824, 2)
| predict daily_gb as predicted_gb future_timespan=90
| eval upper_bound = predicted_gb * 1.2

# インデクサーのディスク使用量トレンド
index=_introspection sourcetype=splunk_disk_objects component=Partitions
| timechart span=1d avg(available) as available_gb by mount_point
| predict available_gb as predicted_available future_timespan=30

# 検索のキャパシティ分析
index=_internal sourcetype=scheduler
| stats count as total_searches, 
        avg(run_time) as avg_runtime, 
        sum(run_time) as total_runtime,
        avg(lag_time) as avg_lag
  by host
| eval utilization_pct = round((total_runtime / 3600) * 100, 2)

# ライセンス使用量の予測とアラート
index=_internal sourcetype=splunkd component=LicenseUsage type=Usage
| timechart span=1d sum(b) as daily_bytes
| eval daily_gb = round(daily_bytes / 1073741824, 2)
| eventstats avg(daily_gb) as avg_daily
| eval license_limit_gb = 500
| eval usage_pct = round((avg_daily / license_limit_gb) * 100, 2)
| eval days_to_exceed = if(avg_daily > 0, round((license_limit_gb - avg_daily) / (avg_daily * 0.05), 0), 999)

14.5 インシデント対応ワークフロー

# Step 1: 影響範囲の特定
index=web_logs status>=500 earliest=-1h
| stats count as errors, dc(clientip) as affected_users, dc(uri) as affected_endpoints by host
| sort - errors

# Step 2: タイムラインの確認
index=web_logs status>=500 earliest=-4h
| timechart span=1m count by host

# Step 3: エラーの分類
index=web_logs status>=500 earliest=-1h
| rex field=_raw "(?P<error_type>\w+Exception|\w+Error)"
| stats count by error_type, host
| sort - count

# Step 4: デプロイメントとの相関
(index=web_logs status>=500) OR (index=deployment_logs)
| timechart span=5m 
    count(eval(index="web_logs" AND status>=500)) as errors,
    count(eval(index="deployment_logs")) as deployments

# Step 5: 根本原因の調査
index=app_logs host=affected-host-01 log_level=ERROR earliest=-1h
| transaction request_id
| stats count by error_type
| sort - count

まとめ

Splunkは強力なマシンデータ分析プラットフォームであり、SREエンジニアにとって不可欠なツールである。本ガイドで紹介した以下のポイントを押さえることで、Splunkの効果的な活用が可能になる。

  1. アーキテクチャの理解: フォワーダー、インデクサー、サーチヘッドの役割を理解し、適切にスケーリングする
  2. SPLの習熟: 基本コマンドから高度な分析パターンまで、SPLの活用がSplunkの真価を発揮する鍵
  3. インデックス設計: 用途に応じたインデックスの分離、保持期間の設定、SmartStoreの活用
  4. アラートの最適化: エラーバジェットベース、アノマリー検出、多段階アラートの実装
  5. パフォーマンス: tstatsの活用、サマリインデキシング、データモデルアクセラレーション
  6. セキュリティ: 適切な認証・認可設定、TLS暗号化、監査ログの活用
  7. 自動化: REST API、Python SDK、SOAR連携による運用自動化

Splunkを効果的に活用し、信頼性の高いシステム運用を実現してほしい。