diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml index d91c67b..a9b6952 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml @@ -97,6 +97,11 @@ org.apache.hadoop hadoop-yarn-client + + + org.apache.hadoop + hadoop-yarn-registry + org.apache.hadoop diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index df9f34b..90a474c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -57,6 +57,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -93,6 +94,13 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.registry.client.api.RegistryConstants; +import org.apache.hadoop.yarn.registry.client.binding.RegistryUtils; +import org.apache.hadoop.yarn.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.yarn.registry.client.services.RegistryOperationsService; +import org.apache.hadoop.yarn.registry.client.api.CreateFlags; +import org.apache.hadoop.yarn.registry.client.types.PersistencePolicies; +import org.apache.hadoop.yarn.registry.client.types.ServiceRecord; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.LogManager; @@ -270,6 +278,9 @@ private final String linux_bash_command = "bash"; private final String windows_command = "cmd /c"; + @VisibleForTesting + RegistryOperationsService registryOperations; + /** * @param args Command line args */ @@ -570,6 +581,50 @@ public void run() throws YarnException, IOException { RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl); + + // Register with the YARN registry if it is enabled + boolean registryEnabled = + conf.getBoolean(RegistryConstants.KEY_REGISTRY_ENABLED, + RegistryConstants.DEFAULT_REGISTRY_ENABLED); + if (registryEnabled) { + LOG.info("Registering Service"); + registryOperations = new RegistryOperationsService(); + registryOperations.init(conf); + registryOperations.start(); + ServiceRecord serviceRecord = new ServiceRecord(); + String attemptID = this.appAttemptID.toString(); + String appId = this.appAttemptID.getApplicationId().toString(); + + serviceRecord.yarn_id = attemptID; + serviceRecord.yarn_persistence = PersistencePolicies.APPLICATION_ATTEMPT; + serviceRecord.description = "Distributed Shell"; + // if this service offered external RPC/Web access, they + // can be added to the service record + + String username = RegistryUtils.currentUser(); + String serviceClass = DSConstants.SERVICE_CLASS_DISTRIBUTED_SHELL; + String serviceName = RegistryPathUtils.encodeYarnID(appId); + String path = + RegistryUtils.servicePath(username, serviceClass, + serviceName); + registryOperations.mknode(RegistryPathUtils.parentOf(path), true); + // app attempt entry + registryOperations.create(path + "-attempt", serviceRecord, + CreateFlags.OVERWRITE); + LOG.info("Registered " + serviceRecord + " at " + path ); + + serviceRecord.yarn_id = appId; + serviceRecord.yarn_persistence = PersistencePolicies.APPLICATION; + registryOperations.create(path + "-app", serviceRecord, + CreateFlags.OVERWRITE); + + // register one that is not deleted + serviceRecord.yarn_id = ""; + serviceRecord.yarn_persistence = PersistencePolicies.PERMANENT; + registryOperations.create(path + "-permanent", serviceRecord, + CreateFlags.OVERWRITE); + } + // Dump out information about cluster capability as seen by the // resource manager int maxMem = response.getMaximumResourceCapability().getMemory(); @@ -680,6 +735,8 @@ protected boolean finish() { } amRMClient.stop(); + ServiceOperations.stop(registryOperations); + ServiceOperations.stop(timelineClient); return success; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index f3ce64c..7df1fc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -670,7 +670,7 @@ public boolean run() throws IOException, YarnException { * @throws YarnException * @throws IOException */ - private boolean monitorApplication(ApplicationId appId) + protected boolean monitorApplication(ApplicationId appId) throws YarnException, IOException { while (true) { @@ -720,7 +720,7 @@ else if (YarnApplicationState.KILLED == state return false; } - if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { + if (timedOut()) { LOG.info("Reached client specified timeout for application. Killing application"); forceKillApplication(appId); return false; @@ -730,12 +730,46 @@ else if (YarnApplicationState.KILLED == state } /** + * Get the YARN client + * @return the client + */ + public YarnClient getYarnClient() { + return yarnClient; + } + + /** + * Get the client's configuration + * @return the configuration + */ + public Configuration getConf() { + return conf; + } + + public long getClientTimeout() { + return clientTimeout; + } + + public void setClientTimeout(long clientTimeout) { + this.clientTimeout = clientTimeout; + } + + /** + * Query the clock and timeout settings to decide + * whether or not the current run has timed ut + * @return true if the client's monitorApplication() operation + * has taken too long. + */ + protected boolean timedOut() { + return System.currentTimeMillis() > (clientStartTime + clientTimeout); + } + + /** * Kill a submitted application by sending a call to the ASM * @param appId Application Id to be killed. * @throws YarnException * @throws IOException */ - private void forceKillApplication(ApplicationId appId) + protected void forceKillApplication(ApplicationId appId) throws YarnException, IOException { // TODO clarify whether multiple jobs with the same app id can be submitted and be running at // the same time. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java index 5912f14..67f9a77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java @@ -44,4 +44,10 @@ * Used to validate the local resource. */ public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN"; + + /** + * Service class used when registering the service + */ + public static final String SERVICE_CLASS_DISTRIBUTED_SHELL = + "org-apache-hadoop-distributedshell"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 6dff94c..a1ff237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -38,14 +38,23 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathNotFoundException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.registry.client.api.RegistryConstants; +import org.apache.hadoop.yarn.registry.client.binding.RegistryUtils; +import org.apache.hadoop.yarn.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.yarn.registry.client.services.RegistryOperationsService; +import org.apache.hadoop.yarn.registry.client.types.ServiceRecord; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; @@ -69,14 +78,16 @@ @Before public void setup() throws Exception { LOG.info("Starting up YARN cluster"); + conf.setBoolean(RegistryConstants.KEY_REGISTRY_ENABLED, true); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); conf.set("yarn.log.dir", "target"); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); if (yarnCluster == null) { + // create a minicluster with the registry enabled yarnCluster = new MiniYARNCluster( - TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true); + TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true, true); yarnCluster.init(conf); yarnCluster.start(); NodeManager nm = yarnCluster.getNodeManager(0); @@ -305,7 +316,7 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception { LOG.info("Client run completed. Result=" + result); // application should succeed - Assert.assertTrue(result); + Assert.assertTrue("client failed", result); } /* @@ -846,5 +857,242 @@ private int verifyContainerLog(int containerNum, } return numOfWords; } + + @Test(timeout = 90000) + public void testRegistryOperations() throws Exception { + + // create a client config with an aggressive timeout policy + Configuration clientConf = new Configuration(yarnCluster.getConfig()); + clientConf.setInt(RegistryConstants.KEY_REGISTRY_ZK_CONNECTION_TIMEOUT, 1000); + clientConf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_TIMES, 1); + clientConf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_CEILING, 1); + clientConf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_INTERVAL, 500); + clientConf.setInt(RegistryConstants.KEY_REGISTRY_ZK_SESSION_TIMEOUT, 2000); + + // create a registry operations instance + RegistryOperationsService regOps = new RegistryOperationsService(); + regOps.init(clientConf); + regOps.start(); + LOG.info("Registry Binding: " + regOps); + + // do a simple registry operation to verify that it is live + regOps.list("/"); + // check the system dir is present + regOps.list(RegistryConstants.PATH_SYSTEM_SERVICES); + // check the users dir is present + regOps.list(RegistryConstants.PATH_USERS); + + try { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + "sleep 15", + "--master_memory", + "512", + "--container_memory", + "128", + }; + + LOG.info("Initializing DS Client"); + RegistryMonitoringClient client = + new RegistryMonitoringClient(clientConf); + + client.init(args); + LOG.info("Running DS Client"); + boolean result; + try { + result = client.run(); + } finally { + client.stop(); + } + + LOG.info("Client run completed. Result=" + result); + + // application should have found service records + ServiceRecord serviceRecord = client.appAttemptRecord; + LOG.info("Service record = " + serviceRecord); + IOException lookupException = + client.lookupException; + if (serviceRecord == null && lookupException != null) { + LOG.error("Lookup of " + client.servicePath + + " failed with " + lookupException, lookupException); + throw lookupException; + } + + // the app should have succeeded or returned a failure message + Assert.assertTrue("run returned false:" + client.failureText, result); + + // the app-level record must have been retrieved + Assert.assertNotNull("No application record at " + client.appRecordPath, + client.appRecord); + + // sleep to let some async operations in the RM continue + Thread.sleep(10000); + // after the app finishes its records should have been purged + assertDeleted(regOps, client.appRecordPath); + assertDeleted(regOps, client.servicePath); + } finally { + ServiceOperations.stop(regOps); + } + } + + protected void assertDeleted(RegistryOperationsService regOps, + String path) throws IOException { + try { + ServiceRecord record = regOps.resolve(path); + Assert.fail("Expected the record at " + path + " to have been purged," + + " but found " + record); + } catch (PathNotFoundException expected) { + // expected + } + } + + /** + * This is a subclass of the distributed shell client which + * monitors the registry as well as the YARN app status + */ + private class RegistryMonitoringClient extends Client { + private String servicePath; + private ServiceRecord permanentRecord; + private String permanentPath; + private IOException lookupException; + private ServiceRecord appAttemptRecord; + private String appAttemptPath; + private ServiceRecord appRecord; + private String appRecordPath; + private String failureText; + private ApplicationReport report; + private final RegistryOperationsService regOps; + + private RegistryMonitoringClient(Configuration conf) throws Exception { + super(conf); + // client timeout of 30s for the test runs + setClientTimeout(30000); + regOps = new RegistryOperationsService(); + regOps.init(getConf()); + regOps.start(); + } + + public void stop() { + ServiceOperations.stopQuietly(regOps); + } + + + @Override + protected boolean monitorApplication(ApplicationId appId) + throws YarnException, IOException { + + String username = RegistryUtils.currentUser(); + String serviceClass = DSConstants.SERVICE_CLASS_DISTRIBUTED_SHELL; + String serviceName = RegistryPathUtils.encodeYarnID(appId.toString()); + servicePath = + RegistryUtils.servicePath(username, serviceClass, + serviceName); + appAttemptPath = servicePath + "-attempt"; + appRecordPath = servicePath + "-app"; + permanentPath = servicePath + "-permanent"; + YarnClient yarnClient = getYarnClient(); + + while (!timedOut()) { + // Check app status every 1 second. + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.debug("Thread sleep in monitoring loop interrupted"); + } + + // Get application report for the appId we are interested in + report = yarnClient.getApplicationReport(appId); + + switch (report.getYarnApplicationState()) { + case NEW: + case NEW_SAVING: + case SUBMITTED: + case ACCEPTED: + continue; + + // running, extract service records if not already done + case RUNNING: + try { + permanentRecord = maybeResolve(permanentRecord, permanentPath); + // succesfull lookup, so discard any failure + lookupException = null; + } catch (PathNotFoundException e) { + lookupException = e; + } + appRecord = maybeResolveQuietly(appRecord, appRecordPath); + appAttemptRecord = maybeResolveQuietly(appAttemptRecord, + appAttemptPath); + continue; + + case FINISHED: + // completed + boolean read = permanentRecord != null; + if (!read) { + failureText = "Permanent record was not resolved"; + } + return read; + + case KILLED: + failureText = "Application Killed: " + report.getDiagnostics(); + return false; + + case FAILED: + failureText = "Application Failed: " + report.getDiagnostics(); + return false; + + default: + break; + } + + } + if (timedOut()) { + failureText = "Timed out: Killing application"; + forceKillApplication(appId); + } + return false; + } + + /** + * Resolve a record if it has not been resolved already + * @param r record + * @param path path + * @return r if it was non null, else the resolved record + * @throws IOException on any failure + */ + ServiceRecord maybeResolve(ServiceRecord r, String path) throws IOException { + if (r == null) { + ServiceRecord record = regOps.resolve(path); + LOG.info("Resolved at " + path + ": " + record); + return record; + } + return r; + } + + /** + * Resolve a record if it has not been resolved already —ignoring + * any PathNotFoundException exceptions. + * @param r record + * @param path path + * @return r if it was non null, a resolved record if it was found, + * or null if the resolution failed with a PathNotFoundException + * @throws IOException on any failure + */ + ServiceRecord maybeResolveQuietly(ServiceRecord r, String path) throws + IOException { + try { + return maybeResolve(r, path); + } catch (PathNotFoundException ignored) { + // ignored + } + return r; + } + + } // end of class RegistryMonitoringClient + + }