介绍
flume是收集、移动、聚合大量日志数据的服务。flume基于流数据的架构,用于在线日志分析。flume在生产和消费者之间启动协调作用。flume是基于事件的一种服务,提供了事务保证,确保消息一定被分发。
flume的特点
- 支持各种接入资源数据的类型以及接出数据类型
- 支持水平扩展(增加节点服务器)
- 支持竖直扩展(增加硬件,如硬盘)
flume的结构组件
- source:接受数据,类型有多种,然后把这些数据传递到一个或多个channel中。
- channel:数据临时存放地,对source中来的数据进行缓冲,直到sink消费掉。在source和sink之间扮演桥梁的角色
- sink:从channel提取数据存放到中央化存储(hadoop / hbase)。
注意: 一个source可以向多个channel写数据,但是每个sink只能从一个channel中取数据
安装flume
- 下载apache-flume-1.8.0-bin.tar.gz
- 将压缩文件上传到服务器并解压
- 修改环境变量
export FLUME_HOME=/soft/ export PATH=$PATH:$FLUME_HOME/bin
- 查看安装是否生效
flume-ng version
配置flume
-
前往/apache-flume-1.8.0-bin/conf目录下,复制flume-conf.properties.template模板文件为新的文件,文件名可以自定义
cp flume-conf.properties.template flume-testNC.properties
-
修改配置文件flume-testNC.properties
#定义三种组件,a1是自定义代理名称 a1.sources=r1 a1.channels=c1 a1.sinks=s1 #定义source信息 #指定连接源netcat(瑞士军刀,一种套接字程序) a1.sources.r1.type=netcat #指定连接source服务器的ip a1.sources.r1.bind=localhost #指定source的端口 a1.sources.r1.port=8888 #定义sink输出的类型 a1.sinks.s1.type=logger #定义channnel的类型,memory指内存通道 a1.channels.c1.type=memory #将source和sink与channel绑定在一起,注意channel的单复数,一个source和连接多个channel,一个sink只能连接一个channel a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
-
下载安装套接字程序netcat,以做测试
yum install nmap-ncat.x86_64
-
启动测试
#先启动flume #-f:声明上面配置文件的路径 -n:声明代理的名称,-Dflume.root.logger设置日志等级和输出的位置,console表示输出到控制台 flume-ng agent -f /soft/apache-flume-1.8.0-bin/conf/flume-testNC.properties -n a1 -Dflume.root.logger=INFO,console #后台运行 flume-ng agent -f /soft/apache-flume-1.8.0-bin/conf/flume-testNC.properties -n a1 & #新开一个会话启动瑞士军刀连接flume的source nc localhost 8888 #输入任意内容查看flume的接收情况
source
-
netcat:监听会话
-
exec:监听文件
配置文件:a1.sources=r1 a1.channels=c1 a1.sinks=s1 a1.sources.r1.type=exec #监听/home/a.txt文件;-c+0从头开始收集数据(注意+号两边不能有空格),-F持续收集新数据 a1.sources.r1.command=tail -c+0 -F /home/a.txt a1.sinks.s1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
-
spooldir:监听文件夹
配置文件:a1.sources=r1 a1.channels=c1 a1.sinks=s1 a1.sources.r1.type=spooldir #监听/home/xuhx文件夹,注意:type的值的d是小写,这边的属性的D是大写 a1.sources.r1.spoolDir=/home/xuhx a1.sources.r1.fileHeader=true a1.sinks.s1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
注意:该文件夹下生成的文件都会被flume监听到,监听到的文件都会打一个后缀标记
.COMPLETED
,已经打过标记的文件必须是静态文件,不能在发生变化,否则会报错! -
seq:序列
a1.sources=r1 a1.channels=c1 a1.sinks=s1 a1.sources.r1.type=seq #设置压力测试序列数1000个 a1.sources.r1.totalEvents=1000 a1.sinks.s1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
-
StressSource 用于压力测试
a1.sources=r1 a1.channels=c1 a1.sinks=s1 a1.sources.r1.type=org.apache.flume.source.StressSource a1.sources.r1.size=1024 a1.sources.r1.maxTotalEvents=1000000 a1.sinks.s1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
sink
-
hdfs
a1.sources=r1 a1.channels=c1 a1.sinks=s1 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 a1.sinks.s1.type=hdfs #设置hdfs的保存路径,可以根据时间动态设置目录 a1.sinks.s1.hdfs.path = /flume/events/%y-%m-%d/%H/%M/%S #设置数据保存文件名的前缀 a1.sinks.s1.hdfs.filePrefix=events- #是否生成新的目录 a1.sinks.s1.hdfs.round=true #生成目录的时间 a1.sinks.s1.hdfs.roundValue=10 #生成目录的时间单位 a1.sinks.s1.hdfs.roundUnit=second #是否取用hdfs系统时间作为参考(一般配置涉及到时间的都要设置这个属性) a1.sinks.s1.hdfs.useLocalTimeStamp=true #滚动当前文件之前等待的秒数 a1.sinks.s1.hdfs.rollInterval=10 #设置文件满足多大时创建新的文件,单位是字节 a1.sinks.s1.hdfs.rollSize=10 #创建文件之前写入文件的事件数 a1.sinks.s1.hdfs.rollCount=3 a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
-
hive
-
hbase
a1.sources=r1 a1.channels=c1 a1.sinks=s1 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 a1.sinks.s1.type=hbase #设置数据存储的表格 a1.sinks.s1.table=mydb1:t8 #设置数据存储的列族 a1.sinks.s1.columnFamily=f1 a1.sinks.s1.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
注意: 本人在操作过程中遇到了一个错误
HBase major version number must be less than 2 for hbase-sink
,我的Hbase是2.0版本的,百度找到解决方案将apache-hive-2.3.3-bin/lib
下所有hbase-*.jar
复制到apache-flume-1.8.0-bin/lib
下,确实解决了,至于为什么从hive里面取jar包我也不知道,有知道原因的欢迎评论留言。 -
kafka
跃点处理
跃点即为路由,即当一个数据传输需要经过多个网络,每一个网点都要有一个中转站,来中转这些数据,这些中转站就是跃点,在众多的数据传输格式中,avro的数据传输量是最小的,所以在这里我们使用avro配合flume来搭建一个跃点。
- 搭建接收数据源数据
a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=5555 #sink提取数据转为avro发送 a1.sinks.k1.type=avro a1.sinks.k1.hostname=localhost #数据输出端口6666 a1.sinks.k1.port=6666 a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
- 搭建接收端展示
a2.sources=r2 a2.sinks=k2 a2.channels=c2 a2.sources.r2.type=avro a2.sources.r2.bind=localhost #数据接收端口7777 a2.sources.r2.port=7777 #打印输出 a2.sinks.k2.type=logger a2.channels.c2.type=memory a2.sources.r2.channels=c2 a2.sinks.k2.channel=c2
- 搭建跃点(中转站)
a3.sources=r3 a3.sinks=k3 a3.channels=c3 a3.sources.r3.type=avro a3.sources.r3.bind=localhost #接收数据源 a3.sources.r3.port=6666 #sink提取数据转为avro发送 a3.sinks.k3.type=avro a3.sinks.k3.hostname=localhost a3.sinks.k3.port=7777 a3.channels.c3.type=memory a3.sources.r3.channels=c3 a3.sinks.k3.channel=c3
- 启动顺序:
先启动a2,再启动a3,最后启动a1(原因:source是开发一个服务端口,由其他应用往这个端口灌输数据,而sink是向其他端口写数据,所以说只有先开放接收端口,人家才能往里面写,而最终的接收是a2,所以先开a2,a3是第二接收,所以再开a3,然后是a1,最后开启netcat往里面写数据)
channel
-
MemoryChannel(内存通道)
a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=6666 a1.sinks.k1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
非永久性暂存数据,速度快,不可靠(一旦系统崩溃,就会造成数据丢失)
-
FileChannel(文件通道)
a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=6666 a1.sinks.k1.type=logger a1.channels.c1.type=file #配置检查站点的目录 a1.channels.c1.checkpointDir=/home/fc_check #配置通道数据存放的目录 a1.channels.c1.dataDirs=/home/fc_data a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
永久性暂存数据,速度慢,吞吐量低,可靠
-
Spillable Memory Channel(可溢出内存通道)
a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=6666 a1.sinks.k1.type=logger a1.channels.c1.type=SPILLABLEMEMORY a1.channels.c1.memoryCapacity=10000 a1.channels.c1.overflowCapacity=10000000 a1.channels.c1.byteCapacity=800000 a1.channels.c1.checkpointDir=/home/fc_check a1.channels.c1.dataDirs=/home/fc_data a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
内存通道和文件通道的组合