diff --git itests/qtest/pom.xml itests/qtest/pom.xml
index cfd2e5e..fd3c0d9 100644
--- itests/qtest/pom.xml
+++ itests/qtest/pom.xml
@@ -39,7 +39,7 @@
stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q
cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q
tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q
- join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q
+ join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,union9.q
add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 3972a9c..0ead289 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -776,6 +776,14 @@ public Path read(Kryo kryo, Input input, Class type) {
}
}
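+ /**
+ * Clones an operator tree by serializing it to an in-memory buffer and
+ * deserializing it back. Used when a work item needs its own private copy
+ * of an operator plan (e.g. when removing union operators).
+ */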
+ public static Set<Operator<?>> cloneOperatorTree(Configuration conf, Set<Operator<?>> roots) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ serializePlan(roots, baos, conf, true);
+ Set<Operator<?>> result = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+ roots.getClass(), conf, true);
+ return result;
+ }
+
private static void serializePlan(Object plan, OutputStream out, Configuration conf, boolean cloningPlan) {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 77c0c46..a4d18b1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.exec.tez.tools.TezMergedLogicalInput;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -82,10 +83,12 @@
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -94,6 +97,8 @@
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@@ -185,9 +190,56 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
}
/**
+ * Given a VertexGroup and a vertex, createEdge will create a
+ * GroupInputEdge between them.
+ *
+ * @param group The parent VertexGroup
+ * @param wConf The job conf of the child vertex
+ * @param w The child vertex
+ * @param edgeType the type of connection between the two
+ * endpoints.
+ */
+ public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+ Vertex w, EdgeType edgeType)
+ throws IOException {
+
+ Class mergeInputClass;
+
+ LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
+ w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+
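+ // pick the merged input class that will present the group's outputs as a
+ // single logical input to the downstream vertex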
+ switch (edgeType) {
+ case BROADCAST_EDGE:
+ mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+ break;
+
+ case SIMPLE_EDGE:
+ default:
+ mergeInputClass = TezMergedLogicalInput.class;
+ break;
+ }
+
+ return new GroupInputEdge(group, w, createEdgeProperty(edgeType),
+ new InputDescriptor(mergeInputClass.getName()));
+ }
+
+ /**
+ * Given two vertices a and b, update their configurations so they can be used in an Edge a-b.
+ */
+ public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
+ throws IOException {
+
+ // Tez needs the output of one vertex and the input of the next to be set up consistently
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+
+ // update payloads (configuration for the vertices might have changed)
+ v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
+ w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+ }
+
+ /**
* Given two vertices and their respective configuration objects createEdge
- * will create an Edge object that connects the two. Currently the edge will
- * always be a stable bi-partite edge.
+ * will create an Edge object that connects the two.
*
* @param vConf JobConf of the first vertex
* @param v The first vertex (source)
@@ -199,13 +251,15 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
EdgeType edgeType)
throws IOException {
- // Tez needs to setup output subsequent input pairs correctly
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+ updateConfigurationForEdge(vConf, v, wConf, w);
- // update payloads (configuration for the vertices might have changed)
- v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
- w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+ return new Edge(v, w, createEdgeProperty(edgeType));
+ }
+ /*
+ * Helper function to create an edge property from an edge type.
+ */
+ private EdgeProperty createEdgeProperty(EdgeType edgeType) {
DataMovementType dataMovementType;
Class logicalInputClass;
Class logicalOutputClass;
@@ -231,7 +285,8 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
SchedulingType.SEQUENTIAL,
new OutputDescriptor(logicalOutputClass.getName()),
new InputDescriptor(logicalInputClass.getName()));
- return new Edge(v, w, edgeProperty);
+
+ return edgeProperty;
}
/*
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 7c2c2a6..d89f2c7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -55,7 +55,6 @@
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
/**
* Process input from tez LogicalInput and write output - for a map plan
@@ -184,15 +183,19 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in
@Override
void run() throws IOException{
- List<ShuffledMergedInput> shuffleInputs = getShuffleInputs(inputs);
+ List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
KeyValuesReader kvsReader;
- if(shuffleInputs.size() == 1){
- //no merging of inputs required
- kvsReader = shuffleInputs.get(0).getReader();
- }else {
- //get a sort merged input
- kvsReader = new InputMerger(shuffleInputs);
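+ // the input may now be a merged (union) input, so getReader() can throw a
+ // checked Exception; wrap it in an IOException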
+ try {
+ if(shuffleInputs.size() == 1){
+ //no merging of inputs required
+ kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader();
+ }else {
+ //get a sort merged input
+ kvsReader = new InputMerger(shuffleInputs);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
}
while(kvsReader.next()){
@@ -211,12 +214,12 @@ void run() throws IOException{
* @param inputs
* @return
*/
- private List<ShuffledMergedInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
+ private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
//the reduce plan inputs have tags, add all inputs that have tags
Map<Integer, String> tag2input = redWork.getTagToInput();
- ArrayList<ShuffledMergedInput> shuffleInputs = new ArrayList<ShuffledMergedInput>();
+ ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
for(String inpStr : tag2input.values()){
- shuffleInputs.add((ShuffledMergedInput)inputs.get(inpStr));
+ shuffleInputs.add((LogicalInput)inputs.get(inpStr));
}
return shuffleInputs;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 792e9a4..7e225b6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -42,6 +43,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -52,9 +54,11 @@
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -206,24 +210,68 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
// translate work to vertex
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
- JobConf wxConf = utils.initializeVertexConf(conf, w);
- Vertex wx = utils.createVertex(wxConf, w, tezDir,
- appJarLr, additionalLr, fs, ctx, !isFinal);
- dag.addVertex(wx);
- utils.addCredentials(w, dag);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
- workToVertex.put(w, wx);
- workToConf.put(w, wxConf);
-
- // add all dependencies (i.e.: edges) to the graph
- for (BaseWork v: work.getChildren(w)) {
- assert workToVertex.containsKey(v);
- Edge e = null;
-
- EdgeType edgeType = work.getEdgeProperty(w, v);
-
- e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
- dag.addEdge(e);
+
+ if (w instanceof UnionWork) {
+ // Special case for unions. These items translate to VertexGroups
+
+ List<BaseWork> unionWorkItems = new LinkedList<BaseWork>();
+ List<BaseWork> children = new LinkedList<BaseWork>();
+
+ // split the children into vertices that make up the union and vertices that are
+ // proper children of the union
+ for (BaseWork v: work.getChildren(w)) {
+ EdgeType type = work.getEdgeProperty(w, v);
+ if (type == EdgeType.CONTAINS) {
+ unionWorkItems.add(v);
+ } else {
+ children.add(v);
+ }
+ }
+
+ // create VertexGroup
+ Vertex[] vertexArray = new Vertex[unionWorkItems.size()];
+
+ int i = 0;
+ for (BaseWork v: unionWorkItems) {
+ vertexArray[i++] = workToVertex.get(v);
+ }
+ VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
+
+ // now hook up the children
+ for (BaseWork v: children) {
+ // need to pairwise patch up the configuration of the vertices
+ for (BaseWork part: unionWorkItems) {
+ utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
+ workToConf.get(v), workToVertex.get(v));
+ }
+
+ // finally we can create the grouped edge
+ GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
+ workToVertex.get(v), work.getEdgeProperty(w, v));
+
+ dag.addEdge(e);
+ }
+ } else {
+ // Regular vertices
+ JobConf wxConf = utils.initializeVertexConf(conf, w);
+ Vertex wx = utils.createVertex(wxConf, w, tezDir, appJarLr,
+ additionalLr, fs, ctx, !isFinal);
+ dag.addVertex(wx);
+ utils.addCredentials(w, dag);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
+ workToVertex.put(w, wx);
+ workToConf.put(w, wxConf);
+
+ // add all dependencies (i.e.: edges) to the graph
+ for (BaseWork v: work.getChildren(w)) {
+ assert workToVertex.containsKey(v);
+ Edge e = null;
+
+ EdgeType edgeType = work.getEdgeProperty(w, v);
+
+ e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
+ dag.addEdge(e);
+ }
}
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
index e5746c4..726e122 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
@@ -26,12 +26,13 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor;
import org.apache.hadoop.io.BinaryComparable;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
/**
* A KeyValuesReader implementation that returns a sorted stream of key-values
- * by doing a sorted merge of the key-value in ShuffledMergedInputs.
+ * by doing a sorted merge of the key-values in the LogicalInputs.
* Tags are in the last byte of the key, so no special handling for tags is required.
* Uses a priority queue to pick the KeyValuesReader of the input that is next in
* sort order.
@@ -42,12 +43,12 @@
private PriorityQueue<KeyValuesReader> pQueue = null;
private KeyValuesReader nextKVReader = null;
- public InputMerger(List<ShuffledMergedInput> shuffleInputs) throws IOException {
- //get KeyValuesReaders from the ShuffledMergedInput and add them to priority queue
+ public InputMerger(List<? extends Input> shuffleInputs) throws Exception {
+ //get KeyValuesReaders from the LogicalInputs and add them to priority queue
int initialCapacity = shuffleInputs.size();
pQueue = new PriorityQueue<KeyValuesReader>(initialCapacity, new KVReaderComparator());
- for(ShuffledMergedInput input : shuffleInputs){
- addToQueue(input.getReader());
+ for(Input input : shuffleInputs){
+ addToQueue((KeyValuesReader)input.getReader());
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
new file mode 100644
index 0000000..413eb7b
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez.tools;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.Reader;
+
+/**
+ * TezMergedLogicalInput is an adapter that makes the inputs of a union
+ * look like a single input in Tez.
+ */
+public class TezMergedLogicalInput extends MergedLogicalInput {
+
+ @Override
+ public Reader getReader() throws Exception {
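+ // present all inputs feeding the union as a single sort-merged KeyValuesReader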
+ return new InputMerger(getInputs());
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index f4b6016..6ec2be0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -20,6 +20,8 @@
import java.io.Serializable;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,6 +30,7 @@
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -105,6 +108,11 @@
// used to group dependent tasks for multi table inserts
public final DependencyCollectionTask dependencyTask;
+ // used to hook up unions
+ public final Map<Operator<?>, BaseWork> unionWorkMap;
+ public final List<UnionOperator> currentUnionOperators;
+ public final Set<BaseWork> workWithUnionOperators;
+
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List> moveTask, List> rootTasks,
@@ -126,6 +134,9 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext,
this.linkChildOpWithDummyOp = new HashMap, List>>();
this.dependencyTask = (DependencyCollectionTask)
TaskFactory.get(new DependencyCollectionWork(), conf);
+ this.unionWorkMap = new HashMap<Operator<?>, BaseWork>();
+ this.currentUnionOperators = new LinkedList<UnionOperator>();
+ this.workWithUnionOperators = new HashSet<BaseWork>();
rootTasks.add(currentTask);
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 8363bbf..f415e6f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -19,15 +19,26 @@
package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -37,6 +48,7 @@
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
/**
@@ -99,6 +111,43 @@ public Object process(Node nd, Stack stack,
context.rootToWorkMap.put(root, work);
}
+ // This is where we cut the tree as described above. We also remember that
+ // we might have to connect parent work with this work later.
+ for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
+ context.leafOperatorToFollowingWork.put(parent, work);
+ LOG.debug("Removing " + parent + " as parent from " + root);
+ root.removeParent(parent);
+ }
+
+ if (!context.currentUnionOperators.isEmpty()) {
+ // if there are union all operators we need to add the work to the set
+ // of union operators.
+
+ UnionWork unionWork;
+ if (context.unionWorkMap.containsKey(operator)) {
+ // we've seen this terminal before and have created a union work object.
+ // just need to add this work to it. There will be no children of this one
+ // since we've passed this operator before.
+ assert operator.getChildOperators().isEmpty();
+ unionWork = (UnionWork) context.unionWorkMap.get(operator);
+
+ } else {
+ // first time through. we need to create a union work object and add this
+ // work to it. Subsequent work should reference the union and not the actual
+ // work.
+ unionWork = new UnionWork("Union "+ (++sequenceNumber));
+ context.unionWorkMap.put(operator, unionWork);
+ tezWork.add(unionWork);
+ }
+
+ // finally hook everything up
+ tezWork.connect(unionWork, work, EdgeType.CONTAINS);
+ unionWork.addUnionOperators(context.currentUnionOperators);
+ context.currentUnionOperators.clear();
+ context.workWithUnionOperators.add(work);
+ work = unionWork;
+ }
+
// We're scanning a tree from roots to leaf (this is not technically
// correct, demux and mux operators might form a diamond shape, but
// we will only scan one path and ignore the others, because the
@@ -127,16 +176,10 @@ public Object process(Node nd, Stack stack,
// remember the output name of the reduce sink
rs.getConf().setOutputName(rWork.getName());
- // add dependency between the two work items
- tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
- }
-
- // This is where we cut the tree as described above. We also remember that
- // we might have to connect parent work with this work later.
- for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
- context.leafOperatorToFollowingWork.put(parent, work);
- LOG.debug("Removing " + parent + " as parent from " + root);
- root.removeParent(parent);
+ if (!context.unionWorkMap.containsKey(operator)) {
+ // add dependency between the two work items
+ tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
+ }
}
// No children means we're at the bottom. If there are more operators to scan
@@ -175,7 +218,7 @@ public Object process(Node nd, Stack stack,
for (BaseWork parentWork : linkWorkList) {
tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
- // need to set up output name for reduce sink not that we know the name
+ // need to set up output name for reduce sink now that we know the name
// of the downstream work
for (ReduceSinkOperator r:
context.linkWorkWithReduceSinkMap.get(parentWork)) {
@@ -188,7 +231,7 @@ public Object process(Node nd, Stack stack,
}
protected ReduceWork createReduceWork(GenTezProcContext context, Operator> root,
- TezWork tezWork) {
+ TezWork tezWork) throws SemanticException {
assert !root.getParentOperators().isEmpty();
ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
@@ -256,4 +299,63 @@ protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
GenMapRedUtils.setMapWork(mapWork, context.parseContext,
context.inputs, null, root, alias, context.conf, false);
}
+
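+ /**
+ * removeUnionOperators clones the operator plan of a work item and cuts the
+ * union operators out of the clone, so the work item ends up with a plan it
+ * can run on its own. The work's roots are replaced with the cloned roots.
+ */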
+ public void removeUnionOperators(Configuration conf, BaseWork work)
+ throws SemanticException {
+
+ Set<Operator<?>> roots = work.getAllRootOperators();
+
+ // need to clone the plan.
+ Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+
+ Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+
+ Iterator<Operator<?>> it = newRoots.iterator();
+ for (Operator<?> orig: roots) {
+ replacementMap.put(orig, it.next());
+ }
+
+ // now we remove all the unions. we throw away any branch that's not reachable from
+ // the current set of roots. The reason is that those branches will be handled in
+ // different tasks.
+ Deque<Operator<?>> operators = new LinkedList<Operator<?>>();
+ operators.addAll(newRoots);
+
+ Set<Operator<?>> seen = new HashSet<Operator<?>>();
+
+ while(!operators.isEmpty()) {
+ Operator<?> current = operators.pop();
+ seen.add(current);
+
+ if (current instanceof UnionOperator) {
+ Operator<?> parent = null;
+ int count = 0;
+
+ for (Operator<?> op: current.getParentOperators()) {
+ if (seen.contains(op)) {
+ ++count;
+ parent = op;
+ }
+ }
+
+ // we should have been able to reach the union from only one side.
+ assert count <= 1;
+
+ if (parent == null) {
+ // root operator is union (can happen in reducers)
+ replacementMap.put(current, current.getChildOperators().get(0));
+ } else {
+ parent.removeChildAndAdoptItsChildren(current);
+ }
+ }
+
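+ // FileSink and ReduceSink mark the boundary of this work item; anything past
+ // them belongs to other work items, so don't follow those children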
+ if (current instanceof FileSinkOperator
+ || current instanceof ReduceSinkOperator) {
+ current.setChildOperators(null);
+ } else {
+ operators.addAll(current.getChildOperators());
+ }
+ }
+ work.replaceRoots(replacementMap);
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index dff743f..5f0cfbe 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -131,14 +131,18 @@ protected void generateTaskTree(List> rootTasks, Pa
FileSinkOperator.getOperatorName() + "%"),
new CompositeProcessor(new FileSinkProcessor(), genTezWork));
- opRules.put(new RuleRegExp("Bail on Union",
+ opRules.put(new RuleRegExp("Handle union",
UnionOperator.getOperatorName() + "%"), new NodeProcessor()
{
@Override
public Object process(Node n, Stack s,
NodeProcessorCtx procCtx, Object... os) throws SemanticException {
- throw new SemanticException("Unions not yet supported on Tez."
- +" Please use MR for this query");
+ GenTezProcContext context = (GenTezProcContext) procCtx;
+ UnionOperator union = (UnionOperator) n;
+
+ // simply need to remember that we've seen a union.
+ context.currentUnionOperators.add(union);
+ return null;
}
});
@@ -149,20 +153,26 @@ public Object process(Node n, Stack s,
topNodes.addAll(pCtx.getTopOps().values());
GraphWalker ogw = new GenTezWorkWalker(disp, procCtx);
ogw.startWalking(topNodes, null);
+
+ // we still need to clone some operator plans and remove the union operators
+ for (BaseWork w: procCtx.workWithUnionOperators) {
+ genTezWork.removeUnionOperators(conf, w);
+ }
}
@Override
protected void setInputFormat(Task<? extends Serializable> task) {
if (task instanceof TezTask) {
TezWork work = ((TezTask)task).getWork();
- Set<BaseWork> roots = work.getRoots();
- for (BaseWork w: roots) {
- assert w instanceof MapWork;
- MapWork mapWork = (MapWork)w;
- HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
- if (!opMap.isEmpty()) {
- for (Operator<? extends OperatorDesc> op : opMap.values()) {
- setInputFormat(mapWork, op);
+ List<BaseWork> all = work.getAllWork();
+ for (BaseWork w: all) {
+ if (w instanceof MapWork) {
+ MapWork mapWork = (MapWork) w;
+ HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
+ if (!opMap.isEmpty()) {
+ for (Operator<? extends OperatorDesc> op : opMap.values()) {
+ setInputFormat(mapWork, op);
+ }
}
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index eb85446..38c4c11 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -22,6 +22,7 @@
import java.util.LinkedList;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.Stack;
@@ -82,7 +83,9 @@ public void addDummyOp(HashTableDummyOperator dummyOp) {
dummyOps.add(dummyOp);
}
- protected abstract Set<Operator<?>> getAllRootOperators();
+ public abstract void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap);
+
+ public abstract Set<Operator<?>> getAllRootOperators();
public Set> getAllOperators() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 19b553f..e1cc3f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -305,6 +305,17 @@ public String getVectorModeOn() {
}
@Override
+ public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+ LinkedHashMap<String, Operator<? extends OperatorDesc>> newAliasToWork = new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> entry: aliasToWork.entrySet()) {
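+ // rebuild aliasToWork so each alias points at its replacement (cloned) root operator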
+ newAliasToWork.put(entry.getKey(), replacementMap.get(entry.getValue()));
+ }
+
+ setAliasToWork(newAliasToWork);
+ }
+
+ @Override
@Explain(displayName = "Map Operator Tree")
public Set<Operator<?>> getAllRootOperators() {
Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index afb3648..a68374e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -129,7 +129,13 @@ public void setTagToInput(final Map tagToInput) {
}
@Override
- protected Set<Operator<?>> getAllRootOperators() {
+ public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
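+ // a reduce work item has exactly one root: the reducer operator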
+ assert replacementMap.size() == 1;
+ setReducer(replacementMap.get(getReducer()));
+ }
+
+ @Override
+ public Set<Operator<?>> getAllRootOperators() {
Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();
opSet.add(getReducer());
return opSet;
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index 9112a77..3b31378 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -45,7 +45,8 @@
public enum EdgeType {
SIMPLE_EDGE,
- BROADCAST_EDGE
+ BROADCAST_EDGE,
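+ // CONTAINS marks that a work item is a member of a union (translated to a VertexGroup); it is not a data-movement edge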
+ CONTAINS
}
private static transient final Log LOG = LogFactory.getLog(TezWork.class);
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
new file mode 100644
index 0000000..60781e6
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+
+/**
+ * Simple wrapper for union all cases. All work contributing to a union all
+ * is collected here. Downstream work will connect to the union, not to the
+ * individual work items.
+ */
+public class UnionWork extends BaseWork {
+
+ private final Set<UnionOperator> unionOperators = new HashSet<UnionOperator>();
+
+ public UnionWork() {
+ super();
+ }
+
+ public UnionWork(String name) {
+ super(name);
+ }
+
+ @Explain(displayName = "Vertex")
+ @Override
+ public String getName() {
+ return super.getName();
+ }
+
+ @Override
+ public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
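+ // a union work item carries no operator plan of its own, so there is nothing to replace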
+ }
+
+ @Override
+ public Set<Operator<?>> getAllRootOperators() {
+ return new HashSet<Operator<?>>();
+ }
+
+ public void addUnionOperators(Collection<UnionOperator> unions) {
+ unionOperators.addAll(unions);
+ }
+
+ public Set<UnionOperator> getUnionOperators() {
+ return unionOperators;
+ }
+}