AWS WAF の Count モードで検知されたログを Amazon Athena で集計する
2023.8.22
はじめに
WAF のログで Count モードで検知されたものだけ取得したいなと思い色々と頑張ったので記録しておきます。
WAF ログのアップデート
2022 年 11 月にアップデートがあったらしく、アップデート前と Count ログについて変更がありました。
変更点が多いため関連するところだけ引用します。
“action” フィールド
ログの最初に記録される “action” フィールドは、“ALLOW” か “BLOCK”、または “CAPTCHA” (ルールの Action に CAPTCHA が利用され、リクエストに有効なトークンがない場合)のいずれかとなり、ここに “COUNT” は記録されません。
Count されたかどうかは、最初の Action とは別のフィールドに記録されます。
はじめ、action フィールドがあるのでそれの Count で条件絞ってましたがうまくいかなくて困ってました。
Count モードは、どうやら判定された値の格納される場所が Allow と Block のときとは異なるようです。
詳細については以下記事をご覧いただければと思います。
AWS WAF のログ分析に関する考慮事項 | Amazon Web Services ブログ
テーブル作成
パーティションなしで作成してます。
AWS WAF ログのクエリ – Amazon Athena
LOCATION
の S3 バケット先は適宜変更下さい。
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://aws-waf-logs-[Name]/AWSLogs/[AWS Account ID]/WAFLogs/[region]/'
各カラムの詳細は以下の WAF ログから確認できます。
ログフィールド – AWS WAF、AWS Firewall Manager、および AWS Shield Advanced
集計クエリ実行
はじめに、実際に格納されているログについてクレデンシャルなものはマスクしていますが記述します。
SELECT * FROM "waf_logs" limit 1;
上記 SQL を実行した結果が以下です。
(クレデンシャルな部分については「hoge」 に置き換えてたりします)
# timestamp formatversion webaclid terminatingruleid terminatingruletype action terminatingrulematchdetails httpsourcename httpsourceid rulegrouplist ratebasedrulelist nonterminatingmatchingrules requestheadersinserted responsecodesent httprequest labels captcharesponse 1 1688573101112 1 arn:aws:wafv2:us-east-1:xxxxxxx:global/webacl/hoge/c265e0a7-53b8-45a7-a3ae-234589051d61 Default_Action REGULAR ALLOW [] CF E1IDDASP7ADBHZ [{rulegroupid=arn:aws:wafv2:us-east-1:xxxxxxxxxx:global/rulegroup/hoge/3cf65b764d0fef18, terminatingrule=null, nonterminatingmatchingrules=[], excludedrules=null}, {rulegroupid=arn:aws:wafv2:us-east-1:xxxxxxxxxx:global/rulegroup/hoge/08adfd, terminatingrule=null, nonterminatingmatchingrules=[], excludedrules=null}, {rulegroupid=arn:aws:wafv2:us-east-1:xxxxxxxxxxx:global/rulegroup/hoge/9dfc842ca867, terminatingrule=null, nonterminatingmatchingrules=[], excludedrules=null}] [] [] {clientip=192.168.1.1, country=US, headers=[{name=User-Agent, value=Mozilla/5.0+(compatible; UptimeRobot/2.0; http://www.uptimerobot.com/)}, {name=Accept, value=text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8}, {name=Accept-Language, value=en-US,en;q=0.8}, {name=Connection, value=close}, {name=cache-control, value=no-cache}, {name=Referer, value=https://www.hoge.co.jp/}, {name=host, value=www.hoge.co.jp}, {name=accept-encoding, value=gzip, deflate}, {name=content-length, value=0}], uri=/, args=, httpversion=HTTP/1.1, httpmethod=HEAD, requestid=vxH-x6z3iRmFS8hQ==}
結構複雑に色々と入れ子になってるのでこれをいい感じに見れるようにクエリを整えます。
Count モードの一覧
Count モードで検知されたログを取得するクエリです。
Count モードで検知されたログはどのルールで引っかかったかが格納されているのでそれで判定します。
SELECT timestamp, httprequest, groupList.excludedRules FROM waf_logs, UNNEST(ruleGroupList) t(groupList) WHERE groupList.excludedRules IS NOT NULL
実行結果は以下です。
# timestamp httprequest excludedRules 1 1688582941195 {clientip=192.168.1.1, country=CN, headers=[{name=Host, value=www.hoge.com}, {name=Connection, value=close}, {name=Accept-Encoding, value=gzip, deflate}, {name=Accept, value=text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8}, {name=User-Agent, value=Mozilla/5.0}, {name=Accept-Language, value=en-US,en;q=0.5}, {name=Upgrade-Insecure-Requests, value=1}], uri=/, args=lang=../..////data/./config//sys_vd_root%2Broot.conf.gz, httpversion=HTTP/1.1, httpmethod=GET, requestid=vFUsbFga7-FHGWIXqacg==} [{"rulematchdetails":"null","exclusiontype":"EXCLUDED_AS_COUNT","ruleid":"ディレクトリトラバーサル系を検知するルール"}]
Count モードのホスト名取得
WAF に複数のドメインリソースを紐づけている場合は、どのサイトに対する攻撃なのかを判断したいためホスト名を取得するクエリです。
Count モードで検知されたものを対象として取得したかったため、それらを結合するため連番を払い出ししています。
また、host が先頭大文字だったりすべて小文字のものが混じってたので統一するため lower してます。
WITH dataset AS ( SELECT row_number() over () AS id, timestamp, httprequest FROM waf_logs, UNNEST(ruleGroupList) t(groupList) WHERE groupList.excludedRules IS NOT NULL ), host_dataset AS ( SELECT id, n.value AS host FROM dataset CROSS JOIN unnest( cast( httprequest.headers AS ARRAY(ROW(name VARCHAR, value VARCHAR)) ) ) AS x(n) WHERE LOWER(n.name) = 'host' ) SELECT host,httprequest.uri,count(*) as c FROM dataset LEFT JOIN host_dataset ON host_dataset.id = dataset.id GROUP BY host,httprequest.uri order by c desc
実行結果は以下です。
# host uri c 1 www.hogehoge.jp /hoge/add 1310 2 www.hogehoge.jp / 222 3 www.hoge.com / 162
検知されたルール一覧
検知したルールを取得するクエリです。
ルールが格納されているカラムがかなり入れ込んでいるので力技で取り出すようにしました。
with dataset as ( SELECT cast(json_parse(groupList.excludedRules) as array(json)) as rule FROM waf_logs, UNNEST(ruleGroupList) t(groupList) WHERE groupList.excludedRules IS NOT NULL ) select distinct json_extract(waf_logs, '$.ruleid') as ruleid from dataset, unnest(rule) AS t(waf_logs)
実行結果は以下です。
# ruleid 1 "SQLインジェクション系のルール1" 2 "Log4jのルール" 3 "ブラックリストルール"
検知されたルールごとの検索
特定の検知したルールのログを取得するクエリです。
ルールで検知したログをまとめて分析するときに使いました。
なお、ここで検索するルール名は、ルール一覧を取得するクエリから選択してます。
[ルール名]のところは適宜変更下さい。
WITH dataset AS ( SELECT row_number() over () AS id, timestamp, httprequest FROM waf_logs, UNNEST(ruleGroupList) t(groupList) WHERE cast(json_extract(element_at(cast(json_parse(groupList.excludedRules) as array(json)),1),'$.ruleid')as varchar) like '%[ルール名]%' ), host_dataset AS ( SELECT id, n.value AS host FROM dataset CROSS JOIN unnest( cast( httprequest.headers AS ARRAY(ROW(name VARCHAR, value VARCHAR)) ) ) AS x(n) WHERE LOWER(n.name) = 'host' ) SELECT distinct host,httprequest.uri,httprequest.args, httprequest.clientip,httprequest.country FROM dataset LEFT JOIN host_dataset ON host_dataset.id = dataset.id
実行結果は以下です。
# host uri args clientip country 1 www.hoge.co.jp / server=db&username=root&db=mysql&table=event%3C%2Fscript%t.domain%29%3C%2Fscript%3E 192.168.1.1 TH 2 www.hoge.com //wp-22.php sfilename=nyx-jjlsy.php&sfilecontent=%3Ctitle%3E404%20Not%20Found%3C/title%3E%3Ccenter%3E%3Cbr%3E%3Cbr20echo%20'Failed!%3C/b%3E%3Cbr%3E%3Cbr%3E';%20%7D%7D%0A?%3E&supfiles=nyx-jjlsy.php 192.168.1.2 PL 3 www.hoge.co.jp /plugins/address_add/add.php first=HOVER%20ME!%3C%2Fscript%3nt.domain%29%3C%2Fscript%3E 192.168.1.1 TH
日付を指定する
最後におまけ的な条件式です。
日付を特定するときに使用するクエリです。
SELECT row_number() over () AS id, timestamp, httprequest, groupList.excludedRules FROM waf_logs, UNNEST(ruleGroupList) t(groupList) WHERE groupList.excludedRules IS NOT NULL AND from_unixtime(timestamp/1000, 'Asia/Tokyo') <= date_parse('2023/07/24', '%Y/%m/%d')
期間を指定する場合。
SELECT row_number() over () AS id, timestamp, httprequest, groupList.excludedRules FROM waf_logs, UNNEST(ruleGroupList) t(groupList) WHERE groupList.excludedRules IS NOT NULL AND from_unixtime(timestamp/1000, 'Asia/Tokyo') between date_parse('2023/07/24', '%Y/%m/%d') and date_parse('2023/07/28', '%Y/%m/%d')
参考情報
AWS WAF ログのクエリ – Amazon Athena
CloudWatch または Amazon S3 に保存されている AWS WAF ログを分析する | AWS re:Post
AWS WAF のログ分析に関する考慮事項 | Amazon Web Services ブログ
テックブログ新着情報のほか、AWSやGoogle Cloudに関するお役立ち情報を配信中!
Follow @twitter2021年新卒入社。インフラエンジニアです。RDBが三度の飯より好きです。 主にデータベースやAWSのサーバレスについて書く予定です。あと寒いのは苦手です。
Recommends
こちらもおすすめ
-
QuickSightとAthenaを活用!データ分析入門
2017.12.11
-
Amazon DynamoDB テーブルを別のアカウントに移行する
2022.9.2
Special Topics
注目記事はこちら
データ分析入門
これから始めるBigQuery基礎知識
2024.02.28
AWSの料金が 10 %割引になる!
『AWSの請求代行リセールサービス』
2024.07.16