Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java (revision 1025823)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java (working copy)
@@ -14,6 +14,7 @@
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 
 public class ExecMapperContext {
@@ -40,10 +41,21 @@
     ioCxt = IOContext.get();
   }
 
-  public void processInputFileChangeForLocalWork() throws HiveException {
+  /**
+   * Runs the map-side local work when local work is configured and
+   * inputFileChanged() reports a change.
+   *
+   * @param heartbeatInterval number of fetched rows between progress reports
+   * @param reporter used to report progress while the local work runs; may be
+   *          null, in which case no progress is reported
+   * @throws HiveException if the local work cannot be processed
+   */
+  public void processInputFileChangeForLocalWork(int heartbeatInterval,
+      Reporter reporter) throws HiveException {
     // put inputFileChanged() after localWork check
     if (this.localWork != null && inputFileChanged()) {
-      processMapLocalWork(localWork.getInputFileChangeSensitive());
+      processMapLocalWork(localWork.getInputFileChangeSensitive(),
+          heartbeatInterval, reporter);
     }
   }
 
@@ -85,13 +97,22 @@
     this.lastInputFile = lastInputFile;
   }
 
-  private void processMapLocalWork(boolean inputFileChangeSenstive) throws HiveException {
+  /**
+   * Iterates over every fetch operator of the local work and forwards its
+   * rows, reporting progress along the way.
+   *
+   * @param heartbeatInterval number of fetched rows between progress reports
+   * @param reporter progress sink; may be null
+   */
+  private void processMapLocalWork(boolean inputFileChangeSenstive,
+      int heartbeatInterval, Reporter reporter) throws HiveException {
     // process map local operators
     if (fetchOperators != null) {
       try {
         int fetchOpNum = 0;
         for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
           int fetchOpRows = 0;
+          int countAfterReport = 0;
           String alias = entry.getKey();
           FetchOperator fetchOp = entry.getValue();
 
@@ -111,6 +132,14 @@
             }
             fetchOpRows++;
             forwardOp.process(row.o, 0);
+            // Heartbeat once every heartbeatInterval rows. The counter must be
+            // advanced here; a non-positive interval reports on every row.
+            countAfterReport++;
+            if (reporter != null && countAfterReport >= heartbeatInterval) {
+              reporter.progress();
+              countAfterReport = 0;
+            }
+
             // check if any operator had a fatal error or early exit during
             // execution
             if (forwardOp.getDone()) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (revision 1025823)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (working copy)
@@ -162,7 +162,8 @@
   public void processOp(Object row, int tag) throws HiveException {
 
     if (tag == posBigTable) {
-      this.getExecContext().processInputFileChangeForLocalWork();
+      this.getExecContext().processInputFileChangeForLocalWork(
+          heartbeatInterval, reporter);
     }
 
     try {