AWS WAF の Count モードで検知されたログを Amazon Athena で集計する

AWS

2023.8.22

Topics

はじめに

WAF のログで Count モードで検知されたものだけ取得したいなと思い色々と頑張ったので記録しておきます。

WAF ログのアップデート

2022 年 11 月にアップデートがあったらしく、アップデート前と Count ログについて変更がありました。

変更点が多いため関連するところだけ引用します。

“action” フィールド
ログの最初に記録される “action” フィールドは、“ALLOW” か “BLOCK”、または “CAPTCHA” (ルールの Action に CAPTCHA が利用され、リクエストに有効なトークンがない場合)のいずれかとなり、ここに “COUNT” は記録されません。
Count されたかどうかは、最初の Action とは別のフィールドに記録されます。

はじめ、action フィールドがあるのでそれの Count で条件絞ってましたがうまくいかなくて困ってました。
Count モードは、どうやら判定された値の格納される場所が Allow と Block のときとは異なるようです。

詳細については以下記事をご覧いただければと思います。
AWS WAF のログ分析に関する考慮事項 | Amazon Web Services ブログ

テーブル作成

パーティションなしで作成してます。
AWS WAF ログのクエリ – Amazon Athena

LOCATIONの S3 バケット先は適宜変更下さい。

CREATE EXTERNAL TABLE `waf_logs`(
  `timestamp` bigint,
  `formatversion` int,
  `webaclid` string,
  `terminatingruleid` string,
  `terminatingruletype` string,
  `action` string,
  `terminatingrulematchdetails` array <
                                    struct <
                                        conditiontype: string,
                                        sensitivitylevel: string,
                                        location: string,
                                        matcheddata: array < string >
                                          >
                                     >,
  `httpsourcename` string,
  `httpsourceid` string,
  `rulegrouplist` array <
                      struct <
                          rulegroupid: string,
                          terminatingrule: struct <
                                              ruleid: string,
                                              action: string,
                                              rulematchdetails: array <
                                                                   struct <
                                                                       conditiontype: string,
                                                                       sensitivitylevel: string,
                                                                       location: string,
                                                                       matcheddata: array < string >
                                                                          >
                                                                    >
                                                >,
                          nonterminatingmatchingrules: array <
                                                              struct <
                                                                  ruleid: string,
                                                                  action: string,
                                                                  overriddenaction: string,
                                                                  rulematchdetails: array <
                                                                                       struct <
                                                                                           conditiontype: string,
                                                                                           sensitivitylevel: string,
                                                                                           location: string,
                                                                                           matcheddata: array < string >
                                                                                              >
                                                                                       >
                                                                    >
                                                             >,
                          excludedrules: string
                            >
                       >,
`ratebasedrulelist` array <
                         struct <
                             ratebasedruleid: string,
                             limitkey: string,
                             maxrateallowed: int
                               >
                          >,
  `nonterminatingmatchingrules` array <
                                    struct <
                                        ruleid: string,
                                        action: string,
                                        rulematchdetails: array <
                                                             struct <
                                                                 conditiontype: string,
                                                                 sensitivitylevel: string,
                                                                 location: string,
                                                                 matcheddata: array < string >
                                                                    >
                                                             >,
                                        captcharesponse: struct <
                                                            responsecode: string,
                                                            solvetimestamp: string
                                                             >
                                          >
                                     >,
  `requestheadersinserted` array <
                                struct <
                                    name: string,
                                    value: string
                                      >
                                 >,
  `responsecodesent` string,
  `httprequest` struct <
                    clientip: string,
                    country: string,
                    headers: array <
                                struct <
                                    name: string,
                                    value: string
                                      >
                                 >,
                    uri: string,
                    args: string,
                    httpversion: string,
                    httpmethod: string,
                    requestid: string
                      >,
  `labels` array <
               struct <
                   name: string
                     >
                >,
  `captcharesponse` struct <
                        responsecode: string,
                        solvetimestamp: string,
                        failureReason: string
                          >
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://aws-waf-logs-[Name]/AWSLogs/[AWS Account ID]/WAFLogs/[region]/'

各カラムの詳細は以下の WAF ログから確認できます。
ログフィールド – AWS WAF、AWS Firewall Manager、および AWS Shield Advanced

集計クエリ実行

はじめに、実際に格納されているログについてクレデンシャルなものはマスクしていますが記述します。

SELECT *
FROM "waf_logs"
limit 1;

上記 SQL を実行した結果が以下です。
(クレデンシャルな部分については「hoge」 に置き換えてたりします)

#  timestamp   formatversion   webaclid    terminatingruleid   terminatingruletype action  terminatingrulematchdetails httpsourcename  httpsourceid    rulegrouplist   ratebasedrulelist   nonterminatingmatchingrules requestheadersinserted  responsecodesent    httprequest labels  captcharesponse
1   1688573101112   1   arn:aws:wafv2:us-east-1:xxxxxxx:global/webacl/hoge/c265e0a7-53b8-45a7-a3ae-234589051d61 Default_Action  REGULAR ALLOW   []  CF  E1IDDASP7ADBHZ  [{rulegroupid=arn:aws:wafv2:us-east-1:xxxxxxxxxx:global/rulegroup/hoge/3cf65b764d0fef18, terminatingrule=null, nonterminatingmatchingrules=[], excludedrules=null}, {rulegroupid=arn:aws:wafv2:us-east-1:xxxxxxxxxx:global/rulegroup/hoge/08adfd, terminatingrule=null, nonterminatingmatchingrules=[], excludedrules=null}, {rulegroupid=arn:aws:wafv2:us-east-1:xxxxxxxxxxx:global/rulegroup/hoge/9dfc842ca867, terminatingrule=null, nonterminatingmatchingrules=[], excludedrules=null}]    []  []          {clientip=192.168.1.1, country=US, headers=[{name=User-Agent, value=Mozilla/5.0+(compatible; UptimeRobot/2.0; http://www.uptimerobot.com/)}, {name=Accept, value=text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8}, {name=Accept-Language, value=en-US,en;q=0.8}, {name=Connection, value=close}, {name=cache-control, value=no-cache}, {name=Referer, value=https://www.hoge.co.jp/}, {name=host, value=www.hoge.co.jp}, {name=accept-encoding, value=gzip, deflate}, {name=content-length, value=0}], uri=/, args=, httpversion=HTTP/1.1, httpmethod=HEAD, requestid=vxH-x6z3iRmFS8hQ==}

結構複雑に色々と入れ子になってるのでこれをいい感じに見れるようにクエリを整えます。

Count モードの一覧

Count モードで検知されたログを取得するクエリです。
Count モードで検知されたログはどのルールで引っかかったかが格納されているのでそれで判定します。

SELECT
    timestamp,
    httprequest,
    groupList.excludedRules
FROM
    waf_logs, UNNEST(ruleGroupList) t(groupList)
WHERE
    groupList.excludedRules IS NOT NULL

実行結果は以下です。

#  timestamp   httprequest excludedRules
1   1688582941195   {clientip=192.168.1.1, country=CN, headers=[{name=Host, value=www.hoge.com}, {name=Connection, value=close}, {name=Accept-Encoding, value=gzip, deflate}, {name=Accept, value=text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8}, {name=User-Agent, value=Mozilla/5.0}, {name=Accept-Language, value=en-US,en;q=0.5}, {name=Upgrade-Insecure-Requests, value=1}], uri=/, args=lang=../..////data/./config//sys_vd_root%2Broot.conf.gz, httpversion=HTTP/1.1, httpmethod=GET, requestid=vFUsbFga7-FHGWIXqacg==}    [{"rulematchdetails":"null","exclusiontype":"EXCLUDED_AS_COUNT","ruleid":"ディレクトリトラバーサル系を検知するルール"}]

Count モードのホスト名取得

WAF に複数のドメインリソースを紐づけている場合は、どのサイトに対する攻撃なのかを判断したいためホスト名を取得するクエリです。

Count モードで検知されたものを対象として取得したかったため、それらを結合するため連番を払い出ししています。
また、host が先頭大文字だったりすべて小文字のものが混じってたので統一するため lower してます。

WITH dataset AS (
    SELECT
        row_number() over () AS id,
        timestamp,
        httprequest
    FROM
        waf_logs, UNNEST(ruleGroupList) t(groupList)
    WHERE
        groupList.excludedRules IS NOT NULL
), host_dataset AS (
    SELECT
        id,
        n.value AS host
    FROM
        dataset
        CROSS JOIN unnest(
            cast(
                httprequest.headers AS ARRAY(ROW(name VARCHAR, value VARCHAR))
            )
        ) AS x(n)
    WHERE
        LOWER(n.name) = 'host'
)
SELECT
    host,httprequest.uri,count(*) as c
FROM
    dataset
    LEFT JOIN host_dataset ON host_dataset.id = dataset.id
GROUP BY
    host,httprequest.uri
order by c desc

実行結果は以下です。

#  host    uri c
1   www.hogehoge.jp /hoge/add   1310
2   www.hogehoge.jp /   222
3   www.hoge.com    /   162

検知されたルール一覧

検知したルールを取得するクエリです。

ルールが格納されているカラムがかなり入れ込んでいるので力技で取り出すようにしました。

with dataset as (
    SELECT
        cast(json_parse(groupList.excludedRules) as array(json)) as rule
    FROM
        waf_logs, UNNEST(ruleGroupList) t(groupList)
    WHERE
        groupList.excludedRules IS NOT NULL
)
select
    distinct json_extract(waf_logs, '$.ruleid') as ruleid
from
  dataset, unnest(rule) AS t(waf_logs)

実行結果は以下です。

#  ruleid
1   "SQLインジェクション系のルール1"
2   "Log4jのルール"
3   "ブラックリストルール"

検知されたルールごとの検索

特定の検知したルールのログを取得するクエリです。

ルールで検知したログをまとめて分析するときに使いました。
なお、ここで検索するルール名は、ルール一覧を取得するクエリから選択してます。

[ルール名]のところは適宜変更下さい。

WITH dataset AS (
    SELECT
        row_number() over () AS id,
        timestamp,
        httprequest
    FROM
        waf_logs, UNNEST(ruleGroupList) t(groupList)
    WHERE
    cast(json_extract(element_at(cast(json_parse(groupList.excludedRules) as array(json)),1),'$.ruleid')as varchar) like '%[ルール名]%'
), host_dataset AS (
    SELECT
        id,
        n.value AS host
    FROM
        dataset
        CROSS JOIN unnest(
            cast(
                httprequest.headers AS ARRAY(ROW(name VARCHAR, value VARCHAR))
            )
        ) AS x(n)
    WHERE
        LOWER(n.name) = 'host'
)
SELECT
    distinct host,httprequest.uri,httprequest.args, httprequest.clientip,httprequest.country
FROM
    dataset
    LEFT JOIN host_dataset ON host_dataset.id = dataset.id

実行結果は以下です。

#  host    uri args    clientip    country
1   www.hoge.co.jp  /   server=db&username=root&db=mysql&table=event%3C%2Fscript%t.domain%29%3C%2Fscript%3E 192.168.1.1 TH
2   www.hoge.com    //wp-22.php sfilename=nyx-jjlsy.php&sfilecontent=%3Ctitle%3E404%20Not%20Found%3C/title%3E%3Ccenter%3E%3Cbr%3E%3Cbr20echo%20'Failed!%3C/b%3E%3Cbr%3E%3Cbr%3E';%20%7D%7D%0A?%3E&supfiles=nyx-jjlsy.php    192.168.1.2 PL
3   www.hoge.co.jp  /plugins/address_add/add.php    first=HOVER%20ME!%3C%2Fscript%3nt.domain%29%3C%2Fscript%3E  192.168.1.1 TH

日付を指定する

最後におまけ的な条件式です。
日付を特定するときに使用するクエリです。

    SELECT
        row_number() over () AS id,
        timestamp,
        httprequest,
        groupList.excludedRules
    FROM
        waf_logs, UNNEST(ruleGroupList) t(groupList)
    WHERE
        groupList.excludedRules IS NOT NULL
        AND
        from_unixtime(timestamp/1000, 'Asia/Tokyo') <=  date_parse('2023/07/24', '%Y/%m/%d')

期間を指定する場合。

    SELECT
        row_number() over () AS id,
        timestamp,
        httprequest,
        groupList.excludedRules
    FROM
        waf_logs, UNNEST(ruleGroupList) t(groupList)
    WHERE
        groupList.excludedRules IS NOT NULL
        AND
        from_unixtime(timestamp/1000, 'Asia/Tokyo') between date_parse('2023/07/24', '%Y/%m/%d') and date_parse('2023/07/28', '%Y/%m/%d')

参考情報

AWS WAF ログのクエリ – Amazon Athena
CloudWatch または Amazon S3 に保存されている AWS WAF ログを分析する | AWS re:Post
AWS WAF のログ分析に関する考慮事項 | Amazon Web Services ブログ

Cold-Airflow

2021年新卒入社。インフラエンジニアです。RDBが三度の飯より好きです。 主にデータベースやAWSのサーバレスについて書く予定です。あと寒いのは苦手です。

Recommends

こちらもおすすめ

Special Topics

注目記事はこちら