diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java index 9caf79e..970b9c3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.NodeUtils.Function; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.mapred.OutputCollector; @@ -68,8 +69,8 @@ public static void setChildrenCollector(List> c return; } for (Operator op : childOperators) { - if(op.getName().equals(ReduceSinkOperator.getOperatorName())) { - ((ReduceSinkOperator)op).setOutputCollector(out); + if (op.getName().equals(ReduceSinkOperator.getOperatorName())) { + op.setOutputCollector(out); } else { setChildrenCollector(op.getChildOperators(), out); } @@ -93,4 +94,20 @@ public static void setChildrenCollector(List> c } } } + + public static void iterateParents(Operator operator, Function> function) { + iterateParents(operator, function, new HashSet>()); + } + + private static void iterateParents(Operator operator, Function> function, Set> visited) { + if (!visited.add(operator)) { + return; + } + function.apply(operator); + if (operator.getNumParent() > 0) { + for (Operator parent : operator.getParentOperators()) { + iterateParents(parent, function, visited); + } + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index c5bbe68..0cbfdc7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -518,4 +518,12 @@ public void setConsole(LogHelper console) { public String toString() { return getId() + ":" + getType(); } + + public int hashCode() { + return toString().hashCode(); + } + + public boolean equals(Object obj) { + return toString().equals(String.valueOf(obj)); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index cc840be..810053c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -72,7 +72,6 @@ import java.util.UUID; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; -import java.util.zip.Inflater; import java.util.zip.InflaterInputStream; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -2272,12 +2271,16 @@ public void run() { } } - // return sum of lengths except one alias. returns -1 if any of other alias is unknown + public static long sumOf(Map aliasToSize, Set aliases) { + return sumOfExcept(aliasToSize, aliases, null); + } + + // return sum of lengths except some aliases. returns -1 if any of other alias is unknown public static long sumOfExcept(Map aliasToSize, - Set aliases, String except) { + Set aliases, Set excepts) { long total = 0; for (String alias : aliases) { - if (alias.equals(except)) { + if (excepts != null && excepts.contains(alias)) { continue; } Long size = aliasToSize.get(alias); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index d2aa220..0f55fb3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -46,8 +47,10 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MoveTask; +import org.apache.hadoop.hive.ql.exec.NodeUtils; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; @@ -1756,6 +1759,38 @@ public static Path createMoveTask(Task currTask, boolean return inputPaths; } + public static Set findAliases(final MapWork work, Operator startOp) { + Set aliases = new LinkedHashSet(); + for (Operator topOp : findTopOps(startOp, null)) { + String alias = findAlias(work, topOp); + if (alias != null) { + aliases.add(alias); + } + } + return aliases; + } + + public static Set> findTopOps(Operator startOp, final Class clazz) { + final Set> operators = new LinkedHashSet>(); + OperatorUtils.iterateParents(startOp, new NodeUtils.Function>() { + public void apply(Operator argument) { + if (argument.getNumParent() == 0 && (clazz == null || clazz.isInstance(argument))) { + operators.add(argument); + } + } + }); + return operators; + } + + public static String findAlias(MapWork work, Operator operator) { + for (Entry> entry : work.getAliasToWork().entrySet()) { + if (entry.getValue() == operator) { + return entry.getKey(); + } + } + return null; + } + private GenMapRedUtils() { // prevent instantiation } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index b4aeb14..e3e0acc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -120,14 +120,12 @@ public MapJoinProcessor() { * @param mapJoinOp * map-join operator for which local work needs to be generated. * @param bigTablePos - * @return * @throws SemanticException */ - private static String genMapJoinLocalWork(MapredWork newWork, MapJoinOperator mapJoinOp, + private static void genMapJoinLocalWork(MapredWork newWork, MapJoinOperator mapJoinOp, int bigTablePos) throws SemanticException { // keep the small table alias to avoid concurrent modification exception ArrayList smallTableAliasList = new ArrayList(); - String bigTableAlias = null; // create a new MapredLocalWork MapredLocalWork newLocalWork = new MapredLocalWork( @@ -155,7 +153,6 @@ private static String genMapJoinLocalWork(MapredWork newWork, MapJoinOperator ma // skip the big table pos int i = childOp.getParentOperators().indexOf(parentOp); if (i == bigTablePos) { - bigTableAlias = alias; continue; } // set alias to work and put into smallTableAliasList @@ -219,11 +216,6 @@ private static String genMapJoinLocalWork(MapredWork newWork, MapJoinOperator ma newWork.getMapWork().setMapLocalWork(newLocalWork); // remove reducer newWork.setReduceWork(null); - // return the big table alias - if (bigTableAlias == null) { - throw new SemanticException("Big Table Alias is null"); - } - return bigTableAlias; } /** @@ -233,10 +225,9 @@ private static String genMapJoinLocalWork(MapredWork newWork, MapJoinOperator ma * @param op * The join operator that needs to be converted to map-join * @param bigTablePos - * @return the alias to the big table * @throws SemanticException */ - public static String genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork, + public static void genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork, JoinOperator op, int mapJoinPos) throws SemanticException { LinkedHashMap, OpParseContext> opParseCtxMap = @@ -245,22 +236,19 @@ public static String genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork, // generate the map join operator; already checked the map join MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(conf, opParseCtxMap, op, newJoinTree, mapJoinPos, true, false); - return genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos); + genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos); } - public static String genLocalWorkForMapJoin(MapredWork newWork, MapJoinOperator newMapJoinOp, + public static void genLocalWorkForMapJoin(MapredWork newWork, MapJoinOperator newMapJoinOp, int mapJoinPos) throws SemanticException { try { - // generate the local work and return the big table alias - String bigTableAlias = MapJoinProcessor - .genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos); + // generate the local work for the big table alias + MapJoinProcessor.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos); // clean up the mapred work newWork.getMapWork().setOpParseCtxMap(null); newWork.getMapWork().setJoinTree(null); - return bigTableAlias; - } catch (Exception e) { e.printStackTrace(); throw new SemanticException("Failed to generate new mapJoin operator " + @@ -380,11 +368,9 @@ public static MapJoinOperator convertMapJoin(HiveConf conf, // remove old parents for (pos = 0; pos < newParentOps.size(); pos++) { - newParentOps.get(pos).removeChild(oldReduceSinkParentOps.get(pos)); - newParentOps.get(pos).getChildOperators().add(mapJoinOp); + newParentOps.get(pos).replaceChild(oldReduceSinkParentOps.get(pos), mapJoinOp); } - mapJoinOp.getParentOperators().removeAll(oldReduceSinkParentOps); mapJoinOp.setParentOperators(newParentOps); @@ -835,6 +821,7 @@ private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticExc * @param pactx * current parse context */ + @Override public ParseContext transform(ParseContext pactx) throws SemanticException { pGraphContext = pactx; List listMapJoinOps = new ArrayList(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java index 3595640..86e4834 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.UnionOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -112,49 +113,53 @@ private void findPossibleAutoConvertedJoinOperators() throws SemanticException { // Get total size and individual alias's size long aliasTotalKnownInputSize = 0; Map aliasToSize = new HashMap(); - Map posToAlias = new HashMap(); - for (Operator op: joinOp.getParentOperators()) { - TableScanOperator tsop = CorrelationUtilities.findTableScanOperator(op); - if (tsop == null) { + Map> posToAliases = new HashMap>(); + for (int pos = 0; pos < joinOp.getNumParent(); pos++) { + Operator op = joinOp.getParentOperators().get(pos); + Set topOps = CorrelationUtilities.findTableScanOperators(op); + if (topOps.isEmpty()) { isAbleToGuess = false; break; } - Table table = pCtx.getTopToTable().get(tsop); - String alias = tsop.getConf().getAlias(); - posToAlias.put(joinOp.getParentOperators().indexOf(op), alias); - if (table == null) { - // table should not be null. - throw new SemanticException("The table of " + - tsop.getName() + " " + tsop.getIdentifier() + - " is null, which is not expected."); - } - - Path p = table.getPath(); - FileSystem fs = null; - ContentSummary resultCs = null; - try { - fs = table.getPath().getFileSystem(pCtx.getConf()); - resultCs = fs.getContentSummary(p); - } catch (IOException e) { - LOG.warn("Encounter a error while querying content summary of table " + - table.getCompleteName() + " from FileSystem. " + - "Cannot guess if CommonJoinOperator will optimize " + - joinOp.getName() + " " + joinOp.getIdentifier()); - } - if (resultCs == null) { - isAbleToGuess = false; - break; - } + Set aliases = new LinkedHashSet(); + for (TableScanOperator tsop : topOps) { + Table table = pCtx.getTopToTable().get(tsop); + if (table == null) { + // table should not be null. + throw new SemanticException("The table of " + + tsop.getName() + " " + tsop.getIdentifier() + + " is null, which is not expected."); + } + String alias = tsop.getConf().getAlias(); + aliases.add(alias); + + Path p = table.getPath(); + ContentSummary resultCs = null; + try { + FileSystem fs = table.getPath().getFileSystem(pCtx.getConf()); + resultCs = fs.getContentSummary(p); + } catch (IOException e) { + LOG.warn("Encounter a error while querying content summary of table " + + table.getCompleteName() + " from FileSystem. " + + "Cannot guess if CommonJoinOperator will optimize " + + joinOp.getName() + " " + joinOp.getIdentifier()); + } + if (resultCs == null) { + isAbleToGuess = false; + break; + } - long size = resultCs.getLength(); - aliasTotalKnownInputSize += size; - Long es = aliasToSize.get(alias); - if(es == null) { - es = new Long(0); + long size = resultCs.getLength(); + aliasTotalKnownInputSize += size; + Long es = aliasToSize.get(alias); + if(es == null) { + es = new Long(0); + } + es += size; + aliasToSize.put(alias, es); } - es += size; - aliasToSize.put(alias, es); + posToAliases.put(pos, aliases); } if (!isAbleToGuess) { @@ -172,7 +177,6 @@ private void findPossibleAutoConvertedJoinOperators() throws SemanticException { continue; } - String bigTableAlias = null; long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(pCtx.getConf(), HiveConf.ConfVars.HIVESMALLTABLESFILESIZE); for (int i = 0; i < numAliases; i++) { @@ -180,8 +184,9 @@ private void findPossibleAutoConvertedJoinOperators() throws SemanticException { if (!bigTableCandidates.contains(i)) { continue; } - bigTableAlias = posToAlias.get(i); - if (!CommonJoinTaskDispatcher.cannotConvert(bigTableAlias, aliasToSize, + Set aliases = posToAliases.get(i); + long aliasKnownSize = Utilities.sumOf(aliasToSize, aliases); + if (!CommonJoinTaskDispatcher.cannotConvert(aliasKnownSize, aliasTotalKnownInputSize, ThresholdOfSmallTblSizeSum)) { mayConvert = true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java index 98fcff5..94224b3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java @@ -23,9 +23,11 @@ import java.lang.reflect.Array; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ExtractOperator; @@ -276,25 +278,30 @@ protected static int indexOf(ExprNodeDesc cexpr, ExprNodeDesc[] pexprs, Operator * @return the TableScanOperator traced from startPoint. Null, if the search encounters any * ReduceSinkOperator. */ - protected static TableScanOperator findTableScanOperator( - Operator startPoint) { - Operator thisOp = startPoint.getParentOperators().get(0); - while (true) { - if (thisOp.getName().equals(ReduceSinkOperator.getOperatorName())) { - return null; - } else if (thisOp.getName().equals(TableScanOperator.getOperatorName())) { - return (TableScanOperator) thisOp; - } else { - if (thisOp.getParentOperators() != null) { - thisOp = thisOp.getParentOperators().get(0); - } else { - break; - } - } + protected static Set findTableScanOperators(Operator startPoint) { + if (startPoint instanceof ReduceSinkOperator) { + assert startPoint.getNumParent() == 1; // for now + startPoint = startPoint.getParentOperators().get(0); } - return null; + return findTableScanOperators(startPoint, new LinkedHashSet()); } + private static Set findTableScanOperators(Operator current, + Set found) { + if (current instanceof TableScanOperator) { + found.add((TableScanOperator) current); + return found; + } + if (current instanceof ReduceSinkOperator || current.getNumParent() == 0) { + return found; + } + for (Operator parent : current.getParentOperators()) { + findTableScanOperators(parent, found); + } + return found; + } + + /** * Find all sibling ReduceSinkOperators (which have the same child operator of op) of op (op * included). diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 74ca355..5ac6338 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -20,19 +20,22 @@ import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.TableScanOperator; @@ -166,32 +169,18 @@ private boolean isLocalTableTotalSizeUnderLimitAfterMerge( return true; } - // Get the position of the big table for this join operator and the given alias - private int getPosition(MapWork work, Operator joinOp, - String alias) { - Operator parentOp = work.getAliasToWork().get(alias); - - // reduceSinkOperator's child is null, but joinOperator's parents is reduceSink - while ((parentOp.getChildOperators() != null) && - (!parentOp.getChildOperators().isEmpty())) { - parentOp = parentOp.getChildOperators().get(0); - } - return joinOp.getParentOperators().indexOf(parentOp); - } - // create map join task and set big table as bigTablePosition - private ObjectPair convertTaskToMapJoinTask(MapredWork newWork, - int bigTablePosition) throws UnsupportedEncodingException, SemanticException { + private MapRedTask convertTaskToMapJoinTask(MapredWork newWork, int bigTablePosition) + throws UnsupportedEncodingException, SemanticException { // create a mapred task for this work MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext .getParseContext().getConf()); JoinOperator newJoinOp = getJoinOp(newTask); // optimize this newWork given the big table position - String bigTableAlias = - MapJoinProcessor.genMapJoinOpAndLocalWork(physicalContext.getParseContext().getConf(), - newWork, newJoinOp, bigTablePosition); - return new ObjectPair(newTask, bigTableAlias); + MapJoinProcessor.genMapJoinOpAndLocalWork(physicalContext.getParseContext().getConf(), + newWork, newJoinOp, bigTablePosition); + return newTask; } /* @@ -374,20 +363,13 @@ private void mergeMapJoinTaskIntoItsChildMapRedTask(MapRedTask mapJoinTask, Conf } } - public static boolean cannotConvert(String bigTableAlias, - Map aliasToSize, long aliasTotalKnownInputSize, - long ThresholdOfSmallTblSizeSum) { - boolean ret = false; - Long aliasKnownSize = aliasToSize.get(bigTableAlias); - if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) { - long smallTblTotalKnownSize = aliasTotalKnownInputSize - - aliasKnownSize.longValue(); - if (smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) { - //this table is not good to be a big table. - ret = true; - } + public static boolean cannotConvert(long aliasKnownSize, + long aliasTotalKnownInputSize, long ThresholdOfSmallTblSizeSum) { + if (aliasKnownSize > 0 && + aliasTotalKnownInputSize - aliasKnownSize > ThresholdOfSmallTblSizeSum) { + return true; } - return ret; + return false; } @Override @@ -408,9 +390,9 @@ public static boolean cannotConvert(String bigTableAlias, List listWorks = new ArrayList(); List> listTasks = new ArrayList>(); - // create alias to task mapping and alias to input file mapping for resolver - HashMap> aliasToTask = - new HashMap>(); + // create task to aliases mapping and alias to input file mapping for resolver + HashMap, Set> taskToAliases = + new HashMap, Set>(); HashMap> pathToAliases = currWork.getPathToAliases(); Map> aliasToWork = currWork.getAliasToWork(); @@ -420,8 +402,6 @@ public static boolean cannotConvert(String bigTableAlias, // start to generate multiple map join tasks JoinDesc joinDesc = joinOp.getConf(); - Byte[] order = joinDesc.getTagOrder(); - int numAliases = order.length; if (aliasToSize == null) { aliasToSize = new HashMap(); @@ -439,6 +419,10 @@ public static boolean cannotConvert(String bigTableAlias, return null; } + // if any of bigTableCandidates is from multi-sourced, bigTableCandidates should + // only contain multi-sourced because multi-sourced cannot be hashed or direct readable + bigTableCandidates = multiInsertBigTableCheck(joinOp, bigTableCandidates); + Configuration conf = context.getConf(); // If sizes of at least n-1 tables in a n-way join is known, and their sum is smaller than @@ -453,20 +437,18 @@ public static boolean cannotConvert(String bigTableAlias, Long bigTableSize = null; Set aliases = aliasToWork.keySet(); - for (String alias : aliases) { - int tablePosition = getPosition(currWork, joinOp, alias); - if (!bigTableCandidates.contains(tablePosition)) { - continue; - } - long sumOfOthers = Utilities.sumOfExcept(aliasToSize, aliases, alias); + for (int tablePosition : bigTableCandidates) { + Operator parent = joinOp.getParentOperators().get(tablePosition); + Set participants = GenMapRedUtils.findAliases(currWork, parent); + long sumOfOthers = Utilities.sumOfExcept(aliasToSize, aliases, participants); if (sumOfOthers < 0 || sumOfOthers > mapJoinSize) { continue; // some small alias is not known or too big } if (bigTableSize == null && bigTablePosition >= 0 && tablePosition < bigTablePosition) { continue; // prefer right most alias } - Long aliasSize = aliasToSize.get(alias); - if (bigTableSize == null || (aliasSize != null && aliasSize > bigTableSize)) { + long aliasSize = Utilities.sumOf(aliasToSize, participants); + if (bigTableSize == null || bigTableSize < 0 || (aliasSize >= 0 && aliasSize >= bigTableSize)) { bigTablePosition = tablePosition; bigTableSize = aliasSize; } @@ -478,7 +460,7 @@ public static boolean cannotConvert(String bigTableAlias, if (bigTablePosition >= 0) { // create map join task and set big table as bigTablePosition - MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition).getFirst(); + MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition); newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP); replaceTask(currTask, newTask, physicalContext); @@ -494,9 +476,9 @@ public static boolean cannotConvert(String bigTableAlias, long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVESMALLTABLESFILESIZE); - for (int i = 0; i < numAliases; i++) { + for (int pos = 0; pos < joinOp.getNumParent(); pos++) { // this table cannot be big table - if (!bigTableCandidates.contains(i)) { + if (!bigTableCandidates.contains(pos)) { continue; } // deep copy a new mapred work from xml @@ -504,12 +486,14 @@ public static boolean cannotConvert(String bigTableAlias, MapredWork newWork = Utilities.clonePlan(currTask.getWork()); // create map join task and set big table as i - ObjectPair newTaskAlias = convertTaskToMapJoinTask(newWork, i); - MapRedTask newTask = newTaskAlias.getFirst(); - String bigTableAlias = newTaskAlias.getSecond(); + MapRedTask newTask = convertTaskToMapJoinTask(newWork, pos); + + MapWork mapWork = newTask.getWork().getMapWork(); + Operator startOp = joinOp.getParentOperators().get(pos); + Set aliases = GenMapRedUtils.findAliases(mapWork, startOp); - if (cannotConvert(bigTableAlias, aliasToSize, - aliasTotalKnownInputSize, ThresholdOfSmallTblSizeSum)) { + long aliasKnownSize = Utilities.sumOf(aliasToSize, aliases); + if (cannotConvert(aliasKnownSize, aliasTotalKnownInputSize, ThresholdOfSmallTblSizeSum)) { continue; } @@ -522,8 +506,8 @@ public static boolean cannotConvert(String bigTableAlias, newTask.setBackupTask(currTask); newTask.setBackupChildrenTasks(currTask.getChildTasks()); - // put the mapping alias to task - aliasToTask.put(bigTableAlias, newTask); + // put the mapping task to aliases + taskToAliases.put(newTask, aliases); } } catch (Exception e) { e.printStackTrace(); @@ -547,7 +531,7 @@ public static boolean cannotConvert(String bigTableAlias, ConditionalResolverCommonJoinCtx resolverCtx = new ConditionalResolverCommonJoinCtx(); resolverCtx.setPathToAliases(pathToAliases); resolverCtx.setAliasToKnownSize(aliasToSize); - resolverCtx.setAliasToTask(aliasToTask); + resolverCtx.setTaskToAliases(taskToAliases); resolverCtx.setCommonJoinTask(currTask); resolverCtx.setLocalTmpDir(context.getLocalScratchDir(false)); resolverCtx.setHdfsTmpDir(context.getMRScratchDir()); @@ -600,4 +584,42 @@ private JoinOperator getJoinOp(MapRedTask task) throws SemanticException { return null; } } + + + /** + * In the case of a multi-insert statement the Source Operator will have multiple children. + * For e.g. + * from src b + * INSERT OVERWRITE TABLE src_4 + * select * + * where b.key in + * (select a.key from src a where b.value = a.value and a.key > '9') + * INSERT OVERWRITE TABLE src_5 + * select * + * where b.key not in + * ( select key from src s1 where s1.key > '2') + * + * The TableScan on 'src'(for alias b) will have 2 children one for each destination. + * + * In such cases only the Source side of the Join is the candidate Big Table. + * The reason being, it cannot be replaced by a HashTable as its rows must flow into the other children + * of the TableScan Operator. + */ + private Set multiInsertBigTableCheck(JoinOperator joinOp, Set bigTableCandidates) { + int multiChildrenSource = -1; + for (int tablePosition : bigTableCandidates.toArray(new Integer[0])) { + Operator parent = joinOp.getParentOperators().get(tablePosition); + for (; parent != null; + parent = parent.getNumParent() > 0 ? parent.getParentOperators().get(0) : null) { + if (parent.getNumChild() > 1 && !(parent instanceof LateralViewForwardOperator)) { + if (multiChildrenSource >= 0) { + return Collections.emptySet(); + } + multiChildrenSource = tablePosition; + } + } + } + return multiChildrenSource < 0 ? bigTableCandidates : + new HashSet(Arrays.asList(multiChildrenSource)); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java index 82a833e..984963b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java @@ -21,9 +21,11 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Stack; import org.apache.hadoop.fs.Path; @@ -196,21 +198,22 @@ private void processCurrentTask(Task currTask, // get bigKeysDirToTaskMap ConditionalResolverCommonJoinCtx context = (ConditionalResolverCommonJoinCtx) conditionalTask .getResolverCtx(); - HashMap> aliasToWork = context.getAliasToTask(); + HashMap, Set> taskToAliases = context.getTaskToAliases(); // to avoid concurrent modify the hashmap - HashMap> newAliasToWork = new HashMap>(); + HashMap, Set> newTaskToAliases = + new HashMap, Set>(); // reset the resolver - for (Map.Entry> entry : aliasToWork.entrySet()) { - Task task = entry.getValue(); - String key = entry.getKey(); + for (Map.Entry, Set> entry : taskToAliases.entrySet()) { + Task task = entry.getKey(); + Set key = new HashSet(entry.getValue()); if (task.equals(currTask)) { - newAliasToWork.put(key, localTask); + newTaskToAliases.put(localTask, key); } else { - newAliasToWork.put(key, task); + newTaskToAliases.put(task, key); } } - context.setAliasToTask(newAliasToWork); + context.setTaskToAliases(newTaskToAliases); conditionalTask.setResolverCtx(context); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index f4cd3ab..fca30c4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; @@ -42,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; @@ -165,7 +165,7 @@ private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOpera } // create map join task and set big table as bigTablePosition - private ObjectPair convertSMBTaskToMapJoinTask(MapredWork origWork, + private MapRedTask convertSMBTaskToMapJoinTask(MapredWork origWork, int bigTablePosition, SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree) @@ -185,12 +185,11 @@ private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOpera ReduceWork rWork = newWork.getReduceWork(); // create the local work for this plan - String bigTableAlias = - MapJoinProcessor.genLocalWorkForMapJoin(newWork, newMapJoinOp, bigTablePosition); + MapJoinProcessor.genLocalWorkForMapJoin(newWork, newMapJoinOp, bigTablePosition); // restore the reducer newWork.setReduceWork(rWork); - return new ObjectPair(newTask, bigTableAlias); + return newTask; } private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) { @@ -265,9 +264,9 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) List listWorks = new ArrayList(); List> listTasks = new ArrayList>(); - // create alias to task mapping and alias to input file mapping for resolver - HashMap> aliasToTask = - new HashMap>(); + // create task to aliases mapping and alias to input file mapping for resolver + HashMap, Set> taskToAliases = + new HashMap, Set>(); // Note that pathToAlias will behave as if the original plan was a join plan HashMap> pathToAliases = currJoinWork.getMapWork().getPathToAliases(); @@ -294,15 +293,16 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) } // create map join task for the given big table position - ObjectPair newTaskAlias = convertSMBTaskToMapJoinTask( + MapRedTask newTask = convertSMBTaskToMapJoinTask( currJoinWork, bigTablePosition, newSMBJoinOp, joinTree); - MapRedTask newTask = newTaskAlias.getFirst(); - String bigTableAlias = newTaskAlias.getSecond(); - Long aliasKnownSize = aliasToSize.get(bigTableAlias); - if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) { - long smallTblTotalKnownSize = aliasTotalKnownInputSize - - aliasKnownSize.longValue(); + MapWork mapWork = newTask.getWork().getMapWork(); + Operator parentOp = originalSMBJoinOp.getParentOperators().get(bigTablePosition); + Set aliases = GenMapRedUtils.findAliases(mapWork, parentOp); + + long aliasKnownSize = Utilities.sumOf(aliasToSize, aliases); + if (aliasKnownSize > 0) { + long smallTblTotalKnownSize = aliasTotalKnownInputSize - aliasKnownSize; if (smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) { // this table is not good to be a big table. continue; @@ -318,8 +318,8 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) newTask.setBackupTask(currTask); newTask.setBackupChildrenTasks(currTask.getChildTasks()); - // put the mapping alias to task - aliasToTask.put(bigTableAlias, newTask); + // put the mapping task to aliases + taskToAliases.put(newTask, aliases); } } catch (Exception e) { e.printStackTrace(); @@ -343,7 +343,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) ConditionalResolverCommonJoinCtx resolverCtx = new ConditionalResolverCommonJoinCtx(); resolverCtx.setPathToAliases(pathToAliases); resolverCtx.setAliasToKnownSize(aliasToSize); - resolverCtx.setAliasToTask(aliasToTask); + resolverCtx.setTaskToAliases(taskToAliases); resolverCtx.setCommonJoinTask(currTask); resolverCtx.setLocalTmpDir(context.getLocalScratchDir(false)); resolverCtx.setHdfsTmpDir(context.getMRScratchDir()); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java index 5c3a582..1da7f85 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java @@ -49,7 +49,7 @@ public static class ConditionalResolverCommonJoinCtx implements Serializable { private static final long serialVersionUID = 1L; - private HashMap> aliasToTask; + private HashMap, Set> taskToAliases; HashMap> pathToAliases; HashMap aliasToKnownSize; private Task commonJoinTask; @@ -60,12 +60,12 @@ public ConditionalResolverCommonJoinCtx() { } - public HashMap> getAliasToTask() { - return aliasToTask; + public HashMap, Set> getTaskToAliases() { + return taskToAliases; } - public void setAliasToTask(HashMap> aliasToTask) { - this.aliasToTask = aliasToTask; + public void setTaskToAliases(HashMap, Set> taskToAliases) { + this.taskToAliases = taskToAliases; } public Task getCommonJoinTask() { @@ -112,7 +112,7 @@ public void setHdfsTmpDir(Path hdfsTmpDir) { @Override public ConditionalResolverCommonJoinCtx clone() { ConditionalResolverCommonJoinCtx ctx = new ConditionalResolverCommonJoinCtx(); - ctx.setAliasToTask(aliasToTask); + ctx.setTaskToAliases(taskToAliases); ctx.setCommonJoinTask(commonJoinTask); ctx.setPathToAliases(pathToAliases); ctx.setHdfsTmpDir(hdfsTmpDir); @@ -133,15 +133,13 @@ public ConditionalResolverCommonJoin() { List> resTsks = new ArrayList>(); // get aliasToPath and pass it to the heuristic - String bigTableAlias = resolveDriverAlias(ctx, conf); + Task task = resolveDriverAlias(ctx, conf); - if (bigTableAlias == null) { + if (task == null) { // run common join task resTsks.add(ctx.getCommonJoinTask()); } else { - // run the map join task - Task task = ctx.getAliasToTask().get(bigTableAlias); - //set task tag + // run the map join task, set task tag if (task.getBackupTask() != null) { task.getBackupTask().setTaskTag(Task.BACKUP_COMMON_JOIN); } @@ -152,7 +150,7 @@ public ConditionalResolverCommonJoin() { return resTsks; } - private String resolveDriverAlias(ConditionalResolverCommonJoinCtx ctx, HiveConf conf) { + private Task resolveDriverAlias(ConditionalResolverCommonJoinCtx ctx, HiveConf conf) { try { resolveUnknownSizes(ctx, conf); return resolveMapJoinTask(ctx, conf); @@ -162,40 +160,38 @@ private String resolveDriverAlias(ConditionalResolverCommonJoinCtx ctx, HiveConf return null; } - protected String resolveMapJoinTask( + protected Task resolveMapJoinTask( ConditionalResolverCommonJoinCtx ctx, HiveConf conf) throws Exception { - Set aliases = getParticipants(ctx); + Set participants = getParticipants(ctx); Map aliasToKnownSize = ctx.getAliasToKnownSize(); Map> pathToAliases = ctx.getPathToAliases(); - Map> aliasToTask = ctx.getAliasToTask(); + Map, Set> taskToAliases = ctx.getTaskToAliases(); long threshold = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVESMALLTABLESFILESIZE); Long bigTableSize = null; Long smallTablesSize = null; - String bigTableFileAlias = null; - for (String alias : aliases) { - if (!aliasToTask.containsKey(alias)) { - continue; - } - long sumOfOthers = Utilities.sumOfExcept(aliasToKnownSize, aliases, alias); + Map.Entry, Set> nextTask = null; + for (Map.Entry, Set> entry : taskToAliases.entrySet()) { + Set aliases = entry.getValue(); + long sumOfOthers = Utilities.sumOfExcept(aliasToKnownSize, participants, aliases); if (sumOfOthers < 0 || sumOfOthers > threshold) { continue; } // at most one alias is unknown. we can safely regard it as a big alias - Long aliasSize = aliasToKnownSize.get(alias); - if (bigTableSize == null || (aliasSize != null && aliasSize > bigTableSize)) { - bigTableFileAlias = alias; + long aliasSize = Utilities.sumOf(aliasToKnownSize, aliases); + if (bigTableSize == null || aliasSize > bigTableSize) { + nextTask = entry; bigTableSize = aliasSize; smallTablesSize = sumOfOthers; } } - if (bigTableFileAlias != null) { - LOG.info("Driver alias is " + bigTableFileAlias + " with size " + bigTableSize + if (nextTask != null) { + LOG.info("Driver alias is " + nextTask.getValue() + " with size " + bigTableSize + " (total size of others : " + smallTablesSize + ", threshold : " + threshold + ")"); - return bigTableFileAlias; + return nextTask.getKey(); } LOG.info("Failed to resolve driver alias (threshold : " + threshold + ", length mapping : " + aliasToKnownSize + ")"); diff --git ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java index c9b0cf0..3af0257 100644 --- ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java +++ ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java @@ -20,6 +20,7 @@ import junit.framework.Assert; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.exec.Task; import org.junit.Test; @@ -27,6 +28,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; public class TestConditionalResolverCommonJoin { @@ -43,30 +46,35 @@ public void testResolvingDriverAlias() throws Exception { aliasToKnownSize.put("alias2", 2048l); aliasToKnownSize.put("alias3", 4096l); + DDLTask task1 = new DDLTask(); + task1.setId("alias2"); + DDLTask task2 = new DDLTask(); + task2.setId("alias3"); + // joins alias1, alias2, alias3 (alias1 was not eligible for big pos) - HashMap> aliasToTask = - new HashMap>(); - aliasToTask.put("alias2", null); - aliasToTask.put("alias3", null); + HashMap, Set> taskToAliases = + new HashMap, Set>(); + taskToAliases.put(task1, new HashSet(Arrays.asList("alias2"))); + taskToAliases.put(task2, new HashSet(Arrays.asList("alias3"))); ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx ctx = new ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx(); ctx.setPathToAliases(pathToAliases); - ctx.setAliasToTask(aliasToTask); + ctx.setTaskToAliases(taskToAliases); ctx.setAliasToKnownSize(aliasToKnownSize); HiveConf conf = new HiveConf(); conf.setLongVar(HiveConf.ConfVars.HIVESMALLTABLESFILESIZE, 4096); // alias3 only can be selected - String resolved = resolver.resolveMapJoinTask(ctx, conf); - Assert.assertEquals("alias3", resolved); + Task resolved = resolver.resolveMapJoinTask(ctx, conf); + Assert.assertEquals("alias3", resolved.getId()); conf.setLongVar(HiveConf.ConfVars.HIVESMALLTABLESFILESIZE, 65536); // alias1, alias2, alias3 all can be selected but overriden by biggest one (alias3) resolved = resolver.resolveMapJoinTask(ctx, conf); - Assert.assertEquals("alias3", resolved); + Assert.assertEquals("alias3", resolved.getId()); conf.setLongVar(HiveConf.ConfVars.HIVESMALLTABLESFILESIZE, 2048); diff --git ql/src/test/queries/clientpositive/subquery_multiinsert.q ql/src/test/queries/clientpositive/subquery_multiinsert.q index 1f65b16..f65696c 100644 --- ql/src/test/queries/clientpositive/subquery_multiinsert.q +++ ql/src/test/queries/clientpositive/subquery_multiinsert.q @@ -42,4 +42,39 @@ INSERT OVERWRITE TABLE src_5 select * from src_4 ; select * from src_5 +; +set hive.auto.convert.join=true; + +explain +from src b +INSERT OVERWRITE TABLE src_4 + select * + where b.key in + (select a.key + from src a + where b.value = a.value and a.key > '9' + ) +INSERT OVERWRITE TABLE src_5 + select * + where b.key not in ( select key from src s1 where s1.key > '2') + order by key +; + +from src b +INSERT OVERWRITE TABLE src_4 + select * + where b.key in + (select a.key + from src a + where b.value = a.value and a.key > '9' + ) +INSERT OVERWRITE TABLE src_5 + select * + where b.key not in ( select key from src s1 where s1.key > '2') + order by key +; + +select * from src_4 +; +select * from src_5 ; \ No newline at end of file diff --git ql/src/test/results/clientpositive/auto_join12.q.out ql/src/test/results/clientpositive/auto_join12.q.out index bf7990c..ffd6fc4 100644 --- ql/src/test/results/clientpositive/auto_join12.q.out +++ ql/src/test/results/clientpositive/auto_join12.q.out @@ -32,7 +32,7 @@ STAGE PLANS: src1:src Fetch Operator limit: -1 - src3:src + src2:src Fetch Operator limit: -1 Alias -> Map Local Operator Tree: @@ -56,17 +56,17 @@ STAGE PLANS: 0 _col0 (type: string) 1 _col0 (type: string) 2 _col0 (type: string) - src3:src + src2:src TableScan alias: src - Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: ((key < 100) and (key < 80)) (type: boolean) - Statistics: Num rows: 6 Data size: 601 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE Select Operator - expressions: key (type: string) - outputColumnNames: _col0 - Statistics: Num rows: 6 Data size: 601 Basic stats: COMPLETE Column stats: NONE + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE HashTable Sink Operator condition expressions: 0 {_col0} @@ -82,14 +82,14 @@ STAGE PLANS: Map Operator Tree: TableScan alias: src - Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: ((key < 100) and (key < 80)) (type: boolean) - Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 601 Basic stats: COMPLETE Column stats: NONE Select Operator - expressions: key (type: string), value (type: string) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 6 Data size: 601 Basic stats: COMPLETE Column stats: NONE Map Join Operator condition map: Inner Join 0 to 1 diff --git ql/src/test/results/clientpositive/auto_join25.q.out ql/src/test/results/clientpositive/auto_join25.q.out index 9fe6049..268b06e 100644 --- ql/src/test/results/clientpositive/auto_join25.q.out +++ ql/src/test/results/clientpositive/auto_join25.q.out @@ -81,7 +81,7 @@ Obtaining error information Task failed! Task ID: - Stage-6 + Stage-7 Logs: @@ -129,7 +129,7 @@ Obtaining error information Task failed! Task ID: - Stage-5 + Stage-4 Logs: diff --git ql/src/test/results/clientpositive/auto_join29.q.out ql/src/test/results/clientpositive/auto_join29.q.out index 0dd84ff..6bfccd8 100644 --- ql/src/test/results/clientpositive/auto_join29.q.out +++ ql/src/test/results/clientpositive/auto_join29.q.out @@ -3346,7 +3346,7 @@ STAGE PLANS: src1 Fetch Operator limit: -1 - src3 + src2 Fetch Operator limit: -1 Alias -> Map Local Operator Tree: @@ -3370,9 +3370,9 @@ STAGE PLANS: 0 key (type: string) 1 key (type: string) 2 key (type: string) - src3 + src2 TableScan - alias: src3 + alias: src2 Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: (key < 10) (type: boolean) @@ -3395,7 +3395,7 @@ STAGE PLANS: Map Reduce Map Operator Tree: TableScan - alias: src2 + alias: src3 Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: (key < 10) (type: boolean) @@ -3492,7 +3492,7 @@ STAGE PLANS: src1 Fetch Operator limit: -1 - src3 + src2 Fetch Operator limit: -1 Alias -> Map Local Operator Tree: @@ -3512,12 +3512,12 @@ STAGE PLANS: 0 key (type: string) 1 key (type: string) 2 key (type: string) - src3 + src2 TableScan - alias: src3 + alias: src2 Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: ((key > 10) and (key < 10)) (type: boolean) + predicate: ((key < 10) and (key > 10)) (type: boolean) Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE HashTable Sink Operator condition expressions: @@ -3533,10 +3533,10 @@ STAGE PLANS: Map Reduce Map Operator Tree: TableScan - alias: src2 + alias: src3 Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: ((key < 10) and (key > 10)) (type: boolean) + predicate: ((key > 10) and (key < 10)) (type: boolean) Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE Map Join Operator condition map: diff --git ql/src/test/results/clientpositive/auto_join3.q.out ql/src/test/results/clientpositive/auto_join3.q.out index 8c5fc02..fac0796 100644 --- ql/src/test/results/clientpositive/auto_join3.q.out +++ ql/src/test/results/clientpositive/auto_join3.q.out @@ -21,8 +21,8 @@ STAGE PLANS: Map Reduce Map Operator Tree: TableScan - alias: src2 - Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + alias: src3 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Map Join Operator condition map: Inner Join 0 to 1 @@ -55,7 +55,7 @@ STAGE PLANS: src1 Fetch Operator limit: -1 - src3 + src2 Fetch Operator limit: -1 Alias -> Map Local Operator Tree: @@ -63,10 +63,10 @@ STAGE PLANS: TableScan alias: src1 Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE - src3 + src2 TableScan - alias: src3 - Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + alias: src2 + Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Stage: Stage-0 Move Operator diff --git ql/src/test/results/clientpositive/multiMapJoin2.q.out ql/src/test/results/clientpositive/multiMapJoin2.q.out index c59a407..4e19954 100644 --- ql/src/test/results/clientpositive/multiMapJoin2.q.out +++ ql/src/test/results/clientpositive/multiMapJoin2.q.out @@ -2221,7 +2221,7 @@ STAGE PLANS: Map Reduce Map Operator Tree: TableScan - alias: a + alias: b Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Map Join Operator condition map: @@ -2251,13 +2251,13 @@ STAGE PLANS: Local Work: Map Reduce Local Work Alias -> Map Local Tables: - null-subquery1:x-subquery1:tmp:b + null-subquery1:x-subquery1:tmp:a Fetch Operator limit: -1 Alias -> Map Local Operator Tree: - null-subquery1:x-subquery1:tmp:b + null-subquery1:x-subquery1:tmp:a TableScan - alias: b + alias: a Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Reduce Operator Tree: Group By Operator diff --git ql/src/test/results/clientpositive/subquery_multiinsert.q.out ql/src/test/results/clientpositive/subquery_multiinsert.q.out index fdafea3..83e9d77 100644 --- ql/src/test/results/clientpositive/subquery_multiinsert.q.out +++ ql/src/test/results/clientpositive/subquery_multiinsert.q.out @@ -482,3 +482,523 @@ POSTHOOK: Lineage: src_5.value EXPRESSION [(src)b.FieldSchema(name:value, type:s 199 val_199 199 val_199 2 val_2 +PREHOOK: query: explain +from src b +INSERT OVERWRITE TABLE src_4 + select * + where b.key in + (select a.key + from src a + where b.value = a.value and a.key > '9' + ) +INSERT OVERWRITE TABLE src_5 + select * + where b.key not in ( select key from src s1 where s1.key > '2') + order by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +from src b +INSERT OVERWRITE TABLE src_4 + select * + where b.key in + (select a.key + from src a + where b.value = a.value and a.key > '9' + ) +INSERT OVERWRITE TABLE src_5 + select * + where b.key not in ( select key from src s1 where s1.key > '2') + order by key +POSTHOOK: type: QUERY +POSTHOOK: Lineage: src_4.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_4.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +STAGE DEPENDENCIES: + Stage-10 is a root stage + Stage-13 depends on stages: Stage-10, Stage-14 , consists of Stage-12, Stage-4 + Stage-12 has a backup stage: Stage-4 + Stage-15 depends on stages: Stage-4, Stage-12 + Stage-6 depends on stages: Stage-15 + Stage-1 depends on stages: Stage-6 + Stage-7 depends on stages: Stage-1 + Stage-4 + Stage-17 is a root stage + Stage-14 depends on stages: Stage-17 + Stage-0 depends on stages: Stage-14 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-10 + Map Reduce + Map Operator Tree: + TableScan + alias: s1 + Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((key > '2') and key is null) (type: boolean) + Statistics: Num rows: 9 Data size: 901 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 9 Data size: 901 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col0 = 0) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Group By Operator + keys: _col0 (type: bigint) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-13 + Conditional Operator + + Stage: Stage-12 + Map Reduce + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Left Semi Join 0 to 1 + condition expressions: + 0 {key} {value} + 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + $INTNAME1 + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + $INTNAME1 + TableScan + + Stage: Stage-15 + Map Reduce Local Work + Alias -> Map Local Tables: + sq_2:s1 + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + sq_2:s1 + TableScan + alias: s1 + Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key > '2') (type: boolean) + Statistics: Num rows: 19 Data size: 1903 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 19 Data size: 1903 Basic stats: COMPLETE Column stats: NONE + HashTable Sink Operator + condition expressions: + 0 {_col0} {_col1} + 1 {_col0} + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + + Stage: Stage-6 + Map Reduce + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Left Outer Join0 to 1 + condition expressions: + 0 {_col0} {_col1} + 1 {_col0} + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1, _col4 + Statistics: Num rows: 34 Data size: 7032 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((1 = 1) and _col4 is null) (type: boolean) + Statistics: Num rows: 8 Data size: 1654 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 8 Data size: 1654 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Statistics: Num rows: 8 Data size: 1654 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string) + Local Work: + Map Reduce Local Work + Reduce Operator Tree: + Extract + Statistics: Num rows: 8 Data size: 1654 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 8 Data size: 1654 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.src_5 + + Stage: Stage-1 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.src_5 + + Stage: Stage-7 + Stats-Aggr Operator + + Stage: Stage-4 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + value expressions: key (type: string), value (type: string) + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Left Semi Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 + outputColumnNames: _col0, _col1 + Statistics: Num rows: 31 Data size: 6393 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-17 + Map Reduce Local Work + Alias -> Map Local Tables: + sq_1:a + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + sq_1:a + TableScan + alias: a + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key > '9') (type: boolean) + Statistics: Num rows: 9 Data size: 1803 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 9 Data size: 1803 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: string), _col1 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 9 Data size: 1803 Basic stats: COMPLETE Column stats: NONE + HashTable Sink Operator + condition expressions: + 0 {key} {value} + 1 + keys: + 0 key (type: string), value (type: string) + 1 _col0 (type: string), _col1 (type: string) + + Stage: Stage-14 + Map Reduce + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Left Semi Join 0 to 1 + condition expressions: + 0 {key} {value} + 1 + keys: + 0 key (type: string), value (type: string) + 1 _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 31 Data size: 6393 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (1 = 1) (type: boolean) + Statistics: Num rows: 15 Data size: 3093 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 15 Data size: 3093 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 15 Data size: 3093 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.src_4 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.src_4 + + Stage: Stage-3 + Stats-Aggr Operator + +PREHOOK: query: from src b +INSERT OVERWRITE TABLE src_4 + select * + where b.key in + (select a.key + from src a + where b.value = a.value and a.key > '9' + ) +INSERT OVERWRITE TABLE src_5 + select * + where b.key not in ( select key from src s1 where s1.key > '2') + order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src_4 +PREHOOK: Output: default@src_5 +POSTHOOK: query: from src b +INSERT OVERWRITE TABLE src_4 + select * + where b.key in + (select a.key + from src a + where b.value = a.value and a.key > '9' + ) +INSERT OVERWRITE TABLE src_5 + select * + where b.key not in ( select key from src s1 where s1.key > '2') + order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@src_4 +POSTHOOK: Output: default@src_5 +POSTHOOK: Lineage: src_4.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_4.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_4.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: src_4.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: select * from src_4 +PREHOOK: type: QUERY +PREHOOK: Input: default@src_4 +#### A masked pattern was here #### +POSTHOOK: query: select * from src_4 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_4 +#### A masked pattern was here #### +POSTHOOK: Lineage: src_4.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_4.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_4.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: src_4.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +98 val_98 +92 val_92 +96 val_96 +95 val_95 +98 val_98 +90 val_90 +95 val_95 +90 val_90 +97 val_97 +90 val_90 +97 val_97 +PREHOOK: query: select * from src_5 +PREHOOK: type: QUERY +PREHOOK: Input: default@src_5 +#### A masked pattern was here #### +POSTHOOK: query: select * from src_5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_5 +#### A masked pattern was here #### +POSTHOOK: Lineage: src_4.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_4.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_4.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: src_4.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.key EXPRESSION [(src)b.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: src_5.value EXPRESSION [(src)b.FieldSchema(name:value, type:string, comment:default), ] +0 val_0 +0 val_0 +0 val_0 +10 val_10 +100 val_100 +100 val_100 +103 val_103 +103 val_103 +104 val_104 +104 val_104 +105 val_105 +11 val_11 +111 val_111 +113 val_113 +113 val_113 +114 val_114 +116 val_116 +118 val_118 +118 val_118 +119 val_119 +119 val_119 +119 val_119 +12 val_12 +12 val_12 +120 val_120 +120 val_120 +125 val_125 +125 val_125 +126 val_126 +128 val_128 +128 val_128 +128 val_128 +129 val_129 +129 val_129 +131 val_131 +133 val_133 +134 val_134 +134 val_134 +136 val_136 +137 val_137 +137 val_137 +138 val_138 +138 val_138 +138 val_138 +138 val_138 +143 val_143 +145 val_145 +146 val_146 +146 val_146 +149 val_149 +149 val_149 +15 val_15 +15 val_15 +150 val_150 +152 val_152 +152 val_152 +153 val_153 +155 val_155 +156 val_156 +157 val_157 +158 val_158 +160 val_160 +162 val_162 +163 val_163 +164 val_164 +164 val_164 +165 val_165 +165 val_165 +166 val_166 +167 val_167 +167 val_167 +167 val_167 +168 val_168 +169 val_169 +169 val_169 +169 val_169 +169 val_169 +17 val_17 +170 val_170 +172 val_172 +172 val_172 +174 val_174 +174 val_174 +175 val_175 +175 val_175 +176 val_176 +176 val_176 +177 val_177 +178 val_178 +179 val_179 +179 val_179 +18 val_18 +18 val_18 +180 val_180 +181 val_181 +183 val_183 +186 val_186 +187 val_187 +187 val_187 +187 val_187 +189 val_189 +19 val_19 +190 val_190 +191 val_191 +191 val_191 +192 val_192 +193 val_193 +193 val_193 +193 val_193 +194 val_194 +195 val_195 +195 val_195 +196 val_196 +197 val_197 +197 val_197 +199 val_199 +199 val_199 +199 val_199 +2 val_2