diff --git itests/qtest/testconfiguration.properties itests/qtest/testconfiguration.properties index d0df420..e05f3a5 100644 --- itests/qtest/testconfiguration.properties +++ itests/qtest/testconfiguration.properties @@ -1,5 +1,5 @@ minimr.query.files=stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q,empty_dir_in_table.q,temp_table_external.q minimr.query.negative.files=cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q -minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q,tez_bmj_schema_evolution.q +minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q,tez_bmj_schema_evolution.q,tez_smb_1.q minitez.query.files.shared=cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q,temp_table.q,vectorized_ptf.q,optimize_nullscan.q 
beeline.positive.exclude=add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q diff --git ql/if/queryplan.thrift ql/if/queryplan.thrift index eafbe5a..dbf35e6 100644 --- ql/if/queryplan.thrift +++ ql/if/queryplan.thrift @@ -56,6 +56,7 @@ enum OperatorType { PTF, MUX, DEMUX, + MERGEJOIN, } struct Operator { diff --git ql/src/gen/thrift/gen-cpp/queryplan_types.cpp ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index 96dbb29..34e9007 100644 --- ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -51,7 +51,8 @@ int _kOperatorTypeValues[] = { OperatorType::HASHTABLEDUMMY, OperatorType::PTF, OperatorType::MUX, - OperatorType::DEMUX + OperatorType::DEMUX, + 
OperatorType::MERGEJOIN }; const char* _kOperatorTypeNames[] = { "JOIN", @@ -74,9 +75,10 @@ const char* _kOperatorTypeNames[] = { "HASHTABLEDUMMY", "PTF", "MUX", - "DEMUX" + "DEMUX", + "MERGEJOIN" }; -const std::map _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(22, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); int _kTaskTypeValues[] = { TaskType::MAP, diff --git ql/src/gen/thrift/gen-cpp/queryplan_types.h ql/src/gen/thrift/gen-cpp/queryplan_types.h index 634dd55..71b5c88 100644 --- ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -56,7 +56,8 @@ struct OperatorType { HASHTABLEDUMMY = 17, PTF = 18, MUX = 19, - DEMUX = 20 + DEMUX = 20, + MERGEJOIN = 21 }; }; diff --git ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java index aa094ee..b286e06 100644 --- ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java +++ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java @@ -32,7 +32,8 @@ HASHTABLEDUMMY(17), PTF(18), MUX(19), - DEMUX(20); + DEMUX(20), + MERGEJOIN(21); private final int value; @@ -95,6 +96,8 @@ public static OperatorType findByValue(int value) { return MUX; case 20: return DEMUX; + case 21: + return MERGEJOIN; default: return null; } diff --git ql/src/gen/thrift/gen-php/Types.php ql/src/gen/thrift/gen-php/Types.php index 5164b2c..f97ebf8 100644 --- ql/src/gen/thrift/gen-php/Types.php +++ ql/src/gen/thrift/gen-php/Types.php @@ -56,6 +56,7 @@ final class OperatorType { const PTF = 18; const MUX = 19; const DEMUX = 20; + const MERGEJOIN = 21; static public $__names = array( 0 => 'JOIN', 1 => 'MAPJOIN', @@ -78,6 +79,7 @@ final class OperatorType { 18 => 'PTF', 19 => 'MUX', 20 => 'DEMUX', + 21 => 'MERGEJOIN', ); } diff --git ql/src/gen/thrift/gen-py/queryplan/ttypes.py ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 2a3f745..6c8970c 100644 --- ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -66,6 +66,7 @@ class OperatorType: PTF = 18 MUX = 19 DEMUX = 20 + MERGEJOIN = 21 _VALUES_TO_NAMES = { 0: "JOIN", @@ -89,6 +90,7 @@ class OperatorType: 18: "PTF", 19: "MUX", 20: "DEMUX", + 21: "MERGEJOIN", } _NAMES_TO_VALUES = { @@ -113,6 +115,7 @@ class OperatorType: "PTF": 18, "MUX": 19, "DEMUX": 20, + "MERGEJOIN": 21, } class TaskType: diff --git ql/src/gen/thrift/gen-rb/queryplan_types.rb ql/src/gen/thrift/gen-rb/queryplan_types.rb index 35e1f0f..82ae7a5 100644 --- ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -42,8 +42,9 @@ module OperatorType PTF = 18 MUX = 19 DEMUX = 20 - VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX"} - VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, 
HASHTABLEDUMMY, PTF, MUX, DEMUX]).freeze + MERGEJOIN = 21 + VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "MERGEJOIN"} + VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, MERGEJOIN]).freeze end module TaskType diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index 03194a4..a3f9e5b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -217,7 +217,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { joinFilters = new List[tagLen]; JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(),order,NOTSKIPBIGTABLE); - joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues, inputObjInspectors,NOTSKIPBIGTABLE, tagLen); joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters, @@ -225,6 +224,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors( joinValuesObjectInspectors,NOTSKIPBIGTABLE, tagLen); + LOG.info("Our join values standard object inspectors are : " + + joinValuesStandardObjectInspectors.length); filterMaps = conf.getFilterMap(); if (noOuterJoin) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java new file mode 100644 index 0000000..42d460a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; + +/* + * This join operator offers a generic implementation of the sort-merge join. + * The idea is that all the sides of the join are sorted on the join column. This allows us to + * have an efficient sort-merge join algorithm. This operator works in either map side or + * reduce side of the execution. + */ + +public class CommonMergeJoinOperator extends AbstractMapJoinOperator implements + Serializable { + private static final Log LOG = LogFactory.getLog(CommonMergeJoinOperator.class.getName()); + + JoinOperator joinOp; + private boolean isBigTableWork; + private Map aliasToInputNameMap; + Map> aliasToKeyMap = new HashMap>(); + transient List[] keyWritables; + transient List[] nextKeyWritables; + private transient RowContainer>[] nextGroupStorage; + private transient RowContainer>[] candidateStorage; + + private transient String[] tagToAlias; + private transient boolean[] fetchDone; + private transient boolean[] foundNextKeyGroup; + private transient boolean firstFetchHappened = false; + private transient boolean localWorkInited = false; + private transient boolean initDone = false; + private transient List otherKey = null; + private transient List values = null; + + public CommonMergeJoinOperator() { + this.joinOp = null; + } + + public CommonMergeJoinOperator(JoinOperator joinOp) { + super(); + this.joinOp = joinOp; + } + + @Override + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.ql.exec.Operator#processOp(java.lang.Object, + * int) this processor has a push-pull model. First call to this method is a + * push but the rest is pulled until we run out of records. + */ + @Override + public void processOp(Object row, int tag) throws HiveException { + + byte alias = (byte) tag; + + // compute keys and values as StandardObjects + List key = mergeJoinComputeKeys(row, alias); + + List value = getFilteredValue(alias, row); + + + //have we reached a new key group? + boolean nextKeyGroup = processKey(alias, key); + if (nextKeyGroup) { + //assert this.nextGroupStorage[alias].size() == 0; + this.nextGroupStorage[alias].addRow(value); + foundNextKeyGroup[tag] = true; + if (tag != posBigTable) { + return; + } + } + + reportProgress(); + numMapRowsRead++; + + // the big table has reached a new key group. try to let the small tables + // catch up with the big table. 
+ if (nextKeyGroup) { + assert tag == posBigTable; + List smallestPos = null; + do { + smallestPos = joinOneGroup(); + //jump out of the loop if we need input from the big table + } while (smallestPos != null && smallestPos.size() > 0 + && !smallestPos.contains(this.posBigTable)); + + return; + } + + assert !nextKeyGroup; + candidateStorage[tag].addRow(value); + + } + + private List joinOneGroup() throws HiveException { + int[] smallestPos = findSmallestKey(); + List listOfNeedFetchNext = null; + if (smallestPos != null) { + listOfNeedFetchNext = joinObject(smallestPos); + if (listOfNeedFetchNext.size() > 0) { + // listOfNeedFetchNext contains all tables for which we have joined the data in their + // candidateStorage; we need to clear candidate storage, promote their + // nextGroupStorage to candidateStorage and fetch data until we reach a + // new group. + for (Byte b : listOfNeedFetchNext) { + try { + fetchNextGroup(b); + } catch (Exception e) { + throw new HiveException(e); + } + } + } + } + return listOfNeedFetchNext; + } + + private List joinObject(int[] smallestPos) throws HiveException { + List needFetchList = new ArrayList(); + byte index = (byte) (smallestPos.length - 1); + for (; index >= 0; index--) { + if (smallestPos[index] > 0 || keyWritables[index] == null) { + putDummyOrEmpty(index); + continue; + } + storage[index] = candidateStorage[index]; + needFetchList.add(index); + if (smallestPos[index] < 0) { + break; + } + } + for (index--; index >= 0; index--) { + putDummyOrEmpty(index); + } + checkAndGenObject(); + for (Byte pos : needFetchList) { + this.candidateStorage[pos].clearRows(); + this.keyWritables[pos] = null; + } + return needFetchList; + } + + private void putDummyOrEmpty(Byte i) { + // put an empty list or null + if (noOuterJoin) { + storage[i] = emptyList; + } else { + storage[i] = dummyObjVectors[i]; + } + } + + private int[] findSmallestKey() { + int[] result = new int[order.length]; + List smallestOne = null; + + for (byte pos = 0; pos < order.length; pos++) { + List key = keyWritables[pos]; + if (key == null) { + continue; + } + if (smallestOne == null) { + smallestOne = key; + result[pos] = -1; + continue; + } + result[pos] = compareKeys(key, smallestOne); + if (result[pos] < 0) { + smallestOne = key; + } + } + return smallestOne == null ? null : result; + } + + private void fetchNextGroup(Byte t) throws Exception { + if (foundNextKeyGroup[t]) { + // first promote the next group to be the current group if we reached a + // new group in the previous fetch + if (this.nextKeyWritables[t] != null) { + promoteNextGroupToCandidate(t); + } else { + this.keyWritables[t] = null; + this.candidateStorage[t] = null; + this.nextGroupStorage[t] = null; + } + foundNextKeyGroup[t] = false; + } + // for the big table, we only need to promote the next group to the current group. + if (t == posBigTable) { + return; + } + + // for tables other than the big table, we need to fetch more data until we reach a new group or + // are done.
+ while (!foundNextKeyGroup[t]) { + if (fetchDone[t]) { + break; + } + fetchOneRow(t); + } + if (!foundNextKeyGroup[t] && fetchDone[t]) { + this.nextKeyWritables[t] = null; + } + } + + private void fetchOneRow(byte tag) throws Exception { + Object rowOfSide = getParentOperators().get(tag).getNextRow(aliasToInputNameMap.get(tag)); + while (rowOfSide != null) { + values = getFilteredValue(tag, rowOfSide); + otherKey = mergeJoinComputeKeys(rowOfSide, tag); + // FIXME + // int comparedKey = compareKeys(key, otherKey); + int comparedKey = 0; + if (comparedKey < 0) { + this.storage[alias].clearRows(); + aliasToKeyMap.put(tag, otherKey); + this.storage[tag].addRow(values); + return; + } else if (comparedKey > 0) { + continue; + } + this.storage[tag].addRow(values); + rowOfSide = getParentOperators().get(tag).getNextRow(aliasToInputNameMap.get(tag)); + } + } + + private void promoteNextGroupToCandidate(Byte t) throws HiveException { + this.keyWritables[t] = this.nextKeyWritables[t]; + this.nextKeyWritables[t] = null; + RowContainer> oldRowContainer = this.candidateStorage[t]; + oldRowContainer.clearRows(); + this.candidateStorage[t] = this.nextGroupStorage[t]; + this.nextGroupStorage[t] = oldRowContainer; + } + + private boolean processKey(byte alias, List key) throws HiveException { + List keyWritable = keyWritables[alias]; + if (keyWritable == null) { + // the first group. + keyWritables[alias] = key; + return false; + } else { + int cmp = compareKeys(key, keyWritable); + if (cmp != 0) { + nextKeyWritables[alias] = key; + return true; + } + return false; + } + } + + private int compareKeys(List k1, List k2) { + int ret = 0; + + // do the join keys have different sizes? + ret = k1.size() - k2.size(); + if (ret != 0) { + return ret; + } + + for (int i = 0; i < k1.size(); i++) { + WritableComparable key_1 = (WritableComparable) k1.get(i); + WritableComparable key_2 = (WritableComparable) k2.get(i); + if (key_1 == null && key_2 == null) { + return nullsafes != null && nullsafes[i] ?
0 : -1; // just return k1 is + // smaller than k2 + } else if (key_1 == null) { + return -1; + } else if (key_2 == null) { + return 1; + } + ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2); + if (ret != 0) { + return ret; + } + } + return ret; + } + + private List mergeJoinComputeKeys(Object row, Byte alias) throws HiveException { + return null; + } + + @Override + public String getName() { + return getOperatorName(); + } + + static public String getOperatorName() { + return "MERGEJOIN"; + } + + @Override + public OperatorType getType() { + return OperatorType.MERGEJOIN; + } + + @Override + public void initializeLocalWork(Configuration hconf) throws HiveException { + Operator parent = null; + for (Operator parentOp : parentOperators) { + if (parentOp != null) { + parent = parentOp; + break; + } + } + + if (parent == null) { + throw new HiveException("No valid parents."); + } + LOG.info("VIKRAM> fetch connect ops."); + List dummyOps = parent.getConnectOps(); + LOG.info("VIKRAM> fetched connect ops"); + for (Operator connectOp : dummyOps) { + parentOperators.add(connectOp); + connectOp.getChildOperators().add(this); + } + super.initializeLocalWork(hconf); + return; + } + + public void setJoinOp(JoinOperator joinOp) { + this.joinOp = joinOp; + } + + public boolean isBigTableWork() { + return isBigTableWork; + } + + public void setIsBigTableWork(boolean bigTableWork) { + this.isBigTableWork = bigTableWork; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index d5de58e..4baa1be 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -33,8 +33,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor; +import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -180,7 +181,7 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, PartitionDesc pd = ctx.partDesc; TableDesc td = pd.getTableDesc(); - + MapOpCtx opCtx = new MapOpCtx(); // Use table properties in case of unpartitioned tables, // and the union of table properties and partition properties, with partition @@ -204,42 +205,42 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter( partRawRowObjectInspector, opCtx.tblRawRowObjectInspector); - + // Next check if this table has partitions and if so // get the list of partition names as well as allocate // the serdes for the partition columns String pcols = overlayedProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - + if (pcols != null && pcols.length() > 0) { String[] partKeys = pcols.trim().split("/"); String pcolTypes = overlayedProps .getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); String[] partKeyTypes = pcolTypes.trim().split(":"); - + if (partKeys.length > partKeyTypes.length) { throw new HiveException("Internal error : partKeys length, " +partKeys.length + " greater than partKeyTypes length, " + partKeyTypes.length); 
} - + List partNames = new ArrayList(partKeys.length); Object[] partValues = new Object[partKeys.length]; List partObjectInspectors = new ArrayList(partKeys.length); - + for (int i = 0; i < partKeys.length; i++) { String key = partKeys[i]; partNames.add(key); ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector (TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i])); - + // Partitions do not exist for this table if (partSpec == null) { // for partitionless table, initialize partValue to null partValues[i] = null; } else { - partValues[i] = + partValues[i] = ObjectInspectorConverters. getConverter(PrimitiveObjectInspectorFactory. - javaStringObjectInspector, oi).convert(partSpec.get(key)); + javaStringObjectInspector, oi).convert(partSpec.get(key)); } partObjectInspectors.add(oi); } @@ -340,6 +341,53 @@ private boolean isPartitioned(PartitionDesc pd) { return pd.getPartSpec() != null && !pd.getPartSpec().isEmpty(); } + public void setChildrenSecondary(Configuration hconf) throws HiveException { + + List> children = + new ArrayList>(); + + Map convertedOI = getConvertedOI(hconf); + try { + for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { + String onefile = entry.getKey(); + List aliases = entry.getValue(); + + Path onepath = new Path(onefile); + + PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile); + + for (String onealias : aliases) { + Operator op = conf.getAliasToWork().get(onealias); + MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc); + if (opCtxMap.containsKey(inp)) { + continue; + } + MapOpCtx opCtx = initObjectInspector(hconf, inp, convertedOI); + opCtxMap.put(inp, opCtx); + + op.setParentOperators(new ArrayList>()); + op.getParentOperators().add(this); + // check for the operators who will process rows coming to this Map + // Operator + children.add(op); + childrenOpToOpCtxMap.put(op, opCtx); + current = opCtx; // just need for TestOperators.testMapOperator + } + } + + if (children.size() == 0) { + // didn't find match for input file path in configuration! + // serious problem .. + throw new HiveException("Unable to set up children"); + } + + // we found all the operators that we are supposed to process. + setChildOperators(children); + } catch (Exception e) { + throw new HiveException(e); + } + } + public void setChildren(Configuration hconf) throws HiveException { Path fpath = IOContext.get().getInputPath(); @@ -350,9 +398,10 @@ public void setChildren(Configuration hconf) throws HiveException { new ArrayList>(); Map convertedOI = getConvertedOI(hconf); - try { for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { + LOG.info("Path to aliases: " + entry.getKey() + " values: " + + Arrays.toString(entry.getValue().toArray())); String onefile = entry.getKey(); List aliases = entry.getValue(); @@ -366,7 +415,7 @@ public void setChildren(Configuration hconf) throws HiveException { for (String onealias : aliases) { Operator op = conf.getAliasToWork().get(onealias); if (LOG.isDebugEnabled()) { - LOG.debug("Adding alias " + onealias + " to work list for file " + LOG.info("Adding alias " + onealias + " to work list for file " + onefile); } MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc); @@ -625,4 +674,15 @@ public OperatorType getType() { return null; } + @Override + public Object getNextRow(String inputName) throws Exception { + // This map operator definitely belongs to merge work. 
+ Object nextRow = MapRecordProcessor.getNextRow(inputName); + return nextRow; + } + + @Override + public List getConnectOps() { + return MapRecordProcessor.getConnectOps(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index db94271..6ac3ec8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -146,6 +146,7 @@ public int getNumChild() { /** * Implements the getChildren function for the Node Interface. */ + @Override public ArrayList getChildren() { if (getChildOperators() == null) { @@ -851,6 +852,7 @@ public void logStats() { * * @return the name of the operator */ + @Override public String getName() { return getOperatorName(); } @@ -1061,7 +1063,7 @@ public boolean supportSkewJoinOptimization() { if (parents != null) { for (Operator parent : parents) { - parentClones.add((Operator)(parent.clone())); + parentClones.add((parent.clone())); } } @@ -1082,8 +1084,8 @@ public boolean supportSkewJoinOptimization() { public Operator cloneOp() throws CloneNotSupportedException { T descClone = (T) conf.clone(); Operator ret = - (Operator) OperatorFactory.getAndMakeChild( - descClone, getSchema()); + OperatorFactory.getAndMakeChild( + descClone, getSchema()); return ret; } @@ -1254,15 +1256,15 @@ public Statistics getStatistics() { } return null; } - + public OpTraits getOpTraits() { if (conf != null) { return conf.getOpTraits(); } - + return null; } - + public void setOpTraits(OpTraits metaInfo) { if (LOG.isDebugEnabled()) { LOG.debug("Setting traits ("+metaInfo+") on "+this); @@ -1299,7 +1301,24 @@ public static Operator createDummy() { private static class DummyOperator extends Operator { public DummyOperator() { super("dummy"); } + @Override public void processOp(Object row, int tag) { } + @Override public OperatorType getType() { return null; } } + + public Object getNextRow(String string) throws Exception { + // all operators except the MapOperator just fetch and process the row. 
+ assert (getParentOperators().size() == 1); + Object row = getParentOperators().get(0).getNextRow(string); + return row; + } + + public List getConnectOps() { + if ((parentOperators == null) || (parentOperators.size() == 0)) { + return null; + } + List dummyOps = parentOperators.get(0).getConnectOps(); + return dummyOps; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java index 5d41fa1..1d9bccc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java @@ -28,11 +28,12 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.CollectDesc; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import org.apache.hadoop.hive.ql.plan.DemuxDesc; import org.apache.hadoop.hive.ql.plan.DummyStoreDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -114,6 +115,8 @@ public OpTuple(Class descClass, Class> opClass) { DemuxOperator.class)); opvec.add(new OpTuple(MuxDesc.class, MuxOperator.class)); + opvec.add(new OpTuple(CommonMergeJoinDesc.class, + CommonMergeJoinOperator.class)); } public static ArrayList vectorOpvec; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 4450ad3..8673a32 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -141,6 +141,7 @@ import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -197,6 +198,7 @@ public static String HADOOP_LOCAL_FS = "file:///"; public static String MAP_PLAN_NAME = "map.xml"; public static String REDUCE_PLAN_NAME = "reduce.xml"; + public static String MERGE_PLAN_NAME = "merge.xml"; public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class"; public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class"; @@ -330,7 +332,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { } if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { - LOG.debug("Loading plan from string: "+path.toUri().getPath()); + LOG.debug("Loading plan from string: " + path.toUri().getPath()); String planString = conf.get(path.toUri().getPath()); if (planString == null) { LOG.info("Could not find plan string in conf"); @@ -343,7 +345,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { in = new FileInputStream(localPath.toUri().getPath()); } - if(MAP_PLAN_NAME.equals(name)){ + if (MAP_PLAN_NAME.equals(name)) { if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){ gWork = deserializePlan(in, MapWork.class, conf); } else 
if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { @@ -363,6 +365,8 @@ private static BaseWork getBaseWork(Configuration conf, String name) { throw new RuntimeException("unable to determine work from configuration ." + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ; } + } else if (MERGE_PLAN_NAME.equals(name)) { + gWork = deserializePlan(in, MergeJoinWork.class, conf); } gWorkMap.put(path, gWork); } else { @@ -586,14 +590,19 @@ public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScra } public static Path setMapWork(Configuration conf, MapWork w, Path hiveScratchDir, boolean useCache) { - return setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME, useCache); + List baseWorkList = new ArrayList(); + baseWorkList.add(w); + return setBaseWork(conf, baseWorkList, hiveScratchDir, MAP_PLAN_NAME, useCache); } public static Path setReduceWork(Configuration conf, ReduceWork w, Path hiveScratchDir, boolean useCache) { - return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache); + List baseWorkList = new ArrayList(); + baseWorkList.add(w); + return setBaseWork(conf, baseWorkList, hiveScratchDir, REDUCE_PLAN_NAME, useCache); } - private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) { + private static Path setBaseWork(Configuration conf, List workList, Path hiveScratchDir, + String name, boolean useCache) { try { setPlanPath(conf, hiveScratchDir); @@ -605,7 +614,9 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch // add it to the conf ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED)); - serializePlan(w, out, conf); + for (BaseWork w : workList) { + serializePlan(w, out, conf); + } LOG.info("Setting plan: "+planPath.toUri().getPath()); conf.set(planPath.toUri().getPath(), Base64.encodeBase64String(byteOut.toByteArray())); @@ -613,7 +624,9 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch // use the default file system of the conf FileSystem fs = planPath.getFileSystem(conf); out = fs.create(planPath); - serializePlan(w, out, conf); + for (BaseWork w : workList) { + serializePlan(w, out, conf); + } // Serialize the plan to the default hdfs instance // Except for hadoop local mode execution where we should be @@ -634,7 +647,9 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch } // Cache the plan in this process - gWorkMap.put(planPath, w); + if (workList.size() == 1) { + gWorkMap.put(planPath, workList.get(0)); + } return planPath; } catch (Exception e) { @@ -3424,7 +3439,7 @@ public static boolean createDirsWithPermission(Configuration conf, Path mkdir, return createDirsWithPermission(conf, mkdir, fsPermission, recursive); } - private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, + private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, String origUmask, FileSystem fs) throws IOException { if (unsetUmask) { if (origUmask != null) { @@ -3499,4 +3514,13 @@ public static String getQualifiedPath(HiveConf conf, Path path) throws HiveExcep public static boolean isDefaultNameNode(HiveConf conf) { return !conf.getChangedProperties().containsKey(HiveConf.ConfVars.HADOOPFS.varname); } + + public static Path setMergeWork(JobConf conf, List workList, Path mrScratchDir, + boolean b) { + return setBaseWork(conf, workList, mrScratchDir, 
MERGE_PLAN_NAME, b); + } + + public static MergeJoinWork getMergeWork(JobConf jconf) { + return (MergeJoinWork) getBaseWork(jconf, MERGE_PLAN_NAME); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java index 407d8ac..acb3fc7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java @@ -31,7 +31,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.BaseWork.VertexType; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.mapred.FileSplit; @@ -78,9 +80,10 @@ private List dataInformationEvents; private int numBuckets = -1; private Configuration conf = null; - private boolean rootVertexInitialized = false; private final SplitGrouper grouper = new SplitGrouper(); private int taskCount = 0; + private VertexType vertexType; + private String inputNameDecidingParallelism; public CustomPartitionVertex(VertexManagerPluginContext context) { super(context); @@ -112,17 +115,11 @@ public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) { public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { } - // One call per root Input - and for now only one is handled. + // One call per root Input. In case of SMB joins, this method will be called once for each input @Override public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List events) { - // Ideally, since there's only 1 Input expected at the moment - - // ensure this method is called only once. Tez will call it once per Root - // Input. - Preconditions.checkState(rootVertexInitialized == false); - LOG.info("Root vertex not initialized"); - rootVertexInitialized = true; try { // This is using the payload from the RootVertexInitializer corresponding // to InputName. Ideally it should be using it's own configuration class - @@ -166,9 +163,6 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr // No tasks should have been started yet. Checked by initial state // check. Preconditions.checkState(dataInformationEventSeen == false); - Preconditions - .checkState(context.getVertexNumTasks(context.getVertexName()) == -1, - "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism"); RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event; // The vertex cannot be configured until all DataEvents are seen - to @@ -250,11 +244,13 @@ private void processAllEvents(String inputName, // Construct the EdgeManager descriptor to be used by all edges which need // the routing table. 
- EdgeManagerDescriptor hiveEdgeManagerDesc = - new EdgeManagerDescriptor(CustomPartitionEdge.class.getName()); - byte[] payload = getBytePayload(bucketToTaskMap); - hiveEdgeManagerDesc.setUserPayload(payload); - + EdgeManagerDescriptor hiveEdgeManagerDesc = null; + if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES) + || (vertexType == VertexType.INITIALIZED_EDGES)) { + hiveEdgeManagerDesc = new EdgeManagerDescriptor(CustomPartitionEdge.class.getName()); + byte[] payload = getBytePayload(bucketToTaskMap); + hiveEdgeManagerDesc.setUserPayload(payload); + } Map emMap = Maps.newHashMap(); // Replace the edge manager for all vertices which have routing type custom. @@ -287,10 +283,12 @@ private void processAllEvents(String inputName, rootInputSpecUpdate.put( inputName, RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate()); - context.setVertexParallelism( - taskCount, - new VertexLocationHint(grouper.createTaskLocationHints(finalSplits - .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); + if (inputNameDecidingParallelism.compareTo(inputName) == 0) { + context.setVertexParallelism( + taskCount, + new VertexLocationHint(grouper.createTaskLocationHints(finalSplits + .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); + } // Set the actual events for the tasks. context.addRootInputEvents(inputName, taskEvents); @@ -318,7 +316,8 @@ private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) thr if (!(inputSplit instanceof FileSplit)) { throw new UnsupportedOperationException( - "Cannot handle splits other than FileSplit for the moment"); + "Cannot handle splits other than FileSplit for the moment. Current input split type: " + + inputSplit.getClass().getSimpleName()); } return (FileSplit) inputSplit; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index d964eb1..c0f25f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -25,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; @@ -36,14 +39,17 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import 
org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; @@ -57,12 +63,15 @@ public class MapRecordProcessor extends RecordProcessor { + private static KeyValueReader mergeQueue; private MapOperator mapOp; + private MapOperator mergeMapOp; public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class); private final ExecMapperContext execContext = new ExecMapperContext(); private boolean abort = false; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MapWork mapWork; + private static List connectOps = new ArrayList(); @Override void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter, @@ -90,11 +99,17 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr ObjectCache cache = ObjectCacheFactory.getCache(jconf); try { + MapWork mergeMapWork = null; execContext.setJc(jconf); // create map and fetch operators mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); if (mapWork == null) { mapWork = Utilities.getMapWork(jconf); + MergeJoinWork mergeJoinWork = Utilities.getMergeWork(jconf); + // FIXME currently hard-coded for single side table. Fixed in exec patch. + if (mergeJoinWork != null) { + mergeMapWork = (MapWork) mergeJoinWork.getBaseWorkList().get(0); + } cache.cache(MAP_PLAN_KEY, mapWork); l4j.info("Plan: "+mapWork); for (String s: mapWork.getAliases()) { @@ -109,16 +124,38 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr mapOp = new MapOperator(); } + if (mergeMapWork != null) { + if (mergeMapWork.getVectorMode()) { + mergeMapOp = new VectorMapOperator(); + } else { + mergeMapOp = new MapOperator(); + } + } + + // initialize the merge operators first. + if (mergeMapOp != null) { + mergeMapOp.setConf(mergeMapWork); + mergeMapOp.setChildrenSecondary(jconf); + DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp); + l4j.info("VIKRAM> connect ops added: " + dummyOp); + connectOps.add(dummyOp); + mergeMapOp.setExecContext(execContext); + mergeMapOp.initializeLocalWork(jconf); + } + // initialize map operator mapOp.setConf(mapWork); mapOp.setChildren(jconf); - l4j.info(mapOp.dump(0)); MapredContext.init(true, new JobConf(jconf)); ((TezContext)MapredContext.get()).setInputs(inputs); mapOp.setExecContext(execContext); mapOp.initializeLocalWork(jconf); + mapOp.initialize(jconf, null); + if (mergeMapOp != null) { + mergeMapOp.initialize(jconf, null); + } // Initialization isn't finished until all parents of all operators // are initialized. 
For broadcast joins that means initializing the @@ -148,8 +185,20 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); } + private DummyStoreOperator getJoinParentOp(Operator mergeMapOp) { + for (Operator childOp : mergeMapOp.getChildOperators()) { + l4j.info("Child op we are walking: " + childOp); + if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) { + return (DummyStoreOperator) childOp; + } else { + return getJoinParentOp(childOp); + } + } + return null; + } + @Override - void run() throws IOException{ + void run() throws IOException, InterruptedException { MRInputLegacy in = TezProcessor.getMRInput(inputs); KeyValueReader reader = in.getReader(); @@ -239,4 +288,25 @@ void close(){ MapredContext.close(); } } + + public static Object getNextRow(String inputName) throws Exception { + // get the next row from the priority queue corresponding to this input name + if (mergeQueue == null) { + MultiMRInput multiMRInput = TezProcessor.getInput(inputName); + Collection kvReaders = multiMRInput.getKeyValueReaders(); + List kvReaderList = new ArrayList(kvReaders); + mergeQueue = new KeyValueInputMerger(kvReaderList); + } + + if (mergeQueue.next()) { + return mergeQueue.getCurrentValue(); + } else { + return null; + } + } + + public static List getConnectOps() { + l4j.info("VIKRAM> connect ops length: " + connectOps.size()); + return connectOps; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index e884afd..ba8923e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -38,7 +37,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; -import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger; +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValuesInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -59,7 +58,6 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.processor.MRTaskReporter; @@ -120,6 +118,10 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr ObjectCache cache = ObjectCacheFactory.getCache(jconf); + for (Entry entry : inputs.entrySet()) { + l4j.info("REDUCER name : " + entry.getKey() + " Logical input type: " + entry.getValue()); + } + rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; @@ -140,7 +142,7 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr try { keyTableDesc = 
redWork.getKeyDesc(); - inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc + inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc .getDeserializerClass(), null); SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null); keyObjectInspector = inputKeyDeserializer.getObjectInspector(); @@ -152,7 +154,7 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr keyStructInspector = (StructObjectInspector)keyObjectInspector; batches = new VectorizedRowBatch[maxTags]; valueStructInspectors = new StructObjectInspector[maxTags]; - valueStringWriters = (List[])new List[maxTags]; + valueStringWriters = new List[maxTags]; keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size(); buffer = new DataOutputBuffer(); } @@ -279,7 +281,7 @@ void run() throws Exception { kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader(); }else { //get a sort merged input - kvsReader = new InputMerger(shuffleInputs); + kvsReader = new KeyValuesInputMerger(shuffleInputs); } } catch (Exception e) { throw new IOException(e); @@ -293,7 +295,6 @@ void run() throws Exception { break; } } - } /** @@ -306,7 +307,7 @@ void run() throws Exception { Map tag2input = redWork.getTagToInput(); ArrayList shuffleInputs = new ArrayList(); for(String inpStr : tag2input.values()){ - shuffleInputs.add((LogicalInput)inputs.get(inpStr)); + shuffleInputs.add(inputs.get(inpStr)); } return shuffleInputs; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index ea3770d..fdc1a52 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; import java.text.NumberFormat; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,6 +33,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; @@ -60,6 +62,7 @@ private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); private TezProcessorContext processorContext; + private static Map multiMRInputMap; protected static final NumberFormat taskIdFormat = NumberFormat.getInstance(); protected static final NumberFormat jobIdFormat = NumberFormat.getInstance(); @@ -96,6 +99,7 @@ public void initialize() throws IOException { Configuration conf = TezUtils.createConfFromUserPayload(userPayload); this.jobConf = new JobConf(conf); setupMRLegacyConfigs(processorContext); + multiMRInputMap = new HashMap(); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); } @@ -139,10 +143,12 @@ public void run(Map inputs, Map out if (isMap) { rproc = new MapRecordProcessor(); MRInputLegacy mrInput = getMRInput(inputs); - try { - mrInput.init(); - } catch (IOException e) { - throw new RuntimeException("Failed while initializing MRInput", e); + if (mrInput != null) { + try { + mrInput.init(); + } catch (IOException e) { + throw new RuntimeException("Failed while initializing MRInput", e); + } } } else { rproc = new ReduceRecordProcessor(); @@ -208,23 +214,36 @@ void initialize() throws Exception { 
this.writer = (KeyValueWriter) output.getWriter(); } + @Override public void collect(Object key, Object value) throws IOException { writer.write(key, value); } } - static MRInputLegacy getMRInput(Map inputs) { + static MRInputLegacy getMRInput(Map inputs) throws InterruptedException, + IOException { //there should be only one MRInput MRInputLegacy theMRInput = null; - for(LogicalInput inp : inputs.values()){ - if(inp instanceof MRInputLegacy){ + for (Entry inp : inputs.entrySet()) { + if (inp.getValue() instanceof MRInputLegacy) { if(theMRInput != null){ throw new IllegalArgumentException("Only one MRInput is expected"); } //a better logic would be to find the alias - theMRInput = (MRInputLegacy)inp; + theMRInput = (MRInputLegacy) inp.getValue(); + } else if (inp.getValue() instanceof MultiMRInput) { + multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue()); + // Iterator mrInputKeyValueIterator = + // multiMRInput.getKeyValueReaders().iterator(); + // while (mrInputKeyValueIterator.hasNext()) { + // KeyValueReader kvReader = mrInputKeyValueIterator.next(); + // } } } return theMRInput; } + + public static MultiMRInput getInput(String inputName) { + return multiMRInputMap.get(inputName); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java new file mode 100644 index 0000000..30cdd51 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez.tools; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.tez.runtime.library.api.KeyValueReader; + +/** + * A KeyValueReader implementation that returns a sorted stream of key-values + * by doing a sorted merge of the key-values in the underlying KeyValueReaders. + * Uses a priority queue to pick the KeyValueReader whose input is next in + * sort order.
+ */ +public class KeyValueInputMerger implements KeyValueReader { + + public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); + private PriorityQueue pQueue = null; + private KeyValueReader nextKVReader = null; + + public KeyValueInputMerger(List multiMRInputs) throws Exception { + //get KeyValuesReaders from the LogicalInput and add them to priority queue + int initialCapacity = multiMRInputs.size(); + pQueue = new PriorityQueue(initialCapacity, new KVReaderComparator()); + for (KeyValueReader input : multiMRInputs) { + addToQueue(input); + } + } + + /** + * Add KeyValueReader to queue if it has more key-value + * + * @param kvReader + * @throws IOException + */ + private void addToQueue(KeyValueReader kvReader) throws IOException { + if (kvReader.next()) { + pQueue.add(kvReader); + } + } + + /** + * @return true if there are more key-values and advances to next key-values + * @throws IOException + */ + @Override + public boolean next() throws IOException { + //add the previous nextKVReader back to queue + if(nextKVReader != null){ + addToQueue(nextKVReader); + } + + //get the new nextKVReader with lowest key + nextKVReader = pQueue.poll(); + return nextKVReader != null; + } + + @Override + public Object getCurrentKey() throws IOException { + return nextKVReader.getCurrentKey(); + } + + @Override + public Object getCurrentValue() throws IOException { + return nextKVReader.getCurrentValue(); + } + + /** + * Comparator that compares KeyValuesReader on their current key + */ + class KVReaderComparator implements Comparator { + + @Override + public int compare(KeyValueReader kvReadr1, KeyValueReader kvReadr2) { + try { + BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentValue(); + BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentValue(); + return key1.compareTo(key2); + } catch (IOException e) { + l4j.error("Caught exception while reading shuffle input", e); + //die! + throw new RuntimeException(e); + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java new file mode 100644 index 0000000..f62bedb --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez.tools; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.tez.runtime.api.Input; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.library.api.KeyValuesReader; + +/** + * A KeyValuesReader implementation that returns a sorted stream of key-values + * by doing a sorted merge of the key-value in LogicalInputs. + * Tags are in the last byte of the key, so no special handling for tags is required. + * Uses a priority queue to pick the KeyValuesReader of the input that is next in + * sort order. + */ +public class KeyValuesInputMerger implements KeyValuesReader { + + public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); + private PriorityQueue pQueue = null; + private KeyValuesReader nextKVReader = null; + + public KeyValuesInputMerger(List shuffleInputs) throws Exception { + //get KeyValuesReaders from the LogicalInput and add them to priority queue + int initialCapacity = shuffleInputs.size(); + pQueue = new PriorityQueue(initialCapacity, new KVReaderComparator()); + for(Input input : shuffleInputs){ + addToQueue((KeyValuesReader)input.getReader()); + } + } + + /** + * Add KeyValuesReader to queue if it has more key-values + * @param kvsReadr + * @throws IOException + */ + private void addToQueue(KeyValuesReader kvsReadr) throws IOException{ + if(kvsReadr.next()){ + pQueue.add(kvsReadr); + } + } + + /** + * @return true if there are more key-values and advances to next key-values + * @throws IOException + */ + public boolean next() throws IOException { + //add the previous nextKVReader back to queue + if(nextKVReader != null){ + addToQueue(nextKVReader); + } + + //get the new nextKVReader with lowest key + nextKVReader = pQueue.poll(); + return nextKVReader != null; + } + + public Object getCurrentKey() throws IOException { + return nextKVReader.getCurrentKey(); + } + + public Iterable getCurrentValues() throws IOException { + return nextKVReader.getCurrentValues(); + } + + /** + * Comparator that compares KeyValuesReader on their current key + */ + class KVReaderComparator implements Comparator { + + @Override + public int compare(KeyValuesReader kvReadr1, KeyValuesReader kvReadr2) { + try { + BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentKey(); + BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentKey(); + return key1.compareTo(key2); + } catch (IOException e) { + l4j.error("Caught exception while reading shuffle input", e); + //die! 
+ throw new RuntimeException(e); + } + } + } + + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java index d9139b8..ff405eb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java @@ -40,7 +40,7 @@ public TezMergedLogicalInput(TezMergedInputContext context, List inputs) @Override public Reader getReader() throws Exception { - return new InputMerger(getInputs()); + return new KeyValuesInputMerger(getInputs()); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 6d9739f..d9610bb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hive.ql.plan; -import java.util.LinkedList; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,6 +36,13 @@ @SuppressWarnings({"serial", "deprecation"}) public abstract class BaseWork extends AbstractOperatorDesc { + public enum VertexType { + AUTO_INITIALIZED_EDGES, // no custom vertex or edge + INITIALIZED_EDGES, // custom vertex and custom edge but single MR Input + MULTI_INPUT_INITIALIZED_EDGES, // custom vertex, custom edge and multi MR Input + MULTI_INPUT_UNINITIALIZED_EDGES // custom vertex, no custom edge, multi MR Input + } + // dummyOps is a reference to all the HashTableDummy operators in the // plan. These have to be separately initialized when we setup a task. // Their function is mainly as root ops to give the mapjoin the correct @@ -95,7 +102,7 @@ public void addDummyOp(HashTableDummyOperator dummyOp) { // add all children opStack.addAll(opSet); - + while(!opStack.empty()) { Operator op = opStack.pop(); returnSet.add(op); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java new file mode 100644 index 0000000..6b9555b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
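// ---------------------------------------------------------------------------
// [Editor's note: illustrative sketch, not part of the patch.] How the reader
// returned by the modified TezMergedLogicalInput.getReader() above would
// typically be consumed. "mergedInput" stands in for the merged logical input
// handed to the processor, and the enclosing method is assumed to throw
// Exception; KeyValuesReader is the existing Tez API that KeyValuesInputMerger
// implements. Note that equal keys arriving from different inputs come back as
// consecutive groups rather than being collapsed into a single group.
KeyValuesReader reader = (KeyValuesReader) mergedInput.getReader();
while (reader.next()) {                  // advances to the input with the smallest current key
  Object key = reader.getCurrentKey();   // BinaryComparable; the tag sits in its last byte
  for (Object value : reader.getCurrentValues()) {
    // process one row of the current group
  }
}
// ---------------------------------------------------------------------------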
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.Operator; + +@Explain(displayName = "Common Merge Join Operator") +public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable { + private static final long serialVersionUID = 1L; + private int numBuckets; + private boolean isSubQuery; + private int mapJoinConversionPos; + private JoinDesc bigTableJoinDesc; + private Map> aliasToSinkMap; + + CommonMergeJoinDesc() { + } + + public CommonMergeJoinDesc(int numBuckets, boolean isSubQuery, int mapJoinConversionPos, + MapJoinDesc joinDesc) { + super(joinDesc); + this.numBuckets = numBuckets; + this.isSubQuery = isSubQuery; + this.mapJoinConversionPos = mapJoinConversionPos; + } + + public boolean getCustomMerge() { + return isSubQuery; + } + + public int getNumBuckets() { + return numBuckets; + } + + public int getBigTablePosition() { + return mapJoinConversionPos; + } + + public Map getTagToAlias() { + // FIXME this is supposed to be populated in the planning phase. Has a + // parent index to input name mapping. + return null; + } + + public Map> getAliasToSinkMap() { + return aliasToSinkMap; + } + + public void setAliasToSinkMap(Map> aliasToSinkMap) { + this.aliasToSinkMap = aliasToSinkMap; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 1d96c5d..a2e40db 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -26,9 +26,9 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; -import java.util.Set; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -120,6 +120,10 @@ private Map> scratchColumnMap = null; private boolean vectorMode = false; + private boolean doSplitsGrouping = true; + + private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES; + public MapWork() {} public MapWork(String name) { @@ -555,4 +559,45 @@ public void logPathToAliases() { } } } + + public void setDoSplitsGrouping(boolean doSplitsGrouping) { + this.doSplitsGrouping = doSplitsGrouping; + } + + public boolean getDoSplitsGrouping() { + return this.doSplitsGrouping; + } + + public void setVertexType(VertexType incomingVertexType) { + switch (this.vertexType) { + case INITIALIZED_EDGES: + if (incomingVertexType == VertexType.MULTI_INPUT_UNINITIALIZED_EDGES) { + vertexType = VertexType.MULTI_INPUT_INITIALIZED_EDGES; + } + break; + + case MULTI_INPUT_INITIALIZED_EDGES: + // nothing to do + break; + + case MULTI_INPUT_UNINITIALIZED_EDGES: + if (incomingVertexType == VertexType.INITIALIZED_EDGES) { + vertexType = VertexType.MULTI_INPUT_INITIALIZED_EDGES; + } + break; + + case AUTO_INITIALIZED_EDGES: + vertexType = incomingVertexType; + break; + + default: + break; + } + this.vertexType = vertexType; + } + + public VertexType getVertexType() { + // TODO Auto-generated method stub + return this.vertexType; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java new file mode 100644 index 0000000..d6630b0 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
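// ---------------------------------------------------------------------------
// [Editor's note: illustrative sketch, not part of the patch.] The merge rules
// implemented by MapWork.setVertexType() above, written out: starting from
// AUTO_INITIALIZED_EDGES the incoming type is adopted as-is, and once a work
// has seen both INITIALIZED_EDGES (custom edge) and
// MULTI_INPUT_UNINITIALIZED_EDGES (multiple MR inputs), in either order, it
// ends up as MULTI_INPUT_INITIALIZED_EDGES, which then sticks. A hypothetical
// check (assumes import of org.apache.hadoop.hive.ql.plan.BaseWork.VertexType
// and assertions enabled; the vertex name is illustrative):
MapWork work = new MapWork("Map 1");                             // AUTO_INITIALIZED_EDGES
work.setVertexType(VertexType.INITIALIZED_EDGES);                // adopted as-is
work.setVertexType(VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);  // promoted
assert work.getVertexType() == VertexType.MULTI_INPUT_INITIALIZED_EDGES;
// ---------------------------------------------------------------------------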
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.mapred.JobConf; + +public class MergeJoinWork extends BaseWork { + + private final List mergeJoinOpList = + new ArrayList(); + private final List mergeWorkList = new ArrayList(); + private int bigTableInputId; + private boolean isBigTableWork; + private BaseWork bigTableWork; + + public MergeJoinWork() { + super(); + } + + public MergeJoinWork(BaseWork work) { + super(work.getName()); + } + + @Explain(displayName = "Vertex") + @Override + public String getName() { + return super.getName(); + } + + @Override + public void replaceRoots(Map, Operator> replacementMap) { + } + + @Override + @Explain(displayName = "Merge Join Operator Tree") + public Set> getAllRootOperators() { + Set> opSet = new LinkedHashSet>(); + + BaseWork baseWork = getMainWork(); + opSet.addAll(baseWork.getAllRootOperators()); + for (BaseWork work : getBaseWorkList()) { + opSet.addAll(work.getAllRootOperators()); + } + + return opSet; + } + + @Override + public void configureJobConf(JobConf job) { + } + + public List getMergeJoinOperator() { + return this.mergeJoinOpList; + } + + private void addMergeJoinOperator(CommonMergeJoinOperator mergeJoinOp) { + this.mergeJoinOpList.add(mergeJoinOp); + this.bigTableInputId = mergeJoinOp.getConf().getBigTablePosition(); + } + + public void + addMergedWork(BaseWork work, BaseWork connectWork, CommonMergeJoinOperator mergeJoinOp) { + addMergeJoinOperator(mergeJoinOp); + if (work != null) { + if ((bigTableWork != null) && (bigTableWork != work)) { + assert false; + } + this.bigTableWork = work; + setName(work.getName()); + if (work instanceof MapWork) { + MapWork mapWork = (MapWork) work; + mapWork.setVertexType(VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); + mapWork.setDoSplitsGrouping(false); + } + } + + if (connectWork != null) { + this.mergeWorkList.add(connectWork); + } + } + + public List getBaseWorkList() { + return mergeWorkList; + } + + public String getBigTableAlias() { + return ((MapWork) bigTableWork).getAliasToWork().keySet().iterator().next(); + } + + public BaseWork getMainWork() { + return bigTableWork; + } +} diff --git ql/src/test/queries/clientpositive/tez_smb_1.q ql/src/test/queries/clientpositive/tez_smb_1.q new file mode 100644 index 0000000..e11d89f --- /dev/null +++ ql/src/test/queries/clientpositive/tez_smb_1.q @@ -0,0 +1,36 @@ +explain +select a.key, a.value from src a join src b where a.key = b.key; + +set hive.auto.convert.join=true; +set hive.auto.convert.join.noconditionaltask=true; +set 
hive.auto.convert.join.noconditionaltask.size=10000; +set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ; + +CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; +CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; + +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08'); + +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); + +set hive.enforce.bucketing=true; +set hive.enforce.sorting = true; +set hive.optimize.bucketingsorting=false; +insert overwrite table tab_part partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin_part; + +CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +insert overwrite table tab partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin; + +set hive.convert.join.bucket.mapjoin.tez = false; +set hive.auto.convert.sortmerge.join = true; + +explain +select a.key, a.value, b.value +from tab a join tab_part b on a.key = b.key; diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java index c96fc2d..b689afb 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java @@ -51,7 +51,7 @@ protected MyField() { super(); } - + public MyField(int fieldID, String fieldName, ObjectInspector fieldObjectInspector) { this.fieldID = fieldID; @@ -65,18 +65,22 @@ public MyField(int fieldID, String fieldName, this.fieldComment = fieldComment; } + @Override public int getFieldID() { return fieldID; } + @Override public String getFieldName() { return fieldName; } + @Override public ObjectInspector getFieldObjectInspector() { return fieldObjectInspector; } + @Override public String getFieldComment() { return fieldComment; } @@ -133,10 +137,12 @@ protected void init(List fields) { } } + @Override public String getTypeName() { return ObjectInspectorUtils.getStandardStructTypeName(this); } + @Override public final Category getCategory() { return Category.STRUCT; }
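Editor's note: a hedged sketch, not part of the patch, of how the MergeJoinWork introduced above might be wired together during planning. bigTableMapWork, smallTableWork and mergeJoinOp stand in for objects the Tez compiler would already have built, and the real planner may spread the wiring across several addMergedWork() calls; the side effects listed in the comments follow from addMergedWork(), MapWork.setVertexType() and MapWork.setDoSplitsGrouping() as implemented in this patch.

static MergeJoinWork wireMergeJoin(MapWork bigTableMapWork, BaseWork smallTableWork,
    CommonMergeJoinOperator mergeJoinOp) {
  MergeJoinWork mergeJoinWork = new MergeJoinWork();
  // Registers the operator, adopts the big-table MapWork as the main work
  // (including its vertex name) and attaches the other sorted input as a
  // connected work.
  mergeJoinWork.addMergedWork(bigTableMapWork, smallTableWork, mergeJoinOp);

  // After this call, per the implementation above:
  //   bigTableMapWork.getVertexType()       == VertexType.MULTI_INPUT_UNINITIALIZED_EDGES
  //   bigTableMapWork.getDoSplitsGrouping() == false  (Tez split grouping disabled)
  //   mergeJoinWork.getMainWork()           == bigTableMapWork
  //   mergeJoinWork.getBaseWorkList()       contains smallTableWork
  return mergeJoinWork;
}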