Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java	(revision 946374)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java	(working copy)
@@ -64,48 +64,13 @@
   private static boolean done;
 
   // used to log memory usage periodically
-  private MemoryMXBean memoryMXBean;
+  public static MemoryMXBean memoryMXBean;
   private long numRows = 0;
   private long nextCntr = 1;
-  private String lastInputFile = null;
   private MapredLocalWork localWork = null;
   private ExecMapperContext execContext = new ExecMapperContext();
 
-  public static class ExecMapperContext {
-    boolean inputFileChanged = false;
-    String currentInputFile;
-    Integer fileId = new Integer(-1);
-
-    JobConf jc;
-    public boolean isInputFileChanged() {
-      return inputFileChanged;
-    }
-    public void setInputFileChanged(boolean inputFileChanged) {
-      this.inputFileChanged = inputFileChanged;
-    }
-    public String getCurrentInputFile() {
-      return currentInputFile;
-    }
-    public void setCurrentInputFile(String currentInputFile) {
-      this.currentInputFile = currentInputFile;
-    }
-    public JobConf getJc() {
-      return jc;
-    }
-    public void setJc(JobConf jc) {
-      this.jc = jc;
-    }
-
-    public Integer getFileId() {
-      return fileId;
-    }
-    public void setFileId(Integer fileId) {
-      this.fileId = fileId;
-    }
-
-  }
-
   @Override
   public void configure(JobConf job) {
     // Allocate the bean at the beginning -
@@ -158,6 +123,8 @@
             .getOutputObjectInspector()});
         l4j.info("fetchoperator for " + entry.getKey() + " initialized");
       }
+      this.execContext.setLocalWork(localWork);
+      this.execContext.setFetchOperators(fetchOperators);
       // defer processing of map local operators to first row if in case there
       // is no input (??)
     } catch (Throwable e) {
@@ -182,14 +149,6 @@
       mo.setReporter(rp);
     }
 
-    if(inputFileChanged()) {
-      if (this.localWork != null
-          && (localWork.getInputFileChangeSensitive() || this.lastInputFile == null)) {
-        processMapLocalWork(localWork.getInputFileChangeSensitive());
-      }
-      this.lastInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
-    }
-
     try {
       if (mo.getDone()) {
         done = true;
@@ -219,96 +178,6 @@
     }
   }
 
-  /**
-   * For CompbineFileInputFormat, the mapper's input file will be changed on the
-   * fly. If the map local work has any mapping depending on the current
-   * mapper's input file, the work need to clear context and re-initialization
-   * after the input file changed. This is first introduced to process bucket
-   * map join.
-   *
-   * @return
-   */
-  private boolean inputFileChanged() {
-    String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
-    execContext.currentInputFile = currentInputFile;
-    if (this.lastInputFile == null
-        || !this.lastInputFile.equals(currentInputFile)) {
-      execContext.inputFileChanged = true;
-      return true;
-    }
-    execContext.inputFileChanged = false;
-    return false;
-  }
-
-  private void processMapLocalWork(boolean inputFileChangeSenstive) {
-    // process map local operators
-    if (fetchOperators != null) {
-      try {
-        int fetchOpNum = 0;
-        for (Map.Entry<String, FetchOperator> entry : fetchOperators
-            .entrySet()) {
-          int fetchOpRows = 0;
-          String alias = entry.getKey();
-          FetchOperator fetchOp = entry.getValue();
-
-          if(inputFileChangeSenstive) {
-            fetchOp.clearFetchContext();
-            setUpFetchOpContext(fetchOp, alias);
-          }
-
-          Operator<? extends Serializable> forwardOp = localWork
-              .getAliasToWork().get(alias);
-
-          while (true) {
-            InspectableObject row = fetchOp.getNextRow();
-            if (row == null) {
-              forwardOp.close(false);
-              break;
-            }
-            fetchOpRows++;
-            forwardOp.process(row.o, 0);
-            // check if any operator had a fatal error or early exit during
-            // execution
-            if (forwardOp.getDone()) {
-              done = true;
-              break;
-            }
-          }
-
-          if (l4j.isInfoEnabled()) {
-            l4j
-                .info("fetch " + fetchOpNum++ + " processed " + fetchOpRows
-                + " used mem: "
-                + memoryMXBean.getHeapMemoryUsage().getUsed());
-          }
-        }
-      } catch (Throwable e) {
-        abort = true;
-        if (e instanceof OutOfMemoryError) {
-          // Don't create a new object if we are already out of memory
-          throw (OutOfMemoryError) e;
-        } else {
-          throw new RuntimeException("Hive Runtime Error: Map local work failed", e);
-        }
-      }
-    }
-  }
-
-  private void setUpFetchOpContext(FetchOperator fetchOp, String alias)
-      throws Exception {
-    String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
-    BucketMapJoinContext bucketMatcherCxt = this.localWork.getBucketMapjoinContext();
-    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass();
-    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(bucketMatcherCls, null);
-    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt.getAliasBucketFileNameMapping());
-
-    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
-        bucketMatcherCxt.getMapJoinBigTableAlias(),
-        alias);
-    Iterator<Path> iter = aliasFiles.iterator();
-    fetchOp.setupContext(iter, null);
-  }
-
   private long getNextCntr(long cntr) {
 
     // A very simple counter to keep track of number of rows processed by the
@@ -331,7 +200,7 @@
     // detecting failed executions by exceptions thrown by the operator tree
     // ideally hadoop should let us know whether map execution failed or not
     try {
-      inputFileChanged();
+      this.execContext.checkForInputFileChanged();
      mo.close(abort);
      if (fetchOperators != null) {
        MapredLocalWork localWork = mo.getConf().getMapLocalWork();
@@ -363,6 +232,18 @@
   public static boolean getDone() {
     return done;
   }
+
+  public boolean isAbort() {
+    return abort;
+  }
+
+  public void setAbort(boolean abort) {
+    this.abort = abort;
+  }
+
+  public static void setDone(boolean done) {
+    ExecMapper.done = done;
+  }
 
   /**
    * reportStats.
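Net effect of the ExecMapper.java hunks above: the nested ExecMapperContext class and all per-input-file bookkeeping (lastInputFile, inputFileChanged(), processMapLocalWork(), setUpFetchOpContext()) move out of the mapper into the new top-level ExecMapperContext below, and ExecMapper now only populates the context and delegates the end-of-task file check to it. A minimal sketch of the resulting wiring, for orientation only; the field names (execContext, localWork, fetchOperators) come from the diff, while the setJc() call is an assumption, since the configure() hunk is not shown in full:

    // Sketch, not part of the patch: how ExecMapper hands its state to the
    // extracted context once the fetch operators for map-local work exist.
    execContext.setJc(job);                        // conf the context reads map.input.file from
    execContext.setLocalWork(localWork);           // map-local plan for the small-table aliases
    execContext.setFetchOperators(fetchOperators); // one FetchOperator per small-table alias
    // ... and in close(), the old inputFileChanged() call becomes:
    execContext.checkForInputFileChanged();

Making memoryMXBean public static and adding isAbort()/setAbort()/setDone() is what lets the extracted context (and the join operators) reuse the mapper's heap-usage logging and completion flags without holding a reference to the ExecMapper instance.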
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java	(revision 0)
@@ -0,0 +1,192 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class ExecMapperContext {
+
+  public static final Log l4j = ExecMapper.l4j;
+
+  boolean inputFileChanged = false;
+  String currentInputFile;
+  Integer fileId = new Integer(-1);
+
+  private String lastInputFile = null;
+
+  private MapredLocalWork localWork = null;
+  private Map<String, FetchOperator> fetchOperators;
+  JobConf jc;
+
+  public ExecMapperContext() {
+  }
+
+  public void processInputFileChangeForLocalWork() throws HiveException {
+    // put inputFileChanged() after the localWork check
+    // and check for 'lastInputFile == null'
+    if (this.localWork != null && checkForInputFileChanged()) {
+      processMapLocalWork(localWork.getInputFileChangeSensitive());
+    }
+  }
+
+  /**
+   * For CombineFileInputFormat, the mapper's input file can change on the
+   * fly. If the map-local work has any mapping that depends on the current
+   * mapper's input file, the work needs to clear its context and re-initialize
+   * after the input file changes. This was first introduced to process bucket
+   * map joins.
+   *
+   * @return true if the input file has changed since the last check
+   */
+  public boolean checkForInputFileChanged() {
+    String currentInputFile = HiveConf.getVar(jc,
+        HiveConf.ConfVars.HADOOPMAPFILENAME);
+    this.currentInputFile = currentInputFile;
+    if (this.lastInputFile == null
+        || !this.lastInputFile.equals(currentInputFile)) {
+      inputFileChanged = true;
+      this.lastInputFile = currentInputFile;
+      return true;
+    }
+    inputFileChanged = false;
+    return false;
+  }
+
+  public String getLastInputFile() {
+    return lastInputFile;
+  }
+
+  public void setLastInputFile(String lastInputFile) {
+    this.lastInputFile = lastInputFile;
+  }
+
+  private void processMapLocalWork(boolean inputFileChangeSensitive) throws HiveException {
+    // process map local operators
+    if (fetchOperators != null) {
+      try {
+        int fetchOpNum = 0;
+        for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+          int fetchOpRows = 0;
+          String alias = entry.getKey();
+          FetchOperator fetchOp = entry.getValue();
+
+          if (inputFileChangeSensitive) {
+            fetchOp.clearFetchContext();
+            setUpFetchOpContext(fetchOp, alias);
+          }
+
+          Operator<? extends Serializable> forwardOp = localWork
+              .getAliasToWork().get(alias);
+
+          while (true) {
+            InspectableObject row = fetchOp.getNextRow();
+            if (row == null) {
+              forwardOp.close(false);
+              break;
+            }
+            fetchOpRows++;
+            forwardOp.process(row.o, 0);
+            // check if any operator had a fatal error or early exit during
+            // execution
+            if (forwardOp.getDone()) {
+              ExecMapper.setDone(true);
+              break;
+            }
+          }
+
+          if (l4j.isInfoEnabled()) {
+            l4j.info("fetch " + fetchOpNum++ + " processed " + fetchOpRows
+                + " used mem: "
+                + ExecMapper.memoryMXBean.getHeapMemoryUsage().getUsed());
+          }
+        }
+      } catch (Throwable e) {
+        if (e instanceof OutOfMemoryError) {
+          // Don't create a new object if we are already out of memory
+          throw (OutOfMemoryError) e;
+        } else {
+          throw new HiveException(
+              "Hive Runtime Error: Map local work failed", e);
+        }
+      }
+    }
+  }
+
+  private void setUpFetchOpContext(FetchOperator fetchOp, String alias)
+      throws Exception {
+    String currentInputFile = HiveConf.getVar(jc,
+        HiveConf.ConfVars.HADOOPMAPFILENAME);
+    BucketMapJoinContext bucketMatcherCxt = this.localWork
+        .getBucketMapjoinContext();
+    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
+        .getBucketMatcherClass();
+    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
+        bucketMatcherCls, null);
+    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
+        .getAliasBucketFileNameMapping());
+
+    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
+        bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
+    Iterator<Path> iter = aliasFiles.iterator();
+    fetchOp.setupContext(iter, null);
+  }
+
+  public boolean isInputFileChanged() {
+    return inputFileChanged;
+  }
+
+  public void setInputFileChanged(boolean inputFileChanged) {
+    this.inputFileChanged = inputFileChanged;
+  }
+
+  public String getCurrentInputFile() {
+    return currentInputFile;
+  }
+
+  public void setCurrentInputFile(String currentInputFile) {
+    this.currentInputFile = currentInputFile;
+  }
+
+  public JobConf getJc() {
+    return jc;
+  }
+  public void setJc(JobConf jc) {
+    this.jc = jc;
+  }
+
+  public MapredLocalWork getLocalWork() {
+    return localWork;
+  }
+
+  public void setLocalWork(MapredLocalWork localWork) {
+    this.localWork = localWork;
+  }
+
+  public Integer getFileId() {
+    return fileId;
+  }
+
+  public void setFileId(Integer fileId) {
+    this.fileId = fileId;
+  }
+
+  public Map<String, FetchOperator> getFetchOperators() {
+    return fetchOperators;
+  }
+
+  public void setFetchOperators(Map<String, FetchOperator> fetchOperators) {
+    this.fetchOperators = fetchOperators;
+  }
+}
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java	(revision 946374)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java	(working copy)
@@ -160,6 +160,11 @@
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
+
+    if (tag == posBigTable && this.getExecContext().getLastInputFile() == null) {
+      this.getExecContext().processInputFileChangeForLocalWork();
+    }
+
     try {
       // get alias
       alias = (byte) tag;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java	(revision 946374)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java	(working copy)
@@ -29,7 +29,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.ExecMapper.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.Explain;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java	(revision 946374)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java	(working copy)
@@ -147,23 +147,26 @@
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-
-    if (this.getExecContext().inputFileChanged) {
-      if(firstFetchHappened) {
-        //we need to first join and flush out data left by the previous file.
-        joinFinalLeftData();
-      }
-      //set up the fetch operator for the new input file.
-      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-        String alias = entry.getKey();
-        FetchOperator fetchOp = entry.getValue();
-        fetchOp.clearFetchContext();
-        setUpFetchOpContext(fetchOp, alias);
+
+    if (tag == posBigTable) {
+      this.getExecContext().checkForInputFileChanged();
+      if (this.getExecContext().inputFileChanged) {
+        if (firstFetchHappened) {
+          // we need to first join and flush out data left by the previous file.
+          joinFinalLeftData();
+        }
+        // set up the fetch operator for the new input file.
+        for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+          String alias = entry.getKey();
+          FetchOperator fetchOp = entry.getValue();
+          fetchOp.clearFetchContext();
+          setUpFetchOpContext(fetchOp, alias);
+        }
+        this.getExecContext().inputFileChanged = false;
+        firstFetchHappened = false;
       }
-      this.getExecContext().inputFileChanged = false;
-      firstFetchHappened = false;
     }
-
+
     if (!firstFetchHappened) {
       firstFetchHappened = true;
       // fetch the first group for all small table aliases
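Taken together, the MapJoinOperator and SMBMapJoinOperator hunks move input-file handling from a per-row check in ExecMapper.map() to checks that run only for big-table rows (tag == posBigTable). For MapJoinOperator the guard getLastInputFile() == null fires exactly once, on the first big-table row, because the new checkForInputFileChanged() records the file name the first time it sees it. A short sketch of that life cycle, illustrative only, assuming job is a JobConf in which the framework has already set map.input.file (HiveConf.ConfVars.HADOOPMAPFILENAME), as it is inside a running mapper:

    // Sketch, not part of the patch: the guard in MapJoinOperator.processOp()
    // is true for the first big-table row only.
    ExecMapperContext ctx = new ExecMapperContext();
    ctx.setJc(job);
    boolean first = ctx.checkForInputFileChanged();  // true: lastInputFile was null, now recorded
    boolean again = ctx.checkForInputFileChanged();  // false until the input file really changes

SMBMapJoinOperator, by contrast, must react to every file change (its bucketed inputs can be combined by CombineFileInputFormat), so it calls checkForInputFileChanged() on each big-table row and, on a change, first flushes the previous file's join state via joinFinalLeftData() before re-seeding its fetch operators.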