diff --git itests/qtest/pom.xml itests/qtest/pom.xml
index 1a19610..50c5da6 100644
--- itests/qtest/pom.xml
+++ itests/qtest/pom.xml
@@ -38,7 +38,7 @@
false
stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q
cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q
- tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q
+ tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q
cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q
add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
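Note: the pom.xml hunk above registers the new qfile with the MiniTez test list so the generated MiniTez CLI driver picks it up. Assuming the standard Hive itests workflow (the module path, driver class name, and qfile property below are the usual conventions, not part of this patch), the new test can be run in isolation with:

    cd itests/qtest
    mvn test -Dtest=TestMiniTezCliDriver -Dqfile=tez_join_hash.q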
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 03a64e8..282e52b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -290,7 +290,7 @@ public void processOp(Object row, int tag) throws HiveException {
populateCachedDistributionKeys(row, 0);
// replace bucketing columns with hashcode % numBuckets
- int buckNum = 0;
+ int buckNum = -1;
if (bucketEval != null) {
buckNum = computeBucketNumber(row, conf.getNumBuckets());
cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
@@ -304,14 +304,12 @@ public void processOp(Object row, int tag) throws HiveException {
}
final int hashCode;
-
- if(autoParallel && partitionEval.length > 0) {
- // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
- hashCode = hash.hash(firstKey.getBytes(), distKeyLength, 0);
- } else if(bucketEval != null && bucketEval.length > 0) {
- hashCode = computeHashCode(row, buckNum);
+
+ // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
+ if (autoParallel && partitionEval.length > 0) {
+ hashCode = computeMurmurHash(firstKey);
} else {
- hashCode = computeHashCode(row);
+ hashCode = computeHashCode(row, buckNum);
}
firstKey.setHashCode(hashCode);
@@ -384,7 +382,11 @@ private void populateCachedDistinctKeys(Object row, int index) throws HiveExcept
union.setTag((byte) index);
}
- private int computeHashCode(Object row) throws HiveException {
+ protected final int computeMurmurHash(HiveKey firstKey) {
+ return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0);
+ }
+
+ private int computeHashCode(Object row, int buckNum) throws HiveException {
// Evaluate the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
@@ -403,13 +405,7 @@ private int computeHashCode(Object row) throws HiveException {
+ ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
}
}
- return keyHashCode;
- }
-
- private int computeHashCode(Object row, int buckNum) throws HiveException {
- int keyHashCode = computeHashCode(row);
- keyHashCode = keyHashCode * 31 + buckNum;
- return keyHashCode;
+ return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
}
// Serialize the keys and append the tag
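Note: the ReduceSinkOperator change does three related things. buckNum now starts at -1 as an explicit "no bucketing columns" sentinel (0 was ambiguous, since a real bucket id computed as hashcode % numBuckets can also be 0); the auto-parallel case is routed through the new computeMurmurHash helper, which takes the distribution-key length from the key itself; and the two computeHashCode overloads collapse into one whose tail only mixes in the bucket number when one was actually computed. A minimal sketch of the resulting selection logic; murmur() is a toy stand-in for the operator's Murmur hasher and partitionColumnsHash stands in for the ObjectInspector loop over the partition columns, so only the control flow mirrors the patch:

    // Toy stand-in for the operator's Murmur hash over the serialized
    // key prefix (hash.hash(bytes, distKeyLength, 0) in the patch).
    static int murmur(byte[] bytes, int len) {
      int h = 104729;
      for (int i = 0; i < len; i++) {
        h = h * 31 + bytes[i];
      }
      return h;
    }

    // Control flow of the patched hash selection. distKeyLength covers
    // the distribution key only: the tag byte is excluded, the bucket
    // number (when bucketing columns exist) is included.
    static int chooseHashCode(boolean autoParallel, int numPartitionCols,
                              byte[] keyBytes, int distKeyLength,
                              int partitionColumnsHash, int buckNum) {
      if (autoParallel && numPartitionCols > 0) {
        return murmur(keyBytes, distKeyLength);
      }
      // buckNum == -1: no bucketing columns, leave the hash untouched.
      return buckNum < 0 ? partitionColumnsHash
                         : partitionColumnsHash * 31 + buckNum;
    }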
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
index 11024da..2ef0fc9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
@@ -210,6 +210,9 @@ public void assign(VectorExpressionWriter[] writers,
if (limit >= 0 && memUsage > 0) {
reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
}
+
+ autoParallel = conf.isAutoParallel();
+
} catch(Exception e) {
throw new HiveException(e);
}
@@ -265,8 +268,8 @@ public void processOp(Object row, int tag) throws HiveException {
populatedCachedDistributionKeys(vrg, rowIndex, 0);
// replace bucketing columns with hashcode % numBuckets
- int buckNum = 0;
- if (bucketEval != null && bucketEval.length != 0) {
+ int buckNum = -1;
+ if (bucketEval != null) {
buckNum = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets());
cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
}
@@ -280,12 +283,11 @@ public void processOp(Object row, int tag) throws HiveException {
final int hashCode;
- if(autoParallel && partitionEval.length > 0) {
- hashCode = hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0);
- } else if(bucketEval != null && bucketEval.length > 0) {
- hashCode = computeHashCode(vrg, rowIndex, buckNum);
+ // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
+ if (autoParallel && partitionEval.length > 0) {
+ hashCode = computeMurmurHash(firstKey);
} else {
- hashCode = computeHashCode(vrg, rowIndex);
+ hashCode = computeHashCode(vrg, rowIndex, buckNum);
}
firstKey.setHashCode(hashCode);
@@ -417,7 +419,7 @@ private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
}
- private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) throws HiveException {
+ private int computeHashCode(VectorizedRowBatch vrg, int rowIndex, int buckNum) throws HiveException {
// Evaluate the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
@@ -440,13 +442,7 @@ private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) throws HiveExc
partitionWriters[p].getObjectInspector());
}
}
- return keyHashCode;
- }
-
- private int computeHashCode(VectorizedRowBatch vrg, int rowIndex, int buckNum) throws HiveException {
- int keyHashCode = computeHashCode(vrg, rowIndex);
- keyHashCode = keyHashCode * 31 + buckNum;
- return keyHashCode;
+ return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
}
private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException {
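Note: the vectorized operator mirrors the row-mode restructuring, and its first hunk carries the easy-to-miss fix: autoParallel is now initialized from the descriptor via conf.isAutoParallel(), which the hunk suggests was previously never set on the vectorized path, so the Murmur branch was never taken. That meant a shuffle edge fed by one vectorized (ORC) map and one row-mode (text) map could hash identical join keys differently. It matters because the destination reducer is derived from the key's hash code; a simplified stand-in for that routing, following the familiar Hadoop HashPartitioner convention (an assumption about the shuffle, not code from this patch):

    // Equal keys meet on the same reducer only if every ReduceSink
    // flavor on the edge emits the same hash code for them; otherwise
    // the downstream inner join silently loses matches.
    static int targetReducer(int keyHashCode, int numReducers) {
      return (keyHashCode & Integer.MAX_VALUE) % numReducers;
    }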
diff --git ql/src/test/queries/clientpositive/tez_join_hash.q ql/src/test/queries/clientpositive/tez_join_hash.q
new file mode 100644
index 0000000..8b15268
--- /dev/null
+++ ql/src/test/queries/clientpositive/tez_join_hash.q
@@ -0,0 +1,12 @@
+create table orc_src (key string, value string) STORED AS ORC;
+insert into table orc_src select * from src;
+
+set hive.execution.engine=tez;
+set hive.vectorized.execution.enabled=true;
+set hive.auto.convert.join.noconditionaltask.size=1;
+set hive.exec.reducers.bytes.per.reducer=20000;
+
+explain
+SELECT count(*) FROM src, orc_src where src.key=orc_src.key;
+
+SELECT count(*) FROM src, orc_src where src.key=orc_src.key;
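Note: the qfile is set up so that a row-mode/vectorized hash mismatch becomes visible. hive.auto.convert.join.noconditionaltask.size=1 keeps the join from being converted to a map join, so it genuinely shuffles; with vectorization enabled, the ORC scan of orc_src runs a vectorized ReduceSink while the text-format src scan stays row-mode, putting both hash paths on the same edge; and hive.exec.reducers.bytes.per.reducer=20000 forces several reducers, so mis-hashed keys land on the wrong reducer instead of all converging on one. A rough sketch of how the reducer count follows from that last setting (simplified; Hive's actual estimate also applies configured caps):

    // With the ~88 KB of ORC input reported in the plan below and
    // 20 KB per reducer this yields about 5 reducers, enough for a
    // partitioning bug to drop join matches.
    static int estimateReducers(long totalInputBytes, long bytesPerReducer,
                                int maxReducers) {
      int reducers = (int) Math.ceil((double) totalInputBytes / bytesPerReducer);
      return Math.max(1, Math.min(reducers, maxReducers));
    }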
diff --git ql/src/test/results/clientpositive/tez/tez_join_hash.q.out ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
new file mode 100644
index 0000000..b9b87a5
--- /dev/null
+++ ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
@@ -0,0 +1,116 @@
+PREHOOK: query: create table orc_src (key string, value string) STORED AS ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: create table orc_src (key string, value string) STORED AS ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orc_src
+PREHOOK: query: insert into table orc_src select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_src
+POSTHOOK: query: insert into table orc_src select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_src
+POSTHOOK: Lineage: orc_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: orc_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: explain
+SELECT count(*) FROM src, orc_src where src.key=orc_src.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+SELECT count(*) FROM src, orc_src where src.key=orc_src.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: orc_src
+ Statistics: Num rows: 500 Data size: 88000 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: key (type: string)
+ sort order: +
+ Map-reduce partition columns: key (type: string)
+ Statistics: Num rows: 500 Data size: 88000 Basic stats: COMPLETE Column stats: NONE
+ Execution mode: vectorized
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: key (type: string)
+ sort order: +
+ Map-reduce partition columns: key (type: string)
+ Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {KEY.reducesinkkey0}
+ 1 {KEY.reducesinkkey0}
+ outputColumnNames: _col0, _col4
+ Statistics: Num rows: 550 Data size: 96800 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (_col0 = _col4) (type: boolean)
+ Statistics: Num rows: 275 Data size: 48400 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ Statistics: Num rows: 275 Data size: 48400 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: bigint)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT count(*) FROM src, orc_src where src.key=orc_src.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_src
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT count(*) FROM src, orc_src where src.key=orc_src.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_src
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+1028
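Note: the final 1028 is the plain self-join cardinality: orc_src is a verbatim copy of src, so the inner join on key must return the sum of the squared key multiplicities of src. A hedged sketch of that arithmetic (keyCounts is a hypothetical key-to-multiplicity map, nothing the test materializes):

    // Each key k with n(k) occurrences on both sides contributes
    // n(k) * n(k) join rows. If the two ReduceSink flavors disagreed
    // on the hash, some matches would be routed apart and the observed
    // count would fall short of this number.
    static long expectedJoinCount(java.util.Map<String, Integer> keyCounts) {
      long total = 0;
      for (int n : keyCounts.values()) {
        total += (long) n * n;
      }
      return total;
    }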