diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 22fb7f1..cf0b9f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -501,12 +501,13 @@ public void initializeLocalWork(Configuration hconf) throws HiveException {
     if (parent == null) {
       throw new HiveException("No valid parents.");
     }
-    Map<Integer, DummyStoreOperator> dummyOps = parent.getTagToOperatorTree();
+    Map<Integer, DummyStoreOperator> dummyOps =
+        ((TezContext) (MapredContext.get())).getDummyOpsMap();
     for (Entry<Integer, DummyStoreOperator> connectOp : dummyOps.entrySet()) {
       if (connectOp.getValue().getChildOperators() == null
-          || connectOp.getValue().getChildOperators().isEmpty()) {
-        parentOperators.add(connectOp.getKey(), connectOp.getValue());
-        connectOp.getValue().getChildOperators().add(this);
+          || connectOp.getValue().getChildOperators().isEmpty()) {
+        parentOperators.add(connectOp.getKey(), connectOp.getValue());
+        connectOp.getValue().getChildOperators().add(this);
       }
     }
     super.initializeLocalWork(hconf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index 66e3a77..d5ea96a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -637,11 +637,6 @@ public OperatorType getType() {
     return null;
   }
 
-  @Override
-  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
-    return MapRecordProcessor.getConnectOps();
-  }
-
   public void initializeContexts() {
     Path fpath = getExecContext().getCurrentInputPath();
     String nominalPath = getNominalPath(fpath);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index a51c352..1b5b609 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -70,6 +70,7 @@ protected String operatorId;
   private transient ExecMapperContext execContext;
   private transient boolean rootInitializeCalled = false;
+  private transient Map<Integer, DummyStoreOperator> dummyOpsMap = null;
 
   private static AtomicInteger seqId;
 
@@ -1350,12 +1351,4 @@ public OperatorType getType() {
       return childOperators;
     }
   }
-
-  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
-    if ((parentOperators == null) || (parentOperators.size() == 0)) {
-      return null;
-    }
-    Map<Integer, DummyStoreOperator> dummyOps = parentOperators.get(0).getTagToOperatorTree();
-    return dummyOps;
-  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index f2ba3c5..bb819a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -208,6 +208,8 @@ public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
   public static final String HIVE_ADDED_JARS = "hive.added.jars";
+  public static String MAPNAME = "Map ";
+  public static String REDUCENAME = "Reducer ";
 
   /**
    * ReduceField:
@@ -239,6 +241,7 @@ private Utilities() {
   private static ThreadLocal<Map<Path, BaseWork>> gWorkMap =
       new ThreadLocal<Map<Path, BaseWork>>() {
+        @Override
         protected Map<Path, BaseWork> initialValue() {
           return new HashMap<Path, BaseWork>();
         }
@@ -304,12 +307,13 @@ public static ReduceWork getReduceWork(Configuration conf) {
   public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir,
       boolean useCache) {
     for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) {
-      setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, useCache);
+      String prefix = baseWork.getName();
+      setBaseWork(conf, baseWork, mrScratchDir, prefix + MERGE_PLAN_NAME, useCache);
       String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
       if (prefixes == null) {
-        prefixes = baseWork.getName();
+        prefixes = prefix;
       } else {
-        prefixes = prefixes + "," + baseWork.getName();
+        prefixes = prefixes + "," + prefix;
       }
       conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes);
     }
@@ -429,7 +433,13 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
               + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
         }
       } else if (name.contains(MERGE_PLAN_NAME)) {
-        gWork = deserializePlan(in, MapWork.class, conf);
+        if (name.startsWith(MAPNAME)) {
+          gWork = deserializePlan(in, MapWork.class, conf);
+        } else if (name.startsWith(REDUCENAME)) {
+          gWork = deserializePlan(in, ReduceWork.class, conf);
+        } else {
+          throw new RuntimeException("Unknown work type: " + name);
+        }
       }
       gWorkMap.get().put(path, gWork);
     } else if (LOG.isDebugEnabled()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index d4d1b83..e1a9a42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -82,7 +83,7 @@ private boolean abort = false;
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
-  List<MapWork> mergeWorkList = null;
+  List<BaseWork> mergeWorkList = null;
   List<String> cacheKeys;
   ObjectCache cache;
@@ -108,28 +109,7 @@ public Object call() {
       });
       Utilities.setMapWork(jconf, mapWork);
-      String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
-      if (prefixes != null) {
-        mergeWorkList = new ArrayList<MapWork>();
-
-        for (final String prefix : prefixes.split(",")) {
-          if (prefix == null || prefix.isEmpty()) {
-            continue;
-          }
-
-          key = queryId + prefix;
-          cacheKeys.add(key);
-
-          mergeWorkList.add(
-              (MapWork) cache.retrieve(key,
-                  new Callable<Object>() {
-                    @Override
-                    public Object call() {
-                      return Utilities.getMergeWork(jconf, prefix);
-                    }
-                  }));
-        }
-      }
+      mergeWorkList = getMergeWorkList(jconf, key, queryId, cache, cacheKeys);
     }
@@ -174,7 +154,8 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep
       connectOps.clear();
       if (mergeWorkList != null) {
         MapOperator mergeMapOp = null;
-        for (MapWork mergeMapWork : mergeWorkList) {
+        for (BaseWork mergeWork : mergeWorkList) {
+          MapWork mergeMapWork = (MapWork) mergeWork;
           if (mergeMapWork.getVectorMode()) {
             mergeMapOp = new VectorMapOperator();
           } else {
@@ -199,6 +180,8 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep
         }
       }
 
+      ((TezContext) (MapredContext.get())).setDummyOpsMap(connectOps);
+
       // initialize map operator
       mapOp.setConf(mapWork);
       l4j.info("Main input name is " + mapWork.getName());
@@ -356,10 +339,6 @@ void close(){
       }
     }
 
-  public static Map<Integer, DummyStoreOperator> getConnectOps() {
-    return connectOps;
-  }
-
   private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
     // there should be only one MRInput
     MRInputLegacy theMRInput = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 258557f..15a2b5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -20,8 +20,13 @@ import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -32,9 +37,12 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
 
 /**
  * Process input from tez LogicalInput and write output
@@ -110,4 +118,32 @@ protected void createOutputMap() {
       outMap.put(entry.getKey(), collector);
     }
   }
+
+  public List<BaseWork> getMergeWorkList(final JobConf jconf, String key, String queryId,
+      ObjectCache cache, List<String> cacheKeys) throws HiveException {
+    String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+    if (prefixes != null) {
+      List<BaseWork> mergeWorkList = new ArrayList<BaseWork>();
+
+      for (final String prefix : prefixes.split(",")) {
+        if (prefix == null || prefix.isEmpty()) {
+          continue;
+        }
+
+        key = queryId + prefix;
+        cacheKeys.add(key);
+
+        mergeWorkList.add((BaseWork) cache.retrieve(key, new Callable<Object>() {
+          @Override
+          public Object call() {
+            return Utilities.getMergeWork(jconf, prefix);
+          }
+        }));
+      }
+
+      return mergeWorkList;
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 63c63b8..cc8e6d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -18,16 +18,18 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -38,7 +40,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import 
org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -64,84 +66,124 @@ public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); - private ReduceWork redWork; + private ReduceWork reduceWork; + + List mergeWorkList = null; + List cacheKeys; + + private final Map connectOps = + new TreeMap(); + private final Map tagToReducerMap = new HashMap(); private Operator reducer; private ReduceRecordSource[] sources; - private final byte position = 0; + private byte bigTablePosition = 0; private boolean abort; - @Override - void init(final JobConf jconf, ProcessorContext processorContext, - MRTaskReporter mrReporter, Map inputs, - Map outputs) throws Exception { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); - super.init(jconf, processorContext, mrReporter, inputs, outputs); + public ReduceRecordProcessor(final JobConf jconf) throws Exception { ObjectCache cache = ObjectCacheFactory.getCache(jconf); String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID); cacheKey = queryId + REDUCE_PLAN_KEY; - redWork = (ReduceWork) cache.retrieve(cacheKey, new Callable() { + cacheKeys = new ArrayList(); + cacheKeys.add(cacheKey); + reduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable() { @Override public Object call() { return Utilities.getReduceWork(jconf); - } - }); - Utilities.setReduceWork(jconf, redWork); + } + }); - reducer = redWork.getReducer(); - reducer.getParentOperators().clear(); - reducer.setParentOperators(null); // clear out any parents as reducer is the root + Utilities.setReduceWork(jconf, reduceWork); + mergeWorkList = getMergeWorkList(jconf, cacheKey, queryId, cache, cacheKeys); + } + + @Override + void init(JobConf jconf, ProcessorContext processorContext, + MRTaskReporter mrReporter, Map inputs, + Map outputs) throws Exception { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); + super.init(jconf, processorContext, mrReporter, inputs, outputs); - int numTags = redWork.getTagToValueDesc().size(); + MapredContext.init(false, new JobConf(jconf)); + List shuffleInputs = getShuffleInputs(inputs); + if (shuffleInputs != null) { + l4j.info("Waiting for ShuffleInputs to become ready"); + processorContext.waitForAllInputsReady(new ArrayList(shuffleInputs)); + } - ObjectInspector[] ois = new ObjectInspector[numTags]; - sources = new ReduceRecordSource[numTags]; + connectOps.clear(); + ReduceWork redWork = reduceWork; + tagToReducerMap.put(redWork.getTag(), redWork); + if (mergeWorkList != null) { + for (BaseWork mergeWork : mergeWorkList) { + ReduceWork mergeReduceWork = (ReduceWork) mergeWork; + reducer = mergeReduceWork.getReducer(); + DummyStoreOperator dummyStoreOp = getJoinParentOp(reducer); + connectOps.put(mergeReduceWork.getTag(), dummyStoreOp); + tagToReducerMap.put(mergeReduceWork.getTag(), mergeReduceWork); + } - for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) { - TableDesc keyTableDesc = redWork.getKeyDesc(); - TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag); - - // make the reader ready for prime time - Input input = inputs.get(redWork.getTagToInput().get(tag)); - input.start(); - processorContext.waitForAnyInputReady(Collections.singleton(input)); - KeyValuesReader reader = (KeyValuesReader) input.getReader(); - - // now we can setup the record source - 
sources[tag] = new ReduceRecordSource(); - sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc, - reader, tag == position, (byte) tag, - redWork.getAllScratchColumnVectorTypeMaps()); - ois[tag] = sources[tag].getObjectInspector(); + bigTablePosition = (byte) reduceWork.getTag(); + ((TezContext) MapredContext.get()).setDummyOpsMap(connectOps); } - MapredContext.init(false, new JobConf(jconf)); + ObjectInspector[] mainWorkOIs = null; ((TezContext) MapredContext.get()).setInputs(inputs); ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); - ((TezContext) MapredContext.get()).setRecordSources(sources); + int numTags = reduceWork.getTagToValueDesc().size(); + reducer = reduceWork.getReducer(); + if (numTags > 1) { + sources = new ReduceRecordSource[numTags]; + mainWorkOIs = new ObjectInspector[numTags]; + initializeMultipleSources(reduceWork, numTags, mainWorkOIs, sources); + ((TezContext) MapredContext.get()).setRecordSources(sources); + reducer.initialize(jconf, mainWorkOIs); + } else { + numTags = tagToReducerMap.keySet().size(); + sources = new ReduceRecordSource[numTags]; + mainWorkOIs = new ObjectInspector[numTags]; + for (int i : tagToReducerMap.keySet()) { + redWork = tagToReducerMap.get(i); + reducer = redWork.getReducer(); + initializeSourceForTag(redWork, i, mainWorkOIs, sources, + redWork.getTagToValueDesc().get(0), redWork.getTagToInput().get(0)); + reducer.initializeLocalWork(jconf); + } + reducer = reduceWork.getReducer(); + ((TezContext) MapredContext.get()).setRecordSources(sources); + reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[bigTablePosition] }); + for (int i : tagToReducerMap.keySet()) { + if (i == bigTablePosition) { + continue; + } + redWork = tagToReducerMap.get(i); + reducer = redWork.getReducer(); + reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[i] }); + } + } + reducer = reduceWork.getReducer(); // initialize reduce operator tree try { l4j.info(reducer.dump(0)); - reducer.initialize(jconf, ois); // Initialization isn't finished until all parents of all operators // are initialized. For broadcast joins that means initializing the // dummy parent operators as well. List dummyOps = redWork.getDummyOps(); if (dummyOps != null) { - for (Operator dummyOp : dummyOps){ + for (HashTableDummyOperator dummyOp : dummyOps) { dummyOp.initialize(jconf, null); } } // set output collector for any reduce sink operators in the pipeline. 
- List> children = new LinkedList>(); + List> children = new LinkedList>(); children.add(reducer); if (dummyOps != null) { children.addAll(dummyOps); @@ -165,13 +207,36 @@ public Object call() { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); } + private void initializeMultipleSources(ReduceWork redWork, int numTags, ObjectInspector[] ois, + ReduceRecordSource[] sources) throws Exception { + for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) { + if (redWork.getTagToValueDesc().get(tag) == null) { + continue; + } + initializeSourceForTag(redWork, tag, ois, sources, redWork.getTagToValueDesc().get(tag), + redWork.getTagToInput().get(tag)); + } + } + + private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector[] ois, + ReduceRecordSource[] sources, TableDesc valueTableDesc, String inputName) + throws Exception { + reducer = redWork.getReducer(); + reducer.getParentOperators().clear(); + reducer.setParentOperators(null); // clear out any parents as reducer is the root + + TableDesc keyTableDesc = redWork.getKeyDesc(); + KeyValuesReader reader = (KeyValuesReader) inputs.get(inputName).getReader(); + + sources[tag] = new ReduceRecordSource(); + sources[tag].init(jconf, redWork.getReducer(), redWork.getVectorMode(), keyTableDesc, + valueTableDesc, reader, tag == bigTablePosition, (byte) tag, + redWork.getAllScratchColumnVectorTypeMaps()); + ois[tag] = sources[tag].getObjectInspector(); + } + @Override void run() throws Exception { - List shuffleInputs = getShuffleInputs(inputs); - if (shuffleInputs != null) { - l4j.info("Waiting for ShuffleInputs to become ready"); - processorContext.waitForAllInputsReady(new ArrayList(shuffleInputs)); - } for (Entry outputEntry : outputs.entrySet()) { l4j.info("Starting Output: " + outputEntry.getKey()); @@ -180,22 +245,26 @@ void run() throws Exception { } // run the operator pipeline - while (sources[position].pushRecord()) {} + while (sources[bigTablePosition].pushRecord()) { + } } /** * Get the inputs that should be streamed through reduce plan. + * * @param inputs * @return + * @throws Exception */ - private List getShuffleInputs(Map inputs) { - //the reduce plan inputs have tags, add all inputs that have tags - Map tagToinput = redWork.getTagToInput(); + private List getShuffleInputs(Map inputs) throws Exception { + // the reduce plan inputs have tags, add all inputs that have tags + Map tagToinput = reduceWork.getTagToInput(); ArrayList shuffleInputs = new ArrayList(); - for(String inpStr : tagToinput.values()){ + for (String inpStr : tagToinput.values()) { if (inputs.get(inpStr) == null) { throw new AssertionError("Cound not find input: " + inpStr); } + inputs.get(inpStr).start(); shuffleInputs.add(inputs.get(inpStr)); } return shuffleInputs; @@ -203,8 +272,10 @@ void run() throws Exception { @Override void close(){ - if (cache != null) { - cache.release(cacheKey); + if (cache != null && cacheKeys != null) { + for (String key : cacheKeys) { + cache.release(key); + } } try { @@ -213,13 +284,18 @@ void close(){ } reducer.close(abort); + if (mergeWorkList != null) { + for (BaseWork redWork : mergeWorkList) { + ((ReduceWork) redWork).getReducer().close(abort); + } + } // Need to close the dummyOps as well. The operator pipeline // is not considered "closed/done" unless all operators are // done. For broadcast joins that includes the dummy parents. 
- List dummyOps = redWork.getDummyOps(); + List dummyOps = reduceWork.getDummyOps(); if (dummyOps != null) { - for (Operator dummyOp : dummyOps){ + for (Operator dummyOp : dummyOps) { dummyOp.close(abort); } } @@ -230,8 +306,8 @@ void close(){ if (!abort) { // signal new failure to map-reduce l4j.error("Hit error while closing operators - failing tree"); - throw new RuntimeException("Hive Runtime Error while closing operators: " - + e.getMessage(), e); + throw new RuntimeException( + "Hive Runtime Error while closing operators: " + e.getMessage(), e); } } finally { Utilities.clearWorkMap(); @@ -239,4 +315,19 @@ void close(){ } } + private DummyStoreOperator getJoinParentOp(Operator mergeReduceOp) { + for (Operator childOp : mergeReduceOp.getChildOperators()) { + if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) { + if (childOp instanceof DummyStoreOperator) { + return (DummyStoreOperator) childOp; + } else { + throw new IllegalStateException("Was expecting dummy store operator but found: " + + childOp); + } + } else { + return getJoinParentOp(childOp); + } + } + throw new IllegalStateException("Expecting a DummyStoreOperator found op: " + mergeReduceOp); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index 1a43b72..fcb959c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -68,7 +68,7 @@ private boolean abort = false; - private static Deserializer inputKeyDeserializer; + private Deserializer inputKeyDeserializer; // Input value serde needs to be an array to support different SerDe // for different tags diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java index 62f1aa4..8fbfeea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java @@ -19,6 +19,7 @@ import java.util.Map; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.mapred.JobConf; import org.apache.tez.runtime.api.LogicalInput; @@ -39,6 +40,8 @@ private RecordSource[] sources; + private Map dummyOpsMap; + public TezContext(boolean isMap, JobConf jobConf) { super(isMap, jobConf); } @@ -80,4 +83,12 @@ public ProcessorContext getTezProcessorContext() { public void setRecordSources(RecordSource[] sources) { this.sources = sources; } + + public void setDummyOpsMap(Map dummyOpsMap) { + this.dummyOpsMap = dummyOpsMap; + } + + public Map getDummyOpsMap() { + return dummyOpsMap; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 95bf8c9..f073deb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -131,7 +131,7 @@ public void run(Map inputs, Map out if (isMap) { rproc = new MapRecordProcessor(jobConf); } else { - rproc = new ReduceRecordProcessor(); + rproc = new ReduceRecordProcessor(jobConf); } initializeAndRunProcessor(inputs, outputs); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index fb553f1..8423698 
100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -163,7 +163,7 @@ // map join operator by default has no bucket cols and num of reduce sinks // reduced by 1 mapJoinOp - .setOpTraits(new OpTraits(null, -1, null, joinOp.getOpTraits().getNumReduceSinks())); +.setOpTraits(new OpTraits(null, -1, null)); mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS for (Operator childOp : mapJoinOp.getChildOperators()) { @@ -178,8 +178,7 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { // we cannot convert to bucket map join, we cannot convert to // map join either based on the size. Check if we can convert to SMB join. - if ((context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) - || (joinOp.getOpTraits().getNumReduceSinks() >= 2)) { + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) { convertJoinSMBJoin(joinOp, context, 0, 0, false); return null; } @@ -254,9 +253,9 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont CommonMergeJoinOperator mergeJoinOp = (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets, mapJoinConversionPos, mapJoinDesc), joinOp.getSchema()); - int numReduceSinks = joinOp.getOpTraits().getNumReduceSinks(); - OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp - .getOpTraits().getSortCols(), numReduceSinks); + OpTraits opTraits = + new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits() + .getSortCols()); mergeJoinOp.setOpTraits(opTraits); mergeJoinOp.setStatistics(joinOp.getStatistics()); @@ -321,8 +320,7 @@ private void setAllChildrenTraitsToNull(Operator current if (currentOp instanceof ReduceSinkOperator) { return; } - currentOp.setOpTraits(new OpTraits(null, -1, null, - currentOp.getOpTraits().getNumReduceSinks())); + currentOp.setOpTraits(new OpTraits(null, -1, null)); for (Operator childOp : currentOp.getChildOperators()) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) { break; @@ -345,7 +343,7 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon // we can set the traits for this join operator OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), - tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits().getNumReduceSinks()); + tezBucketJoinProcCtx.getNumBuckets(), null); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java index 5b16e5b..5b73866 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java @@ -70,12 +70,11 @@ context.opMergeJoinWorkMap.put(mergeJoinOp, mergeWork); } + mergeWork.addMergedWork(null, parentWork, context.leafOperatorToFollowingWork); mergeWork.setMergeJoinOperator(mergeJoinOp); - mergeWork.addMergedWork(null, parentWork); tezWork.setVertexType(mergeWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); for (BaseWork grandParentWork : tezWork.getParents(parentWork)) { - 
parentWork.setName(grandParentWork.getName()); TezEdgeProperty edgeProp = tezWork.getEdgeProperty(grandParentWork, parentWork); tezWork.disconnect(grandParentWork, parentWork); tezWork.connect(grandParentWork, mergeWork, edgeProp); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index 367217d..4b15698 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -179,13 +179,42 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, parentRS.getConf().setReducerTraits(EnumSet.of(FIXED)); numBuckets = (Integer) joinConf.getBigTableBucketNumMapping().values().toArray()[0]; - Operator rootOp = OperatorUtils.findSingleOperatorUpstream(mapJoinOp.getParentOperators() - .get(joinConf.getPosBigTable()), TableScanOperator.class); - - if (rootOp instanceof TableScanOperator) { // we will run in mapper - edgeType = EdgeType.CUSTOM_EDGE; - } else { // we will run in reducer - edgeType = EdgeType.CUSTOM_SIMPLE_EDGE; + /* + * Here, we can be in one of 4 states. + * + * 1. If map join work is null implies that we have not yet traversed the big table side. We + * just need to see if we can find a reduce sink operator in the big table side. This would + * imply a reduce side operation. + * + * 2. If we don't find a reducesink in 1 it has to be the case that it is a map side operation. + * + * 3 & 4. If we know of the map join work, depending on whether it is a mapwork or reducework + * we can determine the edge type. + */ + if (mapJoinWork == null) { + Operator rootOp = + OperatorUtils.findSingleOperatorUpstream( + mapJoinOp.getParentOperators().get(joinConf.getPosBigTable()), + ReduceSinkOperator.class); + if (rootOp == null) { + // likely we found a table scan operator + edgeType = EdgeType.CUSTOM_EDGE; + } else { + // we have found a reduce sink + edgeType = EdgeType.CUSTOM_SIMPLE_EDGE; + } + } else { + Operator rootOp = + OperatorUtils.findSingleOperatorUpstream( + mapJoinOp.getParentOperators().get(joinConf.getPosBigTable()), + TableScanOperator.class); + if (rootOp != null) { + // likely we found a table scan operator + edgeType = EdgeType.CUSTOM_EDGE; + } else { + // we have found a reduce sink + edgeType = EdgeType.CUSTOM_SIMPLE_EDGE; + } } } TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java index 86f360d..7149f5c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java @@ -50,19 +50,19 @@ * 1. Bucketing columns. * 2. Table * 3. Pruned partitions - * + * * Bucketing columns refer to not to the bucketing columns from the table object but instead * to the dynamic 'bucketing' done by operators such as reduce sinks and group-bys. * All the operators have a translation from their input names to the output names corresponding * to the bucketing column. The colExprMap that is a part of every operator is used in this * transformation. - * + * * The table object is used for the base-case in map-reduce when deciding to perform a bucket * map join. 
This object is used in the BucketMapJoinProc to find if number of files for the * table correspond to the number of buckets specified in the meta data. - * + * * The pruned partition information has the same purpose as the table object at the moment. - * + * * The traits of sorted-ness etc. can be populated as well for future optimizations to make use of. */ @@ -106,13 +106,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, List> listBucketCols = new ArrayList>(); listBucketCols.add(bucketCols); int numBuckets = -1; - int numReduceSinks = 1; OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getTraits(); if (parentOpTraits != null) { numBuckets = parentOpTraits.getNumBuckets(); - numReduceSinks += parentOpTraits.getNumReduceSinks(); } - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols, numReduceSinks); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols); rs.setOpTraits(opTraits); return null; } @@ -132,8 +130,8 @@ public boolean checkBucketedTable(Table tbl, ParseContext pGraphContext, // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number) if (!partitions.isEmpty()) { for (Partition p : partitions) { - List fileNames = - AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(), + List fileNames = + AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(), pGraphContext); // The number of files for the table should be same as number of // buckets. @@ -146,8 +144,8 @@ public boolean checkBucketedTable(Table tbl, ParseContext pGraphContext, } } else { - List fileNames = - AbstractBucketJoinProc.getBucketFilePathsOfPartition(tbl.getDataLocation(), + List fileNames = + AbstractBucketJoinProc.getBucketFilePathsOfPartition(tbl.getDataLocation(), pGraphContext); Integer num = new Integer(tbl.getNumBuckets()); @@ -188,7 +186,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, sortedColsList.add(sortCols); } // num reduce sinks hardcoded to 0 because TS has no parents - OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList, 0); + OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList); ts.setOpTraits(opTraits); return null; } @@ -213,13 +211,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } List> listBucketCols = new ArrayList>(); - int numReduceSinks = 0; - OpTraits parentOpTraits = gbyOp.getParentOperators().get(0).getOpTraits(); - if (parentOpTraits != null) { - numReduceSinks = parentOpTraits.getNumReduceSinks(); - } listBucketCols.add(gbyKeys); - OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, numReduceSinks); + OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols); gbyOp.setOpTraits(opTraits); return null; } @@ -255,7 +248,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { SelectOperator selOp = (SelectOperator) nd; - List> parentBucketColNames = + List> parentBucketColNames = selOp.getParentOperators().get(0).getOpTraits().getBucketColNames(); List> listBucketCols = null; @@ -264,7 +257,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, if (parentBucketColNames != null) { listBucketCols = getConvertedColNames(parentBucketColNames, selOp); } - List> parentSortColNames = + List> parentSortColNames = selOp.getParentOperators().get(0).getOpTraits().getSortCols(); if (parentSortColNames != null) { listSortCols = getConvertedColNames(parentSortColNames, selOp); @@ -272,13 +265,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } int numBuckets = -1; - int numReduceSinks = 0; OpTraits parentOpTraits = selOp.getParentOperators().get(0).getOpTraits(); if (parentOpTraits != null) { numBuckets = parentOpTraits.getNumBuckets(); - numReduceSinks = parentOpTraits.getNumReduceSinks(); } - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols, numReduceSinks); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols); selOp.setOpTraits(opTraits); return null; } @@ -307,13 +298,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, OpTraits parentOpTraits = rsOp.getOpTraits(); bucketColsList.add(getOutputColNames(joinOp, parentOpTraits.getBucketColNames(), pos)); sortColsList.add(getOutputColNames(joinOp, parentOpTraits.getSortCols(), pos)); - if (parentOpTraits.getNumReduceSinks() > numReduceSinks) { - numReduceSinks = parentOpTraits.getNumReduceSinks(); - } pos++; } - joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks)); + joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList)); return null; } @@ -366,17 +354,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { @SuppressWarnings("unchecked") Operator operator = (Operator) nd; - - int numReduceSinks = 0; - for (Operator parentOp : operator.getParentOperators()) { - if (parentOp.getOpTraits() == null) { - continue; - } - if (parentOp.getOpTraits().getNumReduceSinks() > numReduceSinks) { - numReduceSinks = parentOp.getOpTraits().getNumReduceSinks(); - } - } - OpTraits opTraits = new OpTraits(null, -1, null, numReduceSinks); + OpTraits opTraits = new OpTraits(null, -1, null); operator.setOpTraits(opTraits); return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index a455175..39d1f18 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -103,7 +103,7 @@ } // we can set the traits for this join operator - OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null, joinOp.getOpTraits().getNumReduceSinks()); + OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 8a50249..4dcdf91 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -105,7 +105,7 @@ public ReduceWork createReduceWork(GenTezProcContext context, Operator root, float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR); long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER); - ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber)); + ReduceWork reduceWork = new ReduceWork(Utilities.REDUCENAME + (++sequenceNumber)); LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root); reduceWork.setReducer(root); reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork)); @@ -180,7 +180,7 @@ protected void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork, public MapWork createMapWork(GenTezProcContext context, Operator root, TezWork tezWork, PrunedPartitionList partitions) throws SemanticException { assert root.getParentOperators().isEmpty(); - MapWork mapWork = new MapWork("Map "+ (++sequenceNumber)); + MapWork mapWork = new MapWork(Utilities.MAPNAME + (++sequenceNumber)); LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root); // map work starts with table scan operators diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index e67d98b..0990894 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -166,7 +166,7 @@ public Object process(Node nd, Stack stack, } // connect the work correctly. 
work.addSortCols(root.getOpTraits().getSortCols().get(0)); - mergeJoinWork.addMergedWork(work, null); + mergeJoinWork.addMergedWork(work, null, context.leafOperatorToFollowingWork); Operator parentOp = getParentFromStack(context.currentMergeJoinOperator, stack); int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp); @@ -268,6 +268,7 @@ public Object process(Node nd, Stack stack, if (LOG.isDebugEnabled()) { LOG.debug("Removing " + parent + " as parent from " + root); } + context.leafOperatorToFollowingWork.remove(parent); context.leafOperatorToFollowingWork.put(parent, work); root.removeParent(parent); } @@ -326,7 +327,7 @@ public Object process(Node nd, Stack stack, MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork; CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator(); work.setTag(mergeJoinOp.getTagForOperator(operator)); - mergeJoinWork.addMergedWork(null, work); + mergeJoinWork.addMergedWork(null, work, context.leafOperatorToFollowingWork); tezWork.setVertexType(mergeJoinWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); for (BaseWork parentWork : tezWork.getParents(work)) { TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work); @@ -399,7 +400,7 @@ public Object process(Node nd, Stack stack, return null; } - private int getFollowingWorkIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs) + private int getFollowingWorkIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs) throws SemanticException { int index = 0; for (BaseWork baseWork : tezWork.getChildren(unionWork)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java index b2369fa..457e59e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java @@ -21,11 +21,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.mapred.JobConf; public class MergeJoinWork extends BaseWork { @@ -65,7 +67,8 @@ public void setMergeJoinOperator(CommonMergeJoinOperator mergeJoinOp) { this.mergeJoinOp = mergeJoinOp; } - public void addMergedWork(BaseWork work, BaseWork connectWork) { + public void addMergedWork(BaseWork work, BaseWork connectWork, + Map, BaseWork> leafOperatorToFollowingWork) { if (work != null) { if ((bigTableWork != null) && (bigTableWork != work)) { assert false; @@ -76,6 +79,39 @@ public void addMergedWork(BaseWork work, BaseWork connectWork) { if (connectWork != null) { this.mergeWorkList.add(connectWork); + if ((connectWork instanceof ReduceWork) && (bigTableWork != null)) { + /* + * For tez to route data from an up-stream vertex correctly to the following vertex, the + * output name in the reduce sink needs to be setup appropriately. In the case of reduce + * side merge work, we need to ensure that the parent work that provides data to this merge + * work is setup to point to the right vertex name - the main work name. + * + * In this case, if the big table work has already been created, we can hook up the merge + * work items for the small table correctly. 
+ */ + setReduceSinkOutputName(connectWork, leafOperatorToFollowingWork, bigTableWork.getName()); + } + } + + if (work != null) { + /* + * Same reason as above. This is the case when we have the main work item after the merge work + * has been created for the small table side. + */ + for (BaseWork mergeWork : mergeWorkList) { + if (mergeWork instanceof ReduceWork) { + setReduceSinkOutputName(mergeWork, leafOperatorToFollowingWork, work.getName()); + } + } + } + } + + private void setReduceSinkOutputName(BaseWork mergeWork, + Map, BaseWork> leafOperatorToFollowingWork, String name) { + for (Entry, BaseWork> entry : leafOperatorToFollowingWork.entrySet()) { + if (entry.getValue() == mergeWork) { + ((ReduceSinkOperator) entry.getKey()).getConf().setOutputName(name); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java index a687a3d..c2b3664 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java @@ -25,14 +25,11 @@ List> bucketColNames; List> sortColNames; int numBuckets; - int numReduceSinks; - public OpTraits(List> bucketColNames, int numBuckets, - List> sortColNames, int numReduceSinks) { + public OpTraits(List> bucketColNames, int numBuckets, List> sortColNames) { this.bucketColNames = bucketColNames; this.numBuckets = numBuckets; this.sortColNames = sortColNames; - this.numReduceSinks = numReduceSinks; } public List> getBucketColNames() { @@ -58,12 +55,4 @@ public void setSortColNames(List> sortColNames) { public List> getSortCols() { return sortColNames; } - - public void setNumReduceSinks(int numReduceSinks) { - this.numReduceSinks = numReduceSinks; - } - - public int getNumReduceSinks() { - return this.numReduceSinks; - } } diff --git a/ql/src/test/queries/clientpositive/tez_join.q b/ql/src/test/queries/clientpositive/tez_join.q index d35ec83..0ec1881 100644 --- a/ql/src/test/queries/clientpositive/tez_join.q +++ b/ql/src/test/queries/clientpositive/tez_join.q @@ -24,20 +24,3 @@ join (select rt2.id from (select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 where vt1.id=vt2.id; - -explain -select vt1.id from -(select rt1.id from -(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 -join -(select rt2.id from -(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 -where vt1.id=vt2.id; - -select vt1.id from -(select rt1.id from -(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 -join -(select rt2.id from -(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 -where vt1.id=vt2.id; diff --git a/ql/src/test/queries/clientpositive/tez_smb_1.q b/ql/src/test/queries/clientpositive/tez_smb_1.q index 763f7c3..266a81b 100644 --- a/ql/src/test/queries/clientpositive/tez_smb_1.q +++ b/ql/src/test/queries/clientpositive/tez_smb_1.q @@ -33,3 +33,21 @@ set hive.auto.convert.join.noconditionaltask.size=500; explain select count(*) from tab s1 join tab s3 on s1.key=s3.key; +set hive.auto.convert.join=false; + +explain +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id; + +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order 
by id, od) rt2) vt2 +where vt1.id=vt2.id; diff --git a/ql/src/test/queries/clientpositive/tez_smb_main.q b/ql/src/test/queries/clientpositive/tez_smb_main.q index 4f178f7..404690f 100644 --- a/ql/src/test/queries/clientpositive/tez_smb_main.q +++ b/ql/src/test/queries/clientpositive/tez_smb_main.q @@ -82,3 +82,20 @@ UNION ALL select s2.key as key, s2.value as value from tab s2 ) a join tab_part b on (a.key = b.key); +explain +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id; + +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id; + diff --git a/ql/src/test/results/clientpositive/tez/tez_join.q.out b/ql/src/test/results/clientpositive/tez/tez_join.q.out index a051dc7..b87b71c 100644 --- a/ql/src/test/results/clientpositive/tez/tez_join.q.out +++ b/ql/src/test/results/clientpositive/tez/tez_join.q.out @@ -48,9 +48,7 @@ STAGE PLANS: Stage: Stage-1 Tez Edges: - Reducer 2 <- Map 1 (SIMPLE_EDGE) - Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) - Reducer 5 <- Map 4 (SIMPLE_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -69,7 +67,7 @@ STAGE PLANS: key expressions: _col0 (type: string), _col1 (type: string) sort order: ++ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Map 4 + Map 3 Map Operator Tree: TableScan alias: t2 @@ -91,204 +89,33 @@ STAGE PLANS: expressions: KEY.reducesinkkey0 (type: string) outputColumnNames: _col0 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: string) - sort order: + - Map-reduce partition columns: _col0 (type: string) - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Reducer 3 - Reduce Operator Tree: - Merge Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col0 (type: string) - 1 _col0 (type: string) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Filter Operator - predicate: (_col0 = _col1) (type: boolean) - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Select Operator - expressions: _col0 (type: string) - outputColumnNames: _col0 - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - File Output Operator - compressed: false - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - Reducer 5 Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey0 (type: string) outputColumnNames: _col0 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: string) - sort order: + - Map-reduce partition columns: _col0 (type: string) - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - - Stage: Stage-0 - Fetch Operator - limit: -1 - Processor Tree: - ListSink - -PREHOOK: query: select vt1.id from -(select rt1.id from 
-(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 -join -(select rt2.id from -(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 -where vt1.id=vt2.id -PREHOOK: type: QUERY -PREHOOK: Input: default@t1 -PREHOOK: Input: default@t2 -#### A masked pattern was here #### -POSTHOOK: query: select vt1.id from -(select rt1.id from -(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 -join -(select rt2.id from -(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 -where vt1.id=vt2.id -POSTHOOK: type: QUERY -POSTHOOK: Input: default@t1 -POSTHOOK: Input: default@t2 -#### A masked pattern was here #### -PREHOOK: query: explain -select vt1.id from -(select rt1.id from -(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 -join -(select rt2.id from -(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 -where vt1.id=vt2.id -PREHOOK: type: QUERY -POSTHOOK: query: explain -select vt1.id from -(select rt1.id from -(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 -join -(select rt2.id from -(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 -where vt1.id=vt2.id -POSTHOOK: type: QUERY -STAGE DEPENDENCIES: - Stage-1 is a root stage - Stage-0 depends on stages: Stage-1 - -STAGE PLANS: - Stage: Stage-1 - Tez - Edges: - Reducer 2 <- Map 1 (SIMPLE_EDGE) - Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) - Reducer 5 <- Map 4 (SIMPLE_EDGE) -#### A masked pattern was here #### - Vertices: - Map 1 - Map Operator Tree: - TableScan - alias: t1 + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE Filter Operator - predicate: id is not null (type: boolean) + predicate: (_col0 = _col1) (type: boolean) Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Group By Operator - aggregations: count() - keys: id (type: string), od (type: string) - mode: hash - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: string), _col1 (type: string) - sort order: ++ - Map-reduce partition columns: _col0 (type: string), _col1 (type: string) - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - value expressions: _col2 (type: bigint) - Map 4 - Map Operator Tree: - TableScan - alias: t2 - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Filter Operator - predicate: id is not null (type: boolean) - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Group By Operator - aggregations: count() - keys: id (type: string), od (type: string) - mode: hash - outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: string), _col1 (type: string) - sort order: ++ - Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + File Output Operator + compressed: false Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - value expressions: _col2 (type: bigint) - Reducer 2 - Reduce Operator Tree: - Group By Operator - aggregations: count(VALUE._col0) - keys: KEY._col0 (type: string), KEY._col1 (type: string) - mode: 
mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string)
-                  outputColumnNames: _col0
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Reduce Output Operator
-                    key expressions: _col0 (type: string)
-                    sort order: +
-                    Map-reduce partition columns: _col0 (type: string)
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-        Reducer 3 
-            Reduce Operator Tree:
-              Merge Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 _col0 (type: string)
-                  1 _col0 (type: string)
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                Filter Operator
-                  predicate: (_col0 = _col1) (type: boolean)
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Select Operator
-                    expressions: _col0 (type: string)
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    File Output Operator
-                      compressed: false
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                      table:
-                          input format: org.apache.hadoop.mapred.TextInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-        Reducer 5 
-            Reduce Operator Tree:
-              Group By Operator
-                aggregations: count(VALUE._col0)
-                keys: KEY._col0 (type: string), KEY._col1 (type: string)
-                mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string)
-                  outputColumnNames: _col0
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Reduce Output Operator
-                    key expressions: _col0 (type: string)
-                    sort order: +
-                    Map-reduce partition columns: _col0 (type: string)
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -298,10 +125,10 @@ STAGE PLANS:
 PREHOOK: query: select vt1.id from
 (select rt1.id from
-(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1
+(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1
 join
 (select rt2.id from
-(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2
+(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2
 where vt1.id=vt2.id
 PREHOOK: type: QUERY
 PREHOOK: Input: default@t1
@@ -309,10 +136,10 @@ PREHOOK: Input: default@t2
 #### A masked pattern was here ####
 POSTHOOK: query: select vt1.id from
 (select rt1.id from
-(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1
+(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1
 join
 (select rt2.id from
-(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2
+(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2
 where vt1.id=vt2.id
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@t1
diff --git a/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out b/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out
index 09ff9bb..d970bd9 100644
--- a/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out
+++ b/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out
@@ -177,3 +177,146 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+      Edges:
+        Reducer 4 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
+        Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: t1
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: string)
+                        sort order: ++
+                        Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: t2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: string)
+                        sort order: ++
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+        Reducer 4 
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int)
+                outputColumnNames: _col0
+                Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int)
+                outputColumnNames: _col0
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                Merge Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 _col0 (type: int)
+                    1 _col0 (type: int)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (_col0 = _col1) (type: boolean)
+                    Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: count()
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+        Reducer 5 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+PREHOOK: Input: default@tab_part
+PREHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+POSTHOOK: Input: default@tab_part
+POSTHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+480
diff --git a/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out b/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out
index f0e4b34..f0bcea6 100644
--- a/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out
+++ b/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out
@@ -1194,3 +1194,155 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: t1
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: string)
+                        sort order: ++
+                        Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: t2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: string)
+                        sort order: ++
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+        Reducer 2 
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int)
+                outputColumnNames: _col0
+                Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: int)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: int)
+                  Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+        Reducer 4 
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int)
+                outputColumnNames: _col0
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 _col0 (type: int)
+                    1 _col0 (type: int)
+                  outputColumnNames: _col0, _col1
+                  input vertices:
+                    0 Reducer 2
+                  Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (_col0 = _col1) (type: boolean)
+                    Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: count()
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+        Reducer 5 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+PREHOOK: Input: default@tab_part
+PREHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+POSTHOOK: Input: default@tab_part
+POSTHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+480