Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 8461) +++ conf/hive-default.xml (working copy) @@ -683,6 +683,14 @@ + + hive.auto.progress.timeout + 0 + + How long to run autoprogressor for the script/UDTF operators (in seconds). + Set to 0 for forever. + + Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 8461) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -182,6 +182,7 @@ HIVEADDEDARCHIVES("hive.added.archives.path", ""), // for hive script operator + HIVES_AUTO_PROGRESS_TIMEOUT("hive.auto.progress.timeout", 0), HIVETABLENAME("hive.table.name", ""), HIVEPARTITIONNAME("hive.partition.name", ""), HIVESCRIPTAUTOPROGRESS("hive.script.auto.progress", false), Index: ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (revision 8461) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (working copy) @@ -76,7 +76,8 @@ // for a while if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUDTFAUTOPROGRESS)) { autoProgressor = new AutoProgressor(this.getClass().getName(), reporter, - Utilities.getDefaultNotificationInterval(hconf)); + Utilities.getDefaultNotificationInterval(hconf), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT) * 1000); autoProgressor.go(); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (revision 8461) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (working copy) @@ -309,7 +309,8 @@ if (HiveConf .getBoolVar(hconf, HiveConf.ConfVars.HIVESCRIPTAUTOPROGRESS)) { autoProgressor = new AutoProgressor(this.getClass().getName(), - reporter, Utilities.getDefaultNotificationInterval(hconf)); + reporter, Utilities.getDefaultNotificationInterval(hconf), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT) * 1000); autoProgressor.go(); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java (revision 8461) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java (working copy) @@ -28,7 +28,8 @@ /** * AutoProgressor periodically sends updates to the job tracker so that it * doesn't consider this task attempt dead if there is a long period of - * inactivity. + * inactivity. This can be configured with a timeout so that it doesn't run + * indefinitely. */ public class AutoProgressor { protected Log LOG = LogFactory.getLog(this.getClass().getName()); @@ -38,9 +39,12 @@ // duration, a progress report is sent to the tracker so that the tracker // does not think that the job is dead. Timer rpTimer = null; + // Timer that tops rpTimer after a long timeout, e.g. 1 hr + Timer srpTimer = null; // Name of the class to report for String logClassName = null; int notificationInterval; + int timeout; Reporter reporter; class ReporterTask extends TimerTask { @@ -70,18 +74,68 @@ } } + class StopReporterTimerTask extends TimerTask { + + /** + * Task to stop the reporter timer once we hit the timeout + */ + private final ReporterTask rt; + + public StopReporterTimerTask(ReporterTask rp) { + this.rt = rp; + } + + @Override + public void run() { + if (rt != null) { + LOG.info("Stopping reporter timer for " + logClassName); + rt.cancel(); + } + } + } + + /** + * + * @param logClassName + * @param reporter + * @param notificationInterval - interval for reporter updates (in ms) + */ AutoProgressor(String logClassName, Reporter reporter, int notificationInterval) { this.logClassName = logClassName; this.reporter = reporter; this.notificationInterval = notificationInterval; + this.timeout = 0; } + /** + * + * @param logClassName + * @param reporter + * @param notificationInterval - interval for reporter updates (in ms) + * @param timeout - when the autoprogressor should stop reporting (in ms) + */ + AutoProgressor(String logClassName, Reporter reporter, + int notificationInterval, int timeout) { + this.logClassName = logClassName; + this.reporter = reporter; + this.notificationInterval = notificationInterval; + this.timeout = timeout; + } + public void go() { LOG.info("Running ReporterTask every " + notificationInterval + " miliseconds."); rpTimer = new Timer(true); - rpTimer.scheduleAtFixedRate(new ReporterTask(reporter), 0, - notificationInterval); + + + ReporterTask rt = new ReporterTask(reporter); + rpTimer.scheduleAtFixedRate(rt, 0, notificationInterval); + + if (timeout > 0) { + srpTimer = new Timer(true); + StopReporterTimerTask srt = new StopReporterTimerTask(rt); + srpTimer.schedule(srt, timeout); + } } }