diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java index 7799d64..2838bef 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java @@ -68,9 +68,6 @@ * in hdfs files. */ public class TempletonControllerJob extends Configured implements Tool { - static enum ControllerCounters {SIMPLE_COUNTER} - - ; public static final String COPY_NAME = "templeton.copy"; public static final String STATUSDIR_NAME = "templeton.statusdir"; public static final String JAR_ARGS_NAME = "templeton.args"; @@ -157,14 +154,13 @@ public void run(Context context) conf.get(OVERRIDE_CLASSPATH)); String statusdir = conf.get(STATUSDIR_NAME); - Counter cnt = context.getCounter(ControllerCounters.SIMPLE_COUNTER); ExecutorService pool = Executors.newCachedThreadPool(); executeWatcher(pool, conf, context.getJobID(), proc.getInputStream(), statusdir, STDOUT_FNAME); executeWatcher(pool, conf, context.getJobID(), proc.getErrorStream(), statusdir, STDERR_FNAME); - KeepAlive keepAlive = startCounterKeepAlive(pool, cnt); + KeepAlive keepAlive = startCounterKeepAlive(pool, context); proc.waitFor(); keepAlive.sendReport = false; @@ -193,7 +189,7 @@ private void executeWatcher(ExecutorService pool, Configuration conf, pool.execute(w); } - private KeepAlive startCounterKeepAlive(ExecutorService pool, Counter cnt) + private KeepAlive startCounterKeepAlive(ExecutorService pool, Context cnt) throws IOException { KeepAlive k = new KeepAlive(cnt); pool.execute(k); @@ -215,7 +211,7 @@ private void writeExitValue(Configuration conf, int exitValue, String statusdir) } } - public static class Watcher implements Runnable { + private static class Watcher implements Runnable { private InputStream in; private OutputStream out; private JobID jobid; @@ -279,11 +275,11 @@ public void run() { } } - public static class KeepAlive implements Runnable { - private Counter cnt; - public boolean sendReport; + private static class KeepAlive implements Runnable { + private final Mapper.Context cnt; + private volatile boolean sendReport; - public KeepAlive(Counter cnt) { + public KeepAlive(Mapper.Context cnt) { this.cnt = cnt; this.sendReport = true; } @@ -292,7 +288,7 @@ public KeepAlive(Counter cnt) { public void run() { try { while (sendReport) { - cnt.increment(1); + cnt.progress(); Thread.sleep(KEEP_ALIVE_MSEC); } } catch (InterruptedException e) {