
Contents

1. Ingesting data into Hive with Flume

Method 1: Write to the HDFS path backing a Hive table

Method 2: Ingest data with HiveSink

2. Using the three serializers of HBaseSink

1. SimpleHbaseEventSerializer

2. SimpleAsyncHbaseEventSerializer

3. RegexHbaseEventSerializer


1. Ingesting data into Hive with Flume

Method 1: Write to the HDFS path backing a Hive table

1) Create the database and the external table in Hive:

create database flume;

create external table flume_into_hive(name string,age int) partitioned by (dt string) row format delimited fields terminated by ',' location '/user/hive/warehouse/flume.db/flume_into_hive';

2) Create the hive.log file under /root:

mkdir flume-hive

cd flume-hive/

vi hive.log
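The external table splits each line on commas into (name string, age int), so the log lines must match that layout. A minimal sketch for seeding test data (a relative path is used here so it runs anywhere; the tutorial's file is /root/flume-hive/hive.log):

```shell
# Append a few comma-separated name,age records matching the
# flume_into_hive schema (name string, age int).
LOG=hive.log   # tutorial path: /root/flume-hive/hive.log
for i in 1 2 3; do
  echo "user$i,$((20 + i))" >> "$LOG"
done
cat "$LOG"
```

Once the agent below is running, `tail -F` picks up each appended line and ships it through the channel to the sink.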

3) Create the configuration file flume-into-hive-1.conf under Flume's conf directory:

agent.sources=r1
agent.channels=c1
agent.sinks=s1

agent.sources.r1.type=exec
agent.sources.r1.command=tail -F /root/flume-hive/hive.log

agent.channels.c1.type=memory
agent.channels.c1.capacity=1000
agent.channels.c1.transactionCapacity=100

agent.sinks.s1.type=hdfs
agent.sinks.s1.hdfs.path = hdfs://node01:9000/user/hive/warehouse/flume.db/flume_into_hive/dt=%Y%m%d
agent.sinks.s1.hdfs.filePrefix = upload-
agent.sinks.s1.hdfs.fileSuffix=.txt
# Whether to round down the timestamp used in the path
agent.sinks.s1.hdfs.round = true
# Round down to multiples of this value
agent.sinks.s1.hdfs.roundValue = 1
# Unit of the rounding value
agent.sinks.s1.hdfs.roundUnit = hour
# Use the local timestamp instead of a timestamp header
agent.sinks.s1.hdfs.useLocalTimeStamp = true
# Number of events to buffer before flushing to HDFS
agent.sinks.s1.hdfs.batchSize = 100
# File type; compressed formats are also supported
agent.sinks.s1.hdfs.fileType = DataStream
agent.sinks.s1.hdfs.writeFormat=Text
# Roll to a new file every 60 seconds
agent.sinks.s1.hdfs.rollInterval = 60
# Roll when a file reaches roughly 128 MB
agent.sinks.s1.hdfs.rollSize = 134217700
# Do not roll based on the number of events
agent.sinks.s1.hdfs.rollCount = 0

agent.sources.r1.channels=c1
agent.sinks.s1.channel=c1

4) Run Flume:

bin/flume-ng agent -c conf -f conf/flume-into-hive-1.conf -n agent

5) Check the data on HDFS:

  hdfs dfs -cat /user/hive/warehouse/flume.db/flume_into_hive/dt=20221110/upload-.1668082651723.txt

6) Load the data into the Hive table:

load data inpath '/user/hive/warehouse/flume.db/flume_into_hive/dt=20221110' into table flume_into_hive partition(dt='20221110');

select * from flume_into_hive;

Method 2: Ingest data with HiveSink

1) Copy the required JAR files from hive/lib and hive/hcatalog/share/hcatalog/ into flume/lib.

If flume/lib already contains a JAR with the same name, delete Flume's copy first, then copy Hive's:

cp <jar file> <target directory>

2) Create the Flume configuration file:

vi flume-into-hive-2.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /root/flume-hive/hive.log

a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://node01:9083
a1.sinks.k1.hive.database = flume_hive
a1.sinks.k1.hive.table = flume_into_hive_1
a1.sinks.k1.useLocalTimeStamp = true
a1.sinks.k1.round = false
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.fieldnames = name,age

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1

3) Create the target table in Hive, in the flume_hive database referenced by the sink. HiveSink requires a bucketed, transactional ORC table:

create database if not exists flume_hive;

use flume_hive;

create table flume_into_hive_1(name string,age int) clustered by (age) into 2 buckets stored as orc tblproperties('transactional'='true');

4) Enable transaction support in Hive (concurrency plus the DbTxnManager):

set hive.support.concurrency=true;

set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

5) Start the metastore service:

hive --service metastore -p 9083

6) Run Flume:

bin/flume-ng agent -c conf -f conf/hive/flume-into-hive-2.conf -n a1

7) Query the table to verify the data arrived.

2. Using the three serializers of HBaseSink

1. SimpleHbaseEventSerializer

1) First create a table flume-hbase-table in HBase with the two column families colfamily1 and colfamily2:

create 'flume-hbase-table','colfamily1','colfamily2'

2) Then write a Flume configuration file flume-into-hbase.conf:

agent.sources = r1
agent.channels = c1
agent.sinks = s1

agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /root/flume-hbase/test.log
agent.sources.r1.checkperiodic = 50

agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

agent.sinks.s1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.s1.zookeeperQuorum = node01:2181
agent.sinks.s1.table = flume-hbase-table
# Column family of the HBase table
agent.sinks.s1.columnFamily = colfamily1
agent.sinks.s1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
# Column within the column family to store the event body in
agent.sinks.s1.serializer.payloadColumn = column-1

agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1

3) Run Flume:

bin/flume-ng agent -c conf -f conf/hbase/flume-into-hbase.conf -n agent  -Dflume.root.logger=INFO,console

4) Verify in the HBase shell:

scan 'flume-hbase-table'

2. SimpleAsyncHbaseEventSerializer

1) Create the flume-into-hbase-1.conf configuration file:

agent.sources = r1
agent.channels = c1
agent.sinks = s1

agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /root/flume-hbase/test.log
agent.sources.r1.checkperiodic = 50

agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

agent.sinks.s1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent.sinks.s1.zookeeperQuorum = node01:2181
agent.sinks.s1.table = flume-hbase-table
# Column family of the HBase table
agent.sinks.s1.columnFamily = colfamily2
agent.sinks.s1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
# Column within the column family to store the event body in
agent.sinks.s1.serializer.payloadColumn = column-2

agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1

2) Run Flume:

bin/flume-ng agent -c conf -f conf/hbase/flume-into-hbase-1.conf -n agent -Dflume.root.logger=INFO,console

3) Verify the result in HBase.

3. RegexHbaseEventSerializer

1) Create the flume-into-hbase-2.conf configuration file:

agent.sources = r1
agent.channels = c1
agent.sinks = s1

agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /root/flume-hbase/test.log
agent.sources.r1.checkperiodic = 50

agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

agent.sinks.s1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.s1.zookeeperQuorum = node01:2181
agent.sinks.s1.table = flume-hbase-table
# Column family of the HBase table
agent.sinks.s1.columnFamily = colfamily1
agent.sinks.s1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Regex with one capture group per column
agent.sinks.s1.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
# Column names the capture groups map to
agent.sinks.s1.serializer.colNames = time,url,number

agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1

2) Run Flume:

bin/flume-ng agent -c conf -f conf/hbase/flume-into-hbase-2.conf -n agent -Dflume.root.logger=INFO,console

3) Append the following lines to /root/flume-hbase/test.log:

[2022-05-17] [http://www.baidu.com] [20]

[2022-05-17] [http://www.bilibili.com] [25]

[2022-05-17] [http://www.qq.com] [26]
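RegexHbaseEventSerializer applies the configured regex to each event body and writes one HBase column per capture group, named by serializer.colNames. The pattern can be sanity-checked outside Flume. Flume uses Java regexes, but this particular pattern behaves identically in Python, used here purely for illustration:

```python
import re

# Same pattern as serializer.regex: three bracketed fields
# separated by single spaces.
pattern = re.compile(r"\[(.*?)\] \[(.*?)\] \[(.*?)\]")

line = "[2022-05-17] [http://www.baidu.com] [20]"
m = pattern.search(line)

# The capture groups map positionally to serializer.colNames = time,url,number
row = dict(zip(["time", "url", "number"], m.groups()))
print(row)  # {'time': '2022-05-17', 'url': 'http://www.baidu.com', 'number': '20'}
```

If a line does not match the regex, the serializer writes nothing for that event, so checking sample lines like this before starting the agent avoids silently dropped data.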