freeBuf
主站

分类

漏洞 工具 极客 Web安全 系统安全 网络安全 无线安全 设备/客户端安全 数据安全 安全管理 企业安全 工控安全

特色

头条 人物志 活动 视频 观点 招聘 报告 资讯 区块链安全 标准与合规 容器安全 公开课

官方公众号企业安全新浪微博

FreeBuf.COM网络安全行业门户,每日发布专业的安全资讯、技术剖析。

FreeBuf+小程序

FreeBuf+小程序

攻防系统之攻防环境介绍&搭建
2019-01-22 08:30:56

*本文原创作者:gncao,本文属于FreeBuf原创奖励计划,未经许可禁止转载

一、前言

为什么要搞这个东西呢,因为自己希望能够通过这个“课题”提高自己的渗透技术、提高自己的应急响应能力,同时提升监测平台的有效检测率。听起来感觉不错的样子,但是目前没有在互联网上发现类似的环境,所以就只能自己动手来搭建这套系统,希望这篇文章能够给让大家多一点想法,多一点思路。当然本文肯定会有不足之处,也希望大家多多提出意见,有则改进、无则勉之。

二、连载介绍

既然是一个课题,是一个连载系列,所以后面还会有其他文章输出,那作为第一个开篇的文章,我要在这里给大家简单介绍下这个课程大纲 。目前规划主要有四个大章节。

第一章:攻防环境介绍&搭建(也就是本篇文章)

第二章:主机入侵&响应

2.1 ssh端口爆破

2.2 ftp端口爆破

2.3 mysql服务爆破

2.4 telnet服务爆破

...

第三章:Web应用入侵&响应

3.1 sql注入

3.2 xss攻击

3.3 文件上传

3.4 命令执行

...

第四章:后门/木马入侵&响应

3.1 后门入侵&检测

3.2 挖矿病毒入侵&检测

...

三、环境介绍

因为资源的限制,所以目前整套系统都在虚拟机环境中搭建。整个环境包含三台虚拟机(未来应该会扩展到四台虚拟机),一台攻击主机、两台受害主机、一台监测主机。拓扑图如下(没有visio工具,画的很丑,大家将就着看吧):

拓扑图.jpg看图可能有些同学看不出来是个啥玩意,我来解释下。

attacker作为一个攻击者会对受害者1和受害者2分别发起攻击(两种操作系统),然后事先我们在两个受害者主机上分别安装了flume客户端,flume会采集操作系统、应用软件的日志信息实时传送给observer监控主机,监控主机上运行着监控平台,通过加载检测规则分析日志,发现攻击行为并发出告警信息,当然最后在observer主机上会有展示页面,展示攻击行为。

通过这样一套环境,我们需要掌握攻击技能、响应能力和检测能力,所以对于个人的技术提升有很大的帮助。

各个虚拟机的详细信息如下表所示:

主机名称 角色 IP地址 操作系统
Attacker 攻击机 192.168.171.1 Win10
Victim1 受害主机1 192.168.171.121 Centos7_64
Victim2 受害主机2 192.168.171.122 WinServer2012_64
Observer 监测主机 192.168.171.120 Centos7_64

四、环境搭建

目前第一阶段先不启用victim2虚拟机,第一阶段力求搞懂Linux环境下的攻防,所以整个环境就需要从三个虚拟机开始搭建。

4.1 Attacker

为了保证资源的合理运用,攻击机就使用物理主机,IP地址和操作系统均符合上述要求。

4.2 Victim1

受害主机在未来需要安装许多程序软件,用来作为一个攻击靶机,包含ssh、mysql、apache、ftp等等;同时也需要安装flume客户端采集服务器日志信息;

4.2.1 安装系统

安装系统比较简单,只是在选择镜像文件的时候没有直接选择centos7镜像文件,而是选择稍后安装,同时在启动虚拟机后选择安装的类型是minimal模式,即没有图形化的最简约模式。

4.2.2 给用户授予sudo权限

在centos7安装的时候会创建一个账户(可选),但是创建完的账户是没有sudo权限的,安装一些程序的时候就比较麻烦,所以先给这个账户授予sudo权限。

1)首先找到sudoers文件,一般都在/etc/sudoers这个位置:

whereis sudoers

2)修改该文件权限为可写状态:

chmod -v u+w /etc/sudoers

3)编辑文件内容:

在root  ALL=(ALL)ALL这一行下面增加新的一行

victim ALL=(ALL)ALL

这里表示victim用户将会具备sudoers权限。

4)还原sudoers文件权限:

chmod -v u-w /etc/sudoers

4.2.3 更新yum源并安装ssh服务

这时候就切到普通用户victim下执行下面的命令:

sudo yum -y update

sudo yum -y install openssh-server

一般安装目录都是/etc/sshd/。

启动ssh:service sshd start。

4.2.4 安装java并配置环境变量

安装flume前必须要先安装java环境,且对Java的版本要求有限制,这里我使用的jdk是1.8的,flume是1.6.0-cdh5.7.0的。

1)分别在home目录下创建app和software文件夹,app是程序安装目录,software是程序安装包目录:

mkdir app

mkdir software

2)解压software目录下的jdk文件到app目录下:

tar zxvf jdk-8u131-linux-x64.tar.gz -C ../app/

3)配置环境变量

Vim ~/.bahsrc 增加如下信息:

#jdk

export JAVA_HOME=/home/victim/app/jdk1.8.0_131

export JRE_HOME=${JAVA_HOME}/jre

export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

export PATH=${JAVA_HOME}/bin:$PATH

保存退出后激活环境变量:

Source ~/.bashrc

在命令行下输入java命令能够查看到配置选项信息则表示java环境安装成功。

4.2.5 安装flume并配置环境变量

flume的作用就是从服务器这里采集日志数据传送给logger或者其他服务器。类似的客户端还有filebeat等。

1)解压flume文件到app目录下:

tar zxvf flume-ng-1.6.0-cdh5.7.0.tar.gz -C../app/

2)配置环境变量

Vim ~/.bahsrc 增加如下信息:

#flume

exportFLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin

export PATH=${JAVA_HOME}/bin:$PATH

保存退出后激活环境变量:

Source ~/.bashrc

在命令行下输入echo $FLUME_HOME命令能够查看到flume的安装目录则表示成功安装。

4.2.6 配置flume-env.sh

这个文件在启动flume的时候有用到,需要配置该文件中java环境变量:

1)切换到flume的conf目录下,拷贝flume-env.sh.template文件并重名为flume-env.sh;

2)编辑flume-env.sh文件。

取消export JAVA_HOME前面的注释,并修改JAVA_HOME变量指向JAVA目录:

export JAVA_HOME=/home/victim/app/jdk1.8.0_131

4.2.7 检查flume是否能够正常启动

1)在flume的conf目录下,新建example.conf文件,这个文件是flume的配置文件,配置监听的数据源和指定数据输出,内容如下:

#a1:agent名称

#r1:数据源名称

#k1:sink的名称

#c1:channel的名称

 

# example.conf: A single-node Flume configuration

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = victim

a1.sources.r1.port = 44444

 

# Describe the sink

a1.sinks.k1.type = logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

 

# Bind the source and sink to the channel

#一个source可以输出到多个channel,所以是channels

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

这里的配置文件表示监听victim主机的44444端口并输出到logger窗口,保存文件并退出。

2)启动agent

切换到flume的bin目录下,执行如下命令:

./flume-ng agent --name a1 --conf $FLUME_HOME/conf--conf-file $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console

a1表示的是agent名称,若出现如下信息,表示agent启动成功。其中console表示接收到的数据显示在命令行窗口。

image.png

3)模拟数据来源

新开命令行窗口,输入命令:

telnet victim  44444

并在命令行中随意输入信息,这些信息将成为数据源传送到logger窗口:

image.png

4)检查是否成功

若在logger窗口下看到telnet窗口的输入数据则表示flume安装成功:

image.png至此,victim的安装工作基本完成,后续再陆续安装其他程序。在跟observer联动的时候flume配置文件需要更改,更改内容后续会有介绍。

4.3 Observer

observer作为一台监控主机上面运行着监控程序,用来监控日志中是否存在攻击行为并发出告警信息。

在这台主机上需要安装flume接收victim的日志(其实可以直接用kafka接收victim的日志),kafka做数据传输,spark streaming做数据实时分析,MySQL存储处理后的数据,django做数据展示。

其中核心的是spark streaming其中的规则处置,规则的好坏决定着系统的误报和漏报情况。

4.3.1 安装系统

4.3.2 给用户授予sudo权限

4.3.3 更新yum源并安装ssh

4.3.4 安装java环境

4.3.5 安装flume

这五个步骤的安装方法同victim服务器一致,下面是observer的安装重点。

4.3.6 安装zookeeper

在安装kafka之前要先安装zookeeper,因为kafka的集群管理都是通过zookeeper来实现的。

1)下载zookeeper并解压到app目录下:

tar zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ../app/

2)配置zookeeper的环境变量

vim ~/.bashrc,增加如下内容:

#zk

export ZK_HOME=/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0

export PATH=${ZK_HOME}/bin:$PATH

保存推出后,激活当前环境变量:

source ~/.bashrc

然后命令行下输入 echo $ZK_HOME如果能够正常输出zookeeper目录则表示环境变量配置成功。

3)配置zookeeper

切换到zookeeper的conf目录下,拷贝zoo_sample.cfg文件为zoo.cfg文件,并编辑zoo.cfg文件。

修改datadir为自定义的文件目录,如果不修改该选项则有可能存在默认的文件目录被删除的情况。

dataDir=/home/hadoop/app/tmp/zk    

因为资源有限所以只能单机模式运行zookeeper和kafka。

4)启动zookeeper

切换到zookeeper的bin目录下,输入如下命令启动zookeeper:

./zkServer.sh start    

如果出现如下所示信息则表示zookeeper启动成功:

image.png

4.3.7 安装kafka

安装完zookeeper之后就可以安装kafka了,配置过程也不麻烦。

1)下载kafka并解压到app目录下:

tar zxvf kafka_2.11-0.9.0.0.tgz -C ../app/

2)配置kafka环境变量

vim ~/.bashrc,增加如下内容:

#kafka

export KAFKA_HOME=/home/hadoop/app/kafka_2.11-0.9.0.0

export PATH=${KAFKA_HOME}/bin:$PATH

保存退出后,激活当前环境变量:

source ~/.bashrc

然后命令行下输入 echo $KAFKA_HOME如果能够正常输出kafka目录则表示环境变量配置成功。

3)配置server.properties文件

切换到kafka的config目录下,编辑server.properties文件,编辑配置如下选项:

broker.id=0  #唯一标识broker身份的id信息

listeners=PLAINTEXT://:9092  #表示监听端口是9092

host.name=observer  #hostname表示当前主机名称

log.dirs=/home/hadoop/app/tmp/kafka-logs   #指定kafka的log日志目录

num.partitions=1   #指定分区数量

zookeeper.connect=observer:2181  #配置zookeeper的地址和端口

配置完成后保存退出

4)启动kafka

运行bin目录下的kafka-server-start.sh文件:

bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties    

输入完命令后会显示如下信息:

image.png新开一个窗口输入jps -m命令,查看如果存在如下进程则表示程序已经正常启动。

image.png5)创建topic

一个业务一个topic,在正式使用kafka之前我们必须要配置topic,我们先创建一个topic来测试程序能否正常工作:

bin/kafka-topics.sh --create --zookeeper observer:2181 --replication-factor 1 --partitions 1 --topic testtopic    

这里表示创建一个名为testtopic的话题。

查看是否创建成功,新开窗口输入如下命令:

bin/kafka-topics.sh --list --zookeeper observer:2181    

如果出现testtopic即表明话题创建成功:

image.png6)topic消息测试

kafka整体架构分为consumer和producer,consumer表示消费消息端,producer表示生产消息端。在测试之前我们要先生产消息才能消费消息。

首先我们输入如下命令生产消息:

bin/kafka-console-producer.sh --broker-list observer:9092 --topic testtopic    

然后我们输入如下命令消费消息:

bin/kafka-console-consumer.sh --zookeeper observer:2181 --topic testtopic --from-beginning 

#from-beginning表示从头开始接收消息。

两个窗口的程序在运行以后就会停在窗口不动,当我们在生产端输入任意字符都会在消费端显示的时候,表示我们的kafka可以正常运行了。

生产端:

image.png消费端:

image.png从上面可以看出我们的kafka已经正常运行了。

因为资源有限只能搞单机部署,如果各位老板有钱可以多搞几台主机做冗余,但是一定要注意kafka部署的主机数量一定是2n+1台主机。

7)kafka和flume联动

flume采集到的数据是需要通过kafka传输的,所以它们之间的联动必须要有效才可以。这里我们还是以上面那个testtopic话题为测试对象。

7.1)首先启动kafka,开启consumer消费指定的topic消息

bin/kafka-console-consumer.sh --zookeeper observer:2181 --topic testtopic --from-beginning    

7.2)然后修改victim主机上的flume配置文件,修改source监听源为data.log文件,sink输出类型为avro sink(这个类型能跟其他flume客户端联动)。具体配置如下:

# example.conf: A single-node Flume configuration

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

#监听某文件

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/data/data.log

a1.sources.r1.shell=/bin/sh -c

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = observer

a1.sinks.k1.port = 44445

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

# Bind the source and sink to the channel

#一个source可以输出到多个channel,所以是channels

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

上面表示数据来源于data.log文件,输出到observer的44445端口。

7.3)修改observer主机的flume配置文件,配置文件内容如下:

# example.conf: A single-node Flume configuration

# Name the components on this agent

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

#监听某文件

a2.sources.r2.type = avro

a2.sources.r2.bind = observer

a2.sources.r2.port=44445

# export to kafka

a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink

a2.sinks.k2.brokerList = observer:9092

a2.sinks.k2.topic = testtopic

a2.sinks.k2.requiredAcks = 1

a2.sinks.k2.batchSize = 5

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

# Bind the source and sink to the channel

#一个source可以输出到多个channel,所以是channels

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

sink type类型为KafkaSink。

topic为我们之前创建的topic名称,batchSize表示当记录数到达5个后再发送。

7.4)先启动observer的flume程序,然后再启动victim的flume程序,即先监听再发送数据。

7.4.1)先启动observer的flume程序

../bin/flume-ng agent --name a2 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example-kafka.conf -Dflume.root.logger=INFO,console    

7.4.2)再启动victim的flume程序

../bin/flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example-send.conf -Dflume.root.logger=INFO,console

7.4.3)然后我们再向文件/home/hadoop/data/data.log输入测试文件时,如果在kafka的监听窗口看到我们输入的信息时则表示双击联动成功。

image.png切换到kafka窗口看到我们在data.log文件中新增的内容:

image.png至此我们的kafka已经安装成功且已经能够跟flume进行联动。下面就是重要数据处理和规则匹配环节了。

4.3.8 安装spark

spark环境安装也是比较麻烦的,需要有耐心才可以,但是跟着步骤走一定会搞定的。

1)scala安装

安装spark之前必须要安装scala和hadoop等环境。首先我们先安装scala语言。

1.1)下载scala安装包并解压到app目录下:

tar zxvf scala-2.11.8.tgz -C ../app/

1.2)配置环境变量

vim ~/.bashrc并加入如下内容:

#scala

export SCALA_HOME=/home/hadoop/app/scala-2.11.8

export PATH=${SCALA_HOME}/bin:$PATH

保存文件后退出,source ~/.bashrc 激活环境变量后在命令行输入scala出现scala交互窗口则表示安装配置成功。

image.png

2)hadoop环境搭建

2.1)下载解压hadoop安装包到app目录下。

tar zxvf hadoop-2.6.0-cdh5.7.0.tar.gz -C ../app/    

2.2)配置环境变量

vim ~/.bashrc并加入如下内容:

#hadoop

export HADOOP_HOME=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0

export PATH=${HADOOP_HOME}/bin:$PATH

保存文件后退出,source ~/.bashrc 激活环境变量。

2.3)修改hadoop配置文件

切换到Hadoop的etc/hadoop/目录下,编辑hadoop-env.sh文件,配置java环境变量:

export JAVA_HOME=/home/hadoop/app/jdk1.8.0_131    

保存退出后编辑同级目录下的core-site.xml文件,增加如下配置信息:

<configuration>

<property>

        <name>fs.defaultFS</name>

                <value>hdfs://observer:8020</value>

</property>

<property>

        <name>hadoop.tmp.dir</name>

                <value>/home/hadoop/app/tmp</value>

</property>

</configuration>

配置文件指定fs服务器地址和临时目录。

保存退出后编辑统计目录下的hdf-site.xml文件,增加如下配置信息:

<configuration>

<property>

        <name>dfs.replication</name>

                <value>1</value>

</property>

</configuration>

保存退出编辑slaves文件,增加主机名后保存退出。

2.4)格式化

切换到bin目录下,执行如下命令:

./hdfs namenode -format    

执行完成后会在上面指定的tmp目录下生成dfs文件夹。

image.png

2.5)启动hadoop服务

切换到sbin目录下,执行如下命令启动hadoop服务:

./start-dfs.sh 

然后输入三次当前账户密码后即可启动服务,在浏览器输入ip:50070后如果在浏览器中出现如下信息则表示hadoop安装成功:

image.png

2.5)配置yarn

yarn作为Hadoop的资源调度守护进程,同样也需要安装。

切换到etc/hadoop/目录下,编辑mapred-site.xml文件,增加如下内容,指定框架为yarn:

<configuration>

<property>

        <name>mapreduce.framework.name</name>

                <value>yarn</value>

</property>

</configuration>

2.6)启动yarn

切换到sbin目录下,执行./start-yarn.sh命令。

同样的输入完密码后,启动yarn。

启动完成后在浏览器输入ip:8088后如果出现如下所示信息则表示yarn配置成功:

image.png

jps -m查看进程信息也表示Hadoop和yarn配置成功。

image.png

3)spark安装

3.1)首先下载源代码编译适合hadoop版本的,然后再安装编译完之后的包。

image.png

3.2)这里我编译的是spark2.2.0的,解压该文件到app目录下并配置环境变量:

tar zxvf spark-2.2.0-bin-2.6.0-cdh5.7.0.tgz -C ../app/    

vim ~/.bashrc 增加如下内容:

#spark

export SPARK_HOME=/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0

export PATH=${SPARK_HOME}/bin:$PATH

保存退出后source ~/.bashrc 激活环境变量。

3.3)检查是否安装成功

切换到bin目录下执行./spark-shell --master local[2] 命令,如果进入scala环境则表示安装成功。

image.png4.3.9 安装MySQL

安装MySQL的目的是将spark streaming的处理结果存储下来,然后通过django程序对结果做展示,MySQL安装直接通过sudo yum -y install mysql-server命令来执行安装即可。

4.3.10安装django

django程序的安装直接使用pip或者easy_install命令来安装即可:

pip install django

django安装完成后可以使用django-admin.py文件来创建一个django项目:

django-admin.py startproject attackview

然后创建一个app:

cd attackview

python manage.py startapp sparkmysql

app创建完毕后需要切换到attackview目录下,并编辑settings.py文件,指定数据库为mysql:

DATABASES = {

    'default': {

        'ENGINE': 'django.db.backends.mysql',

        'NAME': 'echart',

        'USER':'root',

        'PASSWORD':'root@qwe',

        'HOST':'127.0.0.1',

        'PORT':'3306',

    }

}

保存后退出,然后编辑urls.py文件指定视图和函数之间的关系,保证当有新的url请求后django能够正常处理。

这里的细节不再赘述,包括上面各个程序,如果大家有疑问可以联系我或者百度查询解决方案。

这样我们的攻防环境基本上就完成了,那最后我们要做个flume+kafka+spark+mysql的联动测试,如果mysql数据库中能够存储flume客户端采集后被处理的数据,那表明我们的环境联动没有问题了。

4.3.11 联动测试

1)首先配置flume,监听某日志文件,鉴于文件篇幅,这里只贴上部分代码,大体思路跟flume配置环节一致。

# Describe/configure the source

#监听某端口

a2.sources.r2.type = exec

a2.sources.r2.command = tail -F /home/hadoop/source/access.log

a2.sources.r2.shell=/bin/sh -c

# Describe the sink

a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink

a2.sinks.k2.topic = streamingtopic

a2.sinks.k2.brokerList = hadoop0:9092

a2.sinks.k2.batchSize = 20

a2.sinks.k2.requiredAcks = 1

2)zookeeper和kafka配置保持上面配置不变即可;

3)配置spark streaming的配置文件;

切换到/examples/src/main/python/streaming/目录下,新增一个kafkadirect.py文件,文件内容如下:

#!/usr/bin/python

#coding:utf-8

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils

import datetime,yaml, os,re,pymysql

#时间格式转换

def timeutil(time):

        return datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S").strftime("%Y%m%d%H%M%S")

#第一步:创建一个本地的StreamingContext,并设置批处理周期为5s

sc=SparkContext("local[2]","flumeWordCount")

ssc=StreamingContext(sc,10)

#第二步:创建一个kafka连接

topic="streamingtopic"

brokers="hadoop0:9092"

#参数分别表示ssc连接名,列表形式显示的topic名称,brokers列表

directkafkaStream = KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list":brokers})

#第三步:数据处理&转换&存储

#提取kafka数据的第一个字段为消息正文字段并打印数据长度

lines = directkafkaStream.map(lambda x: x[1])

#加载规则文件

f=open('attackregexp_file.yaml','r')

cont=f.read()

x=yaml.load(cont)

#识别攻击行为

def identify_attack(url):

rule_len=len(x['attackregex'])

for i in xrange(rule_len):

rex = x['attackregex'][i]['regex']

if re.match(rex,url):

#连接数据库并追加记录到数据库中

mysqldb = pymysql.connect(host='localhost',port=3306,db='ana_log',user='root',passwd='P@ssw0rd',charset='utf8')

cursor = mysqldb.cursor()

query_sql="insert into test values('%d','%s');" % (5,'ceshizhuru')

cursor.execute(query_sql)

mysqldb.commit()

data=cursor.fetchone()

cursor.close()

mysqldb.close()

#处理日志记录

def test(i):

infos=i.split(" ")

url=infos[6]

identify_attack(url)

host=infos[0]

ip=infos[0]

time=infos[3]

status_code=int(infos[8])

return (ip,url,time,status_code)

words=lines.map(lambda x:test(x)).pprint()

#第四步:开始sparkstreaming

ssc.start()

ssc.awaitTermination()

保存文件后退出。这里加载了一个规则文件也是这里的核心文件。

4)分别启动zk、kafka等程序

4.1)启动zk

./bin/zkServer.sh start    

4.2)启动kafka

./bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties    

4.3)创建topic

./bin/kafka-topics.sh --create --zookeeper hadoop0:2181 --replication-factor 1 --partitions 1 --topic streamingtopic    

4.4)启动flume

./bin/flume-ng agent --name a2 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/access-log-flume-kafka.conf -Dflume.root.logger=INFO,console    

4.5)启动spark streaming

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 examples/src/main/python/streaming/kafkadirect1.py    

4.6)然后我们向监听的文件中增加新的记录,如果在mysql中出现存储后的结果就表示联动成功。这里我们的规则是判断请求行为是否存在攻击行为。这里我贴上在spark streaming窗口看到的处理记录:

image.png

五、总结

终于我们的环境安装完毕了,后面我们要做的就是开展每一个课程内容了。

文章内容比较多,肯定会存在一些纰漏,应该也会存在更优的解决方案,毕竟我在这方面的研究时间不长,希望大家在观阅的过程中提出自己的见解,共同成长,有问题或者好的见解请联系我,谢谢。

*本文原创作者:gncao,本文属于FreeBuf原创奖励计划,未经许可禁止转载

# 攻防系统
本文为 独立观点,未经允许不得转载,授权请联系FreeBuf客服小蜜蜂,微信:freebee2022
被以下专辑收录,发现更多精彩内容
+ 收入我的专辑
+ 加入我的收藏
相关推荐
  • 0 文章数
  • 0 关注者