Index: core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (revision 1684345) +++ core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (working copy) @@ -237,16 +237,26 @@ @Override public void register(BSPJobID jobId, TaskAttemptID taskId, String hostAddress, long port) { - try { - String jobRegisterKey = constructKey(jobId, "peers"); - if (zk.exists(jobRegisterKey, false) == null) { + int count = 0; + String jobRegisterKey = constructKey(jobId, "peers"); + Stat stat = null; + + LOG.info("TaskAttemptID : " + taskId); + while (stat != null) { + try { + stat = zk.exists(jobRegisterKey, false); zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Thread.sleep(1000); + } catch (Exception e) { + LOG.debug(e); // ignore it. } - } catch (KeeperException e) { - LOG.error(e); - } catch (InterruptedException e) { - LOG.error(e); + count++; + + // retry 10 times. + if (count > 9) { + throw new RuntimeException("can't create root node."); + } } registerTask(jobId, hostAddress, port, taskId); } Index: yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java (revision 1684345) +++ yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java (working copy) @@ -80,7 +80,6 @@ private String hostname; private int clientPort; private FileSystem fs; - private static int id = 0; private volatile long superstep; private Counters globalCounter = new Counters(); @@ -186,6 +185,7 @@ LogManager.shutdown(); ExitUtil.terminate(1, t); } finally { + LOG.info("Stop SyncServer and RPCServer."); appMaster.close(); } @@ -491,6 +491,7 @@ public void onContainersAllocated(List allocatedContainers) { LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); + numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { LOG.info("Launching shell command on a new container." @@ -502,10 +503,8 @@ + allocatedContainer.getResource().getMemory() + ", containerResourceVirtualCores" + allocatedContainer.getResource().getVirtualCores()); - // + ", containerToken" - // +allocatedContainer.getContainerToken().getIdentifier().toString()); - Thread launchThread = createLaunchContainerThread(allocatedContainer); + Thread launchThread = createLaunchContainerThread(allocatedContainer, allocatedContainer.getId().getContainerId()); // launch and start the container on a separate thread to keep // the main thread unblocked @@ -513,7 +512,6 @@ launchThreads.add(launchThread); launchedContainers.add(allocatedContainer.getId()); launchThread.start(); - id++; } } @@ -621,15 +619,19 @@ Configuration conf; + long taskAttemptId; + /** * @param lcontainer Allocated container * @param containerListener Callback handler of the container */ public LaunchContainerRunnable( - Container lcontainer, NMCallbackHandler containerListener, Configuration conf) { + Container lcontainer, NMCallbackHandler containerListener, + Configuration conf, long taskAttemptId) { this.container = lcontainer; this.containerListener = containerListener; this.conf = conf; + this.taskAttemptId = taskAttemptId; } /** @@ -725,7 +727,7 @@ vargs.add(BSPRunner.class.getCanonicalName()); vargs.add(jobId.getJtIdentifier()); - vargs.add(Integer.toString(id)); + vargs.add(Long.toString(taskAttemptId)); vargs.add( new Path(jobFile).makeQualified(fs.getUri(), fs.getWorkingDirectory()) .toString()); @@ -805,7 +807,6 @@ "ApplicationAttemptId not set in the environment"); } - LOG.info("app attempt id!!!"); ContainerId containerId = ConverterUtils.toContainerId(envs .get(ApplicationConstants.Environment.CONTAINER_ID.name())); return containerId.getApplicationAttemptId(); @@ -930,9 +931,9 @@ } @VisibleForTesting - Thread createLaunchContainerThread(Container allocatedContainer) { + Thread createLaunchContainerThread(Container allocatedContainer, long taskAttemptId) { LaunchContainerRunnable runnableLaunchContainer = - new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf); + new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf, taskAttemptId); return new Thread(runnableLaunchContainer); } @@ -1001,6 +1002,7 @@ public void close() throws IOException { this.clientServer.stop(); this.taskServer.stop(); + this.syncServer.stopServer(); } @Override