Index: ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (revision 950364)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (working copy)
@@ -34,8 +34,8 @@
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -139,7 +139,7 @@
           .get(entry.getKey());
       // All the operators need to be initialized before process
       forwardOp.setExecContext(this.getExecContext());
-      forwardOp.initialize(this.getExecContext().jc, new ObjectInspector[] {entry.getValue()
+      forwardOp.initialize(this.getExecContext().getJc(), new ObjectInspector[] {entry.getValue()
           .getOutputObjectInspector()});
       l4j.info("fetchoperator for " + entry.getKey() + " initialized");
     }
@@ -148,20 +148,21 @@

   @Override
   public void processOp(Object row, int tag) throws HiveException {
-    if (this.getExecContext().inputFileChanged) {
-      if(firstFetchHappened) {
-        //we need to first join and flush out data left by the previous file.
-        joinFinalLeftData();
+    if (tag == posBigTable) {
+      if (this.getExecContext().inputFileChanged()) {
+        if (firstFetchHappened) {
+          // we need to first join and flush out data left by the previous file.
+          joinFinalLeftData();
+        }
+        // set up the fetch operator for the new input file.
+        for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+          String alias = entry.getKey();
+          FetchOperator fetchOp = entry.getValue();
+          fetchOp.clearFetchContext();
+          setUpFetchOpContext(fetchOp, alias);
+        }
+        firstFetchHappened = false;
       }
-      //set up the fetch operator for the new input file.
-      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-        String alias = entry.getKey();
-        FetchOperator fetchOp = entry.getValue();
-        fetchOp.clearFetchContext();
-        setUpFetchOpContext(fetchOp, alias);
-      }
-      this.getExecContext().inputFileChanged = false;
-      firstFetchHappened = false;
     }

     if (!firstFetchHappened) {
@@ -423,7 +424,7 @@
   }

   private void setUpFetchOpContext(FetchOperator fetchOp, String alias) {
-    String currentInputFile = this.getExecContext().currentInputFile;
+    String currentInputFile = this.getExecContext().getCurrentInputFile();
     BucketMapJoinContext bucketMatcherCxt = this.localWork
         .getBucketMapjoinContext();
     Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
@@ -479,7 +480,7 @@
     }
     closeCalled = true;

-    if ((this.getExecContext() != null && this.getExecContext().inputFileChanged)
+    if ((this.getExecContext() != null && this.getExecContext().inputFileChanged())
        || !firstFetchHappened) {
      //set up the fetch operator for the new input file.
      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
@@ -488,7 +489,6 @@
        fetchOp.clearFetchContext();
        setUpFetchOpContext(fetchOp, alias);
      }
-      this.getExecContext().inputFileChanged = false;
      firstFetchHappened = true;
      for (Byte t : order) {
        if(t != (byte)posBigTable) {
@@ -519,6 +519,7 @@
    }
  }

+  @Override
  protected boolean allInitializedParentsAreClosed() {
    return true;
  }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (revision 950364)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (working copy)
@@ -25,19 +25,13 @@
 import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -45,7 +39,6 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 /**
 *
@@ -64,48 +57,13 @@
  private static boolean done;

  // used to log memory usage periodically
-  private MemoryMXBean memoryMXBean;
+  public static MemoryMXBean memoryMXBean;
  private long numRows = 0;
  private long nextCntr = 1;
-  private String lastInputFile = null;
  private MapredLocalWork localWork = null;
-  private ExecMapperContext execContext = new ExecMapperContext();
+  private final ExecMapperContext execContext = new ExecMapperContext();

-  public static class ExecMapperContext {
-    boolean inputFileChanged = false;
-    String currentInputFile;
-    Integer fileId = new Integer(-1);
-
-    JobConf jc;
-    public boolean isInputFileChanged() {
-      return inputFileChanged;
-    }
-    public void setInputFileChanged(boolean inputFileChanged) {
-      this.inputFileChanged = inputFileChanged;
-    }
-    public String getCurrentInputFile() {
-      return currentInputFile;
-    }
-    public void setCurrentInputFile(String currentInputFile) {
-      this.currentInputFile = currentInputFile;
-    }
-    public JobConf getJc() {
-      return jc;
-    }
-    public void setJc(JobConf jc) {
-      this.jc = jc;
-    }
-
-    public Integer getFileId() {
-      return fileId;
-    }
-    public void setFileId(Integer fileId) {
-      this.fileId = fileId;
-    }
-
-  }
-
  @Override
  public void configure(JobConf job) {
    // Allocate the bean at the beginning -
@@ -123,7 +81,7 @@
    }
    try {
      jc = job;
-      execContext.jc = jc;
+      execContext.setJc(jc);
      // create map and fetch operators
      MapredWork mrwork = Utilities.getMapRedWork(job);
      mo = new MapOperator();
@@ -158,6 +116,8 @@
            .getOutputObjectInspector()});
        l4j.info("fetchoperator for " + entry.getKey() + " initialized");
      }
+      this.execContext.setLocalWork(localWork);
+      this.execContext.setFetchOperators(fetchOperators);
      // defer processing of map local operators to first row if in case there
      // is no input (??)
    } catch (Throwable e) {
@@ -181,15 +141,9 @@
      mo.setOutputCollector(oc);
      mo.setReporter(rp);
    }
+    // reset the execContext for each new row
+    execContext.resetRow();
-
-    if(inputFileChanged()) {
-      if (this.localWork != null
-          && (localWork.getInputFileChangeSensitive() || this.lastInputFile == null)) {
-        processMapLocalWork(localWork.getInputFileChangeSensitive());
-      }
-      this.lastInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
-    }

    try {
      if (mo.getDone()) {
        done = true;
@@ -219,97 +173,7 @@
    }
  }

-  /**
-   * For CompbineFileInputFormat, the mapper's input file will be changed on the
-   * fly. If the map local work has any mapping depending on the current
-   * mapper's input file, the work need to clear context and re-initialization
-   * after the input file changed. This is first introduced to process bucket
-   * map join.
-   *
-   * @return
-   */
-  private boolean inputFileChanged() {
-    String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
-    execContext.currentInputFile = currentInputFile;
-    if (this.lastInputFile == null
-        || !this.lastInputFile.equals(currentInputFile)) {
-      execContext.inputFileChanged = true;
-      return true;
-    }
-    execContext.inputFileChanged = false;
-    return false;
-  }
-
-  private void processMapLocalWork(boolean inputFileChangeSenstive) {
-    // process map local operators
-    if (fetchOperators != null) {
-      try {
-        int fetchOpNum = 0;
-        for (Map.Entry<String, FetchOperator> entry : fetchOperators
-            .entrySet()) {
-          int fetchOpRows = 0;
-          String alias = entry.getKey();
-          FetchOperator fetchOp = entry.getValue();
-
-          if(inputFileChangeSenstive) {
-            fetchOp.clearFetchContext();
-            setUpFetchOpContext(fetchOp, alias);
-          }
-
-          Operator<? extends Serializable> forwardOp = localWork
-              .getAliasToWork().get(alias);
-
-          while (true) {
-            InspectableObject row = fetchOp.getNextRow();
-            if (row == null) {
-              forwardOp.close(false);
-              break;
-            }
-            fetchOpRows++;
-            forwardOp.process(row.o, 0);
-            // check if any operator had a fatal error or early exit during
-            // execution
-            if (forwardOp.getDone()) {
-              done = true;
-              break;
-            }
-          }
-
-          if (l4j.isInfoEnabled()) {
-            l4j
-                .info("fetch " + fetchOpNum++ + " processed " + fetchOpRows
-                + " used mem: "
-                + memoryMXBean.getHeapMemoryUsage().getUsed());
-          }
-        }
-      } catch (Throwable e) {
-        abort = true;
-        if (e instanceof OutOfMemoryError) {
-          // Don't create a new object if we are already out of memory
-          throw (OutOfMemoryError) e;
-        } else {
-          throw new RuntimeException("Hive Runtime Error: Map local work failed", e);
-        }
-      }
-    }
-  }
-
-  private void setUpFetchOpContext(FetchOperator fetchOp, String alias)
-      throws Exception {
-    String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
-    BucketMapJoinContext bucketMatcherCxt = this.localWork.getBucketMapjoinContext();
-    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass();
-    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(bucketMatcherCls, null);
-    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt.getAliasBucketFileNameMapping());
-
-    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
-        bucketMatcherCxt.getMapJoinBigTableAlias(),
-        alias);
-    Iterator<Path> iter = aliasFiles.iterator();
-    fetchOp.setupContext(iter, null);
-  }
-
  private long getNextCntr(long cntr) {
    // A very simple counter to keep track of number of rows processed by the
    // reducer. It dumps
@@ -331,7 +195,6 @@
    // detecting failed executions by exceptions thrown by the operator tree
    // ideally hadoop should let us know whether map execution failed or not
    try {
-      inputFileChanged();
      mo.close(abort);
      if (fetchOperators != null) {
        MapredLocalWork localWork = mo.getConf().getMapLocalWork();
@@ -364,6 +227,18 @@
    return done;
  }

+  public boolean isAbort() {
+    return abort;
+  }
+
+  public void setAbort(boolean abort) {
+    this.abort = abort;
+  }
+
+  public static void setDone(boolean done) {
+    ExecMapper.done = done;
+  }
+
  /**
   * reportStats.
   *
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (revision 950364)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (working copy)
@@ -160,6 +160,11 @@

  @Override
  public void processOp(Object row, int tag) throws HiveException {
+
+    if (tag == posBigTable && this.getExecContext().getLastInputFile() == null) {
+      this.getExecContext().processInputFileChangeForLocalWork();
+    }
+
    try {
      // get alias
      alias = (byte) tag;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 950364)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy)
@@ -29,7 +29,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.ExecMapper.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.Explain;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java (revision 0)
@@ -0,0 +1,192 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class ExecMapperContext {
+
+  public static final Log l4j = ExecMapper.l4j;
+
+  // lastInputFile should be changed only by the root of the operator tree --
+  // ExecMapper.map() -- and kept unchanged throughout the operator tree for one row
+  private String lastInputFile = null;
+
+  // currentInputFile will be updated only by inputFileChanged(). If inputFileChanged()
+  // is not called throughout the operator tree, currentInputFile won't be used anyway,
+  // so it won't be updated.
+  private String currentInputFile = null;
+  private boolean inputFileChecked = false;
+  private Integer fileId = new Integer(-1);
+  private MapredLocalWork localWork = null;
+  private Map<String, FetchOperator> fetchOperators;
+  private JobConf jc;
+
+  public ExecMapperContext() {
+  }
+
+  public void processInputFileChangeForLocalWork() throws HiveException {
+    // put inputFileChanged() after the localWork check
+    if (this.localWork != null && inputFileChanged()) {
+      processMapLocalWork(localWork.getInputFileChangeSensitive());
+    }
+  }
+
+  /**
+   * For CombineFileInputFormat, the mapper's input file will be changed on the
+   * fly, and the input file name is passed to the jobConf by shims/initNextRecordReader.
+   * If the map local work has any mapping that depends on the current
+   * mapper's input file, the work needs to clear its context and re-initialize
+   * after the input file changes. This was first introduced to process bucket
+   * map join.
+   *
+   * @return
+   */
+  public boolean inputFileChanged() {
+    if (!inputFileChecked) {
+      currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
+      inputFileChecked = true;
+    }
+    return lastInputFile == null || !lastInputFile.equals(currentInputFile);
+  }
+
+  /**
+   * Reset the execution context for each new row. This function should be called only
+   * once at the root of the operator tree -- ExecMapper.map().
+   * Note: this function should be kept minimal since it is called for each input row.
+   */
+  public void resetRow() {
+    // Update the lastInputFile with the currentInputFile.
+    lastInputFile = currentInputFile;
+    inputFileChecked = false;
+  }
+
+  public String getLastInputFile() {
+    return lastInputFile;
+  }
+
+  public void setLastInputFile(String lastInputFile) {
+    this.lastInputFile = lastInputFile;
+  }
+
+  private void processMapLocalWork(boolean inputFileChangeSenstive) throws HiveException {
+    // process map local operators
+    if (fetchOperators != null) {
+      try {
+        int fetchOpNum = 0;
+        for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+          int fetchOpRows = 0;
+          String alias = entry.getKey();
+          FetchOperator fetchOp = entry.getValue();
+
+          if (inputFileChangeSenstive) {
+            fetchOp.clearFetchContext();
+            setUpFetchOpContext(fetchOp, alias);
+          }
+
+          Operator<? extends Serializable> forwardOp = localWork
+              .getAliasToWork().get(alias);
+
+          while (true) {
+            InspectableObject row = fetchOp.getNextRow();
+            if (row == null) {
+              forwardOp.close(false);
+              break;
+            }
+            fetchOpRows++;
+            forwardOp.process(row.o, 0);
+            // check if any operator had a fatal error or early exit during
+            // execution
+            if (forwardOp.getDone()) {
+              ExecMapper.setDone(true);
+              break;
+            }
+          }
+
+          if (l4j.isInfoEnabled()) {
+            l4j.info("fetch " + fetchOpNum++ + " processed " + fetchOpRows
+                + " used mem: "
+                + ExecMapper.memoryMXBean.getHeapMemoryUsage().getUsed());
+          }
+        }
+      } catch (Throwable e) {
+        if (e instanceof OutOfMemoryError) {
+          // Don't create a new object if we are already out of memory
+          throw (OutOfMemoryError) e;
+        } else {
+          throw new HiveException(
+              "Hive Runtime Error: Map local work failed", e);
+        }
+      }
+    }
+  }
+
+  private void setUpFetchOpContext(FetchOperator fetchOp, String alias)
+      throws Exception {
+    String currentInputFile = HiveConf.getVar(jc,
+        HiveConf.ConfVars.HADOOPMAPFILENAME);
+    BucketMapJoinContext bucketMatcherCxt = this.localWork
+        .getBucketMapjoinContext();
+    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
+        .getBucketMatcherClass();
+    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
+        bucketMatcherCls, null);
+    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
+        .getAliasBucketFileNameMapping());
+
+    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
+        bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
+    Iterator<Path> iter = aliasFiles.iterator();
+    fetchOp.setupContext(iter, null);
+  }
+
+  public String getCurrentInputFile() {
+    return currentInputFile;
+  }
+
+  public void setCurrentInputFile(String currentInputFile) {
+    this.currentInputFile = currentInputFile;
+  }
+
+  public JobConf getJc() {
+    return jc;
+  }
+  public void setJc(JobConf jc) {
+    this.jc = jc;
+  }
+
+  public MapredLocalWork getLocalWork() {
+    return localWork;
+  }
+
+  public void setLocalWork(MapredLocalWork localWork) {
+    this.localWork = localWork;
+  }
+
+  public Integer getFileId() {
+    return fileId;
+  }
+
+  public void setFileId(Integer fileId) {
+    this.fileId = fileId;
+  }
+
+  public Map<String, FetchOperator> getFetchOperators() {
+    return fetchOperators;
+  }
+
+  public void setFetchOperators(Map<String, FetchOperator> fetchOperators) {
+    this.fetchOperators = fetchOperators;
+  }
+}
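
For reviewers, a minimal usage sketch of the ExecMapperContext contract introduced by this patch (illustrative only, not part of the patch): ExecMapper.configure() seeds the context with the JobConf, the map-local work and the fetch operators; ExecMapper.map() calls resetRow() once per input row; SMBMapJoinOperator consults inputFileChanged() on big-table rows, while MapJoinOperator triggers processInputFileChangeForLocalWork() on the first big-table row. The driver class and method names below are hypothetical.

// Illustrative sketch only -- not part of this patch. A hypothetical driver that
// mirrors how ExecMapper and the join operators use the new ExecMapperContext.
import org.apache.hadoop.hive.ql.exec.ExecMapperContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.mapred.JobConf;

public class ExecMapperContextSketch {

  public static void processRows(JobConf job, Object[] rows) throws HiveException {
    ExecMapperContext ctx = new ExecMapperContext();
    ctx.setJc(job);                 // done once, in ExecMapper.configure()
    // ctx.setLocalWork(...) and ctx.setFetchOperators(...) are also set in configure()

    for (Object row : rows) {
      ctx.resetRow();               // once per row, at the root (ExecMapper.map())

      // SMBMapJoinOperator asks the context whether the big table's input file
      // changed and, if so, re-seeds its fetch operators for the new buckets.
      if (ctx.inputFileChanged()) {
        // ... clearFetchContext() / setUpFetchOpContext() per small-table alias ...
      }

      // MapJoinOperator instead kicks off the map-local work once, on the first
      // big-table row, while lastInputFile is still null.
      if (ctx.getLastInputFile() == null) {
        ctx.processInputFileChangeForLocalWork();
      }

      // ... forward the row through the operator tree ...
    }
  }
}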