Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 905742) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -187,6 +187,7 @@ HIVEOPTCP("hive.optimize.cp", true), // column pruner HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by + HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join ; public final String varname; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/BucketMatcher.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/BucketMatcher.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/BucketMatcher.java (revision 0) @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.util.LinkedHashMap; +import java.util.List; + +import org.apache.hadoop.fs.Path; + +public interface BucketMatcher { + + public List getAliasBucketFiles(String currentInputFile, String refTableAlias, String alias); + + public void setAliasToBucketNumber(LinkedHashMap aliasToBucketNumber); + + public void setAliasToBucketFileNames(LinkedHashMap> aliasToBucketFileNames); + +} \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultBucketMatcher.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultBucketMatcher.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultBucketMatcher.java (revision 0) @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; + +import org.apache.hadoop.fs.Path; + +public class DefaultBucketMatcher implements BucketMatcher { + + private LinkedHashMap> aliasToBucketFileNames; + + private LinkedHashMap aliasToBucketNumber; + + public DefaultBucketMatcher( + LinkedHashMap> aliasToBucketFileNames, + LinkedHashMap aliasToBucketNumber) { + super(); + this.setAliasToBucketFileNames(aliasToBucketFileNames); + this.setAliasToBucketNumber(aliasToBucketNumber); + } + + public DefaultBucketMatcher(){ + } + + public List getAliasBucketFiles(String refTableInputFile, String refTableAlias, String alias) { + int bigTblBucketNum = aliasToBucketNumber.get(refTableAlias); + int smallTblBucketNum = aliasToBucketNumber.get(alias); + Collections.sort(aliasToBucketFileNames.get(refTableAlias)); + Collections.sort(aliasToBucketFileNames.get(alias)); + + List resultFileNames = new ArrayList(); + if (bigTblBucketNum >= smallTblBucketNum) { + int temp = bigTblBucketNum / smallTblBucketNum; + int index = aliasToBucketFileNames.get(refTableAlias).indexOf(refTableInputFile); + int toAddSmallIndex = index/temp; + if(toAddSmallIndex < aliasToBucketFileNames.get(alias).size()) { + resultFileNames.add(new Path(aliasToBucketFileNames.get(alias).get(toAddSmallIndex))); + } + } else { + int jump = smallTblBucketNum / bigTblBucketNum; + int index = aliasToBucketFileNames.get(refTableAlias).indexOf(refTableInputFile); + for (int i = index; i < aliasToBucketFileNames.get(alias).size(); i = i + jump) { + if(i <= aliasToBucketFileNames.get(alias).size()) { + resultFileNames.add(new Path(aliasToBucketFileNames.get(alias).get(i))); + } + } + } + return resultFileNames; + } + + @Override + public void setAliasToBucketFileNames( + LinkedHashMap> aliasToBucketFileNames) { + this.aliasToBucketFileNames = aliasToBucketFileNames; + } + + @Override + public void setAliasToBucketNumber( + LinkedHashMap aliasToBucketNumber) { + this.aliasToBucketNumber = aliasToBucketNumber; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (revision 905742) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (working copy) @@ -23,12 +23,19 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.net.URLClassLoader; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.hive.ql.plan.MapredWork; @@ -56,6 +63,8 @@ private MemoryMXBean memoryMXBean; private long numRows = 0; private long nextCntr = 1; + private String lastInputFile = null; + private MapredLocalWork localWork = null; @Override public void configure(JobConf job) { @@ -84,7 +93,7 @@ mo.initialize(jc, null); // initialize map local work - MapredLocalWork localWork = mrwork.getMapLocalWork(); + localWork = mrwork.getMapLocalWork(); if (localWork == null) { return; } @@ -127,54 +136,15 @@ rp = 
reporter; mo.setOutputCollector(oc); mo.setReporter(rp); - // process map local operators - if (fetchOperators != null) { - try { - MapredLocalWork localWork = mo.getConf().getMapLocalWork(); - int fetchOpNum = 0; - for (Map.Entry entry : fetchOperators - .entrySet()) { - int fetchOpRows = 0; - String alias = entry.getKey(); - FetchOperator fetchOp = entry.getValue(); - Operator forwardOp = localWork - .getAliasToWork().get(alias); - - while (true) { - InspectableObject row = fetchOp.getNextRow(); - if (row == null) { - forwardOp.close(false); - break; - } - fetchOpRows++; - forwardOp.process(row.o, 0); - // check if any operator had a fatal error or early exit during - // execution - if (forwardOp.getDone()) { - done = true; - break; - } - } - - if (l4j.isInfoEnabled()) { - l4j - .info("fetch " + fetchOpNum++ + " processed " + fetchOpRows - + " used mem: " - + memoryMXBean.getHeapMemoryUsage().getUsed()); - } - } - } catch (Throwable e) { - abort = true; - if (e instanceof OutOfMemoryError) { - // Don't create a new object if we are already out of memory - throw (OutOfMemoryError) e; - } else { - throw new RuntimeException("Map local work failed", e); - } - } - } } - + + if (localWork != null + && (this.lastInputFile == null || + (localWork.getInputFileChangeSensitive() && inputFileChanged()))) { + this.lastInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME); + processMapLocalWork(localWork.getInputFileChangeSensitive()); + } + try { if (mo.getDone()) { done = true; @@ -204,6 +174,94 @@ } } + /** + * For CombineFileInputFormat, the mapper's input file can change on the + * fly. If the map local work has any mapping that depends on the current + * mapper's input file, the work needs to clear its context and re-initialize + * after the input file changes. This was first introduced to support bucket + * map join.
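To make the re-initialization described above concrete: when the input file changes, setUpFetchOpContext (added further down) asks a BucketMatcher which small-table bucket files pair with the mapper's current big-table bucket file, and points the small table's FetchOperator at only those files. A minimal sketch of that pairing through DefaultBucketMatcher follows; it is only a fragment, it assumes the java.util and org.apache.hadoop.fs.Path imports already used in these files, and the aliases, bucket counts, and paths are invented for illustration.

    LinkedHashMap<String, Integer> bucketNums = new LinkedHashMap<String, Integer>();
    bucketNums.put("a", 4);  // hypothetical big (streamed) table: 4 buckets
    bucketNums.put("b", 2);  // hypothetical map-joined table: 2 buckets
    LinkedHashMap<String, List<String>> bucketFiles = new LinkedHashMap<String, List<String>>();
    bucketFiles.put("a", new ArrayList<String>(
        Arrays.asList("/wh/a/b0", "/wh/a/b1", "/wh/a/b2", "/wh/a/b3")));
    bucketFiles.put("b", new ArrayList<String>(Arrays.asList("/wh/b/b0", "/wh/b/b1")));
    BucketMatcher matcher = new DefaultBucketMatcher(bucketFiles, bucketNums);
    // The mapper is reading big-table bucket file index 2; with a 4-to-2 bucket
    // ratio it maps to small-table bucket index 2 / (4/2) = 1.
    List<Path> paired = matcher.getAliasBucketFiles("/wh/a/b2", "a", "b");  // [/wh/b/b1]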
+ * + * @return + */ + private boolean inputFileChanged() { + String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME); + if (this.lastInputFile == null + || !this.lastInputFile.equals(currentInputFile)) { + return true; + } + return false; + } + + private void processMapLocalWork(boolean inputFileChangeSenstive) { + // process map local operators + if (fetchOperators != null) { + try { + int fetchOpNum = 0; + for (Map.Entry entry : fetchOperators + .entrySet()) { + int fetchOpRows = 0; + String alias = entry.getKey(); + FetchOperator fetchOp = entry.getValue(); + + if(inputFileChangeSenstive) { + fetchOp.clearFetchContext(); + setUpFetchOpContext(fetchOp, alias); + } + + Operator forwardOp = localWork + .getAliasToWork().get(alias); + + while (true) { + InspectableObject row = fetchOp.getNextRow(); + if (row == null) { + forwardOp.close(false); + break; + } + fetchOpRows++; + forwardOp.process(row.o, 0); + // check if any operator had a fatal error or early exit during + // execution + if (forwardOp.getDone()) { + done = true; + break; + } + } + + if (l4j.isInfoEnabled()) { + l4j + .info("fetch " + fetchOpNum++ + " processed " + fetchOpRows + + " used mem: " + + memoryMXBean.getHeapMemoryUsage().getUsed()); + } + } + } catch (Throwable e) { + abort = true; + if (e instanceof OutOfMemoryError) { + // Don't create a new object if we are already out of memory + throw (OutOfMemoryError) e; + } else { + throw new RuntimeException("Map local work failed", e); + } + } + } + } + + private void setUpFetchOpContext(FetchOperator fetchOp, String alias) + throws Exception { + String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME); + Class bucketMatcherCls = this.localWork.getBucketMatcker(); + if(bucketMatcherCls == null) { + bucketMatcherCls = org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class; + } + BucketMatcher bucketMatcher = bucketMatcherCls.newInstance(); + bucketMatcher.setAliasToBucketFileNames(this.localWork.getAliasToBucketFileNames()); + bucketMatcher.setAliasToBucketNumber(this.localWork.getAliasToBucketNumber()); + List aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile, this.localWork.getMapJoinBigTableAlias(), alias); + Iterator iter = aliasFiles.iterator(); + fetchOp.setupContext(iter, null); + } + + private long getNextCntr(long cntr) { // A very simple counter to keep track of number of rows processed by the // reducer. 
It dumps Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (revision 905742) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (working copy) @@ -182,14 +182,18 @@ while (iterPath.hasNext()) { Path nxt = iterPath.next(); - PartitionDesc prt = iterPartDesc.next(); + PartitionDesc prt = null; + if(iterPartDesc != null) + prt = iterPartDesc.next(); FileSystem fs = nxt.getFileSystem(job); if (fs.exists(nxt)) { FileStatus[] fStats = fs.listStatus(nxt); for (FileStatus fStat : fStats) { if (fStat.getLen() > 0) { currPath = nxt; - currPart = prt; + if(iterPartDesc != null) { + currPart = prt; + } return; } } @@ -225,7 +229,7 @@ LOG.debug("Creating fetchTask with deserializer typeinfo: " + serde.getObjectInspector().getTypeName()); LOG.debug("deserializer properties: " + tmp.getProperties()); - if (!tblDataDone) { + if (currPart != null) { setPrtnDesc(); } } @@ -259,9 +263,15 @@ } } - boolean ret = currRecReader.next(key, value); + boolean ret = false; + try { + value = currRecReader.createValue(); + ret = currRecReader.next(key, value); + } catch (Exception e) { + e.printStackTrace(); + } if (ret) { - if (tblDataDone) { + if (this.currPart == null) { Object obj = serde.deserialize(value); return new InspectableObject(obj, serde.getObjectInspector()); } else { @@ -282,7 +292,7 @@ throw new IOException(e); } } - + /** * Clear the context, if anything needs to be done. * @@ -293,11 +303,32 @@ currRecReader.close(); currRecReader = null; } + this.currPath = null; + this.iterPath = null; + this.iterPartDesc = null; } catch (Exception e) { throw new HiveException("Failed with exception " + e.getMessage() + org.apache.hadoop.util.StringUtils.stringifyException(e)); } } + + /** + * used for bucket map join. there is a hack for getting partitionDesc. + * bucket map join right now only allow one partition present in bucket map join. + */ + public void setupContext (Iterator iterPath, Iterator iterPartDesc) { + this.iterPath = iterPath; + this.iterPartDesc = iterPartDesc; + if(iterPartDesc == null) { + if (work.getTblDir() != null) { + this.currTbl = work.getTblDesc(); + } else { + //hack, get the first. + List listParts = work.getPartDesc(); + currPart = listParts.get(0); + } + } + } public ObjectInspector getOutputObjectInspector() throws HiveException { try { Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (revision 0) @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; +import org.apache.hadoop.hive.ql.parse.QBJoinTree; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; + +/** + * This transformation does the bucket map join optimization.
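For each map join that survives the reject rules (R2-R4) below, the processor additionally requires that at most one partition be selected per table, that the join keys resolve exactly to each table's bucketing columns, and that every table's bucket count be a multiple or a divisor of the big (streamed) table's bucket count. A sketch of that last test, written as a hypothetical standalone helper for illustration only (the patch performs the equivalent check inline in BucketMapjoinOptProc.process(), and bucket counts are assumed positive):

    // Hypothetical helper mirroring the divisibility test in BucketMapjoinOptProc:
    // bucket map join only applies when each table's bucket count is a multiple
    // or a divisor of the big table's bucket count.
    static boolean bucketCountsCompatible(int bigTblBuckets, java.util.Collection<Integer> allBucketCounts) {
      for (int n : allBucketCounts) {
        boolean ok = (n >= bigTblBuckets) ? (n % bigTblBuckets == 0)
                                          : (bigTblBuckets % n == 0);
        if (!ok) {
          return false;
        }
      }
      return true;
    }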
+ */ +public class BucketMapJoinOptimizer implements Transform { + + private static final Log LOG = LogFactory.getLog(GroupByOptimizer.class + .getName()); + + public BucketMapJoinOptimizer() { + } + + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + + Map opRules = new LinkedHashMap(); + BucketMapjoinOptProcCtx bucketMapJoinOptimizeCtx = new BucketMapjoinOptProcCtx(); + + // process group-by pattern + opRules.put(new RuleRegExp("R1", "MAPJOIN%"), getBucketMapjoinProc(pctx)); + opRules.put(new RuleRegExp("R2", "RS%.*MAPJOIN"), getBucketMapjoinRejectProc(pctx)); + opRules.put(new RuleRegExp(new String("R3"), "UNION%.*MAPJOIN%"), + getBucketMapjoinRejectProc(pctx)); + opRules.put(new RuleRegExp(new String("R4"), "MAPJOIN%.*MAPJOIN%"), + getBucketMapjoinRejectProc(pctx)); + + // The dispatcher fires the processor corresponding to the closest matching + // rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, + bucketMapJoinOptimizeCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + + // Create a list of topop nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pctx.getTopOps().values()); + ogw.startWalking(topNodes, null); + + return pctx; + } + + private NodeProcessor getBucketMapjoinRejectProc(ParseContext pctx) { + return new NodeProcessor () { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) + throws SemanticException { + MapJoinOperator mapJoinOp = (MapJoinOperator) nd; + BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx; + context.listOfRejectedMapjoins.add(mapJoinOp); + return null; + } + }; + } + + private NodeProcessor getBucketMapjoinProc(ParseContext pctx) { + return new BucketMapjoinOptProc(pctx); + } + + private NodeProcessor getDefaultProc() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) + throws SemanticException { + return null; + } + }; + } + + class BucketMapjoinOptProc implements NodeProcessor { + + protected ParseContext pGraphContext; + + public BucketMapjoinOptProc(ParseContext pGraphContext) { + super(); + this.pGraphContext = pGraphContext; + } + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + MapJoinOperator mapJoinOp = (MapJoinOperator) nd; + BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx; + + if(context.getListOfRejectedMapjoins().contains(mapJoinOp)) + return null; + + QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext().get(mapJoinOp); + if(joinCxt == null) + return null; + + List joinAliases = new ArrayList(); + String[] srcs = joinCxt.getBaseSrc(); + String[] left = joinCxt.getLeftAliases(); + List mapAlias = joinCxt.getMapAliases(); + String baseBigAlias = null; + for(String s : left) { + if(s != null) { + joinAliases.add(s); + if(!mapAlias.contains(s)) { + baseBigAlias = s; + } + } + } + for(String s : srcs) { + if(s != null) { + joinAliases.add(s); + if(!mapAlias.contains(s)) { + baseBigAlias = s; + } + } + } + + MapJoinDesc mjDecs = mapJoinOp.getConf(); + LinkedHashMap aliasToBucketNumber = new LinkedHashMap(); + LinkedHashMap> aliasToBucketFileNames = new LinkedHashMap>(); + // right now this code does not work with "a join b on a.key = b.key and + // a.ds = b.ds", where ds is a partition column. 
It only works with joins + // with only one partition presents in each join source tables. + Map> topOps = this.pGraphContext.getTopOps(); + Map topToTable = this.pGraphContext.getTopToTable(); + + List bucketNumbers = new ArrayList(); + for (int index = 0; index < joinAliases.size(); index++) { + String alias = joinAliases.get(index); + TableScanOperator tso = (TableScanOperator) topOps.get(alias); + Table tbl = topToTable.get(tso); + if(tbl.isPartitioned()) { + PrunedPartitionList prunedParts = null; + try { + prunedParts = PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso), pGraphContext.getConf(), alias, + pGraphContext.getPrunedPartitions()); + } catch (HiveException e) { + // Has to use full name to make sure it does not conflict with + // org.apache.commons.lang.StringUtils + LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + throw new SemanticException(e.getMessage(), e); + } + int partNumber = prunedParts.getConfirmedPartns().size() + + prunedParts.getUnknownPartns().size(); + if(partNumber > 1) + return null; + if(partNumber == 0) { + Integer num = new Integer(0); + bucketNumbers.add(num); + aliasToBucketNumber.put(alias, num); + aliasToBucketFileNames.put(alias, new ArrayList()); + } else { + Partition part = null; + Iterator iter = prunedParts.getConfirmedPartns().iterator(); + while(iter.hasNext()) + part = iter.next(); + if(part == null) { + iter = prunedParts.getUnknownPartns().iterator(); + while(iter.hasNext()) + part = iter.next(); + } + + assert part != null; + + if (!checkBucketColumns(part.getBucketCols(), mjDecs, index)) + return null; + + Integer num = new Integer(part.getBucketCount()); + aliasToBucketNumber.put(alias, num); + List fileNames = new ArrayList(); + try { + FileSystem fs = FileSystem.get(this.pGraphContext.getConf()); + FileStatus[] files = fs.listStatus(new Path(part.getDataLocation().toString())); + if(files != null) { + for(FileStatus file : files) { + fileNames.add(file.getPath().toString()); + } + } + } catch (IOException e) { + throw new SemanticException(e); + } + aliasToBucketFileNames.put(alias, fileNames); + } + } else { + if (!checkBucketColumns(tbl.getBucketCols(), mjDecs, index)) + return null; + Integer num = new Integer(tbl.getNumBuckets()); + aliasToBucketNumber.put(alias, num); + List fileNames = new ArrayList(); + try { + FileSystem fs = FileSystem.get(this.pGraphContext.getConf()); + FileStatus[] files = fs.listStatus(new Path(tbl.getDataLocation().toString())); + if(files != null) { + for(FileStatus file : files) { + fileNames.add(file.getPath().toString()); + } + } + } catch (IOException e) { + throw new SemanticException(e); + } + aliasToBucketFileNames.put(alias, fileNames); + } + } + + // All tables or partitions are bucketed, and their bucket number is + // stored in 'bucketNumbers', we need to check if the number of buckets in + // the big table can be divided by no of buckets in small tables. + int bucketNoInBigTbl = aliasToBucketNumber.get(baseBigAlias).intValue(); + Iterator iter = aliasToBucketNumber.values().iterator(); + while(iter.hasNext()) { + int nxt = iter.next().intValue(); + boolean ok = (nxt >= bucketNoInBigTbl) ? 
nxt % bucketNoInBigTbl == 0 + : bucketNoInBigTbl % nxt == 0; + if(!ok) + return null; + } + MapJoinDesc desc = mapJoinOp.getConf(); + desc.setAliasToBucketFileNames(aliasToBucketFileNames); + desc.setAliasToBucketNumber(aliasToBucketNumber); + desc.setBigTableAlias(baseBigAlias); + return null; + } + + + private boolean checkBucketColumns(List bucketColumns, MapJoinDesc mjDesc, int index) { + List keys = mjDesc.getKeys().get((byte)index); + if (keys == null || bucketColumns == null || bucketColumns.size() == 0) + return false; + + List joinCols = new ArrayList(); + List joinKeys = new ArrayList(); + joinKeys.addAll(keys); + while (joinKeys.size() > 0) { + ExprNodeDesc node = joinKeys.remove(0); + if (node instanceof ExprNodeColumnDesc) { + joinCols.addAll(node.getCols()); + } else if (node instanceof ExprNodeGenericFuncDesc) { + ExprNodeGenericFuncDesc udfNode = ((ExprNodeGenericFuncDesc) node); + GenericUDF udf = udfNode.getGenericUDF(); + if (!FunctionRegistry.isDeterministic(udf)) { + return false; + } + joinKeys.addAll(0, udfNode.getChildExprs()); + } else { + return false; + } + } + + if (joinCols.size() == 0 || joinCols.size() != bucketColumns.size()) { + return false; + } + + for (String col : joinCols) { + if (!bucketColumns.contains(col)) + return false; + } + + return true; + } + + } + + class BucketMapjoinOptProcCtx implements NodeProcessorCtx { + // we only convert map joins that follows a root table scan in the same + // mapper. That means there is not reducer between the root table scan and + // mapjoin. + Set listOfRejectedMapjoins = new HashSet(); + + public Set getListOfRejectedMapjoins() { + return listOfRejectedMapjoins; + } + + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 905742) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy) @@ -194,6 +194,7 @@ } setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc); + setupBucketMapJoinInfo(plan, currMapJoinOp); } else { initUnionPlan(opProcCtx, currTask, false); } @@ -215,6 +216,7 @@ seenOps.add(currTopOp); boolean local = (pos == desc.getPosBigTable()) ? false : true; setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx); + setupBucketMapJoinInfo(plan, (MapJoinOperator)op); } opProcCtx.setCurrTask(currTask); @@ -222,6 +224,19 @@ opProcCtx.setCurrAliasId(null); } + private static void setupBucketMapJoinInfo(MapredWork plan, + MapJoinOperator currMapJoinOp) { + MapredLocalWork localPlan = plan.getMapLocalWork(); + if (localPlan != null) { + if(currMapJoinOp.getConf().getAliasToBucketFileNames() != null) { + localPlan.setAliasToBucketFileNames(currMapJoinOp.getConf().getAliasToBucketFileNames()); + localPlan.setAliasToBucketNumber(currMapJoinOp.getConf().getAliasToBucketNumber()); + localPlan.setInputFileChangeSensitive(true); + localPlan.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias()); + } + } + } + /** * Initialize the current union plan. * @@ -370,6 +385,7 @@ boolean local = ((pos == -1) || (pos == (mjOp.getConf()) .getPosBigTable())) ? 
false : true; setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc); + setupBucketMapJoinInfo(plan, oldMapJoin); } opProcCtx.setCurrMapJoinOp(null); @@ -805,6 +821,7 @@ opProcCtx.setMapJoinCtx(mjOp, mjCtx); opProcCtx.getMapCurrCtx().put(parent, new GenMapRedCtx(childTask, null, null)); + setupBucketMapJoinInfo(cplan, mjOp); } currTopOp = null; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (revision 905742) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (working copy) @@ -367,6 +367,11 @@ // traverse all the joins and convert them if necessary if (pGraphContext.getJoinContext() != null) { Map joinMap = new HashMap(); + Map mapJoinMap = pGraphContext.getMapJoinContext(); + if(mapJoinMap == null) { + mapJoinMap = new HashMap (); + pGraphContext.setMapJoinContext(mapJoinMap); + } Set> joinCtx = pGraphContext .getJoinContext().entrySet(); @@ -378,7 +383,9 @@ QBJoinTree qbJoin = joinEntry.getValue(); int mapJoinPos = mapSideJoin(joinOp, qbJoin); if (mapJoinPos >= 0) { - listMapJoinOps.add(convertMapJoin(pactx, joinOp, qbJoin, mapJoinPos)); + MapJoinOperator mapJoinOp = convertMapJoin(pactx, joinOp, qbJoin, mapJoinPos); + listMapJoinOps.add(mapJoinOp); + mapJoinMap.put(mapJoinOp, qbJoin); } else { joinMap.put(joinOp, qbJoin); } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 905742) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy) @@ -56,6 +56,9 @@ transformations.add(new SamplePruner()); transformations.add(new MapJoinProcessor()); + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) { + transformations.add(new BucketMapJoinOptimizer()); + } transformations.add(new UnionProcessor()); transformations.add(new JoinReorder()); } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 905742) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy) @@ -58,6 +58,7 @@ private HashMap> topSelOps; private LinkedHashMap, OpParseContext> opParseCtx; private Map joinContext; + private Map mapJoinContext; private HashMap topToTable; private List loadTableWork; private List loadFileWork; @@ -439,4 +440,12 @@ Map prunedPartitions) { this.prunedPartitions = prunedPartitions; } + + public Map getMapJoinContext() { + return mapJoinContext; + } + + public void setMapJoinContext(Map mapJoinContext) { + this.mapJoinContext = mapJoinContext; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (revision 905742) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (working copy) @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -42,6 +43,10 @@ private int posBigTable; private Map> retainList; + + private transient LinkedHashMap aliasToBucketNumber; + private transient LinkedHashMap> 
aliasToBucketFileNames; + private transient String bigTableAlias; public MapJoinDesc() { } @@ -141,4 +146,47 @@ public void setValueTblDescs(List valueTblDescs) { this.valueTblDescs = valueTblDescs; } + + /** + * @return aliasToBucketNumber + */ + public LinkedHashMap getAliasToBucketNumber() { + return aliasToBucketNumber; + } + + /** + * @param aliasToBucketNumber + */ + public void setAliasToBucketNumber(LinkedHashMap aliasToBucketNumber) { + this.aliasToBucketNumber = aliasToBucketNumber; + } + + /** + * @return aliasToBucketFileNames + */ + public LinkedHashMap> getAliasToBucketFileNames() { + return aliasToBucketFileNames; + } + + /** + * @param aliasToBucketFileNames + */ + public void setAliasToBucketFileNames( + LinkedHashMap> aliasToBucketFileNames) { + this.aliasToBucketFileNames = aliasToBucketFileNames; + } + + /** + * @return bigTableAlias + */ + public String getBigTableAlias() { + return bigTableAlias; + } + + /** + * @param bigTableAlias + */ + public void setBigTableAlias(String bigTableAlias) { + this.bigTableAlias = bigTableAlias; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (revision 905742) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (working copy) @@ -19,9 +19,12 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.BucketMatcher; @Explain(displayName = "Map Reduce Local Work") public class MapredLocalWork implements Serializable { @@ -29,7 +32,15 @@ private LinkedHashMap> aliasToWork; private LinkedHashMap aliasToFetchWork; + private boolean inputFileChangeSensitive; + + // used for bucket map join + private LinkedHashMap aliasToBucketNumber; + private LinkedHashMap> aliasToBucketFileNames; + private String mapJoinBigTableAlias; + private Class bucketMatcker; + public MapredLocalWork() { } @@ -66,4 +77,46 @@ final LinkedHashMap aliasToFetchWork) { this.aliasToFetchWork = aliasToFetchWork; } + + public boolean getInputFileChangeSensitive() { + return inputFileChangeSensitive; + } + + public void setInputFileChangeSensitive(boolean inputFileChangeSensitive) { + this.inputFileChangeSensitive = inputFileChangeSensitive; + } + + public LinkedHashMap getAliasToBucketNumber() { + return aliasToBucketNumber; + } + + public void setAliasToBucketNumber(LinkedHashMap aliasToBucketNumber) { + this.aliasToBucketNumber = aliasToBucketNumber; + } + + public LinkedHashMap> getAliasToBucketFileNames() { + return aliasToBucketFileNames; + } + + public void setAliasToBucketFileNames( + LinkedHashMap> aliasToBucketFileNames) { + this.aliasToBucketFileNames = aliasToBucketFileNames; + } + + public void setMapJoinBigTableAlias(String bigTableAlias) { + this.mapJoinBigTableAlias = bigTableAlias; + } + + public String getMapJoinBigTableAlias() { + return mapJoinBigTableAlias; + } + + public Class getBucketMatcker() { + return bucketMatcker; + } + + public void setBucketMatcker(Class bucketMatcker) { + this.bucketMatcker = bucketMatcker; + } + } Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 905742) +++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy) @@ 
-237,7 +237,7 @@ String warehousePath = ((new URI(testWarehouse)).getPath()); // Drop any tables that remain due to unsuccessful runs for (String s : new String[] { "src", "src1", "src_json", "src_thrift", - "src_sequencefile", "srcpart", "srcbucket", "srcbucket2", "dest1", + "src_sequencefile", "srcpart", "srcbucket", "srcbucket2","srcbucket_part", "srcbucket2_part", "dest1", "dest2", "dest3", "dest4", "dest4_sequencefile", "dest_j1", "dest_j2", "dest_g1", "dest_g2", "fetchtask_ioexception" }) { db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, s); @@ -318,19 +318,39 @@ + "' INTO TABLE srcbucket"); } - runCreateTableCmd("CREATE TABLE srcbucket2(key int, value string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE"); + runCreateTableCmd("CREATE TABLE srcbucket2(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE"); // db.createTable("srcbucket", cols, null, TextInputFormat.class, // IgnoreKeyTextOutputFormat.class, 2, bucketCols); srcTables.add("srcbucket2"); + for (String fname : new String[] { "srcbucket20.txt", "srcbucket21.txt"}) { + fpath = new Path(testFiles, fname); + newfpath = new Path(tmppath, fname); + fs.copyFromLocalFile(false, true, fpath, newfpath); + runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + + "' INTO TABLE srcbucket2"); + } + + runCreateTableCmd("CREATE TABLE srcbucket_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE"); + srcTables.add("srcbucket_part"); for (String fname : new String[] { "srcbucket20.txt", "srcbucket21.txt", "srcbucket22.txt", "srcbucket23.txt" }) { fpath = new Path(testFiles, fname); newfpath = new Path(tmppath, fname); fs.copyFromLocalFile(false, true, fpath, newfpath); runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() - + "' INTO TABLE srcbucket2"); + + "' INTO TABLE srcbucket_part partition(ds='2008-04-08')"); } - + + runCreateTableCmd("CREATE TABLE srcbucket2_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE"); + srcTables.add("srcbucket2_part"); + for (String fname : new String[] { "srcbucket22.txt", "srcbucket23.txt" }) { + fpath = new Path(testFiles, fname); + newfpath = new Path(tmppath, fname); + fs.copyFromLocalFile(false, true, fpath, newfpath); + runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + + "' INTO TABLE srcbucket2_part partition(ds='2008-04-08')"); + } + for (String tname : new String[] { "src", "src1" }) { db.createTable(tname, cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class); Index: ql/src/test/queries/clientpositive/bucketmapjoin.q =================================================================== --- ql/src/test/queries/clientpositive/bucketmapjoin.q (revision 0) +++ ql/src/test/queries/clientpositive/bucketmapjoin.q (revision 0) @@ -0,0 +1,24 @@ +set hive.optimize.bucketmapjoin = true; + +create table bucketmapjoin_tmp_result (key string , value1 string, value2 string); +insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket_part b on a.key=b.key where b.ds="2008-04-08"; +select count(1) from bucketmapjoin_tmp_result; +insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket_part b on a.key=b.key where b.ds="2008-04-08"; +select count(1) from bucketmapjoin_tmp_result; + +insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2 a join 
srcbucket2_part b on a.key=b.key and b.ds="2008-04-08"; +select count(1) from bucketmapjoin_tmp_result; +insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2_part b on a.key=b.key and b.ds="2008-04-08"; +select count(1) from bucketmapjoin_tmp_result; + +insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2_part a join srcbucket_part b on a.key=b.key and b.ds="2008-04-08" and a.ds="2008-04-08"; +select count(1) from bucketmapjoin_tmp_result; +insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2_part a join srcbucket_part b on a.key=b.key and b.ds="2008-04-08" and a.ds="2008-04-08"; +select count(1) from bucketmapjoin_tmp_result; + +insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2 b on a.key=b.key; +select count(1) from bucketmapjoin_tmp_result; +insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2 b on a.key=b.key; +select count(1) from bucketmapjoin_tmp_result; + +drop table bucketmapjoin_tmp_result; \ No newline at end of file Index: ql/src/test/results/clientpositive/bucketmapjoin.q.out =================================================================== --- ql/src/test/results/clientpositive/bucketmapjoin.q.out (revision 0) +++ ql/src/test/results/clientpositive/bucketmapjoin.q.out (revision 0) @@ -0,0 +1,158 @@ +PREHOOK: query: create table bucketmapjoin_tmp_result (key string , value1 string, value2 string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table bucketmapjoin_tmp_result (key string , value1 string, value2 string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@bucketmapjoin_tmp_result +PREHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket_part b on a.key=b.key where b.ds="2008-04-08" +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_part@ds=2008-04-08 +PREHOOK: Input: default@srcbucket2 +PREHOOK: Output: default@bucketmapjoin_tmp_result +POSTHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket_part b on a.key=b.key where b.ds="2008-04-08" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_part@ds=2008-04-08 +POSTHOOK: Input: default@srcbucket2 +POSTHOOK: Output: default@bucketmapjoin_tmp_result +PREHOOK: query: select count(1) from bucketmapjoin_tmp_result +PREHOOK: type: QUERY +PREHOOK: Input: default@bucketmapjoin_tmp_result +PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-35-28_018_9106225217914859893/10000 +POSTHOOK: query: select count(1) from bucketmapjoin_tmp_result +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucketmapjoin_tmp_result +POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-35-28_018_9106225217914859893/10000 +464 +PREHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket_part b on a.key=b.key where b.ds="2008-04-08" +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_part@ds=2008-04-08 +PREHOOK: Input: default@srcbucket2 +PREHOOK: Output: default@bucketmapjoin_tmp_result +POSTHOOK: query: 
insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket_part b on a.key=b.key where b.ds="2008-04-08" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_part@ds=2008-04-08 +POSTHOOK: Input: default@srcbucket2 +POSTHOOK: Output: default@bucketmapjoin_tmp_result +PREHOOK: query: select count(1) from bucketmapjoin_tmp_result +PREHOOK: type: QUERY +PREHOOK: Input: default@bucketmapjoin_tmp_result +PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-35-46_589_5242898573727592443/10000 +POSTHOOK: query: select count(1) from bucketmapjoin_tmp_result +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucketmapjoin_tmp_result +POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-35-46_589_5242898573727592443/10000 +464 +PREHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2_part b on a.key=b.key and b.ds="2008-04-08" +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket2_part@ds=2008-04-08 +PREHOOK: Input: default@srcbucket2 +PREHOOK: Output: default@bucketmapjoin_tmp_result +POSTHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2_part b on a.key=b.key and b.ds="2008-04-08" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket2_part@ds=2008-04-08 +POSTHOOK: Input: default@srcbucket2 +POSTHOOK: Output: default@bucketmapjoin_tmp_result +PREHOOK: query: select count(1) from bucketmapjoin_tmp_result +PREHOOK: type: QUERY +PREHOOK: Input: default@bucketmapjoin_tmp_result +PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-36-01_063_8545422721727445090/10000 +POSTHOOK: query: select count(1) from bucketmapjoin_tmp_result +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucketmapjoin_tmp_result +POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-36-01_063_8545422721727445090/10000 +0 +PREHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2_part b on a.key=b.key and b.ds="2008-04-08" +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket2_part@ds=2008-04-08 +PREHOOK: Input: default@srcbucket2 +PREHOOK: Output: default@bucketmapjoin_tmp_result +POSTHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2_part b on a.key=b.key and b.ds="2008-04-08" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket2_part@ds=2008-04-08 +POSTHOOK: Input: default@srcbucket2 +POSTHOOK: Output: default@bucketmapjoin_tmp_result +PREHOOK: query: select count(1) from bucketmapjoin_tmp_result +PREHOOK: type: QUERY +PREHOOK: Input: default@bucketmapjoin_tmp_result +PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-36-19_946_1253212524508388244/10000 +POSTHOOK: query: select count(1) from bucketmapjoin_tmp_result +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucketmapjoin_tmp_result +POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-36-19_946_1253212524508388244/10000 +0 +PREHOOK: query: insert overwrite table bucketmapjoin_tmp_result 
select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2_part a join srcbucket_part b on a.key=b.key and b.ds="2008-04-08" and a.ds="2008-04-08" +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_part@ds=2008-04-08 +PREHOOK: Input: default@srcbucket2_part@ds=2008-04-08 +PREHOOK: Output: default@bucketmapjoin_tmp_result +POSTHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2_part a join srcbucket_part b on a.key=b.key and b.ds="2008-04-08" and a.ds="2008-04-08" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_part@ds=2008-04-08 +POSTHOOK: Input: default@srcbucket2_part@ds=2008-04-08 +POSTHOOK: Output: default@bucketmapjoin_tmp_result +PREHOOK: query: select count(1) from bucketmapjoin_tmp_result +PREHOOK: type: QUERY +PREHOOK: Input: default@bucketmapjoin_tmp_result +PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-36-35_996_8426260759503622800/10000 +POSTHOOK: query: select count(1) from bucketmapjoin_tmp_result +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucketmapjoin_tmp_result +POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-36-35_996_8426260759503622800/10000 +564 +PREHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2_part a join srcbucket_part b on a.key=b.key and b.ds="2008-04-08" and a.ds="2008-04-08" +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_part@ds=2008-04-08 +PREHOOK: Input: default@srcbucket2_part@ds=2008-04-08 +PREHOOK: Output: default@bucketmapjoin_tmp_result +POSTHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2_part a join srcbucket_part b on a.key=b.key and b.ds="2008-04-08" and a.ds="2008-04-08" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_part@ds=2008-04-08 +POSTHOOK: Input: default@srcbucket2_part@ds=2008-04-08 +POSTHOOK: Output: default@bucketmapjoin_tmp_result +PREHOOK: query: select count(1) from bucketmapjoin_tmp_result +PREHOOK: type: QUERY +PREHOOK: Input: default@bucketmapjoin_tmp_result +PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-36-54_566_7731753793291620492/10000 +POSTHOOK: query: select count(1) from bucketmapjoin_tmp_result +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucketmapjoin_tmp_result +POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-36-54_566_7731753793291620492/10000 +564 +PREHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2 b on a.key=b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket2 +PREHOOK: Output: default@bucketmapjoin_tmp_result +POSTHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(b)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2 b on a.key=b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket2 +POSTHOOK: Output: default@bucketmapjoin_tmp_result +PREHOOK: query: select count(1) from bucketmapjoin_tmp_result +PREHOOK: type: QUERY +PREHOOK: Input: default@bucketmapjoin_tmp_result +PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-37-11_525_7736517641468045028/10000 +POSTHOOK: query: select count(1) 
from bucketmapjoin_tmp_result +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucketmapjoin_tmp_result +POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-37-11_525_7736517641468045028/10000 +464 +PREHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2 b on a.key=b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket2 +PREHOOK: Output: default@bucketmapjoin_tmp_result +POSTHOOK: query: insert overwrite table bucketmapjoin_tmp_result select /*+mapjoin(a)*/ a.key, a.value, b.value from srcbucket2 a join srcbucket2 b on a.key=b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket2 +POSTHOOK: Output: default@bucketmapjoin_tmp_result +PREHOOK: query: select count(1) from bucketmapjoin_tmp_result +PREHOOK: type: QUERY +PREHOOK: Input: default@bucketmapjoin_tmp_result +PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-37-26_545_95616668364138013/10000 +POSTHOOK: query: select count(1) from bucketmapjoin_tmp_result +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucketmapjoin_tmp_result +POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-trunk/build/ql/scratchdir/hive_2010-02-03_00-37-26_545_95616668364138013/10000 +464 +PREHOOK: query: drop table bucketmapjoin_tmp_result +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table bucketmapjoin_tmp_result +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@bucketmapjoin_tmp_result
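For reference, the bucket pairing that these plans rely on can be exercised directly against DefaultBucketMatcher. In the setup above srcbucket2 has 2 buckets and srcbucket_part has 4, so when srcbucket2 is the big (streamed) side each of its bucket files should be matched with two srcbucket_part bucket files. The following is a self-contained sketch of that case only; the class name and warehouse paths are invented (the real file names are collected from the table or partition data location at compile time).

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.BucketMatcher;
    import org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher;

    public class BucketPairingSketch {
      public static void main(String[] args) {
        LinkedHashMap<String, Integer> buckets = new LinkedHashMap<String, Integer>();
        buckets.put("a", 2); // srcbucket2: 2 buckets, big (streamed) side
        buckets.put("b", 4); // srcbucket_part: 4 buckets, map-joined side

        LinkedHashMap<String, List<String>> files = new LinkedHashMap<String, List<String>>();
        files.put("a", new ArrayList<String>(Arrays.asList("/wh/a/00", "/wh/a/01")));
        files.put("b", new ArrayList<String>(
            Arrays.asList("/wh/b/00", "/wh/b/01", "/wh/b/02", "/wh/b/03")));

        BucketMatcher matcher = new DefaultBucketMatcher(files, buckets);
        // Big-table bucket file 0 pairs with small-table bucket files 0 and 2
        // (step = 4 / 2 = 2); bucket file 1 would pair with files 1 and 3.
        List<Path> forBucket0 = matcher.getAliasBucketFiles("/wh/a/00", "a", "b");
        System.out.println(forBucket0); // expected: [/wh/b/00, /wh/b/02]
      }
    }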