Index: core/src/main/java/org/apache/hama/bsp/TaskRunner.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/TaskRunner.java	(revision 1240661)
+++ core/src/main/java/org/apache/hama/bsp/TaskRunner.java	(working copy)
@@ -55,6 +55,8 @@
-  boolean bspKilled = false;
+  volatile boolean bspKilled = false; // set by killBsp(), read by log threads
   private Process bspProcess;
 
+  private Thread errorLog;
+  private Thread infoLog;
   private final Task task;
   private final BSPJob conf;
 
@@ -95,17 +97,20 @@
       builder.directory(workDir);
       try {
         bspProcess = builder.start();
-        new Thread() {
+
+        errorLog = new Thread() {
           public void run() {
             logStream(bspProcess.getErrorStream(), LogType.ERROR);
           }
-        }.start();
+        };
+        errorLog.start();
 
-        new Thread() {
+        infoLog = new Thread() {
           public void run() {
             logStream(bspProcess.getInputStream(), LogType.STDOUT);
           }
-        }.start();
+        };
+        infoLog.start();
 
         int exit_code = bspProcess.waitFor();
         if (!bspKilled && exit_code != 0) {
@@ -113,8 +118,7 @@
               + exit_code + ".");
         }
       } catch (InterruptedException e) {
-        LOG.warn("Thread is interrupted when execeuting BSP process.",
-            e);
+        LOG.warn("Thread is interrupted when execeuting BSP process.", e);
       } catch (IOException ioe) {
         LOG.error("Error when executing BSPPeer process.", ioe);
       } finally {
@@ -203,7 +207,7 @@
     vargs.add(classPath.toString());
     // Add main class and its arguments
     LOG.debug("Executing child Process " + child.getName());
-    vargs.add(child.getName()); // bsp class name
+    vargs.add(child.getName()); // bsp class name
 
     if (GroomServer.BSPPeerChild.class.equals(child)) {
       InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
@@ -234,6 +238,8 @@
       LOG.error("BSPPeer child process is interrupted.", ie);
     } catch (ExecutionException ee) {
       LOG.error("Failure occurs when retrieving tasks result.", ee);
+    } finally {
+      killBsp();
     }
     LOG.debug("Finishes executing BSPPeer child process.");
   }
@@ -257,13 +263,19 @@
   }
 
   /**
-   * Kill bsppeer child process.
+   * Kill bspPeer child process.
    */
   public void killBsp() {
+    // Set the flag first: logStream() skips its read-error warning once
+    // bspKilled is true, and destroy() below may trigger such an error.
+    bspKilled = true;
+    // Only drops the references; it does not stop the logger threads.
+    errorLog = null;
+    infoLog = null;
+
     if (bspProcess != null) {
       bspProcess.destroy();
     }
-    bspKilled = true;
   }
 
   /**
@@ -287,7 +299,9 @@
         writer.newLine();
       }
     } catch (IOException e) {
-      LOG.warn(task.getTaskID() + " Error reading child output", e);
+      if (!bspKilled) {
+        LOG.warn(task.getTaskID() + " Error reading child output", e);
+      }
     } finally {
       try {
         input.close();