diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
index d91c67b..a9b6952 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
@@ -97,6 +97,11 @@
org.apache.hadoop
hadoop-yarn-client
+
+
+ org.apache.hadoop
+ hadoop-yarn-registry
+
org.apache.hadoop
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index df9f34b..90a474c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -57,6 +57,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -93,6 +94,13 @@
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.registry.client.api.RegistryConstants;
+import org.apache.hadoop.yarn.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.yarn.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.yarn.registry.client.services.RegistryOperationsService;
+import org.apache.hadoop.yarn.registry.client.api.CreateFlags;
+import org.apache.hadoop.yarn.registry.client.types.PersistencePolicies;
+import org.apache.hadoop.yarn.registry.client.types.ServiceRecord;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.LogManager;
@@ -270,6 +278,9 @@
private final String linux_bash_command = "bash";
private final String windows_command = "cmd /c";
+ @VisibleForTesting
+ RegistryOperationsService registryOperations;
+
/**
* @param args Command line args
*/
@@ -570,6 +581,50 @@ public void run() throws YarnException, IOException {
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
+
+ // Register with the YARN registry if it is enabled
+ boolean registryEnabled =
+ conf.getBoolean(RegistryConstants.KEY_REGISTRY_ENABLED,
+ RegistryConstants.DEFAULT_REGISTRY_ENABLED);
+ if (registryEnabled) {
+ LOG.info("Registering Service");
+ registryOperations = new RegistryOperationsService();
+ registryOperations.init(conf);
+ registryOperations.start();
+ ServiceRecord serviceRecord = new ServiceRecord();
+ String attemptID = this.appAttemptID.toString();
+ String appId = this.appAttemptID.getApplicationId().toString();
+
+ serviceRecord.yarn_id = attemptID;
+ serviceRecord.yarn_persistence = PersistencePolicies.APPLICATION_ATTEMPT;
+ serviceRecord.description = "Distributed Shell";
+ // if this service offered external RPC/Web access, they
+ // can be added to the service record
+
+ String username = RegistryUtils.currentUser();
+ String serviceClass = DSConstants.SERVICE_CLASS_DISTRIBUTED_SHELL;
+ String serviceName = RegistryPathUtils.encodeYarnID(appId);
+ String path =
+ RegistryUtils.servicePath(username, serviceClass,
+ serviceName);
+ registryOperations.mknode(RegistryPathUtils.parentOf(path), true);
+ // app attempt entry
+ registryOperations.create(path + "-attempt", serviceRecord,
+ CreateFlags.OVERWRITE);
+ LOG.info("Registered " + serviceRecord + " at " + path );
+
+ serviceRecord.yarn_id = appId;
+ serviceRecord.yarn_persistence = PersistencePolicies.APPLICATION;
+ registryOperations.create(path + "-app", serviceRecord,
+ CreateFlags.OVERWRITE);
+
+ // register one that is not deleted
+ serviceRecord.yarn_id = "";
+ serviceRecord.yarn_persistence = PersistencePolicies.PERMANENT;
+ registryOperations.create(path + "-permanent", serviceRecord,
+ CreateFlags.OVERWRITE);
+ }
+
// Dump out information about cluster capability as seen by the
// resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
@@ -680,6 +735,8 @@ protected boolean finish() {
}
amRMClient.stop();
+ ServiceOperations.stop(registryOperations);
+ ServiceOperations.stop(timelineClient);
return success;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index f3ce64c..7df1fc8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -670,7 +670,7 @@ public boolean run() throws IOException, YarnException {
* @throws YarnException
* @throws IOException
*/
- private boolean monitorApplication(ApplicationId appId)
+ protected boolean monitorApplication(ApplicationId appId)
throws YarnException, IOException {
while (true) {
@@ -720,7 +720,7 @@ else if (YarnApplicationState.KILLED == state
return false;
}
- if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
+ if (timedOut()) {
LOG.info("Reached client specified timeout for application. Killing application");
forceKillApplication(appId);
return false;
@@ -730,12 +730,46 @@ else if (YarnApplicationState.KILLED == state
}
/**
+ * Get the YARN client
+ * @return the client
+ */
+ public YarnClient getYarnClient() {
+ return yarnClient;
+ }
+
+ /**
+ * Get the client's configuration
+ * @return the configuration
+ */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public long getClientTimeout() {
+ return clientTimeout;
+ }
+
+ public void setClientTimeout(long clientTimeout) {
+ this.clientTimeout = clientTimeout;
+ }
+
+ /**
+ * Query the clock and timeout settings to decide
+ * whether or not the current run has timed ut
+ * @return true if the client's monitorApplication() operation
+ * has taken too long.
+ */
+ protected boolean timedOut() {
+ return System.currentTimeMillis() > (clientStartTime + clientTimeout);
+ }
+
+ /**
* Kill a submitted application by sending a call to the ASM
* @param appId Application Id to be killed.
* @throws YarnException
* @throws IOException
*/
- private void forceKillApplication(ApplicationId appId)
+ protected void forceKillApplication(ApplicationId appId)
throws YarnException, IOException {
// TODO clarify whether multiple jobs with the same app id can be submitted and be running at
// the same time.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java
index 5912f14..67f9a77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java
@@ -44,4 +44,10 @@
* Used to validate the local resource.
*/
public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN";
+
+ /**
+ * Service class used when registering the service
+ */
+ public static final String SERVICE_CLASS_DISTRIBUTED_SHELL =
+ "org-apache-hadoop-distributedshell";
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 6dff94c..a1ff237 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -38,14 +38,23 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.registry.client.api.RegistryConstants;
+import org.apache.hadoop.yarn.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.yarn.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.yarn.registry.client.services.RegistryOperationsService;
+import org.apache.hadoop.yarn.registry.client.types.ServiceRecord;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
@@ -69,14 +78,16 @@
@Before
public void setup() throws Exception {
LOG.info("Starting up YARN cluster");
+ conf.setBoolean(RegistryConstants.KEY_REGISTRY_ENABLED, true);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
conf.set("yarn.log.dir", "target");
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
if (yarnCluster == null) {
+ // create a minicluster with the registry enabled
yarnCluster = new MiniYARNCluster(
- TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true);
+ TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true, true);
yarnCluster.init(conf);
yarnCluster.start();
NodeManager nm = yarnCluster.getNodeManager(0);
@@ -305,7 +316,7 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception {
LOG.info("Client run completed. Result=" + result);
// application should succeed
- Assert.assertTrue(result);
+ Assert.assertTrue("client failed", result);
}
/*
@@ -846,5 +857,242 @@ private int verifyContainerLog(int containerNum,
}
return numOfWords;
}
+
+ @Test(timeout = 90000)
+ public void testRegistryOperations() throws Exception {
+
+ // create a client config with an aggressive timeout policy
+ Configuration clientConf = new Configuration(yarnCluster.getConfig());
+ clientConf.setInt(RegistryConstants.KEY_REGISTRY_ZK_CONNECTION_TIMEOUT, 1000);
+ clientConf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_TIMES, 1);
+ clientConf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_CEILING, 1);
+ clientConf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_INTERVAL, 500);
+ clientConf.setInt(RegistryConstants.KEY_REGISTRY_ZK_SESSION_TIMEOUT, 2000);
+
+ // create a registry operations instance
+ RegistryOperationsService regOps = new RegistryOperationsService();
+ regOps.init(clientConf);
+ regOps.start();
+ LOG.info("Registry Binding: " + regOps);
+
+ // do a simple registry operation to verify that it is live
+ regOps.list("/");
+ // check the system dir is present
+ regOps.list(RegistryConstants.PATH_SYSTEM_SERVICES);
+ // check the users dir is present
+ regOps.list(RegistryConstants.PATH_USERS);
+
+ try {
+ String[] args = {
+ "--jar",
+ APPMASTER_JAR,
+ "--num_containers",
+ "1",
+ "--shell_command",
+ "sleep 15",
+ "--master_memory",
+ "512",
+ "--container_memory",
+ "128",
+ };
+
+ LOG.info("Initializing DS Client");
+ RegistryMonitoringClient client =
+ new RegistryMonitoringClient(clientConf);
+
+ client.init(args);
+ LOG.info("Running DS Client");
+ boolean result;
+ try {
+ result = client.run();
+ } finally {
+ client.stop();
+ }
+
+ LOG.info("Client run completed. Result=" + result);
+
+ // application should have found service records
+ ServiceRecord serviceRecord = client.appAttemptRecord;
+ LOG.info("Service record = " + serviceRecord);
+ IOException lookupException =
+ client.lookupException;
+ if (serviceRecord == null && lookupException != null) {
+ LOG.error("Lookup of " + client.servicePath
+ + " failed with " + lookupException, lookupException);
+ throw lookupException;
+ }
+
+ // the app should have succeeded or returned a failure message
+ Assert.assertTrue("run returned false:" + client.failureText, result);
+
+ // the app-level record must have been retrieved
+ Assert.assertNotNull("No application record at " + client.appRecordPath,
+ client.appRecord);
+
+ // sleep to let some async operations in the RM continue
+ Thread.sleep(10000);
+ // after the app finishes its records should have been purged
+ assertDeleted(regOps, client.appRecordPath);
+ assertDeleted(regOps, client.servicePath);
+ } finally {
+ ServiceOperations.stop(regOps);
+ }
+ }
+
+ protected void assertDeleted(RegistryOperationsService regOps,
+ String path) throws IOException {
+ try {
+ ServiceRecord record = regOps.resolve(path);
+ Assert.fail("Expected the record at " + path + " to have been purged,"
+ + " but found " + record);
+ } catch (PathNotFoundException expected) {
+ // expected
+ }
+ }
+
+ /**
+ * This is a subclass of the distributed shell client which
+ * monitors the registry as well as the YARN app status
+ */
+ private class RegistryMonitoringClient extends Client {
+ private String servicePath;
+ private ServiceRecord permanentRecord;
+ private String permanentPath;
+ private IOException lookupException;
+ private ServiceRecord appAttemptRecord;
+ private String appAttemptPath;
+ private ServiceRecord appRecord;
+ private String appRecordPath;
+ private String failureText;
+ private ApplicationReport report;
+ private final RegistryOperationsService regOps;
+
+ private RegistryMonitoringClient(Configuration conf) throws Exception {
+ super(conf);
+ // client timeout of 30s for the test runs
+ setClientTimeout(30000);
+ regOps = new RegistryOperationsService();
+ regOps.init(getConf());
+ regOps.start();
+ }
+
+ public void stop() {
+ ServiceOperations.stopQuietly(regOps);
+ }
+
+
+ @Override
+ protected boolean monitorApplication(ApplicationId appId)
+ throws YarnException, IOException {
+
+ String username = RegistryUtils.currentUser();
+ String serviceClass = DSConstants.SERVICE_CLASS_DISTRIBUTED_SHELL;
+ String serviceName = RegistryPathUtils.encodeYarnID(appId.toString());
+ servicePath =
+ RegistryUtils.servicePath(username, serviceClass,
+ serviceName);
+ appAttemptPath = servicePath + "-attempt";
+ appRecordPath = servicePath + "-app";
+ permanentPath = servicePath + "-permanent";
+ YarnClient yarnClient = getYarnClient();
+
+ while (!timedOut()) {
+ // Check app status every 1 second.
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ LOG.debug("Thread sleep in monitoring loop interrupted");
+ }
+
+ // Get application report for the appId we are interested in
+ report = yarnClient.getApplicationReport(appId);
+
+ switch (report.getYarnApplicationState()) {
+ case NEW:
+ case NEW_SAVING:
+ case SUBMITTED:
+ case ACCEPTED:
+ continue;
+
+ // running, extract service records if not already done
+ case RUNNING:
+ try {
+ permanentRecord = maybeResolve(permanentRecord, permanentPath);
+ // succesfull lookup, so discard any failure
+ lookupException = null;
+ } catch (PathNotFoundException e) {
+ lookupException = e;
+ }
+ appRecord = maybeResolveQuietly(appRecord, appRecordPath);
+ appAttemptRecord = maybeResolveQuietly(appAttemptRecord,
+ appAttemptPath);
+ continue;
+
+ case FINISHED:
+ // completed
+ boolean read = permanentRecord != null;
+ if (!read) {
+ failureText = "Permanent record was not resolved";
+ }
+ return read;
+
+ case KILLED:
+ failureText = "Application Killed: " + report.getDiagnostics();
+ return false;
+
+ case FAILED:
+ failureText = "Application Failed: " + report.getDiagnostics();
+ return false;
+
+ default:
+ break;
+ }
+
+ }
+ if (timedOut()) {
+ failureText = "Timed out: Killing application";
+ forceKillApplication(appId);
+ }
+ return false;
+ }
+
+ /**
+ * Resolve a record if it has not been resolved already
+ * @param r record
+ * @param path path
+ * @return r if it was non null, else the resolved record
+ * @throws IOException on any failure
+ */
+ ServiceRecord maybeResolve(ServiceRecord r, String path) throws IOException {
+ if (r == null) {
+ ServiceRecord record = regOps.resolve(path);
+ LOG.info("Resolved at " + path + ": " + record);
+ return record;
+ }
+ return r;
+ }
+
+ /**
+ * Resolve a record if it has not been resolved already —ignoring
+ * any PathNotFoundException exceptions.
+ * @param r record
+ * @param path path
+ * @return r if it was non null, a resolved record if it was found,
+ * or null if the resolution failed with a PathNotFoundException
+ * @throws IOException on any failure
+ */
+ ServiceRecord maybeResolveQuietly(ServiceRecord r, String path) throws
+ IOException {
+ try {
+ return maybeResolve(r, path);
+ } catch (PathNotFoundException ignored) {
+ // ignored
+ }
+ return r;
+ }
+
+ } // end of class RegistryMonitoringClient
+
+
}