diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java index 4deafbb..47bf874 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.templeton.tool; import java.io.BufferedReader; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -168,8 +169,10 @@ public void run(Context context) proc.waitFor(); keepAlive.sendReport = false; pool.shutdown(); - if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) + if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) { + System.err.println("Shutting down watcher/keep alive thread pool forcefully"); pool.shutdownNow(); + } writeExitValue(conf, proc.exitValue(), statusdir); JobState state = new JobState(context.getJobID().toString(), conf); @@ -214,6 +217,18 @@ private void writeExitValue(Configuration conf, int exitValue, String statusdir) } } + private static class NonClosableStream extends FilterOutputStream { + public NonClosableStream(OutputStream out) { + super(out); + } + @Override + // override the close function not closing the stream + // to prevent closing System.out/System.err by mistake + public void close() throws IOException{ + //do nothing here + } + } + private static class Watcher implements Runnable { private InputStream in; private OutputStream out; @@ -227,10 +242,13 @@ public Watcher(Configuration conf, JobID jobid, InputStream in, this.jobid = jobid; this.in = in; - if (name.equals(STDERR_FNAME)) - out = System.err; - else - out = System.out; + if (name.equals(STDERR_FNAME)) { + // prevent System.err get closed by mistake + out = new NonClosableStream(System.err); + } else { + // prevent System.out get closed by mistake + out = new NonClosableStream(System.out); + } if (TempletonUtils.isset(statusdir)) { Path p = new Path(statusdir, name); @@ -272,6 +290,7 @@ public void run() { } } writer.flush(); + writer.close(); } catch (IOException e) { System.err.println("templeton: execute error: " + e); }