Splunk
Splunk 包括的技術ガイド - SREエンジニアのための実践リファレンス
目次
- Splunkとは
- アーキテクチャ
- データ取り込み(Data Ingestion)
- SPL(Search Processing Language)包括ガイド
- ナレッジオブジェクト
- ダッシュボードとビジュアライゼーション
- アラート
- インデックス管理
- セキュリティ
- デプロイメントとスケーリング
- パフォーマンスチューニング
- Splunk自体の監視
- インテグレーション
- SREのためのベストプラクティス
1. Splunkとは
1.1 概要
Splunkは、マシンデータ(ログ、メトリクス、イベント等)を収集・インデックス化・分析するための統合プラットフォームである。元々はログ管理ツールとして誕生したが、現在ではSIEM(Security Information and Event Management)、ITオペレーション分析、アプリケーションパフォーマンス監視(APM)、ビジネスインテリジェンスまで幅広いユースケースをカバーする。
1.2 主要な機能領域
| 機能領域 | 説明 |
|---|---|
| ログ管理 | あらゆるソースからのログデータの収集、パース、保存、検索 |
| SIEM | セキュリティイベントの相関分析、脅威検出、コンプライアンスレポート |
| IT運用分析(ITOA) | インフラストラクチャの監視、障害検出、根本原因分析 |
| APM | アプリケーションのパフォーマンス監視とトレーシング |
| ビジネス分析 | ビジネスメトリクスの可視化とレポーティング |
| データプラットフォーム | 構造化・非構造化データの統合的な管理と分析 |
1.3 Splunkの製品ファミリー
- Splunk Enterprise: オンプレミスで運用するSplunkのコア製品
- Splunk Cloud Platform: Splunk社が管理するSaaS版
- Splunk Enterprise Security (ES): SIEM機能を提供するプレミアムアプリ
- Splunk ITSI: IT Service Intelligenceによる高度なIT運用監視
- Splunk SOAR: Security Orchestration, Automation and Responseプラットフォーム
- Splunk Observability Cloud: マイクロサービスベースの監視(旧SignalFx)
- Splunk APM: 分散トレーシングとアプリケーションパフォーマンス監視
1.4 マシンデータとは
Splunkが扱う「マシンデータ」とは以下を指す。
- アプリケーションログ: Apache、Nginx、カスタムアプリケーションのログ
- システムログ: syslog、Windows Event Log、journald
- セキュリティログ: ファイアウォール、IDS/IPS、認証ログ
- ネットワークデータ: NetFlow、パケットキャプチャ、DNS
- メトリクス: CPU使用率、メモリ、ディスクI/O等の時系列データ
- 設定ファイル: システム設定の変更履歴
- APIレスポンス: REST API、Webhook等のデータ
- IoTデータ: センサーデータ、テレメトリ
1.5 他のツールとの比較
| 機能 | Splunk | Elasticsearch/ELK | Datadog | Grafana/Loki |
|---|---|---|---|---|
| ログ検索 | SPL(強力) | Lucene/KQL | 独自クエリ | LogQL |
| SIEM | Enterprise Security | OpenSearch Security | Cloud SIEM | なし(別途必要) |
| スケーラビリティ | 非常に高い | 高い | 高い(SaaS) | 中程度 |
| コスト | 高い(ライセンス) | OSS(運用コスト) | 中程度(SaaS) | 低い(OSS) |
| 学習曲線 | 中程度 | 高い | 低い | 低い |
| リアルタイム処理 | 優秀 | 良好 | 優秀 | 良好 |
2. アーキテクチャ
2.1 コンポーネント概要
Splunkのアーキテクチャは、複数のコンポーネントが協調して動作する分散システムである。
┌─────────────────────────────────────────────────────────────────────┐
│ Splunk 分散アーキテクチャ │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ データ │ │ データ │ │ データ │ データソース │
│ │ ソース1 │ │ ソース2 │ │ ソース3 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ v v v │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Universal │ │Universal │ │ Heavy │ フォワーダー層 │
│ │Forwarder │ │Forwarder │ │Forwarder │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ │ │
│ v (TCP 9997) │
│ ┌──────────────┼──────────────┐ │
│ │ │ │ │
│ v v v │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Indexer 1 │ │ Indexer 2 │ │ Indexer 3 │ インデクサー層 │
│ │(Primary) │ │(Replica) │ │(Replica) │ (Index Cluster) │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ │ │
│ v │
│ ┌──────────────┼──────────────┐ │
│ │ │ │ │
│ v v v │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Search Head│ │Search Head│ │Search Head│ サーチヘッド層 │
│ │ 1 │ │ 2 │ │ 3 │ (SH Cluster) │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ │ │
│ v │
│ ┌──────────────┐ │
│ │ ロードバラ │ │
│ │ ンサー │ │
│ └──────┬───────┘ │
│ │ │
│ v │
│ ┌──────────────┐ │
│ │ ユーザー │ │
│ │ (Web UI/API) │ │
│ └──────────────┘ │
│ │
│ 管理コンポーネント: │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Cluster │ │ Deployment │ │ License │ │
│ │ Manager │ │ Server │ │ Master │ │
│ │ (マスター) │ │ (設定配布) │ │ (ライセンス) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
2.2 各コンポーネントの詳細
2.2.1 フォワーダー(Forwarder)
フォワーダーはデータソースからデータを収集し、インデクサーに転送するエージェントである。
Universal Forwarder (UF)
軽量なデータ転送エージェント。データのパースやインデックス化は行わない。
# /opt/splunkforwarder/etc/system/local/outputs.conf
[tcpout]
defaultGroup = my_indexers
[tcpout:my_indexers]
server = indexer1.example.com:9997, indexer2.example.com:9997, indexer3.example.com:9997
# 自動ロードバランシング
autoLBFrequency = 30
# ロードバランシング時のボリュームベースの切替
autoLBVolume = 1048576
# SSL/TLS設定
sslCertPath = $SPLUNK_HOME/etc/auth/server.pem
sslRootCAPath = $SPLUNK_HOME/etc/auth/cacert.pem
sslPassword = changeme
sslVerifyServerCert = true
useSSL = true
[tcpout:my_indexers:indexer1]
server = indexer1.example.com:9997
[tcpout:my_indexers:indexer2]
server = indexer2.example.com:9997
# /opt/splunkforwarder/etc/system/local/inputs.conf
[monitor:///var/log/syslog]
disabled = false
sourcetype = syslog
index = os_logs
[monitor:///var/log/apache2/access.log]
disabled = false
sourcetype = access_combined
index = web_logs
[monitor:///opt/app/logs/*.log]
disabled = false
sourcetype = custom_app
index = app_logs
# ファイルの先頭から読み込む
initCrcLength = 1024
# ホワイトリスト/ブラックリスト
whitelist = \.log$
blacklist = \.gz$|\.bak$
Heavy Forwarder (HF)
フル機能のSplunkインスタンスで、データのパース、フィルタリング、ルーティングが可能。
# Heavy Forwarderのtransforms.conf
# データのルーティング例
[route_syslog_to_security]
REGEX = (failed|error|denied|unauthorized)
DEST_KEY = _MetaData:Index
FORMAT = security_logs
[route_default]
REGEX = .
DEST_KEY = _MetaData:Index
FORMAT = os_logs
# データのマスキング例
[mask_credit_card]
REGEX = (\d{4})-(\d{4})-(\d{4})-(\d{4})
FORMAT = $1-XXXX-XXXX-$4
DEST_KEY = _raw
[mask_ssn]
REGEX = \d{3}-\d{2}-\d{4}
FORMAT = XXX-XX-XXXX
DEST_KEY = _raw
# Heavy Forwarderのprops.conf
[source::/var/log/secure]
TRANSFORMS-routing = route_syslog_to_security, route_default
[source::/var/log/app/payment.log]
TRANSFORMS-mask = mask_credit_card, mask_ssn
2.2.2 インデクサー(Indexer)
インデクサーはデータの受信、パース、インデックス化、保存を担当する。Splunkの心臓部である。
インデクサーの処理パイプライン:
受信データ → パーシング → ライン分割 → タイムスタンプ抽出 →
インデックス化 → バケット書き込み → ストレージ
# /opt/splunk/etc/system/local/indexes.conf
[default]
homePath = $SPLUNK_DB/$_index_name/db
coldPath = $SPLUNK_DB/$_index_name/colddb
thawedPath = $SPLUNK_DB/$_index_name/thaweddb
maxTotalDataSizeMB = 500000
maxDataSize = auto_high_volume
maxHotBuckets = 10
maxWarmDBCount = 300
frozenTimePeriodInSecs = 7776000 # 90日
[web_logs]
homePath = $SPLUNK_DB/web_logs/db
coldPath = $SPLUNK_DB/web_logs/colddb
thawedPath = $SPLUNK_DB/web_logs/thaweddb
maxTotalDataSizeMB = 100000
frozenTimePeriodInSecs = 2592000 # 30日
maxDataSize = auto_high_volume
[security_logs]
homePath = $SPLUNK_DB/security_logs/db
coldPath = $SPLUNK_DB/security_logs/colddb
thawedPath = $SPLUNK_DB/security_logs/thaweddb
maxTotalDataSizeMB = 200000
frozenTimePeriodInSecs = 31536000 # 365日
coldToFrozenDir = /archive/splunk/security_logs
[metrics_index]
homePath = $SPLUNK_DB/metrics_index/db
coldPath = $SPLUNK_DB/metrics_index/colddb
thawedPath = $SPLUNK_DB/metrics_index/thaweddb
datatype = metric
maxTotalDataSizeMB = 50000
frozenTimePeriodInSecs = 7776000 # 90日
2.2.3 サーチヘッド(Search Head)
ユーザーインターフェースを提供し、検索クエリの実行、ダッシュボードの表示、レポートの生成を行う。
# /opt/splunk/etc/system/local/distsearch.conf
[distributedSearch]
servers = https://indexer1.example.com:8089, https://indexer2.example.com:8089, https://indexer3.example.com:8089
[replicationSettings]
replicationFactor = 3
# サーチヘッドクラスタの設定
[shclustering]
disabled = false
mgmt_uri = https://sh1.example.com:8089
replication_factor = 2
replication_port = 9200
conf_deploy_fetch_url = https://deployer.example.com:8089
shcluster_label = shcluster1
2.2.4 クラスターマネージャー(Cluster Manager)
旧称Cluster Master。インデックスクラスタの管理を行い、レプリケーションファクタとサーチファクタを維持する。
# /opt/splunk/etc/system/local/server.conf (Cluster Manager)
[clustering]
mode = manager
replication_factor = 3
search_factor = 2
pass4SymmKey = your_cluster_key
cluster_label = idx_cluster_prod
[general]
site = site1
# マルチサイトクラスタ設定
[clustering]
mode = manager
multisite = true
available_sites = site1, site2
site_replication_factor = origin:2, total:3
site_search_factor = origin:1, total:2
# /opt/splunk/etc/system/local/server.conf (Indexer/Peer Node)
[clustering]
mode = peer
manager_uri = https://cm.example.com:8089
pass4SymmKey = your_cluster_key
[replication_port://9100]
disabled = false
[general]
site = site1
2.2.5 デプロイメントサーバー(Deployment Server)
フォワーダーへの設定配布を一元管理する。
# /opt/splunk/etc/system/local/serverclass.conf
[global]
repositoryLocation = $SPLUNK_HOME/etc/deployment-apps
[serverClass:linux_servers]
whitelist.0 = linux-*
restartSplunkd = true
[serverClass:linux_servers:app:TA-linux-inputs]
restartSplunkd = true
[serverClass:linux_servers:app:TA-custom-app-logs]
restartSplunkd = true
[serverClass:windows_servers]
whitelist.0 = win-*
[serverClass:windows_servers:app:TA-windows-inputs]
restartSplunkd = true
[serverClass:web_servers]
whitelist.0 = web-*
whitelist.1 = nginx-*
blacklist.0 = web-dev-*
[serverClass:web_servers:app:TA-nginx-inputs]
restartSplunkd = true
2.2.6 ライセンスマスター(License Master)
Splunkのライセンスを一元管理し、各インスタンスのライセンス使用量を監視する。
# /opt/splunk/etc/system/local/server.conf (License Slave)
[license]
master_uri = https://license-master.example.com:8089
2.3 通信ポート一覧
| ポート | 用途 |
|---|---|
| 8000 | Splunk Web (HTTP/HTTPS) |
| 8089 | Management/REST API (HTTPS) |
| 9997 | データ転送(Forwarder → Indexer) |
| 9100 | インデックスレプリケーション |
| 9200 | サーチヘッドクラスタレプリケーション |
| 8088 | HTTP Event Collector (HEC) |
| 514 | Syslog受信 |
| 8191 | KVStore (MongoDB) |
3. データ取り込み(Data Ingestion)
3.1 データ入力の種類
Splunkは多様なデータソースからデータを取り込むことができる。
3.1.1 ファイルモニタリング
# inputs.conf - ファイルモニタリングの詳細設定
[monitor:///var/log/application/*.log]
disabled = false
sourcetype = custom_app_log
index = app_logs
# ファイルローテーション対応
followTail = 0
# CRCチェックの長さ(バイト)
initCrcLength = 256
# ログファイルの先頭から読み込むか
ignoreOlderThan = 7d
# ホワイトリスト・ブラックリスト
whitelist = access\.log$|error\.log$
blacklist = \.gz$|\.bak$|\.old$
# タイムスタンプ認識
time_before_close = 5
# マルチラインイベントの設定
# props.confで詳細に設定
[monitor:///var/log/containers/*.log]
disabled = false
sourcetype = kube:container:log
index = k8s_logs
# Kubernetes環境用
recursive = true
[monitor:///opt/splunk/var/log/watchdog/*.log]
disabled = false
sourcetype = watchdog
index = _internal
3.1.2 ネットワーク入力(Syslog/TCP/UDP)
# inputs.conf - ネットワーク入力
[tcp://9514]
disabled = false
sourcetype = syslog
index = network_logs
connection_host = dns
# 接続制限
acceptFrom = 10.0.0.0/8, 172.16.0.0/12
queueSize = 500KB
[udp://514]
disabled = false
sourcetype = syslog
index = network_logs
connection_host = ip
no_appending_timestamp = true
no_priority_stripping = false
[tcp-ssl://9515]
disabled = false
sourcetype = syslog_tls
index = secure_logs
# SSL設定
serverCert = $SPLUNK_HOME/etc/auth/server.pem
sslPassword = changeme
requireClientCert = true
sslRootCAPath = $SPLUNK_HOME/etc/auth/cacert.pem
3.1.3 HTTP Event Collector (HEC)
HECはHTTP/HTTPS経由でデータをSplunkに送信するための仕組みである。アプリケーションやCI/CDパイプラインからのデータ送信に最適。
# inputs.conf - HEC設定
[http]
disabled = false
port = 8088
enableSSL = 1
dedicatedIoThreads = 2
maxThreads = 0
maxSockets = 0
useDeploymentServer = false
[http://app_events]
disabled = false
token = xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
index = app_events
sourcetype = _json
allowedIndexes = app_events, app_metrics
useACK = true
HECへのデータ送信例:
# 単一イベントの送信
curl -k https://splunk-hec.example.com:8088/services/collector/event \
-H "Authorization: Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
-d '{
"event": {
"message": "User login successful",
"user": "admin",
"src_ip": "10.0.1.50",
"action": "login"
},
"sourcetype": "app:auth",
"index": "app_events",
"host": "web-server-01",
"time": 1704067200
}'
# バッチイベントの送信(Raw endpoint)
curl -k https://splunk-hec.example.com:8088/services/collector/raw \
-H "Authorization: Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
-H "X-Splunk-Request-Channel: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
-d '2024-01-01T00:00:00Z host=web01 action=GET uri=/api/health status=200
2024-01-01T00:00:01Z host=web01 action=POST uri=/api/login status=401
2024-01-01T00:00:02Z host=web01 action=GET uri=/api/data status=500'
# メトリクスデータの送信
curl -k https://splunk-hec.example.com:8088/services/collector \
-H "Authorization: Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
-d '{
"time": 1704067200,
"event": "metric",
"source": "app_metrics",
"host": "web-server-01",
"fields": {
"metric_name:cpu.usage": 75.5,
"metric_name:memory.usage": 82.3,
"metric_name:disk.io_read": 1024,
"region": "us-east-1",
"environment": "production"
}
}'
# HECヘルスチェック
curl -k https://splunk-hec.example.com:8088/services/collector/health \
-H "Authorization: Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
# ACK確認(useACK=true時)
curl -k https://splunk-hec.example.com:8088/services/collector/ack \
-H "Authorization: Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
-H "X-Splunk-Request-Channel: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
-d '{"acks": [0, 1, 2]}'
Python SDKでのHEC送信例:
import requests
import json
import time
class SplunkHEC:
def __init__(self, host, token, port=8088, ssl=True):
self.url = f"{'https' if ssl else 'http'}://{host}:{port}"
self.token = token
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'Splunk {token}',
'Content-Type': 'application/json'
})
self.session.verify = False # 本番ではCA証明書を指定
def send_event(self, event_data, sourcetype=None, index=None, host=None):
payload = {
'event': event_data,
'time': int(time.time())
}
if sourcetype:
payload['sourcetype'] = sourcetype
if index:
payload['index'] = index
if host:
payload['host'] = host
response = self.session.post(
f'{self.url}/services/collector/event',
data=json.dumps(payload)
)
return response.json()
def send_batch(self, events):
payload = ''
for event in events:
payload += json.dumps(event) + '\n'
response = self.session.post(
f'{self.url}/services/collector/event',
data=payload
)
return response.json()
# 使用例
hec = SplunkHEC('splunk-hec.example.com', 'your-token-here')
hec.send_event(
{'message': 'Deployment completed', 'version': 'v2.1.0', 'status': 'success'},
sourcetype='deploy:event',
index='deployment_logs'
)
3.1.4 スクリプト入力
# inputs.conf - スクリプト入力
[script:///opt/splunk/etc/apps/myapp/bin/get_api_metrics.sh]
disabled = false
interval = 300
sourcetype = api_metrics
index = app_metrics
source = api_health_check
[script:///opt/splunk/etc/apps/myapp/bin/collect_db_stats.py]
disabled = false
interval = 60
sourcetype = db_stats
index = db_metrics
passAuth = splunk_system_user
#!/bin/bash
# get_api_metrics.sh - API健全性チェックスクリプト
ENDPOINTS=(
"https://api.example.com/health"
"https://api.example.com/v1/status"
"https://auth.example.com/health"
)
for endpoint in "${ENDPOINTS[@]}"; do
start_time=$(date +%s%N)
response=$(curl -s -o /dev/null -w "%{http_code},%{time_total}" "$endpoint" 2>/dev/null)
end_time=$(date +%s%N)
http_code=$(echo "$response" | cut -d',' -f1)
response_time=$(echo "$response" | cut -d',' -f2)
echo "$(date -u +%Y-%m-%dT%H:%M:%SZ) endpoint=\"$endpoint\" http_code=$http_code response_time=$response_time"
done
3.1.5 モジュラー入力
#!/usr/bin/env python3
# modular_input_example.py
import sys
import json
import time
import requests
# Splunk SDKのモジュラー入力フレームワーク
import splunklib.modularinput as smi
class CloudWatchInput(smi.Script):
def get_scheme(self):
scheme = smi.Scheme("AWS CloudWatch Metrics")
scheme.description = "Collects metrics from AWS CloudWatch"
scheme.use_external_validation = True
scheme.use_single_instance = False
region_arg = smi.Argument("aws_region")
region_arg.data_type = smi.Argument.data_type_string
region_arg.required_on_create = True
scheme.add_argument(region_arg)
namespace_arg = smi.Argument("namespace")
namespace_arg.data_type = smi.Argument.data_type_string
namespace_arg.required_on_create = True
scheme.add_argument(namespace_arg)
return scheme
def stream_events(self, inputs, ew):
for input_name, input_item in inputs.inputs.items():
# CloudWatchからメトリクスを取得してイベントとして出力
event = smi.Event()
event.stanza = input_name
event.data = json.dumps({
"metric": "CPUUtilization",
"value": 75.5,
"namespace": input_item["namespace"],
"region": input_item["aws_region"]
})
ew.write_event(event)
if __name__ == "__main__":
sys.exit(CloudWatchInput().run(sys.argv))
3.2 ソースタイプ設定
# props.conf - ソースタイプ定義
[custom_app_log]
# タイムスタンプ設定
TIME_FORMAT = %Y-%m-%dT%H:%M:%S.%3N%z
TIME_PREFIX = ^
MAX_TIMESTAMP_LOOKAHEAD = 30
TZ = Asia/Tokyo
# ライン分割設定
SHOULD_LINEMERGE = true
BREAK_ONLY_BEFORE = ^\d{4}-\d{2}-\d{2}T
LINE_BREAKER = ([\r\n]+)\d{4}-\d{2}-\d{2}T
TRUNCATE = 100000
# 文字エンコーディング
CHARSET = utf-8
# イベント処理
ANNOTATE_PUNCT = false
SEDCMD-remove_ansi = s/\x1B\[[0-9;]*[a-zA-Z]//g
# フィールド抽出(自動キーバリュー抽出)
KV_MODE = auto
# 区切り文字ベースの抽出
# KV_MODE = none
# DELIMS = ","
# FIELDS = field1, field2, field3
[json_app_log]
KV_MODE = json
TIME_FORMAT = %Y-%m-%dT%H:%M:%S.%6NZ
TIME_PREFIX = "timestamp"\s*:\s*"
SHOULD_LINEMERGE = false
LINE_BREAKER = ([\r\n]+)
# JSONの自動フィールド抽出設定
JSON_TRIM_BRACES_IN_ARRAY_NAMES = true
[access_combined_custom]
# Apache Combined Log Formatの例
EXTRACT-access = ^(?P<clientip>\S+)\s+\S+\s+(?P<user>\S+)\s+\[(?P<timestamp>[^\]]+)\]\s+"(?P<method>\w+)\s+(?P<uri>\S+)\s+(?P<http_version>\S+)"\s+(?P<status>\d+)\s+(?P<bytes>\d+)\s+"(?P<referer>[^"]+)"\s+"(?P<useragent>[^"]+)"
# transforms.conf - フィールド変換定義
[extract_error_code]
REGEX = error_code=(?P<error_code>\d+)
FORMAT = error_code::$1
[extract_json_fields]
REGEX = "level"\s*:\s*"(?P<log_level>[^"]+)"
FORMAT = log_level::$1
[lookup_geo_ip]
filename = geo_ip.csv
match_type = CIDR(src_ip)
# ルーティング変換
[set_index_by_severity]
REGEX = level=(ERROR|CRITICAL|FATAL)
DEST_KEY = _MetaData:Index
FORMAT = critical_logs
[null_queue_debug]
REGEX = level=DEBUG
DEST_KEY = queue
FORMAT = nullQueue
3.3 データパイプライン
データソース → 入力フェーズ → パーシングフェーズ → インデクシングフェーズ
│ │ │
│ │ │
inputs.conf props.conf indexes.conf
(データ受信) transforms.conf (バケット管理)
(パース・変換)
# パイプライン設定 - server.conf
[pipeline:parsing]
maxQueueSize = 2MB
[pipeline:merging]
maxQueueSize = 2MB
[pipeline:typing]
maxQueueSize = 2MB
[pipeline:indexing]
maxQueueSize = 2MB
4. SPL(Search Processing Language)包括ガイド
SPLはSplunkの中核をなすクエリ言語であり、データの検索、変換、分析、可視化を行う。
4.1 基本検索
# 基本的なキーワード検索
index=web_logs sourcetype=access_combined status=500
# 時間範囲指定
index=web_logs sourcetype=access_combined earliest=-24h latest=now
# ワイルドカード検索
index=app_logs error* NOT debug
# フィールド値での検索
index=app_logs log_level=ERROR src_ip="10.0.1.*"
# OR条件
index=web_logs (status=500 OR status=503) method=POST
# NOT条件
index=app_logs log_level=ERROR NOT source="/var/log/app/debug.log"
# 正規表現での検索
index=web_logs | regex uri="^/api/v[12]/users"
4.2 主要コマンド詳解
4.2.1 stats - 統計集計
# 基本的な統計
index=web_logs | stats count by status
index=web_logs | stats count, avg(response_time), max(response_time), min(response_time) by uri
# 複数の集計関数
index=web_logs
| stats count as total_requests,
count(eval(status>=500)) as errors,
count(eval(status>=200 AND status<300)) as success,
avg(response_time) as avg_resp_time,
perc95(response_time) as p95_resp_time,
perc99(response_time) as p99_resp_time,
dc(clientip) as unique_clients
by uri
# values/list関数
index=app_logs log_level=ERROR
| stats count, values(error_code) as error_codes, latest(_time) as last_seen by host
# 時間ベースの統計(span指定)
index=web_logs
| bin _time span=5m
| stats count by _time, status
# first/last
index=app_logs
| stats first(_time) as first_seen, last(_time) as last_seen, count by session_id
4.2.2 eval - フィールド計算・変換
# 新しいフィールドの作成
index=web_logs
| eval response_category=case(
status>=200 AND status<300, "Success",
status>=300 AND status<400, "Redirect",
status>=400 AND status<500, "Client Error",
status>=500, "Server Error",
true(), "Unknown"
)
| stats count by response_category
# 数値計算
index=metrics
| eval memory_usage_pct = round((used_memory / total_memory) * 100, 2)
| eval disk_free_gb = round(disk_free_bytes / 1073741824, 2)
# 文字列操作
index=app_logs
| eval short_uri = replace(uri, "\?.*", "")
| eval domain = mvindex(split(url, "/"), 2)
| eval log_message = upper(substr(message, 1, 100))
# 条件分岐(if)
index=web_logs
| eval is_error = if(status >= 400, 1, 0)
| eval latency_category = if(response_time > 1000, "slow", if(response_time > 500, "medium", "fast"))
# 日時操作
index=app_logs
| eval event_hour = strftime(_time, "%H")
| eval event_day = strftime(_time, "%A")
| eval time_since_event = now() - _time
| eval hours_ago = round(time_since_event / 3600, 1)
# 型変換
index=app_logs
| eval numeric_status = tonumber(status)
| eval status_str = tostring(status)
# null処理
index=app_logs
| eval safe_field = coalesce(field1, field2, "default_value")
| eval has_error = if(isnull(error_code), "No", "Yes")
# マルチバリューフィールド操作
index=app_logs
| eval tags = split(tag_string, ",")
| eval tag_count = mvcount(tags)
| eval has_critical = if(mvfind(tags, "critical") >= 0, "Yes", "No")
| mvexpand tags
4.2.3 where - フィルタリング
# 基本的なフィルタリング
index=web_logs | where status >= 400
# 文字列関数でのフィルタ
index=web_logs | where like(uri, "/api/v%")
index=web_logs | where match(uri, "^/api/v[12]/users")
# null チェック
index=app_logs | where isnotnull(error_code)
index=app_logs | where isnull(response_time)
# 複合条件
index=web_logs
| where (status >= 500 AND response_time > 5000) OR (status >= 400 AND method="DELETE")
# cidrmatch(IPアドレスマッチ)
index=firewall_logs
| where cidrmatch("10.0.0.0/8", src_ip) AND NOT cidrmatch("10.0.1.0/24", src_ip)
4.2.4 rex - 正規表現によるフィールド抽出
# 名前付きキャプチャグループでフィールド抽出
index=app_logs
| rex field=_raw "user=(?P<username>\w+)\s+action=(?P<user_action>\w+)\s+result=(?P<result>\w+)"
# IPアドレスの抽出
index=web_logs
| rex field=_raw "(?P<src_ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"
# 複数マッチ(max_match)
index=app_logs
| rex field=_raw "error_code=(?P<error_codes>\d+)" max_match=0
# sedモード(置換)
index=app_logs
| rex field=message mode=sed "s/password=[^\s]+/password=REDACTED/g"
| rex field=message mode=sed "s/\d{4}-\d{4}-\d{4}-\d{4}/XXXX-XXXX-XXXX-XXXX/g"
# JSONからのフィールド抽出
index=app_logs sourcetype=json_log
| rex field=_raw "\"error_message\"\s*:\s*\"(?P<error_msg>[^\"]+)\""
# URLパス部分の抽出
index=web_logs
| rex field=uri "^/api/(?P<api_version>v\d+)/(?P<resource>\w+)"
4.2.5 table / fields - 出力フィールドの選択
# table - 指定フィールドの表形式出力
index=web_logs
| stats count, avg(response_time) as avg_rt by uri, status
| table uri, status, count, avg_rt
# fields - フィールドの選択/除外
index=web_logs
| fields _time, host, uri, status, response_time
| fields - _raw, _serial
# rename - フィールド名変更
index=web_logs
| stats count by clientip
| rename clientip as "Client IP", count as "Request Count"
| sort - "Request Count"
4.2.6 dedup - 重複排除
# 基本的な重複排除(各ユーザーの最新イベントのみ)
index=app_logs | dedup user | table _time, user, action
# 複数フィールドでの重複排除
index=web_logs | dedup clientip, uri | table clientip, uri, status
# sortbyで順序を指定
index=app_logs | dedup user sortby -_time | table _time, user, action
# 上位N件を保持
index=app_logs | dedup 3 user sortby -_time
4.2.7 sort - ソート
# 降順ソート
index=web_logs | stats count by uri | sort - count
# 複合ソート
index=web_logs | stats count, avg(response_time) as avg_rt by uri
| sort - count, + avg_rt
# 上位/下位N件
index=web_logs | stats count by uri | sort - count | head 20
index=web_logs | stats avg(response_time) as avg_rt by uri | sort + avg_rt | head 10
4.2.8 top / rare - 頻度分析
# 上位10件
index=web_logs | top 10 uri
index=web_logs | top clientip by status
# レア(まれな値)
index=app_logs | rare error_code
index=app_logs | rare 10 user_agent
# top with options
index=web_logs | top 20 uri showperc=true showcount=true countfield=requests percentfield=pct
4.2.9 timechart - 時系列チャート
# 基本的なタイムチャート
index=web_logs | timechart span=5m count by status
# 複数メトリクスの時系列
index=web_logs
| timechart span=1h
count as total_requests,
avg(response_time) as avg_response_time,
perc95(response_time) as p95_response_time
# エラーレートの時系列
index=web_logs
| timechart span=10m
count(eval(status>=500)) as errors,
count as total
| eval error_rate = round((errors / total) * 100, 2)
# limit で表示系列数を制限
index=web_logs | timechart span=1h count by uri limit=10 useother=true
# fillnull で欠損値を補完
index=web_logs | timechart span=5m count by status | fillnull value=0
4.2.10 chart - ピボットチャート
# 基本的なチャート
index=web_logs | chart count by status, method
# over/by構文
index=web_logs | chart avg(response_time) over uri by method
# 複数集計
index=web_logs
| chart count, avg(response_time) as avg_rt, max(response_time) as max_rt by uri
4.2.11 eventstats - インラインの統計追加
# 各イベントに全体の統計値を追加
index=web_logs
| eventstats avg(response_time) as global_avg, stdev(response_time) as global_stdev
| eval is_anomaly = if(response_time > (global_avg + 3 * global_stdev), 1, 0)
| where is_anomaly=1
# グループ別の統計値を追加
index=web_logs
| eventstats avg(response_time) as avg_by_uri by uri
| eval deviation = response_time - avg_by_uri
| where deviation > 1000
# パーセンタイルベースの異常検出
index=web_logs
| eventstats perc99(response_time) as p99 by uri
| where response_time > p99
4.2.12 streamstats - ストリーム統計
# 移動平均の計算
index=web_logs
| timechart span=5m avg(response_time) as avg_rt
| streamstats window=12 avg(avg_rt) as moving_avg
| eval threshold = moving_avg * 1.5
| where avg_rt > threshold
# 累積カウント
index=app_logs log_level=ERROR
| streamstats count as running_error_count
| timechart span=1h max(running_error_count)
# 前のイベントとの時間差
index=app_logs
| sort _time
| streamstats current=false last(_time) as prev_time by host
| eval time_gap = _time - prev_time
| where time_gap > 300
4.2.13 transaction - トランザクション分析
# セッションベースのトランザクション
index=web_logs
| transaction clientip maxpause=30m maxspan=2h
| stats avg(duration) as avg_session_duration, avg(eventcount) as avg_pages by clientip
# リクエスト-レスポンスの組み合わせ
index=app_logs
| transaction request_id startswith="REQUEST_START" endswith="REQUEST_END"
| eval processing_time = duration
| stats avg(processing_time) by service_name
# ユーザージャーニー分析
index=web_logs
| transaction session_id maxpause=30m
| eval pages_visited = mvcount(uri)
| eval has_purchase = if(like(uri, "%/checkout/complete%"), 1, 0)
| stats count, sum(has_purchase) as purchases by pages_visited
4.2.14 lookup - ルックアップ
# CSVルックアップ
index=web_logs
| lookup geo_ip_lookup src_ip as clientip OUTPUT country, city, latitude, longitude
| stats count by country | sort - count
# KVStore ルックアップ
index=app_logs
| lookup kv_user_info user_id OUTPUT department, manager, location
# 自動ルックアップ(定義済み)
# transforms.conf で定義:
# [user_lookup]
# filename = users.csv
# match_type = WILDCARD(user_id)
# inputlookup(ルックアップテーブルの直接検索)
| inputlookup geo_ip_lookup | where country="Japan"
# outputlookup(検索結果をルックアップに書き込み)
index=web_logs
| stats count by clientip
| where count > 1000
| outputlookup suspicious_ips.csv
# ルックアップの定義(transforms.conf)
# [geo_ip_lookup]
# filename = geo_ip.csv
# match_type = CIDR(src_ip)
# min_matches = 1
# default_match = unknown
4.2.15 subsearch - サブサーチ
# サブサーチで動的フィルタリング
index=web_logs
[search index=security_logs action=blocked | fields src_ip | rename src_ip as clientip]
| stats count by clientip, uri
# TOP N のユーザーのアクティビティ詳細
index=app_logs
[search index=app_logs log_level=ERROR | top 5 user | fields user]
| stats count by user, action
# 時間相関のサブサーチ
index=web_logs status>=500
[search index=infra_logs "deployment started" | eval earliest=_time-300 | eval latest=_time+300 | fields earliest, latest]
4.2.16 join - テーブル結合
# 内部結合
index=web_logs
| stats count, avg(response_time) as avg_rt by clientip
| join clientip
[search index=user_logs | dedup clientip | fields clientip, username, department]
# 左結合
index=app_logs
| stats count by user_id
| join type=left user_id
[search index=user_db | table user_id, email, role]
# 注意: joinは大量データでパフォーマンスが悪い。
# 代わりにlookupやstatsを使うことを推奨。
# joinの代替: statsを使った擬似結合
index=web_logs OR index=user_logs
| stats values(username) as username, sum(eval(if(index="web_logs",1,0))) as web_count by clientip
4.2.17 append / appendcols
# append - 結果の追加(UNION相当)
index=web_logs status>=500 | stats count as error_count
| append [search index=web_logs | stats count as total_count]
| stats values(*) as *
# appendcols - 列の追加
index=web_logs | stats count as current_errors by host
| appendcols
[search index=web_logs earliest=-7d@d latest=-6d@d | stats count as prev_errors by host]
| eval change_pct = round(((current_errors - prev_errors) / prev_errors) * 100, 2)
4.2.18 tstats - 高速統計(インデックス化フィールド)
# tstatsによる高速カウント
| tstats count where index=web_logs by _time, host, source, sourcetype span=1h
# データモデルを使ったtstats
| tstats count from datamodel=Web where Web.status>=500 by Web.dest, Web.status
# tstats + summariesonly(アクセラレーション必須)
| tstats summariesonly=true count from datamodel=Network_Traffic
where Network_Traffic.action=blocked
by Network_Traffic.src, Network_Traffic.dest
span=1h
# プレフィックス検索
| tstats count where index=web_logs sourcetype=access_combined host=web-* by host span=1h
4.2.19 datamodel
# データモデルの内容確認
| datamodel Web
# データモデルからの検索
| datamodel Web Web.Web_Traffic search
| stats count by Web.Web_Traffic.status
# ピボット(データモデルベース)
| pivot Web Web_Traffic count(Web_Traffic) as request_count
SPLITROW _time PERIOD auto
SPLITCOL status
4.2.20 マクロ
# macros.conf での定義
# [error_rate(2)]
# args = index_name, threshold
# definition = search index=$index_name$ | timechart span=5m count(eval(status>=500)) as errors, count as total | eval error_rate=round((errors/total)*100,2) | where error_rate > $threshold$
# マクロの使用
`error_rate(web_logs, 5)`
# 引数なしマクロ
# [get_business_hours]
# definition = eval is_business_hours=if(tonumber(strftime(_time,"%H"))>=9 AND tonumber(strftime(_time,"%H"))<18 AND NOT match(strftime(_time,"%A"),"Saturday|Sunday"), 1, 0)
index=app_logs | `get_business_hours` | where is_business_hours=1
# バリデーション付きマクロ
# [filter_by_env(1)]
# args = environment
# definition = search environment="$environment$"
# validation = $environment$ IN ("production", "staging", "development")
# errormsg = Invalid environment. Must be production, staging, or development.
4.3 高度なSPLパターン
異常検出パターン
# Z-scoreベースの異常検出
index=web_logs
| bin _time span=5m
| stats count as requests by _time, host
| eventstats avg(requests) as mean, stdev(requests) as stdev by host
| eval zscore = (requests - mean) / stdev
| where abs(zscore) > 3
| table _time, host, requests, mean, stdev, zscore
# IQRベースの異常検出
index=web_logs
| stats avg(response_time) as avg_rt by uri
| eventstats perc25(avg_rt) as q1, perc75(avg_rt) as q3
| eval iqr = q3 - q1
| eval lower_bound = q1 - 1.5 * iqr
| eval upper_bound = q3 + 1.5 * iqr
| where avg_rt < lower_bound OR avg_rt > upper_bound
エラーバジェット計算
# SLOベースのエラーバジェット
index=web_logs
| stats count(eval(status<500)) as good, count as total
| eval slo = 99.9
| eval actual_availability = round((good / total) * 100, 4)
| eval error_budget_total = round(total * (1 - slo/100))
| eval errors_used = total - good
| eval error_budget_remaining = error_budget_total - errors_used
| eval budget_consumed_pct = round((errors_used / error_budget_total) * 100, 2)
| table slo, actual_availability, total, good, errors_used, error_budget_total, error_budget_remaining, budget_consumed_pct
変化検出パターン
# 前の時間帯との比較
index=web_logs
| timechart span=1h count as current_count
| appendcols
[search index=web_logs earliest=-2d latest=-1d | timechart span=1h count as prev_day_count]
| eval change_pct = round(((current_count - prev_day_count) / prev_day_count) * 100, 2)
| where abs(change_pct) > 50
5. ナレッジオブジェクト
5.1 フィールド抽出
# props.conf - インラインフィールド抽出
[custom_app_log]
# 正規表現ベースの抽出
EXTRACT-user = user=(?P<username>\w+)
EXTRACT-action = action=(?P<user_action>\w+)
EXTRACT-duration = duration=(?P<request_duration>\d+\.?\d*)ms
EXTRACT-error = error_code=(?P<error_code>\d+)\s+error_msg="(?P<error_message>[^"]+)"
# transforms.confを参照する抽出
REPORT-fields = extract_custom_fields, extract_json_payload
# フィールドエイリアス
FIELDALIAS-src = src_ip AS source_ip
FIELDALIAS-dst = dst_ip AS dest_ip
FIELDALIAS-user = user AS username
# 計算済みフィールド(Calculated Fields)
EVAL-response_time_sec = round(response_time / 1000, 3)
EVAL-is_error = if(status >= 400, "true", "false")
EVAL-request_size_kb = round(bytes / 1024, 2)
# transforms.conf - 外部フィールド抽出定義
[extract_custom_fields]
REGEX = (?i)level=(?P<log_level>\w+)\s+component=(?P<component>[\w\-]+)\s+msg="(?P<log_message>[^"]*)"
FORMAT = log_level::$1 component::$2 log_message::$3
[extract_json_payload]
REGEX = payload=(?P<_raw_payload>\{.*\})
FORMAT = _raw_payload::$1
5.2 ルックアップ
# transforms.conf - ルックアップ定義
[asset_lookup]
filename = assets.csv
# フィールドマッチング
match_type = WILDCARD(hostname)
min_matches = 1
default_match = Unknown
max_matches = 1
case_sensitive_match = false
[cidr_lookup]
filename = network_ranges.csv
match_type = CIDR(ip_address)
min_matches = 1
[kv_store_lookup]
collection = user_preferences
external_type = kvstore
fields_list = user_id, theme, language, timezone
filter = active=1
[external_lookup]
external_cmd = lookup_api.py user_id
fields_list = user_id, full_name, department, location
external_type = python
# props.conf - 自動ルックアップの適用
[access_combined]
LOOKUP-asset_info = asset_lookup hostname AS host OUTPUT asset_type, owner, location, criticality
LOOKUP-geo = cidr_lookup ip_address AS clientip OUTPUT country, region, city
# assets.csv の例
hostname,asset_type,owner,location,criticality
web-*,Web Server,Platform Team,US-East,High
db-*,Database,DBA Team,US-East,Critical
app-*,Application Server,App Team,US-West,High
5.3 イベントタイプとタグ
# eventtypes.conf
[web_error]
search = index=web_logs status>=400
[web_server_error]
search = index=web_logs status>=500
[authentication_failure]
search = index=auth_logs (action=login AND result=failure)
[high_latency_request]
search = index=web_logs response_time>5000
[brute_force_attempt]
search = index=auth_logs action=login result=failure | stats count by src_ip | where count > 10
# tags.conf
[eventtype=web_error]
error = enabled
web = enabled
[eventtype=web_server_error]
critical = enabled
web = enabled
server_error = enabled
[eventtype=authentication_failure]
authentication = enabled
failure = enabled
security = enabled
[eventtype=brute_force_attempt]
attack = enabled
brute_force = enabled
security = enabled
# タグを使った検索
tag=security tag=attack | stats count by src_ip | sort - count
tag=web tag=error | timechart count by host
5.4 データモデル
// データモデル定義例(Web Traffic)
{
"modelName": "Custom_Web_Traffic",
"displayName": "Custom Web Traffic",
"description": "Custom web traffic data model for SRE monitoring",
"objects": [
{
"objectName": "Web_Request",
"displayName": "Web Request",
"parentName": "BaseEvent",
"comment": "",
"fields": [
{
"fieldName": "uri",
"displayName": "URI",
"type": "string",
"fieldSearch": "",
"required": false,
"multivalue": false
},
{
"fieldName": "status",
"displayName": "HTTP Status",
"type": "number",
"fieldSearch": "",
"required": false,
"multivalue": false
},
{
"fieldName": "response_time",
"displayName": "Response Time (ms)",
"type": "number",
"fieldSearch": "",
"required": false,
"multivalue": false
},
{
"fieldName": "method",
"displayName": "HTTP Method",
"type": "string",
"fieldSearch": "",
"required": false,
"multivalue": false
}
],
"constraints": [
{
"search": "index=web_logs sourcetype=access_combined"
}
],
"children": [
{
"objectName": "Error_Request",
"displayName": "Error Request",
"parentName": "Web_Request",
"constraints": [
{
"search": "status>=400"
}
]
},
{
"objectName": "Slow_Request",
"displayName": "Slow Request",
"parentName": "Web_Request",
"constraints": [
{
"search": "response_time>3000"
}
]
}
]
}
]
}
5.5 保存済み検索(Saved Searches)
# savedsearches.conf
[Daily Error Summary]
search = index=web_logs status>=500 \
| stats count as error_count, dc(uri) as affected_endpoints, dc(host) as affected_hosts by sourcetype \
| sort - error_count
dispatch.earliest_time = -24h@h
dispatch.latest_time = now
cron_schedule = 0 8 * * *
enableSched = 1
is_visible = true
description = Daily summary of server errors across all web services
action.email.to = sre-team@example.com
action.email.subject = Daily Error Summary Report
action.email = 1
action.email.include.results_link = 1
action.email.format = html
[Hourly Error Rate Check]
search = index=web_logs \
| timechart span=5m count(eval(status>=500)) as errors, count as total \
| eval error_rate = round((errors/total)*100, 2) \
| where error_rate > 5
dispatch.earliest_time = -1h
dispatch.latest_time = now
cron_schedule = 0 * * * *
enableSched = 1
alert.severity = 4
alert.suppress = 1
alert.suppress.period = 30m
alert_type = number of results
alert_comparator = greater than
alert_threshold = 0
action.webhook.enable = 1
action.webhook.param.url = https://hooks.slack.com/services/xxx/yyy/zzz
6. ダッシュボードとビジュアライゼーション
6.1 Simple XML ダッシュボード
<dashboard version="1.1" theme="dark">
<label>SRE Operations Dashboard</label>
<description>Real-time infrastructure and application monitoring</description>
<!-- 入力フィルター -->
<fieldset submitButton="true" autoRun="true">
<input type="time" token="time_range" searchWhenChanged="true">
<label>Time Range</label>
<default>
<earliest>-4h@m</earliest>
<latest>now</latest>
</default>
</input>
<input type="dropdown" token="environment" searchWhenChanged="true">
<label>Environment</label>
<choice value="*">All</choice>
<choice value="production">Production</choice>
<choice value="staging">Staging</choice>
<default>production</default>
</input>
<input type="multiselect" token="hosts" searchWhenChanged="true">
<label>Hosts</label>
<search>
<query>index=web_logs environment=$environment$ | dedup host | sort host | table host</query>
<earliest>$time_range.earliest$</earliest>
<latest>$time_range.latest$</latest>
</search>
<fieldForLabel>host</fieldForLabel>
<fieldForValue>host</fieldForValue>
<delimiter>,</delimiter>
<prefix>(</prefix>
<suffix>)</suffix>
<valuePrefix>host="</valuePrefix>
<valueSuffix>"</valueSuffix>
<default>*</default>
</input>
</fieldset>
<!-- Row 1: KPIシングルバリュー -->
<row>
<panel>
<title>Total Requests</title>
<single>
<search>
<query>index=web_logs environment=$environment$ $hosts$
| stats count as total_requests</query>
<earliest>$time_range.earliest$</earliest>
<latest>$time_range.latest$</latest>
</search>
<option name="colorBy">value</option>
<option name="rangeColors">["0x53a051","0x0877a6","0xf8be34","0xf1813f","0xdc4e41"]</option>
<option name="rangeValues">[0,1000,5000,10000]</option>
<option name="useColors">1</option>
<option name="drilldown">all</option>
</single>
</panel>
<panel>
<title>Error Rate (%)</title>
<single>
<search>
<query>index=web_logs environment=$environment$ $hosts$
| stats count(eval(status>=500)) as errors, count as total
| eval error_rate = round((errors/total)*100, 2)
| fields error_rate</query>
<earliest>$time_range.earliest$</earliest>
<latest>$time_range.latest$</latest>
</search>
<option name="colorBy">value</option>
<option name="rangeColors">["0x53a051","0xf8be34","0xf1813f","0xdc4e41"]</option>
<option name="rangeValues">[1,3,5]</option>
<option name="useColors">1</option>
<option name="unit">%</option>
</single>
</panel>
<panel>
<title>P95 Response Time</title>
<single>
<search>
<query>index=web_logs environment=$environment$ $hosts$
| stats perc95(response_time) as p95
| eval p95 = round(p95, 0)</query>
<earliest>$time_range.earliest$</earliest>
<latest>$time_range.latest$</latest>
</search>
<option name="unit">ms</option>
<option name="rangeColors">["0x53a051","0xf8be34","0xf1813f","0xdc4e41"]</option>
<option name="rangeValues">[500,1000,3000]</option>
<option name="useColors">1</option>
</single>
</panel>
<panel>
<title>Active Hosts</title>
<single>
<search>
<query>index=web_logs environment=$environment$
| stats dc(host) as active_hosts</query>
<earliest>$time_range.earliest$</earliest>
<latest>$time_range.latest$</latest>
</search>
</single>
</panel>
</row>
<!-- Row 2: タイムチャート -->
<row>
<panel>
<title>Request Volume & Error Rate</title>
<chart>
<search>
<query>index=web_logs environment=$environment$ $hosts$
| timechart span=5m count as requests, count(eval(status>=500)) as errors
| eval error_rate = round((errors/requests)*100, 2)</query>
<earliest>$time_range.earliest$</earliest>
<latest>$time_range.latest$</latest>
</search>
<option name="charting.chart">column</option>
<option name="charting.chart.overlayFields">error_rate</option>
<option name="charting.axisTitleX.text">Time</option>
<option name="charting.axisTitleY.text">Requests</option>
<option name="charting.axisTitleY2.text">Error Rate (%)</option>
<option name="charting.chart.showDataLabels">none</option>
<option name="charting.legend.placement">bottom</option>
</chart>
</panel>
</row>
<!-- Row 3: レスポンスタイムのヒートマップ -->
<row>
<panel>
<title>Response Time by Endpoint</title>
<chart>
<search>
<query>index=web_logs environment=$environment$ $hosts$
| eval uri_path = replace(uri, "\?.*", "")
| stats avg(response_time) as avg_rt, perc95(response_time) as p95_rt, count by uri_path
| sort - count | head 20
| table uri_path, avg_rt, p95_rt, count</query>
<earliest>$time_range.earliest$</earliest>
<latest>$time_range.latest$</latest>
</search>
<option name="charting.chart">bar</option>
<option name="charting.axisTitleX.text">Response Time (ms)</option>
</chart>
</panel>
</row>
<!-- Row 4: テーブルとドリルダウン -->
<row>
<panel>
<title>Top Errors</title>
<table>
<search>
<query>index=web_logs environment=$environment$ $hosts$ status>=500
| stats count, latest(_time) as last_seen, values(host) as hosts by status, uri
| eval last_seen = strftime(last_seen, "%Y-%m-%d %H:%M:%S")
| sort - count | head 20</query>
<earliest>$time_range.earliest$</earliest>
<latest>$time_range.latest$</latest>
</search>
<option name="drilldown">row</option>
<option name="count">10</option>
<drilldown>
<set token="selected_uri">$row.uri$</set>
<set token="selected_status">$row.status$</set>
</drilldown>
</table>
</panel>
</row>
<!-- 条件付きパネル(ドリルダウン時に表示) -->
<row depends="$selected_uri$">
<panel>
<title>Error Details for $selected_uri$ (Status: $selected_status$)</title>
<event>
<search>
<query>index=web_logs environment=$environment$ uri="$selected_uri$" status=$selected_status$</query>
<earliest>$time_range.earliest$</earliest>
<latest>$time_range.latest$</latest>
</search>
<option name="count">20</option>
<option name="list.drilldown">full</option>
</event>
</panel>
</row>
</dashboard>
6.2 Dashboard Studio(JSON形式)
Dashboard Studioは新世代のダッシュボードフレームワークで、より柔軟なレイアウトと高度なビジュアライゼーションを提供する。
{
"visualizations": {
"viz_error_trend": {
"type": "splunk.area",
"dataSources": {
"primary": "ds_error_trend"
},
"options": {
"xAxisTitleText": "Time",
"yAxisTitleText": "Error Count",
"showOverlayY": true,
"stackMode": "stacked",
"nullValueMode": "zero"
},
"title": "Error Trend"
},
"viz_status_pie": {
"type": "splunk.pie",
"dataSources": {
"primary": "ds_status_dist"
},
"options": {
"showDonutHole": true
},
"title": "Status Distribution"
}
},
"dataSources": {
"ds_error_trend": {
"type": "ds.search",
"options": {
"query": "index=web_logs status>=400 | timechart span=5m count by status",
"queryParameters": {
"earliest": "$global_time.earliest$",
"latest": "$global_time.latest$"
}
}
},
"ds_status_dist": {
"type": "ds.search",
"options": {
"query": "index=web_logs | eval status_group=case(status<300,\"2xx\",status<400,\"3xx\",status<500,\"4xx\",true(),\"5xx\") | stats count by status_group"
}
}
},
"inputs": {
"global_time": {
"type": "input.timerange",
"options": {
"defaultValue": "-4h,now",
"token": "global_time"
}
}
},
"layout": {
"type": "absolute",
"options": {
"width": 1440,
"height": 900
},
"structure": [
{
"item": "viz_error_trend",
"position": {"x": 0, "y": 0, "w": 960, "h": 400}
},
{
"item": "viz_status_pie",
"position": {"x": 960, "y": 0, "w": 480, "h": 400}
}
]
}
}
6.3 トークンとドリルダウン
<!-- トークンの定義と使用 -->
<set token="my_token">value</set>
<unset token="my_token"/>
<!-- ドリルダウンアクション -->
<drilldown>
<!-- トークン設定 -->
<set token="selected_host">$click.value$</set>
<!-- 別のダッシュボードへのリンク -->
<link target="_blank">/app/search/host_detail?host=$click.value$</link>
<!-- 検索へのリンク -->
<link>search?q=index=web_logs host=$click.value$&earliest=$time_range.earliest$&latest=$time_range.latest$</link>
<!-- 条件付きドリルダウン -->
<condition field="host">
<set token="selected_host">$click.value$</set>
</condition>
<condition field="status">
<set token="selected_status">$click.value$</set>
</condition>
</drilldown>
7. アラート
7.1 アラートタイプ
# savedsearches.conf - アラート設定
# スケジュールアラート
[High Error Rate Alert]
search = index=web_logs \
| timechart span=5m count(eval(status>=500)) as errors, count as total \
| eval error_rate = round((errors/total)*100, 2) \
| where error_rate > 5
dispatch.earliest_time = -15m
dispatch.latest_time = now
cron_schedule = */5 * * * *
enableSched = 1
is_scheduled = 1
# アラート条件
alert_type = number of results
alert_comparator = greater than
alert_threshold = 0
alert.severity = 4
# スロットリング(同じアラートの抑制)
alert.suppress = 1
alert.suppress.period = 30m
alert.suppress.fields = host
alert.suppress.group_name = error_rate_alert
# アラートアクション
action.email = 1
action.email.to = sre-oncall@example.com
action.email.subject = [ALERT] High Error Rate Detected - $result.host$
action.email.format = html
action.email.inline = 1
action.email.include.results_link = 1
action.email.include.view_link = 1
action.email.priority = 1
# Webhookアクション(Slack/PagerDuty)
action.webhook = 1
action.webhook.param.url = https://hooks.slack.com/services/T00000/B00000/XXXXX
# リアルタイムアラート
[Real-time Security Alert]
search = index=security_logs action=login result=failure \
| stats count by src_ip \
| where count > 5
dispatch.earliest_time = rt-5m
dispatch.latest_time = rt
realtime_schedule = 1
alert_type = number of results
alert_comparator = greater than
alert_threshold = 0
alert.severity = 5
# ローリングウィンドウアラート
[Rolling Window CPU Alert]
search = index=infra_metrics metric_name=cpu.usage \
| stats avg(value) as avg_cpu by host \
| where avg_cpu > 90
dispatch.earliest_time = -30m
dispatch.latest_time = now
cron_schedule = */10 * * * *
enableSched = 1
alert_type = number of results
alert_comparator = greater than
alert_threshold = 0
7.2 アラートアクション
# カスタムアラートアクション - PagerDuty連携
# $SPLUNK_HOME/etc/apps/custom_alert_actions/bin/pagerduty_alert.py
import sys
import json
import requests
import csv
import gzip
def send_pagerduty_alert(settings):
routing_key = settings.get('routing_key')
severity = settings.get('severity', 'critical')
# 検索結果の読み取り
results_file = settings.get('results_file')
results = []
if results_file:
with gzip.open(results_file, 'rt') as f:
reader = csv.DictReader(f)
results = list(reader)
# PagerDuty Events API v2
payload = {
"routing_key": routing_key,
"event_action": "trigger",
"dedup_key": settings.get('search_name', 'splunk_alert'),
"payload": {
"summary": f"Splunk Alert: {settings.get('search_name')} - {len(results)} results",
"severity": severity,
"source": "Splunk",
"component": settings.get('app', 'search'),
"group": settings.get('search_name'),
"custom_details": {
"search_name": settings.get('search_name'),
"results_count": len(results),
"results_link": settings.get('results_link'),
"search_uri": settings.get('search_uri'),
"top_results": results[:5]
}
},
"links": [
{
"href": settings.get('results_link', ''),
"text": "View in Splunk"
}
]
}
response = requests.post(
'https://events.pagerduty.com/v2/enqueue',
json=payload,
headers={'Content-Type': 'application/json'}
)
return response.status_code == 202
if __name__ == '__main__':
payload = json.loads(sys.stdin.read())
settings = payload.get('configuration', {})
settings.update(payload.get('result', {}))
if send_pagerduty_alert(settings):
print("PagerDuty alert sent successfully")
else:
print("Failed to send PagerDuty alert", file=sys.stderr)
sys.exit(1)
7.3 Webhook設定例
// Slack Webhook ペイロード設定
{
"text": ":rotating_light: *Splunk Alert: $name$*",
"attachments": [
{
"color": "danger",
"fields": [
{
"title": "Search Name",
"value": "$name$",
"short": true
},
{
"title": "Results Count",
"value": "$job.resultCount$",
"short": true
},
{
"title": "Trigger Time",
"value": "$trigger_time$",
"short": true
},
{
"title": "Severity",
"value": "$alert.severity$",
"short": true
}
],
"actions": [
{
"type": "button",
"text": "View Results",
"url": "$results_link$"
}
]
}
]
}
8. インデックス管理
8.1 インデックスの作成と設定
# indexes.conf - 詳細なインデックス設定
[default]
# デフォルトのストレージパス
homePath = $SPLUNK_DB/$_index_name/db
coldPath = $SPLUNK_DB/$_index_name/colddb
thawedPath = $SPLUNK_DB/$_index_name/thaweddb
# デフォルトの保持期間
frozenTimePeriodInSecs = 7776000 # 90日
maxTotalDataSizeMB = 500000
[web_logs]
homePath = $SPLUNK_DB/web_logs/db
coldPath = /cold_storage/web_logs/colddb
thawedPath = $SPLUNK_DB/web_logs/thaweddb
# データサイズ制限
maxTotalDataSizeMB = 200000
maxDataSize = auto_high_volume
# バケット設定
maxHotBuckets = 10
maxHotSpanSecs = 86400 # 24時間
maxHotIdleSecs = 86400 # 24時間アイドルでウォームへ
maxWarmDBCount = 300
# 保持期間
frozenTimePeriodInSecs = 2592000 # 30日
# Cold→Frozenの動作
coldToFrozenDir = /archive/splunk/web_logs
# または coldToFrozenScript = $SPLUNK_HOME/bin/archiver.sh
# Bloomフィルター
enableOnlineBucketRepair = true
# TSIDXの圧縮
enableTsidxReduction = true
tsidxReductionCheckPeriodInSec = 600
[security_logs]
homePath = $SPLUNK_DB/security_logs/db
coldPath = /cold_storage/security_logs/colddb
thawedPath = $SPLUNK_DB/security_logs/thaweddb
maxTotalDataSizeMB = 500000
frozenTimePeriodInSecs = 31536000 # 365日
maxDataSize = auto_high_volume
maxHotBuckets = 10
# コンプライアンス要件: 削除防止
coldToFrozenDir = /archive/splunk/security_logs
[metrics_index]
homePath = $SPLUNK_DB/metrics_index/db
coldPath = $SPLUNK_DB/metrics_index/colddb
thawedPath = $SPLUNK_DB/metrics_index/thaweddb
datatype = metric
maxTotalDataSizeMB = 100000
frozenTimePeriodInSecs = 7776000 # 90日
[summary_index]
homePath = $SPLUNK_DB/summary_index/db
coldPath = $SPLUNK_DB/summary_index/colddb
thawedPath = $SPLUNK_DB/summary_index/thaweddb
maxTotalDataSizeMB = 50000
frozenTimePeriodInSecs = 31536000 # 365日
8.2 バケットライフサイクル
┌─────────────────────────────────────────────────────────────────┐
│ バケットライフサイクル │
│ │
│ Hot → Warm → Cold → Frozen │
│ (読み書き) (読み取り) (読み取り) (アーカイブ/削除) │
│ │
│ ・新規データ ・Hotから ・Warmから ・Coldから │
│ 書き込み ロール ロール ロール │
│ ・最も高速な ・高速な ・低速ストレ ・テープ/S3等 │
│ ストレージ ストレージ ージ可能 ・削除も可能 │
│ ・SSD推奨 ・SSD/HDD ・HDD ・thawで復元 │
│ │
│ 条件: 条件: 条件: 条件: │
│ ・maxHotBuckets ・自動 ・時間ベース ・frozenTimePeriod │
│ ・maxHotSpanSecs ・サイズベース ・maxTotalDataSize │
│ ・maxHotIdleSecs │
└─────────────────────────────────────────────────────────────────┘
8.3 SmartStore
SmartStoreは、ウォーム/コールドバケットをリモートストレージ(S3等)に保存する機能。
# indexes.conf - SmartStore設定
[volume:remote_store]
storageType = remote
path = s3://my-splunk-bucket/smartstore
remote.s3.access_key = AKIAIOSFODNN7EXAMPLE
remote.s3.secret_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
remote.s3.endpoint = https://s3.amazonaws.com
remote.s3.auth_region = us-east-1
[web_logs]
remotePath = volume:remote_store/$_index_name
homePath = $SPLUNK_DB/web_logs/db
coldPath = $SPLUNK_DB/web_logs/colddb
thawedPath = $SPLUNK_DB/web_logs/thaweddb
# SmartStoreキャッシュ設定
[cachemanager]
max_cache_size = 500000 # MB
hotlist_recency_secs = 86400
hotlist_bloom_filter_recency_hours = 360
8.4 インデックス管理のCLIコマンド
# インデックスの一覧表示
splunk list index -auth admin:password
# インデックスの作成
splunk add index my_new_index \
-maxTotalDataSizeMB 100000 \
-frozenTimePeriodInSecs 7776000 \
-auth admin:password
# バケット情報の確認
splunk list clustering-peers -auth admin:password
# インデックスの統計情報
splunk show index-stats web_logs -auth admin:password
# バケットの修復
splunk rebuild web_logs -auth admin:password
# Frozenバケットの復元(thaw)
splunk thaw web_logs -db /archive/splunk/web_logs/db_1704067200_1703462400_0
9. セキュリティ
9.1 認証設定
# authentication.conf - LDAP認証
[authentication]
authType = LDAP
authSettings = my_ldap
[my_ldap]
SSLEnabled = 1
host = ldap.example.com
port = 636
bindDN = cn=splunk_svc,ou=service_accounts,dc=example,dc=com
bindDNpassword = encrypted_password
userBaseDN = ou=users,dc=example,dc=com
userBaseFilter = (objectClass=person)
userNameAttribute = sAMAccountName
realNameAttribute = cn
emailAttribute = mail
groupBaseDN = ou=groups,dc=example,dc=com
groupBaseFilter = (objectClass=group)
groupMemberAttribute = member
groupNameAttribute = cn
nestedGroups = true
charset = utf-8
anonymous_referrals = false
timelimit = 15
network_timeout = 20
# SAML認証設定
[authentication]
authType = SAML
authSettings = my_saml
[my_saml]
idpSSOUrl = https://idp.example.com/saml/sso
idpCertPath = $SPLUNK_HOME/etc/auth/saml/idp_cert.pem
entityId = https://splunk.example.com
signAuthnRequest = true
signedAssertion = true
attributeQuerySoapUrl = https://idp.example.com/saml/aq
attributeQueryRequestSigned = true
redirectPort = 8000
idpSLOUrl = https://idp.example.com/saml/slo
# web.conf - SSO設定
[settings]
SSOMode = strict
trustedIP = 10.0.0.50
remoteUser = X-Remote-User
9.2 認可設定(ロールとケイパビリティ)
# authorize.conf - ロール定義
[role_sre_admin]
importRoles = user
srchFilter = *
srchIndexesAllowed = *
srchIndexesDefault = web_logs;app_logs;infra_metrics
srchMaxTime = 86400
srchDiskQuota = 10000
srchJobsQuota = 50
rtSrchJobsQuota = 10
cumulativeSrchJobsQuota = 100
cumulativeRTSrchJobsQuota = 20
# ケイパビリティの割り当て
schedule_search = enabled
edit_monitor = enabled
edit_tcp = enabled
list_inputs = enabled
rest_properties_get = enabled
rest_properties_set = enabled
admin_all_objects = disabled
change_own_password = enabled
edit_search_schedule_priority = enabled
[role_sre_viewer]
importRoles = user
srchFilter = index=web_logs OR index=app_logs
srchIndexesAllowed = web_logs;app_logs
srchIndexesDefault = web_logs
srchMaxTime = 3600
srchDiskQuota = 1000
srchJobsQuota = 10
rtSrchJobsQuota = 3
schedule_search = enabled
edit_monitor = disabled
list_inputs = enabled
[role_security_analyst]
importRoles = user
srchFilter = *
srchIndexesAllowed = security_logs;auth_logs;firewall_logs;web_logs
srchIndexesDefault = security_logs
srchMaxTime = 86400
srchDiskQuota = 5000
srchJobsQuota = 30
schedule_search = enabled
list_inputs = enabled
edit_saved_search = enabled
[role_developer]
importRoles = user
srchFilter = (index=app_logs OR index=web_logs) environment!="production"
srchIndexesAllowed = app_logs;web_logs
srchIndexesDefault = app_logs
srchMaxTime = 3600
srchDiskQuota = 500
srchJobsQuota = 5
9.3 TLS/SSL設定
# server.conf - SSL設定
[sslConfig]
sslVersions = tls1.2
sslVersionsForClient = tls1.2
cipherSuite = ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256
ecdhCurves = prime256v1, secp384r1, secp521r1
sslRootCAPath = $SPLUNK_HOME/etc/auth/ca/cacert.pem
[general]
serverCert = $SPLUNK_HOME/etc/auth/server.pem
sslPassword = encrypted_password
requireClientCert = false
# web.conf - Web UIのSSL設定
[settings]
enableSplunkWebSSL = true
privKeyPath = etc/auth/splunkweb/privkey.pem
serverCert = etc/auth/splunkweb/cert.pem
caCertPath = etc/auth/splunkweb/cacert.pem
sslPassword = encrypted_password
9.4 監査ログ
# 監査ログの検索
index=_audit sourcetype=audittrail
| stats count by action, user, info
| sort - count
# ユーザーアクティビティの追跡
index=_audit sourcetype=audittrail action=search
| stats count, avg(total_run_time) as avg_runtime, sum(scan_count) as total_scans by user
| sort - total_scans
# 権限変更の追跡
index=_audit sourcetype=audittrail action=edit_roles
| table _time, user, action, info, object
# ログインの監視
index=_audit sourcetype=audittrail action=login*
| stats count by user, action, info
| sort - count
# 設定変更の追跡
index=_internal sourcetype=splunkd_conf
| stats count by user, action, path
10. デプロイメントとスケーリング
10.1 分散デプロイメントアーキテクチャ
┌─────────────────────────────────────────────────────────────────────┐
│ 本番環境分散デプロイメントアーキテクチャ │
│ │
│ Site 1 (Primary DC) Site 2 (DR DC) │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ │ │ │ │
│ │ ┌───┐ ┌───┐ ┌───┐ │ │ ┌───┐ ┌───┐ ┌───┐ │ │
│ │ │SH1│ │SH2│ │SH3│ │ │ │SH4│ │SH5│ │SH6│ │ │
│ │ └─┬─┘ └─┬─┘ └─┬─┘ │ │ └─┬─┘ └─┬─┘ └─┬─┘ │ │
│ │ └──┬──┘──┬──┘ │ │ └──┬──┘──┬──┘ │ │
│ │ │ SHC Captain│ │ │ SHC Member │ │
│ │ v │ │ v │ │
│ │ ┌───┐ ┌───┐ ┌───┐ │ │ ┌───┐ ┌───┐ │ │
│ │ │IX1│ │IX2│ │IX3│ │ ←Rep→ │ │IX4│ │IX5│ │ │
│ │ └───┘ └───┘ └───┘ │ │ └───┘ └───┘ │ │
│ │ Index Cluster │ │ Index Cluster │ │
│ │ (RF=3, SF=2) │ │ (Multi-site) │ │
│ │ │ │ │ │
│ │ ┌────────────────┐ │ │ │ │
│ │ │ Cluster Manager│ │ │ │ │
│ │ └────────────────┘ │ │ │ │
│ │ ┌────────────────┐ │ │ │ │
│ │ │ License Master │ │ │ │ │
│ │ └────────────────┘ │ │ │ │
│ │ ┌────────────────┐ │ │ │ │
│ │ │Deployment Server│ │ │ │ │
│ │ └────────────────┘ │ │ │ │
│ │ ┌────────────────┐ │ │ │ │
│ │ │ SHC Deployer │ │ │ │ │
│ │ └────────────────┘ │ │ │ │
│ └─────────────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
10.2 インデックスクラスタリング
# server.conf - Cluster Manager
[clustering]
mode = manager
replication_factor = 3 # データの物理コピー数
search_factor = 2 # 検索可能なコピー数
pass4SymmKey = your_secret_key
cluster_label = production_cluster
# マルチサイト構成
multisite = true
available_sites = site1, site2
site_replication_factor = origin:2, total:3
site_search_factor = origin:1, total:2
# server.conf - Peer Node (Indexer)
[clustering]
mode = peer
manager_uri = https://cm.example.com:8089
pass4SymmKey = your_secret_key
[general]
site = site1
[replication_port://9100]
disabled = false
10.3 サーチヘッドクラスタリング
# server.conf - Search Head Cluster Member
[shclustering]
disabled = false
mgmt_uri = https://sh1.example.com:8089
replication_factor = 2
replication_port = 9200
conf_deploy_fetch_url = https://deployer.example.com:8089
pass4SymmKey = shc_secret_key
shcluster_label = production_shc
# captain選出設定
election = true
captain_is_adhoc_searchhead = false
preferred_captain = true # このノードをcaptain優先に
# SHCの初期化コマンド
# 各メンバーで実行
splunk init shcluster-config -auth admin:password \
-mgmt_uri https://sh1.example.com:8089 \
-replication_port 9200 \
-replication_factor 2 \
-conf_deploy_fetch_url https://deployer.example.com:8089 \
-secret shc_secret_key \
-shcluster_label production_shc
# captain選出(最初の1回のみ)
splunk bootstrap shcluster-captain -servers_list \
"https://sh1.example.com:8089,https://sh2.example.com:8089,https://sh3.example.com:8089" \
-auth admin:password
# SHCステータス確認
splunk show shcluster-status -auth admin:password
# アプリのデプロイ(Deployerで実行)
splunk apply shcluster-bundle -target https://sh1.example.com:8089 \
-auth admin:password --answer-yes
10.4 Deployment Server によるアプリ配布
# serverclass.conf - 詳細なサーバークラス設定
[global]
repositoryLocation = $SPLUNK_HOME/etc/deployment-apps
# Phoneホーム間隔
phoneHomeIntervalInSecs = 60
[serverClass:production_linux]
whitelist.0 = prod-linux-*
blacklist.0 = prod-linux-dev-*
restartSplunkd = true
stateOnClient = enabled
[serverClass:production_linux:app:base_linux_config]
restartSplunkd = true
[serverClass:production_linux:app:TA-syslog]
restartSplunkd = true
[serverClass:production_linux:app:TA-custom-app]
restartSplunkd = true
[serverClass:production_windows]
whitelist.0 = prod-win-*
restartSplunkd = true
[serverClass:production_windows:app:TA-windows]
restartSplunkd = true
[serverClass:heavy_forwarders]
machineTypesFilter = linux-x86_64
whitelist.0 = hf-*
restartSplunkd = true
[serverClass:heavy_forwarders:app:TA-heavy-forwarder-config]
restartSplunkd = true
11. パフォーマンスチューニング
11.1 検索の最適化
# 悪い例:フィルタリングが遅い
index=web_logs | search status=500 | stats count by host
# 良い例:最初からフィルタリング
index=web_logs status=500 | stats count by host
# 悪い例:不要なフィールドを処理
index=web_logs status=500 | stats count by host, uri, method, clientip, response_time
# 良い例:必要なフィールドのみに限定
index=web_logs status=500 | fields host | stats count by host
# 悪い例:joinの使用(大量データで遅い)
index=web_logs | join clientip [search index=user_db | fields clientip, username]
# 良い例:lookupの使用
index=web_logs | lookup user_lookup clientip OUTPUT username
# 悪い例:ワイルドカードの多用
index=* sourcetype=* error
# 良い例:具体的な指定
index=web_logs sourcetype=access_combined status>=500
# 悪い例:subsearchの乱用
index=web_logs [search index=app_logs log_level=ERROR | stats values(host) as host | mvexpand host | fields host]
# 良い例:tstatsの活用
| tstats count where index=web_logs status>=500 by host span=1h
11.2 サマリインデキシング
# savedsearches.conf - サマリインデキシング
[Hourly Web Stats Summary]
search = index=web_logs \
| stats count as total_requests, \
count(eval(status>=500)) as server_errors, \
count(eval(status>=400 AND status<500)) as client_errors, \
avg(response_time) as avg_response_time, \
perc95(response_time) as p95_response_time, \
dc(clientip) as unique_clients, \
sum(bytes) as total_bytes \
by host, uri, method \
| eval error_rate = round((server_errors / total_requests) * 100, 4)
dispatch.earliest_time = -65m@m
dispatch.latest_time = -5m@m
cron_schedule = 5 * * * *
enableSched = 1
action.summary_index = 1
action.summary_index._name = summary_index
action.summary_index.force_realtime_schedule = 1
# サマリインデックスからの高速検索
index=summary_index source="Hourly Web Stats Summary"
| timechart span=1h sum(total_requests) as requests, avg(error_rate) as error_rate by host
11.3 アクセラレーテッドデータモデル
# datamodels.conf
[Web_Traffic]
acceleration = true
acceleration.earliest_time = -1y
acceleration.backfill_time = -30d
acceleration.max_time = 3600
acceleration.cron_schedule = */5 * * * *
acceleration.manual_rebuilds = true
acceleration.max_concurrent = 3
# アクセラレーション済みデータモデルからのtstats検索
| tstats summariesonly=true count, avg(Web_Traffic.response_time) as avg_rt
from datamodel=Web_Traffic
where Web_Traffic.status>=500
by Web_Traffic.host, _time
span=1h
11.4 レポートアクセラレーション
# savedsearches.conf
[Accelerated Error Report]
search = index=web_logs status>=400 | stats count by host, status, uri
auto_summarize = 1
auto_summarize.dispatch.earliest_time = -30d@d
auto_summarize.dispatch.latest_time = now
auto_summarize.cron_schedule = */10 * * * *
auto_summarize.timespan = 3600
11.5 tstats vs stats の比較
┌─────────────────────────────────────────────────────────────┐
│ tstats vs stats パフォーマンス比較 │
│ │
│ tstats: │
│ ・tsidx(Time-Series Index)ファイルを直接読む │
│ ・rawデータを読まないため非常に高速 │
│ ・インデックス化されたフィールドのみ使用可能 │
│ ・データモデルアクセラレーションと組み合わせ可能 │
│ ・使用可能フィールド: _time, host, source, sourcetype, │
│ index, および明示的にインデックス化したフィールド │
│ │
│ stats: │
│ ・rawデータを読んでフィールドを抽出 │
│ ・全てのフィールドが使用可能 │
│ ・tstatsより遅いが柔軟 │
│ │
│ パフォーマンス: │
│ tstats ████████████████████ ~100x高速 │
│ stats █ 基準 │
│ │
│ 推奨: 可能な限りtstatsを使用し、必要な場合のみstatsを使用 │
└─────────────────────────────────────────────────────────────┘
11.6 Search Job Inspector
# 検索パフォーマンスの分析
# Splunk WebのSearch Job Inspectorで確認可能な項目:
# 1. 実行時間の内訳
# - command.search: インデックス検索時間
# - command.stats: 統計処理時間
# - command.eval: 計算処理時間
# 2. スキャンカウント
# - scanCount: スキャンされたイベント数
# - resultCount: 結果イベント数
# - 比率が低い = フィルタリングの改善余地あり
# 検索ジョブの情報をREST APIで取得
curl -k -u admin:password \
https://localhost:8089/services/search/jobs/{search_id} \
-d output_mode=json
# 実行中の検索ジョブ一覧
| rest /services/search/jobs
| where isRealTimeSearch=0 AND dispatchState="RUNNING"
| table sid, label, runDuration, scanCount, eventCount, author
| sort - runDuration
11.7 Bloomフィルター
# indexes.conf - Bloomフィルター設定
[web_logs]
# Bloomフィルターはデフォルトで有効
# 特定のインデックスで無効化する場合
# bloomHomePath =
# server.conf
[general]
# Bloomフィルターの設定
bloomfilter_refresh_interval_seconds = 900
# Bloomフィルターの効果を確認
index=_internal sourcetype=splunkd component=DatabaseDirectoryManager
| stats count(eval(like(message, "%bloom%"))) as bloom_events by host
12. Splunk自体の監視
12.1 _internalインデックス
# インデクサーのパフォーマンス監視
index=_internal sourcetype=splunkd component=Metrics group=pipeline
| timechart span=5m avg(cpu_seconds) by processor
# インデックスのスループット
index=_internal sourcetype=splunkd component=Metrics group=per_index_thruput
| timechart span=5m sum(kb) as throughput_kb by series
# キューの健全性
index=_internal sourcetype=splunkd component=Metrics group=queue
| eval fill_pct = round((current_size / max_size) * 100, 2)
| timechart span=5m avg(fill_pct) by name
# フォワーダーの接続状態
index=_internal sourcetype=splunkd component=Metrics group=tcpin_connections
| stats latest(kb) as kb, latest(ev) as events by hostname, sourceIp
| sort - kb
# 検索のパフォーマンス
index=_audit sourcetype=audittrail action=search info=completed
| stats avg(total_run_time) as avg_runtime,
max(total_run_time) as max_runtime,
avg(scan_count) as avg_scans,
count as search_count
by user, savedsearch_name
| sort - avg_runtime
# ディスク使用量の監視
index=_introspection sourcetype=splunk_disk_objects component=Indexes
| stats latest(sizeOnDiskMB) as disk_mb by title
| sort - disk_mb
| eval disk_gb = round(disk_mb / 1024, 2)
| fields title, disk_gb
12.2 Monitoring Console (DMC)
# ライセンス使用量の監視
index=_internal sourcetype=splunkd component=LicenseUsage type=Usage
| timechart span=1d sum(b) as bytes_indexed
| eval gb_indexed = round(bytes_indexed / 1073741824, 2)
# ライセンス使用量の予測
index=_internal sourcetype=splunkd component=LicenseUsage type=Usage
| timechart span=1d sum(b) as bytes
| predict bytes as predicted_bytes future_timespan=30
| eval predicted_gb = round(predicted_bytes / 1073741824, 2)
# インデクサークラスタの健全性
| rest /services/cluster/manager/peers
| table title, site, status, is_searchable, replication_count, search_count, bucket_count
| sort title
# サーチヘッドクラスタの健全性
| rest /services/shcluster/status
| table label, captain.mgmt_uri, captain.elected_captain, peers{}.label, peers{}.status
# スケジュール検索の遅延
index=_internal sourcetype=scheduler
| stats avg(run_time) as avg_runtime, avg(lag_time) as avg_lag by savedsearch_name, app
| where avg_lag > 60
| sort - avg_lag
# KVStoreの監視
| rest /services/kvstore/status
| table title, current, status, replicationStatus, backupRestoreStatus
12.3 SREダッシュボード用の検索クエリ
# Splunkプラットフォーム健全性サマリ
index=_internal sourcetype=splunkd log_level=ERROR earliest=-1h
| stats count by component, host
| sort - count
| head 20
# インジェストパイプラインの遅延
index=_internal sourcetype=splunkd component=Metrics group=queue name=parsingQueue OR name=indexQueue
| eval fill_pct = round((current_size / max_size) * 100, 2)
| where fill_pct > 70
| table _time, host, name, fill_pct
# 検索の同時実行数
index=_internal sourcetype=splunkd component=SearchScheduler
| timechart span=5m dc(search_id) as concurrent_searches by host
13. インテグレーション
13.1 REST API
# 認証トークンの取得
curl -k https://localhost:8089/services/auth/login \
-d username=admin \
-d password=changeme
# 検索の実行
curl -k -u admin:password \
https://localhost:8089/services/search/jobs \
-d search="search index=web_logs status>=500 | stats count by host" \
-d earliest_time="-1h" \
-d latest_time="now" \
-d output_mode=json
# 検索結果の取得
curl -k -u admin:password \
"https://localhost:8089/services/search/jobs/{search_id}/results?output_mode=json&count=100"
# エクスポート(ストリーミング)
curl -k -u admin:password \
"https://localhost:8089/services/search/jobs/export" \
-d search="search index=web_logs | head 1000" \
-d earliest_time="-1h" \
-d latest_time="now" \
-d output_mode=json
# インデックスの管理
# インデックス一覧
curl -k -u admin:password \
"https://localhost:8089/services/data/indexes?output_mode=json&count=0"
# インデックスの作成
curl -k -u admin:password \
https://localhost:8089/services/data/indexes \
-d name=my_new_index \
-d maxTotalDataSizeMB=100000 \
-d frozenTimePeriodInSecs=7776000
# HECトークンの管理
# トークン一覧
curl -k -u admin:password \
"https://localhost:8089/services/data/inputs/http?output_mode=json"
# トークンの作成
curl -k -u admin:password \
https://localhost:8089/services/data/inputs/http \
-d name=my_hec_token \
-d index=app_events \
-d sourcetype=_json \
-d useACK=false
# ユーザー管理
curl -k -u admin:password \
"https://localhost:8089/services/authentication/users?output_mode=json"
# ユーザーの作成
curl -k -u admin:password \
https://localhost:8089/services/authentication/users \
-d name=newuser \
-d password=securepassword \
-d roles=sre_viewer \
-d email=newuser@example.com
# 保存済み検索の管理
curl -k -u admin:password \
"https://localhost:8089/services/saved/searches?output_mode=json&count=0"
# ナレッジオブジェクトのエクスポート
curl -k -u admin:password \
"https://localhost:8089/services/data/transforms/lookups?output_mode=json"
# クラスタステータス
curl -k -u admin:password \
"https://localhost:8089/services/cluster/manager/status?output_mode=json"
# サーバー情報
curl -k -u admin:password \
"https://localhost:8089/services/server/info?output_mode=json"
# アプリの管理
curl -k -u admin:password \
"https://localhost:8089/services/apps/local?output_mode=json"
13.2 Python SDK
import splunklib.client as client
import splunklib.results as results
import time
import json
class SplunkClient:
def __init__(self, host, port, username, password, scheme='https'):
self.service = client.connect(
host=host,
port=port,
username=username,
password=password,
scheme=scheme,
autologin=True
)
def search_sync(self, query, earliest='-1h', latest='now', max_count=10000):
"""同期検索(結果を待ってから返す)"""
kwargs = {
'earliest_time': earliest,
'latest_time': latest,
'search_mode': 'normal',
'max_count': max_count,
'exec_mode': 'blocking' # 完了まで待機
}
job = self.service.jobs.create(query, **kwargs)
result_list = []
reader = results.JSONResultsReader(job.results(output_mode='json', count=0))
for result in reader:
if isinstance(result, results.Message):
continue
result_list.append(dict(result))
return result_list
def search_async(self, query, earliest='-1h', latest='now', callback=None):
"""非同期検索"""
kwargs = {
'earliest_time': earliest,
'latest_time': latest,
'search_mode': 'normal',
'exec_mode': 'normal'
}
job = self.service.jobs.create(query, **kwargs)
while not job.is_done():
time.sleep(2)
job.refresh()
progress = float(job['doneProgress']) * 100
scan_count = int(job['scanCount'])
print(f"Progress: {progress:.1f}% - Scanned: {scan_count}")
result_list = []
offset = 0
count = 1000
while True:
reader = results.JSONResultsReader(
job.results(output_mode='json', count=count, offset=offset)
)
batch = [dict(r) for r in reader if isinstance(r, dict)]
if not batch:
break
result_list.extend(batch)
offset += count
job.cancel()
if callback:
callback(result_list)
return result_list
def get_index_info(self):
"""インデックス情報の取得"""
indexes = self.service.indexes
info = []
for index in indexes:
info.append({
'name': index.name,
'totalEventCount': index['totalEventCount'],
'currentDBSizeMB': index['currentDBSizeMB'],
'maxTotalDataSizeMB': index['maxTotalDataSizeMB'],
'frozenTimePeriodInSecs': index['frozenTimePeriodInSecs']
})
return info
def create_hec_token(self, name, index, sourcetype='_json'):
"""HECトークンの作成"""
return self.service.inputs.create(
name, 'http',
index=index,
sourcetype=sourcetype
)
def manage_saved_search(self, name, search_query, cron='0 * * * *',
actions=None):
"""保存済み検索の作成・更新"""
kwargs = {
'search': search_query,
'cron_schedule': cron,
'is_scheduled': True,
'dispatch.earliest_time': '-1h',
'dispatch.latest_time': 'now'
}
if actions:
kwargs.update(actions)
try:
saved_search = self.service.saved_searches[name]
saved_search.update(**kwargs)
except KeyError:
saved_search = self.service.saved_searches.create(name, **kwargs)
return saved_search
# 使用例
splunk = SplunkClient('localhost', 8089, 'admin', 'password')
# エラーの統計を取得
error_stats = splunk.search_sync(
'search index=web_logs status>=500 | stats count by host, status',
earliest='-4h'
)
print(json.dumps(error_stats, indent=2))
# インデックス情報の取得
for idx in splunk.get_index_info():
print(f"{idx['name']}: {idx['currentDBSizeMB']}MB / {idx['maxTotalDataSizeMB']}MB")
13.3 Splunk SOAR連携
# Splunk SOAR (Phantom) プレイブック例
import phantom.rules as phantom
import json
def playbook_main(container=None, **kwargs):
"""セキュリティインシデント自動対応プレイブック"""
# 1. アーティファクトからIPアドレスを抽出
success, message, artifacts = phantom.get_artifacts(
container=container,
artifact_type='ip'
)
for artifact in artifacts:
src_ip = artifact['cef']['sourceAddress']
# 2. VirusTotalでIPレピュテーション確認
phantom.act(
'ip reputation',
parameters=[{'ip': src_ip}],
assets=['virustotal'],
callback=handle_reputation_result,
name='vt_check'
)
# 3. Splunkで関連イベントを検索
phantom.act(
'run query',
parameters=[{
'query': f'search index=security_logs src_ip="{src_ip}" | stats count by action',
'display': 'action,count'
}],
assets=['splunk'],
callback=handle_splunk_results,
name='splunk_search'
)
def handle_reputation_result(action_results=None, **kwargs):
"""レピュテーション結果に基づく対応"""
for result in action_results:
data = result.get_data()[0]
if data.get('positives', 0) > 5:
# 悪意のあるIPの場合、ファイアウォールでブロック
phantom.act(
'block ip',
parameters=[{'ip': result.get_param('ip')}],
assets=['firewall'],
name='block_malicious_ip'
)
13.4 Splunk Observability / APM
# OpenTelemetry Collector設定 - Splunk Observability Cloud連携
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
prometheus:
config:
scrape_configs:
- job_name: 'kubernetes-pods'
kubernetes_sd_configs:
- role: pod
exporters:
splunk_hec:
token: "your-hec-token"
endpoint: "https://ingest.us0.signalfx.com/v1/log"
source: "otel"
sourcetype: "otel:log"
index: "otel_logs"
signalfx:
access_token: "your-signalfx-token"
realm: "us0"
sapm:
access_token: "your-signalfx-token"
endpoint: "https://ingest.us0.signalfx.com/v2/trace"
processors:
batch:
timeout: 5s
send_batch_size: 1000
resourcedetection:
detectors: [system, env, docker]
timeout: 5s
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, resourcedetection]
exporters: [sapm]
metrics:
receivers: [otlp, prometheus]
processors: [batch, resourcedetection]
exporters: [signalfx]
logs:
receivers: [otlp]
processors: [batch]
exporters: [splunk_hec]
14. SREのためのベストプラクティス
14.1 ログ管理戦略
┌─────────────────────────────────────────────────────────────────┐
│ ログ管理のベストプラクティス │
│ │
│ 1. インデックス設計 │
│ ・用途別にインデックスを分離(web, app, security, infra) │
│ ・保持期間を用途に応じて設定 │
│ ・ライセンス使用量を考慮したサイジング │
│ │
│ 2. ソースタイプの標準化 │
│ ・命名規則: <vendor>:<product>:<type> │
│ ・例: apache:access:combined, custom:app:json │
│ ・タイムスタンプフォーマットの統一 │
│ │
│ 3. フィールド抽出の最適化 │
│ ・search-timeよりindex-timeの抽出を優先(高頻度フィールド) │
│ ・CIM(Common Information Model)への準拠 │
│ ・フィールドエイリアスでの標準化 │
│ │
│ 4. データフィルタリング │
│ ・デバッグログの選択的取り込み │
│ ・Heavy ForwarderでのPIIマスキング │
│ ・不要データのnullQueue送り │
└─────────────────────────────────────────────────────────────────┘
14.2 アラート設計のベストプラクティス
# 良いアラート設計の例
# 1. エラーバジェットベースのアラート
index=web_logs
| bin _time span=1h
| stats count(eval(status<500)) as good, count as total by _time
| eval availability = (good / total) * 100
| streamstats window=24 avg(availability) as rolling_24h_availability
| where rolling_24h_availability < 99.9
| eval message = "24h rolling availability dropped below SLO: " . tostring(round(rolling_24h_availability, 4)) . "%"
# 2. 多段階アラート(Warning → Critical)
index=web_logs
| timechart span=5m count(eval(status>=500)) as errors, count as total
| eval error_rate = (errors / total) * 100
| eval severity = case(
error_rate > 10, "CRITICAL",
error_rate > 5, "WARNING",
error_rate > 2, "INFO",
true(), "OK"
)
| where severity IN ("WARNING", "CRITICAL")
# 3. アノマリーベースのアラート(予測との差異)
index=web_logs
| timechart span=1h count as requests
| predict requests as predicted future_timespan=0
| eval deviation = abs(requests - predicted)
| eval threshold = predicted * 0.5
| where deviation > threshold
| eval message = "Traffic anomaly detected: actual=" . tostring(requests) . " predicted=" . tostring(round(predicted, 0))
# 4. 相関アラート(複数条件の組み合わせ)
index=web_logs status>=500
| stats count as error_count by host
| join host
[search index=infra_metrics metric_name=cpu.usage | stats avg(value) as avg_cpu by host | where avg_cpu > 80]
| where error_count > 100
| eval alert_msg = host . ": " . tostring(error_count) . " errors with CPU at " . tostring(round(avg_cpu, 1)) . "%"
14.3 ダッシュボード設計のベストプラクティス
レベル1: エグゼクティブダッシュボード
└─ SLO/SLA達成率、全体のヘルスステータス
レベル2: サービスダッシュボード
└─ 各サービスのKPI(リクエスト数、エラー率、レイテンシー)
レベル3: 詳細分析ダッシュボード
└─ エンドポイント別、ホスト別の詳細メトリクス
レベル4: デバッグダッシュボード
└─ ログイベントの詳細、トレース情報
推奨事項:
・ドリルダウンで各レベル間を遷移可能にする
・ベースサーチを使って重複クエリを削減する
・トークンフィルターで対話的な分析を可能にする
・自動更新間隔を適切に設定する(リアルタイム vs 定期)
14.4 キャパシティプランニング
# 日次インジェスト量のトレンド
index=_internal sourcetype=splunkd component=LicenseUsage type=Usage
| timechart span=1d sum(b) as daily_bytes
| eval daily_gb = round(daily_bytes / 1073741824, 2)
| predict daily_gb as predicted_gb future_timespan=90
| eval upper_bound = predicted_gb * 1.2
# インデクサーのディスク使用量トレンド
index=_introspection sourcetype=splunk_disk_objects component=Partitions
| timechart span=1d avg(available) as available_gb by mount_point
| predict available_gb as predicted_available future_timespan=30
# 検索のキャパシティ分析
index=_internal sourcetype=scheduler
| stats count as total_searches,
avg(run_time) as avg_runtime,
sum(run_time) as total_runtime,
avg(lag_time) as avg_lag
by host
| eval utilization_pct = round((total_runtime / 3600) * 100, 2)
# ライセンス使用量の予測とアラート
index=_internal sourcetype=splunkd component=LicenseUsage type=Usage
| timechart span=1d sum(b) as daily_bytes
| eval daily_gb = round(daily_bytes / 1073741824, 2)
| eventstats avg(daily_gb) as avg_daily
| eval license_limit_gb = 500
| eval usage_pct = round((avg_daily / license_limit_gb) * 100, 2)
| eval days_to_exceed = if(avg_daily > 0, round((license_limit_gb - avg_daily) / (avg_daily * 0.05), 0), 999)
14.5 インシデント対応ワークフロー
# Step 1: 影響範囲の特定
index=web_logs status>=500 earliest=-1h
| stats count as errors, dc(clientip) as affected_users, dc(uri) as affected_endpoints by host
| sort - errors
# Step 2: タイムラインの確認
index=web_logs status>=500 earliest=-4h
| timechart span=1m count by host
# Step 3: エラーの分類
index=web_logs status>=500 earliest=-1h
| rex field=_raw "(?P<error_type>\w+Exception|\w+Error)"
| stats count by error_type, host
| sort - count
# Step 4: デプロイメントとの相関
(index=web_logs status>=500) OR (index=deployment_logs)
| timechart span=5m
count(eval(index="web_logs" AND status>=500)) as errors,
count(eval(index="deployment_logs")) as deployments
# Step 5: 根本原因の調査
index=app_logs host=affected-host-01 log_level=ERROR earliest=-1h
| transaction request_id
| stats count by error_type
| sort - count
まとめ
Splunkは強力なマシンデータ分析プラットフォームであり、SREエンジニアにとって不可欠なツールである。本ガイドで紹介した以下のポイントを押さえることで、Splunkの効果的な活用が可能になる。
- アーキテクチャの理解: フォワーダー、インデクサー、サーチヘッドの役割を理解し、適切にスケーリングする
- SPLの習熟: 基本コマンドから高度な分析パターンまで、SPLの活用がSplunkの真価を発揮する鍵
- インデックス設計: 用途に応じたインデックスの分離、保持期間の設定、SmartStoreの活用
- アラートの最適化: エラーバジェットベース、アノマリー検出、多段階アラートの実装
- パフォーマンス: tstatsの活用、サマリインデキシング、データモデルアクセラレーション
- セキュリティ: 適切な認証・認可設定、TLS暗号化、監査ログの活用
- 自動化: REST API、Python SDK、SOAR連携による運用自動化
Splunkを効果的に活用し、信頼性の高いシステム運用を実現してほしい。