diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java index 78f77c2..e203031 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java @@ -29,6 +29,11 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; @@ -925,7 +930,7 @@ private void runCommand() throws IOException { final BufferedReader errReader = new BufferedReader(new InputStreamReader( process.getErrorStream(), Charset.defaultCharset())); - BufferedReader inReader = + final BufferedReader inReader = new BufferedReader(new InputStreamReader( process.getInputStream(), Charset.defaultCharset())); final StringBuffer errMsg = new StringBuffer(); @@ -947,6 +952,20 @@ public void run() { } } }; + + ExecutorService threadPool = Executors.newSingleThreadScheduledExecutor(); + Future future = threadPool.submit(new Callable() { + public Void call() throws Exception { + parseExecResult(inReader); // parse the output + // clear the input stream buffer + String line = inReader.readLine(); + while(line != null) { + line = inReader.readLine(); + } + return null; + } + }); + try { errThread.start(); } catch (IllegalStateException ise) { @@ -957,16 +976,14 @@ public void run() { throw oe; } try { - parseExecResult(inReader); // parse the output - // clear the input stream buffer - String line = inReader.readLine(); - while(line != null) { - line = inReader.readLine(); - } // wait for the process to finish and check the exit code exitCode = process.waitFor(); // make sure that the error thread exits joinThread(errThread); + + threadPool.shutdown(); + future.get(); + completed.set(true); //the timeout thread handling //taken care in finally block @@ -977,10 +994,23 @@ public void run() { InterruptedIOException iie = new InterruptedIOException(ie.toString()); iie.initCause(ie); throw iie; + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException)cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException)cause; + } } finally { if (timeOutTimer != null) { timeOutTimer.cancel(); } + if (!completed.get()) { + process.destroy(); + errThread.interrupt(); + joinThread(errThread); + } + // close the input stream try { // JDK 7 tries to automatically drain the input streams for us @@ -997,10 +1027,6 @@ public void run() { } catch (IOException ioe) { LOG.warn("Error while closing the input stream", ioe); } - if (!completed.get()) { - errThread.interrupt(); - joinThread(errThread); - } try { InputStream stderr = process.getErrorStream(); synchronized (stderr) { @@ -1009,7 +1035,6 @@ public void run() { } catch (IOException ioe) { LOG.warn("Error while closing the error stream", ioe); } - process.destroy(); lastTime = Time.monotonicNow(); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java index 67903f7..d8d8c7a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java @@ -149,6 +149,33 @@ public void testShellCommandTimeout() throws Throwable { assertTrue("Script did not timeout" , shexc.isTimedOut()); } + @Test(timeout=120000) + public void testShellInterrupt() throws Throwable { + Assume.assumeFalse(WINDOWS); + StringBuffer sleepCommand = new StringBuffer(); + sleepCommand.append("sleep 200"); + String[] shellCmd = { "bash", "-c", sleepCommand.toString() }; + final ShellCommandExecutor shexc = new ShellCommandExecutor(shellCmd); + + Thread shellThread = new Thread() { + @Override + public void run() { + try { + shexc.execute(); + } catch(IOException ioe) { + //ignore IOException from thread interrupt + } + } + }; + + shellThread.start(); + Thread.sleep(250); + shellThread.interrupt(); + Process process = shexc.getProcess(); + process.waitFor(); + } + + @Test public void testEnvVarsWithInheritance() throws Exception { Assume.assumeFalse(WINDOWS); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 04be631..57f8c39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -184,7 +184,8 @@ public LocalizationProtocol run() { } finally { try { if (exec != null) { - exec.shutdownNow(); + exec.shutdown(); + exec.awaitTermination(10, TimeUnit.SECONDS); } LocalDirAllocator.removeContext(appCacheDirContextName); } finally {