淘先锋技术网

首页 1 2 3 4 5 6 7

介绍

flume是收集、移动、聚合大量日志数据的服务。flume基于流数据的架构,用于在线日志分析。flume在生产和消费者之间启动协调作用。flume是基于事件的一种服务,提供了事务保证,确保消息一定被分发。

flume的特点

  1. 支持各种接入资源数据的类型以及接出数据类型
  2. 支持水平扩展(增加节点服务器)
  3. 支持竖直扩展(增加硬件,如硬盘)

flume的结构组件

在这里插入图片描述

  • source:接受数据,类型有多种,然后把这些数据传递到一个或多个channel中。
  • channel:数据临时存放地,对source中来的数据进行缓冲,直到sink消费掉。在source和sink之间扮演桥梁的角色
  • sink:从channel提取数据存放到中央化存储(hadoop / hbase)。

注意: 一个source可以向多个channel写数据,但是每个sink只能从一个channel中取数据

安装flume

  1. 下载apache-flume-1.8.0-bin.tar.gz
  2. 将压缩文件上传到服务器并解压
  3. 修改环境变量
    	export FLUME_HOME=/soft/
    	export PATH=$PATH:$FLUME_HOME/bin
    
  4. 查看安装是否生效
    	flume-ng version
    

配置flume

flume官方文档

  • 前往/apache-flume-1.8.0-bin/conf目录下,复制flume-conf.properties.template模板文件为新的文件,文件名可以自定义

    	cp flume-conf.properties.template flume-testNC.properties
    
  • 修改配置文件flume-testNC.properties

    	#定义三种组件,a1是自定义代理名称
    	a1.sources=r1
    	a1.channels=c1
    	a1.sinks=s1
    	
    	#定义source信息
    	#指定连接源netcat(瑞士军刀,一种套接字程序)
    	a1.sources.r1.type=netcat
    	#指定连接source服务器的ip
    	a1.sources.r1.bind=localhost
    	#指定source的端口
    	a1.sources.r1.port=8888
    	
    	#定义sink输出的类型
    	a1.sinks.s1.type=logger
    	
    	#定义channnel的类型,memory指内存通道
    	a1.channels.c1.type=memory
    	
    	#将source和sink与channel绑定在一起,注意channel的单复数,一个source和连接多个channel,一个sink只能连接一个channel
    	a1.sources.r1.channels=c1
    	a1.sinks.s1.channel=c1
    
  • 下载安装套接字程序netcat,以做测试

    	yum install nmap-ncat.x86_64
    
  • 启动测试

    	#先启动flume
    	#-f:声明上面配置文件的路径 -n:声明代理的名称,-Dflume.root.logger设置日志等级和输出的位置,console表示输出到控制台
    	flume-ng agent -f /soft/apache-flume-1.8.0-bin/conf/flume-testNC.properties -n a1 -Dflume.root.logger=INFO,console
    
    	#后台运行
    	flume-ng agent -f /soft/apache-flume-1.8.0-bin/conf/flume-testNC.properties -n a1 &
    
    	#新开一个会话启动瑞士军刀连接flume的source
    	nc localhost 8888
    
    	#输入任意内容查看flume的接收情况
    

source

  • netcat:监听会话

  • exec:监听文件
    配置文件:

    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    
    a1.sources.r1.type=exec
    #监听/home/a.txt文件;-c+0从头开始收集数据(注意+号两边不能有空格),-F持续收集新数据
    a1.sources.r1.command=tail -c+0 -F /home/a.txt
    
    a1.sinks.s1.type=logger
    
    a1.channels.c1.type=memory
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    
  • spooldir:监听文件夹
    配置文件:

    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    
    a1.sources.r1.type=spooldir
    #监听/home/xuhx文件夹,注意:type的值的d是小写,这边的属性的D是大写
    a1.sources.r1.spoolDir=/home/xuhx
    a1.sources.r1.fileHeader=true
    
    a1.sinks.s1.type=logger
    
    a1.channels.c1.type=memory
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    

    注意:该文件夹下生成的文件都会被flume监听到,监听到的文件都会打一个后缀标记.COMPLETED,已经打过标记的文件必须是静态文件,不能在发生变化,否则会报错!

  • seq:序列

    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    
    a1.sources.r1.type=seq
    #设置压力测试序列数1000个
    a1.sources.r1.totalEvents=1000
    
    a1.sinks.s1.type=logger
    
    a1.channels.c1.type=memory
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    
  • StressSource 用于压力测试

    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    
    a1.sources.r1.type=org.apache.flume.source.StressSource
    a1.sources.r1.size=1024
    a1.sources.r1.maxTotalEvents=1000000
    
    a1.sinks.s1.type=logger
    
    a1.channels.c1.type=memory
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    

sink

  • hdfs

    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=8888
    
    
    a1.sinks.s1.type=hdfs
    #设置hdfs的保存路径,可以根据时间动态设置目录
    a1.sinks.s1.hdfs.path = /flume/events/%y-%m-%d/%H/%M/%S
    #设置数据保存文件名的前缀
    a1.sinks.s1.hdfs.filePrefix=events-
    
    #是否生成新的目录
    a1.sinks.s1.hdfs.round=true
    #生成目录的时间
    a1.sinks.s1.hdfs.roundValue=10
    #生成目录的时间单位
    a1.sinks.s1.hdfs.roundUnit=second
    #是否取用hdfs系统时间作为参考(一般配置涉及到时间的都要设置这个属性)
    a1.sinks.s1.hdfs.useLocalTimeStamp=true
    
    #滚动当前文件之前等待的秒数
    a1.sinks.s1.hdfs.rollInterval=10
    #设置文件满足多大时创建新的文件,单位是字节
    a1.sinks.s1.hdfs.rollSize=10
    #创建文件之前写入文件的事件数
    a1.sinks.s1.hdfs.rollCount=3
    
    a1.channels.c1.type=memory
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    
  • hive

  • hbase

    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=8888
    
    a1.sinks.s1.type=hbase
    #设置数据存储的表格
    a1.sinks.s1.table=mydb1:t8
    #设置数据存储的列族
    a1.sinks.s1.columnFamily=f1
    a1.sinks.s1.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer
    
    a1.channels.c1.type=memory
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    

    注意: 本人在操作过程中遇到了一个错误HBase major version number must be less than 2 for hbase-sink,我的Hbase是2.0版本的,百度找到解决方案将apache-hive-2.3.3-bin/lib下所有hbase-*.jar复制到apache-flume-1.8.0-bin/lib下,确实解决了,至于为什么从hive里面取jar包我也不知道,有知道原因的欢迎评论留言。

  • kafka

跃点处理

跃点即为路由,即当一个数据传输需要经过多个网络,每一个网点都要有一个中转站,来中转这些数据,这些中转站就是跃点,在众多的数据传输格式中,avro的数据传输量是最小的,所以在这里我们使用avro配合flume来搭建一个跃点。

  1. 搭建接收数据源数据
    	a1.sources=r1
    	a1.sinks=k1
    	a1.channels=c1
    
    	a1.sources.r1.type=netcat
    	a1.sources.r1.bind=localhost
    	a1.sources.r1.port=5555
    	
    	#sink提取数据转为avro发送
    	a1.sinks.k1.type=avro
    	a1.sinks.k1.hostname=localhost
    	#数据输出端口6666
    	a1.sinks.k1.port=6666
    
    	a1.channels.c1.type=memory
    
    	a1.sources.r1.channels=c1
    	a1.sinks.k1.channel=c1
    
  2. 搭建接收端展示
    	a2.sources=r2
    	a2.sinks=k2
    	a2.channels=c2
    
    	a2.sources.r2.type=avro
    	a2.sources.r2.bind=localhost
    	#数据接收端口7777
    	a2.sources.r2.port=7777
    	
    	#打印输出
    	a2.sinks.k2.type=logger
    
    	a2.channels.c2.type=memory
    
    	a2.sources.r2.channels=c2
    	a2.sinks.k2.channel=c2
    
  3. 搭建跃点(中转站)
    	a3.sources=r3
    	a3.sinks=k3
    	a3.channels=c3
    
    	a3.sources.r3.type=avro
    	a3.sources.r3.bind=localhost
    	#接收数据源
    	a3.sources.r3.port=6666
    	
    	#sink提取数据转为avro发送
    	a3.sinks.k3.type=avro
    	a3.sinks.k3.hostname=localhost
    	a3.sinks.k3.port=7777
    
    	a3.channels.c3.type=memory
    
    	a3.sources.r3.channels=c3
    	a3.sinks.k3.channel=c3
    
  4. 启动顺序:
    先启动a2,再启动a3,最后启动a1(原因:source是开发一个服务端口,由其他应用往这个端口灌输数据,而sink是向其他端口写数据,所以说只有先开放接收端口,人家才能往里面写,而最终的接收是a2,所以先开a2,a3是第二接收,所以再开a3,然后是a1,最后开启netcat往里面写数据)

channel

  1. MemoryChannel(内存通道)

    a1.sources=r1
    a1.sinks=k1
    a1.channels=c1
    
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=6666
    
    a1.sinks.k1.type=logger
    
    a1.channels.c1.type=memory
    
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
    

    非永久性暂存数据,速度快,不可靠(一旦系统崩溃,就会造成数据丢失)

  2. FileChannel(文件通道)

    a1.sources=r1
    a1.sinks=k1
    a1.channels=c1
    
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=6666
    
    a1.sinks.k1.type=logger
    
    a1.channels.c1.type=file
    #配置检查站点的目录
    a1.channels.c1.checkpointDir=/home/fc_check
    #配置通道数据存放的目录
    a1.channels.c1.dataDirs=/home/fc_data
    
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
    

    永久性暂存数据,速度慢,吞吐量低,可靠

  3. Spillable Memory Channel(可溢出内存通道)

    a1.sources=r1
    a1.sinks=k1
    a1.channels=c1
    
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=6666
    
    a1.sinks.k1.type=logger
    
    a1.channels.c1.type=SPILLABLEMEMORY
    a1.channels.c1.memoryCapacity=10000
    a1.channels.c1.overflowCapacity=10000000
    a1.channels.c1.byteCapacity=800000
    a1.channels.c1.checkpointDir=/home/fc_check
    a1.channels.c1.dataDirs=/home/fc_data
    
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
    

    内存通道和文件通道的组合