Day 7: サーチの応用
今日学ぶこと
- サブサーチ
- lookup コマンド
- join コマンド
- transaction コマンド
- append と appendpipe
サブサーチ
サーチの中に別のサーチを埋め込む機能です。
# 最もエラーが多いホストのログを表示
index=main
[search index=main status>=500
| stats count by host
| sort -count
| head 1
| fields host]
flowchart TB
Outer["メインサーチ<br>index=main"]
Inner["サブサーチ<br>[search ... | head 1 | fields host]"]
Result["結果<br>host=web-01 のイベント"]
Inner -->|"host=web-01"| Outer --> Result
style Outer fill:#3b82f6,color:#fff
style Inner fill:#22c55e,color:#fff
style Result fill:#f59e0b,color:#fff
サブサーチの仕組み
[...]内のサーチが先に実行される- 結果がOR条件としてメインサーチに展開される
- 例:
[search ... | fields host]→(host="web-01")
実用例
# 過去1時間でエラーが多いIPのアクセスを全件表示
index=main sourcetype=access_combined
[search index=main sourcetype=access_combined status>=400 earliest=-1h
| stats count by clientip
| where count > 10
| fields clientip]
| table _time, clientip, uri, status
# 特定ユーザーのセッション内アクティビティ
index=main sourcetype=app_log
[search index=main sourcetype=auth_log action=login user=alice
| head 1
| fields session_id]
注意: サブサーチには実行時間の制限(デフォルト60秒)と結果数の制限(デフォルト10,500件)があります。大量のデータには
joinの使用を検討しましょう。
lookup コマンド
外部のCSVファイルやKVストアのデータとイベントを紐付けます。
CSVルックアップの作成
- CSVファイルを作成
status_code,status_description,severity
200,OK,info
301,Moved Permanently,info
400,Bad Request,warning
401,Unauthorized,warning
403,Forbidden,warning
404,Not Found,warning
500,Internal Server Error,critical
502,Bad Gateway,critical
503,Service Unavailable,critical
-
Splunkにアップロード: Settings > Lookups > Lookup table files > Add new
-
ルックアップ定義を作成: Settings > Lookups > Lookup definitions > Add new
lookup コマンドの使用
# ステータスコードの説明を追加
index=main sourcetype=access_combined
| lookup http_status status_code AS status OUTPUT status_description, severity
| table _time, uri, status, status_description, severity
自動ルックアップ
transforms.confで設定すると、検索時に自動的にルックアップが適用されます。
# transforms.conf
[http_status_lookup]
filename = http_status.csv
# props.conf
[access_combined]
LOOKUP-http_status = http_status_lookup status_code AS status OUTPUT status_description severity
inputlookup / outputlookup
# ルックアップテーブルの内容を表示
| inputlookup http_status.csv
# 検索結果をルックアップに保存
index=main sourcetype=access_combined
| stats count by clientip
| outputlookup ip_activity.csv
join コマンド
2つの検索結果をフィールドの値で結合します。
# 認証ログとアクセスログを結合
index=main sourcetype=access_combined
| join type=left clientip
[search index=main sourcetype=auth_log
| stats latest(user) AS user, latest(action) AS last_action by src_ip
| rename src_ip AS clientip]
| table _time, clientip, user, uri, status
join の種類
| タイプ | 説明 |
|---|---|
inner(デフォルト) |
両方にマッチするイベントのみ |
left |
左側(メインサーチ)のすべて + マッチした右側 |
outer |
両方のすべて |
注意:
joinはメモリ使用量が多く、大量データには不向きです。可能な場合は**statsやlookup**で代替しましょう。
join の代替手段
# join の代わりに stats を使う
index=main (sourcetype=access_combined OR sourcetype=auth_log)
| stats values(uri) AS uris, values(user) AS users, latest(status) AS status by clientip
transaction コマンド
関連するイベントをグループ化します。セッション分析やワークフローの追跡に便利です。
# ユーザーごとのセッションをグループ化
index=main sourcetype=access_combined
| transaction clientip maxspan=30m maxpause=5m
| table clientip, duration, eventcount, _time
# セッションの開始と終了を指定
index=main sourcetype=app_log
| transaction session_id startswith="login" endswith="logout"
| table session_id, user, duration, eventcount
transaction のパラメータ
| パラメータ | 説明 | 例 |
|---|---|---|
maxspan |
トランザクションの最大期間 | maxspan=1h |
maxpause |
イベント間の最大間隔 | maxpause=5m |
startswith |
開始イベントの条件 | startswith="login" |
endswith |
終了イベントの条件 | endswith="logout" |
maxevents |
最大イベント数 | maxevents=100 |
transaction で追加されるフィールド
| フィールド | 説明 |
|---|---|
duration |
トランザクションの期間(秒) |
eventcount |
イベント数 |
closed_txn |
トランザクションが正常終了したか |
flowchart LR
E1["10:00<br>login"]
E2["10:05<br>browse"]
E3["10:10<br>purchase"]
E4["10:15<br>logout"]
subgraph TX["transaction session_id"]
E1 --> E2 --> E3 --> E4
end
TX -->|"duration=900s<br>eventcount=4"| Result["結果"]
style TX fill:#3b82f6,color:#fff
style Result fill:#22c55e,color:#fff
パフォーマンスTip:
transactionは重いコマンドです。可能な場合はstatsで代替しましょう。
# transaction の代替(stats で実現)
index=main sourcetype=access_combined
| stats min(_time) AS start, max(_time) AS end, count AS eventcount, values(uri) AS pages by clientip
| eval duration = end - start
append と appendpipe
append
別のサーチの結果を末尾に追加します。
# 通常の結果 + 合計行
index=main sourcetype=access_combined
| stats count by host
| append
[search index=main sourcetype=access_combined
| stats count
| eval host="TOTAL"]
| sort -count
appendpipe
現在の結果に対して追加の集計行を追加します。
# 各ホストのカウント + 合計行
index=main sourcetype=access_combined
| stats count by host
| appendpipe [stats sum(count) AS count | eval host="TOTAL"]
| sort -count
推奨: 合計行の追加には
appendpipeの方がシンプルです。
実践: セキュリティ分析クエリ
# 1. ブルートフォース検知
# 同一IPから短時間に多数のログイン失敗
index=main sourcetype=auth_log action=failed
| stats count by src_ip
| where count > 5
| lookup geo_ip ip AS src_ip OUTPUT country, city
| sort -count
# 2. 異常なアクセスパターン
# 通常と異なるページへのアクセス
index=main sourcetype=access_combined
| transaction clientip maxspan=10m
| where eventcount > 50
| table clientip, eventcount, duration
# 3. エラーの連鎖分析
# 特定エラーの前後のイベントを確認
index=main sourcetype=app_log
| transaction host maxspan=5m maxpause=1m
| search "OutOfMemoryError"
| table _time, host, eventcount, _raw
# 4. ユーザー行動の追跡
index=main sourcetype=access_combined
[search index=main sourcetype=access_combined status=403
| stats count by clientip
| where count > 3
| fields clientip]
| transaction clientip maxspan=1h
| table clientip, duration, eventcount, _raw
まとめ
| 概念 | 説明 |
|---|---|
| サブサーチ | [search ...]で別のサーチを埋め込む |
lookup |
外部データとの紐付け |
join |
2つの結果セットの結合 |
transaction |
関連イベントのグループ化 |
append |
別のサーチ結果の追加 |
appendpipe |
集計行の追加 |
重要ポイント
- サブサーチは結果数と時間の制限に注意
- **
lookup**は外部データの紐付けに最適 - **
join**はメモリを多く使うので大量データには注意 - **
transaction**は重いので、可能な限りstatsで代替
練習問題
問題1: 基本
HTTPステータスコードのCSVルックアップを作成し、アクセスログに説明を追加して表示してください。
問題2: 応用
サブサーチを使って、過去1時間で最もエラーが多いホストの全ログを表示してください。
チャレンジ問題
transactionを使って、ユーザーセッションを分析し、平均セッション時間、平均ページビュー数、直帰率(1ページのみのセッション率)を計算してください。
参考リンク
次回予告: Day 8では「アラートとレポート」について学びます。自動化された監視とレポート生成の方法をマスターしましょう。