diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3bfd539..4d4be60 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -529,6 +529,7 @@ HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true), HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD("hive.auto.convert.join.noconditionaltask.size", 10000000L), + HIVECONVERTJOINUSENONSTAGED("hive.auto.convert.join.use.nonstaged", false), HIVESKEWJOINKEY("hive.skewjoin.key", 100000), HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000), HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java index d8f4eb4..3cfaacf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java @@ -51,7 +51,7 @@ */ protected transient List[] joinKeysStandardObjectInspectors; - protected transient byte posBigTable = -1; // one of the tables that is not in memory + protected transient byte posBigTable = -1; // pos of driver alias protected transient RowContainer> emptyList = null; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java index a080fcc..64abf85 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java @@ -30,7 +30,8 @@ */ public interface HashTableLoader { - void load(ExecMapperContext context, Configuration hconf, MapJoinDesc desc, byte posBigTable, - MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes) + void init(ExecMapperContext context, Configuration hconf, MapJoinDesc desc); + + void load(MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java index aa8f19c..e3ad94b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec; import java.io.BufferedOutputStream; +import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.ArrayList; @@ -26,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -198,7 +200,9 @@ protected void initializeOp(Configuration hconf) throws HiveException { } } - + public MapJoinTableContainer[] getMapJoinTables() { + return mapJoinTables; + } private static List[] getStandardObjectInspectors( List[] aliasToObjectInspectors, int maxTag) { @@ -265,34 +269,7 @@ private boolean hasFilter(int alias) { public void closeOp(boolean abort) throws HiveException { try { if (mapJoinTables != null) { - // get tmp file URI - String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI(); - LOG.info("Temp URI for side 
table: " + tmpURI); - for (byte tag = 0; tag < mapJoinTables.length; tag++) { - // get the key and value - MapJoinTableContainer tableContainer = mapJoinTables[tag]; - if (tableContainer == null) { - continue; - } - // get current input file name - String bigBucketFileName = getExecContext().getCurrentBigBucketFile(); - String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName); - // get the tmp URI path; it will be a hdfs path if not local mode - String dumpFilePrefix = conf.getDumpFilePrefix(); - String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName); - console.printInfo(Utilities.now() + "\tDump the side-table into file: " + tmpURIPath); - // get the hashtable file and path - Path path = new Path(tmpURIPath); - FileSystem fs = path.getFileSystem(hconf); - ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096)); - try { - mapJoinTableSerdes[tag].persist(out, tableContainer); - } finally { - out.close(); - } - tableContainer.clear(); - console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath); - } + flushToFile(); } super.closeOp(abort); } catch (Exception e) { @@ -300,6 +277,39 @@ public void closeOp(boolean abort) throws HiveException { } } + protected void flushToFile() throws IOException, HiveException { + // get tmp file URI + String tmpURI = getExecContext().getLocalWork().getTmpFileURI(); + LOG.info("Temp URI for side table: " + tmpURI); + for (byte tag = 0; tag < mapJoinTables.length; tag++) { + // get the key and value + MapJoinTableContainer tableContainer = mapJoinTables[tag]; + if (tableContainer == null) { + continue; + } + // get current input file name + String bigBucketFileName = getExecContext().getCurrentBigBucketFile(); + String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName); + // get the tmp URI path; it will be a hdfs path if not local mode + String dumpFilePrefix = conf.getDumpFilePrefix(); + String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName); + console.printInfo(Utilities.now() + "\tDump the side-table into file: " + tmpURIPath); + // get the hashtable file and path + Path path = new Path(tmpURIPath); + FileSystem fs = path.getFileSystem(hconf); + ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096)); + try { + mapJoinTableSerdes[tag].persist(out, tableContainer); + } finally { + out.close(); + } + tableContainer.clear(); + FileStatus status = fs.getFileStatus(path); + console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + tmpURIPath + + " (" + status.getLen() + " bytes)"); + } + } + /** * Implements the getName function for the Node Interface. 
* diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java index 1e0314d..3fc6bc4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java @@ -55,7 +55,7 @@ List[] result = new List[tagLen]; for (byte alias = 0; alias < exprEntries.length; alias++) { //get big table - if (alias == (byte) posBigTableAlias){ + if (alias == (byte) posBigTableAlias || exprEntries[alias] == null){ //skip the big tables continue; } @@ -77,7 +77,7 @@ List[] result = new List[tagLen]; for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) { //get big table - if(alias == (byte) posBigTableAlias ){ + if(alias == (byte) posBigTableAlias || aliasToObjectInspectors[alias] == null){ //skip the big tables continue; } @@ -106,6 +106,9 @@ public static int populateJoinKeyValue(List[] outMap, int posBigTableAlias) throws HiveException { int total = 0; for (Entry> e : inputMap.entrySet()) { + if (e.getValue() == null) { + continue; + } Byte key = order == null ? e.getKey() : order[e.getKey()]; List valueFields = new ArrayList(); for (ExprNodeDesc expr : e.getValue()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index bdc85b9..6e2bc33 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -145,8 +145,8 @@ private void loadHashTable() throws HiveException { } } perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); - loader.load(this.getExecContext(), hconf, this.getConf(), - posBigTable, mapJoinTables, mapJoinTableSerdes); + loader.init(getExecContext(), hconf, getConf()); + loader.load(mapJoinTables, mapJoinTableSerdes); cache.cache(tableKey, mapJoinTables); cache.cache(serdeKey, mapJoinTableSerdes); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java new file mode 100644 index 0000000..16baf4b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; + +import java.io.IOException; + +public class TemporaryHashSinkOperator extends HashTableSinkOperator { + public TemporaryHashSinkOperator(MapJoinDesc desc) { + conf = new HashTableSinkDesc(desc); + } + + @Override + protected void flushToFile() throws IOException, HiveException { + // do nothing + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 42d764d..3a04a5a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -329,7 +329,7 @@ public int execute(DriverContext driverContext) { try{ MapredLocalWork localwork = mWork.getMapLocalWork(); - if (localwork != null) { + if (localwork != null && localwork.hasStagedAlias()) { if (!ShimLoader.getHadoopShims().isLocalMode(job)) { Path localPath = new Path(localwork.getTmpFileURI()); Path hdfsPath = new Path(mWork.getTmpHDFSFileURI()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java index efe5710..94e6972 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java @@ -20,6 +20,9 @@ import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.ObjectInputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,13 +30,20 @@ import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.mapred.JobConf; /** * HashTableLoader for MR loads the hashtable for MapJoins from local disk (hashtables @@ -44,22 +54,28 @@ private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName()); - public HashTableLoader() { + private ExecMapperContext context; + private Configuration hconf; + private MapJoinDesc desc; + + @Override + public void init(ExecMapperContext context, Configuration hconf, MapJoinDesc desc) { + this.context = context; + this.hconf = hconf; + this.desc = desc; } @Override - public void load(ExecMapperContext context, - Configuration hconf, - MapJoinDesc desc, - byte posBigTable, + public void load( MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException { String baseDir = null; Path currentInputPath = context.getCurrentInputPath(); 
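
The interface change above splits the old single load(...) call into init(...) plus load(...): as in the MapJoinOperator hunk above, the caller now invokes loader.init(getExecContext(), hconf, getConf()) once and then loader.load(mapJoinTables, mapJoinTableSerdes), which lets the MR loader keep the context and desc around for the loadDirectly() path added below. A minimal sketch of an implementation of the new contract, assuming ExecMapperContext lives under ql.exec.mr as in this tree; the class name and the deliberately empty load() body are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.HashTableLoader;
    import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.MapJoinDesc;

    // Illustrative only: shows the two-step contract (init once, load per input file).
    public class NoopHashTableLoader implements HashTableLoader {

      private ExecMapperContext context;
      private Configuration hconf;
      private MapJoinDesc desc;

      @Override
      public void init(ExecMapperContext context, Configuration hconf, MapJoinDesc desc) {
        // Cache whatever the later load() calls will need.
        this.context = context;
        this.hconf = hconf;
        this.desc = desc;
      }

      @Override
      public void load(MapJoinTableContainer[] mapJoinTables,
          MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
        // A real loader fills mapJoinTables[pos] for every position except
        // desc.getPosBigTable(); this sketch leaves the slots untouched.
      }
    }
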
- LOG.info("******* Load from HashTable File: input : " + currentInputPath); + LOG.info("******* Load from HashTable for input file: " + currentInputPath); String fileName = context.getLocalWork().getBucketFileName(currentInputPath.toString()); try { + loadDirectly(mapJoinTables, fileName); if (ShimLoader.getHadoopShims().isLocalMode(hconf)) { baseDir = context.getLocalWork().getTmpFileURI(); } else { @@ -79,7 +95,7 @@ public void load(ExecMapperContext context, } } for (int pos = 0; pos < mapJoinTables.length; pos++) { - if (pos == posBigTable) { + if (pos == desc.getPosBigTable() || mapJoinTables[pos] != null) { continue; } if(baseDir == null) { @@ -101,4 +117,30 @@ public void load(ExecMapperContext context, } } + private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFileName) + throws Exception { + MapredLocalWork localWork = context.getLocalWork(); + List> directWorks = localWork.getDirectFetchOp(); + if (directWorks == null || directWorks.isEmpty()) { + return; + } + JobConf job = new JobConf(hconf); + MapredLocalTask localTask = new MapredLocalTask(localWork, job, false); + + HashTableSinkOperator sink = new TemporaryHashSinkOperator(desc); + sink.setParentOperators(new ArrayList>(directWorks)); + + for (Operator operator : directWorks) { + if (operator instanceof TableScanOperator) { + operator.setChildOperators(Arrays.>asList(sink)); + } + } + localTask.setExecContext(context); + localTask.startForward(inputFileName); + + MapJoinTableContainer[] tables = sink.getMapJoinTables(); + System.arraycopy(tables, 0, mapJoinTables, 0, tables.length); + + Arrays.fill(tables, null); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 0cc90d0..cd1b809 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -81,7 +81,7 @@ */ public class MapredLocalTask extends Task implements Serializable { - private Map fetchOperators; + private Map fetchOperators = new HashMap(); protected HadoopJobExecHelper jobExecHelper; private JobConf job; public static transient final Log l4j = LogFactory.getLog(MapredLocalTask.class); @@ -93,7 +93,7 @@ // not sure we need this exec context; but all the operators in the work // will pass this context throught - private final ExecMapperContext execContext = new ExecMapperContext(); + private ExecMapperContext execContext = new ExecMapperContext(); private Process executor; @@ -107,6 +107,10 @@ public MapredLocalTask(MapredLocalWork plan, JobConf job, boolean isSilent) thro console = new LogHelper(LOG, isSilent); } + public void setExecContext(ExecMapperContext execContext) { + this.execContext = execContext; + } + @Override public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) { super.initialize(conf, queryPlan, driverContext); @@ -291,26 +295,11 @@ public int executeFromChildJVM(DriverContext driverContext) { console.printInfo(Utilities.now() + "\tStarting to launch local task to process map join;\tmaximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); - fetchOperators = new HashMap(); - Map fetchOpJobConfMap = new HashMap(); execContext.setJc(job); // set the local work, so all the operator can get this context execContext.setLocalWork(work); - boolean inputFileChangeSenstive = work.getInputFileChangeSensitive(); try { - - initializeOperators(fetchOpJobConfMap); - // for each big table's bucket, call 
the start forward - if (inputFileChangeSenstive) { - for (Map> bigTableBucketFiles : work - .getBucketMapjoinContext().getAliasBucketFileNameMapping().values()) { - for (String bigTableBucket : bigTableBucketFiles.keySet()) { - startForward(inputFileChangeSenstive, bigTableBucket); - } - } - } else { - startForward(inputFileChangeSenstive, null); - } + startForward(null); long currentTime = System.currentTimeMillis(); long elapsed = currentTime - startTime; console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: " @@ -328,6 +317,26 @@ public int executeFromChildJVM(DriverContext driverContext) { return 0; } + public void startForward(String bigTableBucket) throws Exception { + boolean inputFileChangeSenstive = work.getInputFileChangeSensitive(); + initializeOperators(new HashMap()); + // for each big table's bucket, call the start forward + if (inputFileChangeSenstive) { + for (Map> bigTableBucketFiles : work + .getBucketMapjoinContext().getAliasBucketFileNameMapping().values()) { + if (bigTableBucket == null) { + for (String bigTableBucketFile : bigTableBucketFiles.keySet()) { + startForward(inputFileChangeSenstive, bigTableBucketFile); + } + } else if (bigTableBucketFiles.keySet().contains(bigTableBucket)) { + startForward(inputFileChangeSenstive, bigTableBucket); + } + } + } else { + startForward(inputFileChangeSenstive, null); + } + } + private void startForward(boolean inputFileChangeSenstive, String bigTableBucket) throws Exception { for (Map.Entry entry : fetchOperators.entrySet()) { @@ -348,24 +357,18 @@ private void startForward(boolean inputFileChangeSenstive, String bigTableBucket // get the root operator Operator forwardOp = work.getAliasToWork().get(alias); // walk through the operator tree - while (true) { + while (!forwardOp.getDone()) { InspectableObject row = fetchOp.getNextRow(); if (row == null) { - if (inputFileChangeSenstive) { - execContext.setCurrentBigBucketFile(bigTableBucket); - forwardOp.reset(); - } - forwardOp.close(false); break; } forwardOp.processOp(row.o, 0); - // check if any operator had a fatal error or early exit during - // execution - if (forwardOp.getDone()) { - // ExecMapper.setDone(true); - break; - } } + if (inputFileChangeSenstive) { + execContext.setCurrentBigBucketFile(bigTableBucket); + forwardOp.reset(); + } + forwardOp.close(false); } } @@ -373,6 +376,9 @@ private void initializeOperators(Map fetchOpJobConfMap) throws HiveException { // this mapper operator is used to initialize all the operators for (Map.Entry entry : work.getAliasToFetchWork().entrySet()) { + if (entry.getValue() == null) { + continue; + } JobConf jobClone = new JobConf(job); TableScanOperator ts = (TableScanOperator)work.getAliasToWork().get(entry.getKey()); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java index 5a53e15..a462826 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java @@ -118,6 +118,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. 
// mapjoin should not affected by join reordering mapJoinOp.getConf().resetOrder(); + HiveConf conf = context.getParseCtx().getConf(); + HashTableSinkDesc hashTableSinkDesc = new HashTableSinkDesc(mapJoinOp.getConf()); HashTableSinkOperator hashTableSinkOp = (HashTableSinkOperator) OperatorFactory .get(hashTableSinkDesc); @@ -125,34 +127,54 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. // set hashtable memory usage float hashtableMemoryUsage; if (context.isFollowedByGroupBy()) { - hashtableMemoryUsage = context.getParseCtx().getConf().getFloatVar( + hashtableMemoryUsage = conf.getFloatVar( HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE); } else { - hashtableMemoryUsage = context.getParseCtx().getConf().getFloatVar( + hashtableMemoryUsage = conf.getFloatVar( HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE); } hashTableSinkOp.getConf().setHashtableMemoryUsage(hashtableMemoryUsage); // get the last operator for processing big tables int bigTable = mapJoinOp.getConf().getPosBigTable(); + Byte[] orders = mapJoinOp.getConf().getTagOrder(); + + final boolean useNontaged = conf.getBoolVar( + HiveConf.ConfVars.HIVECONVERTJOINUSENONSTAGED); // the parent ops for hashTableSinkOp List> smallTablesParentOp = new ArrayList>(); List> dummyOperators = new ArrayList>(); + List> directOperators = + new ArrayList>(); // get all parents List> parentsOp = mapJoinOp.getParentOperators(); for (int i = 0; i < parentsOp.size(); i++) { if (i == bigTable) { smallTablesParentOp.add(null); + directOperators.add(null); continue; } Operator parent = parentsOp.get(i); + if (useNontaged && parent instanceof TableScanOperator) { + // no filter, no projection. no need to stage + smallTablesParentOp.add(null); + directOperators.add(parent); + hashTableSinkDesc.getKeys().put(orders[i], null); + hashTableSinkDesc.getExprs().put(orders[i], null); + hashTableSinkDesc.getFilters().put(orders[i], null); + } else { + // keep the parent id correct + smallTablesParentOp.add(parent); + directOperators.add(null); + } // let hashtable Op be the child of this parent parent.replaceChild(mapJoinOp, hashTableSinkOp); - // keep the parent id correct - smallTablesParentOp.add(parent); + if (useNontaged && parent instanceof TableScanOperator) { + parent.setChildOperators(null); + } // create an new operator: HashTable DummyOpeator, which share the table desc HashTableDummyDesc desc = new HashTableDummyDesc(); @@ -186,6 +208,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. 
for (Operator op : dummyOperators) { context.addDummyParentOp(op); } + context.setDirectWorks(directOperators); return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java index 010ac54..9561f25 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java @@ -123,17 +123,25 @@ private void processCurrentTask(Task currTask, } // replace the map join operator to local_map_join operator in the operator tree // and return all the dummy parent - LocalMapJoinProcCtx localMapJoinProcCtx= adjustLocalTask(localTask); + LocalMapJoinProcCtx localMapJoinProcCtx = adjustLocalTask(localTask); List> dummyOps = localMapJoinProcCtx.getDummyParentOp(); + List> directWorks = localMapJoinProcCtx.getDirectWorks(); + // create new local work and setup the dummy ops - MapredLocalWork newLocalWork = new MapredLocalWork(); + MapredLocalWork newLocalWork = localwork.extractDirectWorks(directWorks); newLocalWork.setDummyParentOp(dummyOps); - newLocalWork.setTmpFileURI(tmpFileURI); - newLocalWork.setInputFileChangeSensitive(localwork.getInputFileChangeSensitive()); - newLocalWork.setBucketMapjoinContext(localwork.copyPartSpecMappingOnly()); mapredWork.getMapWork().setMapLocalWork(newLocalWork); + + if (localwork.getAliasToFetchWork().isEmpty()) { + // no alias to stage.. no local task + newLocalWork.setHasStagedAlias(false); + currTask.setBackupTask(localTask.getBackupTask()); + currTask.setBackupChildrenTasks(localTask.getBackupChildrenTasks()); + return; + } + newLocalWork.setHasStagedAlias(true); // get all parent tasks List> parentTasks = currTask.getParentTasks(); currTask.setParentTasks(null); @@ -271,6 +279,8 @@ public void setPhysicalContext(PhysicalContext physicalContext) { private List> dummyParentOp = null; private boolean isFollowedByGroupBy; + private List> directWorks; + public LocalMapJoinProcCtx(Task task, ParseContext parseCtx) { currentTask = task; this.parseCtx = parseCtx; @@ -312,5 +322,13 @@ public void setDummyParentOp(List> op) { public void addDummyParentOp(Operator op) { this.dummyParentOp.add(op); } + + public void setDirectWorks(List> directWorks) { + this.directWorks = directWorks; + } + + public List> getDirectWorks() { + return directWorks; + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java index 14fced7..f445016 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java @@ -20,10 +20,8 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -92,7 +90,7 @@ public HashTableSinkDesc() { public HashTableSinkDesc(MapJoinDesc clone) { this.bigKeysDirMap = clone.getBigKeysDirMap(); this.conds = clone.getConds(); - this.exprs= clone.getExprs(); + this.exprs = new HashMap>(clone.getExprs()); this.handleSkewJoin = clone.getHandleSkewJoin(); this.keyTableDesc = clone.getKeyTableDesc(); this.noOuterJoin = clone.getNoOuterJoin(); @@ -102,10 +100,10 @@ public HashTableSinkDesc(MapJoinDesc clone) { this.skewKeysValuesTables = clone.getSkewKeysValuesTables(); this.smallKeysDirMap = 
clone.getSmallKeysDirMap(); this.tagOrder = clone.getTagOrder(); - this.filters = clone.getFilters(); + this.filters = new HashMap>(clone.getFilters()); this.filterMap = clone.getFilterMap(); - this.keys = clone.getKeys(); + this.keys = new HashMap>(clone.getKeys()); this.keyTblDesc = clone.getKeyTblDesc(); this.valueTblDescs = clone.getValueTblDescs(); this.valueTblFilteredDescs = clone.getValueFilteredTblDescs(); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java index 83a778d..f969a36 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java @@ -19,8 +19,11 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Operator; @@ -40,7 +43,10 @@ private String tmpFileURI; private String stageID; - private List> dummyParentOp ; + private List> dummyParentOp; + private List> directFetchOp; + + private boolean hasStagedAlias; public MapredLocalWork() { @@ -177,4 +183,52 @@ private String getFileName(String path) { } return path.substring(last_separator + 1); } + + public MapredLocalWork extractDirectWorks(List> directWorks) { + MapredLocalWork newLocalWork = new MapredLocalWork(); + newLocalWork.setTmpFileURI(tmpFileURI); + newLocalWork.setInputFileChangeSensitive(inputFileChangeSensitive); + newLocalWork.setBucketMapjoinContext(copyPartSpecMappingOnly()); + if (!hasAnyNonNull(directWorks)) { + // all small aliases are staged + return newLocalWork; + } + newLocalWork.directFetchOp = new ArrayList>(directWorks); + newLocalWork.aliasToWork = new LinkedHashMap>(); + newLocalWork.aliasToFetchWork = new LinkedHashMap(); + + Map> works = new HashMap>(aliasToWork); + for (Map.Entry> entry : works.entrySet()) { + String alias = entry.getKey(); + boolean notStaged = directWorks.contains(entry.getValue()); + newLocalWork.aliasToWork.put(alias, notStaged ? aliasToWork.remove(alias) : null); + newLocalWork.aliasToFetchWork.put(alias, notStaged ? 
aliasToFetchWork.remove(alias) : null); + } + return newLocalWork; + } + + private boolean hasAnyNonNull(List list) { + for (Object element : list) { + if (element != null) { + return true; + } + } + return false; + } + + public void setDirectFetchOp(List> op){ + this.directFetchOp = op; + } + + public List> getDirectFetchOp() { + return directFetchOp; + } + + public boolean hasStagedAlias() { + return hasStagedAlias; + } + + public void setHasStagedAlias(boolean hasStagedAlias) { + this.hasStagedAlias = hasStagedAlias; + } } diff --git ql/src/test/queries/clientpositive/auto_join_without_localtask.q ql/src/test/queries/clientpositive/auto_join_without_localtask.q new file mode 100644 index 0000000..bb7edc9 --- /dev/null +++ ql/src/test/queries/clientpositive/auto_join_without_localtask.q @@ -0,0 +1,28 @@ +set hive.auto.convert.join=true; +set hive.auto.convert.join.use.nonstaged=true; + +set hive.auto.convert.join.noconditionaltask.size=100; + +explain +select a.* from src a join src b on a.key=b.key limit 40; + +select a.* from src a join src b on a.key=b.key limit 40; + +explain +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40; + +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40; + +set hive.auto.convert.join.noconditionaltask.size=100; + +explain +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40; + +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40; + +set hive.mapjoin.localtask.max.memory.usage = 0.0001; +set hive.mapjoin.check.memory.rows = 2; + +-- fallback to common join +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40; + diff --git ql/src/test/results/clientpositive/auto_join_without_localtask.q.out ql/src/test/results/clientpositive/auto_join_without_localtask.q.out new file mode 100644 index 0000000..22a243f --- /dev/null +++ ql/src/test/results/clientpositive/auto_join_without_localtask.q.out @@ -0,0 +1,1025 @@ +PREHOOK: query: explain +select a.* from src a join src b on a.key=b.key limit 40 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select a.* from src a join src b on a.key=b.key limit 40 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. 
(TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF (TOK_TABNAME a)))) (TOK_LIMIT 40))) + +STAGE DEPENDENCIES: + Stage-5 is a root stage , consists of Stage-3, Stage-4, Stage-1 + Stage-3 has a backup stage: Stage-1 + Stage-4 has a backup stage: Stage-1 + Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-5 + Conditional Operator + + Stage: Stage-3 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} {value} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0, _col1 + Position of Big Table: 0 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + b + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + b + TableScan + alias: b + + Stage: Stage-4 + Map Reduce + Alias -> Map Operator Tree: + b + TableScan + alias: b + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} {value} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0, _col1 + Position of Big Table: 1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + a + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + a + TableScan + alias: a + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + tag: 0 + value expressions: + expr: key + type: string + expr: value + type: string + b + TableScan + alias: b + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 + handleSkewJoin: false + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 40 + +PREHOOK: query: select a.* from src a join src b on a.key=b.key limit 40 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select a.* from src a join src b on a.key=b.key 
limit 40 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +238 val_238 +238 val_238 +86 val_86 +311 val_311 +311 val_311 +311 val_311 +27 val_27 +165 val_165 +165 val_165 +409 val_409 +409 val_409 +409 val_409 +255 val_255 +255 val_255 +278 val_278 +278 val_278 +98 val_98 +98 val_98 +484 val_484 +265 val_265 +265 val_265 +193 val_193 +193 val_193 +193 val_193 +401 val_401 +401 val_401 +401 val_401 +401 val_401 +401 val_401 +150 val_150 +273 val_273 +273 val_273 +273 val_273 +224 val_224 +224 val_224 +369 val_369 +369 val_369 +369 val_369 +66 val_66 +128 val_128 +PREHOOK: query: explain +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key))) (TOK_TABREF (TOK_TABNAME src) c) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL c) value)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF (TOK_TABNAME a)))) (TOK_LIMIT 40))) + +STAGE DEPENDENCIES: + Stage-10 is a root stage , consists of Stage-8, Stage-9, Stage-1 + Stage-8 has a backup stage: Stage-1 + Stage-7 depends on stages: Stage-1, Stage-8, Stage-9 , consists of Stage-5, Stage-6, Stage-2 + Stage-5 has a backup stage: Stage-2 + Stage-6 has a backup stage: Stage-2 + Stage-2 + Stage-9 has a backup stage: Stage-1 + Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-10 + Conditional Operator + + Stage: Stage-8 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} {value} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0, _col1 + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + b + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + b + TableScan + alias: b + + Stage: Stage-7 + Conditional Operator + + Stage: Stage-5 + Map Reduce + Alias -> Map Operator Tree: + $INTNAME + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {_col0} {_col1} + 1 + handleSkewJoin: false + keys: + 0 [Column[_col1]] + 1 [Column[value]] + outputColumnNames: _col4, _col5 + Position of Big Table: 0 + Select Operator + expressions: + expr: _col4 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + c + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + c + TableScan + alias: c + + Stage: Stage-6 + Map Reduce + Alias -> Map Operator Tree: + c + TableScan + alias: c + Map Join Operator + condition 
map: + Inner Join 0 to 1 + condition expressions: + 0 {_col0} {_col1} + 1 + handleSkewJoin: false + keys: + 0 [Column[_col1]] + 1 [Column[value]] + outputColumnNames: _col4, _col5 + Position of Big Table: 1 + Select Operator + expressions: + expr: _col4 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + $INTNAME + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + $INTNAME + TableScan + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + $INTNAME + TableScan + Reduce Output Operator + key expressions: + expr: _col1 + type: string + sort order: + + Map-reduce partition columns: + expr: _col1 + type: string + tag: 0 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + c + TableScan + alias: c + Reduce Output Operator + key expressions: + expr: value + type: string + sort order: + + Map-reduce partition columns: + expr: value + type: string + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col4} {VALUE._col5} + 1 + handleSkewJoin: false + outputColumnNames: _col4, _col5 + Select Operator + expressions: + expr: _col4 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-9 + Map Reduce + Alias -> Map Operator Tree: + b + TableScan + alias: b + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} {value} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0, _col1 + Position of Big Table: 1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + a + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + a + TableScan + alias: a + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + tag: 0 + value expressions: + expr: key + type: string + expr: value + type: string + b + TableScan + alias: b + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 + handleSkewJoin: false + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-0 + Fetch Operator + limit: 40 + +PREHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +238 val_238 +238 val_238 +238 val_238 +238 val_238 +86 val_86 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +27 val_27 +165 val_165 +165 val_165 +165 val_165 +165 val_165 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +255 val_255 +255 val_255 +255 val_255 +255 val_255 +278 val_278 +278 val_278 +278 val_278 +278 val_278 +98 val_98 +98 val_98 +98 val_98 +98 val_98 +PREHOOK: query: explain +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key))) (TOK_TABREF (TOK_TABNAME src) c) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL c) value)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF (TOK_TABNAME a)))) (TOK_WHERE (> (. (TOK_TABLE_OR_COL a) key) 100)) (TOK_LIMIT 40))) + +STAGE DEPENDENCIES: + Stage-10 is a root stage , consists of Stage-13, Stage-14, Stage-1 + Stage-13 has a backup stage: Stage-1 + Stage-8 depends on stages: Stage-13 + Stage-7 depends on stages: Stage-1, Stage-8, Stage-9 , consists of Stage-5, Stage-6, Stage-2 + Stage-5 has a backup stage: Stage-2 + Stage-6 has a backup stage: Stage-2 + Stage-2 + Stage-14 has a backup stage: Stage-1 + Stage-9 depends on stages: Stage-14 + Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-10 + Conditional Operator + + Stage: Stage-13 + Map Reduce Local Work + Alias -> Map Local Tables: + b + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + b + TableScan + alias: b + Filter Operator + predicate: + expr: (key > 100) + type: boolean + HashTable Sink Operator + condition expressions: + 0 {key} {value} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 0 + + Stage: Stage-8 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key > 100) + type: boolean + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} {value} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0, _col1 + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-7 + Conditional Operator + + Stage: Stage-5 + Map Reduce + Alias -> Map Operator Tree: + $INTNAME + TableScan + Map Join Operator 
+ condition map: + Inner Join 0 to 1 + condition expressions: + 0 {_col0} {_col1} + 1 + handleSkewJoin: false + keys: + 0 [Column[_col1]] + 1 [Column[value]] + outputColumnNames: _col4, _col5 + Position of Big Table: 0 + Select Operator + expressions: + expr: _col4 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + c + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + c + TableScan + alias: c + + Stage: Stage-6 + Map Reduce + Alias -> Map Operator Tree: + c + TableScan + alias: c + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {_col0} {_col1} + 1 + handleSkewJoin: false + keys: + 0 [Column[_col1]] + 1 [Column[value]] + outputColumnNames: _col4, _col5 + Position of Big Table: 1 + Select Operator + expressions: + expr: _col4 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + $INTNAME + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + $INTNAME + TableScan + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + $INTNAME + TableScan + Reduce Output Operator + key expressions: + expr: _col1 + type: string + sort order: + + Map-reduce partition columns: + expr: _col1 + type: string + tag: 0 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + c + TableScan + alias: c + Reduce Output Operator + key expressions: + expr: value + type: string + sort order: + + Map-reduce partition columns: + expr: value + type: string + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col4} {VALUE._col5} + 1 + handleSkewJoin: false + outputColumnNames: _col4, _col5 + Select Operator + expressions: + expr: _col4 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-14 + Map Reduce Local Work + Alias -> Map Local Tables: + a + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key > 100) + type: boolean + HashTable Sink Operator + condition expressions: + 0 {key} {value} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 1 + + Stage: Stage-9 + Map Reduce + Alias -> Map Operator Tree: + b + TableScan + alias: b + Filter Operator + predicate: + expr: (key > 100) + type: boolean + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} {value} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0, _col1 + Position of 
Big Table: 1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key > 100) + type: boolean + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + tag: 0 + value expressions: + expr: key + type: string + expr: value + type: string + b + TableScan + alias: b + Filter Operator + predicate: + expr: (key > 100) + type: boolean + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 + handleSkewJoin: false + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-0 + Fetch Operator + limit: 40 + +PREHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +238 val_238 +238 val_238 +238 val_238 +238 val_238 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +311 val_311 +165 val_165 +165 val_165 +165 val_165 +165 val_165 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +409 val_409 +255 val_255 +255 val_255 +255 val_255 +255 val_255 +278 val_278 +278 val_278 +278 val_278 +278 val_278 +484 val_484 +265 val_265 +265 val_265 +265 val_265 +265 val_265 +193 val_193 +PREHOOK: query: -- fallback to common join +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +Execution failed with exit status: 3 +Obtaining error information + +Task failed! +Task ID: + Stage-13 + +Logs: + +#### A masked pattern was here #### +FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask +ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask +Execution failed with exit status: 2 +Obtaining error information + +Task failed! 
+Task ID: + Stage-5 + +Logs: + +#### A masked pattern was here #### +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask +ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask +POSTHOOK: query: -- fallback to common join +select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +103 val_103 +103 val_103 +103 val_103 +103 val_103 +103 val_103 +103 val_103 +103 val_103 +103 val_103 +104 val_104 +104 val_104 +104 val_104 +104 val_104 +104 val_104 +104 val_104 +104 val_104 +104 val_104 +105 val_105 +111 val_111 +113 val_113 +113 val_113 +113 val_113 +113 val_113 +113 val_113 +113 val_113 +113 val_113 +113 val_113 +114 val_114 +116 val_116 +118 val_118 +118 val_118 +118 val_118 +118 val_118 +118 val_118 +118 val_118 +118 val_118 +118 val_118 +119 val_119 +119 val_119 +119 val_119 +119 val_119
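
For reference, the decision that makes the new flag take effect is the per-parent check in LocalMapJoinProcFactory above: hive.auto.convert.join.use.nonstaged must be enabled and the small-table parent must be a bare TableScanOperator, i.e. no filter or projection has to run before the rows reach the hash table. A condensed sketch of that check; the wrapper class and method name are illustrative, not part of the patch.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.TableScanOperator;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    public final class NonStagedMapJoinCheck {

      /**
       * True when a small-table parent of a map join can be fetched directly by
       * the map task instead of being staged by a local task: the feature flag
       * is enabled and the parent is a plain table scan.
       */
      public static boolean canFetchDirectly(HiveConf conf,
          Operator<? extends OperatorDesc> parent) {
        return conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOINUSENONSTAGED)
            && parent instanceof TableScanOperator;
      }

      private NonStagedMapJoinCheck() {
      }
    }

Enabling the behaviour from a session is just "set hive.auto.convert.join.use.nonstaged=true;", as the new auto_join_without_localtask.q test above does.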