diff --git itests/qtest/pom.xml itests/qtest/pom.xml
index cfd2e5e..fd3c0d9 100644
--- itests/qtest/pom.xml
+++ itests/qtest/pom.xml
@@ -39,7 +39,7 @@
 stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q
 cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q
 tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q
-join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q
+join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,union9.q
 add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 3972a9c..0ead289 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -776,6 +776,14 @@ public Path read(Kryo kryo, Input input, Class type) {
     }
   }
 
+  public static Set<Operator<?>> cloneOperatorTree(Configuration conf, Set<Operator<?>> roots) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    serializePlan(roots, baos, conf, true);
+    Set<Operator<?>> result = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+        roots.getClass(), conf, true);
+    return result;
+  }
+
   private static void serializePlan(Object plan, OutputStream out, Configuration conf, boolean cloningPlan) {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 77c0c46..c74b4f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -82,10 +82,12 @@
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -94,6 +96,7 @@
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@@ -185,9 +188,29 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
   }
 
   /**
+   * Given a Vertex group and a vertex createEdge will create an
+   * Edge between them.
+   *
+   * @param group The parent VertexGroup
+   * @param wConf The job conf of the child vertex
+   * @param w The child vertex
+   * @param edgeType the type of connection between the two
+   * endpoints.
+   */
+  public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+      Vertex w, EdgeType edgeType)
+      throws IOException {
+
+    LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
+    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+
+    return new GroupInputEdge(group, w, createEdgeProperty(edgeType),
+        new InputDescriptor(ConcatenatedMergedKeyValuesInput.class.getName()));
+  }
+
+  /**
    * Given two vertices and their respective configuration objects createEdge
-   * will create an Edge object that connects the two. Currently the edge will
-   * always be a stable bi-partite edge.
+   * will create an Edge object that connects the two.
    *
    * @param vConf JobConf of the first vertex
    * @param v The first vertex (source)
@@ -206,6 +229,13 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
     v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
     w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
 
+    return new Edge(v, w, createEdgeProperty(edgeType));
+  }
+
+  /*
+   * Helper function to create an edge property from an edge type.
+   */
+  private EdgeProperty createEdgeProperty(EdgeType edgeType) {
     DataMovementType dataMovementType;
     Class logicalInputClass;
     Class logicalOutputClass;
@@ -231,7 +261,8 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
         SchedulingType.SEQUENTIAL,
         new OutputDescriptor(logicalOutputClass.getName()),
         new InputDescriptor(logicalInputClass.getName()));
-    return new Edge(v, w, edgeProperty);
+
+    return edgeProperty;
   }
 
   /*
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 7c2c2a6..d89f2c7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -55,7 +55,6 @@
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 
 /**
  * Process input from tez LogicalInput and write output - for a map plan
@@ -184,15 +183,19 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> in
 
   @Override
   void run() throws IOException{
-    List<ShuffledMergedInput> shuffleInputs = getShuffleInputs(inputs);
+    List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
     KeyValuesReader kvsReader;
-    if(shuffleInputs.size() == 1){
-      //no merging of inputs required
-      kvsReader = shuffleInputs.get(0).getReader();
-    }else {
-      //get a sort merged input
-      kvsReader = new InputMerger(shuffleInputs);
+    try {
+      if(shuffleInputs.size() == 1){
+        //no merging of inputs required
+        kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader();
+      }else {
+        //get a sort merged input
+        kvsReader = new InputMerger(shuffleInputs);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
     }
 
     while(kvsReader.next()){
@@ -211,12 +214,12 @@ void run() throws IOException{
    * @param inputs
    * @return
    */
-  private List<ShuffledMergedInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
+  private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
     //the reduce plan inputs have tags, add all inputs that have tags
     Map<Integer, String> tag2input = redWork.getTagToInput();
-    ArrayList<ShuffledMergedInput> shuffleInputs = new ArrayList<ShuffledMergedInput>();
+    ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
     for(String inpStr : tag2input.values()){
-      shuffleInputs.add((ShuffledMergedInput)inputs.get(inpStr));
+      shuffleInputs.add((LogicalInput)inputs.get(inpStr));
     }
     return shuffleInputs;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 792e9a4..9b3fa58 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -52,9 +54,11 @@
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 
@@ -206,24 +210,52 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
 
       // translate work to vertex
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
-      JobConf wxConf = utils.initializeVertexConf(conf, w);
-      Vertex wx = utils.createVertex(wxConf, w, tezDir,
-          appJarLr, additionalLr, fs, ctx, !isFinal);
-      dag.addVertex(wx);
-      utils.addCredentials(w, dag);
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
-      workToVertex.put(w, wx);
-      workToConf.put(w, wxConf);
-
-      // add all dependencies (i.e.: edges) to the graph
-      for (BaseWork v: work.getChildren(w)) {
-        assert workToVertex.containsKey(v);
-        Edge e = null;
-
-        EdgeType edgeType = work.getEdgeProperty(w, v);
-
-        e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
-        dag.addEdge(e);
+
+      if (w instanceof UnionWork) {
+        // Special case for unions. These items translate to VertexGroups
+
+        List<Vertex> unionVertices = new LinkedList<Vertex>();
+        List<BaseWork> children = new LinkedList<BaseWork>();
+
+        for (BaseWork v: work.getChildren(w)) {
+          EdgeType type = work.getEdgeProperty(w, v);
+          if (type == EdgeType.CONTAINS) {
+            unionVertices.add(workToVertex.get(v));
+          } else {
+            children.add(v);
+          }
+        }
+
+        Vertex[] vertexArray = new Vertex[unionVertices.size()];
+        vertexArray = unionVertices.toArray(vertexArray);
+        VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
+
+        for (BaseWork v: children) {
+          GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
+              workToVertex.get(v), work.getEdgeProperty(w, v));
+          dag.addEdge(e);
+        }
+      } else {
+        // Regular vertices
+        JobConf wxConf = utils.initializeVertexConf(conf, w);
+        Vertex wx = utils.createVertex(wxConf, w, tezDir, appJarLr,
+            additionalLr, fs, ctx, !isFinal);
+        dag.addVertex(wx);
+        utils.addCredentials(w, dag);
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
+        workToVertex.put(w, wx);
+        workToConf.put(w, wxConf);
+
+        // add all dependencies (i.e.: edges) to the graph
+        for (BaseWork v: work.getChildren(w)) {
+          assert workToVertex.containsKey(v);
+          Edge e = null;
+
+          EdgeType edgeType = work.getEdgeProperty(w, v);
+
+          e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
+          dag.addEdge(e);
+        }
       }
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
index e5746c4..843a530 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
@@ -26,12 +26,12 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor;
 import org.apache.hadoop.io.BinaryComparable;
+import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 
 /**
  * A KeyValuesReader implementation that returns a sorted stream of key-values
- * by doing a sorted merge of the key-value in ShuffledMergedInputs.
+ * by doing a sorted merge of the key-value in LogicalInputs.
  * Tags are in the last byte of the key, so no special handling for tags is required.
  * Uses a priority queue to pick the KeyValuesReader of the input that is next in
  * sort order.
@@ -42,12 +42,12 @@
   private PriorityQueue<KeyValuesReader> pQueue = null;
   private KeyValuesReader nextKVReader = null;
 
-  public InputMerger(List<ShuffledMergedInput> shuffleInputs) throws IOException {
-    //get KeyValuesReaders from the ShuffledMergedInput and add them to priority queue
+  public InputMerger(List<LogicalInput> shuffleInputs) throws Exception {
+    //get KeyValuesReaders from the LogicalInput and add them to priority queue
     int initialCapacity = shuffleInputs.size();
     pQueue = new PriorityQueue<KeyValuesReader>(initialCapacity, new KVReaderComparator());
-    for(ShuffledMergedInput input : shuffleInputs){
-      addToQueue(input.getReader());
+    for(LogicalInput input : shuffleInputs){
+      addToQueue((KeyValuesReader)input.getReader());
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index f4b6016..af95aa5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -20,6 +20,7 @@
 import java.io.Serializable;
 
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,6 +29,7 @@
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -105,6 +107,10 @@
   // used to group dependent tasks for multi table inserts
   public final DependencyCollectionTask dependencyTask;
 
+  // used to hook up unions
+  public final Map<Operator<?>, BaseWork> unionWorkMap;
+  public final List<UnionOperator> currentUnionOperators;
+
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -126,6 +132,8 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext,
     this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
     this.dependencyTask = (DependencyCollectionTask)
         TaskFactory.get(new DependencyCollectionWork(), conf);
+    this.unionWorkMap = new HashMap<Operator<?>, BaseWork>();
+    this.currentUnionOperators = new LinkedList<UnionOperator>();
 
     rootTasks.add(currentTask);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 8363bbf..daa370d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -19,15 +19,26 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -37,6 +48,7 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 
 /**
@@ -99,6 +111,42 @@ public Object process(Node nd, Stack<Node> stack,
       context.rootToWorkMap.put(root, work);
     }
 
+    if (!context.currentUnionOperators.isEmpty()) {
+      // if there are union all operators we need to add the work to the set
+      // of union operators.
+
+      UnionWork unionWork;
+      if (context.unionWorkMap.containsKey(operator)) {
+        // we've seen this terminal before and have created a union work object.
+        // just need to add this work to it. There will be no children of this one
+        // since we've passed this operator before.
+        assert operator.getChildOperators().isEmpty();
+        unionWork = (UnionWork) context.unionWorkMap.get(operator);
+
+        // clear the entry in the map that subsequent tasks will use to match operators with
+        // the next step in the graph. That step has to be the union now, which we can do
+        // right away.
+        context.leafOperatorToFollowingWork.remove(operator);
+
+      } else {
+        // first time through. we need to create a union work object and add this
+        // work to it. Subsequent work should reference the union and not the actual
+        // work.
+        unionWork = new UnionWork("Union "+ (++sequenceNumber));
+        context.unionWorkMap.put(operator, unionWork);
+        tezWork.add(unionWork);
+      }
+
+      // we don't need the union operator in this plan anymore
+      removeUnionOperators(context.conf, work);
+
+      // finally hook everything up
+      tezWork.connect(unionWork, work, EdgeType.CONTAINS);
+      unionWork.addUnionOperators(context.currentUnionOperators);
+      context.currentUnionOperators.clear();
+      work = unionWork;
+    }
+
     // We're scanning a tree from roots to leaf (this is not technically
     // correct, demux and mux operators might form a diamond shape, but
     // we will only scan one path and ignore the others, because the
@@ -256,4 +304,63 @@ protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
     GenMapRedUtils.setMapWork(mapWork, context.parseContext, context.inputs,
         null, root, alias, context.conf, false);
   }
+
+  protected void removeUnionOperators(Configuration conf, BaseWork work)
+    throws SemanticException {
+
+    Set<Operator<?>> roots = work.getAllRootOperators();
+
+    // need to clone the plan.
+    Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+
+    Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+
+    Iterator<Operator<?>> it = newRoots.iterator();
+    for (Operator<?> orig: roots) {
+      replacementMap.put(orig,it.next());
+    }
+
+    // now we remove all the unions. we throw away any branch that's not reachable from
+    // the current set of roots. The reason is that those branches will be handled in
+    // different tasks.
+    Deque<Operator<?>> operators = new LinkedList<Operator<?>>();
+    operators.addAll(newRoots);
+
+    Set<Operator<?>> seen = new HashSet<Operator<?>>();
+
+    while(!operators.isEmpty()) {
+      Operator<?> current = operators.pop();
+      seen.add(current);
+
+      if (current instanceof UnionOperator) {
+        Operator<?> parent = null;
+        int count = 0;
+
+        for (Operator<?> op: current.getParentOperators()) {
+          if (seen.contains(op)) {
+            ++count;
+            parent = op;
+          }
+        }
+
+        // we should have been able to reach the union from only one side.
+        assert count <= 1;
+
+        if (parent == null) {
+          // root operator is union (can happen in reducers)
+          replacementMap.put(current, current.getChildOperators().get(0));
+        } else {
+          parent.removeChildAndAdoptItsChildren(current);
+        }
+      }
+
+      if (current instanceof FileSinkOperator
+          || current instanceof ReduceSinkOperator) {
+        current.setChildOperators(null);
+      } else {
+        operators.addAll(current.getChildOperators());
+      }
+    }
+    work.replaceRoots(replacementMap);
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index dff743f..4a6fdf7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -131,14 +131,18 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
         FileSinkOperator.getOperatorName() + "%"),
         new CompositeProcessor(new FileSinkProcessor(), genTezWork));
 
-    opRules.put(new RuleRegExp("Bail on Union",
+    opRules.put(new RuleRegExp("Handle union",
         UnionOperator.getOperatorName() + "%"),
         new NodeProcessor() {
       @Override
       public Object process(Node n, Stack<Node> s,
          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-        throw new SemanticException("Unions not yet supported on Tez."
-            +" Please use MR for this query");
+        GenTezProcContext context = (GenTezProcContext) procCtx;
+        UnionOperator union = (UnionOperator) n;
+
+        // simply need to remember that we've seen a union.
+        context.currentUnionOperators.add(union);
+        return null;
       }
     });
 
@@ -155,14 +159,15 @@ public Object process(Node n, Stack<Node> s,
   protected void setInputFormat(Task<? extends Serializable> task) {
     if (task instanceof TezTask) {
       TezWork work = ((TezTask)task).getWork();
-      Set<BaseWork> roots = work.getRoots();
-      for (BaseWork w: roots) {
-        assert w instanceof MapWork;
-        MapWork mapWork = (MapWork)w;
-        HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
-        if (!opMap.isEmpty()) {
-          for (Operator<? extends OperatorDesc> op : opMap.values()) {
-            setInputFormat(mapWork, op);
+      List<BaseWork> all = work.getAllWork();
+      for (BaseWork w: all) {
+        if (w instanceof MapWork) {
+          MapWork mapWork = (MapWork) w;
+          HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
+          if (!opMap.isEmpty()) {
+            for (Operator<? extends OperatorDesc> op : opMap.values()) {
+              setInputFormat(mapWork, op);
+            }
           }
         }
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index eb85446..38c4c11 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -22,6 +22,7 @@
 import java.util.LinkedList;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
@@ -82,7 +83,9 @@ public void addDummyOp(HashTableDummyOperator dummyOp) {
     dummyOps.add(dummyOp);
   }
 
-  protected abstract Set<Operator<?>> getAllRootOperators();
+  public abstract void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap);
+
+  public abstract Set<Operator<?>> getAllRootOperators();
 
   public Set<Operator<?>> getAllOperators() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 19b553f..e1cc3f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -305,6 +305,17 @@ public String getVectorModeOn() {
   }
 
   @Override
+  public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+    LinkedHashMap<String, Operator<?>> newAliasToWork = new LinkedHashMap<String, Operator<?>>();
+
+    for (Map.Entry<String, Operator<?>> entry: aliasToWork.entrySet()) {
+      newAliasToWork.put(entry.getKey(), replacementMap.get(entry.getValue()));
+    }
+
+    setAliasToWork(newAliasToWork);
+  }
+
+  @Override
   @Explain(displayName = "Map Operator Tree")
   public Set<Operator<?>> getAllRootOperators() {
     Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index afb3648..a68374e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -129,7 +129,13 @@ public void setTagToInput(final Map<Integer, String> tagToInput) {
   }
 
   @Override
-  protected Set<Operator<?>> getAllRootOperators() {
+  public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+    assert replacementMap.size() == 1;
+    setReducer(replacementMap.get(getReducer()));
+  }
+
+  @Override
+  public Set<Operator<?>> getAllRootOperators() {
     Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();
     opSet.add(getReducer());
     return opSet;
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index 9112a77..3b31378 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -45,7 +45,8 @@
 
   public enum EdgeType {
     SIMPLE_EDGE,
-    BROADCAST_EDGE
+    BROADCAST_EDGE,
+    CONTAINS
   }
 
   private static transient final Log LOG = LogFactory.getLog(TezWork.class);
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
new file mode 100644
index 0000000..edb9791
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+
+/**
+ * Simple wrapper for union all cases. All contributing work for a union all
+ * is collected here. Downstream work will connect to the union not the individual
+ * work.
+ */
+public class UnionWork extends BaseWork {
+
+  private final Set<UnionOperator> unionOperators = new HashSet<UnionOperator>();
+
+  public UnionWork() {
+    super();
+  }
+
+  public UnionWork(String name) {
+    super(name);
+  }
+
+  @Explain(displayName = "Vertex")
+  @Override
+  public String getName() {
+    return super.getName();
+  }
+
+  @Override
+  public Set<Operator<?>> getAllRootOperators() {
+    return null;
+  }
+
+  public void addUnionOperators(Collection<UnionOperator> unions) {
+    unionOperators.addAll(unions);
+  }
+
+  public Set<UnionOperator> getUnionOperators() {
+    return unionOperators;
+  }
+}
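
Reviewer note (not part of the patch): the condensed Java sketch below restates how the pieces above fit together when TezTask.build() assembles the DAG. A UnionWork node whose contributing works are connected with EdgeType.CONTAINS becomes a Tez VertexGroup, and each downstream work is attached through a GroupInputEdge whose input is ConcatenatedMergedKeyValuesInput (see the new DagUtils.createEdge overload). The dag, utils, work, workToVertex and workToConf references are the fields and locals of TezTask.build() shown in the diff; nothing new is introduced here.

// Condensed, illustrative restatement of the UnionWork branch in TezTask.build().
if (w instanceof UnionWork) {
  List<Vertex> unionVertices = new LinkedList<Vertex>();  // vertices the union "contains"
  List<BaseWork> children = new LinkedList<BaseWork>();   // works that consume the union

  for (BaseWork v : work.getChildren(w)) {
    if (work.getEdgeProperty(w, v) == EdgeType.CONTAINS) {
      unionVertices.add(workToVertex.get(v));             // contributing branch, already a vertex
    } else {
      children.add(v);                                    // downstream consumer of the union
    }
  }

  // The union itself is not a vertex: it becomes a named VertexGroup over its branches.
  VertexGroup group = dag.createVertexGroup(w.getName(),
      unionVertices.toArray(new Vertex[unionVertices.size()]));

  // Each consumer reads the group through a GroupInputEdge that concatenates the
  // members' sorted outputs (ConcatenatedMergedKeyValuesInput inside DagUtils.createEdge).
  for (BaseWork v : children) {
    GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
        workToVertex.get(v), work.getEdgeProperty(w, v));
    dag.addEdge(e);
  }
}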