淘先锋技术网

首页 1 2 3 4 5 6 7

flink-sql 读取hbase 维度数据配置

  • 若维度数据写入后不再更新或极少更新,可以用 HBase 存储大数据量的维度数据
  • Flink 既可以同步也可以异步读取 HBase 数据;异步读取能提高 Flink 与外部 HBase 交互的并发度

flink-sql连接hbase 配置

hbase相关配置


-- HBase-backed dimension table exposed to Flink SQL.
--   rowkey : the HBase row key, declared as the (non-enforced) primary key.
--   cf1    : one HBase column family; each field of the ROW is a qualifier
--            inside that family.
-- The sink.buffer-flush.* options only affect writes to this table.
-- 'lookup.async' replaces the original 'sink.async', which is not an option
-- of the hbase-2.2 connector and is rejected when the table options are
-- validated. 'lookup.async' is the switch for asynchronous reads when the
-- table is used as the lookup side of a join.
CREATE TABLE dim_hljy_ord_stu_info (
    rowkey STRING,
    cf1 ROW<
        id          BIGINT,
        order_type  STRING,
        order_no    STRING,
        create_time TIMESTAMP(1),
        update_time TIMESTAMP(1),
        is_deleted  TINYINT
    >,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'dim:dim_stu_info',
    'zookeeper.quorum' = 'localhost:2181',
    'sink.buffer-flush.max-size' = '10mb',
    'sink.buffer-flush.max-rows' = '2000',
    'sink.buffer-flush.interval' = '1s',
    'lookup.async' = 'true'
);

-- Flink SQL 读取 HBase 列簇(数据列)

  -- Sample the order_type qualifier of column family cf1 from the HBase
  -- dimension table. There is no ORDER BY, so which 10 rows come back is
  -- not specified.
  SELECT cf1.order_type
  FROM dim_hljy_ord_stu_info
  LIMIT 10;

flink-sql 将kafka 数据写入hbase

  • kafka 流表

-- Kafka changelog table (upsert-kafka) keyed by id; it is the source of the
-- order/student records that are copied into the HBase dimension table.
-- Fix: the original table name had a stray trailing backtick
-- (ods_ord_stu_info`), which left the identifier quote unbalanced and made
-- the statement unparseable. The name is now quoted on both sides, matching
-- the column identifiers.
-- Both key and value are JSON; key parse errors are ignored and missing
-- value fields do not fail the job (see the last two options).
CREATE TABLE `ods_ord_stu_info` (
    `id`          BIGINT       COMMENT '主键',
    `order_type`  STRING       COMMENT '订单类型1-报名2-择升 3-升级',
    `order_no`    STRING       COMMENT '订单号',
    `create_time` TIMESTAMP(1) COMMENT '创建时间',
    `update_time` TIMESTAMP(1) COMMENT '更新时间',
    `is_deleted`  TINYINT      COMMENT '0:未删除,1:删除',
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'topic_ods_ord_stu_info',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json',
    'key.json.ignore-parse-errors' = 'true',
    'value.json.fail-on-missing-field' = 'false'
);

-- 在 Flink 中定义 HBase 表


-- HBase table that receives the Kafka records (used as the sink of the
-- INSERT that follows in this file).
--   rowkey : the HBase row key, declared as the (non-enforced) primary key.
--   cf1    : one HBase column family; each field of the ROW is a qualifier
--            inside that family.
-- The sink.buffer-flush.* options bound how much is buffered before a flush
-- to HBase (by size, by row count, and by time interval).
-- 'lookup.async' replaces the original 'sink.async', which is not an option
-- of the hbase-2.2 connector and is rejected when the table options are
-- validated. 'lookup.async' only matters when this table is read as the
-- lookup side of a join; it does not change how rows are written.
CREATE TABLE dim_hljy_ord_stu_info (
    rowkey STRING,
    cf1 ROW<
        id          BIGINT,
        order_type  STRING,
        order_no    STRING,
        create_time TIMESTAMP(1),
        update_time TIMESTAMP(1),
        is_deleted  TINYINT
    >,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'hljy:dim_hljy_ord_stu_info',
    'zookeeper.quorum' = '10.155.0.40:2181',
    'sink.buffer-flush.max-size' = '10mb',
    'sink.buffer-flush.max-rows' = '2000',
    'sink.buffer-flush.interval' = '1s',
    'lookup.async' = 'true'
);

-- Flink SQL 将 Kafka 流表写入 HBase


-- Copy the Kafka changelog (ods_ord_stu_info) into the HBase dimension table.
-- Fixes relative to the original statement:
--   * `enroll_code was missing its closing backtick (parse error).
--   * sno, enroll_code and pay_route are not columns of ods_ord_stu_info;
--     the ROW now carries the six source columns in the exact order and
--     types of the cf1 column family (id, order_type, order_no,
--     create_time, update_time, is_deleted).
--   * The row key no longer contains UUID(): a random key gives every
--     update or delete from the upsert source a brand-new row key, so the
--     earlier row is never overwritten or removed, and the table cannot be
--     looked up by key. The source primary key is used instead.
-- NOTE(review): if another row-key layout is required (e.g. a salted or
-- composite key), change it here, but keep it deterministic per source row.
INSERT INTO dim_hljy_ord_stu_info (rowkey, cf1)
SELECT
    CAST(`id` AS STRING) AS rowkey,
    ROW(`id`, `order_type`, `order_no`, `create_time`, `update_time`, `is_deleted`) AS cf1
FROM ods_ord_stu_info;