diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/SingleUAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/SingleUAM.java new file mode 100644 index 0000000..333d366 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/SingleUAM.java @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.applications.unmanagedamlauncher; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.client.api.*; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.Records; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * This class is responsible for one user + */ +@InterfaceAudience.Private +class SingleUAM implements Configurable, AMRMClientAsync.CallbackHandler { + private static Logger LOG = LoggerFactory.getLogger(SingleUAM.class); + + private Configuration conf; + private Configuration yarnConf; + private UserGroupInformation ugi; + private YarnClient yarnClient; + private NMClient nmClient; + private AMRMClientAsync amRmClientAsync; + private ApplicationId appId; + + private int numActiveContainers = 0; + private BlockingQueue allocatedContainers = new LinkedBlockingDeque(); + private long lastUsedTimeInMillis = 0; + + public SingleUAM(UserGroupInformation ugi, Configuration conf) { + this.ugi = ugi; + this.conf = conf; + this.yarnConf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf); + this.lastUsedTimeInMillis = System.currentTimeMillis(); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.yarnConf = new YarnConfiguration(); + + for (Map.Entry entry : getConf()) { + yarnConf.set((String) entry.getKey(), (String) entry.getValue()); + } + } + + @Override + public Configuration getConf() { + return conf; + } + + public void start() { + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(yarnConf); + yarnClient.start(); + LOG.info("Yarn client started at " + yarnClient.getStartTime()); + return null; + } + }); + } catch (Throwable ex) { + String msg = "Unable to create YarnClient for " + ugi.getUserName(); + fatalException(msg, ex); + } + } + + public void stop() { + unregister(); + ugi.doAs(new PrivilegedAction() { + @Override + public Void run() { + if (yarnClient != null) { + yarnClient.stop(); + yarnClient = null; + } + return null; + } + }); + if (nmClient != null) { + nmClient.stop(); + } + } + + @SuppressWarnings("unchecked") + public void register(final String queue) { + try { + Token amRmToken = ugi.doAs( + new PrivilegedExceptionAction>() { + @Override + public Token run() throws Exception { + return initYarnApp(queue); + } + }); + ugi.addToken(amRmToken); + // we need to use a new doAs block after adding the AMRM token because + // the UGI credentials are copied on doAs() invocation and changes won't + // be reflected. + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + _registerSchedulerAndCreateNMClient(queue); + return null; + } + }); + } catch (Exception ex) { + LOG.error("Unable to register AM with the RM for {0} ", appId, ex); + } + } + + private Token initYarnApp(String queue) throws Exception { + appId = createAndSubmitApp(yarnClient, queue); + _monitorAppState(yarnClient, appId, ACCEPTED, false); + LOG.debug("Created Application, AM '{0}' for '{1}' queue", appId, queue); + Token token = yarnClient.getAMRMToken(appId); + int counter = 0; + while (token == null && counter < 10) { + Thread.sleep(200); + token = yarnClient.getAMRMToken(appId); + counter++; + } + return token; + } + + private void _registerSchedulerAndCreateNMClient(String queue) throws Exception { + NMTokenCache nmTokenCache = new NMTokenCache(); + nmClient = NMClient.createNMClient(); + nmClient.setNMTokenCache(nmTokenCache); + nmClient.init(yarnConf); + nmClient.start(); + LOG.debug("Started NMClient, AM '{0}' with scheduler for '{1}' queue", appId, queue); + int heartbeatInterval = 200; //TODO: Add a constant or config + AMRMClient amRmClient = AMRMClient.createAMRMClient(); + amRmClient.setNMTokenCache(nmTokenCache); + amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, + heartbeatInterval, SingleUAM.this); + amRmClientAsync.init(yarnConf); + amRmClientAsync.start(); + + // TODO: Very poor parameters being passsed to registerApplicationMaster + amRmClientAsync.registerApplicationMaster("", 0, ""); + LOG.debug("Registered with scheduler, AM '{0}' for '{1}' queue", appId, queue); + } + + private ApplicationId createAndSubmitApp(YarnClient rmClient, String queue) { + try { + // Create application + YarnClientApplication newApp = rmClient.createApplication(); + ApplicationId appId = newApp.getNewApplicationResponse(). + getApplicationId(); + + // Create launch context for app master + ApplicationSubmissionContext appContext = Records.newRecord( + ApplicationSubmissionContext.class); + + // set the application id + appContext.setApplicationId(appId); + + // set the application name + appContext.setApplicationName("OozieServer"); + + appContext.setApplicationType("OozieServer"); + + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + int priority = 0; // TODO: Add a constant or a config + pri.setPriority(priority); + appContext.setPriority(pri); + + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue(queue); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records.newRecord( + ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + + // unmanaged AM + appContext.setUnmanagedAM(true); + + // setting max attempts to 1 to avoid warning from Yarn RM + // as the AM is unmanaged, it doesn't really matter. + appContext.setMaxAppAttempts(1); + + // Submit the application to the applications manager + return rmClient.submitApplication(appContext); + } catch (Exception ex) { + fatalException("Error creating/submitting a new application", ex); + return null; + } + } + + private static final Set ACCEPTED = EnumSet.of + (YarnApplicationState.ACCEPTED); + + private static final Set STOPPED = EnumSet.of( + YarnApplicationState.KILLED, YarnApplicationState.FAILED, + YarnApplicationState.FINISHED); + + private ApplicationReport _monitorAppState(YarnClient rmClient, + ApplicationId appId, Set states, + boolean calledFromStopped) throws Exception { + try { + long timeout = 30 * 1000; // TODO: Add a constant or config + + long polling = 300; // TODO: Add a constant or config + + long start = System.currentTimeMillis(); + ApplicationReport report = rmClient.getApplicationReport(appId); + while (!states.contains(report.getYarnApplicationState())) { + if (System.currentTimeMillis() - start > timeout) { + fatalException("Error starting or stopping the AM for " + appId, null); + } + Thread.sleep(polling); + report = rmClient.getApplicationReport(appId); + } + return report; + } catch (Exception ex) { + if (!calledFromStopped) { + _stop(FinalApplicationStatus.FAILED, "Could not start, error: " + ex); + } + throw ex; + } + } + + public void unregister() { + ugi.doAs(new PrivilegedAction() { + @Override + public Void run() { + _stop(FinalApplicationStatus.SUCCEEDED, "Stopped by AM"); + return null; + } + }); + } + + private synchronized void _stop(FinalApplicationStatus status, String msg) { + if (amRmClientAsync != null) { + LOG.debug("Stopping AM '{}'", appId); + try { + amRmClientAsync.unregisterApplicationMaster(status, msg, ""); + } catch (Exception ex) { + LOG.warn("Error un-registering AM client, " + ex, ex); + } + amRmClientAsync.stop(); + amRmClientAsync = null; + } + + if (yarnClient != null) { + try { + ApplicationReport report = _monitorAppState(yarnClient, appId, STOPPED, + true); + if (report.getFinalApplicationStatus() + != FinalApplicationStatus.SUCCEEDED) { + LOG.warn("Problem stopping application, final status '{0}'", report.getFinalApplicationStatus()); + } + } catch (Exception ex) { + LOG.warn("Error stopping application, " + ex, ex); + } + yarnClient.stop(); + yarnClient = null; + } + } + + public void requestContainerForLauncher() throws Exception { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(Resource.newInstance(1024, 1), null, null, + Priority.newInstance(1000000), true); + amRmClientAsync.addContainerRequest(request); + return null; + } + }); + } + + @Override + public void onContainersCompleted(List containerStatuses) { + for (ContainerStatus containerStatus : containerStatuses) { + ContainerId containerId = containerStatus.getContainerId(); + // we have the containerId only if we did not release it. + switch (containerStatus.getExitStatus()) { + case ContainerExitStatus.SUCCESS: + LOG.info("Container {0} successfully completed.", containerId); + break; + case ContainerExitStatus.PREEMPTED: + LOG.info("Container {0} got preempted", containerId); + break; + case ContainerExitStatus.ABORTED: + default: + LOG.error("Container {0} aborted. God knows what happened!", containerId); + break; + } + + // TODO: Stop container to prevent leaks. + } + setInActive(); + } + + @Override + public void onContainersAllocated(List containers) { + LOG.info("{0} containers allocated:", containers.size()); + this.allocatedContainers.addAll(containers); + this.numActiveContainers += containers.size(); + for (Container container : containers) { + LOG.info("\t{0}", container); + } + } + + @Override + public void onShutdownRequest() { + setInActive(); + LOG.warn("Yarn requested AM to shutdown"); + // no need to use a ugi.doAs() as this is called from within Yarn client + _stop(FinalApplicationStatus.FAILED, "Shutdown by Yarn"); + } + + @Override + public void onNodesUpdated(List nodeReports) { + LOG.debug("Received nodes update for '{0}' nodes", nodeReports.size()); + } + + @Override + public float getProgress() { + return 0; + } + + @Override + public void onError(final Throwable ex) { + setInActive(); + LOG.error("Error in Yarn client", ex); + // no need to use a ugi.doAs() as this is called from within Yarn client + _stop(FinalApplicationStatus.FAILED, "Error in Yarn client: " + ex.toString()); + } + + public Container getContainer() throws InterruptedException { + return this.allocatedContainers.take(); + } + + private void fatalException(String msg, Throwable ex) { + LOG.error(msg, ex); + ExitUtil.terminate(1, msg); + } + + public void launchContainer(final Container container, final ContainerLaunchContext launchContext) throws Exception { + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + nmClient.startContainer(container, launchContext); + return null; + } + }); + } catch (Exception e) { + LOG.error("Container {0} failed to launch", container.getId()); + throw e; + } + this.lastUsedTimeInMillis = System.currentTimeMillis(); + } + + private void setInActive() { + this.numActiveContainers--; + this.lastUsedTimeInMillis = System.currentTimeMillis(); + } + + public boolean isActive(long maxAllowedTime) { + boolean active = this.numActiveContainers > 0 || + System.currentTimeMillis() - lastUsedTimeInMillis < maxAllowedTime; + return active; + } + + public String getUser() { + return ugi.getUserName(); + } + + public ContainerStatus getContainerStatus(final ContainerId containerId, final NodeId nodeId) throws IOException, InterruptedException { + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public ContainerStatus run() throws IOException { + try { + return nmClient.getContainerStatus(containerId, nodeId); + } catch (YarnException ye) { + LOG.debug("Couldn't fetch container status", ye); + if (ye.getMessage().contains("stopped")) { + // Container stopped. Returning null for UnmanagedAMPool to take care of. + return null; + } else { + throw new IOException(ye); + } + } + } + + }); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMPool.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMPool.java new file mode 100644 index 0000000..16500d2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMPool.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.unmanagedamlauncher; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class UnmanagedAMPool { + private static UnmanagedAMPool INSTANCE; + private static final String QUEUE = "root.oozie"; + private static final long RECOVER_AM_TIMEOUT = 60 * 1000; + private static Logger LOG = LoggerFactory.getLogger(UnmanagedAMPool.class); + + private Map usersToAMsMap; + private Thread userAMMonitor; + + private UnmanagedAMPool() { + usersToAMsMap = new HashMap<>(); + userAMMonitor = new MonitorAMs(); + userAMMonitor.setName("AM-Monitoring-Thread"); + userAMMonitor.setDaemon(true); + userAMMonitor.start(); + } + + public static UnmanagedAMPool getInstance() { + if (INSTANCE == null) { + INSTANCE = new UnmanagedAMPool(); + } + + return INSTANCE; + } + + public synchronized void close() { + if (userAMMonitor != null) { + userAMMonitor.interrupt(); + } + + for (SingleUAM userAM : usersToAMsMap.values()) { + userAM.stop(); + } + usersToAMsMap.clear(); + } + + public Container launchContainer(UserGroupInformation ugi, final Configuration conf, + final ContainerLaunchContext launchContext) throws Exception { + String user = ugi.getUserName(); + SingleUAM userAM; + + synchronized (this) { + userAM = usersToAMsMap.get(user); + if (userAM == null) { + userAM = new SingleUAM(ugi, conf); + userAM.start(); + userAM.register(QUEUE); + usersToAMsMap.put(user, userAM); + } + } + + // Verify userAM is not null + assert(userAM != null); + + // Request userAM for container + userAM.requestContainerForLauncher(); + // Launch container + Container container = userAM.getContainer(); + if (container == null) { + LOG.error("Failed to get container from YARN"); + return null; + } + + userAM.launchContainer(container, launchContext); + return container; + } + + public ContainerStatus getContainerStatus(String user, ContainerId containerId, NodeId nodeId) throws IOException, InterruptedException { + SingleUAM userAM; + synchronized (this) { + userAM = usersToAMsMap.get(user); + } + ContainerStatus status = null; + if (userAM != null) { + status = userAM.getContainerStatus(containerId, nodeId); + } + + return (status != null) ? status + : ContainerStatus.newInstance(containerId, ContainerState.COMPLETE, "Assuming it completed", 0); + } + + private class MonitorAMs extends Thread { + public void run() { + while (!Thread.interrupted()) { + synchronized (UnmanagedAMPool.this) { + for (SingleUAM userAM : usersToAMsMap.values()) { + if (!userAM.isActive(RECOVER_AM_TIMEOUT)) { + LOG.info("Stopping AM for {0} due to inactivity", userAM.getUser()); + userAM.stop(); + usersToAMsMap.remove(userAM.getUser()); + } + } + } + try { + Thread.sleep(60 * 1000); + } catch (InterruptedException e) { + return; + } + } + } + } + +}