diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java index 7799d64..de0237c 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; @@ -68,9 +69,7 @@ * in hdfs files. */ public class TempletonControllerJob extends Configured implements Tool { - static enum ControllerCounters {SIMPLE_COUNTER} - ; public static final String COPY_NAME = "templeton.copy"; public static final String STATUSDIR_NAME = "templeton.statusdir"; public static final String JAR_ARGS_NAME = "templeton.args"; @@ -157,14 +156,13 @@ public void run(Context context) conf.get(OVERRIDE_CLASSPATH)); String statusdir = conf.get(STATUSDIR_NAME); - Counter cnt = context.getCounter(ControllerCounters.SIMPLE_COUNTER); ExecutorService pool = Executors.newCachedThreadPool(); executeWatcher(pool, conf, context.getJobID(), proc.getInputStream(), statusdir, STDOUT_FNAME); executeWatcher(pool, conf, context.getJobID(), proc.getErrorStream(), statusdir, STDERR_FNAME); - KeepAlive keepAlive = startCounterKeepAlive(pool, cnt); + KeepAlive keepAlive = startCounterKeepAlive(pool, context); proc.waitFor(); keepAlive.sendReport = false; @@ -193,9 +191,10 @@ private void executeWatcher(ExecutorService pool, Configuration conf, pool.execute(w); } - private KeepAlive startCounterKeepAlive(ExecutorService pool, Counter cnt) + private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context) throws IOException { - KeepAlive k = new KeepAlive(cnt); + KeepAlive k = new KeepAlive(context); + System.err.println("Starting the KeepAlive thread"); pool.execute(k); return k; } @@ -280,11 +279,11 @@ public void run() { } public static class KeepAlive implements Runnable { - private Counter cnt; + private Context context; public boolean sendReport; - public KeepAlive(Counter cnt) { - this.cnt = cnt; + public KeepAlive(Context context) { + this.context = context; this.sendReport = true; } @@ -292,7 +291,11 @@ public KeepAlive(Counter cnt) { public void run() { try { while (sendReport) { - cnt.increment(1); + // Periodically report progress on the Context object + // to prevent TaskTracker from killing the Templeton + // Controller task + context.progress(); + System.err.println("KeepAlive Heart beat"); Thread.sleep(KEEP_ALIVE_MSEC); } } catch (InterruptedException e) {