freeBuf
主站

分类

漏洞 工具 极客 Web安全 系统安全 网络安全 无线安全 设备/客户端安全 数据安全 安全管理 企业安全 工控安全

特色

头条 人物志 活动 视频 观点 招聘 报告 资讯 区块链安全 标准与合规 容器安全 公开课

官方公众号企业安全新浪微博

FreeBuf.COM网络安全行业门户,每日发布专业的安全资讯、技术剖析。

FreeBuf+小程序

FreeBuf+小程序

Siem落地方案之JXWAF联动clickhouse
2020-09-16 22:22:02
所属地 广东省

前文

Siem落地方案:初识clickhouseJxwaf安装部署使用方案

之前写了jxwaf一些搭建部署内容,还是没有说到真正可以落地的方式(日志存储这一块),对于日志存储,无非就是这几样:普遍ES,逐渐流行clickhouse以及真香splunk,针对于这三种日志存储方式,之后也会写上相应的文章,以作参考。

这篇文章的主角是clickhouse,不熟悉clickhouse的朋友可以去看前文,了解一下。

docker-compose集成:

https://github.com/yingshang/jxwaf-docker-clickhouse.git

部署架构

clickhouse支持客户端连接(9000端口)和restful API(8123端口),那么就引出了两种传输架构。

jxwaf(tcp)->logstash(input)->logstash(fileter)->logstash(output)->kfaka->clickhouse

jxwaf(tcp)->logstash(input)->logstash(fileter)->logstash(output)->http->clickhouse

这两种经过实践测试,http方式一旦并发数量超过100,就会出现报错问题;kafka这种方式就很香了,logstash将日志丢给kafka作为生产者,clickhouse作为消费者去消费,这样的好处就是日志丢包率很低,clickhouse不容易崩溃。

部署说明

因为之前已经写过jxwaf的安装了,在这里也不想写太多,最近更新就是控制台开源了,所以也不需要使用docker去跑,其他的可以参考前文安装。由于WEB控制台已经开源,所以作者将语义识别的特征内容放在云上读取下载,后续会在web控制台提供按钮(国庆之后),截至写稿完成,我这边需要人工去调用下载,嫌麻烦就等作者更新吧。

首先,需要设置jxwaf的日志丢到logstash那里,我设置了logstash input的端口为5000

1601108010_5f6ef82a1a5344721bbd6.png!small

因为jxwaf丢过来的日志是json格式的,所以logstash编码类型也是json。

input {
    tcp {
        port => 5000
        codec => json
    }
}

启动logstash捕捉到日志内容如下图所示,key-value一一对应,在以前旧版的时候,headers是以json形式扩展,这样如果选择clickhouse就有一个问题,你永远不知道headers会新增什么样key-value,而clickhouse是定义好字段内容,这样就无法弹性扩展,所以在最新版里面直接将headers头变成字符串类型。

1601106114_5f6ef0c2a6b681c903443.png!small

{
"bytes_received" => "453",
   "server_uuid" => "cda8b94b-f328-45f9-87c8-f970522bec3f",
      "@version" => "1",
             "protection_info" => "block",
        "method" => "GET",
          "host" => "test.com",
     "client_ip" => "172.20.0.1",
    "user_agent" => "Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Firefox/68.0",
  "query_string" => "id=1%27%20union%20select%201,2,3,4,5%20union%20all",
           "uri" => "/index.php",
  "request_time" => "2020-09-26 08:19:28",
          "connections_active" => "1",
         "upstream_bytes_sent" => "-",
          "port" => 33362,
      "log_type" => "owasp_attack",
        "scheme" => "http",
        "request_process_time" => "0.000",
          "body" => "",
 "upstream_addr" => "-",
    "@timestamp" => 2020-09-26T08:19:28.354Z,
        "status" => "403",
     "upstream_bytes_received" => "-",
    "bytes_sent" => "320",
          "uuid" => "6618b4d1-950a-47a9-937d-ec1543976624",
         "connections_waiting" => "0",
       "version" => "1.1",
             "upstream_status" => "-",
      "upstream_response_time" => "-",
             "protection_type" => "jxwaf-sql_check",
    "raw_header" => "Host: test.com\r\nUser-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Firefox/68.0\r\nAccept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\nAccept-Language: en-US,en;q=0.5\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\nCookie: PHPSESSID=abauuaistng65l8hecafuu2l54; showhints=1\r\nUpgrade-Insecure-Requests: 1\r\nCache-Control: max-age=0\r\n\r\n"
}

来到logstash的filter阶段,对于input过来的日志,有些key值我不想要,有些key值我想要改一下名字,那么就在这个阶段内处理。

将request_time参数重命名timestamp用于clickhouse数据库的时间戳,然后删掉@timestamp和@version无用参数名。

filter {
   mutate {
	rename => {"[request_time]" => "timestamp"}
	remove_field =>["@timestamp","@version"]
  }
}

实现效果如下图。

1601109133_5f6efc8dc7bca37c08089.png!small

既然已经知道了参数名,就可以去定义clickhouse数据库的表结构,我采用的MergeTree(合并树)结构,以timestamp作为分区。

create table jxwaf(
timestamp  DateTime,
bytes_received String NULL,
server_uuid String NULL,
protection_info String NULL,
method String NULL,
host String NULL,
client_ip String NULL,
user_agent String NULL,
query_string String NULL,
uri String NULL,
connections_active String NULL,
upstream_bytes_sent String NULL,
port Int8 NULL,
log_type String NULL,
scheme String NULL,
request_process_time String NULL,
body String NULL,
upstream_addr String NULL,
status String NULL,
upstream_bytes_received String NULL,
bytes_sent String NULL,
uuid String NULL,
connections_waiting String NULL,
version String NULL,
upstream_status String NULL,
upstream_response_time String NULL,
protection_type String NULL,
raw_header String NULL

)ENGINE MergeTree() PARTITION BY toYYYYMMDD(timestamp) ORDER BY (ti
# SIEM # clickhouse # JXWAF
本文为 独立观点,未经允许不得转载,授权请联系FreeBuf客服小蜜蜂,微信:freebee2022
被以下专辑收录,发现更多精彩内容
+ 收入我的专辑
+ 加入我的收藏
相关推荐
  • 0 文章数
  • 0 关注者
文章目录