Index: core/src/main/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/GroomServer.java (revision 1545149) +++ core/src/main/java/org/apache/hama/bsp/GroomServer.java (working copy) @@ -115,7 +115,6 @@ private Instructor instructor; // Filesystem - // private LocalDirAllocator localDirAllocator; Path systemDirectory = null; FileSystem systemFS = null; @@ -140,9 +139,6 @@ InetSocketAddress taskReportAddress; Server taskReportServer = null; - // private BlockingQueue tasksToCleanup = new - // LinkedBlockingQueue(); - // Schedule Heartbeats to GroomServer private ScheduledExecutorService taskMonitorService; @@ -167,10 +163,6 @@ LOG.info("Launch " + actions.length + " tasks."); startNewTask((LaunchTaskAction) action); } else if (action instanceof KillTaskAction) { - - // TODO Use the cleanup thread - // tasksToCleanup.put(action); - LOG.info("Kill " + actions.length + " tasks."); KillTaskAction killAction = (KillTaskAction) action; if (tasks.containsKey(killAction.getTaskID())) { @@ -289,9 +281,6 @@ System.exit(0); } - // FileSystem local = FileSystem.getLocal(conf); - // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir"); - try { zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this); @@ -363,7 +352,7 @@ int tmpPort = socAddr.getPort(); // RPC initialization - // TODO numHandlers should be a .. + // TODO Make number of RPC Server threads configurable this.taskReportServer = RPC.getServer(this, bindAddress, tmpPort, 10, false, this.conf); @@ -424,8 +413,6 @@ this.running = true; this.initialized = true; - - // FIXME } /** Return the port at which the tasktracker bound to */ @@ -791,19 +778,14 @@ // monitorPeriod. A task is given a leeway of 10 times monitorPeriod // to get started. - // TODO Please refactor this conditions - // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod - + // TODO The task timeout check interval should be configurable if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) { LOG.info("adding purge task: " + tip.getTask().getTaskID()); - outOfContactTasks.add(tip); } - } - } /** @@ -1353,6 +1335,7 @@ @Override public void process(WatchedEvent event) { + // do nothing } } Index: core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (revision 1545149) +++ core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (working copy) @@ -79,6 +79,7 @@ Constants.DEFAULT_PEER_HOST); InetSocketAddress selfAddress = new InetSocketAddress(bindAddress, 0); + // TODO Make number of RPC Server threads configurable this.server = RPC.getServer(this, selfAddress.getHostName(), selfAddress.getPort(), conf); server.start();