freeBuf
主站

分类

云安全 AI安全 开发安全 终端安全 数据安全 Web安全 基础安全 企业安全 关基安全 移动安全 系统安全 其他安全

特色

热点 工具 漏洞 人物志 活动 安全招聘 攻防演练 政策法规

点我创作

试试在FreeBuf发布您的第一篇文章 让安全圈留下您的足迹
我知道了

官方公众号企业安全新浪微博

FreeBuf.COM网络安全行业门户,每日发布专业的安全资讯、技术剖析。

FreeBuf+小程序

FreeBuf+小程序

ElastAlert监控日志告警Web攻击行为
2018-01-20 09:00:17
所属地 广东省

*本文作者:shystartree,本文属 FreeBuf 原创奖励计划,未经许可禁止转载。

由于公司需要监控web攻击行为,而因某些原因搭不了waf,才不得不用ElastAlert进行告警,此为前提。

一、ELK安装

Elasticsearch 是一个分布式、可扩展、实时的搜索与数据分析引擎。 它能从项目一开始就赋予你的数据以搜索、分析和探索的能力。

Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置,

Kibana是一个开源的分析与可视化平台,设计出来用于和Elasticsearch一起使用的。你可以用kibana搜索、查看、交互存放在Elasticsearch索引里的数据,使用各种不同的图表、表格、地图等kibana能够很轻易地展示高级数据分析与可视化。

ELK这一套软件可以当作一个MVC模型,logstash是controller层,Elasticsearch是一个model层,kibana是view层。首先将数据传给logstash,它将数据进行过滤和格式化(转成JSON格式),然后传给Elasticsearch进行存储、建搜索的索引,kibana提供前端的页面再进行搜索和图表可视化,它是调用Elasticsearch的接口返回的数据进行可视化。logstash和Elasticsearch是用Java写的,kibana使用node.js框架。

安装方法网上有好多,此处就不再阐述。在试用了几乎所有的安装方法后,介绍下本人觉得比较快捷有效的安装方法:

1.1 下载安装匹配版本的elk

elastalert目前还不支持elk6.0以上版本,本人就是因为版本问题而折腾了好久,所以在安装elk的时候需要特别注意版本问题。

我的服务器概况:
Distributor ID:   Ubuntu
Description:  Ubuntu 16.04.1 LTS
Release:  16.04
Codename: xenial

在尝试了众多安装方法后,还是发现在官方网站下载deb包直接安装最为有效便捷。 (系统若为centos,下载对应的rpm包)

搜索版本下载

Elasticsearch:5.5.2
Kibana:5.5.2
Logstash:6.0.0
filebeat:6.0.0 (轻量级的logstash,这个下载tar包)

​ 理论上,Elasticsearch及Kibana版本为5.x都可以,而Logstash与elastalert没啥联系,所以Logstash(大于或等于Elasticsearch及Kibana的5.x版本)能向Elasticsearch传递日志信息即可。

下载完elk的deb包后,使用dpkg -i命令很快就能顺利安装。

1.2 elk配置

​ 同样,此处也只介绍本人的简单配置。我这里是只让Logstash对外开放负责收集日志,而Elasticsearch及Kibana仅在内网访问,故Elasticsearch及Kibana并未开启账户认证登陆,有需要开启或其他需求的读者们请自行搜索。

/etc/elasticsearch/elasticsearch.yml:
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 127.0.0.1
#
# Set a custom port for HTTP:
#
http.port: 9200
/etc/kibana/kibana.yml:
 Kibana is served by a back end server. This setting specifies the port to use.
  server.port: 5601

  # Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
  # The default is 'localhost', which usually means remote machines will not be able to connect.
  # To allow connections from remote users, set this parameter to a non-loopback address.
  server.host: "localhost"

  # The URL of the Elasticsearch instance to use for all your queries.
  elasticsearch.url: "http://localhost:9200"

我这里需要用到自定义的配置文件,故配置文件是自行创建的,放在/usr/share/logstash/bin中,取名为filebeat_log.conf : 

  input {
  beats {
  port => 5044
  client_inactivity_timeout => 90
  codec => json
      }

  }

  filter {
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      target => ["datetime"]
    }
    geoip {
      source => "remote_addr"
    }

    mutate {
  remove_field => ["tags", "beat"]
    }
  }

  output {
  elasticsearch {
  hosts => "localhost:9200"
  index => "logstash-%{+YYYY.MM.dd}"
  }
  stdout {
  codec => rubydebug
  }
  

 1.3 启动方法

elasticsearch&kibana启动

扫描新的单元&重新载入systemd:

systemctl daemon-reload

加入开机自启动:

systemctl enable elasticsearch.service

systemctl enable kibana.service

启动:

systemctl start elasticsearch

systemctl start kibana

查看状态:

systemctl status elasticsearch

systemctl status kibana

logstash启动

进入/usr/share/logstash/bin:

nohup ./logstash -f filebeat_log.conf> /dev/null 2>&1 &

二、使用filebeat进行分布式收集

一开始直接使用logstash进行日志收集,发现资源消耗实在太大,无奈寻找解决方法,发现filebeat是一个轻量级的日志传输工具,故使用filebeat作为日志收集,而logstash作为中心过滤器向es传递日志。

所以整体的架构如:   

* A、B、C、D…(这些服务器是准备监控被攻击行为,装上filebeat)
* 主服务器(装上elk和elastalert,负责收集过滤分析filebeat传递的日志和告警)

下面以tomcat为例子,分享我的配置文件filebeat.yml(nginx的话,修改paths的路径):

  filebeat.prospectors:

  # Each - is a prospector. Most options can be set at the prospector level, so
  # you can use different prospectors for various configurations.
  # Below are the prospector specific configurations.

  - type: log

    # Change to true to enable this prospector configuration.
    enabled: true

    # Paths that should be crawled and fetched. Glob based paths.
    paths:
      - /home/qy/apache-tomcat-9.0.1/logs/localhost_access_log.*.txt
      #- c:\programdata\elasticsearch\logs\*
    # Exclude lines. A list of regular expressions to match. It drops the lines that are
    # matching any regular expression from the list.
    #exclude_lines: ['^DBG']
    document_type: tomcat-log
    scan_frequency: 15s
    ignore_older: 20m
    close_inactive: 12m
    clean_inactive: 30m
    close_removed: true
    clean_removed: true
    ....
    #----------------------------- Logstash output --------------------------------

  output.logstash:
    # The Logstash hosts
    hosts: ["188.88.88.88:5044"]

直接解压下载的tar包,进入目录修改配置文件。然后启动filebeat:nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &

三、日志格式转json​

 为方便kibana分析和elastalert的取值,日志的格式要为json格式,上述的logstash配置文件已适配json格式。

公司的应用服务器中均为nginx和tomcat,故本文只介绍tomcat及nginx的json格式配置方法,其他服务器配置方法请自行搜索。

3.1 tomcat的json格式配置

​ 打开config/server.xml,在最后的位置修改log的输出配置为:

    <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
                 prefix="localhost_access_log" suffix=".txt" pattern="{&quot;time&quot;:&quot;%t&quot;,&quot;remote_addr&quot;:&quot;%h&quot;,&quot;remote_user&quot;:&quot;%l&quot;,&quot;request&quot;:&quot;%r&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;body_bytes_sent&quot;:&quot;%b&quot;,&quot;http_referer&quot;:&quot;%{Referer}i&quot;,&quot;http_user_agent&quot;:&quot;%{User-Agent}i&quot;,&quot;http_x_forwarded_for&quot;:&quot; %{X-Forwarded-For}i&quot;,&quot;request_time&quot;:&quot;%T&quot;,&quot;host&quot;:&quot;%v&quot;,&quot;port&quot;:&quot;%p&quot;}"/>

​ 然后重启tomcat,即生效。

3.2 nginx的json格式配置

进入`/etc/nginx`打`nginx.conf`,加入如下配置:
  http {

          ##
          # Basic Settings
          ##

          sendfile on;
          tcp_nopush on;
          tcp_nodelay on;
          keepalive_timeout 65;
          types_hash_max_size 2048;
          # server_tokens off;

          log_format logstash_json '{"time": "$time_local", '
         '"remote_addr": "$remote_addr", '
         '"remote_user": "$remote_user", '
         '"request": "$request", '
         '"status": "$status", '
         '"body_bytes_sent": "$body_bytes_sent", '
         '"http_referer": "$http_referer", '
         '"http_user_agent": "$http_user_agent", '
         '"http_x_forwarded_for": "$http_x_forwarded_for", '
         '"request_time": "$request_time", '
         '"request_length": "$request_length", '
         '"host": "$http_host"}';
       }


最后nginx -s reload即可

四、使用elastalert进行告警

​ 在经过上述的安装及配置后,终于轮到我们的主角--ElastAlert出来了,其他的告警工具还有411 Alert ManagementElasticsearch watch ,请读者们自行确定需要使用哪个。

ElastAlert使用python编写,具有容易上手、文档全等特点,虽然这个工具拥有如此多的优点,在搭建过程还是遇到了很多很多的未知错误,主要原因是网上的资料大多是针对es5.x以前的版本而没什么现成的资料可供参考。

4.1 安装elastalert

  git clone https://github.com/Yelp/elastalert.git 
  cd elastalert
  python setup.py install            //可能需要sudo
  Pip install -r requirements.txt    //可能需要sudo
  cp config.yaml.example config.yaml

​ 具体的功能本文就不一一介绍了,请自行前往官方文档了解

4.2 创建索引

​ 安装完成后会系统中会自带三个命令:

​  elastalert-create-indexelastalert-rule-from-kibana 、elastalert-test-rule

​ 使用elastalert-create-index,根据提示设置es后按回车默认即可。
​ 配置完索引及配置文件后,可以使用elastalert-test-rule进行测试。这里有个bug,如果出现TransportError(400, u'search_phase_execution_exception', u'No mapping found for [alert_time] in order to sort on')之类的错误,在确认没有其他的问题时,可以先删除索引curl -XDELETE http://localhost:9200/*,再使用elastalert-create-index重新生成索引。

4.3 配置config.yaml

  rules_folder: example_rules

  # How often ElastAlert will query Elasticsearch
  # The unit can be anything from weeks to seconds
  run_every:
    seconds: 3   #每三秒向es请求数据
  # ElastAlert will buffer results from the most recent
  # period of time, in case some log sources are not in real time
  buffer_time:
    minutes: 15  
    #日志会延迟进入es,这里是配置query的向前的时间范围,这是15分钟,即查询 time[now-15m, now]

  # The Elasticsearch hostname for metadata writeback
  # Note that every rule can have its own Elasticsearch host
  es_host: 188.88.88.88

  # The Elasticsearch port
  es_port: 9200

  # Optional URL prefix for Elasticsearch
  #es_url_prefix: elasticsearch

  # Connect with TLS to Elasticsearch
  #use_ssl: True

  # Verify TLS certificates
  #verify_certs: True

  # GET request with body is the default option for Elasticsearch.
  # If it fails for some reason, you can pass 'GET', 'POST' or 'source'.
  # See http://elasticsearch-py.readthedocs.io/en/master/connection.html?highlight=send_get_body_as#transport
  # for details
  #es_send_get_body_as: GET

  # Option basic-auth username and password for Elasticsearch
  #es_username: someusername
  #es_password: somepassword

  # The index on es_host which is used for metadata storage
  # This can be a unmapped index, but it is recommended that you run
  # elastalert-create-index to set a mapping
  writeback_index: elastalert_status

  # If an alert fails for some reason, ElastAlert will retry
  # sending the alert until this time period has elapsed
  alert_time_limit:
    days: 1

以上各字段的解释:

Rules_folder:用来加载下一阶段rule的设置,默认是example_rules

Run_every:用来设置定时向elasticsearch发送请求

Buffer_time:用来设置请求里时间字段的范围,默认是45分钟

Es_host:elasticsearch的host地址

Es_port:elasticsearch 对应的端口号

Use_ssl:可选的,选择是否用SSL连接es,true或者false

Verify_certs:可选的,是否验证TLS证书,设置为true或者false,默认为- true

Es_username:es认证的username

Es_password:es认证的password

Es_url_prefix:可选的,es的url前缀(我的理解是https或者http)

Es_send_get_body_as:可选的,查询es的方式,默认的是GET

Writeback_index:elastalert产生的日志在elasticsearch中的创建的索引

Alert_time_limit:失败重试的时间限制

4.4 告警配置介绍

​ 在example_rules目录中新建yaml配置文件 webattack_frequency.yaml,下面分开介绍这个配置文件的内容(下个小节将分享我的配置文件,此小节仅解释其中的必要设置项): 

1、告警规则

ElastAlert支持11种告警规则,本文不一一介绍了,为响应web攻击行为,本文选用的告警规则是frequency。 

name: web attack

# (Required)
# Type of alert.
# the frequency rule type alerts when num_events events occur with timeframe time
type: frequency

# (Required, frequency specific)
# Alert when this many documents matching the query occur within a timeframe
num_events: 10

# (Required, frequency specific)
# num_events must occur within this amount of time to trigger an alert
timeframe:
  minutes: 1
  
# (Required)
# Index to search, wildcard supported
index: logstash-*      #对应logstash的配置文件中output的elasticsearch index前缀

filter:
- query_string:
# sql insert  xss detect
        query: "request: select.+(from|limit) OR request: union(.*?)select OR request: into.+(dump|out)file "

上述配置文件的意图即是:在一分钟内将匹配query里面的sql注入规则,若匹配次数达到10次,即进行报警。

2、使用邮箱进行告警

ElastAlert提供了 10 多种通知的类型,本文选用的是邮箱告警,还有微信告警钉钉告警,若有需要,请自行配置。

smtp_host: smtp.qiye.163.com
smtp_port: 25

smtp_auth_file: /Users/qy/Downloads/work/elastalert/example_rules/smtp_auth_file.yaml
#回复给那个邮箱
email_reply_to: xxx@163.com
#从哪个邮箱发送
from_addr: xxx@163.com



# (Required)
# The alert is use when a match is found
alert:
- "email"

# (required, email specific)
# a list of email addresses to send alerts to
email:
- "shystartree@163.com"

alert_subject: "web attack may be by {} at @{}"
alert_subject_args:
  - remote_addr
  - time

alert_text_type: alert_text_only

alert_text: |
  你好,服务器({})可能正在受到web攻击,请采取手段阻止!!!!
  ### 截止发邮件前匹配到的请求数:{}
  > 发生时间: {}
  > timestamp:{}
  > attacker's ip: {}
  > request: {}
  > status:{}
  > UA头:{}
  >>> 参考来源:{}

alert_text_args:
  - host
  - num_hits
  - time
  - "@timestamp"
  - remote_addr
  - request
  - status
  - http_user_agent
  - source

smtp_auth_file.yaml的配置内容会在下个小节给出,在这个配置中,我自定义了 alert 的内容,更为精确地突出了攻击者ip、受攻击的服务器、攻击事件等信息。

3、减少重复告警的频率

在实际的使用中,若使用上述的配置,受到攻击的时候邮箱将不断地收到邮件,而这些邮件都对应着同一个攻击实例,根本没必要重复收取,于是,我使用了如下的配置:

   # 用来区分报警,跟 realert 配合使用,在这里意味着,
   # 5 分钟内如果有重复报警,那么当 name 不同时,会当做不同的报警处理,可以是数组
   query_key:
     - name

   # 5 分钟内相同的报警不会重复发送
   realert:
     minutes: 5

   # 指数级扩大 realert 时间,中间如果有报警,
   # 则按照 5 -> 10 -> 20 -> 40 -> 60 不断增大报警时间到制定的最大时间,
   # 如果之后报警减少,则会慢慢恢复原始 realert 时间
   exponential_realert:
     hours: 1

在本人实际测试的攻击场景中,发现使用了exponential_realert后,会错过很多告警(这些告警并不是同一个攻击实例),暂时不确定原因,还请读者们自行确定是否开启该设置。

4.5webattack_frequency.yamlsmtp_auth_file.yaml配置文件内容

上述的4.4小节中对每个配置都作了简单的介绍,这里就直接放出web攻击预警的配置文件供各位读者参考。

webattack_frequency.yaml:

# Alert when the rate of events exceeds a threshold

# (Optional)
# Elasticsearch host
#es_host: 188.88.88.88

# (Optional)
# Elasticsearch port
#es_port: 9200

# (OptionaL) Connect with SSL to Elasticsearch
#use_ssl: True

# (Optional) basic-auth username and password for Elasticsearch
#es_username: someusername
#es_password: somepassword

# (Required)
# Rule name, must be unique
name: web attack

realert:
  minutes: 5

# (Required)
# Type of alert.
# the frequency rule type alerts when num_events events occur with timeframe time
type: frequency

# (Required)
# Index to search, wildcard supported
index: logstash-*

# (Required, frequency specific)
# Alert when this many documents matching the query occur within a timeframe
num_events: 10

# (Required, frequency specific)
# num_events must occur within this amount of time to trigger an alert
timeframe:
  #hours: 4
  minutes: 1

# (Required)
# A list of Elasticsearch filters used for find events
# These filters are joined with AND and nested in a filtered query
# For more info: http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/query-dsl.html
#filter:
#- term:
#    some_field: "some_value"

filter:
- query_string:
# sql insert  xss detect
        query: "request: select.+(from|limit) OR request: union(.*?)select OR request: into.+(dump|out)file OR

        request: (base64_decode|sleep|benchmark|and.+1=1|and.+1=2|or%20|exec|information_schema|where%20|union%20|%2ctable_name%20|cmdshell|table_schema) OR

        request: (iframe|script|body|img|layer|div|meta|style|base|object|input|onmouseover|onerror|onload) OR

        request: .+etc.+passwd OR http_user_agent:(HTTrack|harvest|audit|dirbuster|pangolin|nmap|sqln|-scan|hydra|Parser|libwww|BBBike|sqlmap|w3af|owasp|Nikto|fimap|havij|PycURL|zmeu|BabyKrokodil|netsparker|httperf|bench) OR

        status: (400|404|500|501)

        NOT (request:_health.html OR remote_addr:222.222.222.222  )
        "


smtp_host: smtp.qiye.163.com
smtp_port: 25

smtp_auth_file: /Users/qy/Downloads/work/elastalert/example_rules/smtp_auth_file.yaml
#回复给那个邮箱
email_reply_to: xxx@163.com
#从哪个邮箱发送
from_addr: xxx@163.com



# (Required)
# The alert is use when a match is found
alert:
- "email"

# (required, email specific)
# a list of email addresses to send alerts to
email:
- "shystartree@163.com"

alert_subject: "web attack may be by {} at @{}"
alert_subject_args:
  - remote_addr
  - time

alert_text_type: alert_text_only

alert_text: |
  你好,服务器({})可能正在受到web攻击,请采取手段阻止!!!!
  ### 截止发邮件前匹配到的请求数:{}
  > 发生时间: {}
  > timestamp:{}
  > attacker's ip: {}
  > request: {}
  > status:{}
  > UA头:{}
  >>> 参考来源:{}

alert_text_args:
  - host
  - num_hits
  - time
  - "@timestamp"
  - remote_addr
  - request
  - status
  - http_user_agent
  - source

smtp_auth_file.yaml:

user: xxx@163.com
password: password

4.6 运行elastalert

在成功配置完ElastAlert后将生成三个配置文件:

config.yaml、webattack_frequency.yaml、smtp_auth_file.yaml

  • 启动elastalert服务,监听elasticsearch:

nohup python -m elastalert.elastalert --verbose --rule webattack_frequency.yaml >/dev/null 2>&1 &

  • 为实现守护进程的作用,可以配合supervisor进行使用,本文不再阐述。

4.7 运行效果:

当匹配到自定义攻击规则的时候,ElastAlert将会以邮件方式发送告警信息:

web attack may be by 104.38.13.21 at @[13/Jan/2018:16:06:58 +0800]
xxx 发给 shystartree   
你好,服务器(199.222.36.31)可能正在受到web攻击,请采取手段阻止!!!!
### 截止发邮件前匹配到的请求数:20
> 发生时间: [13/Jan/2018:16:06:58 +0800]
> timestamp:2018-01-13T08:07:04.930Z
> attacker's ip: 184.233.9.121
> request: GET /dbadmin/scripts/setup.php HTTP/1.0
> status:200
> UA头:ZmEu
>>> 参考来源:/log/localhost_access_log.2018-01-13.txt

五、总结

ElastAlert除了本文介绍的告警web攻击行为外,还能进行异常告警等。使用了frequency的规则后,基本能达到识别web攻击的目的。在实际的使用中,elastalert能稳定运行,且能根据自定义配置文件精确告警,缺点是告警的格式不够美观和需要频繁地修改配置文件。

参考链接:

ElastAlert:『Hi,咱服务挂了』:https://xizhibei.github.io/2017/11/19/alerting-with-elastalert/
[ElastAlert]介绍和安装:https://segmentfault.com/a/1190000008227486
被elastalert虐了:http://blog.csdn.net/vbaspdelphi/article/details/54291066

*本文作者:shystartree,本文属 FreeBuf 原创奖励计划,未经许可禁止转载。

# ElastAlert
免责声明
1.一般免责声明:本文所提供的技术信息仅供参考,不构成任何专业建议。读者应根据自身情况谨慎使用且应遵守《中华人民共和国网络安全法》,作者及发布平台不对因使用本文信息而导致的任何直接或间接责任或损失负责。
2. 适用性声明:文中技术内容可能不适用于所有情况或系统,在实际应用前请充分测试和评估。若因使用不当造成的任何问题,相关方不承担责任。
3. 更新声明:技术发展迅速,文章内容可能存在滞后性。读者需自行判断信息的时效性,因依据过时内容产生的后果,作者及发布平台不承担责任。
本文为 独立观点,未经授权禁止转载。
如需授权、对文章有疑问或需删除稿件,请联系 FreeBuf 客服小蜜蜂(微信:freebee1024)
被以下专辑收录,发现更多精彩内容
+ 收入我的专辑
+ 加入我的收藏
相关推荐
  • 0 文章数
  • 0 关注者
文章目录