淘先锋技术网

首页 1 2 3 4 5 6 7

1 Hive–执行计划

Hive–explain查询执行计划

1.1 执行计划介绍

Hive 执行计划

  • 执行计划解析
    • 对于SQL解析器来说,我们编写的SQL就是一个String,通过HSQL 解析器解析成抽象语法树
    • Semantic Analyzer把抽象语法树转换成了查询块:QueryBlock(QB)
    • Logic Plan Generator 把查询款转换成了逻辑查询计划
    • LogicalOptimizer 重写了逻辑执行计划
    • Physical Plan Generator 把逻辑执行计划转换成了物理执行计划
    • PhysicalOptimizer 优化物理执行计划,选择一个最优的方案执行
  • 对于操作符的优化来说,每个操作符都会有一个对应Operator的实现类,Operator是操作的基类
org.apache.hadoop.hive.ql.exec.Operator
  • 常用的Operator有如下几类
  • TableScanOperator:表扫描的操作符
Table Scan Operator If the data is coming from the map-reduce framework, just
forward it. This will be needed as part of local work when data is not being
read as part of map-reduce framework
  • ReduceSinkOperator: Reduce输出的操作符
  • JoinOperator:Join操作符
  • MapJoinOperator:MapJoin操作符
  • SelectOperator:查询操作符
  • FilterOperator:过滤操作符
  • GroupByOperator:group by 操作符
  • LimitOperator:Limit操作符

1.2 使用执行计划查询一个SQL

  • 查看两个表Join的执行计划
hive> set hive.auto.convert.join;
hive.auto.convert.join=true

explain select * from emp e join dept d on d.deptno = e.dept_no;
  • hive.auto.convert.join参数是开启MapJoin的开关,默认是开启
  • Map Join 小表的最大的数据大小通过hive.mapjoin.smalltable.filesize这个参数来控制,默认是25000000,单位是字节
  • MapJoin执行计划如下
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        d 
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        d 
          TableScan
            alias: d
            Statistics: Num rows: 1 Data size: 49 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: deptno is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 49 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 dept_no (type: string)
                  1 deptno (type: string)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: e
            Statistics: Num rows: 1 Data size: 195 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: dept_no is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 195 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 dept_no (type: string)
                  1 deptno (type: string)
                outputColumnNames: _col0, _col1, _col2, _col6, _col7
                Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col6 (type: string), _col7 (type: string)
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4
                  Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
                    Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3
  • 可以看出这个job分成了三个stage,Stage-4是第一个Stage,Stage-3依赖于Stage-4,Stage-0依赖于Stage-3
 Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        d 
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        d 
          TableScan
            alias: d
            Statistics: Num rows: 1 Data size: 49 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: deptno is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 49 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 dept_no (type: string)
                  1 deptno (type: string)
  • 由于这是一个MapJoin,MapJoin适用于大小表Join,是通过把小表缓存起来,大表数据Map进来之后,通过拿到缓存中小表的数据来进行Join,只会发生Map而没有Reduce产生
  • Stage-4是把小表加入到缓存的操作,通过计算把dept看成小表,于是把dept里面的数据加入到缓存里面
  • Stage-3依赖于Stage-4,我们再来看看Stage-3的具体执行流程
Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: e
            Statistics: Num rows: 1 Data size: 195 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: dept_no is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 195 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 dept_no (type: string)
                  1 deptno (type: string)
                outputColumnNames: _col0, _col1, _col2, _col6, _col7
                Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col6 (type: string), _col7 (type: string)
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4
                  Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
                    Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work
  • Stage-3 是把大表(emp)加载出来,并通过拿取缓存中的小表(dept)来进行Join操作
  • 第一步是TableScan,先把大表里面的数据扫描出来
  • 然后进行Filter ,Filter 的条件是dept_no is not null ,其实我们的SQL里面并没有加任何的过滤条件,但是我们的JOIN的条件是dept_no,而且使用的是JOIN,默认是 INNER JOIN 通过优化器的时候,会自动加上dept_no is not null,因为dept_no如果是空则无法JOIN上的
  • 后面进行Map Join,可以看出默认的JOIN就是Inner Join,并且写出了是哪两个字段进行Join的
  • 后面确定了outputColumnNames(输出字段)
  • 通过 Select Operator来确定有哪些字段输出,就是根据我们SQL里面SELECT 的字段进行输出的
  • 通过File Output Operator把文件输出出去
  • 最后写出了数据输入、输出、序列化的类
  • 最后一步则是Stage-0
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
  • 可以看出这个Stage有个Limit的操作,不过参数是-1,默认是禁用
  • 如果想看详细的执行计划

2 Hive–数据倾斜解决方案

  • 其实数据倾斜的解决方案都大体差不多,核心思路就是先把数据倾斜的KEY打散,聚合之后再把KEY改成之前的一样,再进行聚合,以此来规避数据倾斜,在大数据场景中,数据倾斜是无法避免的
  • 大数据场景从来都不怕数据量大,怕的就是数据倾斜,
  • 只有发生Suffle的时候才会产生数据倾斜
  • 首先我们应该避免发生Shuffle,有些场景无法避免,则可以使用下面的解决思路来进行解决

2.1 JOIN数据倾斜

  • Join数据倾斜思路
    • 两个表JOIN出现数据倾斜,可以先查询出来出现数据倾斜的KEY是哪些
    • 把没有出现数据倾斜的的数据正常清洗
    • 出现数据倾斜的数据如果不需要,则可以不进行处理
    • 如果需要处理,则把出现数据倾斜的数据KEY前面加一个N以内的随机数,并把JOIN的另一张表给扩大N倍,前面都加上N的随机数,最后进行JOIN,再把随机数给去掉
    • 把两个结果集UNION起来,得到最终的结果
    • 虽然最后结果的Stage增加了,过程复杂了,但是可以有效的规避数据倾斜

2.2 Group By 数据倾斜

  • 这个有点类似于之前MapReduce代码的解决思路
  • 大致的思路如下
    • 通过先把所有的Key加一个随机数,然后进行聚合
    • 再把Key上面的随机数给去掉在进行聚合

MapReduce–数据倾斜解决方案

MapReduce–数据倾斜解决方案Code

  • 下面我给大家举个例子
  • 如果我们想要查询每个cookie有多少数据,数据如下
hive> select * from bigdata.window1;
cookie1	2015-04-10 10:00:02	url2
cookie2	2015-04-10 10:50:01	url55
cookie1	2015-04-10 10:00:02	url2
cookie1	2015-04-10 10:00:00	url1
cookie1	2015-04-10 10:03:04	1url3
cookie1	2015-04-10 10:50:05	url6
cookie1	2015-04-10 11:00:00	url7
cookie1	2015-04-10 10:10:00	url4
cookie1	2015-04-10 10:50:01	url5
cookie2	2015-04-10 10:00:02	url22
cookie2	2015-04-10 10:00:00	url11
cookie2	2015-04-10 10:03:04	1url33
cookie2	2015-04-10 10:50:05	url66
cookie2	2015-04-10 11:00:00	url77
cookie2	2015-04-10 10:10:00	url44
cookie2	2015-04-10 10:50:01	url55
  • SQL 如下
select
    split(tmp.cookie,"-")[1] as cookie,
    sum(tmp.cnt) as cnt
from(
    select 
    concat(rand(10),"-",cookie) as cookie,
    count(1) as cnt
    from bigdata.window1
    group by
    concat(rand(10),"-",cookie)
) as tmp
group by split(tmp.cookie,"-")[1]