diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 16d54c6..7c2b476 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -529,6 +529,7 @@
HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),
HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD("hive.auto.convert.join.noconditionaltask.size",
10000000L),
+ HIVECONVERTJOINUSENONSTAGED("hive.auto.convert.join.use.nonstaged", false),
HIVESKEWJOINKEY("hive.skewjoin.key", 100000),
HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index d188f2a..a0178a4 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -866,6 +866,14 @@
+<property>
+  <name>hive.auto.convert.join.use.nonstaged</name>
+  <value>false</value>
+  <description>For conditional joins, if the input stream from a small alias can be applied directly to the join operator without
+  filtering or projection, the alias need not be pre-staged in the distributed cache via a MapReduce local task.
+  </description>
+</property>
+
<property>
  <name>hive.script.auto.progress</name>
  <value>false</value>
  <description>Whether Hive Transform/Map/Reduce Clause should automatically send progress information to TaskTracker to avoid the task getting killed because of inactivity. Hive sends progress information when the script is outputting to stderr. This option removes the need of periodically producing stderr messages, but users should be cautious because this may prevent infinite loops in the scripts to be killed by TaskTracker.</description>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
index d8f4eb4..3cfaacf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
@@ -51,7 +51,7 @@
*/
protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
- protected transient byte posBigTable = -1; // one of the tables that is not in memory
+ protected transient byte posBigTable = -1; // position of the driver (big table) alias
protected transient RowContainer<List<Object>> emptyList = null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
index a080fcc..64abf85 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
@@ -30,7 +30,8 @@
*/
public interface HashTableLoader {
- void load(ExecMapperContext context, Configuration hconf, MapJoinDesc desc, byte posBigTable,
- MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes)
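+ /**
+  * Called once before load(); gives the implementation the execution context,
+  * configuration and join descriptor that used to be passed to load() directly.
+  */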
+ void init(ExecMapperContext context, Configuration hconf, MapJoinDesc desc);
+
+ void load(MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes)
throws HiveException;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index aa8f19c..e3ad94b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.BufferedOutputStream;
+import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
@@ -26,6 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -198,7 +200,9 @@ protected void initializeOp(Configuration hconf) throws HiveException {
}
}
-
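+ // exposes the in-memory hash tables so they can be handed to the MR HashTableLoader
+ // directly when hive.auto.convert.join.use.nonstaged is enabled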
+ public MapJoinTableContainer[] getMapJoinTables() {
+ return mapJoinTables;
+ }
private static List<ObjectInspector>[] getStandardObjectInspectors(
List<ObjectInspector>[] aliasToObjectInspectors, int maxTag) {
@@ -265,34 +269,7 @@ private boolean hasFilter(int alias) {
public void closeOp(boolean abort) throws HiveException {
try {
if (mapJoinTables != null) {
- // get tmp file URI
- String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
- LOG.info("Temp URI for side table: " + tmpURI);
- for (byte tag = 0; tag < mapJoinTables.length; tag++) {
- // get the key and value
- MapJoinTableContainer tableContainer = mapJoinTables[tag];
- if (tableContainer == null) {
- continue;
- }
- // get current input file name
- String bigBucketFileName = getExecContext().getCurrentBigBucketFile();
- String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName);
- // get the tmp URI path; it will be a hdfs path if not local mode
- String dumpFilePrefix = conf.getDumpFilePrefix();
- String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
- console.printInfo(Utilities.now() + "\tDump the side-table into file: " + tmpURIPath);
- // get the hashtable file and path
- Path path = new Path(tmpURIPath);
- FileSystem fs = path.getFileSystem(hconf);
- ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
- try {
- mapJoinTableSerdes[tag].persist(out, tableContainer);
- } finally {
- out.close();
- }
- tableContainer.clear();
- console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath);
- }
+ flushToFile();
}
super.closeOp(abort);
} catch (Exception e) {
@@ -300,6 +277,39 @@ public void closeOp(boolean abort) throws HiveException {
}
}
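+ // factored out of closeOp() so that TemporaryHashSinkOperator can override it
+ // and keep the hash tables in memory instead of persisting them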
+ protected void flushToFile() throws IOException, HiveException {
+ // get tmp file URI
+ String tmpURI = getExecContext().getLocalWork().getTmpFileURI();
+ LOG.info("Temp URI for side table: " + tmpURI);
+ for (byte tag = 0; tag < mapJoinTables.length; tag++) {
+ // get the key and value
+ MapJoinTableContainer tableContainer = mapJoinTables[tag];
+ if (tableContainer == null) {
+ continue;
+ }
+ // get current input file name
+ String bigBucketFileName = getExecContext().getCurrentBigBucketFile();
+ String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName);
+ // get the tmp URI path; it will be a hdfs path if not local mode
+ String dumpFilePrefix = conf.getDumpFilePrefix();
+ String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
+ console.printInfo(Utilities.now() + "\tDump the side-table into file: " + tmpURIPath);
+ // get the hashtable file and path
+ Path path = new Path(tmpURIPath);
+ FileSystem fs = path.getFileSystem(hconf);
+ ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
+ try {
+ mapJoinTableSerdes[tag].persist(out, tableContainer);
+ } finally {
+ out.close();
+ }
+ tableContainer.clear();
+ FileStatus status = fs.getFileStatus(path);
+ console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + tmpURIPath +
+ " (" + status.getLen() + " bytes)");
+ }
+ }
+
/**
* Implements the getName function for the Node Interface.
*
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
index 1e0314d..3fc6bc4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
@@ -55,7 +55,7 @@
List<ObjectInspector>[] result = new List[tagLen];
for (byte alias = 0; alias < exprEntries.length; alias++) {
//get big table
- if (alias == (byte) posBigTableAlias){
+ if (alias == (byte) posBigTableAlias || exprEntries[alias] == null){
//skip the big tables
continue;
}
@@ -77,7 +77,7 @@
List<ObjectInspector>[] result = new List[tagLen];
for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
//get big table
- if(alias == (byte) posBigTableAlias ){
+ if(alias == (byte) posBigTableAlias || aliasToObjectInspectors[alias] == null){
//skip the big tables
continue;
}
@@ -106,6 +106,9 @@ public static int populateJoinKeyValue(List[] outMap,
int posBigTableAlias) throws HiveException {
int total = 0;
for (Entry<Byte, List<ExprNodeDesc>> e : inputMap.entrySet()) {
+ if (e.getValue() == null) {
+ continue;
+ }
Byte key = order == null ? e.getKey() : order[e.getKey()];
List<ExprNodeDesc> valueFields = new ArrayList<ExprNodeDesc>();
for (ExprNodeDesc expr : e.getValue()) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index bdc85b9..6e2bc33 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -145,8 +145,8 @@ private void loadHashTable() throws HiveException {
}
}
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
- loader.load(this.getExecContext(), hconf, this.getConf(),
- posBigTable, mapJoinTables, mapJoinTableSerdes);
+ loader.init(getExecContext(), hconf, getConf());
+ loader.load(mapJoinTables, mapJoinTableSerdes);
cache.cache(tableKey, mapJoinTables);
cache.cache(serdeKey, mapJoinTableSerdes);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java
new file mode 100644
index 0000000..16baf4b
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+
+import java.io.IOException;
+
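+/**
+ * A HashTableSinkOperator that keeps the built hash tables in memory instead of
+ * flushing them to a file; used by the MR HashTableLoader to load non-staged
+ * (directly fetched) small aliases when hive.auto.convert.join.use.nonstaged is set.
+ */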
+public class TemporaryHashSinkOperator extends HashTableSinkOperator {
+ public TemporaryHashSinkOperator(MapJoinDesc desc) {
+ conf = new HashTableSinkDesc(desc);
+ }
+
+ @Override
+ protected void flushToFile() throws IOException, HiveException {
+ // nothing to persist; the containers are handed over in memory via getMapJoinTables()
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 5511bca..37849a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -329,7 +329,7 @@ public int execute(DriverContext driverContext) {
try{
MapredLocalWork localwork = mWork.getMapLocalWork();
- if (localwork != null) {
+ if (localwork != null && localwork.hasStagedAlias()) {
if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
Path localPath = new Path(localwork.getTmpFileURI());
Path hdfsPath = new Path(mWork.getTmpHDFSFileURI());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
index efe5710..94e6972 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
@@ -20,6 +20,9 @@
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,13 +30,20 @@
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.JobConf;
/**
* HashTableLoader for MR loads the hashtable for MapJoins from local disk (hashtables
@@ -44,22 +54,28 @@
private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
- public HashTableLoader() {
+ private ExecMapperContext context;
+ private Configuration hconf;
+ private MapJoinDesc desc;
+
+ @Override
+ public void init(ExecMapperContext context, Configuration hconf, MapJoinDesc desc) {
+ this.context = context;
+ this.hconf = hconf;
+ this.desc = desc;
}
@Override
- public void load(ExecMapperContext context,
- Configuration hconf,
- MapJoinDesc desc,
- byte posBigTable,
+ public void load(
MapJoinTableContainer[] mapJoinTables,
MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
String baseDir = null;
Path currentInputPath = context.getCurrentInputPath();
- LOG.info("******* Load from HashTable File: input : " + currentInputPath);
+ LOG.info("******* Load from HashTable for input file: " + currentInputPath);
String fileName = context.getLocalWork().getBucketFileName(currentInputPath.toString());
try {
+ loadDirectly(mapJoinTables, fileName);
if (ShimLoader.getHadoopShims().isLocalMode(hconf)) {
baseDir = context.getLocalWork().getTmpFileURI();
} else {
@@ -79,7 +95,7 @@ public void load(ExecMapperContext context,
}
}
for (int pos = 0; pos < mapJoinTables.length; pos++) {
- if (pos == posBigTable) {
+ if (pos == desc.getPosBigTable() || mapJoinTables[pos] != null) {
continue;
}
if(baseDir == null) {
@@ -101,4 +117,30 @@ public void load(ExecMapperContext context,
}
}
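+ // Builds hash tables for non-staged aliases in-process: a MapredLocalTask fetches the
+ // small-table rows into a TemporaryHashSinkOperator, and the resulting containers are
+ // copied into mapJoinTables instead of being read from pre-staged files.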
+ private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFileName)
+ throws Exception {
+ MapredLocalWork localWork = context.getLocalWork();
+ List<Operator<? extends OperatorDesc>> directWorks = localWork.getDirectFetchOp();
+ if (directWorks == null || directWorks.isEmpty()) {
+ return;
+ }
+ JobConf job = new JobConf(hconf);
+ MapredLocalTask localTask = new MapredLocalTask(localWork, job, false);
+
+ HashTableSinkOperator sink = new TemporaryHashSinkOperator(desc);
+ sink.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>(directWorks));
+
+ for (Operator<? extends OperatorDesc> operator : directWorks) {
+ if (operator instanceof TableScanOperator) {
+ operator.setChildOperators(Arrays.<Operator<? extends OperatorDesc>>asList(sink));
+ }
+ }
+ localTask.setExecContext(context);
+ localTask.startForward(inputFileName);
+
+ MapJoinTableContainer[] tables = sink.getMapJoinTables();
+ System.arraycopy(tables, 0, mapJoinTables, 0, tables.length);
+
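+ // drop the sink's own references; the containers now belong to mapJoinTables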
+ Arrays.fill(tables, null);
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 0cc90d0..cd1b809 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -81,7 +81,7 @@
*/
public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {
- private Map<String, FetchOperator> fetchOperators;
+ private Map<String, FetchOperator> fetchOperators = new HashMap<String, FetchOperator>();
protected HadoopJobExecHelper jobExecHelper;
private JobConf job;
public static transient final Log l4j = LogFactory.getLog(MapredLocalTask.class);
@@ -93,7 +93,7 @@
// not sure we need this exec context; but all the operators in the work
// will pass this context through
- private final ExecMapperContext execContext = new ExecMapperContext();
+ private ExecMapperContext execContext = new ExecMapperContext();
private Process executor;
@@ -107,6 +107,10 @@ public MapredLocalTask(MapredLocalWork plan, JobConf job, boolean isSilent) thro
console = new LogHelper(LOG, isSilent);
}
+ public void setExecContext(ExecMapperContext execContext) {
+ this.execContext = execContext;
+ }
+
@Override
public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
super.initialize(conf, queryPlan, driverContext);
@@ -291,26 +295,11 @@ public int executeFromChildJVM(DriverContext driverContext) {
console.printInfo(Utilities.now()
+ "\tStarting to launch local task to process map join;\tmaximum memory = "
+ memoryMXBean.getHeapMemoryUsage().getMax());
- fetchOperators = new HashMap<String, FetchOperator>();
- Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
execContext.setJc(job);
// set the local work, so all the operator can get this context
execContext.setLocalWork(work);
- boolean inputFileChangeSenstive = work.getInputFileChangeSensitive();
try {
-
- initializeOperators(fetchOpJobConfMap);
- // for each big table's bucket, call the start forward
- if (inputFileChangeSenstive) {
- for (Map<String, List<String>> bigTableBucketFiles : work
- .getBucketMapjoinContext().getAliasBucketFileNameMapping().values()) {
- for (String bigTableBucket : bigTableBucketFiles.keySet()) {
- startForward(inputFileChangeSenstive, bigTableBucket);
- }
- }
- } else {
- startForward(inputFileChangeSenstive, null);
- }
+ startForward(null);
long currentTime = System.currentTimeMillis();
long elapsed = currentTime - startTime;
console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: "
@@ -328,6 +317,26 @@ public int executeFromChildJVM(DriverContext driverContext) {
return 0;
}
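+ // Entry point also used by the MR HashTableLoader for direct (non-staged) aliases;
+ // a null bigTableBucket processes every bucket of the big table.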
+ public void startForward(String bigTableBucket) throws Exception {
+ boolean inputFileChangeSenstive = work.getInputFileChangeSensitive();
+ initializeOperators(new HashMap<FetchOperator, JobConf>());
+ // for each big table's bucket, call the start forward
+ if (inputFileChangeSenstive) {
+ for (Map<String, List<String>> bigTableBucketFiles : work
+ .getBucketMapjoinContext().getAliasBucketFileNameMapping().values()) {
+ if (bigTableBucket == null) {
+ for (String bigTableBucketFile : bigTableBucketFiles.keySet()) {
+ startForward(inputFileChangeSenstive, bigTableBucketFile);
+ }
+ } else if (bigTableBucketFiles.keySet().contains(bigTableBucket)) {
+ startForward(inputFileChangeSenstive, bigTableBucket);
+ }
+ }
+ } else {
+ startForward(inputFileChangeSenstive, null);
+ }
+ }
+
private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
throws Exception {
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
@@ -348,24 +357,18 @@ private void startForward(boolean inputFileChangeSenstive, String bigTableBucket
// get the root operator
Operator<? extends OperatorDesc> forwardOp = work.getAliasToWork().get(alias);
// walk through the operator tree
- while (true) {
+ while (!forwardOp.getDone()) {
InspectableObject row = fetchOp.getNextRow();
if (row == null) {
- if (inputFileChangeSenstive) {
- execContext.setCurrentBigBucketFile(bigTableBucket);
- forwardOp.reset();
- }
- forwardOp.close(false);
break;
}
forwardOp.processOp(row.o, 0);
- // check if any operator had a fatal error or early exit during
- // execution
- if (forwardOp.getDone()) {
- // ExecMapper.setDone(true);
- break;
- }
}
+ if (inputFileChangeSenstive) {
+ execContext.setCurrentBigBucketFile(bigTableBucket);
+ forwardOp.reset();
+ }
+ forwardOp.close(false);
}
}
@@ -373,6 +376,9 @@ private void initializeOperators(Map fetchOpJobConfMap)
throws HiveException {
// this mapper operator is used to initialize all the operators
for (Map.Entry<String, FetchWork> entry : work.getAliasToFetchWork().entrySet()) {
+ if (entry.getValue() == null) {
+ continue;
+ }
JobConf jobClone = new JobConf(job);
TableScanOperator ts = (TableScanOperator)work.getAliasToWork().get(entry.getKey());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 2df8ab9..8a1cde9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -47,14 +47,19 @@
private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
- public HashTableLoader() {
+ private ExecMapperContext context;
+ private Configuration hconf;
+ private MapJoinDesc desc;
+
+ @Override
+ public void init(ExecMapperContext context, Configuration hconf, MapJoinDesc desc) {
+ this.context = context;
+ this.hconf = hconf;
+ this.desc = desc;
}
@Override
- public void load(ExecMapperContext context,
- Configuration hconf,
- MapJoinDesc desc,
- byte posBigTable,
+ public void load(
MapJoinTableContainer[] mapJoinTables,
MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
@@ -65,7 +70,7 @@ public void load(ExecMapperContext context,
HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
for (int pos = 0; pos < mapJoinTables.length; pos++) {
- if (pos == posBigTable) {
+ if (pos == desc.getPosBigTable()) {
continue;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
index 5a53e15..a462826 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
@@ -118,6 +118,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object..
// mapjoin should not affected by join reordering
mapJoinOp.getConf().resetOrder();
+ HiveConf conf = context.getParseCtx().getConf();
+
HashTableSinkDesc hashTableSinkDesc = new HashTableSinkDesc(mapJoinOp.getConf());
HashTableSinkOperator hashTableSinkOp = (HashTableSinkOperator) OperatorFactory
.get(hashTableSinkDesc);
@@ -125,34 +127,54 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object..
// set hashtable memory usage
float hashtableMemoryUsage;
if (context.isFollowedByGroupBy()) {
- hashtableMemoryUsage = context.getParseCtx().getConf().getFloatVar(
+ hashtableMemoryUsage = conf.getFloatVar(
HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE);
} else {
- hashtableMemoryUsage = context.getParseCtx().getConf().getFloatVar(
+ hashtableMemoryUsage = conf.getFloatVar(
HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
}
hashTableSinkOp.getConf().setHashtableMemoryUsage(hashtableMemoryUsage);
// get the last operator for processing big tables
int bigTable = mapJoinOp.getConf().getPosBigTable();
+ Byte[] orders = mapJoinOp.getConf().getTagOrder();
+
+ final boolean useNonstaged = conf.getBoolVar(
+ HiveConf.ConfVars.HIVECONVERTJOINUSENONSTAGED);
// the parent ops for hashTableSinkOp
List<Operator<? extends OperatorDesc>> smallTablesParentOp =
new ArrayList<Operator<? extends OperatorDesc>>();
List<Operator<? extends OperatorDesc>> dummyOperators =
new ArrayList<Operator<? extends OperatorDesc>>();
+ List<Operator<? extends OperatorDesc>> directOperators =
+ new ArrayList<Operator<? extends OperatorDesc>>();
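+ // table scans to be fetched directly by the map task (hive.auto.convert.join.use.nonstaged);
+ // one slot per parent, null for the big table and for aliases that are still staged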
// get all parents
List<Operator<? extends OperatorDesc>> parentsOp = mapJoinOp.getParentOperators();
for (int i = 0; i < parentsOp.size(); i++) {
if (i == bigTable) {
smallTablesParentOp.add(null);
+ directOperators.add(null);
continue;
}
Operator<? extends OperatorDesc> parent = parentsOp.get(i);
+ if (useNonstaged && parent instanceof TableScanOperator) {
+ // no filter, no projection. no need to stage
+ smallTablesParentOp.add(null);
+ directOperators.add(parent);
+ hashTableSinkDesc.getKeys().put(orders[i], null);
+ hashTableSinkDesc.getExprs().put(orders[i], null);
+ hashTableSinkDesc.getFilters().put(orders[i], null);
+ } else {
+ // keep the parent id correct
+ smallTablesParentOp.add(parent);
+ directOperators.add(null);
+ }
// let hashtable Op be the child of this parent
parent.replaceChild(mapJoinOp, hashTableSinkOp);
- // keep the parent id correct
- smallTablesParentOp.add(parent);
+ if (useNonstaged && parent instanceof TableScanOperator) {
+ parent.setChildOperators(null);
+ }
// create an new operator: HashTable DummyOpeator, which share the table desc
HashTableDummyDesc desc = new HashTableDummyDesc();
@@ -186,6 +208,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object..
for (Operator<? extends OperatorDesc> op : dummyOperators) {
context.addDummyParentOp(op);
}
+ context.setDirectWorks(directOperators);
return null;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
index 010ac54..9561f25 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
@@ -123,17 +123,25 @@ private void processCurrentTask(Task extends Serializable> currTask,
}
// replace the map join operator to local_map_join operator in the operator tree
// and return all the dummy parent
- LocalMapJoinProcCtx localMapJoinProcCtx= adjustLocalTask(localTask);
+ LocalMapJoinProcCtx localMapJoinProcCtx = adjustLocalTask(localTask);
List<Operator<? extends OperatorDesc>> dummyOps =
localMapJoinProcCtx.getDummyParentOp();
+ List<Operator<? extends OperatorDesc>> directWorks = localMapJoinProcCtx.getDirectWorks();
+
// create new local work and setup the dummy ops
- MapredLocalWork newLocalWork = new MapredLocalWork();
+ MapredLocalWork newLocalWork = localwork.extractDirectWorks(directWorks);
newLocalWork.setDummyParentOp(dummyOps);
- newLocalWork.setTmpFileURI(tmpFileURI);
- newLocalWork.setInputFileChangeSensitive(localwork.getInputFileChangeSensitive());
- newLocalWork.setBucketMapjoinContext(localwork.copyPartSpecMappingOnly());
mapredWork.getMapWork().setMapLocalWork(newLocalWork);
+
+ if (localwork.getAliasToFetchWork().isEmpty()) {
+ // no aliases to stage, so no local task is needed
+ newLocalWork.setHasStagedAlias(false);
+ currTask.setBackupTask(localTask.getBackupTask());
+ currTask.setBackupChildrenTasks(localTask.getBackupChildrenTasks());
+ return;
+ }
+ newLocalWork.setHasStagedAlias(true);
// get all parent tasks
List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
currTask.setParentTasks(null);
@@ -271,6 +279,8 @@ public void setPhysicalContext(PhysicalContext physicalContext) {
private List<Operator<? extends OperatorDesc>> dummyParentOp = null;
private boolean isFollowedByGroupBy;
+ private List<Operator<? extends OperatorDesc>> directWorks;
+
public LocalMapJoinProcCtx(Task<? extends Serializable> task, ParseContext parseCtx) {
currentTask = task;
this.parseCtx = parseCtx;
@@ -312,5 +322,13 @@ public void setDummyParentOp(List> op) {
public void addDummyParentOp(Operator<? extends OperatorDesc> op) {
this.dummyParentOp.add(op);
}
+
+ public void setDirectWorks(List<Operator<? extends OperatorDesc>> directWorks) {
+ this.directWorks = directWorks;
+ }
+
+ public List<Operator<? extends OperatorDesc>> getDirectWorks() {
+ return directWorks;
+ }
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
index 14fced7..f445016 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
@@ -20,10 +20,8 @@
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -92,7 +90,7 @@ public HashTableSinkDesc() {
public HashTableSinkDesc(MapJoinDesc clone) {
this.bigKeysDirMap = clone.getBigKeysDirMap();
this.conds = clone.getConds();
- this.exprs= clone.getExprs();
+ this.exprs = new HashMap<Byte, List<ExprNodeDesc>>(clone.getExprs());
this.handleSkewJoin = clone.getHandleSkewJoin();
this.keyTableDesc = clone.getKeyTableDesc();
this.noOuterJoin = clone.getNoOuterJoin();
@@ -102,10 +100,10 @@ public HashTableSinkDesc(MapJoinDesc clone) {
this.skewKeysValuesTables = clone.getSkewKeysValuesTables();
this.smallKeysDirMap = clone.getSmallKeysDirMap();
this.tagOrder = clone.getTagOrder();
- this.filters = clone.getFilters();
+ this.filters = new HashMap<Byte, List<ExprNodeDesc>>(clone.getFilters());
this.filterMap = clone.getFilterMap();
- this.keys = clone.getKeys();
+ this.keys = new HashMap<Byte, List<ExprNodeDesc>>(clone.getKeys());
this.keyTblDesc = clone.getKeyTblDesc();
this.valueTblDescs = clone.getValueTblDescs();
this.valueTblFilteredDescs = clone.getValueFilteredTblDescs();
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
index 83a778d..f969a36 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
@@ -19,8 +19,11 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -40,7 +43,10 @@
private String tmpFileURI;
private String stageID;
- private List<Operator<? extends OperatorDesc>> dummyParentOp ;
+ private List<Operator<? extends OperatorDesc>> dummyParentOp;
+ private List<Operator<? extends OperatorDesc>> directFetchOp;
+
+ private boolean hasStagedAlias;
public MapredLocalWork() {
@@ -177,4 +183,52 @@ private String getFileName(String path) {
}
return path.substring(last_separator + 1);
}
+
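+ /**
+  * Moves the direct (non-staged) aliases out of this work into the returned MapredLocalWork,
+  * which is attached to the map task; staged aliases stay in this work for the usual
+  * local-task / distributed-cache path.
+  */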
+ public MapredLocalWork extractDirectWorks(List<Operator<? extends OperatorDesc>> directWorks) {
+ MapredLocalWork newLocalWork = new MapredLocalWork();
+ newLocalWork.setTmpFileURI(tmpFileURI);
+ newLocalWork.setInputFileChangeSensitive(inputFileChangeSensitive);
+ newLocalWork.setBucketMapjoinContext(copyPartSpecMappingOnly());
+ if (!hasAnyNonNull(directWorks)) {
+ // all small aliases are staged
+ return newLocalWork;
+ }
+ newLocalWork.directFetchOp = new ArrayList<Operator<? extends OperatorDesc>>(directWorks);
+ newLocalWork.aliasToWork = new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+ newLocalWork.aliasToFetchWork = new LinkedHashMap<String, FetchWork>();
+
+ Map<String, Operator<? extends OperatorDesc>> works = new HashMap<String, Operator<? extends OperatorDesc>>(aliasToWork);
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : works.entrySet()) {
+ String alias = entry.getKey();
+ boolean notStaged = directWorks.contains(entry.getValue());
+ newLocalWork.aliasToWork.put(alias, notStaged ? aliasToWork.remove(alias) : null);
+ newLocalWork.aliasToFetchWork.put(alias, notStaged ? aliasToFetchWork.remove(alias) : null);
+ }
+ return newLocalWork;
+ }
+
+ private boolean hasAnyNonNull(List<Operator<? extends OperatorDesc>> list) {
+ for (Object element : list) {
+ if (element != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void setDirectFetchOp(List<Operator<? extends OperatorDesc>> op) {
+ this.directFetchOp = op;
+ }
+
+ public List<Operator<? extends OperatorDesc>> getDirectFetchOp() {
+ return directFetchOp;
+ }
+
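+ // true if at least one small alias still has to be pre-staged by a local task;
+ // ExecDriver only sets up the distributed cache for local work when this is set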
+ public boolean hasStagedAlias() {
+ return hasStagedAlias;
+ }
+
+ public void setHasStagedAlias(boolean hasStagedAlias) {
+ this.hasStagedAlias = hasStagedAlias;
+ }
}
diff --git ql/src/test/queries/clientpositive/auto_join_without_localtask.q ql/src/test/queries/clientpositive/auto_join_without_localtask.q
new file mode 100644
index 0000000..bb7edc9
--- /dev/null
+++ ql/src/test/queries/clientpositive/auto_join_without_localtask.q
@@ -0,0 +1,28 @@
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.use.nonstaged=true;
+
+set hive.auto.convert.join.noconditionaltask.size=100;
+
+explain
+select a.* from src a join src b on a.key=b.key limit 40;
+
+select a.* from src a join src b on a.key=b.key limit 40;
+
+explain
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40;
+
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40;
+
+set hive.auto.convert.join.noconditionaltask.size=100;
+
+explain
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40;
+
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40;
+
+set hive.mapjoin.localtask.max.memory.usage = 0.0001;
+set hive.mapjoin.check.memory.rows = 2;
+
+-- fallback to common join
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40;
+
diff --git ql/src/test/results/clientpositive/auto_join_without_localtask.q.out ql/src/test/results/clientpositive/auto_join_without_localtask.q.out
new file mode 100644
index 0000000..22a243f
--- /dev/null
+++ ql/src/test/results/clientpositive/auto_join_without_localtask.q.out
@@ -0,0 +1,1025 @@
+PREHOOK: query: explain
+select a.* from src a join src b on a.key=b.key limit 40
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select a.* from src a join src b on a.key=b.key limit 40
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF (TOK_TABNAME a)))) (TOK_LIMIT 40)))
+
+STAGE DEPENDENCIES:
+ Stage-5 is a root stage , consists of Stage-3, Stage-4, Stage-1
+ Stage-3 has a backup stage: Stage-1
+ Stage-4 has a backup stage: Stage-1
+ Stage-1
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-5
+ Conditional Operator
+
+ Stage: Stage-3
+ Map Reduce
+ Alias -> Map Operator Tree:
+ a
+ TableScan
+ alias: a
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ outputColumnNames: _col0, _col1
+ Position of Big Table: 0
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Local Work:
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ b
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ b
+ TableScan
+ alias: b
+
+ Stage: Stage-4
+ Map Reduce
+ Alias -> Map Operator Tree:
+ b
+ TableScan
+ alias: b
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ outputColumnNames: _col0, _col1
+ Position of Big Table: 1
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Local Work:
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ a
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ a
+ TableScan
+ alias: a
+
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ a
+ TableScan
+ alias: a
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: string
+ tag: 0
+ value expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ b
+ TableScan
+ alias: b
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: string
+ tag: 1
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col0} {VALUE._col1}
+ 1
+ handleSkewJoin: false
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 40
+
+PREHOOK: query: select a.* from src a join src b on a.key=b.key limit 40
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select a.* from src a join src b on a.key=b.key limit 40
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+238 val_238
+238 val_238
+86 val_86
+311 val_311
+311 val_311
+311 val_311
+27 val_27
+165 val_165
+165 val_165
+409 val_409
+409 val_409
+409 val_409
+255 val_255
+255 val_255
+278 val_278
+278 val_278
+98 val_98
+98 val_98
+484 val_484
+265 val_265
+265 val_265
+193 val_193
+193 val_193
+193 val_193
+401 val_401
+401 val_401
+401 val_401
+401 val_401
+401 val_401
+150 val_150
+273 val_273
+273 val_273
+273 val_273
+224 val_224
+224 val_224
+369 val_369
+369 val_369
+369 val_369
+66 val_66
+128 val_128
+PREHOOK: query: explain
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key))) (TOK_TABREF (TOK_TABNAME src) c) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL c) value)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF (TOK_TABNAME a)))) (TOK_LIMIT 40)))
+
+STAGE DEPENDENCIES:
+ Stage-10 is a root stage , consists of Stage-8, Stage-9, Stage-1
+ Stage-8 has a backup stage: Stage-1
+ Stage-7 depends on stages: Stage-1, Stage-8, Stage-9 , consists of Stage-5, Stage-6, Stage-2
+ Stage-5 has a backup stage: Stage-2
+ Stage-6 has a backup stage: Stage-2
+ Stage-2
+ Stage-9 has a backup stage: Stage-1
+ Stage-1
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-10
+ Conditional Operator
+
+ Stage: Stage-8
+ Map Reduce
+ Alias -> Map Operator Tree:
+ a
+ TableScan
+ alias: a
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ outputColumnNames: _col0, _col1
+ Position of Big Table: 0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ b
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ b
+ TableScan
+ alias: b
+
+ Stage: Stage-7
+ Conditional Operator
+
+ Stage: Stage-5
+ Map Reduce
+ Alias -> Map Operator Tree:
+ $INTNAME
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {_col0} {_col1}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[value]]
+ outputColumnNames: _col4, _col5
+ Position of Big Table: 0
+ Select Operator
+ expressions:
+ expr: _col4
+ type: string
+ expr: _col5
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Local Work:
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ c
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ c
+ TableScan
+ alias: c
+
+ Stage: Stage-6
+ Map Reduce
+ Alias -> Map Operator Tree:
+ c
+ TableScan
+ alias: c
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {_col0} {_col1}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[value]]
+ outputColumnNames: _col4, _col5
+ Position of Big Table: 1
+ Select Operator
+ expressions:
+ expr: _col4
+ type: string
+ expr: _col5
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Local Work:
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ $INTNAME
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ $INTNAME
+ TableScan
+
+ Stage: Stage-2
+ Map Reduce
+ Alias -> Map Operator Tree:
+ $INTNAME
+ TableScan
+ Reduce Output Operator
+ key expressions:
+ expr: _col1
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col1
+ type: string
+ tag: 0
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ c
+ TableScan
+ alias: c
+ Reduce Output Operator
+ key expressions:
+ expr: value
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: value
+ type: string
+ tag: 1
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col4} {VALUE._col5}
+ 1
+ handleSkewJoin: false
+ outputColumnNames: _col4, _col5
+ Select Operator
+ expressions:
+ expr: _col4
+ type: string
+ expr: _col5
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-9
+ Map Reduce
+ Alias -> Map Operator Tree:
+ b
+ TableScan
+ alias: b
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ outputColumnNames: _col0, _col1
+ Position of Big Table: 1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ a
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ a
+ TableScan
+ alias: a
+
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ a
+ TableScan
+ alias: a
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: string
+ tag: 0
+ value expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ b
+ TableScan
+ alias: b
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: string
+ tag: 1
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col0} {VALUE._col1}
+ 1
+ handleSkewJoin: false
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 40
+
+PREHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value limit 40
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+238 val_238
+238 val_238
+238 val_238
+238 val_238
+86 val_86
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+27 val_27
+165 val_165
+165 val_165
+165 val_165
+165 val_165
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+255 val_255
+255 val_255
+255 val_255
+255 val_255
+278 val_278
+278 val_278
+278 val_278
+278 val_278
+98 val_98
+98 val_98
+98 val_98
+98 val_98
+PREHOOK: query: explain
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key))) (TOK_TABREF (TOK_TABNAME src) c) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL c) value)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF (TOK_TABNAME a)))) (TOK_WHERE (> (. (TOK_TABLE_OR_COL a) key) 100)) (TOK_LIMIT 40)))
+
+STAGE DEPENDENCIES:
+ Stage-10 is a root stage , consists of Stage-13, Stage-14, Stage-1
+ Stage-13 has a backup stage: Stage-1
+ Stage-8 depends on stages: Stage-13
+ Stage-7 depends on stages: Stage-1, Stage-8, Stage-9 , consists of Stage-5, Stage-6, Stage-2
+ Stage-5 has a backup stage: Stage-2
+ Stage-6 has a backup stage: Stage-2
+ Stage-2
+ Stage-14 has a backup stage: Stage-1
+ Stage-9 depends on stages: Stage-14
+ Stage-1
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-10
+ Conditional Operator
+
+ Stage: Stage-13
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ b
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ b
+ TableScan
+ alias: b
+ Filter Operator
+ predicate:
+ expr: (key > 100)
+ type: boolean
+ HashTable Sink Operator
+ condition expressions:
+ 0 {key} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ Position of Big Table: 0
+
+ Stage: Stage-8
+ Map Reduce
+ Alias -> Map Operator Tree:
+ a
+ TableScan
+ alias: a
+ Filter Operator
+ predicate:
+ expr: (key > 100)
+ type: boolean
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ outputColumnNames: _col0, _col1
+ Position of Big Table: 0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-7
+ Conditional Operator
+
+ Stage: Stage-5
+ Map Reduce
+ Alias -> Map Operator Tree:
+ $INTNAME
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {_col0} {_col1}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[value]]
+ outputColumnNames: _col4, _col5
+ Position of Big Table: 0
+ Select Operator
+ expressions:
+ expr: _col4
+ type: string
+ expr: _col5
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Local Work:
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ c
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ c
+ TableScan
+ alias: c
+
+ Stage: Stage-6
+ Map Reduce
+ Alias -> Map Operator Tree:
+ c
+ TableScan
+ alias: c
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {_col0} {_col1}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[value]]
+ outputColumnNames: _col4, _col5
+ Position of Big Table: 1
+ Select Operator
+ expressions:
+ expr: _col4
+ type: string
+ expr: _col5
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Local Work:
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ $INTNAME
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ $INTNAME
+ TableScan
+
+ Stage: Stage-2
+ Map Reduce
+ Alias -> Map Operator Tree:
+ $INTNAME
+ TableScan
+ Reduce Output Operator
+ key expressions:
+ expr: _col1
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col1
+ type: string
+ tag: 0
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ c
+ TableScan
+ alias: c
+ Reduce Output Operator
+ key expressions:
+ expr: value
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: value
+ type: string
+ tag: 1
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col4} {VALUE._col5}
+ 1
+ handleSkewJoin: false
+ outputColumnNames: _col4, _col5
+ Select Operator
+ expressions:
+ expr: _col4
+ type: string
+ expr: _col5
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-14
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ a
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ a
+ TableScan
+ alias: a
+ Filter Operator
+ predicate:
+ expr: (key > 100)
+ type: boolean
+ HashTable Sink Operator
+ condition expressions:
+ 0 {key} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ Position of Big Table: 1
+
+ Stage: Stage-9
+ Map Reduce
+ Alias -> Map Operator Tree:
+ b
+ TableScan
+ alias: b
+ Filter Operator
+ predicate:
+ expr: (key > 100)
+ type: boolean
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ outputColumnNames: _col0, _col1
+ Position of Big Table: 1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ a
+ TableScan
+ alias: a
+ Filter Operator
+ predicate:
+ expr: (key > 100)
+ type: boolean
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: string
+ tag: 0
+ value expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ b
+ TableScan
+ alias: b
+ Filter Operator
+ predicate:
+ expr: (key > 100)
+ type: boolean
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: string
+ tag: 1
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col0} {VALUE._col1}
+ 1
+ handleSkewJoin: false
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 40
+
+PREHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+238 val_238
+238 val_238
+238 val_238
+238 val_238
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+311 val_311
+165 val_165
+165 val_165
+165 val_165
+165 val_165
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+409 val_409
+255 val_255
+255 val_255
+255 val_255
+255 val_255
+278 val_278
+278 val_278
+278 val_278
+278 val_278
+484 val_484
+265 val_265
+265 val_265
+265 val_265
+265 val_265
+193 val_193
+PREHOOK: query: -- fallback to common join
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+Execution failed with exit status: 3
+Obtaining error information
+
+Task failed!
+Task ID:
+ Stage-13
+
+Logs:
+
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+Execution failed with exit status: 2
+Obtaining error information
+
+Task failed!
+Task ID:
+ Stage-5
+
+Logs:
+
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+POSTHOOK: query: -- fallback to common join
+select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 limit 40
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+103 val_103
+103 val_103
+103 val_103
+103 val_103
+103 val_103
+103 val_103
+103 val_103
+103 val_103
+104 val_104
+104 val_104
+104 val_104
+104 val_104
+104 val_104
+104 val_104
+104 val_104
+104 val_104
+105 val_105
+111 val_111
+113 val_113
+113 val_113
+113 val_113
+113 val_113
+113 val_113
+113 val_113
+113 val_113
+113 val_113
+114 val_114
+116 val_116
+118 val_118
+118 val_118
+118 val_118
+118 val_118
+118 val_118
+118 val_118
+118 val_118
+118 val_118
+119 val_119
+119 val_119
+119 val_119
+119 val_119