Index: yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java (revision 0) +++ yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java (working copy) @@ -0,0 +1,1006 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; +import org.apache.hama.Constants; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.sync.SyncServer; +import org.apache.hama.bsp.sync.SyncServiceFactory; +import org.apache.hama.ipc.BSPPeerProtocol; +import org.apache.hama.ipc.RPC; +import org.apache.hama.ipc.Server; +import org.apache.hama.util.BSPNetUtils; +import org.apache.log4j.LogManager; + +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +public class ApplicationMaster implements BSPClient, BSPPeerProtocol { + private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + + // Configuration + private Configuration localConf; + private Configuration jobConf; + + private String jobFile; + private String applicationName; + // RPC info where the AM receive client side requests + private String hostname; + private int clientPort; + private FileSystem fs; + private static int id = 0; + + private volatile long superstep; + private Counters globalCounter = new Counters(); + private BSPJobClient.RawSplit[] splits; + + private BSPJobID jobId; + + // SyncServer for Zookeeper + private SyncServer syncServer; + + // Zookeeper thread pool + private static final ExecutorService threadPool = Executors + .newFixedThreadPool(1); + + // RPC info where the AM receive client side requests + private int taskServerPort; + + private Server clientServer; + private Server taskServer; + + // Handle to communicate with the Resource Manager + @SuppressWarnings("rawtypes") + private AMRMClientAsync amRMClient; + + // In both secure and non-secure modes, this points to the job-submitter. + @VisibleForTesting + UserGroupInformation appSubmitterUgi; + + // Handle to communicate with the Node Manager + private NMClientAsync nmClientAsync; + // Listen to process the response from the Node Manager + private NMCallbackHandler containerListener; + + // Application Attempt Id ( combination of attemptId and fail count ) + @VisibleForTesting + protected ApplicationAttemptId appAttemptID; + + + // TODO + // For status update for clients - yet to be implemented + // Hostname of the container + private String appMasterHostname = ""; + // Port on which the app master listens for status updates from clients + private int appMasterRpcPort = -1; + // Tracking url to which app master publishes info for clients to monitor + private String appMasterTrackingUrl = ""; + + // App Master configuration + // No. of containers to run shell command on + @VisibleForTesting + protected int numTotalContainers; + // Memory to request for the container on which the shell command will run + private int containerMemory; + // VirtualCores to request for the container on which the shell command will run + private int containerVirtualCores = 1; + + // Priority of the request + private int requestPriority = 0; + + // Counter for completed containers ( complete denotes successful or failed ) + private AtomicInteger numCompletedContainers = new AtomicInteger(); + // Allocated container count so that we know how many containers has the RM + // allocated to us + @VisibleForTesting + protected AtomicInteger numAllocatedContainers = new AtomicInteger(); + // Count of failed containers + private AtomicInteger numFailedContainers = new AtomicInteger(); + // Count of containers already requested from the RM + // Needed as once requested, we should not request for containers again. + // Only request for more if the original requirement changes. + @VisibleForTesting + protected AtomicInteger numRequestedContainers = new AtomicInteger(); + + private volatile boolean done; + private ByteBuffer allTokens; + + // Launch threads + private List launchThreads = new ArrayList(); + + @VisibleForTesting + protected final Set launchedContainers = + Collections.newSetFromMap(new ConcurrentHashMap()); + + public ApplicationMaster() { + // Set up the configuration + this.localConf = new YarnConfiguration(); + } + + public static void main(String[] args) { + boolean result = false; + try { + ApplicationMaster appMaster = new ApplicationMaster(); + LOG.info("Initializing ApplicationMaster"); + boolean doRun = appMaster.init(args); + if (!doRun) { + System.exit(0); + } + appMaster.run(); + result = appMaster.finish(); + } catch (Throwable t) { + LOG.fatal("Error running ApplicationMaster", t); + LogManager.shutdown(); + ExitUtil.terminate(1, t); + } + if (result) { + LOG.info("Application Master completed successfully. exiting"); + System.exit(0); + } else { + LOG.info("Application Master failed. exiting"); + System.exit(2); + } + } + + public boolean init(String[] args) throws Exception { + if (args.length != 1) { + throw new IllegalArgumentException(); + } + this.jobFile = args[0]; + this.jobConf = getSubmitConfiguration(jobFile); + localConf.addResource(localConf); + fs = FileSystem.get(jobConf); + + this.applicationName = jobConf.get("bsp.job.name", + ""); + if (applicationName.isEmpty()) { + this.applicationName = ""; + } + + appAttemptID = getApplicationAttemptId(); + this.jobId = new BSPJobID(appAttemptID.toString(), 0); + this.appMasterHostname = BSPNetUtils.getCanonicalHostname(); + this.appMasterTrackingUrl = "http://localhost:8088"; + this.numTotalContainers = this.jobConf.getInt("bsp.peers.num", 1); + this.containerMemory = getMemoryRequirements(jobConf); + + this.hostname = BSPNetUtils.getCanonicalHostname(); + this.clientPort = BSPNetUtils.getFreePort(12000); + + // Set configuration for starting SyncServer which run Zookeeper + this.jobConf.set(Constants.ZOOKEEPER_QUORUM, appMasterHostname); + + // start our synchronization service + startSyncServer(); + + // start RPC server + startRPCServers(); + + /* + * Make sure that this executes after the start the RPC servers, because we + * are readjusting the configuration. + */ + rewriteSubmitConfiguration(jobFile, jobConf); + + String jobSplit = jobConf.get("bsp.job.split.file"); + splits = null; + if (jobSplit != null) { + DataInputStream splitFile = fs.open(new Path(jobSplit)); + try { + splits = BSPJobClient.readSplitFile(splitFile); + } finally { + splitFile.close(); + } + } + + return true; + } + + /** + * Main run function for the application master + * + * @throws org.apache.hadoop.yarn.exceptions.YarnException + * @throws IOException + */ + @SuppressWarnings({ "unchecked" }) + public void run() throws YarnException, IOException, InterruptedException { + LOG.info("Starting ApplicationMaster"); + + // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class + // are marked as LimitedPrivate + Credentials credentials = + UserGroupInformation.getCurrentUser().getCredentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + // Now remove the AM->RM token so that containers cannot access it. + Iterator> iter = credentials.getAllTokens().iterator(); + LOG.info("Executing with tokens:"); + while (iter.hasNext()) { + Token token = iter.next(); + LOG.info(token); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + iter.remove(); + } + } + allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + + // Create appSubmitterUgi and add original tokens to it + String appSubmitterUserName = + System.getenv(ApplicationConstants.Environment.USER.name()); + appSubmitterUgi = + UserGroupInformation.createRemoteUser(appSubmitterUserName); + appSubmitterUgi.addCredentials(credentials); + + + AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); + amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); + amRMClient.init(localConf); + amRMClient.start(); + + containerListener = createNMCallbackHandler(); + nmClientAsync = new NMClientAsyncImpl(containerListener); + nmClientAsync.init(localConf); + nmClientAsync.start(); + + // Setup local RPC Server to accept status requests directly from clients + // TODO need to setup a protocol for client to be able to communicate to + // the RPC server + // TODO use the rpc port info to register with the RM for the client to + // send requests to this app master + + // Register self with ResourceManager + // This will start heartbeating to the RM + appMasterHostname = NetUtils.getHostname(); + RegisterApplicationMasterResponse response = amRMClient + .registerApplicationMaster(appMasterHostname, appMasterRpcPort, + appMasterTrackingUrl); + // Dump out information about cluster capability as seen by the + // resource manager + int maxMem = response.getMaximumResourceCapability().getMemory(); + LOG.info("Max mem capability of resources in this cluster " + maxMem); + + int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); + LOG.info("Max vcores capability of resources in this cluster " + maxVCores); + + // A resource ask cannot exceed the max. + if (containerMemory > maxMem) { + LOG.info("Container memory specified above max threshold of cluster." + + " Using max value." + ", specified=" + containerMemory + ", max=" + + maxMem); + containerMemory = maxMem; + } + + if (containerVirtualCores > maxVCores) { + LOG.info("Container virtual cores specified above max threshold of cluster." + + " Using max value." + ", specified=" + containerVirtualCores + ", max=" + + maxVCores); + containerVirtualCores = maxVCores; + } + + List previousAMRunningContainers = + response.getContainersFromPreviousAttempts(); + LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() + + " previous attempts' running containers on AM registration."); + for(Container container: previousAMRunningContainers) { + launchedContainers.add(container.getId()); + } + numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); + + + int numTotalContainersToRequest = + numTotalContainers - previousAMRunningContainers.size(); + // Setup ask for containers from RM + // Send request for containers to RM + // Until we get our fully allocated quota, we keep on polling RM for + // containers + // Keep looping until all the containers are launched and shell script + // executed on them ( regardless of success/failure). + for (int i = 0; i < numTotalContainersToRequest; ++i) { + AMRMClient.ContainerRequest containerAsk = setupContainerAskForRM(); + amRMClient.addContainerRequest(containerAsk); + } + numRequestedContainers.set(numTotalContainers); + } + + @VisibleForTesting + NMCallbackHandler createNMCallbackHandler() { + return new NMCallbackHandler(this); + } + + @VisibleForTesting + protected boolean finish() { + // wait for completion. + while (!done + && (numCompletedContainers.get() != numTotalContainers)) { + try { + Thread.sleep(200); + } catch (InterruptedException ex) {} + } + + // Join all launched threads + // needed for when we time out + // and we need to release containers + for (Thread launchThread : launchThreads) { + try { + launchThread.join(10000); + } catch (InterruptedException e) { + LOG.info("Exception thrown in thread join: " + e.getMessage()); + e.printStackTrace(); + } + } + + // When the application completes, it should stop all running containers + LOG.info("Application completed. Stopping running containers"); + nmClientAsync.stop(); + + // When the application completes, it should send a finish application + // signal to the RM + LOG.info("Application completed. Signalling finish to RM"); + + FinalApplicationStatus appStatus; + String appMessage = null; + boolean success = true; + if (numFailedContainers.get() == 0 && + numCompletedContainers.get() == numTotalContainers) { + appStatus = FinalApplicationStatus.SUCCEEDED; + } else { + appStatus = FinalApplicationStatus.FAILED; + appMessage = "Diagnostics." + ", total=" + numTotalContainers + + ", completed=" + numCompletedContainers.get() + ", allocated=" + + numAllocatedContainers.get() + ", failed=" + + numFailedContainers.get(); + LOG.info(appMessage); + success = false; + } + try { + amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); + } catch (YarnException ex) { + LOG.error("Failed to unregister application", ex); + } catch (IOException e) { + LOG.error("Failed to unregister application", e); + } + + amRMClient.stop(); + + return success; + } + + private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { + @SuppressWarnings("unchecked") + @Override + public void onContainersCompleted(List completedContainers) { + LOG.info("Got response from RM for container ask, completedCnt=" + + completedContainers.size()); + for (ContainerStatus containerStatus : completedContainers) { + LOG.info(appAttemptID + " got container status for containerID=" + + containerStatus.getContainerId() + ", state=" + + containerStatus.getState() + ", exitStatus=" + + containerStatus.getExitStatus() + ", diagnostics=" + + containerStatus.getDiagnostics()); + + // non complete containers should not be here + assert (containerStatus.getState() == ContainerState.COMPLETE); + // ignore containers we know nothing about - probably from a previous + // attempt + if (!launchedContainers.contains(containerStatus.getContainerId())) { + LOG.info("Ignoring completed status of " + + containerStatus.getContainerId() + + "; unknown container(probably launched by previous attempt)"); + continue; + } + + // increment counters for completed/failed containers + int exitStatus = containerStatus.getExitStatus(); + if (0 != exitStatus) { + // container failed + if (ContainerExitStatus.ABORTED != exitStatus) { + // shell script failed + // counts as completed + numCompletedContainers.incrementAndGet(); + numFailedContainers.incrementAndGet(); + } else { + // container was killed by framework, possibly preempted + // we should re-try as the container was lost for some reason + numAllocatedContainers.decrementAndGet(); + numRequestedContainers.decrementAndGet(); + // we do not need to release the container as it would be done + // by the RM + } + } else { + // nothing to do + // container completed successfully + numCompletedContainers.incrementAndGet(); + LOG.info("Container completed successfully." + ", containerId=" + + containerStatus.getContainerId()); + } + } + + // ask for more containers if any failed + int askCount = numTotalContainers - numRequestedContainers.get(); + numRequestedContainers.addAndGet(askCount); + + if (askCount > 0) { + for (int i = 0; i < askCount; ++i) { + AMRMClient.ContainerRequest containerAsk = setupContainerAskForRM(); + amRMClient.addContainerRequest(containerAsk); + } + } + + if (numCompletedContainers.get() == numTotalContainers) { + done = true; + } + } + + @Override + public void onContainersAllocated(List allocatedContainers) { + LOG.info("Got response from RM for container ask, allocatedCnt=" + + allocatedContainers.size()); + numAllocatedContainers.addAndGet(allocatedContainers.size()); + for (Container allocatedContainer : allocatedContainers) { + LOG.info("Launching shell command on a new container." + + ", containerId=" + allocatedContainer.getId() + + ", containerNode=" + allocatedContainer.getNodeId().getHost() + + ":" + allocatedContainer.getNodeId().getPort() + + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + + ", containerResourceMemory" + + allocatedContainer.getResource().getMemory() + + ", containerResourceVirtualCores" + + allocatedContainer.getResource().getVirtualCores()); + // + ", containerToken" + // +allocatedContainer.getContainerToken().getIdentifier().toString()); + + Thread launchThread = createLaunchContainerThread(allocatedContainer); + + // launch and start the container on a separate thread to keep + // the main thread unblocked + // as all containers may not be allocated at one go. + launchThreads.add(launchThread); + launchedContainers.add(allocatedContainer.getId()); + launchThread.start(); + id++; + } + } + + @Override + public void onShutdownRequest() { + done = true; + } + + @Override + public void onNodesUpdated(List list) { + + } + + @Override + public float getProgress() { + // set progress to deliver to RM on next heartbeat + float progress = (float) numCompletedContainers.get() + / numTotalContainers; + return progress; + } + + @Override + public void onError(Throwable throwable) { + done = true; + amRMClient.stop(); + } + } + + @VisibleForTesting + static class NMCallbackHandler + implements NMClientAsync.CallbackHandler { + + private ConcurrentMap containers = + new ConcurrentHashMap(); + private final ApplicationMaster applicationMaster; + + public NMCallbackHandler(ApplicationMaster applicationMaster) { + this.applicationMaster = applicationMaster; + } + + public void addContainer(ContainerId containerId, Container container) { + containers.putIfAbsent(containerId, container); + } + + @Override + public void onContainerStopped(ContainerId containerId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Succeeded to stop Container " + containerId); + } + containers.remove(containerId); + } + + @Override + public void onContainerStatusReceived(ContainerId containerId, + ContainerStatus containerStatus) { + if (LOG.isDebugEnabled()) { + LOG.debug("Container Status: id=" + containerId + ", status=" + + containerStatus); + } + } + + @Override + public void onContainerStarted(ContainerId containerId, + Map allServiceResponse) { + if (LOG.isDebugEnabled()) { + LOG.debug("Succeeded to start Container " + containerId); + } + Container container = containers.get(containerId); + if (container != null) { + applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); + } + } + + @Override + public void onStartContainerError(ContainerId containerId, Throwable t) { + LOG.error("Failed to start Container " + containerId); + containers.remove(containerId); + applicationMaster.numCompletedContainers.incrementAndGet(); + applicationMaster.numFailedContainers.incrementAndGet(); + } + + @Override + public void onGetContainerStatusError( + ContainerId containerId, Throwable t) { + LOG.error("Failed to query the status of Container " + containerId); + } + + @Override + public void onStopContainerError(ContainerId containerId, Throwable t) { + LOG.error("Failed to stop Container " + containerId); + containers.remove(containerId); + } + } + + /** + * Thread to connect to the {@link ContainerManagementProtocol} and launch the container + * that will execute the shell command. + */ + private class LaunchContainerRunnable implements Runnable { + + // Allocated container + Container container; + + NMCallbackHandler containerListener; + + Configuration conf; + + /** + * @param lcontainer Allocated container + * @param containerListener Callback handler of the container + */ + public LaunchContainerRunnable( + Container lcontainer, NMCallbackHandler containerListener, Configuration conf) { + this.container = lcontainer; + this.containerListener = containerListener; + this.conf = conf; + } + + /** + * Connects to CM, sets up container launch context + * for shell command and eventually dispatches the container + * start request to the CM. + */ + @Override + public void run() { + LOG.info("Setting up container launch container for containerid=" + + container.getId()); + // Now we setup a ContainerLaunchContext + ContainerLaunchContext ctx = Records + .newRecord(ContainerLaunchContext.class); + + // Set the local resources + Map localResources = new HashMap(); + LocalResource packageResource = Records.newRecord(LocalResource.class); + FileSystem fs = null; + try { + fs = FileSystem.get(conf); + } catch (IOException e) { + e.printStackTrace(); + } + Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION)); + URL packageUrl = null; + try { + packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile + .makeQualified(fs.getUri(), fs.getWorkingDirectory())); + LOG.info("PackageURL has been composed to " + packageUrl.toString()); + LOG.info("Reverting packageURL to path: " + + ConverterUtils.getPathFromYarnURL(packageUrl)); + } catch (URISyntaxException e) { + LOG.fatal("If you see this error the workarround does not work", e); + numCompletedContainers.incrementAndGet(); + numFailedContainers.incrementAndGet(); + return; + } + + packageResource.setResource(packageUrl); + packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE))); + packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP))); + packageResource.setType(LocalResourceType.FILE); + packageResource.setVisibility(LocalResourceVisibility.APPLICATION); + + localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource); + + Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION)); + URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile + .makeQualified(fs.getUri(), fs.getWorkingDirectory())); + LOG.info("Hama release URL has been composed to " + hamaReleaseUrl + .toString()); + + RemoteIterator fileStatusListIterator = null; + try { + fileStatusListIterator = fs.listFiles( + hamaReleaseFile, true); + + while(fileStatusListIterator.hasNext()) { + LocatedFileStatus lfs = fileStatusListIterator.next(); + LocalResource localRsrc = LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(lfs.getPath()), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, + lfs.getLen(), lfs.getModificationTime()); + localResources.put(lfs.getPath().getName(), localRsrc); + } + } catch (IOException e) { + LOG.fatal("The error has occured to RemoteIterator " + e); + } + + ctx.setLocalResources(localResources); + + /* + * TODO Package classpath seems not to work if you're in pseudo distributed + * mode, because the resource must not be moved, it will never be unpacked. + * So we will check if our jar file has the file:// prefix and put it into + * the CP directly + */ + + StringBuilder classPathEnv = new StringBuilder( + ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar) + .append("./*"); + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + classPathEnv.append(File.pathSeparatorChar); + classPathEnv.append(c.trim()); + } + + Vector vargs = new Vector(); + vargs.add("${JAVA_HOME}/bin/java"); + vargs.add("-cp " + classPathEnv + ""); + vargs.add(BSPRunner.class.getCanonicalName()); + + vargs.add(jobId.getJtIdentifier()); + vargs.add(Integer.toString(id)); + vargs.add( + new Path(jobFile).makeQualified(fs.getUri(), fs.getWorkingDirectory()) + .toString()); + + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stdout"); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stderr"); + + // Get final commmand + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(" "); + } + + List commands = new ArrayList(); + commands.add(command.toString()); + + ctx.setCommands(commands); + ctx.setTokens(allTokens.duplicate()); + LOG.info("Starting commands: " + commands); + + containerListener.addContainer(container.getId(), container); + nmClientAsync.startContainerAsync(container, ctx); + } + } + + /** + * Setup the request that will be sent to the RM for the container ask. + * + * @return the setup ResourceRequest to be sent to RM + */ + private AMRMClient.ContainerRequest setupContainerAskForRM() { + // setup requirements for hosts + // using * as any host will do for the distributed shell app + // set the priority for the request + // TODO - what is the range for priority? how to decide? + Priority pri = Priority.newInstance(requestPriority); + + // Set up resource type requirements + // For now, memory and CPU are supported so we set memory and cpu requirements + Resource capability = Resource.newInstance(containerMemory, + containerVirtualCores); + + AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, null, null, + pri); + LOG.info("Requested container ask: " + request.toString()); + return request; + } + + /** + * Reads the configuration from the given path. + */ + private static Configuration getSubmitConfiguration(String path) + throws IOException { + Path jobSubmitPath = new Path(path); + Configuration jobConf = new HamaConfiguration(); + + FileSystem fs = FileSystem.get(URI.create(path), jobConf); + + InputStream in =fs.open(jobSubmitPath); + jobConf.addResource(in); + + return jobConf; + } + + /** + * Gets the application attempt ID from the environment. This should be set by + * YARN when the container has been launched. + * + * @return a new ApplicationAttemptId which is unique and identifies this + * task. + */ + private static ApplicationAttemptId getApplicationAttemptId() + throws IOException { + Map envs = System.getenv(); + if (!envs.containsKey(ApplicationConstants.Environment.CONTAINER_ID.name())) { + throw new IllegalArgumentException( + "ApplicationAttemptId not set in the environment"); + } + + LOG.info("app attempt id!!!"); + ContainerId containerId = ConverterUtils.toContainerId(envs + .get(ApplicationConstants.Environment.CONTAINER_ID.name())); + return containerId.getApplicationAttemptId(); + } + + /** + * This method starts the sync server on a specific port and waits for it to + * come up. Be aware that this method adds the "bsp.sync.server.address" that + * is needed for a task to connect to the service. + * + * @throws IOException + */ + private void startSyncServer() throws Exception { + syncServer = SyncServiceFactory.getSyncServer(jobConf); + syncServer.init(jobConf); + + ZKServerThread serverThread = new ZKServerThread(syncServer); + threadPool.submit(serverThread); + } + + /** + * This method is to run Zookeeper in order to coordinates between BSPMaster and Groomservers + * using Runnable interface in java. + */ + private static class ZKServerThread implements Runnable { + SyncServer server; + + ZKServerThread(SyncServer s) { + server = s; + } + + @Override + public void run() { + try { + server.start(); + } catch (Exception e) { + LOG.error("Error running SyncServer.", e); + } + } + } + + /** + * This method starts the needed RPC servers: client server and the task + * server. This method manipulates the configuration and therefore needs to be + * executed BEFORE the submitconfiguration gets rewritten. + * + * @throws IOException + */ + private void startRPCServers() throws IOException { + // start the RPC server which talks to the client + this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf); + this.clientServer.start(); + + // start the RPC server which talks to the tasks + this.taskServerPort = BSPNetUtils.getFreePort(10000); + this.taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf); + this.taskServer.start(); + + // readjusting the configuration to let the tasks know where we are. + this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort); + } + + /** + * Writes the current configuration to a given path to reflect changes. For + * example the sync server address is put after the file has been written. + */ + private static void rewriteSubmitConfiguration(String path, Configuration conf) + throws IOException { + Path jobSubmitPath = new Path(path); + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream out = fs.create(jobSubmitPath); + conf.writeXml(out); + out.close(); + + LOG.info("Written new configuration back to " + path); + } + + /** + * Get container memory from "bsp.child.mem.in.mb" set on Hama configuration + * @return The memory of container. + */ + private int getMemoryRequirements(Configuration conf) { + String newMemoryProperty = conf.get("bsp.child.mem.in.mb"); + if (newMemoryProperty == null) { + LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts..."); + return getMemoryFromOptString(conf.get("bsp.child.java.opts")); + } else { + return Integer.valueOf(newMemoryProperty); + } + } + + // This really needs a testcase + private static int getMemoryFromOptString(String opts) { + final int DEFAULT_MEMORY_MB = 256; + + if (opts == null) { + return DEFAULT_MEMORY_MB; + } + + if (!opts.contains("-Xmx")) { + LOG.info( + "No \"-Xmx\" option found in child opts, using default amount of memory!"); + return DEFAULT_MEMORY_MB; + } else { + // e.G: -Xmx512m + + int startIndex = opts.indexOf("-Xmx") + 4; + String xmxString = opts.substring(startIndex); + char qualifier = xmxString.charAt(xmxString.length() - 1); + int memory = Integer + .valueOf(xmxString.substring(0, xmxString.length() - 1)); + if (qualifier == 'm') { + return memory; + } else if (qualifier == 'g') { + return memory * 1024; + } else { + throw new IllegalArgumentException( + "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: " + + opts); + } + } + } + + @VisibleForTesting + Thread createLaunchContainerThread(Container allocatedContainer) { + LaunchContainerRunnable runnableLaunchContainer = + new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf); + return new Thread(runnableLaunchContainer); + } + + @Override + public LongWritable getCurrentSuperStep() { + return new LongWritable(superstep); + } + + @Override + public Task getTask(TaskAttemptID taskid) throws IOException { + BSPJobClient.RawSplit assignedSplit = null; + String splitName = NullInputFormat.NullInputSplit.class.getName(); + //String splitName = NullInputSplit.class.getCanonicalName(); + if (splits != null) { + assignedSplit = splits[taskid.id]; + splitName = assignedSplit.getClassName(); + return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName, + assignedSplit.getBytes()); + } else { + return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName, + new BytesWritable()); + } + } + + @Override + public boolean ping(TaskAttemptID taskid) throws IOException { + return false; + } + + @Override + public void done(TaskAttemptID taskid) throws IOException { + + } + + @Override + public void fsError(TaskAttemptID taskId, String message) throws IOException { + + } + + @Override + public void fatalError(TaskAttemptID taskId, String message) + throws IOException { + + } + + @Override + public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) + throws IOException, InterruptedException { + if (taskStatus.getSuperstepCount() > superstep) { + superstep = taskStatus.getSuperstepCount(); + LOG.info("Now in superstep " + superstep); + } + + Counters counters = taskStatus.getCounters(); + globalCounter.incrAllCounters(counters); + + return true; + } + + @Override + public int getAssignedPortNum(TaskAttemptID taskid) { + return 0; + } + + @Override + public void close() throws IOException { + + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return BSPClient.versionID; + } +} Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1683052) +++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy) @@ -1,483 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp; - -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.URI; -import java.security.PrivilegedAction; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.hama.Constants; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.Job.JobState; -import org.apache.hama.bsp.sync.SyncServer; -import org.apache.hama.bsp.sync.SyncServiceFactory; -import org.apache.hama.ipc.BSPPeerProtocol; -import org.apache.hama.ipc.RPC; -import org.apache.hama.ipc.Server; -import org.apache.hama.util.BSPNetUtils; - - -/** - * BSPApplicationMaster is an application master for Apache Hamas BSP Engine. - */ -public class BSPApplicationMaster implements BSPClient, BSPPeerProtocol { - - private static final Log LOG = LogFactory.getLog(BSPApplicationMaster.class); - private static final ExecutorService threadPool = Executors - .newFixedThreadPool(1); - - private Configuration localConf; - private Configuration jobConf; - private String jobFile; - - private Clock clock; - private YarnRPC yarnRPC; - - private ApplicationMasterProtocol amrmRPC; - - private ApplicationAttemptId appAttemptId; - private String applicationName; - private long startTime; - - private JobImpl job; - private BSPJobID jobId; - - // RPC info where the AM receive client side requests - private String hostname; - private int clientPort; - private int taskServerPort; - - private Server clientServer; - private Server taskServer; - - private volatile long superstep; - //private SyncServerRunner syncServer; - private SyncServer syncServer; - - private Counters globalCounter = new Counters(); - - private FileSystem fs; - private BSPJobClient.RawSplit[] splits; - - private BSPApplicationMaster(String[] args) throws Exception { - if (args.length != 1) { - throw new IllegalArgumentException(); - } - - this.jobFile = args[0]; - - this.jobConf = getSubmitConfiguration(jobFile); - - this.localConf = new YarnConfiguration(); - localConf.addResource(localConf); - fs = FileSystem.get(jobConf); - - this.applicationName = jobConf.get("bsp.job.name", - ""); - if (applicationName.isEmpty()) { - this.applicationName = ""; - } - - this.appAttemptId = getApplicationAttemptId(); - - this.yarnRPC = YarnRPC.create(localConf); - this.clock = new SystemClock(); - this.startTime = clock.getTime(); - - this.jobId = new BSPJobID(appAttemptId.toString(), 0); - - this.hostname = BSPNetUtils.getCanonicalHostname(); - this.clientPort = BSPNetUtils.getFreePort(12000); - - // Set configuration for starting SyncServer which run Zookeeper - this.jobConf.set(Constants.ZOOKEEPER_QUORUM, hostname); - - // start our synchronization service - startSyncServer(); - - startRPCServers(); - - /* - * Make sure that this executes after the start the RPC servers, because we - * are readjusting the configuration. - */ - - rewriteSubmitConfiguration(jobFile, jobConf); - - String jobSplit = jobConf.get("bsp.job.split.file"); - splits = null; - if (jobSplit != null) { - DataInputStream splitFile = fs.open(new Path(jobSplit)); - try { - splits = BSPJobClient.readSplitFile(splitFile); - } finally { - splitFile.close(); - } - } - - this.amrmRPC = getYarnRPCConnection(localConf); - registerApplicationMaster(amrmRPC, hostname, clientPort, - "http://localhost:8080"); - } - - /** - * This method starts the needed RPC servers: client server and the task - * server. This method manipulates the configuration and therefore needs to be - * executed BEFORE the submitconfiguration gets rewritten. - * - * @throws IOException - */ - private void startRPCServers() throws IOException { - // start the RPC server which talks to the client - this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf); - this.clientServer.start(); - - // start the RPC server which talks to the tasks - this.taskServerPort = BSPNetUtils.getFreePort(10000); - this.taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf); - this.taskServer.start(); - - // readjusting the configuration to let the tasks know where we are. - this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort); - } - - /** - * This method starts the sync server on a specific port and waits for it to - * come up. Be aware that this method adds the "bsp.sync.server.address" that - * is needed for a task to connect to the service. - * - * @throws IOException - */ - private void startSyncServer() throws Exception { - syncServer = SyncServiceFactory.getSyncServer(jobConf); - syncServer.init(jobConf); - - ZKServerThread serverThread = new ZKServerThread(syncServer); - threadPool.submit(serverThread); - } - - private static class ZKServerThread implements Runnable { - SyncServer server; - - ZKServerThread(SyncServer s) { - server = s; - } - - @Override - public void run() { - try { - server.start(); - } catch (Exception e) { - LOG.error("Error running SyncServer.", e); - } - } - } - - /** - * Connects to the Resource Manager. - * - * @param yarnConf - * @return a new RPC connection to the Resource Manager. - */ - private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) throws IOException { - // Connect to the Scheduler of the ResourceManager. - UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(appAttemptId.toString()); - Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); - - final InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); - - Token amRMToken = setupAndReturnAMRMToken(rmAddress, credentials.getAllTokens()); - currentUser.addToken(amRMToken); - - final Configuration conf = yarnConf; - - ApplicationMasterProtocol client = currentUser - .doAs(new PrivilegedAction() { - @Override - public ApplicationMasterProtocol run() { - return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class, rmAddress, conf); - } - }); - LOG.info("Connecting to ResourceManager at " + rmAddress); - return client; - } - - private Token setupAndReturnAMRMToken( - InetSocketAddress rmBindAddress, - Collection> allTokens) { - for (Token token : allTokens) { - if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { - SecurityUtil.setTokenService(token, rmBindAddress); - return token; - } - } - return null; - } - - - /** - * Registers this application master with the Resource Manager and retrieves a - * response which is used to launch additional containers. - */ - private static RegisterApplicationMasterResponse registerApplicationMaster( - ApplicationMasterProtocol resourceManager, String appMasterHostName, int appMasterRpcPort, - String appMasterTrackingUrl) throws YarnException, IOException { - - RegisterApplicationMasterRequest appMasterRequest = Records - .newRecord(RegisterApplicationMasterRequest.class); - appMasterRequest.setHost(appMasterHostName); - appMasterRequest.setRpcPort(appMasterRpcPort); - // TODO tracking URL - appMasterRequest.setTrackingUrl(appMasterTrackingUrl); - RegisterApplicationMasterResponse response = resourceManager - .registerApplicationMaster(appMasterRequest); - LOG.info("ApplicationMaster has maximum resource capability of: " - + response.getMaximumResourceCapability().getMemory()); - return response; - } - - /** - * Gets the application attempt ID from the environment. This should be set by - * YARN when the container has been launched. - * - * @return a new ApplicationAttemptId which is unique and identifies this - * task. - */ - private static ApplicationAttemptId getApplicationAttemptId() - throws IOException { - Map envs = System.getenv(); - if (!envs.containsKey(Environment.CONTAINER_ID.name())) { - throw new IllegalArgumentException( - "ApplicationAttemptId not set in the environment"); - } - - return ConverterUtils.toContainerId( - envs.get(Environment.CONTAINER_ID.name())) - .getApplicationAttemptId(); - } - - private void start() throws IOException, YarnException /*throws Exception*/ { - JobState finalState = null; - try { - job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId); - finalState = job.startJob(); - } catch (Exception e) { - LOG.error("error was occured. cleaning up"); - e.printStackTrace(); - } finally { - if (finalState != null) { - LOG.info("Job \"" + applicationName + "\"'s state after completion: " - + finalState.toString()); - LOG.info("Job took " + ((clock.getTime() - startTime) / 1000L) - + "s to finish!"); - } - LOG.info("job is cleaning up"); - job.cleanup(); - } - } - - private void cleanup() throws YarnException, IOException { - //syncServer.stop(); - syncServer.stopServer(); - - if (threadPool != null && !threadPool.isShutdown()) { - threadPool.shutdownNow(); - } - - clientServer.stop(); - taskServer.stop(); - FinishApplicationMasterRequest finishReq = Records - .newRecord(FinishApplicationMasterRequest.class); - switch (job.getState()) { - case SUCCESS: - finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - break; - case KILLED: - finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED); - break; - case FAILED: - finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED); - break; - default: - finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED); - } - this.amrmRPC.finishApplicationMaster(finishReq); - } - - public static void main(String[] args) throws YarnException, IOException { - // we expect getting the qualified path of the job.xml as the first - // element in the arguments - BSPApplicationMaster master = null; - try { - master = new BSPApplicationMaster(args); - master.start(); - } catch (Exception e) { - LOG.fatal("Error starting BSPApplicationMaster", e); - } finally { - if (master != null) { - master.cleanup(); - } - } - } - - /** - * Reads the configuration from the given path. - */ - private static Configuration getSubmitConfiguration(String path) - throws IOException { - Path jobSubmitPath = new Path(path); - Configuration jobConf = new HamaConfiguration(); - - FileSystem fs = FileSystem.get(URI.create(path), jobConf); - - InputStream in =fs.open(jobSubmitPath); - jobConf.addResource(in); - - return jobConf; - } - - /** - * Writes the current configuration to a given path to reflect changes. For - * example the sync server address is put after the file has been written. - */ - private static void rewriteSubmitConfiguration(String path, Configuration conf) - throws IOException { - Path jobSubmitPath = new Path(path); - FileSystem fs = FileSystem.get(conf); - FSDataOutputStream out = fs.create(jobSubmitPath); - conf.writeXml(out); - out.close(); - - LOG.info("Written new configuration back to " + path); - } - - @Override - public long getProtocolVersion(String arg0, long arg1) throws IOException { - return BSPClient.versionID; - } - - @Override - public LongWritable getCurrentSuperStep() { - return new LongWritable(superstep); - } - - @Override - public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) - throws IOException, InterruptedException { - if (taskStatus.getSuperstepCount() > superstep) { - superstep = taskStatus.getSuperstepCount(); - LOG.info("Now in superstep " + superstep); - } - - Counters counters = taskStatus.getCounters(); - globalCounter.incrAllCounters(counters); - - return true; - } - - /** - * most of the following methods are already handled over YARN and with the - * JobImpl. - */ - - @Override - public void close() throws IOException { - - } - - @Override - public Task getTask(TaskAttemptID taskid) throws IOException { - BSPJobClient.RawSplit assignedSplit = null; - String splitName = NullInputFormat.NullInputSplit.class.getName(); - //String splitName = NullInputSplit.class.getCanonicalName(); - if (splits != null) { - assignedSplit = splits[taskid.id]; - splitName = assignedSplit.getClassName(); - return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName, - assignedSplit.getBytes()); - } else { - return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName, - new BytesWritable()); - } - } - - @Override - public boolean ping(TaskAttemptID taskid) throws IOException { - return false; - } - - @Override - public void done(TaskAttemptID taskid) throws IOException { - - } - - @Override - public void fsError(TaskAttemptID taskId, String message) throws IOException { - - } - - @Override - public void fatalError(TaskAttemptID taskId, String message) - throws IOException { - - } - - @Override - public int getAssignedPortNum(TaskAttemptID taskid) { - return 0; - } - -} Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (revision 1683052) +++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (working copy) @@ -1,285 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp; - -import java.io.File; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.*; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.*; -import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.Records; - -public class BSPTaskLauncher { - - private static final Log LOG = LogFactory.getLog(BSPTaskLauncher.class); - - private final Container allocatedContainer; - private final int id; - private final ContainerManagementProtocol cm; - private final Configuration conf; - private String user; - private final Path jobFile; - private final BSPJobID jobId; - - private GetContainerStatusesRequest statusRequest; - - @Override - protected void finalize() throws Throwable { - stopAndCleanup(); - } - - public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm, - Configuration conf, Path jobFile, BSPJobID jobId) - throws YarnException { - this.id = id; - this.cm = cm; - this.conf = conf; - this.allocatedContainer = container; - this.jobFile = jobFile; - this.jobId = jobId; - // FIXME why does this contain mapreduce here? - this.user = conf.get("bsp.user.name"); - if (this.user == null) { - this.user = conf.get("mapreduce.job.user.name"); - } - } - - public void stopAndCleanup() throws YarnException, IOException { - StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class); - List containerIds = new ArrayList(); - containerIds.add(allocatedContainer.getId()); - LOG.info("getId : " + allocatedContainer.getId()); - stopRequest.setContainerIds(containerIds); - LOG.info("StopContainer : " + stopRequest.getContainerIds()); - cm.stopContainers(stopRequest); - - } - - public void start() throws IOException, YarnException { - LOG.info("Spawned task with id: " + this.id - + " for allocated container id: " - + this.allocatedContainer.getId().toString()); - statusRequest = setupContainer(allocatedContainer, cm, user, id); - } - - /** - * This polls the current container status from container manager. Null if the - * container hasn't finished yet. - * - * @return - * @throws Exception - */ - public BSPTaskStatus poll() throws Exception { - - ContainerStatus lastStatus = null; - GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest); - List containerStatuses = getContainerStatusesResponse.getContainerStatuses(); - if (containerStatuses.size() <= 0) { - LOG.info("container Statuses size is zero"); - return null; - } - - for (ContainerStatus containerStatus : containerStatuses) { - LOG.info("Got container status for containerID=" + containerStatus - .getContainerId() + ", state=" + containerStatus.getState() - + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" - + containerStatus.getDiagnostics()); - - if (containerStatus.getContainerId().equals(allocatedContainer.getId())) { - lastStatus = containerStatus; - break; - } - } - - if (lastStatus.getState() != ContainerState.COMPLETE) { - LOG.info("Not completed..."); - return null; - } - LOG.info(this.id + " Last report comes with exitstatus of " + lastStatus - .getExitStatus() + " and diagnose string of " + lastStatus - .getDiagnostics()); - - return new BSPTaskStatus(id, lastStatus.getExitStatus()); - } - - private GetContainerStatusesRequest setupContainer( - Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException { - LOG.info("Setting up a container for user " + user + " with id of " + id - + " and containerID of " + allocatedContainer.getId() + " as " + user); - // Now we setup a ContainerLaunchContext - ContainerLaunchContext ctx = Records - .newRecord(ContainerLaunchContext.class); - - // Set the local resources - Map localResources = new HashMap(); - LocalResource packageResource = Records.newRecord(LocalResource.class); - FileSystem fs = FileSystem.get(conf); - Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION)); - URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile - .makeQualified(fs.getUri(), fs.getWorkingDirectory())); - LOG.info("PackageURL has been composed to " + packageUrl.toString()); - try { - LOG.info("Reverting packageURL to path: " - + ConverterUtils.getPathFromYarnURL(packageUrl)); - } catch (URISyntaxException e) { - LOG.fatal("If you see this error the workarround does not work", e); - } - - packageResource.setResource(packageUrl); - packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE))); - packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP))); - packageResource.setType(LocalResourceType.FILE); - packageResource.setVisibility(LocalResourceVisibility.APPLICATION); - - localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource); - - Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION)); - URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile - .makeQualified(fs.getUri(), fs.getWorkingDirectory())); - LOG.info("Hama release URL has been composed to " + hamaReleaseUrl.toString()); - - RemoteIterator fileStatusListIterator = fs.listFiles( - hamaReleaseFile, true); - - while(fileStatusListIterator.hasNext()) { - LocatedFileStatus lfs = fileStatusListIterator.next(); - LocalResource localRsrc = LocalResource.newInstance( - ConverterUtils.getYarnUrlFromPath(lfs.getPath()), - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, - lfs.getLen(), lfs.getModificationTime()); - localResources.put(lfs.getPath().getName(), localRsrc); - } - - ctx.setLocalResources(localResources); - - /* - * TODO Package classpath seems not to work if you're in pseudo distributed - * mode, because the resource must not be moved, it will never be unpacked. - * So we will check if our jar file has the file:// prefix and put it into - * the CP directly - */ - - StringBuilder classPathEnv = new StringBuilder( - ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar) - .append("./*"); - for (String c : conf.getStrings( - YarnConfiguration.YARN_APPLICATION_CLASSPATH, - YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { - classPathEnv.append(File.pathSeparatorChar); - classPathEnv.append(c.trim()); - } - - Vector vargs = new Vector(); - vargs.add("${JAVA_HOME}/bin/java"); - vargs.add("-cp " + classPathEnv + ""); - vargs.add(BSPRunner.class.getCanonicalName()); - - vargs.add(jobId.getJtIdentifier()); - vargs.add(Integer.toString(id)); - vargs.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) - .toString()); - - vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stdout"); - vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stderr"); - - // Get final commmand - StringBuilder command = new StringBuilder(); - for (CharSequence str : vargs) { - command.append(str).append(" "); - } - - List commands = new ArrayList(); - commands.add(command.toString()); - - ctx.setCommands(commands); - LOG.info("Starting command: " + commands); - - StartContainerRequest startReq = Records - .newRecord(StartContainerRequest.class); - startReq.setContainerLaunchContext(ctx); - startReq.setContainerToken(allocatedContainer.getContainerToken()); - - List list = new ArrayList(); - list.add(startReq); - StartContainersRequest requestList = StartContainersRequest.newInstance(list); - cm.startContainers(requestList); - - GetContainerStatusesRequest statusReq = Records - .newRecord(GetContainerStatusesRequest.class); - List containerIds = new ArrayList(); - containerIds.add(allocatedContainer.getId()); - statusReq.setContainerIds(containerIds); - return statusReq; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + id; - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - BSPTaskLauncher other = (BSPTaskLauncher) obj; - if (id != other.id) - return false; - return true; - } - - public static class BSPTaskStatus { - private final int id; - private final int exitStatus; - - public BSPTaskStatus(int id, int exitStatus) { - super(); - this.id = id; - this.exitStatus = exitStatus; - } - - public int getId() { - return id; - } - - public int getExitStatus() { - return exitStatus; - } - } - -} Index: yarn/src/main/java/org/apache/hama/bsp/Job.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/Job.java (revision 1683052) +++ yarn/src/main/java/org/apache/hama/bsp/Job.java (working copy) @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp; - -import org.apache.hadoop.yarn.exceptions.YarnException; - -import java.io.IOException; - -/** - * Main interface to interact with the job. Provides only getters. - */ -public interface Job { - - public enum JobState { - NEW, RUNNING, SUCCESS, FAILED, KILLED - } - - public enum BSPPhase { - COMPUTATION, COMMUNICATION - } - - public JobState startJob() throws Exception; - - public void cleanup() throws YarnException, IOException; - - JobState getState(); - - BSPPhase getBSPPhase(); - - int getTotalBSPTasks(); - -} Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (revision 1683052) +++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (working copy) @@ -1,324 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * regarding copyright ownership. The ASF licenses this file - * distributed with this work for additional information - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedAction; -import java.util.*; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.client.api.NMTokenCache; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.Records; -import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus; - -public class JobImpl implements Job { - - private static final Log LOG = LogFactory.getLog(JobImpl.class); - private static final int DEFAULT_MEMORY_MB = 256; - - private Configuration conf; - private BSPJobID jobId; - private int numBSPTasks; - private int priority = 0; - private String childOpts; - private int taskMemoryInMb; - private Path jobFile; - - private JobState state; - private BSPPhase phase; - - private ApplicationAttemptId appAttemptId; - private YarnRPC yarnRPC; - private ApplicationMasterProtocol resourceManager; - - private List allocatedContainers; - private List releasedContainers = Collections.emptyList(); - - private Map launchers = new HashMap(); - private Deque completionQueue = new LinkedList(); - - private int lastResponseID = 0; - - private int getMemoryRequirements() { - String newMemoryProperty = conf.get("bsp.child.mem.in.mb"); - if (newMemoryProperty == null) { - LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts..."); - return getMemoryFromOptString(childOpts); - } else { - return Integer.valueOf(newMemoryProperty); - } - } - - public JobImpl(ApplicationAttemptId appAttemptId, - Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC, - String jobFile, BSPJobID jobId) { - super(); - this.appAttemptId = appAttemptId; - this.yarnRPC = yarnRPC; - this.resourceManager = amrmRPC; - this.jobFile = new Path(jobFile); - this.state = JobState.NEW; - this.jobId = jobId; - this.conf = jobConfiguration; - this.numBSPTasks = conf.getInt("bsp.peers.num", 1); - this.childOpts = conf.get("bsp.child.java.opts"); - - this.taskMemoryInMb = getMemoryRequirements(); - } - - // This really needs a testcase - private static int getMemoryFromOptString(String opts) { - if (opts == null) { - return DEFAULT_MEMORY_MB; - } - - if (!opts.contains("-Xmx")) { - LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!"); - return DEFAULT_MEMORY_MB; - } else { - // e.G: -Xmx512m - - int startIndex = opts.indexOf("-Xmx") + 4; - String xmxString = opts.substring(startIndex); - char qualifier = xmxString.charAt(xmxString.length() - 1); - int memory = Integer.valueOf(xmxString.substring(0, - xmxString.length() - 1)); - if (qualifier == 'm') { - return memory; - } else if (qualifier == 'g') { - return memory * 1024; - } else { - throw new IllegalArgumentException( - "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: " - + opts); - } - } - } - - @Override - public JobState startJob() throws Exception { - - this.allocatedContainers = new ArrayList(numBSPTasks); - NMTokenCache nmTokenCache = new NMTokenCache(); - while (allocatedContainers.size() < numBSPTasks) { - AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f, - createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb, - priority), releasedContainers, null); - - AllocateResponse allocateResponse = resourceManager.allocate(req); - for (NMToken token : allocateResponse.getNMTokens()) { - nmTokenCache.setToken(token.getNodeId().toString(), token.getToken()); - } - - LOG.info("Got response ID: " + allocateResponse.getResponseId() - + " with num of containers: " - + allocateResponse.getAllocatedContainers().size() - + " and following resources: " - + allocateResponse.getAvailableResources().getMemory() + "mb"); - this.lastResponseID = allocateResponse.getResponseId(); - - this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); - - LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers..."); - - Thread.sleep(1000l); - } - - LOG.info("Got " + allocatedContainers.size() + " containers!"); - - int id = 0; - for (Container allocatedContainer : allocatedContainers) { - LOG.info("Launching task on a new container." + ", containerId=" - + allocatedContainer.getId() + ", containerNode=" - + allocatedContainer.getNodeId().getHost() + ":" - + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" - + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" - + allocatedContainer.getResource().getMemory()); - - // Connect to ContainerManager on the allocated container - String user = conf.get("bsp.user.name"); - if (user == null) { - user = System.getenv(ApplicationConstants.Environment.USER.name()); - } - - ContainerManagementProtocol cm = null; - try { - cm = getContainerManagementProtocolProxy(yarnRPC, - nmTokenCache.getToken(allocatedContainer.getNodeId().toString()), allocatedContainer.getNodeId(), user); - } catch (Exception e) { - LOG.error("Failed to create ContainerManager..."); - if (cm != null) - yarnRPC.stopProxy(cm, conf); - e.printStackTrace(); - } - - BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id, - allocatedContainer, cm, conf, jobFile, jobId); - - launchers.put(id, runnableLaunchContainer); - runnableLaunchContainer.start(); - completionQueue.add(runnableLaunchContainer); - id++; - } - - LOG.info("Waiting for tasks to finish..."); - state = JobState.RUNNING; - int completed = 0; - - while (completed != numBSPTasks) { - for (BSPTaskLauncher task : completionQueue) { - BSPTaskStatus returnedTask = task.poll(); - if(returnedTask != null && returnedTask.getExitStatus() == 0) { - LOG.info("Task \"" + returnedTask.getId() - + "\" sucessfully finished!"); - completed++; - LOG.info("Waiting for " + (numBSPTasks - completed) - + " tasks to finish!"); - } - - if(returnedTask != null && returnedTask.getExitStatus() != 0) { - LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!"); - completionQueue.add(task); - state = JobState.FAILED; - return state; - } - } - Thread.sleep(1000L); - } - - state = JobState.SUCCESS; - return state; - } - - /** - * - * @param rpc - * @param nmToken - * @param nodeId - * @param user - * @return - */ - protected ContainerManagementProtocol getContainerManagementProtocolProxy( - final YarnRPC rpc, Token nmToken, NodeId nodeId, String user) { - ContainerManagementProtocol proxy; - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); - final InetSocketAddress addr = - NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort()); - if (nmToken != null) { - ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr)); - } - - proxy = ugi - .doAs(new PrivilegedAction() { - @Override - public ContainerManagementProtocol run() { - return (ContainerManagementProtocol) rpc.getProxy( - ContainerManagementProtocol.class, - addr, conf); - } - }); - return proxy; - } - - /** - * Makes a lookup for the taskid and stops its container and task. It also - * removes the task from the launcher so that we won't have to stop it twice. - * - * @param id - * @throws YarnException - */ - private void cleanupTask(int id) throws YarnException, IOException { - BSPTaskLauncher bspTaskLauncher = launchers.get(id); - bspTaskLauncher.stopAndCleanup(); - launchers.remove(id); - completionQueue.remove(bspTaskLauncher); - } - - private List createBSPTaskRequest(int numTasks, - int memoryInMb, int priority) { - - List reqList = new ArrayList(numTasks); - for (int i = 0; i < numTasks; i++) { - // Resource Request - ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class); - - // setup requirements for hosts - // whether a particular rack/host is needed - // useful for applications that are sensitive - // to data locality - rsrcRequest.setResourceName("*"); - - // set the priority for the request - Priority pri = Records.newRecord(Priority.class); - pri.setPriority(priority); - rsrcRequest.setPriority(pri); - - // Set up resource type requirements - // For now, only memory is supported so we set memory requirements - Resource capability = Records.newRecord(Resource.class); - capability.setMemory(memoryInMb); - rsrcRequest.setCapability(capability); - - // set no. of containers needed - // matching the specifications - rsrcRequest.setNumContainers(numBSPTasks); - reqList.add(rsrcRequest); - } - return reqList; - } - - @Override - public void cleanup() throws YarnException, IOException { - for (BSPTaskLauncher launcher : completionQueue) { - LOG.info("cleanup tasks !!!"); - launcher.stopAndCleanup(); - } - } - - @Override - public JobState getState() { - return state; - } - - @Override - public int getTotalBSPTasks() { - return numBSPTasks; - } - - @Override - public BSPPhase getBSPPhase() { - return phase; - } - -} Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1683052) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy) @@ -33,10 +33,9 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -131,7 +130,6 @@ yarnConf = new YarnConfiguration(conf); yarnClient = YarnClient.createYarnClient(); yarnClient.init(yarnConf); - yarnClient.start(); } @Override @@ -155,6 +153,7 @@ LOG.debug("Retrieved username: " + s); } + yarnClient.start(); try { YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); LOG.info("Got Cluster metric info from ASM" @@ -188,14 +187,16 @@ } } - GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class); - GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request); - id = response.getApplicationId(); + // Get a new application id + YarnClientApplication app = yarnClient.createApplication(); + // Create a new ApplicationSubmissionContext - ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); - // set the ApplicationId - appContext.setApplicationId(this.id); + //ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); + ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); + + id = appContext.getApplicationId(); + // set the application name appContext.setApplicationName(job.getJobName()); @@ -227,7 +228,11 @@ localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc); // add hama related jar files to localresources for container - List hamaJars = localJarfromPath(System.getProperty("hama.home.dir")); + List hamaJars; + if (System.getProperty("hama.home.dir") != null) + hamaJars = localJarfromPath(System.getProperty("hama.home.dir")); + else + hamaJars = localJarfromPath(getConf().get("hama.home.dir")); String hamaPath = getSystemDir() + "/hama"; for (File fileEntry : hamaJars) { addToLocalResources(fs, fileEntry.getCanonicalPath(), @@ -266,7 +271,7 @@ Vector vargs = new Vector(5); vargs.add("${JAVA_HOME}/bin/java"); vargs.add("-cp " + classPathEnv + ""); - vargs.add(BSPApplicationMaster.class.getCanonicalName()); + vargs.add(ApplicationMaster.class.getCanonicalName()); vargs.add(submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString()); vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-appmaster.stdout"); Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1683052) +++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy) @@ -74,12 +74,13 @@ } } - //fs.delete(OUTPUT_PATH, true); + fs.delete(OUTPUT_PATH, true); } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { HamaConfiguration conf = new HamaConfiguration(); + conf.set("hama.home.dir", System.getenv().get("HAMA_HOME")); YARNBSPJob job = new YARNBSPJob(conf); job.setBspClass(HelloBSP.class);