-- flink-sql 读取 HBase 维度数据配置
-- - 若维度数据插入后几乎不发生更新或不发生更新,可以用 HBase 存储大数据量维度数据
-- - 可以同步/异步读取 HBase 数据,提高 Flink 与外部 HBase 交互的并发数
-- flink-sql 连接 HBase 配置
-- HBase 相关配置
-- HBase dimension table: rowkey + a single column family cf1.
-- Used for lookup joins against large, rarely-updated dimension data.
CREATE TABLE dim_hljy_ord_stu_info (
    rowkey STRING,
    cf1 ROW<
        id          BIGINT,
        order_type  STRING,
        order_no    STRING,
        -- NOTE(review): precision 1 (tenths of a second) is unusual;
        -- TIMESTAMP(3) (milliseconds) is the common choice — confirm intent.
        create_time TIMESTAMP(1),
        update_time TIMESTAMP(1),
        is_deleted  TINYINT
    >,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'dim:dim_stu_info',
    'zookeeper.quorum' = 'localhost:2181',
    'sink.buffer-flush.max-size' = '10mb',
    'sink.buffer-flush.max-rows' = '2000',
    'sink.buffer-flush.interval' = '1s',
    -- 'sink.async' is not a valid hbase-2.2 connector option;
    -- asynchronous dimension lookup is enabled with 'lookup.async'.
    'lookup.async' = 'true'
);
-- flink-sql 读取 HBase 列簇(数据列)
-- Read the order_type column out of column family cf1 (sample 10 rows).
SELECT cf1.order_type
FROM dim_hljy_ord_stu_info
LIMIT 10;
-- flink-sql 将 kafka 数据写入 HBase
-- - kafka 流表
-- Kafka upsert source table, keyed by `id` (changelog semantics per key).
-- Fixed: the original had a stray unmatched backtick after the table name,
-- which is a syntax error.
CREATE TABLE ods_ord_stu_info (
    `id`          BIGINT       COMMENT '主键',
    `order_type`  STRING       COMMENT '订单类型1-报名2-择升 3-升级',
    `order_no`    STRING       COMMENT '订单号',
    `create_time` TIMESTAMP(1) COMMENT '创建时间',
    `update_time` TIMESTAMP(1) COMMENT '更新时间',
    `is_deleted`  TINYINT      COMMENT '0:未删除,1:删除',
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'topic_ods_ord_stu_info',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json',
    -- tolerate malformed keys / missing value fields instead of failing the job
    'key.json.ignore-parse-errors' = 'true',
    'value.json.fail-on-missing-field' = 'false'
);
-- 定义 Flink HBase 流表
-- HBase sink/dimension table for the hljy namespace (same schema as the
-- Kafka source, nested under column family cf1).
-- NOTE(review): this redefines dim_hljy_ord_stu_info from the earlier example
-- with a different backing HBase table — only one definition can exist per
-- catalog session.
CREATE TABLE dim_hljy_ord_stu_info (
    rowkey STRING,
    cf1 ROW<
        id          BIGINT,
        order_type  STRING,
        order_no    STRING,
        create_time TIMESTAMP(1),
        update_time TIMESTAMP(1),
        is_deleted  TINYINT
    >,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'hljy:dim_hljy_ord_stu_info',
    'zookeeper.quorum' = '10.155.0.40:2181',
    'sink.buffer-flush.max-size' = '10mb',
    'sink.buffer-flush.max-rows' = '2000',
    'sink.buffer-flush.interval' = '1s',
    -- 'sink.async' is not a valid hbase-2.2 connector option;
    -- 'lookup.async' enables async reads when this table is used as a dim.
    'lookup.async' = 'true'
);
-- flink-sql 将 kafka 流表写入 HBase
-- Load the Kafka stream into the HBase dimension table.
-- Fixes relative to the original:
--   * `enroll_code had an unclosed backtick (syntax error);
--   * sno / enroll_code / pay_route are not columns of ods_ord_stu_info —
--     the source columns matching cf1 are id, order_type, order_no,
--     create_time, update_time, is_deleted;
--   * UUID() made the rowkey different on every write, so upserts created
--     duplicate rows instead of overwriting; the rowkey must be a
--     deterministic function of the record's business keys.
INSERT INTO dim_hljy_ord_stu_info
SELECT
    CONCAT_WS('_', CAST(`id` AS STRING), `order_no`) AS rowkey,
    ROW(`id`, `order_type`, `order_no`, `create_time`, `update_time`, `is_deleted`)
FROM ods_ord_stu_info;