Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 916550)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -136,7 +136,7 @@
     HIVEJOINCACHESIZE("hive.join.cache.size", 25000),
     HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100),
     HIVEMAPJOINROWSIZE("hive.mapjoin.size.key", 10000),
-    HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 25000),
+    HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 10),
     HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),
     HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.5),
     HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5),
@@ -200,6 +200,7 @@
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
     HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
     HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
+    HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
     ;

     public final String varname;
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 916496)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy)
@@ -343,18 +343,21 @@
       return (0);
     } catch (SemanticException e) {
+      e.printStackTrace();
       errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
       SQLState = ErrorMsg.findSQLState(e.getMessage());
       console.printError(errorMessage, "\n"
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (10);
     } catch (ParseException e) {
+      e.printStackTrace();
       errorMessage = "FAILED: Parse Error: " + e.getMessage();
       SQLState = ErrorMsg.findSQLState(e.getMessage());
       console.printError(errorMessage, "\n"
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (11);
     } catch (Exception e) {
+      e.printStackTrace();
       errorMessage = "FAILED: Unknown exception: " + e.getMessage();
       SQLState = ErrorMsg.findSQLState(e.getMessage());
       console.printError(errorMessage, "\n"
@@ -550,6 +553,7 @@
         SessionState.get().getHiveHistory().printRowCount(queryId);
       }
     } catch (Exception e) {
+      e.printStackTrace();
       if (SessionState.get() != null) {
         SessionState.get().getHiveHistory().setQueryProperty(queryId,
             Keys.QUERY_RET_CODE, String.valueOf(12));
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (revision 0)
@@ -0,0 +1,125 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
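Two configuration changes in the HiveConf hunk at the top of this patch drive everything that follows: the default of hive.mapjoin.cache.numrows drops from 25000 to 10, and a new flag hive.optimize.bucketmapjoin.sortedmerge (default false) is introduced; the Optimizer.java hunk further down only schedules SortedMergeBucketMapJoinOptimizer when that flag and hive.optimize.bucketmapjoin are both true. A minimal sketch of reading these settings through HiveConf, in the same style the patch itself uses; the class name is hypothetical and the snippet is illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

// SmbJoinConfExample is a made-up name for illustration, not part of the patch.
public class SmbJoinConfExample {
  public static void main(String[] args) {
    Configuration hconf = new HiveConf();

    // number of rows the map-join side keeps in its in-memory cache
    // (default lowered to 10 by the hunk above)
    int cachedRows = HiveConf.getIntVar(hconf,
        HiveConf.ConfVars.HIVEMAPJOINCACHEROWS);

    // sorted-merge bucket map join is only attempted when both flags are on
    boolean useSmbJoin =
        HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)
        && HiveConf.getBoolVar(hconf,
            HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN);

    System.out.println("mapjoin cache rows = " + cachedRows
        + ", SMB join enabled = " + useSmbJoin);
  }
}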
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.util.ReflectionUtils; + + +public abstract class AbstractMapJoinOperator extends CommonJoinOperator implements + Serializable { + private static final long serialVersionUID = 1L; + + /** + * The expressions for join inputs's join keys. + */ + protected transient Map> joinKeys; + /** + * The ObjectInspectors for the join inputs's join keys. + */ + protected transient Map> joinKeysObjectInspectors; + /** + * The standard ObjectInspectors for the join inputs's join keys. + */ + protected transient Map> joinKeysStandardObjectInspectors; + + protected transient int posBigTable; // one of the tables that is not in memory + transient int mapJoinRowsKey; // rows for a given key + + protected transient RowContainer> emptyList = null; + + private static final transient String[] FATAL_ERR_MSG = { + null, // counter value 0 means no error + "Mapside join size exceeds hive.mapjoin.maxsize. " + + "Please increase that or remove the mapjoin hint." + }; + + transient boolean firstRow; + transient int heartbeatInterval; + + public AbstractMapJoinOperator() { + } + + public AbstractMapJoinOperator(AbstractMapJoinOperator mjop) { + super((CommonJoinOperator)mjop); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + + firstRow = true; + heartbeatInterval = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVESENDHEARTBEAT); + + joinKeys = new HashMap>(); + + populateJoinKeyValue(joinKeys, conf.getKeys()); + joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys, + inputObjInspectors); + joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors); + + // all other tables are small, and are cached in the hash table + posBigTable = conf.getPosBigTable(); + + emptyList = new RowContainer>(1, hconf); + RowContainer bigPosRC = getRowContainer(hconf, (byte) posBigTable, + order[posBigTable], joinCacheSize); + storage.put((byte) posBigTable, bigPosRC); + + mapJoinRowsKey = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVEMAPJOINROWSIZE); + + List structFields = ((StructObjectInspector) outputObjInspector) + .getAllStructFieldRefs(); + if (conf.getOutputColumnNames().size() < structFields.size()) { + List structFieldObjectInspectors = new ArrayList(); + for (Byte alias : order) { + int sz = conf.getExprs().get(alias).size(); + List retained = conf.getRetainList().get(alias); + for (int i = 0; i < sz; i++) { + int pos = retained.get(i); + structFieldObjectInspectors.add(structFields.get(pos) + .getFieldObjectInspector()); + } + } + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(conf.getOutputColumnNames(), + structFieldObjectInspectors); + } + initializeChildren(hconf); + } + + @Override + protected void fatalErrorMessage(StringBuilder 
errMsg, long counterCode) { + errMsg.append("Operator " + getOperatorId() + " (id=" + id + "): " + + FATAL_ERR_MSG[(int) counterCode]); + } + + @Override + public int getType() { + return OperatorType.MAPJOIN; + } +} \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (working copy) @@ -110,7 +110,7 @@ // be output protected transient JoinCondDesc[] condn; protected transient boolean noOuterJoin; - private transient Object[] dummyObj; // for outer joins, contains the + protected transient Object[] dummyObj; // for outer joins, contains the // potential nulls for the concerned // aliases protected transient RowContainer>[] dummyObjVectors; // empty @@ -140,6 +140,48 @@ transient boolean handleSkewJoin = false; + public CommonJoinOperator() { + } + + public CommonJoinOperator(CommonJoinOperator clone) { + this.joinEmitInterval = clone.joinEmitInterval; + this.joinCacheSize = clone.joinCacheSize; + this.nextSz = clone.nextSz; + this.childOperators = clone.childOperators; + this.parentOperators = clone.parentOperators; + this.counterNames = clone.counterNames; + this.counterNameToEnum = clone.counterNameToEnum; + this.done = clone.done; + this.operatorId = clone.operatorId; + this.storage = clone.storage; + this.condn = clone.condn; + + this.setSchema(clone.getSchema()); + + this.alias = clone.alias; + this.beginTime = clone.beginTime; + this.inputRows = clone.inputRows; + this.childOperatorsArray = clone.childOperatorsArray; + this.childOperatorsTag = clone.childOperatorsTag; + this.colExprMap = clone.colExprMap; + this.counters = clone.counters; + this.dummyObj = clone.dummyObj; + this.dummyObjVectors = clone.dummyObjVectors; + this.forwardCache = clone.forwardCache; + this.groupKeyObject = clone.groupKeyObject; + this.handleSkewJoin = clone.handleSkewJoin; + this.hconf = clone.hconf; + this.id = clone.id; + this.inputObjInspectors = clone.inputObjInspectors; + this.inputRows = clone.inputRows; + this.noOuterJoin = clone.noOuterJoin; + this.numAliases = clone.numAliases; + this.operatorId = clone.operatorId; + this.posToAliasMap = clone.posToAliasMap; + this.spillTableDesc = clone.spillTableDesc; + this.statsMap = clone.statsMap; + } + protected int populateJoinKeyValue(Map> outMap, Map> inputMap) { @@ -699,7 +741,7 @@ * maintained (inputNulls) where each entry denotes whether the element is to * be used or not (whether it is null or not). The size of the bitvector is * same as the number of inputs under consideration currently. When all inputs - * are accounted for, the output is forwared appropriately. + * are accounted for, the output is forwarded appropriately. 
*/ private void genObject(ArrayList inputNulls, int aliasNum, IntermediateObject intObj, boolean firstRow) throws HiveException { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (working copy) @@ -68,6 +68,32 @@ private long nextCntr = 1; private String lastInputFile = null; private MapredLocalWork localWork = null; + + private ExecMapperContext execContext = new ExecMapperContext(); + + public static class ExecMapperContext { + boolean inputFileChanged = false; + String currentInputFile; + JobConf jc; + public boolean isInputFileChanged() { + return inputFileChanged; + } + public void setInputFileChanged(boolean inputFileChanged) { + this.inputFileChanged = inputFileChanged; + } + public String getCurrentInputFile() { + return currentInputFile; + } + public void setCurrentInputFile(String currentInputFile) { + this.currentInputFile = currentInputFile; + } + public JobConf getJc() { + return jc; + } + public void setJc(JobConf jc) { + this.jc = jc; + } + } @Override public void configure(JobConf job) { @@ -86,6 +112,7 @@ } try { jc = job; + execContext.jc = jc; // create map and fetch operators MapredWork mrwork = Utilities.getMapRedWork(job); mo = new MapOperator(); @@ -93,8 +120,9 @@ // initialize map operator mo.setChildren(job); l4j.info(mo.dump(0)); + mo.setExecContext(execContext); mo.initialize(jc, null); - + // initialize map local work localWork = mrwork.getMapLocalWork(); if (localWork == null) { @@ -141,11 +169,11 @@ mo.setReporter(rp); } - if (localWork != null - && (this.lastInputFile == null || - (localWork.getInputFileChangeSensitive() && inputFileChanged()))) { + if (this.localWork != null + && (this.lastInputFile == null || (inputFileChanged() && localWork + .getInputFileChangeSensitive()))) { this.lastInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME); - processMapLocalWork(localWork.getInputFileChangeSensitive()); + processMapLocalWork(localWork.getInputFileChangeSensitive()); } try { @@ -188,10 +216,14 @@ */ private boolean inputFileChanged() { String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME); + execContext.currentInputFile = currentInputFile; if (this.lastInputFile == null || !this.lastInputFile.equals(currentInputFile)) { + this.lastInputFile = currentInputFile; + execContext.inputFileChanged = true; return true; } + execContext.inputFileChanged = false; return false; } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (working copy) @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.File; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -50,32 +49,14 @@ /** * Map side Join operator implementation. */ -public class MapJoinOperator extends CommonJoinOperator implements +public class MapJoinOperator extends AbstractMapJoinOperator implements Serializable { private static final long serialVersionUID = 1L; private static final Log LOG = LogFactory.getLog(MapJoinOperator.class .getName()); - /** - * The expressions for join inputs's join keys. 
- */ - protected transient Map> joinKeys; - /** - * The ObjectInspectors for the join inputs's join keys. - */ - protected transient Map> joinKeysObjectInspectors; - /** - * The standard ObjectInspectors for the join inputs's join keys. - */ - protected transient Map> joinKeysStandardObjectInspectors; - - private transient int posBigTable; // one of the tables that is not in memory - transient int mapJoinRowsKey; // rows for a given key - protected transient Map> mapJoinTables; - protected transient RowContainer> emptyList = null; - private static final transient String[] FATAL_ERR_MSG = { null, // counter value 0 means no error "Mapside join size exceeds hive.mapjoin.maxsize. " @@ -135,43 +116,32 @@ return mapMetadata; } - transient boolean firstRow; - transient int metadataKeyTag; transient int[] metadataValueTag; - transient List hTables; transient int numMapRowsRead; - transient int heartbeatInterval; transient int maxMapJoinSize; + + public MapJoinOperator() { + } + + public MapJoinOperator(AbstractMapJoinOperator mjop) { + super(mjop); + } @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); + numMapRowsRead = 0; - - firstRow = true; - heartbeatInterval = HiveConf.getIntVar(hconf, - HiveConf.ConfVars.HIVESENDHEARTBEAT); maxMapJoinSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAXMAPJOINSIZE); - joinKeys = new HashMap>(); - - populateJoinKeyValue(joinKeys, conf.getKeys()); - joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys, - inputObjInspectors); - joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors); - - // all other tables are small, and are cached in the hash table - posBigTable = conf.getPosBigTable(); - metadataValueTag = new int[numAliases]; for (int pos = 0; pos < numAliases; pos++) { metadataValueTag[pos] = -1; } mapJoinTables = new HashMap>(); - hTables = new ArrayList(); // initialize the hash tables for other tables for (int pos = 0; pos < numAliases; pos++) { @@ -186,33 +156,6 @@ mapJoinTables.put(Byte.valueOf((byte) pos), hashTable); } - - emptyList = new RowContainer>(1, hconf); - RowContainer bigPosRC = getRowContainer(hconf, (byte) posBigTable, - order[posBigTable], joinCacheSize); - storage.put((byte) posBigTable, bigPosRC); - - mapJoinRowsKey = HiveConf.getIntVar(hconf, - HiveConf.ConfVars.HIVEMAPJOINROWSIZE); - - List structFields = ((StructObjectInspector) outputObjInspector) - .getAllStructFieldRefs(); - if (conf.getOutputColumnNames().size() < structFields.size()) { - List structFieldObjectInspectors = new ArrayList(); - for (Byte alias : order) { - int sz = conf.getExprs().get(alias).size(); - List retained = conf.getRetainList().get(alias); - for (int i = 0; i < sz; i++) { - int pos = retained.get(i); - structFieldObjectInspectors.add(structFields.get(pos) - .getFieldObjectInspector()); - } - } - outputObjInspector = ObjectInspectorFactory - .getStandardStructObjectInspector(conf.getOutputColumnNames(), - structFieldObjectInspectors); - } - initializeChildren(hconf); } @Override @@ -380,7 +323,7 @@ } super.closeOp(abort); } - + /** * Implements the getName function for the Node Interface. 
* Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy) @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.ExecMapper.ExecMapperContext; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.Explain; @@ -67,6 +68,8 @@ * run-time while extracting the operator specific counts. */ protected HashMap counterNameToEnum; + + private transient ExecMapperContext execContext; private static int seqId; @@ -340,7 +343,10 @@ } // derived classes can set this to different object if needed outputObjInspector = inputObjInspectors[0]; - + + //pass the exec context to child operators + passExecContext(this.execContext); + initializeOp(hconf); LOG.info("Initialization Done " + id + " " + getName()); } @@ -371,6 +377,18 @@ } } } + + /** + * Pass the execContext reference to every child operator + */ + public void passExecContext(ExecMapperContext execContext) { + this.setExecContext(execContext); + if(childOperators != null) { + for (int i = 0; i < childOperators.size(); i++) { + childOperators.get(i).passExecContext(execContext); + } + } + } /** * Collects all the parent's output object inspectors and calls actual @@ -1117,4 +1135,12 @@ public Object getGroupKeyObject() { return groupKeyObject; } + + public ExecMapperContext getExecContext() { + return execContext; + } + + public void setExecContext(ExecMapperContext execContext) { + this.execContext = execContext; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.plan.LimitDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; import org.apache.hadoop.hive.ql.plan.ScriptDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; @@ -74,6 +75,7 @@ opvec.add(new OpTuple(GroupByDesc.class, GroupByOperator.class)); opvec.add(new OpTuple(JoinDesc.class, JoinOperator.class)); opvec.add(new OpTuple(MapJoinDesc.class, MapJoinOperator.class)); + opvec.add(new OpTuple(SMBJoinDesc.class, SMBMapJoinOperator.class)); opvec.add(new OpTuple(LimitDesc.class, LimitOperator.class)); opvec.add(new OpTuple(TableScanDesc.class, TableScanOperator.class)); opvec.add(new OpTuple(UnionDesc.class, UnionOperator.class)); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (revision 0) @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.The ASF 
licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Sorted Merge Map Join Operator. 
+ */ +public class SMBMapJoinOperator extends AbstractMapJoinOperator implements + Serializable { + + private static final long serialVersionUID = 1L; + + private static final Log LOG = LogFactory.getLog(SMBMapJoinOperator.class + .getName()); + + private MapredLocalWork localWork = null; + private Map fetchOperators; + transient Map> keyWritables; + transient Map> nextKeyWritables; + transient Map keyObjInspectors; + HashMap>> nextGroupStorage; + HashMap>> candidateStorage; + + transient HashMap tagToAlias; + private transient HashMap fetchOpDone = new HashMap(); + private transient HashMap foundNextKeyGroup = new HashMap(); + transient boolean firstFetchHappened = false; + + public SMBMapJoinOperator() { + } + + public SMBMapJoinOperator(AbstractMapJoinOperator mapJoinOp) { + super(mapJoinOp); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + initializeMapredLocalWork(conf, hconf, conf.getLocalWork(), LOG); + super.initializeOp(hconf); + + firstRow = true; + heartbeatInterval = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVESENDHEARTBEAT); + + byte storePos = (byte) 0; + nextGroupStorage = new HashMap>>(); + candidateStorage = new HashMap>>(); + for (Byte alias : order) { + int bucketSize = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE); + RowContainer rc = getRowContainer(hconf, storePos, alias, bucketSize); + nextGroupStorage.put((byte) storePos, rc); + RowContainer candidateRC = getRowContainer(hconf, storePos, alias, + bucketSize); + candidateStorage.put(alias, candidateRC); + storePos++; + } + tagToAlias = conf.getTagToAlias(); + keyObjInspectors = new HashMap(); + keyWritables = new HashMap>(); + nextKeyWritables = new HashMap>(); + + for (Byte alias : order) { + List structFieldNames = new ArrayList( + joinKeysObjectInspectors.get(alias).size()); + for (int i = 0; i < joinKeysObjectInspectors.get(alias).size(); i++) { + structFieldNames.add(new String("join_key_" + alias + "_" + i)); + } + StructObjectInspector inspector = ObjectInspectorFactory + .getStandardStructObjectInspector(structFieldNames, + joinKeysObjectInspectors.get(alias)); + keyObjInspectors.put(alias, inspector); + fetchOpDone.put(alias, Boolean.FALSE); + foundNextKeyGroup.put(alias, Boolean.FALSE); + } + } + + public void initializeMapredLocalWork(MapJoinDesc conf, Configuration hconf, + MapredLocalWork localWork, Log l4j) throws HiveException { + if (localWork == null) { + return; + } + this.localWork = localWork; + fetchOperators = new HashMap(); + // create map local operators + for (Map.Entry entry : localWork.getAliasToFetchWork() + .entrySet()) { + fetchOperators.put(entry.getKey(), new FetchOperator(entry.getValue(), + new JobConf(hconf))); + if (l4j != null) { + l4j.info("fetchoperator for " + entry.getKey() + " created"); + } + } + + for (Map.Entry entry : fetchOperators.entrySet()) { + Operator forwardOp = localWork.getAliasToWork() + .get(entry.getKey()); + // All the operators need to be initialized before process + forwardOp.setExecContext(this.getExecContext()); + forwardOp.initialize(this.getExecContext().jc, new ObjectInspector[] {entry.getValue() + .getOutputObjectInspector()}); + l4j.info("fetchoperator for " + entry.getKey() + " initialized"); + } + } + + @Override + public void processOp(Object row, int tag) throws HiveException { + + if (this.getExecContext().inputFileChanged) { + if(firstFetchHappened) { + joinFinalLeftData(); + } + for (Map.Entry entry : fetchOperators.entrySet()) { + String alias = entry.getKey(); + 
FetchOperator fetchOp = entry.getValue(); + fetchOp.clearFetchContext(); + setUpFetchOpContext(fetchOp, alias); + } + this.getExecContext().inputFileChanged = false; + firstFetchHappened = false; + } + + if (!firstFetchHappened) { + firstFetchHappened = true; + // fetch the first group for all small table aliases + for (Byte t : order) { + if(t != (byte)posBigTable) { + fetchNextGroup(t); + } + } + } + + byte alias = (byte) tag; + // compute keys and values as StandardObjects + ArrayList key = computeValues(row, joinKeys.get(alias), + joinKeysObjectInspectors.get(alias)); + ArrayList value = computeValues(row, joinValues.get(alias), + joinValuesObjectInspectors.get(alias)); + + boolean nextKeyGroup = processKey(alias, key); + if (nextKeyGroup) { + this.nextGroupStorage.get(alias).add(value); + foundNextKeyGroup.put((byte) tag, Boolean.TRUE); + if (tag != posBigTable) { + // this.nextGroupStorage.get((byte) tag).add(value); + return; + } + } + + // the big table has reached a new key group. try to let the small tables + // catch up with the big table by joining them. + if (nextKeyGroup) { + assert tag == (byte)posBigTable; + int smallestPos = -1; + do { + smallestPos = joinOneGroup(); + } while (smallestPos != this.posBigTable); + + return; + } + + assert !nextKeyGroup; + candidateStorage.get((byte) tag).add(value); + } + + /* + * this happens either when the input file of the big table is changed or in + * close op. It needs to fetch all the left data from the small tables and try + * to join them. + */ + private void joinFinalLeftData() throws HiveException { + RowContainer bigTblRowContainer = this.candidateStorage.get((byte)this.posBigTable); + + boolean allFetchOpDone = allFetchOpDone(); + // if all left data in small tables are less than and equal to the left data + // in big table, let's them catch up + while (bigTblRowContainer != null && bigTblRowContainer.size() > 0 + && !allFetchOpDone) { + joinOneGroup(); + bigTblRowContainer = this.candidateStorage.get((byte)this.posBigTable); + allFetchOpDone = allFetchOpDone(); + } + + if(allFetchOpDone) { + joinOneGroup(); + } else { + while (!allFetchOpDone) { + joinOneGroup(); + allFetchOpDone = allFetchOpDone(); + } + } + } + + private boolean allFetchOpDone() { + boolean allFetchOpDone = true; + for (Byte tag : order) { + if(tag == (byte) posBigTable) { + continue; + } + allFetchOpDone = allFetchOpDone && fetchOpDone.get(tag); + } + return allFetchOpDone; + } + + private int joinOneGroup() throws HiveException { + int smallestPos = -1; + smallestPos = findMostSmallKey(); + List listOfNeedFetchNext = joinObject(smallestPos); + if (listOfNeedFetchNext.size() > 0) { + for (Byte b : listOfNeedFetchNext) { + fetchNextGroup(b); + } + } + return smallestPos; + } + + private List joinObject(int smallestPos) throws HiveException { + List needFetchList = new ArrayList(); + ArrayList smallKey = keyWritables.get((byte) smallestPos); + needFetchList.add((byte)smallestPos); + this.storage.put((byte) smallestPos, this.candidateStorage.get((byte) smallestPos)); + for (Byte i : order) { + if ((byte) smallestPos == i) { + continue; + } + ArrayList key = keyWritables.get(i); + if (key == null) { + putDummyOrEmpty(i); + } else { + int cmp = compareKeys(key, smallKey); + if (cmp == 0) { + this.storage.put((byte) i, this.candidateStorage + .get((byte) i)); + needFetchList.add(i); + continue; + } else { + putDummyOrEmpty(i); + } + } + } + checkAndGenObject(); + for (Byte pos : needFetchList) { + this.candidateStorage.get(pos).clear(); + } + return 
needFetchList; + } + + private void fetchNextGroup(Byte t) throws HiveException { + if (foundNextKeyGroup.get(t)) { + // first promote the next group to be the current group + ArrayList oldKey = this.keyWritables.get(t); + oldKey.clear(); + if (this.nextKeyWritables.get(t) != null) { + this.keyWritables.put(t, this.nextKeyWritables.get(t)); + this.nextKeyWritables.remove(t); + RowContainer> oldRowContainer = this.candidateStorage.get(t); + oldRowContainer.clear(); + this.candidateStorage.put(t, this.nextGroupStorage.get(t)); + this.nextGroupStorage.put(t, oldRowContainer); + } else { + this.keyWritables.remove(t); + this.candidateStorage.remove(t); + this.nextGroupStorage.remove(t); + } + foundNextKeyGroup.put(t, Boolean.FALSE); + } + //for the big table, we only need to promote the next group to the current group. + if(t == (byte)posBigTable) { + return; + } + +// System.out.println("table id: " + t); + + //for tables other than the big table, we need to fetch more data until reach a new group or done. + while (!foundNextKeyGroup.get(t)) { + if (fetchOpDone.get(t)) { + break; + } + fetchOneRow(t); + } + if (!foundNextKeyGroup.get(t) && fetchOpDone.get(t)) { + this.nextKeyWritables.remove(t); + } + } + + private int compareKeys (ArrayList k1, ArrayList k2) { + int ret = 0; + for (int i = 0; i < k1.size() && i < k1.size(); i++) { + WritableComparable key_1 = (WritableComparable) k1.get(i); + WritableComparable key_2 = (WritableComparable) k2.get(i); + ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2); + if(ret != 0) { + return ret; + } + } + return k1.size() - k2.size(); + } + + private void putDummyOrEmpty(Byte i) { + // put a empty list or null + if (noOuterJoin) { + storage.put(i, emptyList); + } else { + storage.put(i, dummyObjVectors[i.intValue()]); + } + } + + private int findMostSmallKey() { + byte index = -1; + ArrayList mostSmallOne = null; + + for (byte i : order) { + ArrayList key = keyWritables.get(i); + if (key == null) { + continue; + } + if (mostSmallOne == null) { + mostSmallOne = key; + index = i; + continue; + } + int cmp = compareKeys(key, mostSmallOne); + if (cmp < 0) { + mostSmallOne = key; + index = i; + continue; + } + } + return index; + } + + private boolean processKey(byte alias, ArrayList key) + throws HiveException { + ArrayList keyWritable = keyWritables.get(alias); + if (keyWritable == null) { + //the first group. 
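For orientation in the operator above: processKey() buffers rows until the join key changes, and once the big table reaches a new key group, joinOneGroup() repeatedly picks the alias holding the smallest current key (findMostSmallKey(), via the element-wise compareKeys()), emits the join for every alias positioned on that key while the others contribute a dummy or empty side (putDummyOrEmpty()), and then fetches the next key group for the aliases that were consumed. A self-contained sketch of that smallest-key merge loop over pre-sorted inputs, with plain int keys standing in for Hive's key groups; all names below are hypothetical, not taken from the patch:

import java.util.Arrays;
import java.util.List;

public class SortedMergeSketch {
  // Merge-join the given inputs, each of which must already be sorted.
  static void mergeJoin(List<int[]> sortedInputs) {
    int[] pos = new int[sortedInputs.size()];     // current position per input
    while (true) {
      // find the smallest current key across all inputs (findMostSmallKey)
      Integer smallest = null;
      for (int i = 0; i < sortedInputs.size(); i++) {
        if (pos[i] < sortedInputs.get(i).length) {
          int k = sortedInputs.get(i)[pos[i]];
          if (smallest == null || k < smallest) {
            smallest = k;
          }
        }
      }
      if (smallest == null) {
        return;                                    // all inputs exhausted
      }
      // join and advance every input whose current key equals the smallest;
      // inputs with a larger key simply wait (the real operator emits a
      // dummy/empty side for them via putDummyOrEmpty)
      StringBuilder row = new StringBuilder("key=" + smallest + " from inputs");
      for (int i = 0; i < sortedInputs.size(); i++) {
        if (pos[i] < sortedInputs.get(i).length
            && sortedInputs.get(i)[pos[i]] == smallest) {
          row.append(' ').append(i);
          pos[i]++;                                // fetchNextGroup for this side
        }
      }
      System.out.println(row);
    }
  }

  public static void main(String[] args) {
    mergeJoin(Arrays.asList(new int[] {1, 3, 5}, new int[] {1, 2, 3, 5}));
  }
}

In SMBMapJoinOperator itself the same loop runs over key groups buffered in candidateStorage/nextGroupStorage, and when ExecMapperContext reports that the big table's input file has changed, joinFinalLeftData() first drains whatever the small-table FetchOperators still hold before setUpFetchOpContext() re-points them at the matching buckets.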
+ keyWritables.put(alias, key); + return false; + } else { + int cmp = compareKeys(key, keyWritable);; + if (cmp != 0) { + nextKeyWritables.put(alias, key); + return true; + } + return false; + } + } + + private void setUpFetchOpContext(FetchOperator fetchOp, String alias) { + String currentInputFile = this.getExecContext().currentInputFile; + BucketMapJoinContext bucketMatcherCxt = this.localWork + .getBucketMapjoinContext(); + Class bucketMatcherCls = bucketMatcherCxt + .getBucketMatcherClass(); + BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance( + bucketMatcherCls, null); + bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt + .getAliasBucketFileNameMapping()); + List aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile, + bucketMatcherCxt.getMapJoinBigTableAlias(), alias); + Iterator iter = aliasFiles.iterator(); + fetchOp.setupContext(iter, null); + } + + private void fetchOneRow(byte tag) { + if (fetchOperators != null) { + String tble = this.tagToAlias.get(tag); + FetchOperator fetchOp = fetchOperators.get(tble); + +// System.out.println("table name in fetchOneRow: " + tble); + + Operator forwardOp = localWork.getAliasToWork() + .get(tble); + try { + InspectableObject row = fetchOp.getNextRow(); + if (row == null) { + this.fetchOpDone.put(tag, Boolean.TRUE); + forwardOp.close(false); + return; + } + forwardOp.process(row.o, 0); + // check if any operator had a fatal error or early exit during + // execution + if (forwardOp.getDone()) { + this.fetchOpDone.put(tag, Boolean.TRUE); + } + } catch (Throwable e) { + if (e instanceof OutOfMemoryError) { + // Don't create a new object if we are already out of memory + throw (OutOfMemoryError) e; + } else { + throw new RuntimeException("Map local work failed", e); + } + } + } + } + + protected boolean areAllParentsInitialized() { + return true; + } + + @Override + public void closeOp(boolean abort) throws HiveException { + this.firstFetchHappened = false; + joinFinalLeftData(); + super.closeOp(abort); + } + + /** + * Implements the getName function for the Node Interface. 
+ * + * @return the name of the operator + */ + @Override + public String getName() { + return "MAPJOIN"; + } + + @Override + public int getType() { + return OperatorType.MAPJOIN; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (working copy) @@ -192,6 +192,8 @@ for (int index = 0; index < joinAliases.size(); index++) { String alias = joinAliases.get(index); TableScanOperator tso = (TableScanOperator) topOps.get(alias); + if (tso == null) + return null; Table tbl = topToTable.get(tso); if(tbl.isPartitioned()) { PrunedPartitionList prunedParts = null; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -54,6 +55,7 @@ import org.apache.hadoop.hive.ql.plan.ExtractDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; @@ -342,7 +344,7 @@ return dest; } - MapJoinOperator currMapJoinOp = ctx.getCurrMapJoinOp(); + AbstractMapJoinOperator currMapJoinOp = ctx.getCurrMapJoinOp(); if (currMapJoinOp != null) { opTaskMap.put(null, currTask); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (working copy) @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; /** @@ -140,7 +142,7 @@ String taskTmpDir; TableDesc tt_desc; Operator rootMapJoinOp; - MapJoinOperator oldMapJoin; + AbstractMapJoinOperator oldMapJoin; public GenMRMapJoinCtx() { taskTmpDir = null; @@ -157,7 +159,7 @@ */ public GenMRMapJoinCtx(String taskTmpDir, TableDesc tt_desc, Operator rootMapJoinOp, - MapJoinOperator oldMapJoin) { + AbstractMapJoinOperator oldMapJoin) { this.taskTmpDir = taskTmpDir; this.tt_desc = tt_desc; this.rootMapJoinOp = rootMapJoinOp; @@ -198,7 +200,7 @@ /** * @return the oldMapJoin */ - public MapJoinOperator 
getOldMapJoin() { + public AbstractMapJoinOperator getOldMapJoin() { return oldMapJoin; } @@ -206,7 +208,7 @@ * @param oldMapJoin * the oldMapJoin to set */ - public void setOldMapJoin(MapJoinOperator oldMapJoin) { + public void setOldMapJoin(AbstractMapJoinOperator oldMapJoin) { this.oldMapJoin = oldMapJoin; } } @@ -214,7 +216,7 @@ private HiveConf conf; private HashMap, Task> opTaskMap; private HashMap unionTaskMap; - private HashMap mapJoinTaskMap; + private HashMap, GenMRMapJoinCtx> mapJoinTaskMap; private List> seenOps; private List seenFileSinkOps; @@ -226,7 +228,7 @@ private Task currTask; private Operator currTopOp; private UnionOperator currUnionOp; - private MapJoinOperator currMapJoinOp; + private AbstractMapJoinOperator currMapJoinOp; private String currAliasId; private List> rootOps; @@ -289,7 +291,7 @@ rootOps = new ArrayList>(); rootOps.addAll(parseCtx.getTopOps().values()); unionTaskMap = new HashMap(); - mapJoinTaskMap = new HashMap(); + mapJoinTaskMap = new HashMap, GenMRMapJoinCtx>(); } /** @@ -456,7 +458,7 @@ this.currUnionOp = currUnionOp; } - public MapJoinOperator getCurrMapJoinOp() { + public AbstractMapJoinOperator getCurrMapJoinOp() { return currMapJoinOp; } @@ -464,7 +466,7 @@ * @param currMapJoinOp * current map join operator */ - public void setCurrMapJoinOp(MapJoinOperator currMapJoinOp) { + public void setCurrMapJoinOp(AbstractMapJoinOperator currMapJoinOp) { this.currMapJoinOp = currMapJoinOp; } @@ -491,11 +493,11 @@ unionTaskMap.put(op, uTask); } - public GenMRMapJoinCtx getMapJoinCtx(MapJoinOperator op) { + public GenMRMapJoinCtx getMapJoinCtx(AbstractMapJoinOperator op) { return mapJoinTaskMap.get(op); } - public void setMapJoinCtx(MapJoinOperator op, GenMRMapJoinCtx mjCtx) { + public void setMapJoinCtx(AbstractMapJoinOperator op, GenMRMapJoinCtx mjCtx) { mapJoinTaskMap.put(op, mjCtx); } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -141,7 +143,7 @@ // If there is a mapjoin at position 'pos' if (uPrsCtx.getMapJoinSubq(pos)) { - MapJoinOperator mjOp = ctx.getCurrMapJoinOp(); + AbstractMapJoinOperator mjOp = ctx.getCurrMapJoinOp(); assert mjOp != null; GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mjOp); assert mjCtx != null; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy) @@ -33,11 +33,13 @@ import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; @@ -157,7 +159,7 @@ // The mapjoin has already been encountered. Some context must be stored // about that if (readInputMapJoin) { - MapJoinOperator currMapJoinOp = opProcCtx.getCurrMapJoinOp(); + AbstractMapJoinOperator currMapJoinOp = (AbstractMapJoinOperator) opProcCtx.getCurrMapJoinOp(); assert currMapJoinOp != null; boolean local = ((pos == -1) || (pos == (currMapJoinOp.getConf()) .getPosBigTable())) ? false : true; @@ -217,7 +219,7 @@ seenOps.add(currTopOp); boolean local = (pos == desc.getPosBigTable()) ? false : true; setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx); - setupBucketMapJoinInfo(plan, (MapJoinOperator)op); + setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator)op); } opProcCtx.setCurrTask(currTask); @@ -226,7 +228,7 @@ } private static void setupBucketMapJoinInfo(MapredWork plan, - MapJoinOperator currMapJoinOp) { + AbstractMapJoinOperator currMapJoinOp) { if (currMapJoinOp != null) { LinkedHashMap>> aliasBucketFileNameMapping = currMapJoinOp.getConf().getAliasBucketFileNameMapping(); @@ -236,8 +238,12 @@ localPlan = new MapredLocalWork( new LinkedHashMap>(), new LinkedHashMap()); - plan.setMapLocalWork(localPlan); + plan.setMapLocalWork(localPlan); } + if(currMapJoinOp instanceof SMBMapJoinOperator) { + plan.setMapLocalWork(null); + ((SMBMapJoinOperator)currMapJoinOp).getConf().setLocalWork(localPlan); + } BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext(); localPlan.setBucketMapjoinContext(bucketMJCxt); bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping); @@ -368,7 +374,7 @@ currTopOp = null; opProcCtx.setCurrTopOp(currTopOp); } else if (opProcCtx.getCurrMapJoinOp() != null) { - MapJoinOperator mjOp = opProcCtx.getCurrMapJoinOp(); + AbstractMapJoinOperator mjOp = (AbstractMapJoinOperator) opProcCtx.getCurrMapJoinOp(); if (readUnionData) { initUnionPlan(opProcCtx, currTask, false); } else { @@ -376,7 +382,7 @@ // In case of map-join followed by map-join, the file needs to be // obtained from the old map join - MapJoinOperator oldMapJoin = mjCtx.getOldMapJoin(); + AbstractMapJoinOperator oldMapJoin = (AbstractMapJoinOperator) mjCtx.getOldMapJoin(); String taskTmpDir = null; TableDesc tt_desc = null; Operator rootOp = null; @@ -819,8 +825,8 @@ setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan, local, tt_desc); // This can be cleaned up as a function table in future - if (op instanceof MapJoinOperator) { - MapJoinOperator mjOp = (MapJoinOperator) op; + if (op instanceof AbstractMapJoinOperator) { + AbstractMapJoinOperator mjOp = (AbstractMapJoinOperator) op; opProcCtx.setCurrMapJoinOp(mjOp); GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp); if (mjCtx == null) { Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (revision 916496) +++ 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -43,6 +44,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -52,7 +54,7 @@ */ public final class MapJoinFactory { - public static int getPositionParent(MapJoinOperator op, Stack stack) { + public static int getPositionParent(AbstractMapJoinOperator op, Stack stack) { int pos = 0; int size = stack.size(); assert size >= 2 && stack.get(size - 1) == op; @@ -72,7 +74,7 @@ @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { - MapJoinOperator mapJoin = (MapJoinOperator) nd; + AbstractMapJoinOperator mapJoin = (AbstractMapJoinOperator) nd; GenMRProcContext ctx = (GenMRProcContext) procCtx; // find the branch on which this processor was invoked @@ -122,7 +124,7 @@ @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { - MapJoinOperator mapJoin = (MapJoinOperator) nd; + AbstractMapJoinOperator mapJoin = (AbstractMapJoinOperator) nd; GenMRProcContext opProcCtx = (GenMRProcContext) procCtx; MapredWork cplan = GenMapRedUtils.getMapRedWork(); @@ -133,7 +135,7 @@ // find the branch on which this processor was invoked int pos = getPositionParent(mapJoin, stack); - boolean local = (pos == (mapJoin.getConf()).getPosBigTable()) ? false + boolean local = (pos == ((MapJoinDesc)(mapJoin.getConf())).getPosBigTable()) ? false : true; GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false, @@ -180,7 +182,7 @@ Object... nodeOutputs) throws SemanticException { SelectOperator sel = (SelectOperator) nd; - MapJoinOperator mapJoin = (MapJoinOperator) sel.getParentOperators().get( + AbstractMapJoinOperator mapJoin = (AbstractMapJoinOperator) sel.getParentOperators().get( 0); assert sel.getParentOperators().size() == 1; @@ -188,7 +190,7 @@ ParseContext parseCtx = ctx.getParseCtx(); // is the mapjoin followed by a reducer - List listMapJoinOps = parseCtx + List> listMapJoinOps = parseCtx .getListMapJoinOpsNoReducer(); if (listMapJoinOps.contains(mapJoin)) { @@ -263,11 +265,11 @@ @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { - MapJoinOperator mapJoin = (MapJoinOperator) nd; + AbstractMapJoinOperator mapJoin = (AbstractMapJoinOperator) nd; GenMRProcContext ctx = (GenMRProcContext) procCtx; ctx.getParseCtx(); - MapJoinOperator oldMapJoin = ctx.getCurrMapJoinOp(); + AbstractMapJoinOperator oldMapJoin = ctx.getCurrMapJoinOp(); assert oldMapJoin != null; GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin); if (mjCtx != null) { @@ -335,7 +337,7 @@ UnionOperator currUnion = ctx.getCurrUnionOp(); assert currUnion != null; ctx.getUnionTask(currUnion); - MapJoinOperator mapJoin = (MapJoinOperator) nd; + AbstractMapJoinOperator mapJoin = (AbstractMapJoinOperator) nd; // find the branch on which this processor was invoked int pos = getPositionParent(mapJoin, stack); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (working copy) @@ -28,7 +28,13 @@ import java.util.Set; import java.util.Stack; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -36,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.GraphWalker; @@ -44,21 +51,29 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ErrorMsg; import org.apache.hadoop.hive.ql.parse.GenMapRedWalker; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; /** * Implementation of one of the rule-based map 
join optimization. User passes @@ -68,6 +83,9 @@ * implemented, this transformation can also be done based on costs. */ public class MapJoinProcessor implements Transform { + + private static final Log LOG = LogFactory.getLog(MapJoinProcessor.class.getName()); + private ParseContext pGraphContext; /** @@ -84,7 +102,7 @@ pGraphContext.getOpParseCtx().put(op, ctx); return op; } - + /** * convert a regular join to a a map-side join. * @@ -101,17 +119,23 @@ // outer join cannot be performed on a table which is being cached JoinDesc desc = op.getConf(); org.apache.hadoop.hive.ql.plan.JoinCondDesc[] condns = desc.getConds(); - for (org.apache.hadoop.hive.ql.plan.JoinCondDesc condn : condns) { - if (condn.getType() == JoinDesc.FULL_OUTER_JOIN) { - throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); - } - if ((condn.getType() == JoinDesc.LEFT_OUTER_JOIN) - && (condn.getLeft() != mapJoinPos)) { - throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); - } - if ((condn.getType() == JoinDesc.RIGHT_OUTER_JOIN) - && (condn.getRight() != mapJoinPos)) { - throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); + HiveConf hiveConf = pGraphContext.getConf(); + boolean noCheckOuterJoin = HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN) + && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN); + if (!noCheckOuterJoin) { + for (org.apache.hadoop.hive.ql.plan.JoinCondDesc condn : condns) { + if (condn.getType() == JoinDesc.FULL_OUTER_JOIN) { + throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); + } + if ((condn.getType() == JoinDesc.LEFT_OUTER_JOIN) + && (condn.getLeft() != mapJoinPos)) { + throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); + } + if ((condn.getType() == JoinDesc.RIGHT_OUTER_JOIN) + && (condn.getRight() != mapJoinPos)) { + throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); + } } } @@ -243,7 +267,7 @@ keyTableDesc, valueExprMap, valueTableDescs, outputColumnNames, mapJoinPos, joinCondns), new RowSchema(outputRS.getColumnInfos()), newPar), outputRS); - + mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs()); mapJoinOp.setColumnExprMap(colExprMap); @@ -396,7 +420,7 @@ } // Go over the list and find if a reducer is not needed - List listMapJoinOpsNoRed = new ArrayList(); + List> listMapJoinOpsNoRed = new ArrayList>(); // create a walker which walks the tree in a DFS manner while maintaining // the operator stack. @@ -461,8 +485,8 @@ Object... nodeOutputs) throws SemanticException { MapJoinWalkerCtx ctx = (MapJoinWalkerCtx) procCtx; - MapJoinOperator mapJoin = ctx.getCurrMapJoinOp(); - List listRejectedMapJoins = ctx + AbstractMapJoinOperator mapJoin = ctx.getCurrMapJoinOp(); + List> listRejectedMapJoins = ctx .getListRejectedMapJoins(); // the mapjoin has already been handled @@ -471,9 +495,9 @@ return null; } - List listMapJoinsNoRed = ctx.getListMapJoinsNoRed(); + List> listMapJoinsNoRed = ctx.getListMapJoinsNoRed(); if (listMapJoinsNoRed == null) { - listMapJoinsNoRed = new ArrayList(); + listMapJoinsNoRed = new ArrayList>(); } listMapJoinsNoRed.add(mapJoin); ctx.setListMapJoins(listMapJoinsNoRed); @@ -494,11 +518,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { MapJoinWalkerCtx ctx = (MapJoinWalkerCtx) procCtx; - MapJoinOperator mapJoin = ctx.getCurrMapJoinOp(); - List listRejectedMapJoins = ctx + AbstractMapJoinOperator mapJoin = ctx.getCurrMapJoinOp(); + List> listRejectedMapJoins = ctx .getListRejectedMapJoins(); if (listRejectedMapJoins == null) { - listRejectedMapJoins = new ArrayList(); + listRejectedMapJoins = new ArrayList>(); } listRejectedMapJoins.add(mapJoin); ctx.setListRejectedMapJoins(listRejectedMapJoins); @@ -543,23 +567,23 @@ * */ public static class MapJoinWalkerCtx implements NodeProcessorCtx { - private List listMapJoinsNoRed; - private List listRejectedMapJoins; - private MapJoinOperator currMapJoinOp; + private List> listMapJoinsNoRed; + private List> listRejectedMapJoins; + private AbstractMapJoinOperator currMapJoinOp; /** * @param listMapJoinsNoRed */ - public MapJoinWalkerCtx(List listMapJoinsNoRed) { + public MapJoinWalkerCtx(List> listMapJoinsNoRed) { this.listMapJoinsNoRed = listMapJoinsNoRed; currMapJoinOp = null; - listRejectedMapJoins = new ArrayList(); + listRejectedMapJoins = new ArrayList>(); } /** * @return the listMapJoins */ - public List getListMapJoinsNoRed() { + public List> getListMapJoinsNoRed() { return listMapJoinsNoRed; } @@ -567,14 +591,14 @@ * @param listMapJoinsNoRed * the listMapJoins to set */ - public void setListMapJoins(List listMapJoinsNoRed) { + public void setListMapJoins(List> listMapJoinsNoRed) { this.listMapJoinsNoRed = listMapJoinsNoRed; } /** * @return the currMapJoinOp */ - public MapJoinOperator getCurrMapJoinOp() { + public AbstractMapJoinOperator getCurrMapJoinOp() { return currMapJoinOp; } @@ -582,14 +606,14 @@ * @param currMapJoinOp * the currMapJoinOp to set */ - public void setCurrMapJoinOp(MapJoinOperator currMapJoinOp) { + public void setCurrMapJoinOp(AbstractMapJoinOperator currMapJoinOp) { this.currMapJoinOp = currMapJoinOp; } /** * @return the listRejectedMapJoins */ - public List getListRejectedMapJoins() { + public List> getListRejectedMapJoins() { return listRejectedMapJoins; } @@ -598,7 +622,7 @@ * the listRejectedMapJoins to set */ public void setListRejectedMapJoins( - List listRejectedMapJoins) { + List> listRejectedMapJoins) { this.listRejectedMapJoins = listRejectedMapJoins; } } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 916496) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy) @@ -58,6 +58,9 @@ transformations.add(new MapJoinProcessor()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) { transformations.add(new BucketMapJoinOptimizer()); + if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) { + transformations.add(new SortedMergeBucketMapJoinOptimizer()); + } } transformations.add(new UnionProcessor()); transformations.add(new JoinReorder()); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (revision 0) @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.See the NOTICE file + * distributed 
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+
+//try to replace a bucket map join with a sorted merge map join
+public class SortedMergeBucketMapJoinOptimizer implements Transform {
+
+  private static final Log LOG = LogFactory
+      .getLog(SortedMergeBucketMapJoinOptimizer.class.getName());
+
+  public SortedMergeBucketMapJoinOptimizer() {
+  }
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    // go through all map joins and find out all which have enabled bucket map
+    // join.
+    opRules.put(new RuleRegExp("R1", "MAPJOIN%"),
+        getSortedMergeBucketMapjoinProc(pctx));
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // Create a list of topop nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    return pctx;
+  }
+
+  private NodeProcessor getSortedMergeBucketMapjoinProc(ParseContext pctx) {
+    return new SortedMergeBucketMapjoinProc(pctx);
+  }
+
+  private NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+        return null;
+      }
+    };
+  }
+
+  class SortedMergeBucketMapjoinProc implements NodeProcessor {
+    ParseContext pGraphContext;
+
+    public SortedMergeBucketMapjoinProc(ParseContext pctx) {
+      this.pGraphContext = pctx;
+    }
+
+    public SortedMergeBucketMapjoinProc() {
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      if (nd instanceof SMBMapJoinOperator) {
+        return null;
+      }
+      MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
+      if (mapJoinOp.getConf().getAliasBucketFileNameMapping() == null
+          || mapJoinOp.getConf().getAliasBucketFileNameMapping().size() == 0) {
+        return null;
+      }
+
+      boolean tableSorted = true;
+      QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
+          .get(mapJoinOp);
+      if (joinCxt == null)
+        return null;
+      String[] srcs = joinCxt.getBaseSrc();
+      int pos = 0;
+      for (String src : srcs) {
+        tableSorted = tableSorted
+            && isTableSorted(this.pGraphContext, mapJoinOp, joinCxt, src, pos);
+        pos++;
+      }
+      if (!tableSorted) {
+        return null;
+      }
+      // convert a bucket map join operator to a sorted merge bucket map join
+      // operator
+      convertToSMBJoin(mapJoinOp, srcs);
+      return null;
+    }
+
+    private SMBMapJoinOperator convertToSMBJoin(MapJoinOperator mapJoinOp,
+        String[] srcs) {
+      SMBMapJoinOperator smbJop = new SMBMapJoinOperator(mapJoinOp);
+      SMBJoinDesc smbJoinDesc = new SMBJoinDesc(mapJoinOp.getConf());
+      smbJop.setConf(smbJoinDesc);
+      HashMap<Byte, String> tagToAlias = new HashMap<Byte, String>();
+      for (int i = 0; i < srcs.length; i++) {
+        tagToAlias.put((byte) i, srcs[i]);
+      }
+      smbJoinDesc.setTagToAlias(tagToAlias);
+
+      int indexInListMapJoinNoReducer = this.pGraphContext.getListMapJoinOpsNoReducer().indexOf(mapJoinOp);
+      if(indexInListMapJoinNoReducer >= 0 ) {
+        this.pGraphContext.getListMapJoinOpsNoReducer().remove(indexInListMapJoinNoReducer);
+        this.pGraphContext.getListMapJoinOpsNoReducer().add(indexInListMapJoinNoReducer, smbJop);
+      }
+
+      List<Operator<? extends Serializable>> parentOperators = mapJoinOp.getParentOperators();
+      for (int i = 0; i < parentOperators.size(); i++) {
+        Operator<? extends Serializable> par = parentOperators.get(i);
+        int index = par.getChildOperators().indexOf(mapJoinOp);
+        par.getChildOperators().remove(index);
+        par.getChildOperators().add(index, smbJop);
+      }
+      List<Operator<? extends Serializable>> childOps = mapJoinOp.getChildOperators();
+      for (int i = 0; i < childOps.size(); i++) {
+        Operator<? extends Serializable> child = childOps.get(i);
+        int index = child.getParentOperators().indexOf(mapJoinOp);
+        child.getParentOperators().remove(index);
+        child.getParentOperators().add(index, smbJop);
+      }
+      return smbJop;
+    }
+
+    private boolean isTableSorted(ParseContext pctx, MapJoinOperator op,
+        QBJoinTree joinTree, String alias, int pos) throws
+        SemanticException {
+      Map<String, Operator<? extends Serializable>> topOps = this.pGraphContext
+          .getTopOps();
+      Map<TableScanOperator, Table> topToTable = this.pGraphContext
+          .getTopToTable();
+      TableScanOperator tso = (TableScanOperator) topOps.get(alias);
+      if (tso == null)
+        return false;
+
+      List<ExprNodeDesc> keys = op.getConf().getKeys().get((byte) pos);
+      // get all join columns from join keys stored in MapJoinDesc
+      List<String> joinCols = new ArrayList<String>();
+      List<ExprNodeDesc> joinKeys = new ArrayList<ExprNodeDesc>();
+      joinKeys.addAll(keys);
+      while (joinKeys.size() > 0) {
+        ExprNodeDesc node = joinKeys.remove(0);
+        if (node instanceof ExprNodeColumnDesc) {
+          joinCols.addAll(node.getCols());
+        } else if (node instanceof ExprNodeGenericFuncDesc) {
+          ExprNodeGenericFuncDesc udfNode = ((ExprNodeGenericFuncDesc) node);
+          GenericUDF udf = udfNode.getGenericUDF();
+          if (!FunctionRegistry.isDeterministic(udf)) {
+            return false;
+          }
+          joinKeys.addAll(0, udfNode.getChildExprs());
+        }
+      }
+
+      Table tbl = topToTable.get(tso);
+      if (tbl.isPartitioned()) {
+        PrunedPartitionList prunedParts = null;
+        try {
+          prunedParts = PartitionPruner.prune(tbl, pGraphContext
+              .getOpToPartPruner().get(tso), pGraphContext.getConf(), alias,
+              pGraphContext.getPrunedPartitions());
+        } catch (HiveException e) {
+          LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+          throw new SemanticException(e.getMessage(), e);
+        }
+        boolean ret = true;
+        for (Partition p : prunedParts.getConfirmedPartns()) {
+          ret = ret && checkSortColsAndJoinCols(p.getSortCols(), joinCols);
+          if (!ret) {
+            return false;
+          }
+        }
+        for (Partition p : prunedParts.getUnknownPartns()) {
+          ret = ret && checkSortColsAndJoinCols(p.getSortCols(), joinCols);
+          if (!ret) {
+            return false;
+          }
+        }
+      } else {
+        return checkSortColsAndJoinCols(tbl.getSortCols(), joinCols);
+      }
+      return true;
+    }
+
+    private boolean checkSortColsAndJoinCols(List<Order> sortCols,
+        List<String> joinCols) {
+      // require all sort columns are asc, right now only support asc
+      List<String> sortColNames = new ArrayList<String>();
+      for (Order o : sortCols) {
+        if (o.getOrder() != BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC) {
+          return false;
+        }
+        sortColNames.add(o.getCol());
+      }
+
+      return sortColNames.containsAll(joinCols)
+          && sortColNames.size() == joinCols.size();
+    }
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (revision 916496)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (working copy)
@@ -75,7 +75,10 @@
   protected Context ctx;
   protected HashMap<String, String> idToTableNameMap;
-
+
+  public static int HIVE_COLUMN_ORDER_ASC = 1;
+  public static int HIVE_COLUMN_ORDER_DESC = 0;
+
   /**
   * ReadEntitites that are passed to the hooks.
   */
@@ -355,10 +358,10 @@
       ASTNode child = (ASTNode) ast.getChild(i);
       if (child.getToken().getType() == HiveParser.TOK_TABSORTCOLNAMEASC) {
         colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()),
-            1));
+            HIVE_COLUMN_ORDER_ASC));
       } else {
         colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()),
-            0));
+            HIVE_COLUMN_ORDER_DESC));
       }
     }
     return colList;
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 916496)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;

 /**
@@ -67,7 +69,7 @@
   private HashMap<String, String> idToTableNameMap;
   private int destTableId;
   private UnionProcContext uCtx;
-  private List<MapJoinOperator> listMapJoinOpsNoReducer; // list of map join
+  private List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer; // list of map join
                                                          // operators with no
                                                          // reducer
   private Map<GroupByOperator, Set<String>> groupOpToInputTables;
@@ -129,7 +131,7 @@
       HashMap<TableScanOperator, Table> topToTable, List<LoadTableDesc> loadTableWork,
       List<LoadFileDesc> loadFileWork, Context ctx,
       HashMap<String, String> idToTableNameMap, int destTableId,
-      UnionProcContext uCtx, List<MapJoinOperator> listMapJoinOpsNoReducer,
+      UnionProcContext uCtx, List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer,
       Map<GroupByOperator, Set<String>> groupOpToInputTables,
       Map<String, PrunedPartitionList> prunedPartitions,
       HashMap<TableScanOperator, sampleDesc> opToSamplePruner) {
@@ -366,7 +368,7 @@
   /**
    * @return the listMapJoinOpsNoReducer
    */
-  public List<MapJoinOperator> getListMapJoinOpsNoReducer() {
+  public List<AbstractMapJoinOperator<? extends MapJoinDesc>> getListMapJoinOpsNoReducer() {
     return listMapJoinOpsNoReducer;
   }

@@ -375,7 +377,7 @@
    *          the listMapJoinOpsNoReducer to set
    */
   public void setListMapJoinOpsNoReducer(
-      List<MapJoinOperator> listMapJoinOpsNoReducer) {
+      List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer) {
     this.listMapJoinOpsNoReducer = listMapJoinOpsNoReducer;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 917282)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
@@ -123,6 +124,7 @@
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -173,7 +175,7 @@
   private ASTNode ast;
   private int destTableId;
   private UnionProcContext uCtx;
-  List<MapJoinOperator> listMapJoinOpsNoReducer;
+  List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer;
   private HashMap<TableScanOperator, sampleDesc> opToSamplePruner;
   Map<GroupByOperator, Set<String>> groupOpToInputTables;
   Map<String, PrunedPartitionList> prunedPartitions;
@@ -202,7 +204,7 @@
     topToTable = new HashMap<TableScanOperator, Table>();
     destTableId = 1;
     uCtx = null;
-    listMapJoinOpsNoReducer = new ArrayList<MapJoinOperator>();
+    listMapJoinOpsNoReducer = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
     groupOpToInputTables = new HashMap<GroupByOperator, Set<String>>();
     prunedPartitions = new HashMap<String, PrunedPartitionList>();
     unparseTranslator = new UnparseTranslator();
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java (revision 916496)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java (working copy)
@@ -89,6 +89,21 @@
     this(exprs, outputColumnNames, false, conds);
   }

+  public JoinDesc(JoinDesc clone) {
+    this.bigKeysDirMap = clone.bigKeysDirMap;
+    this.conds = clone.conds;
+    this.exprs = clone.exprs;
+    this.handleSkewJoin = clone.handleSkewJoin;
+    this.keyTableDesc = clone.keyTableDesc;
+    this.noOuterJoin = clone.noOuterJoin;
+    this.outputColumnNames = clone.outputColumnNames;
+    this.reversedExprs = clone.reversedExprs;
+    this.skewKeyDefinition = clone.skewKeyDefinition;
+    this.skewKeysValuesTables = clone.skewKeysValuesTables;
+    this.smallKeysDirMap = clone.smallKeysDirMap;
+    this.tagOrder = clone.tagOrder;
+  }
+
   public Map<Byte, List<ExprNodeDesc>> getExprs() {
     return exprs;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (revision 916496)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (working copy)
@@ -50,6 +50,17 @@

   public MapJoinDesc() {
   }
+
+  public MapJoinDesc(MapJoinDesc clone) {
+    super(clone);
+    this.keys = clone.keys;
+    this.keyTblDesc = clone.keyTblDesc;
+    this.valueTblDescs = clone.valueTblDescs;
+    this.posBigTable = clone.posBigTable;
+    this.retainList = clone.retainList;
+    this.bigTableAlias = clone.bigTableAlias;
+    this.aliasBucketFileNameMapping = clone.aliasBucketFileNameMapping;
+  }

   public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
       final TableDesc keyTblDesc, final Map<Byte, List<ExprNodeDesc>> values,
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java (revision 0)
@@ -0,0 +1,39 @@
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+@Explain(displayName = "Sorted Merge Bucket Map Join Operator")
+public class SMBJoinDesc extends MapJoinDesc implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private MapredLocalWork localWork;
+
+  //keep a mapping from tag to the fetch operator alias
+  private HashMap<Byte, String> tagToAlias;
+
+  public SMBJoinDesc(MapJoinDesc conf) {
+    super(conf);
+  }
+
+  public SMBJoinDesc() {
+  }
+
+  public MapredLocalWork getLocalWork() {
+    return localWork;
+  }
+
+  public void setLocalWork(MapredLocalWork localWork) {
+    this.localWork = localWork;
+  }
+
+  public HashMap<Byte, String> getTagToAlias() {
+    return tagToAlias;
+  }
+
+  public void setTagToAlias(HashMap<Byte, String> tagToAlias) {
+    this.tagToAlias = tagToAlias;
+  }
+
+}
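
Note (illustrative, not part of the patch): the checkSortColsAndJoinCols rule above admits a table only when every sort column is ascending and the sort columns coincide exactly with the join columns; otherwise the optimizer leaves the plain bucket map join in place. A minimal standalone sketch of that rule, using a hypothetical SortCol stand-in for the metastore Order class:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SortColCheckSketch {

  // Hypothetical stand-in for org.apache.hadoop.hive.metastore.api.Order.
  static class SortCol {
    final String col;
    final int order; // 1 = ascending (HIVE_COLUMN_ORDER_ASC), 0 = descending
    SortCol(String col, int order) { this.col = col; this.order = order; }
  }

  // All sort columns must be ascending and must match the join columns exactly.
  static boolean sortedOnJoinCols(List<SortCol> sortCols, List<String> joinCols) {
    List<String> sortColNames = new ArrayList<String>();
    for (SortCol o : sortCols) {
      if (o.order != 1) {
        return false; // descending sort order is not supported
      }
      sortColNames.add(o.col);
    }
    return sortColNames.containsAll(joinCols)
        && sortColNames.size() == joinCols.size();
  }

  public static void main(String[] args) {
    List<SortCol> sorted = Arrays.asList(new SortCol("key", 1));
    System.out.println(sortedOnJoinCols(sorted, Arrays.asList("key")));   // true
    System.out.println(sortedOnJoinCols(sorted, Arrays.asList("value"))); // false
  }
}

Under this rule a table sorted by (key) qualifies for a join on key but not for a join on value, which mirrors when SortedMergeBucketMapjoinProc converts the operator to an SMBMapJoinOperator.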