freeBuf
主站

分类

云安全 AI安全 开发安全 终端安全 数据安全 Web安全 基础安全 企业安全 关基安全 移动安全 系统安全 其他安全

特色

热点 工具 漏洞 人物志 活动 安全招聘 攻防演练 政策法规

点我创作

试试在FreeBuf发布您的第一篇文章 让安全圈留下您的足迹
我知道了

官方公众号企业安全新浪微博

FreeBuf.COM网络安全行业门户,每日发布专业的安全资讯、技术剖析。

FreeBuf+小程序

FreeBuf+小程序

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

攻防系统之攻防环境介绍&搭建
gncao 2019-01-22 08:30:56 1111818

*本文原创作者:gncao,本文属于FreeBuf原创奖励计划,未经许可禁止转载

一、前言

为什么要搞这个东西呢,因为自己希望能够通过这个“课题”提高自己的渗透技术、提高自己的应急响应能力,同时提升监测平台的有效检测率。听起来感觉不错的样子,但是目前没有在互联网上发现类似的环境,所以就只能自己动手来搭建这套系统,希望这篇文章能够给让大家多一点想法,多一点思路。当然本文肯定会有不足之处,也希望大家多多提出意见,有则改进、无则勉之。

二、连载介绍

既然是一个课题,是一个连载系列,所以后面还会有其他文章输出,那作为第一个开篇的文章,我要在这里给大家简单介绍下这个课程大纲 。目前规划主要有四个大章节。

第一章:攻防环境介绍&搭建(也就是本篇文章)

第二章:主机入侵&响应

2.1 ssh端口爆破

2.2 ftp端口爆破

2.3 mysql服务爆破

2.4 telnet服务爆破

...

第三章:Web应用入侵&响应

3.1 sql注入

3.2 xss攻击

3.3 文件上传

3.4 命令执行

...

第四章:后门/木马入侵&响应

3.1 后门入侵&检测

3.2 挖矿病毒入侵&检测

...

三、环境介绍

因为资源的限制,所以目前整套系统都在虚拟机环境中搭建。整个环境包含三台虚拟机(未来应该会扩展到四台虚拟机),一台攻击主机、两台受害主机、一台监测主机。拓扑图如下(没有visio工具,画的很丑,大家将就着看吧):

拓扑图.jpg看图可能有些同学看不出来是个啥玩意,我来解释下。

attacker作为一个攻击者会对受害者1和受害者2分别发起攻击(两种操作系统),然后事先我们在两个受害者主机上分别安装了flume客户端,flume会采集操作系统、应用软件的日志信息实时传送给observer监控主机,监控主机上运行着监控平台,通过加载检测规则分析日志,发现攻击行为并发出告警信息,当然最后在observer主机上会有展示页面,展示攻击行为。

通过这样一套环境,我们需要掌握攻击技能、响应能力和检测能力,所以对于个人的技术提升有很大的帮助。

各个虚拟机的详细信息如下表所示:

主机名称 角色 IP地址 操作系统
Attacker 攻击机 192.168.171.1 Win10
Victim1 受害主机1 192.168.171.121 Centos7_64
Victim2 受害主机2 192.168.171.122 WinServer2012_64
Observer 监测主机 192.168.171.120 Centos7_64

四、环境搭建

目前第一阶段先不启用victim2虚拟机,第一阶段力求搞懂Linux环境下的攻防,所以整个环境就需要从三个虚拟机开始搭建。

4.1 Attacker

为了保证资源的合理运用,攻击机就使用物理主机,IP地址和操作系统均符合上述要求。

4.2 Victim1

受害主机在未来需要安装许多程序软件,用来作为一个攻击靶机,包含ssh、mysql、apache、ftp等等;同时也需要安装flume客户端采集服务器日志信息;

4.2.1 安装系统

安装系统比较简单,只是在选择镜像文件的时候没有直接选择centos7镜像文件,而是选择稍后安装,同时在启动虚拟机后选择安装的类型是minimal模式,即没有图形化的最简约模式。

4.2.2 给用户授予sudo权限

在centos7安装的时候会创建一个账户(可选),但是创建完的账户是没有sudo权限的,安装一些程序的时候就比较麻烦,所以先给这个账户授予sudo权限。

1)首先找到sudoers文件,一般都在/etc/sudoers这个位置:

whereis sudoers

2)修改该文件权限为可写状态:

chmod -v u+w /etc/sudoers

3)编辑文件内容:

在root  ALL=(ALL)ALL这一行下面增加新的一行

victim ALL=(ALL)ALL

这里表示victim用户将会具备sudoers权限。

4)还原sudoers文件权限:

chmod -v u-w /etc/sudoers

4.2.3 更新yum源并安装ssh服务

这时候就切到普通用户victim下执行下面的命令:

sudo yum -y update

sudo yum -y install openssh-server

一般安装目录都是/etc/sshd/。

启动ssh:service sshd start。

4.2.4 安装java并配置环境变量

安装flume前必须要先安装java环境,且对Java的版本要求有限制,这里我使用的jdk是1.8的,flume是1.6.0-cdh5.7.0的。

1)分别在home目录下创建app和software文件夹,app是程序安装目录,software是程序安装包目录:

mkdir app

mkdir software

2)解压software目录下的jdk文件到app目录下:

tar zxvf jdk-8u131-linux-x64.tar.gz -C ../app/

3)配置环境变量

Vim ~/.bahsrc 增加如下信息:

#jdk

export JAVA_HOME=/home/victim/app/jdk1.8.0_131

export JRE_HOME=${JAVA_HOME}/jre

export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

export PATH=${JAVA_HOME}/bin:$PATH

保存退出后激活环境变量:

Source ~/.bashrc

在命令行下输入java命令能够查看到配置选项信息则表示java环境安装成功。

4.2.5 安装flume并配置环境变量

flume的作用就是从服务器这里采集日志数据传送给logger或者其他服务器。类似的客户端还有filebeat等。

1)解压flume文件到app目录下:

tar zxvf flume-ng-1.6.0-cdh5.7.0.tar.gz -C../app/

2)配置环境变量

Vim ~/.bahsrc 增加如下信息:

#flume

exportFLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin

export PATH=${JAVA_HOME}/bin:$PATH

保存退出后激活环境变量:

Source ~/.bashrc

在命令行下输入echo $FLUME_HOME命令能够查看到flume的安装目录则表示成功安装。

4.2.6 配置flume-env.sh

这个文件在启动flume的时候有用到,需要配置该文件中java环境变量:

1)切换到flume的conf目录下,拷贝flume-env.sh.template文件并重名为flume-env.sh;

2)编辑flume-env.sh文件。

取消export JAVA_HOME前面的注释,并修改JAVA_HOME变量指向JAVA目录:

export JAVA_HOME=/home/victim/app/jdk1.8.0_131

4.2.7 检查flume是否能够正常启动

1)在flume的conf目录下,新建example.conf文件,这个文件是flume的配置文件,配置监听的数据源和指定数据输出,内容如下:

#a1:agent名称

#r1:数据源名称

#k1:sink的名称

#c1:channel的名称

# example.conf: A single-node Flume configuration

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = victim

a1.sources.r1.port = 44444

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

# Bind the source and sink to the channel

#一个source可以输出到多个channel,所以是channels

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

这里的配置文件表示监听victim主机的44444端口并输出到logger窗口,保存文件并退出。

2)启动agent

切换到flume的bin目录下,执行如下命令:

./flume-ng agent --name a1 --conf $FLUME_HOME/conf--conf-file $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console

a1表示的是agent名称,若出现如下信息,表示agent启动成功。其中console表示接收到的数据显示在命令行窗口。

image.png

3)模拟数据来源

新开命令行窗口,输入命令:

telnet victim  44444

并在命令行中随意输入信息,这些信息将成为数据源传送到logger窗口:

image.png

4)检查是否成功

若在logger窗口下看到telnet窗口的输入数据则表示flume安装成功:

image.png至此,victim的安装工作基本完成,后续再陆续安装其他程序。在跟observer联动的时候flume配置文件需要更改,更改内容后续会有介绍。

4.3 Observer

observer作为一台监控主机上面运行着监控程序,用来监控日志中是否存在攻击行为并发出告警信息。

在这台主机上需要安装flume接收victim的日志(其实可以直接用kafka接收victim的日志),kafka做数据传输,spark streaming做数据实时分析,MySQL存储处理后的数据,django做数据展示。

其中核心的是spark streaming其中的规则处置,规则的好坏决定着系统的误报和漏报情况。

4.3.1 安装系统

4.3.2 给用户授予sudo权限

4.3.3 更新yum源并安装ssh

4.3.4 安装java环境

4.3.5 安装flume

这五个步骤的安装方法同victim服务器一致,下面是observer的安装重点。

4.3.6 安装zookeeper

在安装kafka之前要先安装zookeeper,因为kafka的集群管理都是通过zookeeper来实现的。

1)下载zookeeper并解压到app目录下:

tar zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ../app/

2)配置zookeeper的环境变量

vim ~/.bashrc,增加如下内容:

#zk

export ZK_HOME=/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0

export PATH=${ZK_HOME}/bin:$PATH

保存推出后,激活当前环境变量:

source ~/.bashrc

然后命令行下输入 echo $ZK_HOME如果能够正常输出zookeeper目录则表示环境变量配置成功。

3)配置zookeeper

切换到zookeeper的conf目录下,拷贝zoo_sample.cfg文件为zoo.cfg文件,并编辑zoo.cfg文件。

修改datadir为自定义的文件目录,如果不修改该选项则有可能存在默认的文件目录被删除的情况。

dataDir=/home/hadoop/app/tmp/zk    

因为资源有限所以只能单机模式运行zookeeper和kafka。

4)启动zookeeper

切换到zookeeper的bin目录下,输入如下命令启动zookeeper:

./zkServer.sh start    

如果出现如下所示信息则表示zookeeper启动成功:

image.png

4.3.7 安装kafka

安装完zookeeper之后就可以安装kafka了,配置过程也不麻烦。

1)下载kafka并解压到app目录下:

tar zxvf kafka_2.11-0.9.0.0.tgz -C ../app/

2)配置kafka环境变量

vim ~/.bashrc,增加如下内容:

#kafka

export KAFKA_HOME=/home/hadoop/app/kafka_2.11-0.9.0.0

export PATH=${KAFKA_HOME}/bin:$PATH

保存退出后,激活当前环境变量:

source ~/.bashrc

然后命令行下输入 echo $KAFKA_HOME如果能够正常输出kafka目录则表示环境变量配置成功。

3)配置server.properties文件

切换到kafka的config目录下,编辑server.properties文件,编辑配置如下选项:

broker.id=0  #唯一标识broker身份的id信息

listeners=PLAINTEXT://:9092  #表示监听端口是9092

host.name=observer  #hostname表示当前主机名称

log.dirs=/home/hadoop/app/tmp/kafka-logs   #指定kafka的log日志目录

num.partitions=1   #指定分区数量

zookeeper.connect=observer:2181  #配置zookeeper的地址和端口

配置完成后保存退出

4)启动kafka

运行bin目录下的kafka-server-start.sh文件:

bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties    

输入完命令后会显示如下信息:

image.png新开一个窗口输入jps -m命令,查看如果存在如下进程则表示程序已经正常启动。

image.png5)创建topic

一个业务一个topic,在正式使用kafka之前我们必须要配置topic,我们先创建一个topic来测试程序能否正常工作:

bin/kafka-topics.sh --create --zookeeper observer:2181 --replication-factor 1 --partitions 1 --topic testtopic    

这里表示创建一个名为testtopic的话题。

查看是否创建成功,新开窗口输入如下命令:

bin/kafka-topics.sh --list --zookeeper observer:2181    

如果出现testtopic即表明话题创建成功:

image.png6)topic消息测试

kafka整体架构分为consumer和producer,consumer表示消费消息端,producer表示生产消息端。在测试之前我们要先生产消息才能消费消息。

首先我们输入如下命令生产消息:

bin/kafka-console-producer.sh --broker-list observer:9092 --topic testtopic    

然后我们输入如下命令消费消息:

bin/kafka-console-consumer.sh --zookeeper observer:2181 --topic testtopic --from-beginning 

#from-beginning表示从头开始接收消息。

两个窗口的程序在运行以后就会停在窗口不动,当我们在生产端输入任意字符都会在消费端显示的时候,表示我们的kafka可以正常运行了。

生产端:

image.png消费端:

image.png从上面可以看出我们的kafka已经正常运行了。

因为资源有限只能搞单机部署,如果各位老板有钱可以多搞几台主机做冗余,但是一定要注意kafka部署的主机数量一定是2n+1台主机。

7)kafka和flume联动

flume采集到的数据是需要通过kafka传输的,所以它们之间的联动必须要有效才可以。这里我们还是以上面那个testtopic话题为测试对象。

7.1)首先启动kafka,开启consumer消费指定的topic消息

bin/kafka-console-consumer.sh --zookeeper observer:2181 --topic testtopic --from-beginning    

7.2)然后修改victim主机上的flume配置文件,修改source监听源为data.log文件,sink输出类型为avro sink(这个类型能跟其他flume客户端联动)。具体配置如下:

# example.conf: A single-node Flume configuration

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

#监听某文件

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/data/data.log

a1.sources.r1.shell=/bin/sh -c

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = observer

a1.sinks.k1.port = 44445

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

# Bind the source and sink to the channel

#一个source可以输出到多个channel,所以是channels

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

上面表示数据来源于data.log文件,输出到observer的44445端口。

7.3)修改observer主机的flume配置文件,配置文件内容如下:

# example.conf: A single-node Flume configuration

# Name the components on this agent

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

#监听某文件

a2.sources.r2.type = avro

a2.sources.r2.bind = observer

a2.sources.r2.port=44445

# export to kafka

a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink

a2.sinks.k2.brokerList = observer:9092

a2.sinks.k2.topic = testtopic

a2.sinks.k2.requiredAcks = 1

a2.sinks.k2.batchSize = 5

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

# Bind the source and sink to the channel

#一个source可以输出到多个channel,所以是channels

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

sink type类型为KafkaSink。

topic为我们之前创建的topic名称,batchSize表示当记录数到达5个后再发送。

7.4)先启动observer的flume程序,然后再启动victim的flume程序,即先监听再发送数据。

7.4.1)先启动observer的flume程序

../bin/flume-ng agent --name a2 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example-kafka.conf -Dflume.root.logger=INFO,console    

7.4.2)再启动victim的flume程序

../bin/flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example-send.conf -Dflume.root.logger=INFO,console

7.4.3)然后我们再向文件/home/hadoop/data/data.log输入测试文件时,如果在kafka的监听窗口看到我们输入的信息时则表示双击联动成功。

image.png切换到kafka窗口看到我们在data.log文件中新增的内容:

image.png至此我们的kafka已经安装成功且已经能够跟flume进行联动。下面就是重要数据处理和规则匹配环节了。

4.3.8 安装spark

spark环境安装也是比较麻烦的,需要有耐心才可以,但是跟着步骤走一定会搞定的。

1)scala安装

安装spark之前必须要安装scala和hadoop等环境。首先我们先安装scala语言。

1.1)下载scala安装包并解压到app目录下:

tar zxvf scala-2.11.8.tgz -C ../app/

1.2)配置环境变量

vim ~/.bashrc并加入如下内容:

#scala

export SCALA_HOME=/home/hadoop/app/scala-2.11.8

export PATH=${SCALA_HOME}/bin:$PATH

保存文件后退出,source ~/.bashrc 激活环境变量后在命令行输入scala出现scala交互窗口则表示安装配置成功。

image.png

2)hadoop环境搭建

2.1)下载解压hadoop安装包到app目录下。

tar zxvf hadoop-2.6.0-cdh5.7.0.tar.gz -C ../app/    

2.2)配置环境变量

vim ~/.bashrc并加入如下内容:

#hadoop

export HADOOP_HOME=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0

export PATH=${HADOOP_HOME}/bin:$PATH

保存文件后退出,source ~/.bashrc 激活环境变量。

2.3)修改hadoop配置文件

切换到Hadoop的etc/hadoop/目录下,编辑hadoop-env.sh文件,配置java环境变量:

export JAVA_HOME=/home/hadoop/app/jdk1.8.0_131    

保存退出后编辑同级目录下的core-site.xml文件,增加如下配置信息:

<configuration>

<property>

        <name>fs.defaultFS</name>

                <value>hdfs://observer:8020</value>

</property>

<property>

        <name>hadoop.tmp.dir</name>

                <value>/home/hadoop/app/tmp</value>

</property>

</configuration>

配置文件指定fs服务器地址和临时目录。

保存退出后编辑统计目录下的hdf-site.xml文件,增加如下配置信息:

<configuration>

<property>

        <name>dfs.replication</name>

                <value>1</value>

</property>

</configuration>

保存退出编辑slaves文件,增加主机名后保存退出。

2.4)格式化

切换到bin目录下,执行如下命令:

./hdfs namenode -format    

执行完成后会在上面指定的tmp目录下生成dfs文件夹。

image.png

2.5)启动hadoop服务

切换到sbin目录下,执行如下命令启动hadoop服务:

./start-dfs.sh 

然后输入三次当前账户密码后即可启动服务,在浏览器输入ip:50070后如果在浏览器中出现如下信息则表示hadoop安装成功:

image.png

2.5)配置yarn

yarn作为Hadoop的资源调度守护进程,同样也需要安装。

切换到etc/hadoop/目录下,编辑mapred-site.xml文件,增加如下内容,指定框架为yarn:

<configuration>

<property>

        <name>mapreduce.framework.name</name>

                <value>yarn</value>

</property>

</configuration>

2.6)启动yarn

切换到sbin目录下,执行./start-yarn.sh命令。

同样的输入完密码后,启动yarn。

启动完成后在浏览器输入ip:8088后如果出现如下所示信息则表示yarn配置成功:

image.png

jps -m查看进程信息也表示Hadoop和yarn配置成功。

image.png

3)spark安装

3.1)首先下载源代码编译适合hadoop版本的,然后再安装编译完之后的包。

image.png

3.2)这里我编译的是spark2.2.0的,解压该文件到app目录下并配置环境变量:

tar zxvf spark-2.2.0-bin-2.6.0-cdh5.7.0.tgz -C ../app/    

vim ~/.bashrc 增加如下内容:

#spark

export SPARK_HOME=/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0

export PATH=${SPARK_HOME}/bin:$PATH

保存退出后source ~/.bashrc 激活环境变量。

3.3)检查是否安装成功

切换到bin目录下执行./spark-shell --master local[2] 命令,如果进入scala环境则表示安装成功。

image.png4.3.9 安装MySQL

安装MySQL的目的是将spark streaming的处理结果存储下来,然后通过django程序对结果做展示,MySQL安装直接通过sudo yum -y install mysql-server命令来执行安装即可。

4.3.10安装django

django程序的安装直接使用pip或者easy_install命令来安装即可:

pip install django

django安装完成后可以使用django-admin.py文件来创建一个django项目:

django-admin.py startproject attackview

然后创建一个app:

cd attackview

python manage.py startapp sparkmysql

app创建完毕后需要切换到attackview目录下,并编辑settings.py文件,指定数据库为mysql:

DATABASES = {

    'default': {

        'ENGINE': 'django.db.backends.mysql',

        'NAME': 'echart',

        'USER':'root',

        'PASSWORD':'root@qwe',

        'HOST':'127.0.0.1',

        'PORT':'3306',

    }

}

保存后退出,然后编辑urls.py文件指定视图和函数之间的关系,保证当有新的url请求后django能够正常处理。

这里的细节不再赘述,包括上面各个程序,如果大家有疑问可以联系我或者百度查询解决方案。

这样我们的攻防环境基本上就完成了,那最后我们要做个flume+kafka+spark+mysql的联动测试,如果mysql数据库中能够存储flume客户端采集后被处理的数据,那表明我们的环境联动没有问题了。

4.3.11 联动测试

1)首先配置flume,监听某日志文件,鉴于文件篇幅,这里只贴上部分代码,大体思路跟flume配置环节一致。

# Describe/configure the source

#监听某端口

a2.sources.r2.type = exec

a2.sources.r2.command = tail -F /home/hadoop/source/access.log

a2.sources.r2.shell=/bin/sh -c

# Describe the sink

a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink

a2.sinks.k2.topic = streamingtopic

a2.sinks.k2.brokerList = hadoop0:9092

a2.sinks.k2.batchSize = 20

a2.sinks.k2.requiredAcks = 1

2)zookeeper和kafka配置保持上面配置不变即可;

3)配置spark streaming的配置文件;

切换到/examples/src/main/python/streaming/目录下,新增一个kafkadirect.py文件,文件内容如下:

#!/usr/bin/python

#coding:utf-8

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils

import datetime,yaml, os,re,pymysql

#时间格式转换

def timeutil(time):

        return datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S").strftime("%Y%m%d%H%M%S")

#第一步:创建一个本地的StreamingContext,并设置批处理周期为5s

sc=SparkContext("local[2]","flumeWordCount")

ssc=StreamingContext(sc,10)

#第二步:创建一个kafka连接

topic="streamingtopic"

brokers="hadoop0:9092"

#参数分别表示ssc连接名,列表形式显示的topic名称,brokers列表

directkafkaStream = KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list":brokers})

#第三步:数据处理&转换&存储

#提取kafka数据的第一个字段为消息正文字段并打印数据长度

lines = directkafkaStream.map(lambda x: x[1])

#加载规则文件

f=open('attackregexp_file.yaml','r')

cont=f.read()

x=yaml.load(cont)

#识别攻击行为

def identify_attack(url):

rule_len=len(x['attackregex'])

for i in xrange(rule_len):

rex = x['attackregex'][i]['regex']

if re.match(rex,url):

#连接数据库并追加记录到数据库中

mysqldb = pymysql.connect(host='localhost',port=3306,db='ana_log',user='root',passwd='P@ssw0rd',charset='utf8')

cursor = mysqldb.cursor()

query_sql="insert into test values('%d','%s');" % (5,'ceshizhuru')

cursor.execute(query_sql)

mysqldb.commit()

data=cursor.fetchone()

cursor.close()

mysqldb.close()

#处理日志记录

def test(i):

infos=i.split(" ")

url=infos[6]

identify_attack(url)

host=infos[0]

ip=infos[0]

time=infos[3]

status_code=int(infos[8])

return (ip,url,time,status_code)

words=lines.map(lambda x:test(x)).pprint()

#第四步:开始sparkstreaming

ssc.start()

ssc.awaitTermination()

保存文件后退出。这里加载了一个规则文件也是这里的核心文件。

4)分别启动zk、kafka等程序

4.1)启动zk

./bin/zkServer.sh start    

4.2)启动kafka

./bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties    

4.3)创建topic

./bin/kafka-topics.sh --create --zookeeper hadoop0:2181 --replication-factor 1 --partitions 1 --topic streamingtopic    

4.4)启动flume

./bin/flume-ng agent --name a2 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/access-log-flume-kafka.conf -Dflume.root.logger=INFO,console    

4.5)启动spark streaming

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 examples/src/main/python/streaming/kafkadirect1.py    

4.6)然后我们向监听的文件中增加新的记录,如果在mysql中出现存储后的结果就表示联动成功。这里我们的规则是判断请求行为是否存在攻击行为。这里我贴上在spark streaming窗口看到的处理记录:

image.png

五、总结

终于我们的环境安装完毕了,后面我们要做的就是开展每一个课程内容了。

文章内容比较多,肯定会存在一些纰漏,应该也会存在更优的解决方案,毕竟我在这方面的研究时间不长,希望大家在观阅的过程中提出自己的见解,共同成长,有问题或者好的见解请联系我,谢谢。

*本文原创作者:gncao,本文属于FreeBuf原创奖励计划,未经许可禁止转载

# 攻防系统
本文为 gncao 独立观点,未经授权禁止转载。
如需授权、对文章有疑问或需删除稿件,请联系 FreeBuf 客服小蜜蜂(微信:freebee1024)
被以下专辑收录,发现更多精彩内容
+ 收入我的专辑
+ 加入我的收藏
gncao LV.4
这家伙太懒了,还未填写个人描述!
  • 11 文章数
  • 23 关注者
todesk日志分析
2023-04-11
不成熟的第三方组件漏洞治理
2023-03-28
甲方纵深防御体系范畴和落地实践
2023-01-30
文章目录