1 Hive–执行计划
Hive–explain查询执行计划
1.1 执行计划介绍
- 执行计划解析
- 对于SQL解析器来说,我们编写的SQL就是一个String,通过HSQL 解析器解析成抽象语法树
- Semantic Analyzer把抽象语法树转换成了查询块:QueryBlock(QB)
- Logic Plan Generator 把查询款转换成了逻辑查询计划
- LogicalOptimizer 重写了逻辑执行计划
- Physical Plan Generator 把逻辑执行计划转换成了物理执行计划
- PhysicalOptimizer 优化物理执行计划,选择一个最优的方案执行
- 对于操作符的优化来说,每个操作符都会有一个对应Operator的实现类,Operator是操作的基类
org.apache.hadoop.hive.ql.exec.Operator
- 常用的Operator有如下几类
- TableScanOperator:表扫描的操作符
Table Scan Operator If the data is coming from the map-reduce framework, just
forward it. This will be needed as part of local work when data is not being
read as part of map-reduce framework
- ReduceSinkOperator: Reduce输出的操作符
- JoinOperator:Join操作符
- MapJoinOperator:MapJoin操作符
- SelectOperator:查询操作符
- FilterOperator:过滤操作符
- GroupByOperator:group by 操作符
- LimitOperator:Limit操作符
1.2 使用执行计划查询一个SQL
hive> set hive.auto.convert.join;
hive.auto.convert.join=true
explain select * from emp e join dept d on d.deptno = e.dept_no;
- hive.auto.convert.join参数是开启MapJoin的开关,默认是开启
- Map Join 小表的最大的数据大小通过hive.mapjoin.smalltable.filesize这个参数来控制,默认是25000000,单位是字节
- MapJoin执行计划如下
STAGE DEPENDENCIES:
Stage-4 is a root stage
Stage-3 depends on stages: Stage-4
Stage-0 depends on stages: Stage-3
STAGE PLANS:
Stage: Stage-4
Map Reduce Local Work
Alias -> Map Local Tables:
d
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
d
TableScan
alias: d
Statistics: Num rows: 1 Data size: 49 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: deptno is not null (type: boolean)
Statistics: Num rows: 1 Data size: 49 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 dept_no (type: string)
1 deptno (type: string)
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
alias: e
Statistics: Num rows: 1 Data size: 195 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: dept_no is not null (type: boolean)
Statistics: Num rows: 1 Data size: 195 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 dept_no (type: string)
1 deptno (type: string)
outputColumnNames: _col0, _col1, _col2, _col6, _col7
Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col6 (type: string), _col7 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Local Work:
Map Reduce Local Work
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Stage-4 is a root stage
Stage-3 depends on stages: Stage-4
Stage-0 depends on stages: Stage-3
- 可以看出这个job分成了三个stage,Stage-4是第一个Stage,Stage-3依赖于Stage-4,Stage-0依赖于Stage-3
Stage: Stage-4
Map Reduce Local Work
Alias -> Map Local Tables:
d
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
d
TableScan
alias: d
Statistics: Num rows: 1 Data size: 49 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: deptno is not null (type: boolean)
Statistics: Num rows: 1 Data size: 49 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 dept_no (type: string)
1 deptno (type: string)
- 由于这是一个MapJoin,MapJoin适用于大小表Join,是通过把小表缓存起来,大表数据Map进来之后,通过拿到缓存中小表的数据来进行Join,只会发生Map而没有Reduce产生
- Stage-4是把小表加入到缓存的操作,通过计算把dept看成小表,于是把dept里面的数据加入到缓存里面
- Stage-3依赖于Stage-4,我们再来看看Stage-3的具体执行流程
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
alias: e
Statistics: Num rows: 1 Data size: 195 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: dept_no is not null (type: boolean)
Statistics: Num rows: 1 Data size: 195 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 dept_no (type: string)
1 deptno (type: string)
outputColumnNames: _col0, _col1, _col2, _col6, _col7
Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col6 (type: string), _col7 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 214 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Local Work:
Map Reduce Local Work
- Stage-3 是把大表(emp)加载出来,并通过拿取缓存中的小表(dept)来进行Join操作
- 第一步是TableScan,先把大表里面的数据扫描出来
- 然后进行Filter ,Filter 的条件是dept_no is not null ,其实我们的SQL里面并没有加任何的过滤条件,但是我们的JOIN的条件是dept_no,而且使用的是JOIN,默认是 INNER JOIN 通过优化器的时候,会自动加上dept_no is not null,因为dept_no如果是空则无法JOIN上的
- 后面进行Map Join,可以看出默认的JOIN就是Inner Join,并且写出了是哪两个字段进行Join的
- 后面确定了outputColumnNames(输出字段)
- 通过 Select Operator来确定有哪些字段输出,就是根据我们SQL里面SELECT 的字段进行输出的
- 通过File Output Operator把文件输出出去
- 最后写出了数据输入、输出、序列化的类
- 最后一步则是Stage-0
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
- 可以看出这个Stage有个Limit的操作,不过参数是-1,默认是禁用
- 如果想看详细的执行计划
2 Hive–数据倾斜解决方案
- 其实数据倾斜的解决方案都大体差不多,核心思路就是先把数据倾斜的KEY打散,聚合之后再把KEY改成之前的一样,再进行聚合,以此来规避数据倾斜,在大数据场景中,数据倾斜是无法避免的
- 大数据场景从来都不怕数据量大,怕的就是数据倾斜,
- 只有发生Suffle的时候才会产生数据倾斜
- 首先我们应该避免发生Shuffle,有些场景无法避免,则可以使用下面的解决思路来进行解决
2.1 JOIN数据倾斜
- Join数据倾斜思路
- 两个表JOIN出现数据倾斜,可以先查询出来出现数据倾斜的KEY是哪些
- 把没有出现数据倾斜的的数据正常清洗
- 出现数据倾斜的数据如果不需要,则可以不进行处理
- 如果需要处理,则把出现数据倾斜的数据KEY前面加一个N以内的随机数,并把JOIN的另一张表给扩大N倍,前面都加上N的随机数,最后进行JOIN,再把随机数给去掉
- 把两个结果集UNION起来,得到最终的结果
- 虽然最后结果的Stage增加了,过程复杂了,但是可以有效的规避数据倾斜
2.2 Group By 数据倾斜
- 这个有点类似于之前MapReduce代码的解决思路
- 大致的思路如下
- 通过先把所有的Key加一个随机数,然后进行聚合
- 再把Key上面的随机数给去掉在进行聚合
MapReduce–数据倾斜解决方案
MapReduce–数据倾斜解决方案Code
- 下面我给大家举个例子
- 如果我们想要查询每个cookie有多少数据,数据如下
hive> select * from bigdata.window1;
cookie1 2015-04-10 10:00:02 url2
cookie2 2015-04-10 10:50:01 url55
cookie1 2015-04-10 10:00:02 url2
cookie1 2015-04-10 10:00:00 url1
cookie1 2015-04-10 10:03:04 1url3
cookie1 2015-04-10 10:50:05 url6
cookie1 2015-04-10 11:00:00 url7
cookie1 2015-04-10 10:10:00 url4
cookie1 2015-04-10 10:50:01 url5
cookie2 2015-04-10 10:00:02 url22
cookie2 2015-04-10 10:00:00 url11
cookie2 2015-04-10 10:03:04 1url33
cookie2 2015-04-10 10:50:05 url66
cookie2 2015-04-10 11:00:00 url77
cookie2 2015-04-10 10:10:00 url44
cookie2 2015-04-10 10:50:01 url55
select
split(tmp.cookie,"-")[1] as cookie,
sum(tmp.cnt) as cnt
from(
select
concat(rand(10),"-",cookie) as cookie,
count(1) as cnt
from bigdata.window1
group by
concat(rand(10),"-",cookie)
) as tmp
group by split(tmp.cookie,"-")[1]