:本地
?readrecords via standard table scan (including filters and projections) from sourceon local machine --------扫描表
?buildhashtable in memory -------在内存中建立hash表
?writehashtable to local disk --------hash表写进本地磁盘
?uploadhashtable to dfs -----------上传hash表到hdfs
?add hashtable to distributed cache --------把hash表加进分布式缓存
2) Maptask:Map任务
?readhashtable from local disk (distributed cache) into memory ------从本地磁盘(分布式缓存)把hash表读进内存
?matchrecords' keys against hashtable --------与hash表匹配key
?combine matches and write to output --------合并匹配,并写出output
3) Noreduce task:MapJoin特点,没有reduce
?
Limitationsof Prior Implementation
MAPJOIN在Hive 0.11之前有如下的一些限制:
1) 一个mapjoin只能一次处理一个key,它可以执行多表连接,但只有当所有的表都加入了相同的key。(典型的星型连接不属于这一类)
2) 就算是加了hint也未必真的使用mapjoin。
3) 一连串的mapjoins不会合并成一个单一的map job,除非查询写成一个级联的mapjoin(mapjoin(table,subquery(mapjoin(table, subquery....).自动转换后的也不会变成一个单一的map job。
4) mapjoin中用到的哈希表,每个子QUERY运行都会生成,先下载,再分发给map。
?
Enhancementsfor Star Joins
调优主要从三方面入手的:
1) 使用MapJoinHint时,把一连串的MapJoin操作变成一个map-only的job。
2) 把优化方案尽可能的变成自动优化(顺便备份下执行计划)。
3) 使得hashtable在taskside(map端)直接生成,现在的方案是先在本地生成,然后传到HDFS,再分布式缓存去分给每个map,未来版本会实现。
下面部分将描述每个优化加强方面:
OptimizeChains of Map Joins
下面的SQL会被分解为2个独立的map-only jobs执行:
select/*+ MAPJOIN(time_dim, date_dim) */ count(*) from
store_sales
jointime_dim on (ss_sold_time_sk = t_time_sk)
joindate_dim on (ss_sold_date_sk = d_date_sk)
where t_hour = 8 andd_year = 2002;
将小表读进内存,如果fact只读了一次,而不是2次,那么会极大的减少执行时间。
?
Current and Future Optimizations 当前和未来调优的方向
1) MergeM*-MR patterns into a single MR. ----把多个map-only的job+MRjob的模式变成单个MR
2) MergeMJ->MJ into a single MJ when possible. -----尽可能的把mapjoin嵌套模式变成一个mapjoin
3) Merge MJ* patterns intoa single Map stage as a chain of MJ operators. (Not yet implemented.) ------------把多个mapjoin串起来,变成一连串的mapjoin(上面的例子是分成两个独立的map-only的job,而不是一连串的,功能暂未实现)
如果hive.auto.convert.join为true的话,不仅仅会将join转化为mapjoin,还有可能转化成MJ*这种模式。
?
OptimizeAuto Join Conversion
当auto join打开时,就不再需要使用hint了,参数有两个:
sethive.auto.convert.join.noconditionaltask = true;
Hive0.11.0开始默认为true
sethive.auto.convert.join.noconditionaltask.size = 10000000;
小于这个size的表被放入内存,这个size大小指的是被放进内存的hash表的大小总和,当前版本,n-1个表都可以被放进内存,最大的那个表放在磁盘上match。在这里不会去检查表是否被压缩,直接从HDFS中得到的file大小。
之前的例子就可以变成:
selectcount(*) from
store_sales
jointime_dim on (ss_sold_time_sk = t_time_sk)
joindate_dim on (ss_sold_date_sk = d_date_sk)
where t_hour = 8 andd_year = 2002;
?
如果这2个维表的大小符合config的size,就会转换成map-join(2个).。这里的size 我认为应该是指hive.smalltable.filesize 这个值 默认25m。
如果维表的总和小于noconditionaltask.size 会把2个map-join 合并成一个。这样做减少了MR job的数量,并显著提高了query的速度。.这个例子可以很容易地扩展为muti-way join 以及将按预期工作。
外连接不能用map-join。因为map-join 只能有一个steam表,steam表的所有column都应该是全的,外连接可能出现null。
这意味着外连接不能用,只有内连接才能map-join。外连接只能用stream table的形式来调优了。笛卡尔积就更别说了,无法使用map-jon。
自动开关也可以作用在sort-merge-bucketjoins
?
CurrentOptimization 当前的优化方案
把多个MJ合并成一个MJ。
?
AutoConversion to SMB(Sort-Merge-Bucket) Map Join
基于桶的join,可以转换成为基于桶的map join。
前提是表按照桶划分的。排过序的表会比没有排序的表做map join更快。如果表又是分区表,又是bucket表,可能会慢一点,因为每个mapper需要去获取一个单键分区中的一小块(eachmapper would need to get a very small chunk of a partition which has a singlekey)。
下面的配置参数使一个SMB转为map-joinSMB:
sethive.auto.convert.sortmerge.join=true;
sethive.optimize.bucketmapjoin = true;
sethiv