diff --git itests/qtest/pom.xml itests/qtest/pom.xml
index 642202f..1487992 100644
--- itests/qtest/pom.xml
+++ itests/qtest/pom.xml
@@ -39,7 +39,7 @@
     <minimr.query.files>stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q</minimr.query.files>
     <minimr.query.negative.files>cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q</minimr.query.negative.files>
     <minitez.query.files>tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q</minitez.query.files>
-    <minitez.query.files.shared>join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q</minitez.query.files.shared>
+    <minitez.query.files.shared>join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q</minitez.query.files.shared>
     <beeline.positive.exclude>add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q</beeline.positive.exclude>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index cc840be..0418d18 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -777,6 +777,14 @@ public Path read(Kryo kryo, Input input, Class<Path> type) {
     }
   }
 
+  public static Set<Operator<?>> cloneOperatorTree(Configuration conf, Set<Operator<?>> roots) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    serializePlan(roots, baos, conf, true);
+    Set<Operator<?>> result = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+        roots.getClass(), conf, true);
+    return result;
+  }
+
   private static void serializePlan(Object plan, OutputStream out, Configuration conf, boolean cloningPlan) {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
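cloneOperatorTree gives callers a deep copy of an operator DAG by round-tripping it through the Kryo plan serializer. A condensed sketch of the intended calling pattern, mirroring removeUnionOperators in GenTezUtils further down (the work and conf variables are assumed to be in scope):

    // Deep-copy the roots; the clones are fully detached from the originals.
    Set<Operator<?>> roots = work.getAllRootOperators();
    Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);

    // The caller relies on both sets iterating in the same order,
    // so each original can be zipped with its clone.
    Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
    Iterator<Operator<?>> it = newRoots.iterator();
    for (Operator<?> orig : roots) {
      replacementMap.put(orig, it.next());
    }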
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 8d058d8..c074545 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.exec.tez.tools.TezMergedLogicalInput;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -82,6 +83,7 @@
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -90,6 +92,7 @@
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -98,6 +101,8 @@
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@@ -189,9 +194,56 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
   }
 
   /**
+   * Given a vertex group and a vertex, createEdge will create an
+   * Edge between them.
+   *
+   * @param group The parent VertexGroup
+   * @param wConf The job conf of the child vertex
+   * @param w The child vertex
+   * @param edgeType the type of connection between the two
+   *          endpoints.
+   */
+  public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+      Vertex w, EdgeType edgeType)
+    throws IOException {
+
+    Class mergeInputClass;
+
+    LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
+    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+
+    switch (edgeType) {
+    case BROADCAST_EDGE:
+      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+      break;
+
+    case SIMPLE_EDGE:
+    default:
+      mergeInputClass = TezMergedLogicalInput.class;
+      break;
+    }
+
+    return new GroupInputEdge(group, w, createEdgeProperty(edgeType),
+        new InputDescriptor(mergeInputClass.getName()));
+  }
+
+  /**
+   * Given two vertices a and b, update their configurations to be used in an Edge a-b.
+   */
+  public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
+    throws IOException {
+
+    // Tez needs the output/input pairs of subsequent vertices set up correctly
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+
+    // update payloads (configuration for the vertices might have changed)
+    v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
+    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+  }
+
+  /**
    * Given two vertices and their respective configuration objects createEdge
-   * will create an Edge object that connects the two. Currently the edge will
-   * always be a stable bi-partite edge.
+   * will create an Edge object that connects the two.
    *
    * @param vConf JobConf of the first vertex
    * @param v The first vertex (source)
@@ -203,13 +255,15 @@ public Edge createEdge(JobConf vConf, Vertex v,
       JobConf wConf, Vertex w, EdgeType edgeType)
     throws IOException {
 
-    // Tez needs to setup output subsequent input pairs correctly
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+    updateConfigurationForEdge(vConf, v, wConf, w);
 
-    // update payloads (configuration for the vertices might have changed)
-    v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
-    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+    return new Edge(v, w, createEdgeProperty(edgeType));
+  }
 
+  /*
+   * Helper function to create an edge property from an edge type.
+   */
+  private EdgeProperty createEdgeProperty(EdgeType edgeType) {
     DataMovementType dataMovementType;
     Class logicalInputClass;
     Class logicalOutputClass;
@@ -235,7 +289,8 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
         SchedulingType.SEQUENTIAL,
         new OutputDescriptor(logicalOutputClass.getName()),
         new InputDescriptor(logicalInputClass.getName()));
-    return new Edge(v, w, edgeProperty);
+
+    return edgeProperty;
   }
 
   /*
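Together these helpers split edge creation into configuration patch-up (updateConfigurationForEdge) and edge-property construction (createEdgeProperty), with the GroupInputEdge variant layered on top for unions. A condensed, illustrative sketch of the caller side, mirroring the TezTask changes below (memberVertices, unionWorkItems, child and the workToConf/workToVertex maps are illustrative names):

    // One VertexGroup per union; its members are the vertices of the branches.
    VertexGroup group = dag.createVertexGroup(unionWork.getName(), memberVertices);

    // Patch each member's conf pairwise against the consuming vertex.
    for (BaseWork part : unionWorkItems) {
      utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
          workToConf.get(child), workToVertex.get(child));
    }

    // A single grouped edge then connects the whole union to the child.
    dag.addEdge(utils.createEdge(group, workToConf.get(child),
        workToVertex.get(child), edgeType));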
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 7c2c2a6..d89f2c7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -55,7 +55,6 @@
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 
 /**
  * Process input from tez LogicalInput and write output - for a map plan
@@ -184,15 +183,19 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> in
 
   @Override
   void run() throws IOException{
-    List<ShuffledMergedInput> shuffleInputs = getShuffleInputs(inputs);
+    List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
     KeyValuesReader kvsReader;
-    if(shuffleInputs.size() == 1){
-      //no merging of inputs required
-      kvsReader = shuffleInputs.get(0).getReader();
-    }else {
-      //get a sort merged input
-      kvsReader = new InputMerger(shuffleInputs);
+    try {
+      if(shuffleInputs.size() == 1){
+        //no merging of inputs required
+        kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader();
+      }else {
+        //get a sort merged input
+        kvsReader = new InputMerger(shuffleInputs);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
     }
 
     while(kvsReader.next()){
@@ -211,12 +214,12 @@ void run() throws IOException{
    * @param inputs
    * @return
    */
-  private List<ShuffledMergedInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
+  private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
     //the reduce plan inputs have tags, add all inputs that have tags
     Map<Integer, String> tag2input = redWork.getTagToInput();
-    ArrayList<ShuffledMergedInput> shuffleInputs = new ArrayList<ShuffledMergedInput>();
+    ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
     for(String inpStr : tag2input.values()){
-      shuffleInputs.add((ShuffledMergedInput)inputs.get(inpStr));
+      shuffleInputs.add((LogicalInput)inputs.get(inpStr));
     }
     return shuffleInputs;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 3e0bc34..d4b459c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -54,9 +56,11 @@
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 
@@ -97,9 +101,6 @@ public int execute(DriverContext driverContext) {
     DAGClient client = null;
     TezSessionState session = null;
 
-    // Tez requires us to use RPC for the query plan
-    HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
-
     try {
       // Get or create Context object. If we create it we have to clean
       // it later as well.
@@ -216,24 +217,68 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
 
       // translate work to vertex
      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
-      JobConf wxConf = utils.initializeVertexConf(conf, w);
-      Vertex wx = utils.createVertex(wxConf, w, tezDir,
-          appJarLr, additionalLr, fs, ctx, !isFinal);
-      dag.addVertex(wx);
-      utils.addCredentials(w, dag);
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
-      workToVertex.put(w, wx);
-      workToConf.put(w, wxConf);
-
-      // add all dependencies (i.e.: edges) to the graph
-      for (BaseWork v: work.getChildren(w)) {
-        assert workToVertex.containsKey(v);
-        Edge e = null;
-
-        EdgeType edgeType = work.getEdgeProperty(w, v);
-
-        e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
-        dag.addEdge(e);
+
+      if (w instanceof UnionWork) {
+        // Special case for unions. These items translate to VertexGroups
+
+        List<BaseWork> unionWorkItems = new LinkedList<BaseWork>();
+        List<BaseWork> children = new LinkedList<BaseWork>();
+
+        // split the children into vertices that make up the union and vertices that are
+        // proper children of the union
+        for (BaseWork v: work.getChildren(w)) {
+          EdgeType type = work.getEdgeProperty(w, v);
+          if (type == EdgeType.CONTAINS) {
+            unionWorkItems.add(v);
+          } else {
+            children.add(v);
+          }
+        }
+
+        // create VertexGroup
+        Vertex[] vertexArray = new Vertex[unionWorkItems.size()];
+
+        int i = 0;
+        for (BaseWork v: unionWorkItems) {
+          vertexArray[i++] = workToVertex.get(v);
+        }
+        VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
+
+        // now hook up the children
+        for (BaseWork v: children) {
+          // need to pairwise patch up the configuration of the vertices
+          for (BaseWork part: unionWorkItems) {
+            utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
+                workToConf.get(v), workToVertex.get(v));
+          }
+
+          // finally we can create the grouped edge
+          GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
+              workToVertex.get(v), work.getEdgeProperty(w, v));
+
+          dag.addEdge(e);
+        }
+      } else {
+        // Regular vertices
+        JobConf wxConf = utils.initializeVertexConf(conf, w);
+        Vertex wx = utils.createVertex(wxConf, w, tezDir, appJarLr,
+            additionalLr, fs, ctx, !isFinal);
+        dag.addVertex(wx);
+        utils.addCredentials(w, dag);
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
+        workToVertex.put(w, wx);
+        workToConf.put(w, wxConf);
+
+        // add all dependencies (i.e.: edges) to the graph
+        for (BaseWork v: work.getChildren(w)) {
+          assert workToVertex.containsKey(v);
+          Edge e = null;
+
+          EdgeType edgeType = work.getEdgeProperty(w, v);
+
+          e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
+          dag.addEdge(e);
+        }
       }
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
index e5746c4..726e122 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
@@ -26,12 +26,13 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor;
 import org.apache.hadoop.io.BinaryComparable;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 
 /**
  * A KeyValuesReader implementation that returns a sorted stream of key-values
- * by doing a sorted merge of the key-value in ShuffledMergedInputs.
+ * by doing a sorted merge of the key-values in LogicalInputs.
  * Tags are in the last byte of the key, so no special handling for tags is required.
  * Uses a priority queue to pick the KeyValuesReader of the input that is next in
  * sort order.
@@ -42,12 +43,12 @@
   private PriorityQueue<KeyValuesReader> pQueue = null;
   private KeyValuesReader nextKVReader = null;
 
-  public InputMerger(List<ShuffledMergedInput> shuffleInputs) throws IOException {
-    //get KeyValuesReaders from the ShuffledMergedInput and add them to priority queue
+  public InputMerger(List<? extends Input> shuffleInputs) throws Exception {
+    //get KeyValuesReaders from the LogicalInput and add them to priority queue
     int initialCapacity = shuffleInputs.size();
     pQueue = new PriorityQueue<KeyValuesReader>(initialCapacity, new KVReaderComparator());
-    for(ShuffledMergedInput input : shuffleInputs){
-      addToQueue(input.getReader());
+    for(Input input : shuffleInputs){
+      addToQueue((KeyValuesReader)input.getReader());
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
new file mode 100644
index 0000000..413eb7b
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez.tools;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.Reader;
+
+/**
+ * TezMergedLogicalInput is an adapter to make union input look like
+ * a single input in tez.
+ */
+public class TezMergedLogicalInput extends MergedLogicalInput {
+
+  @Override
+  public Reader getReader() throws Exception {
+    return new InputMerger(getInputs());
+  }
+}
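On the consumer side, the merged union input produced by a GroupInputEdge is indistinguishable from a single shuffle input: ReduceRecordProcessor (above) simply casts whatever LogicalInput it is handed to a KeyValuesReader and drains it. A hedged sketch of that read loop, using the standard Tez KeyValuesReader contract (the input variable is illustrative):

    // 'input' may be a plain shuffled input or a TezMergedLogicalInput.
    KeyValuesReader reader = (KeyValuesReader) input.getReader();
    while (reader.next()) {
      Object key = reader.getCurrentKey();
      for (Object value : reader.getCurrentValues()) {
        // feed the reduce-side operator pipeline with (key, value)
      }
    }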
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
index 9592992..30c39db 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
@@ -51,28 +51,9 @@ public Object process(Node nd, Stack<Node> stack,
     GenTezProcContext context = (GenTezProcContext) procCtx;
     FileSinkOperator fileSink = (FileSinkOperator) nd;
 
-    ParseContext parseContext = context.parseContext;
-
-
-    boolean isInsertTable = // is INSERT OVERWRITE TABLE
-        GenMapRedUtils.isInsertInto(parseContext, fileSink);
-    HiveConf hconf = parseContext.getConf();
-
-    boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
-        hconf, fileSink, context.currentTask, isInsertTable);
-
-    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
-        chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
-
-    if (chDir) {
-      // Merge the files in the destination table/partitions by creating Map-only merge job
-      // If underlying data is RCFile a RCFileBlockMerge task would be created.
-      LOG.info("using CombineHiveInputformat for the merge job");
-      GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
-          context.dependencyTask, context.moveTask,
-          hconf, context.currentTask);
-    }
-
+
+    // just remember it for later processing
+    context.fileSinkSet.add(fileSink);
     return true;
   }
 }
\ No newline at end of file
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index f4b6016..c9decb5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -20,14 +20,19 @@
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -39,6 +44,7 @@
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 
 /**
@@ -105,6 +111,15 @@
   // used to group dependent tasks for multi table inserts
   public final DependencyCollectionTask dependencyTask;
 
+  // used to hook up unions
+  public final Map<Operator<?>, BaseWork> unionWorkMap;
+  public final List<UnionOperator> currentUnionOperators;
+  public final Set<BaseWork> workWithUnionOperators;
+
+  // we link filesinks that will write to the same final location
+  public final Map<Path, List<FileSinkDesc>> linkedFileSinks;
+  public final Set<FileSinkOperator> fileSinkSet;
+
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -126,6 +141,11 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext,
     this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
     this.dependencyTask = (DependencyCollectionTask)
         TaskFactory.get(new DependencyCollectionWork(), conf);
+    this.unionWorkMap = new HashMap<Operator<?>, BaseWork>();
+    this.currentUnionOperators = new LinkedList<UnionOperator>();
+    this.workWithUnionOperators = new HashSet<BaseWork>();
+    this.linkedFileSinks = new HashMap<Path, List<FileSinkDesc>>();
+    this.fileSinkSet = new HashSet<FileSinkOperator>();
 
     rootTasks.add(currentTask);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 042cb39..d9b5977 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -18,17 +18,35 @@
 package org.apache.hadoop.hive.ql.parse;
 
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 
 /**
@@ -59,6 +77,13 @@ public void resetSequenceNumber() {
     sequenceNumber = 0;
   }
 
+  public UnionWork createUnionWork(GenTezProcContext context, Operator<?> operator, TezWork tezWork) {
+    UnionWork unionWork = new UnionWork("Union "+ (++sequenceNumber));
+    context.unionWorkMap.put(operator, unionWork);
+    tezWork.add(unionWork);
+    return unionWork;
+  }
+
   public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root, TezWork tezWork) {
     assert !root.getParentOperators().isEmpty();
     ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
@@ -128,4 +153,111 @@ protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
     GenMapRedUtils.setMapWork(mapWork, context.parseContext, context.inputs, partitions,
         root, alias, context.conf, false);
   }
+
+  // removes any union operator and clones the plan
+  public void removeUnionOperators(Configuration conf, GenTezProcContext context,
+      BaseWork work)
+    throws SemanticException {
+
+    Set<Operator<?>> roots = work.getAllRootOperators();
+
+    // need to clone the plan.
+    Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+
+    Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+
+    Iterator<Operator<?>> it = newRoots.iterator();
+    for (Operator<?> orig: roots) {
+      replacementMap.put(orig, it.next());
+    }
+
+    // now we remove all the unions. we throw away any branch that's not reachable from
+    // the current set of roots. The reason is that those branches will be handled in
+    // different tasks.
+    Deque<Operator<?>> operators = new LinkedList<Operator<?>>();
+    operators.addAll(newRoots);
+
+    Set<Operator<?>> seen = new HashSet<Operator<?>>();
+
+    while(!operators.isEmpty()) {
+      Operator<?> current = operators.pop();
+      seen.add(current);
+
+      if (current instanceof FileSinkOperator) {
+        FileSinkOperator fileSink = (FileSinkOperator)current;
+
+        // remember it for additional processing later
+        context.fileSinkSet.add(fileSink);
+
+        FileSinkDesc desc = fileSink.getConf();
+        Path path = desc.getDirName();
+        List<FileSinkDesc> linked;
+
+        if (!context.linkedFileSinks.containsKey(path)) {
+          linked = new ArrayList<FileSinkDesc>();
+          context.linkedFileSinks.put(path, linked);
+        }
+        linked = context.linkedFileSinks.get(path);
+        linked.add(desc);
+
+        desc.setDirName(new Path(path, ""+linked.size()));
+        desc.setLinkedFileSinkDesc(linked);
+      }
+
+      if (current instanceof UnionOperator) {
+        Operator<?> parent = null;
+        int count = 0;
+
+        for (Operator<?> op: current.getParentOperators()) {
+          if (seen.contains(op)) {
+            ++count;
+            parent = op;
+          }
+        }
+
+        // we should have been able to reach the union from only one side.
+        assert count <= 1;
+
+        if (parent == null) {
+          // root operator is union (can happen in reducers)
+          replacementMap.put(current, current.getChildOperators().get(0));
+        } else {
+          parent.removeChildAndAdoptItsChildren(current);
+        }
+      }
+
+      if (current instanceof FileSinkOperator
+          || current instanceof ReduceSinkOperator) {
+        current.setChildOperators(null);
+      } else {
+        operators.addAll(current.getChildOperators());
+      }
+    }
+    work.replaceRoots(replacementMap);
+  }
+
+  public void processFileSink(GenTezProcContext context, FileSinkOperator fileSink)
+      throws SemanticException {
+
+    ParseContext parseContext = context.parseContext;
+
+    boolean isInsertTable = // is INSERT OVERWRITE TABLE
+        GenMapRedUtils.isInsertInto(parseContext, fileSink);
+    HiveConf hconf = parseContext.getConf();
+
+    boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
+        hconf, fileSink, context.currentTask, isInsertTable);
+
+    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+        chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
+
+    if (chDir) {
+      // Merge the files in the destination table/partitions by creating Map-only merge job
+      // If underlying data is RCFile a RCFileBlockMerge task would be created.
+      LOG.info("using CombineHiveInputformat for the merge job");
+      GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
+          context.dependencyTask, context.moveTask,
+          hconf, context.currentTask);
+    }
+  }
 }
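A note on the linkedFileSinks bookkeeping above: every cloned union branch that targets the same final location is redirected into its own numbered subdirectory, so the branches can write concurrently and the downstream move task picks everything up. This is also why TezCompiler (below) forces recursive input directories and disables the MR-style file merge. Illustratively, with an assumed final path of /tmp/dest and two branches:

    // first cloned branch:  desc.getDirName() rewritten /tmp/dest -> /tmp/dest/1
    // second cloned branch: desc.getDirName() rewritten /tmp/dest -> /tmp/dest/2
    Path path = desc.getDirName();
    desc.setDirName(new Path(path, "" + linked.size()));
    desc.setLinkedFileSinkDesc(linked);  // every sibling sink knows the full set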
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 475c940..a6c30a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -19,15 +19,21 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -37,6 +43,7 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 
 /**
@@ -106,6 +113,41 @@ public Object process(Node nd, Stack<Node> stack,
       context.rootToWorkMap.put(root, work);
     }
 
+    // This is where we cut the tree as described above. We also remember that
+    // we might have to connect parent work with this work later.
+    for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
+      context.leafOperatorToFollowingWork.put(parent, work);
+      LOG.debug("Removing " + parent + " as parent from " + root);
+      root.removeParent(parent);
+    }
+
+    if (!context.currentUnionOperators.isEmpty()) {
+      // if there are union all operators we need to add the work to the set
+      // of union operators.
+
+      UnionWork unionWork;
+      if (context.unionWorkMap.containsKey(operator)) {
+        // we've seen this terminal before and have created a union work object.
+        // just need to add this work to it. There will be no children of this one
+        // since we've passed this operator before.
+        assert operator.getChildOperators().isEmpty();
+        unionWork = (UnionWork) context.unionWorkMap.get(operator);
+
+      } else {
+        // first time through. we need to create a union work object and add this
+        // work to it. Subsequent work should reference the union and not the actual
+        // work.
+        unionWork = utils.createUnionWork(context, operator, tezWork);
+      }
+
+      // finally hook everything up
+      tezWork.connect(unionWork, work, EdgeType.CONTAINS);
+      unionWork.addUnionOperators(context.currentUnionOperators);
+      context.currentUnionOperators.clear();
+      context.workWithUnionOperators.add(work);
+      work = unionWork;
+    }
+
     // We're scanning a tree from roots to leaf (this is not technically
     // correct, demux and mux operators might form a diamond shape, but
     // we will only scan one path and ignore the others, because the
@@ -134,16 +176,10 @@ public Object process(Node nd, Stack<Node> stack,
       // remember the output name of the reduce sink
       rs.getConf().setOutputName(rWork.getName());
 
-      // add dependency between the two work items
-      tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
-    }
-
-    // This is where we cut the tree as described above. We also remember that
-    // we might have to connect parent work with this work later.
-    for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
-      context.leafOperatorToFollowingWork.put(parent, work);
-      LOG.debug("Removing " + parent + " as parent from " + root);
-      root.removeParent(parent);
+      if (!context.unionWorkMap.containsKey(operator)) {
+        // add dependency between the two work items
+        tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
+      }
     }
 
     // No children means we're at the bottom. If there are more operators to scan
@@ -182,7 +218,7 @@ public Object process(Node nd, Stack<Node> stack,
     for (BaseWork parentWork : linkWorkList) {
       tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
 
-      // need to set up output name for reduce sink not that we know the name
+      // need to set up output name for reduce sink now that we know the name
       // of the downstream work
       for (ReduceSinkOperator r: context.linkWorkWithReduceSinkMap.get(parentWork)) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index b7738c5..cfc34eb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -31,6 +31,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -53,6 +54,7 @@
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
@@ -63,6 +65,7 @@
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
 /**
  * TezCompiler translates the operator plan into TezTasks.
@@ -75,6 +78,22 @@ public TezCompiler() {
   }
 
   @Override
+  public void init(HiveConf conf, LogHelper console, Hive db) {
+    super.init(conf, console, db);
+
+    // Tez requires us to use RPC for the query plan
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
+
+    // We require the use of recursive input dirs for union processing
+    conf.setBoolean("mapred.input.dir.recursive", true);
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
+
+    // Don't auto-merge files in tez
+    HiveConf.setBoolVar(conf, ConfVars.HIVEMERGEMAPFILES, false);
+    HiveConf.setBoolVar(conf, ConfVars.HIVEMERGEMAPREDFILES, false);
+  }
+
+  @Override
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
@@ -138,14 +157,18 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
         TableScanOperator.getOperatorName() + "%"),
         new ProcessAnalyzeTable(GenTezUtils.getUtils()));
 
-    opRules.put(new RuleRegExp("Bail on Union",
+    opRules.put(new RuleRegExp("Handle union",
         UnionOperator.getOperatorName() + "%"),
         new NodeProcessor() {
       @Override
       public Object process(Node n, Stack<Node> s,
          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-        throw new SemanticException("Unions not yet supported on Tez."
-            +" Please use MR for this query");
+        GenTezProcContext context = (GenTezProcContext) procCtx;
+        UnionOperator union = (UnionOperator) n;
+
+        // simply need to remember that we've seen a union.
+        context.currentUnionOperators.add(union);
+        return null;
       }
     });
 
@@ -156,20 +179,31 @@ public Object process(Node n, Stack<Node> s,
     topNodes.addAll(pCtx.getTopOps().values());
     GraphWalker ogw = new GenTezWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
+
+    // we need to clone some operator plans and remove union operators still
+    for (BaseWork w: procCtx.workWithUnionOperators) {
+      GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+    }
+
+    // finally make sure the file sink operators are set up right
+    for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
+      GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
+    }
   }
 
   @Override
   protected void setInputFormat(Task<? extends Serializable> task) {
     if (task instanceof TezTask) {
       TezWork work = ((TezTask)task).getWork();
-      Set<BaseWork> roots = work.getRoots();
-      for (BaseWork w: roots) {
-        assert w instanceof MapWork;
-        MapWork mapWork = (MapWork)w;
-        HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
-        if (!opMap.isEmpty()) {
-          for (Operator<? extends OperatorDesc> op : opMap.values()) {
-            setInputFormat(mapWork, op);
+      List<BaseWork> all = work.getAllWork();
+      for (BaseWork w: all) {
+        if (w instanceof MapWork) {
+          MapWork mapWork = (MapWork) w;
+          HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
+          if (!opMap.isEmpty()) {
+            for (Operator<? extends OperatorDesc> op : opMap.values()) {
+              setInputFormat(mapWork, op);
+            }
           }
         }
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index eb85446..38c4c11 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -22,6 +22,7 @@
 import java.util.LinkedList;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
@@ -82,7 +83,9 @@ public void addDummyOp(HashTableDummyOperator dummyOp) {
     dummyOps.add(dummyOp);
   }
 
-  protected abstract Set<Operator<?>> getAllRootOperators();
+  public abstract void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap);
+
+  public abstract Set<Operator<?>> getAllRootOperators();
 
   public Set<Operator<?>> getAllOperators() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 19b553f..e1cc3f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -305,6 +305,17 @@ public String getVectorModeOn() {
   }
 
   @Override
+  public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+    LinkedHashMap<String, Operator<?>> newAliasToWork = new LinkedHashMap<String, Operator<?>>();
+
+    for (Map.Entry<String, Operator<?>> entry: aliasToWork.entrySet()) {
+      newAliasToWork.put(entry.getKey(), replacementMap.get(entry.getValue()));
+    }
+
+    setAliasToWork(newAliasToWork);
+  }
+
+  @Override
   @Explain(displayName = "Map Operator Tree")
   public Set<Operator<?>> getAllRootOperators() {
     Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index afb3648..a68374e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -129,7 +129,13 @@ public void setTagToInput(final Map<Integer, String> tagToInput) {
   }
 
   @Override
-  protected Set<Operator<?>> getAllRootOperators() {
+  public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+    assert replacementMap.size() == 1;
+    setReducer(replacementMap.get(getReducer()));
+  }
+
+  @Override
+  public Set<Operator<?>> getAllRootOperators() {
     Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();
     opSet.add(getReducer());
     return opSet;
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index 9112a77..3b31378 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -45,7 +45,8 @@
 
   public enum EdgeType {
     SIMPLE_EDGE,
-    BROADCAST_EDGE
+    BROADCAST_EDGE,
+    CONTAINS
   }
 
   private static transient final Log LOG = LogFactory.getLog(TezWork.class);
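CONTAINS is deliberately not a data-movement edge: it only records that a vertex belongs to a union, and TezTask (above) filters on it when deciding what becomes a VertexGroup member versus a real downstream edge. Condensed from that change:

    for (BaseWork v : work.getChildren(w)) {
      if (work.getEdgeProperty(w, v) == EdgeType.CONTAINS) {
        unionWorkItems.add(v);  // member of the union, joins the VertexGroup
      } else {
        children.add(v);        // proper child, connected via a GroupInputEdge
      }
    }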
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
new file mode 100644
index 0000000..60781e6
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+
+/**
+ * Simple wrapper for union all cases. All contributing work for a union all
+ * is collected here. Downstream work will connect to the union, not the individual
+ * work items.
+ */
+public class UnionWork extends BaseWork {
+
+  private final Set<UnionOperator> unionOperators = new HashSet<UnionOperator>();
+
+  public UnionWork() {
+    super();
+  }
+
+  public UnionWork(String name) {
+    super(name);
+  }
+
+  @Explain(displayName = "Vertex")
+  @Override
+  public String getName() {
+    return super.getName();
+  }
+
+  @Override
+  public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+  }
+
+  @Override
+  public Set<Operator<?>> getAllRootOperators() {
+    return new HashSet<Operator<?>>();
+  }
+
+  public void addUnionOperators(Collection<UnionOperator> unions) {
+    unionOperators.addAll(unions);
+  }
+
+  public Set<UnionOperator> getUnionOperators() {
+    return unionOperators;
+  }
+}
diff --git ql/src/test/results/clientpositive/tez/union2.q.out ql/src/test/results/clientpositive/tez/union2.q.out new file mode 100644 index 0000000..2811c25 --- /dev/null +++ ql/src/test/results/clientpositive/tez/union2.q.out @@ -0,0 +1,90 @@ +PREHOOK: query: -- union case: both subqueries are map-reduce jobs on same input, followed by reduce sink + +explain + select count(1) FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2) unionsrc +PREHOOK: type: QUERY +POSTHOOK: query: -- union case: both subqueries are map-reduce jobs on same input, followed by reduce sink + +explain + select count(1) FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2) unionsrc +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Union 2 (CONTAINS) + Map 4 <- Union 2 (CONTAINS) + Reducer 3 <- Union 2 (SIMPLE_EDGE) + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s1 + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Select Operator + Group By Operator + aggregations: count(1) + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + value expressions: _col0 (type: bigint) + Map 4 + Map Operator Tree: + TableScan + alias: s2 + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Select Operator + Group By Operator + aggregations: count(1) + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + value expressions: _col0 (type: bigint) + Reducer 3 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Union 2 + Vertex: Union 2 + + Stage: Stage-0 + Fetch Operator + limit: -1 + +PREHOOK: query: select count(1) FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2) unionsrc +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(1) FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value
from src s2) unionsrc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1000 diff --git ql/src/test/results/clientpositive/tez/union3.q.out ql/src/test/results/clientpositive/tez/union3.q.out new file mode 100644 index 0000000..09254b8 --- /dev/null +++ ql/src/test/results/clientpositive/tez/union3.q.out @@ -0,0 +1,273 @@ +PREHOOK: query: explain +SELECT * +FROM ( + SELECT 1 AS id + FROM (SELECT * FROM src LIMIT 1) s1 + CLUSTER BY id + UNION ALL + SELECT 2 AS id + FROM (SELECT * FROM src LIMIT 1) s1 + CLUSTER BY id + UNION ALL + SELECT 3 AS id + FROM (SELECT * FROM src LIMIT 1) s2 + UNION ALL + SELECT 4 AS id + FROM (SELECT * FROM src LIMIT 1) s2 +) a +PREHOOK: type: QUERY +POSTHOOK: query: explain +SELECT * +FROM ( + SELECT 1 AS id + FROM (SELECT * FROM src LIMIT 1) s1 + CLUSTER BY id + UNION ALL + SELECT 2 AS id + FROM (SELECT * FROM src LIMIT 1) s1 + CLUSTER BY id + UNION ALL + SELECT 3 AS id + FROM (SELECT * FROM src LIMIT 1) s2 + UNION ALL + SELECT 4 AS id + FROM (SELECT * FROM src LIMIT 1) s2 +) a +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 10 <- Map 9 (SIMPLE_EDGE) + Reducer 11 <- Reducer 10 (SIMPLE_EDGE), Union 3 (CONTAINS) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Union 3 (CONTAINS) + Reducer 5 <- Map 4 (SIMPLE_EDGE), Union 3 (CONTAINS) + Reducer 7 <- Map 6 (SIMPLE_EDGE) + Reducer 8 <- Reducer 7 (SIMPLE_EDGE), Union 3 (CONTAINS) + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 1 + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string) + Map 4 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 1 + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string) + Map 6 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 1 + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string) + Map 9 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Select Operator + 
expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 1 + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string) + Reducer 10 + Reduce Operator Tree: + Extract + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 1 + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: 2 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Reducer 11 + Reduce Operator Tree: + Extract + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 2 + Reduce Operator Tree: + Extract + Limit + Number of rows: 1 + Select Operator + expressions: 4 (type: int) + outputColumnNames: _col0 + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Reduce Operator Tree: + Extract + Limit + Number of rows: 1 + Select Operator + expressions: 3 (type: int) + outputColumnNames: _col0 + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 7 + Reduce Operator Tree: + Extract + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 1 + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: 1 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Reducer 8 + Reduce Operator Tree: + Extract + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Union 3 + Vertex: Union 3 + + Stage: Stage-0 + Fetch Operator + limit: -1 + +PREHOOK: query: CREATE TABLE union_out 
(id int) +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE union_out (id int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@union_out +PREHOOK: query: insert overwrite table union_out +SELECT * +FROM ( + SELECT 1 AS id + FROM (SELECT * FROM src LIMIT 1) s1 + CLUSTER BY id + UNION ALL + SELECT 2 AS id + FROM (SELECT * FROM src LIMIT 1) s1 + CLUSTER BY id + UNION ALL + SELECT 3 AS id + FROM (SELECT * FROM src LIMIT 1) s2 + UNION ALL + SELECT 4 AS id + FROM (SELECT * FROM src LIMIT 1) s2 +) a +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@union_out +POSTHOOK: query: insert overwrite table union_out +SELECT * +FROM ( + SELECT 1 AS id + FROM (SELECT * FROM src LIMIT 1) s1 + CLUSTER BY id + UNION ALL + SELECT 2 AS id + FROM (SELECT * FROM src LIMIT 1) s1 + CLUSTER BY id + UNION ALL + SELECT 3 AS id + FROM (SELECT * FROM src LIMIT 1) s2 + UNION ALL + SELECT 4 AS id + FROM (SELECT * FROM src LIMIT 1) s2 +) a +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@union_out +POSTHOOK: Lineage: union_out.id EXPRESSION [] +PREHOOK: query: select * from union_out cluster by id +PREHOOK: type: QUERY +PREHOOK: Input: default@union_out +#### A masked pattern was here #### +POSTHOOK: query: select * from union_out cluster by id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@union_out +#### A masked pattern was here #### +POSTHOOK: Lineage: union_out.id EXPRESSION [] +1 +2 +3 +4 diff --git ql/src/test/results/clientpositive/tez/union4.q.out ql/src/test/results/clientpositive/tez/union4.q.out new file mode 100644 index 0000000..6b618eb --- /dev/null +++ ql/src/test/results/clientpositive/tez/union4.q.out @@ -0,0 +1,153 @@ +PREHOOK: query: -- union case: both subqueries are map-reduce jobs on same input, followed by filesink + + +create table tmptable(key string, value int) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- union case: both subqueries are map-reduce jobs on same input, followed by filesink + + +create table tmptable(key string, value int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmptable +PREHOOK: query: explain +insert overwrite table tmptable + select unionsrc.key, unionsrc.value FROM (select 'tst1' as key, count(1) as value from src s1 + UNION ALL + select 'tst2' as key, count(1) as value from src s2) unionsrc +PREHOOK: type: QUERY +POSTHOOK: query: explain +insert overwrite table tmptable + select unionsrc.key, unionsrc.value FROM (select 'tst1' as key, count(1) as value from src s1 + UNION ALL + select 'tst2' as key, count(1) as value from src s2) unionsrc +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Union 3 (CONTAINS) + Reducer 5 <- Map 4 (SIMPLE_EDGE), Union 3 (CONTAINS) + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s1 + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Select Operator + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Group By Operator + aggregations: count(1) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Map 4 + Map Operator Tree: + 
TableScan + alias: s2 + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Select Operator + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Group By Operator + aggregations: count(1) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: 'tst1' (type: string), _col0 (type: bigint) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string), UDFToInteger(_col1) (type: int) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmptable + Reducer 5 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: 'tst2' (type: string), _col0 (type: bigint) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string), UDFToInteger(_col1) (type: int) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmptable + Union 3 + Vertex: Union 3 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmptable + + Stage: Stage-3 + Stats-Aggr Operator + +PREHOOK: query: insert overwrite table tmptable +select unionsrc.key, unionsrc.value FROM (select 'tst1' as key, count(1) as value from src s1 + UNION ALL + select 'tst2' as key, count(1) as value from src s2) unionsrc +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmptable +POSTHOOK: query: insert overwrite table tmptable +select unionsrc.key, unionsrc.value FROM (select 'tst1' as key, count(1) as value from src s1 + UNION ALL + select 'tst2' as key, count(1) as value from src s2) unionsrc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmptable +POSTHOOK: Lineage: tmptable.key EXPRESSION [] +POSTHOOK: Lineage: tmptable.value EXPRESSION [(src)s1.null, (src)s2.null, ] +PREHOOK: query: select * from tmptable x sort by x.key +PREHOOK: type: QUERY +PREHOOK: Input: default@tmptable +#### A masked pattern was here #### +POSTHOOK: query: select * from tmptable x sort by x.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmptable +#### A masked pattern was here #### +POSTHOOK: Lineage: tmptable.key EXPRESSION [] +POSTHOOK: Lineage: tmptable.value EXPRESSION [(src)s1.null, (src)s2.null, ] +tst1 500 +tst2 500 diff --git ql/src/test/results/clientpositive/tez/union5.q.out ql/src/test/results/clientpositive/tez/union5.q.out new file mode 100644 
index 0000000..d332d4c --- /dev/null +++ ql/src/test/results/clientpositive/tez/union5.q.out @@ -0,0 +1,142 @@ +PREHOOK: query: -- union case: both subqueries are map-reduce jobs on same input, followed by reduce sink + +explain + select unionsrc.key, count(1) FROM (select 'tst1' as key, count(1) as value from src s1 + UNION ALL + select 'tst2' as key, count(1) as value from src s2) unionsrc group by unionsrc.key +PREHOOK: type: QUERY +POSTHOOK: query: -- union case: both subqueries are map-reduce jobs on same input, followed by reduce sink + +explain + select unionsrc.key, count(1) FROM (select 'tst1' as key, count(1) as value from src s1 + UNION ALL + select 'tst2' as key, count(1) as value from src s2) unionsrc group by unionsrc.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Union 3 (CONTAINS) + Reducer 4 <- Union 3 (SIMPLE_EDGE) + Reducer 6 <- Map 5 (SIMPLE_EDGE), Union 3 (CONTAINS) + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s1 + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Select Operator + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Group By Operator + aggregations: count(1) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Map 5 + Map Operator Tree: + TableScan + alias: s2 + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Select Operator + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Group By Operator + aggregations: count(1) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: 'tst1' (type: string), _col0 (type: bigint) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Group By Operator + aggregations: count(1) + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + value expressions: _col1 (type: bigint) + Reducer 4 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 88 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string), _col1 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 6 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: 'tst2' (type: string), _col0 (type: bigint) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Group By Operator + aggregations: count(1) + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + value expressions: _col1 (type: bigint) + Union 3 + Vertex: Union 3 + + Stage: Stage-0 + Fetch Operator + limit: -1 + +PREHOOK: query: select unionsrc.key, count(1) FROM (select 'tst1' as key, count(1) as value from src s1 + UNION ALL + select 'tst2' as key, count(1) as value from src s2) unionsrc group by unionsrc.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select unionsrc.key, count(1) FROM (select 'tst1' as key, count(1) as value from src s1 + UNION ALL + select 'tst2' as key, count(1) as value from src s2) unionsrc group by unionsrc.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +tst1 1 +tst2 1 diff --git ql/src/test/results/clientpositive/tez/union6.q.out ql/src/test/results/clientpositive/tez/union6.q.out new file mode 100644 index 0000000..eb51a62 --- /dev/null +++ ql/src/test/results/clientpositive/tez/union6.q.out @@ -0,0 +1,161 @@ +PREHOOK: query: -- union case: 1 subquery is a map-reduce job, different inputs for sub-queries, followed by filesink + + +create table tmptable(key string, value string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- union case: 1 subquery is a map-reduce job, different inputs for sub-queries, followed by filesink + + +create table tmptable(key string, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmptable +PREHOOK: query: explain +insert overwrite table tmptable + select unionsrc.key, unionsrc.value FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1 + UNION ALL + select s2.key as key, s2.value as value from src1 s2) unionsrc +PREHOOK: type: QUERY +POSTHOOK: query: explain +insert overwrite table tmptable + select unionsrc.key, unionsrc.value FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1 + UNION ALL + select s2.key as key, s2.value as value from src1 s2) unionsrc +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 4 <- Union 3 (CONTAINS) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Union 3 (CONTAINS) + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s1 + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Select Operator + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Group By Operator + aggregations: count(1) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Map 4 + Map Operator 
Tree: + TableScan + alias: s2 + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmptable + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: 'tst1' (type: string), UDFToString(_col0) (type: string) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmptable + Union 3 + Vertex: Union 3 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmptable + + Stage: Stage-3 + Stats-Aggr Operator + +PREHOOK: query: insert overwrite table tmptable +select unionsrc.key, unionsrc.value FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1 + UNION ALL + select s2.key as key, s2.value as value from src1 s2) unionsrc +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src1 +PREHOOK: Output: default@tmptable +POSTHOOK: query: insert overwrite table tmptable +select unionsrc.key, unionsrc.value FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1 + UNION ALL + select s2.key as key, s2.value as value from src1 s2) unionsrc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@src1 +POSTHOOK: Output: default@tmptable +POSTHOOK: Lineage: tmptable.key EXPRESSION [(src1)s2.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmptable.value EXPRESSION [(src)s1.null, (src1)s2.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: select * from tmptable x sort by x.key, x.value +PREHOOK: type: QUERY +PREHOOK: Input: default@tmptable +#### A masked pattern was here #### +POSTHOOK: query: select * from tmptable x sort by x.key, x.value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmptable +#### A masked pattern was here #### +POSTHOOK: Lineage: tmptable.key EXPRESSION [(src1)s2.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmptable.value EXPRESSION [(src)s1.null, (src1)s2.FieldSchema(name:value, type:string, comment:default), ] + + + + + val_165 + val_193 + val_265 + val_27 + val_409 + val_484 +128 +146 val_146 +150 val_150 +213 val_213 +224 +238 val_238 +255 val_255 +273 val_273 +278 val_278 +311 val_311 +369 +401 val_401 +406 val_406 +66 val_66 +98 val_98 +tst1 500 diff --git ql/src/test/results/clientpositive/tez/union7.q.out ql/src/test/results/clientpositive/tez/union7.q.out new file mode 100644 index 0000000..f9f57aa --- /dev/null +++ ql/src/test/results/clientpositive/tez/union7.q.out @@ -0,0 +1,141 
@@ +PREHOOK: query: -- union case: 1 subquery is a map-reduce job, different inputs for sub-queries, followed by reducesink + +explain + select unionsrc.key, count(1) FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1 + UNION ALL + select s2.key as key, s2.value as value from src1 s2) unionsrc group by unionsrc.key +PREHOOK: type: QUERY +POSTHOOK: query: -- union case: 1 subquery is a map-reduce job, different inputs for sub-queries, followed by reducesink + +explain + select unionsrc.key, count(1) FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1 + UNION ALL + select s2.key as key, s2.value as value from src1 s2) unionsrc group by unionsrc.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 5 <- Union 3 (CONTAINS) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Union 3 (CONTAINS) + Reducer 4 <- Union 3 (SIMPLE_EDGE) + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s1 + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Select Operator + Statistics: Num rows: 0 Data size: 5812 Basic stats: PARTIAL Column stats: COMPLETE + Group By Operator + aggregations: count(1) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Map 5 + Map Operator Tree: + TableScan + alias: s2 + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Group By Operator + aggregations: count(1) + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + value expressions: _col1 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: 'tst1' (type: string), UDFToString(_col0) (type: string) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Group By Operator + aggregations: count(1) + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + value expressions: _col1 (type: bigint) + Reducer 4 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 88 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: _col0 (type: string), _col1 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 96 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 96 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Union 3 + Vertex: Union 3 + + 
Stage: Stage-0 + Fetch Operator + limit: -1 + +PREHOOK: query: select unionsrc.key, count(1) FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1 + UNION ALL + select s2.key as key, s2.value as value from src1 s2) unionsrc group by unionsrc.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src1 +#### A masked pattern was here #### +POSTHOOK: query: select unionsrc.key, count(1) FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1 + UNION ALL + select s2.key as key, s2.value as value from src1 s2) unionsrc group by unionsrc.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@src1 +#### A masked pattern was here #### + 10 +128 1 +146 1 +150 1 +213 1 +224 1 +238 1 +255 1 +273 1 +278 1 +311 1 +369 1 +401 1 +406 1 +66 1 +98 1 +tst1 1 diff --git ql/src/test/results/clientpositive/tez/union8.q.out ql/src/test/results/clientpositive/tez/union8.q.out new file mode 100644 index 0000000..8b8acb6 --- /dev/null +++ ql/src/test/results/clientpositive/tez/union8.q.out @@ -0,0 +1,1593 @@ +PREHOOK: query: -- union case: all subqueries are a map-only jobs, 3 way union, same input for all sub-queries, followed by filesink + +explain + select unionsrc.key, unionsrc.value FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2 UNION ALL + select s3.key as key, s3.value as value from src s3) unionsrc +PREHOOK: type: QUERY +POSTHOOK: query: -- union case: all subqueries are a map-only jobs, 3 way union, same input for all sub-queries, followed by filesink + +explain + select unionsrc.key, unionsrc.value FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2 UNION ALL + select s3.key as key, s3.value as value from src s3) unionsrc +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Union 2 (CONTAINS) + Map 3 <- Union 2 (CONTAINS) + Map 4 <- Union 2 (CONTAINS) + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s1 + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Map 3 + Map Operator Tree: + TableScan + alias: s2 + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Map 4 + Map Operator Tree: + TableScan + alias: s3 + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Union 2 + Vertex: Union 2 + + Stage: Stage-0 + Fetch Operator + limit: -1 + +PREHOOK: query: select unionsrc.key, unionsrc.value FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2 UNION ALL + select s3.key as key, s3.value as value from src s3) unionsrc +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select unionsrc.key, unionsrc.value FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2 UNION ALL + select s3.key as key, s3.value as value from src s3) unionsrc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +238 val_238 +86 val_86 +311 val_311 +27 val_27 +165 val_165 +409 val_409 +255 val_255 +278 val_278 +98 val_98 +484 val_484 +265 val_265 +193 val_193 +401 val_401 +150 val_150 +273 val_273 +224 val_224 +369 val_369 +66 val_66 +128 val_128 +213 val_213 +146 val_146 +406 val_406 +429 val_429 +374 val_374 +152 val_152 +469 val_469 +145 val_145 +495 val_495 +37 val_37 +327 val_327 +281 val_281 +277 val_277 +209 val_209 +15 val_15 +82 val_82 +403 val_403 +166 val_166 +417 val_417 +430 val_430 +252 val_252 +292 val_292 +219 val_219 +287 val_287 +153 val_153 +193 val_193 +338 val_338 +446 val_446 +459 val_459 +394 val_394 +237 val_237 +482 val_482 +174 val_174 +413 val_413 +494 val_494 +207 val_207 +199 val_199 +466 val_466 +208 val_208 +174 val_174 +399 val_399 +396 val_396 +247 val_247 +417 val_417 +489 val_489 +162 val_162 +377 val_377 +397 val_397 +309 val_309 +365 val_365 +266 val_266 +439 val_439 +342 val_342 +367 val_367 +325 val_325 +167 val_167 +195 val_195 +475 val_475 +17 val_17 +113 val_113 +155 val_155 +203 val_203 +339 val_339 +0 val_0 +455 val_455 +128 val_128 +311 val_311 +316 val_316 +57 val_57 +302 val_302 +205 val_205 +149 val_149 +438 val_438 +345 val_345 +129 val_129 +170 val_170 +20 val_20 +489 val_489 +157 val_157 +378 val_378 +221 val_221 +92 val_92 +111 val_111 +47 val_47 +72 val_72 +4 val_4 +280 val_280 +35 val_35 +427 val_427 +277 val_277 +208 val_208 +356 val_356 +399 val_399 +169 val_169 +382 val_382 +498 val_498 +125 val_125 +386 val_386 +437 val_437 +469 val_469 +192 val_192 +286 val_286 +187 val_187 +176 val_176 +54 val_54 +459 val_459 +51 val_51 +138 val_138 +103 val_103 +239 val_239 +213 val_213 +216 val_216 +430 val_430 +278 val_278 +176 val_176 +289 val_289 +221 val_221 +65 val_65 +318 val_318 +332 val_332 +311 val_311 +275 val_275 +137 val_137 +241 val_241 +83 val_83 +333 val_333 +180 val_180 +284 val_284 +12 val_12 +230 val_230 +181 val_181 +67 val_67 +260 val_260 +404 val_404 +384 val_384 +489 val_489 +353 val_353 +373 val_373 +272 val_272 +138 val_138 +217 val_217 +84 val_84 +348 val_348 +466 val_466 +58 val_58 +8 val_8 +411 val_411 +230 val_230 +208 val_208 +348 val_348 +24 val_24 +463 val_463 +431 val_431 +179 val_179 +172 val_172 +42 val_42 +129 val_129 +158 val_158 +119 val_119 +496 val_496 +0 val_0 +322 val_322 +197 val_197 +468 val_468 +393 val_393 +454 val_454 +100 val_100 +298 val_298 +199 val_199 +191 val_191 +418 val_418 +96 val_96 +26 val_26 +165 val_165 +327 val_327 +230 val_230 +205 val_205 +120 val_120 +131 val_131 +51 val_51 +404 val_404 +43 val_43 +436 val_436 +156 val_156 +469 val_469 +468 val_468 +308 val_308 +95 val_95 +196 val_196 +288 val_288 +481 val_481 +457 val_457 +98 val_98 
+282 val_282 +197 val_197 +187 val_187 +318 val_318 +318 val_318 +409 val_409 +470 val_470 +137 val_137 +369 val_369 +316 val_316 +169 val_169 +413 val_413 +85 val_85 +77 val_77 +0 val_0 +490 val_490 +87 val_87 +364 val_364 +179 val_179 +118 val_118 +134 val_134 +395 val_395 +282 val_282 +138 val_138 +238 val_238 +419 val_419 +15 val_15 +118 val_118 +72 val_72 +90 val_90 +307 val_307 +19 val_19 +435 val_435 +10 val_10 +277 val_277 +273 val_273 +306 val_306 +224 val_224 +309 val_309 +389 val_389 +327 val_327 +242 val_242 +369 val_369 +392 val_392 +272 val_272 +331 val_331 +401 val_401 +242 val_242 +452 val_452 +177 val_177 +226 val_226 +5 val_5 +497 val_497 +402 val_402 +396 val_396 +317 val_317 +395 val_395 +58 val_58 +35 val_35 +336 val_336 +95 val_95 +11 val_11 +168 val_168 +34 val_34 +229 val_229 +233 val_233 +143 val_143 +472 val_472 +322 val_322 +498 val_498 +160 val_160 +195 val_195 +42 val_42 +321 val_321 +430 val_430 +119 val_119 +489 val_489 +458 val_458 +78 val_78 +76 val_76 +41 val_41 +223 val_223 +492 val_492 +149 val_149 +449 val_449 +218 val_218 +228 val_228 +138 val_138 +453 val_453 +30 val_30 +209 val_209 +64 val_64 +468 val_468 +76 val_76 +74 val_74 +342 val_342 +69 val_69 +230 val_230 +33 val_33 +368 val_368 +103 val_103 +296 val_296 +113 val_113 +216 val_216 +367 val_367 +344 val_344 +167 val_167 +274 val_274 +219 val_219 +239 val_239 +485 val_485 +116 val_116 +223 val_223 +256 val_256 +263 val_263 +70 val_70 +487 val_487 +480 val_480 +401 val_401 +288 val_288 +191 val_191 +5 val_5 +244 val_244 +438 val_438 +128 val_128 +467 val_467 +432 val_432 +202 val_202 +316 val_316 +229 val_229 +469 val_469 +463 val_463 +280 val_280 +2 val_2 +35 val_35 +283 val_283 +331 val_331 +235 val_235 +80 val_80 +44 val_44 +193 val_193 +321 val_321 +335 val_335 +104 val_104 +466 val_466 +366 val_366 +175 val_175 +403 val_403 +483 val_483 +53 val_53 +105 val_105 +257 val_257 +406 val_406 +409 val_409 +190 val_190 +406 val_406 +401 val_401 +114 val_114 +258 val_258 +90 val_90 +203 val_203 +262 val_262 +348 val_348 +424 val_424 +12 val_12 +396 val_396 +201 val_201 +217 val_217 +164 val_164 +431 val_431 +454 val_454 +478 val_478 +298 val_298 +125 val_125 +431 val_431 +164 val_164 +424 val_424 +187 val_187 +382 val_382 +5 val_5 +70 val_70 +397 val_397 +480 val_480 +291 val_291 +24 val_24 +351 val_351 +255 val_255 +104 val_104 +70 val_70 +163 val_163 +438 val_438 +119 val_119 +414 val_414 +200 val_200 +491 val_491 +237 val_237 +439 val_439 +360 val_360 +248 val_248 +479 val_479 +305 val_305 +417 val_417 +199 val_199 +444 val_444 +120 val_120 +429 val_429 +169 val_169 +443 val_443 +323 val_323 +325 val_325 +277 val_277 +230 val_230 +478 val_478 +178 val_178 +468 val_468 +310 val_310 +317 val_317 +333 val_333 +493 val_493 +460 val_460 +207 val_207 +249 val_249 +265 val_265 +480 val_480 +83 val_83 +136 val_136 +353 val_353 +172 val_172 +214 val_214 +462 val_462 +233 val_233 +406 val_406 +133 val_133 +175 val_175 +189 val_189 +454 val_454 +375 val_375 +401 val_401 +421 val_421 +407 val_407 +384 val_384 +256 val_256 +26 val_26 +134 val_134 +67 val_67 +384 val_384 +379 val_379 +18 val_18 +462 val_462 +492 val_492 +100 val_100 +298 val_298 +9 val_9 +341 val_341 +498 val_498 +146 val_146 +458 val_458 +362 val_362 +186 val_186 +285 val_285 +348 val_348 +167 val_167 +18 val_18 +273 val_273 +183 val_183 +281 val_281 +344 val_344 +97 val_97 +469 val_469 +315 val_315 +84 val_84 +28 val_28 +37 val_37 +448 val_448 +152 val_152 +348 val_348 +307 val_307 +194 val_194 +414 val_414 +477 val_477 +222 val_222 +126 
val_126 +90 val_90 +169 val_169 +403 val_403 +400 val_400 +200 val_200 +97 val_97 +238 val_238 +86 val_86 +311 val_311 +27 val_27 +165 val_165 +409 val_409 +255 val_255 +278 val_278 +98 val_98 +484 val_484 +265 val_265 +193 val_193 +401 val_401 +150 val_150 +273 val_273 +224 val_224 +369 val_369 +66 val_66 +128 val_128 +213 val_213 +146 val_146 +406 val_406 +429 val_429 +374 val_374 +152 val_152 +469 val_469 +145 val_145 +495 val_495 +37 val_37 +327 val_327 +281 val_281 +277 val_277 +209 val_209 +15 val_15 +82 val_82 +403 val_403 +166 val_166 +417 val_417 +430 val_430 +252 val_252 +292 val_292 +219 val_219 +287 val_287 +153 val_153 +193 val_193 +338 val_338 +446 val_446 +459 val_459 +394 val_394 +237 val_237 +482 val_482 +174 val_174 +413 val_413 +494 val_494 +207 val_207 +199 val_199 +466 val_466 +208 val_208 +174 val_174 +399 val_399 +396 val_396 +247 val_247 +417 val_417 +489 val_489 +162 val_162 +377 val_377 +397 val_397 +309 val_309 +365 val_365 +266 val_266 +439 val_439 +342 val_342 +367 val_367 +325 val_325 +167 val_167 +195 val_195 +475 val_475 +17 val_17 +113 val_113 +155 val_155 +203 val_203 +339 val_339 +0 val_0 +455 val_455 +128 val_128 +311 val_311 +316 val_316 +57 val_57 +302 val_302 +205 val_205 +149 val_149 +438 val_438 +345 val_345 +129 val_129 +170 val_170 +20 val_20 +489 val_489 +157 val_157 +378 val_378 +221 val_221 +92 val_92 +111 val_111 +47 val_47 +72 val_72 +4 val_4 +280 val_280 +35 val_35 +427 val_427 +277 val_277 +208 val_208 +356 val_356 +399 val_399 +169 val_169 +382 val_382 +498 val_498 +125 val_125 +386 val_386 +437 val_437 +469 val_469 +192 val_192 +286 val_286 +187 val_187 +176 val_176 +54 val_54 +459 val_459 +51 val_51 +138 val_138 +103 val_103 +239 val_239 +213 val_213 +216 val_216 +430 val_430 +278 val_278 +176 val_176 +289 val_289 +221 val_221 +65 val_65 +318 val_318 +332 val_332 +311 val_311 +275 val_275 +137 val_137 +241 val_241 +83 val_83 +333 val_333 +180 val_180 +284 val_284 +12 val_12 +230 val_230 +181 val_181 +67 val_67 +260 val_260 +404 val_404 +384 val_384 +489 val_489 +353 val_353 +373 val_373 +272 val_272 +138 val_138 +217 val_217 +84 val_84 +348 val_348 +466 val_466 +58 val_58 +8 val_8 +411 val_411 +230 val_230 +208 val_208 +348 val_348 +24 val_24 +463 val_463 +431 val_431 +179 val_179 +172 val_172 +42 val_42 +129 val_129 +158 val_158 +119 val_119 +496 val_496 +0 val_0 +322 val_322 +197 val_197 +468 val_468 +393 val_393 +454 val_454 +100 val_100 +298 val_298 +199 val_199 +191 val_191 +418 val_418 +96 val_96 +26 val_26 +165 val_165 +327 val_327 +230 val_230 +205 val_205 +120 val_120 +131 val_131 +51 val_51 +404 val_404 +43 val_43 +436 val_436 +156 val_156 +469 val_469 +468 val_468 +308 val_308 +95 val_95 +196 val_196 +288 val_288 +481 val_481 +457 val_457 +98 val_98 +282 val_282 +197 val_197 +187 val_187 +318 val_318 +318 val_318 +409 val_409 +470 val_470 +137 val_137 +369 val_369 +316 val_316 +169 val_169 +413 val_413 +85 val_85 +77 val_77 +0 val_0 +490 val_490 +87 val_87 +364 val_364 +179 val_179 +118 val_118 +134 val_134 +395 val_395 +282 val_282 +138 val_138 +238 val_238 +419 val_419 +15 val_15 +118 val_118 +72 val_72 +90 val_90 +307 val_307 +19 val_19 +435 val_435 +10 val_10 +277 val_277 +273 val_273 +306 val_306 +224 val_224 +309 val_309 +389 val_389 +327 val_327 +242 val_242 +369 val_369 +392 val_392 +272 val_272 +331 val_331 +401 val_401 +242 val_242 +452 val_452 +177 val_177 +226 val_226 +5 val_5 +497 val_497 +402 val_402 +396 val_396 +317 val_317 +395 val_395 +58 val_58 +35 val_35 +336 val_336 +95 val_95 +11 val_11 +168 val_168 +34 
val_34 +229 val_229 +233 val_233 +143 val_143 +472 val_472 +322 val_322 +498 val_498 +160 val_160 +195 val_195 +42 val_42 +321 val_321 +430 val_430 +119 val_119 +489 val_489 +458 val_458 +78 val_78 +76 val_76 +41 val_41 +223 val_223 +492 val_492 +149 val_149 +449 val_449 +218 val_218 +228 val_228 +138 val_138 +453 val_453 +30 val_30 +209 val_209 +64 val_64 +468 val_468 +76 val_76 +74 val_74 +342 val_342 +69 val_69 +230 val_230 +33 val_33 +368 val_368 +103 val_103 +296 val_296 +113 val_113 +216 val_216 +367 val_367 +344 val_344 +167 val_167 +274 val_274 +219 val_219 +239 val_239 +485 val_485 +116 val_116 +223 val_223 +256 val_256 +263 val_263 +70 val_70 +487 val_487 +480 val_480 +401 val_401 +288 val_288 +191 val_191 +5 val_5 +244 val_244 +438 val_438 +128 val_128 +467 val_467 +432 val_432 +202 val_202 +316 val_316 +229 val_229 +469 val_469 +463 val_463 +280 val_280 +2 val_2 +35 val_35 +283 val_283 +331 val_331 +235 val_235 +80 val_80 +44 val_44 +193 val_193 +321 val_321 +335 val_335 +104 val_104 +466 val_466 +366 val_366 +175 val_175 +403 val_403 +483 val_483 +53 val_53 +105 val_105 +257 val_257 +406 val_406 +409 val_409 +190 val_190 +406 val_406 +401 val_401 +114 val_114 +258 val_258 +90 val_90 +203 val_203 +262 val_262 +348 val_348 +424 val_424 +12 val_12 +396 val_396 +201 val_201 +217 val_217 +164 val_164 +431 val_431 +454 val_454 +478 val_478 +298 val_298 +125 val_125 +431 val_431 +164 val_164 +424 val_424 +187 val_187 +382 val_382 +5 val_5 +70 val_70 +397 val_397 +480 val_480 +291 val_291 +24 val_24 +351 val_351 +255 val_255 +104 val_104 +70 val_70 +163 val_163 +438 val_438 +119 val_119 +414 val_414 +200 val_200 +491 val_491 +237 val_237 +439 val_439 +360 val_360 +248 val_248 +479 val_479 +305 val_305 +417 val_417 +199 val_199 +444 val_444 +120 val_120 +429 val_429 +169 val_169 +443 val_443 +323 val_323 +325 val_325 +277 val_277 +230 val_230 +478 val_478 +178 val_178 +468 val_468 +310 val_310 +317 val_317 +333 val_333 +493 val_493 +460 val_460 +207 val_207 +249 val_249 +265 val_265 +480 val_480 +83 val_83 +136 val_136 +353 val_353 +172 val_172 +214 val_214 +462 val_462 +233 val_233 +406 val_406 +133 val_133 +175 val_175 +189 val_189 +454 val_454 +375 val_375 +401 val_401 +421 val_421 +407 val_407 +384 val_384 +256 val_256 +26 val_26 +134 val_134 +67 val_67 +384 val_384 +379 val_379 +18 val_18 +462 val_462 +492 val_492 +100 val_100 +298 val_298 +9 val_9 +341 val_341 +498 val_498 +146 val_146 +458 val_458 +362 val_362 +186 val_186 +285 val_285 +348 val_348 +167 val_167 +18 val_18 +273 val_273 +183 val_183 +281 val_281 +344 val_344 +97 val_97 +469 val_469 +315 val_315 +84 val_84 +28 val_28 +37 val_37 +448 val_448 +152 val_152 +348 val_348 +307 val_307 +194 val_194 +414 val_414 +477 val_477 +222 val_222 +126 val_126 +90 val_90 +169 val_169 +403 val_403 +400 val_400 +200 val_200 +97 val_97 +238 val_238 +86 val_86 +311 val_311 +27 val_27 +165 val_165 +409 val_409 +255 val_255 +278 val_278 +98 val_98 +484 val_484 +265 val_265 +193 val_193 +401 val_401 +150 val_150 +273 val_273 +224 val_224 +369 val_369 +66 val_66 +128 val_128 +213 val_213 +146 val_146 +406 val_406 +429 val_429 +374 val_374 +152 val_152 +469 val_469 +145 val_145 +495 val_495 +37 val_37 +327 val_327 +281 val_281 +277 val_277 +209 val_209 +15 val_15 +82 val_82 +403 val_403 +166 val_166 +417 val_417 +430 val_430 +252 val_252 +292 val_292 +219 val_219 +287 val_287 +153 val_153 +193 val_193 +338 val_338 +446 val_446 +459 val_459 +394 val_394 +237 val_237 +482 val_482 +174 val_174 +413 val_413 +494 val_494 +207 val_207 +199 val_199 
+466 val_466 +208 val_208 +174 val_174 +399 val_399 +396 val_396 +247 val_247 +417 val_417 +489 val_489 +162 val_162 +377 val_377 +397 val_397 +309 val_309 +365 val_365 +266 val_266 +439 val_439 +342 val_342 +367 val_367 +325 val_325 +167 val_167 +195 val_195 +475 val_475 +17 val_17 +113 val_113 +155 val_155 +203 val_203 +339 val_339 +0 val_0 +455 val_455 +128 val_128 +311 val_311 +316 val_316 +57 val_57 +302 val_302 +205 val_205 +149 val_149 +438 val_438 +345 val_345 +129 val_129 +170 val_170 +20 val_20 +489 val_489 +157 val_157 +378 val_378 +221 val_221 +92 val_92 +111 val_111 +47 val_47 +72 val_72 +4 val_4 +280 val_280 +35 val_35 +427 val_427 +277 val_277 +208 val_208 +356 val_356 +399 val_399 +169 val_169 +382 val_382 +498 val_498 +125 val_125 +386 val_386 +437 val_437 +469 val_469 +192 val_192 +286 val_286 +187 val_187 +176 val_176 +54 val_54 +459 val_459 +51 val_51 +138 val_138 +103 val_103 +239 val_239 +213 val_213 +216 val_216 +430 val_430 +278 val_278 +176 val_176 +289 val_289 +221 val_221 +65 val_65 +318 val_318 +332 val_332 +311 val_311 +275 val_275 +137 val_137 +241 val_241 +83 val_83 +333 val_333 +180 val_180 +284 val_284 +12 val_12 +230 val_230 +181 val_181 +67 val_67 +260 val_260 +404 val_404 +384 val_384 +489 val_489 +353 val_353 +373 val_373 +272 val_272 +138 val_138 +217 val_217 +84 val_84 +348 val_348 +466 val_466 +58 val_58 +8 val_8 +411 val_411 +230 val_230 +208 val_208 +348 val_348 +24 val_24 +463 val_463 +431 val_431 +179 val_179 +172 val_172 +42 val_42 +129 val_129 +158 val_158 +119 val_119 +496 val_496 +0 val_0 +322 val_322 +197 val_197 +468 val_468 +393 val_393 +454 val_454 +100 val_100 +298 val_298 +199 val_199 +191 val_191 +418 val_418 +96 val_96 +26 val_26 +165 val_165 +327 val_327 +230 val_230 +205 val_205 +120 val_120 +131 val_131 +51 val_51 +404 val_404 +43 val_43 +436 val_436 +156 val_156 +469 val_469 +468 val_468 +308 val_308 +95 val_95 +196 val_196 +288 val_288 +481 val_481 +457 val_457 +98 val_98 +282 val_282 +197 val_197 +187 val_187 +318 val_318 +318 val_318 +409 val_409 +470 val_470 +137 val_137 +369 val_369 +316 val_316 +169 val_169 +413 val_413 +85 val_85 +77 val_77 +0 val_0 +490 val_490 +87 val_87 +364 val_364 +179 val_179 +118 val_118 +134 val_134 +395 val_395 +282 val_282 +138 val_138 +238 val_238 +419 val_419 +15 val_15 +118 val_118 +72 val_72 +90 val_90 +307 val_307 +19 val_19 +435 val_435 +10 val_10 +277 val_277 +273 val_273 +306 val_306 +224 val_224 +309 val_309 +389 val_389 +327 val_327 +242 val_242 +369 val_369 +392 val_392 +272 val_272 +331 val_331 +401 val_401 +242 val_242 +452 val_452 +177 val_177 +226 val_226 +5 val_5 +497 val_497 +402 val_402 +396 val_396 +317 val_317 +395 val_395 +58 val_58 +35 val_35 +336 val_336 +95 val_95 +11 val_11 +168 val_168 +34 val_34 +229 val_229 +233 val_233 +143 val_143 +472 val_472 +322 val_322 +498 val_498 +160 val_160 +195 val_195 +42 val_42 +321 val_321 +430 val_430 +119 val_119 +489 val_489 +458 val_458 +78 val_78 +76 val_76 +41 val_41 +223 val_223 +492 val_492 +149 val_149 +449 val_449 +218 val_218 +228 val_228 +138 val_138 +453 val_453 +30 val_30 +209 val_209 +64 val_64 +468 val_468 +76 val_76 +74 val_74 +342 val_342 +69 val_69 +230 val_230 +33 val_33 +368 val_368 +103 val_103 +296 val_296 +113 val_113 +216 val_216 +367 val_367 +344 val_344 +167 val_167 +274 val_274 +219 val_219 +239 val_239 +485 val_485 +116 val_116 +223 val_223 +256 val_256 +263 val_263 +70 val_70 +487 val_487 +480 val_480 +401 val_401 +288 val_288 +191 val_191 +5 val_5 +244 val_244 +438 val_438 +128 val_128 +467 val_467 +432 
val_432 +202 val_202 +316 val_316 +229 val_229 +469 val_469 +463 val_463 +280 val_280 +2 val_2 +35 val_35 +283 val_283 +331 val_331 +235 val_235 +80 val_80 +44 val_44 +193 val_193 +321 val_321 +335 val_335 +104 val_104 +466 val_466 +366 val_366 +175 val_175 +403 val_403 +483 val_483 +53 val_53 +105 val_105 +257 val_257 +406 val_406 +409 val_409 +190 val_190 +406 val_406 +401 val_401 +114 val_114 +258 val_258 +90 val_90 +203 val_203 +262 val_262 +348 val_348 +424 val_424 +12 val_12 +396 val_396 +201 val_201 +217 val_217 +164 val_164 +431 val_431 +454 val_454 +478 val_478 +298 val_298 +125 val_125 +431 val_431 +164 val_164 +424 val_424 +187 val_187 +382 val_382 +5 val_5 +70 val_70 +397 val_397 +480 val_480 +291 val_291 +24 val_24 +351 val_351 +255 val_255 +104 val_104 +70 val_70 +163 val_163 +438 val_438 +119 val_119 +414 val_414 +200 val_200 +491 val_491 +237 val_237 +439 val_439 +360 val_360 +248 val_248 +479 val_479 +305 val_305 +417 val_417 +199 val_199 +444 val_444 +120 val_120 +429 val_429 +169 val_169 +443 val_443 +323 val_323 +325 val_325 +277 val_277 +230 val_230 +478 val_478 +178 val_178 +468 val_468 +310 val_310 +317 val_317 +333 val_333 +493 val_493 +460 val_460 +207 val_207 +249 val_249 +265 val_265 +480 val_480 +83 val_83 +136 val_136 +353 val_353 +172 val_172 +214 val_214 +462 val_462 +233 val_233 +406 val_406 +133 val_133 +175 val_175 +189 val_189 +454 val_454 +375 val_375 +401 val_401 +421 val_421 +407 val_407 +384 val_384 +256 val_256 +26 val_26 +134 val_134 +67 val_67 +384 val_384 +379 val_379 +18 val_18 +462 val_462 +492 val_492 +100 val_100 +298 val_298 +9 val_9 +341 val_341 +498 val_498 +146 val_146 +458 val_458 +362 val_362 +186 val_186 +285 val_285 +348 val_348 +167 val_167 +18 val_18 +273 val_273 +183 val_183 +281 val_281 +344 val_344 +97 val_97 +469 val_469 +315 val_315 +84 val_84 +28 val_28 +37 val_37 +448 val_448 +152 val_152 +348 val_348 +307 val_307 +194 val_194 +414 val_414 +477 val_477 +222 val_222 +126 val_126 +90 val_90 +169 val_169 +403 val_403 +400 val_400 +200 val_200 +97 val_97 diff --git ql/src/test/results/clientpositive/tez/union9.q.out ql/src/test/results/clientpositive/tez/union9.q.out new file mode 100644 index 0000000..5637eae --- /dev/null +++ ql/src/test/results/clientpositive/tez/union9.q.out @@ -0,0 +1,110 @@ +PREHOOK: query: -- union case: all subqueries are a map-only jobs, 3 way union, same input for all sub-queries, followed by reducesink + +explain + select count(1) FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2 UNION ALL + select s3.key as key, s3.value as value from src s3) unionsrc +PREHOOK: type: QUERY +POSTHOOK: query: -- union case: all subqueries are a map-only jobs, 3 way union, same input for all sub-queries, followed by reducesink + +explain + select count(1) FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2 UNION ALL + select s3.key as key, s3.value as value from src s3) unionsrc +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Union 2 (CONTAINS) + Map 4 <- Union 2 (CONTAINS) + Map 5 <- Union 2 (CONTAINS) + Reducer 3 <- Union 2 (SIMPLE_EDGE) + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s1 + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Select Operator + Group By Operator + aggregations: count(1) + 
mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + value expressions: _col0 (type: bigint) + Map 4 + Map Operator Tree: + TableScan + alias: s2 + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Select Operator + Group By Operator + aggregations: count(1) + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + value expressions: _col0 (type: bigint) + Map 5 + Map Operator Tree: + TableScan + alias: s3 + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Select Operator + Group By Operator + aggregations: count(1) + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + value expressions: _col0 (type: bigint) + Reducer 3 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Union 2 + Vertex: Union 2 + + Stage: Stage-0 + Fetch Operator + limit: -1 + +PREHOOK: query: select count(1) FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2 UNION ALL + select s3.key as key, s3.value as value from src s3) unionsrc +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(1) FROM (select s1.key as key, s1.value as value from src s1 UNION ALL + select s2.key as key, s2.value as value from src s2 UNION ALL + select s3.key as key, s3.value as value from src s3) unionsrc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1500
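
The 1500 rows above are the expected 3 x 500: union9 counts a 3-way UNION ALL over the 500-row src table. Across all of these new Tez golden files the plan shape is the same: each UNION ALL branch is its own Map or Reducer vertex, the "Union N" vertex carries no operator tree of its own, membership is recorded through the (CONTAINS) edges, and downstream consumers attach to the union with a single SIMPLE_EDGE.

To make the Edges: sections easier to read, here is a minimal sketch of a DAG with that shape built directly against the Tez API, mirroring the union5 plan. It is illustrative only: it uses the Tez 0.5-style factory methods (newer than the API this patch was written against), and the processor and I/O descriptor class names are placeholders, not Hive's real classes.

import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;

public class UnionDagShape {
  public static void main(String[] args) {
    // Shape of the union5 plan:
    //   Reducer 2 <- Map 1 (SIMPLE_EDGE), Union 3 (CONTAINS)
    //   Reducer 6 <- Map 5 (SIMPLE_EDGE), Union 3 (CONTAINS)
    //   Reducer 4 <- Union 3 (SIMPLE_EDGE)
    ProcessorDescriptor proc = ProcessorDescriptor.create("PlaceholderProcessor"); // placeholder name
    Vertex map1 = Vertex.create("Map 1", proc, 1);
    Vertex map5 = Vertex.create("Map 5", proc, 1);
    Vertex reducer2 = Vertex.create("Reducer 2", proc, 1);
    Vertex reducer6 = Vertex.create("Reducer 6", proc, 1);
    Vertex reducer4 = Vertex.create("Reducer 4", proc, 1);

    DAG dag = DAG.create("union5-shape");
    dag.addVertex(map1).addVertex(map5)
       .addVertex(reducer2).addVertex(reducer6).addVertex(reducer4);

    // SIMPLE_EDGE in the explain output is an ordinary scatter-gather shuffle edge.
    EdgeProperty shuffle = EdgeProperty.create(
        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
        SchedulingType.SEQUENTIAL,
        OutputDescriptor.create("PlaceholderOutput"),  // placeholder name
        InputDescriptor.create("PlaceholderInput"));   // placeholder name
    dag.addEdge(Edge.create(map1, reducer2, shuffle));
    dag.addEdge(Edge.create(map5, reducer6, shuffle));

    // "Union 3" is not a task: it is a vertex group over the branches that feed
    // the union. The (CONTAINS) edges record membership, and the consumer reads
    // all members through one GroupInputEdge with a merged input.
    VertexGroup union3 = dag.createVertexGroup("Union 3", reducer2, reducer6);
    dag.addEdge(GroupInputEdge.create(union3, reducer4, shuffle,
        InputDescriptor.create("PlaceholderMergedInput"))); // placeholder name

    System.out.println("built DAG: " + dag.getName());
  }
}

In Tez terms, "Vertex: Union N" is such a vertex group: the grouped vertices run as ordinary tasks, and the group exists only so a consumer can take one merged input over all members, which is why no separate union stage appears in the STAGE DEPENDENCIES of these plans.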