10日で覚えるSplunkDay 7: サーチの応用

Day 7: サーチの応用

今日学ぶこと

  • サブサーチ
  • lookup コマンド
  • join コマンド
  • transaction コマンド
  • append と appendpipe

サブサーチ

サーチの中に別のサーチを埋め込む機能です。

# 最もエラーが多いホストのログを表示
index=main
  [search index=main status>=500
   | stats count by host
   | sort -count
   | head 1
   | fields host]
flowchart TB
    Outer["メインサーチ<br>index=main"]
    Inner["サブサーチ<br>[search ... | head 1 | fields host]"]
    Result["結果<br>host=web-01 のイベント"]
    Inner -->|"host=web-01"| Outer --> Result
    style Outer fill:#3b82f6,color:#fff
    style Inner fill:#22c55e,color:#fff
    style Result fill:#f59e0b,color:#fff

サブサーチの仕組み

  1. [...]内のサーチが先に実行される
  2. 結果がOR条件としてメインサーチに展開される
  3. 例: [search ... | fields host](host="web-01")

実用例

# 過去1時間でエラーが多いIPのアクセスを全件表示
index=main sourcetype=access_combined
  [search index=main sourcetype=access_combined status>=400 earliest=-1h
   | stats count by clientip
   | where count > 10
   | fields clientip]
| table _time, clientip, uri, status

# 特定ユーザーのセッション内アクティビティ
index=main sourcetype=app_log
  [search index=main sourcetype=auth_log action=login user=alice
   | head 1
   | fields session_id]

注意: サブサーチには実行時間の制限(デフォルト60秒)と結果数の制限(デフォルト10,500件)があります。大量のデータにはjoinの使用を検討しましょう。


lookup コマンド

外部のCSVファイルやKVストアのデータとイベントを紐付けます。

CSVルックアップの作成

  1. CSVファイルを作成
status_code,status_description,severity
200,OK,info
301,Moved Permanently,info
400,Bad Request,warning
401,Unauthorized,warning
403,Forbidden,warning
404,Not Found,warning
500,Internal Server Error,critical
502,Bad Gateway,critical
503,Service Unavailable,critical
  1. Splunkにアップロード: Settings > Lookups > Lookup table files > Add new

  2. ルックアップ定義を作成: Settings > Lookups > Lookup definitions > Add new

lookup コマンドの使用

# ステータスコードの説明を追加
index=main sourcetype=access_combined
| lookup http_status status_code AS status OUTPUT status_description, severity
| table _time, uri, status, status_description, severity

自動ルックアップ

transforms.confで設定すると、検索時に自動的にルックアップが適用されます。

# transforms.conf
[http_status_lookup]
filename = http_status.csv
# props.conf
[access_combined]
LOOKUP-http_status = http_status_lookup status_code AS status OUTPUT status_description severity

inputlookup / outputlookup

# ルックアップテーブルの内容を表示
| inputlookup http_status.csv

# 検索結果をルックアップに保存
index=main sourcetype=access_combined
| stats count by clientip
| outputlookup ip_activity.csv

join コマンド

2つの検索結果をフィールドの値で結合します。

# 認証ログとアクセスログを結合
index=main sourcetype=access_combined
| join type=left clientip
  [search index=main sourcetype=auth_log
   | stats latest(user) AS user, latest(action) AS last_action by src_ip
   | rename src_ip AS clientip]
| table _time, clientip, user, uri, status

join の種類

タイプ 説明
inner(デフォルト) 両方にマッチするイベントのみ
left 左側(メインサーチ)のすべて + マッチした右側
outer 両方のすべて

注意: joinはメモリ使用量が多く、大量データには不向きです。可能な場合は**statslookup**で代替しましょう。

join の代替手段

# join の代わりに stats を使う
index=main (sourcetype=access_combined OR sourcetype=auth_log)
| stats values(uri) AS uris, values(user) AS users, latest(status) AS status by clientip

transaction コマンド

関連するイベントをグループ化します。セッション分析やワークフローの追跡に便利です。

# ユーザーごとのセッションをグループ化
index=main sourcetype=access_combined
| transaction clientip maxspan=30m maxpause=5m
| table clientip, duration, eventcount, _time

# セッションの開始と終了を指定
index=main sourcetype=app_log
| transaction session_id startswith="login" endswith="logout"
| table session_id, user, duration, eventcount

transaction のパラメータ

パラメータ 説明
maxspan トランザクションの最大期間 maxspan=1h
maxpause イベント間の最大間隔 maxpause=5m
startswith 開始イベントの条件 startswith="login"
endswith 終了イベントの条件 endswith="logout"
maxevents 最大イベント数 maxevents=100

transaction で追加されるフィールド

フィールド 説明
duration トランザクションの期間(秒)
eventcount イベント数
closed_txn トランザクションが正常終了したか
flowchart LR
    E1["10:00<br>login"]
    E2["10:05<br>browse"]
    E3["10:10<br>purchase"]
    E4["10:15<br>logout"]
    subgraph TX["transaction session_id"]
        E1 --> E2 --> E3 --> E4
    end
    TX -->|"duration=900s<br>eventcount=4"| Result["結果"]
    style TX fill:#3b82f6,color:#fff
    style Result fill:#22c55e,color:#fff

パフォーマンスTip: transactionは重いコマンドです。可能な場合はstatsで代替しましょう。

# transaction の代替(stats で実現)
index=main sourcetype=access_combined
| stats min(_time) AS start, max(_time) AS end, count AS eventcount, values(uri) AS pages by clientip
| eval duration = end - start

append と appendpipe

append

別のサーチの結果を末尾に追加します。

# 通常の結果 + 合計行
index=main sourcetype=access_combined
| stats count by host
| append
  [search index=main sourcetype=access_combined
   | stats count
   | eval host="TOTAL"]
| sort -count

appendpipe

現在の結果に対して追加の集計行を追加します。

# 各ホストのカウント + 合計行
index=main sourcetype=access_combined
| stats count by host
| appendpipe [stats sum(count) AS count | eval host="TOTAL"]
| sort -count

推奨: 合計行の追加にはappendpipeの方がシンプルです。


実践: セキュリティ分析クエリ

# 1. ブルートフォース検知
# 同一IPから短時間に多数のログイン失敗
index=main sourcetype=auth_log action=failed
| stats count by src_ip
| where count > 5
| lookup geo_ip ip AS src_ip OUTPUT country, city
| sort -count

# 2. 異常なアクセスパターン
# 通常と異なるページへのアクセス
index=main sourcetype=access_combined
| transaction clientip maxspan=10m
| where eventcount > 50
| table clientip, eventcount, duration

# 3. エラーの連鎖分析
# 特定エラーの前後のイベントを確認
index=main sourcetype=app_log
| transaction host maxspan=5m maxpause=1m
| search "OutOfMemoryError"
| table _time, host, eventcount, _raw

# 4. ユーザー行動の追跡
index=main sourcetype=access_combined
  [search index=main sourcetype=access_combined status=403
   | stats count by clientip
   | where count > 3
   | fields clientip]
| transaction clientip maxspan=1h
| table clientip, duration, eventcount, _raw

まとめ

概念 説明
サブサーチ [search ...]で別のサーチを埋め込む
lookup 外部データとの紐付け
join 2つの結果セットの結合
transaction 関連イベントのグループ化
append 別のサーチ結果の追加
appendpipe 集計行の追加

重要ポイント

  1. サブサーチは結果数と時間の制限に注意
  2. **lookup**は外部データの紐付けに最適
  3. **join**はメモリを多く使うので大量データには注意
  4. **transaction**は重いので、可能な限りstatsで代替

練習問題

問題1: 基本

HTTPステータスコードのCSVルックアップを作成し、アクセスログに説明を追加して表示してください。

問題2: 応用

サブサーチを使って、過去1時間で最もエラーが多いホストの全ログを表示してください。

チャレンジ問題

transactionを使って、ユーザーセッションを分析し、平均セッション時間、平均ページビュー数、直帰率(1ページのみのセッション率)を計算してください。


参考リンク


次回予告: Day 8では「アラートとレポート」について学びます。自動化された監視とレポート生成の方法をマスターしましょう。