diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 9ffe1d8..360f63e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -212,7 +212,7 @@ public ContainerExecutionResult call() throws Exception { request.getContainerIdString(), request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs, envMap, objectRegistry, pid, - executionContext, credentials, memoryAvailable, request.getUser()); + executionContext, credentials, memoryAvailable, request.getUser(), null); ContainerExecutionResult result = tezChild.run(); LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + sw.stop().elapsedMillis()); diff --git llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java index 7d41849..86c5ff3 100644 --- llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java +++ llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java @@ -14,6 +14,7 @@ package org.apache.tez.dag.app.launcher; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; @@ -103,9 +104,10 @@ public void handle(NMCommunicatorEvent event) { switch (event.getType()) { case CONTAINER_LAUNCH_REQUEST: NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event; + InetSocketAddress address = tal.getTaskCommunicator(launchEvent.getTaskCommId()).getAddress(); ListenableFuture future = executor.submit( new SubmitCallable(getProxy(launchEvent.getNodeId().getHost()), launchEvent, - tokenIdentifier, tal.getAddress().getHostName(), tal.getAddress().getPort())); + tokenIdentifier, address.getHostName(), address.getPort())); Futures.addCallback(future, new SubmitCallback(launchEvent.getContainerId(), launchEvent.getContainer().getNodeId().getHost())); break; diff --git llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java index 8746df0..47c9472 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java @@ -14,11 +14,12 @@ package org.apache.tez.dag.app.rm; -import java.io.IOException; import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -31,25 +32,17 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration; -import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; -// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry interval - 10 minutes. - public class DaemonTaskSchedulerService extends TaskSchedulerService { private static final Log LOG = LogFactory.getLog(DaemonTaskSchedulerService.class); @@ -58,6 +51,7 @@ private final TaskSchedulerAppCallback appClientDelegate; private final AppContext appContext; private final List serviceHosts; + private final Set serviceHostSet; private final ContainerFactory containerFactory; private final Random random = new Random(); @@ -68,8 +62,6 @@ private final ConcurrentMap runningTasks = new ConcurrentHashMap(); - private final AMRMClientAsync amRmClient; - // Per daemon private final int memoryPerInstance; private final int coresPerInstance; @@ -81,6 +73,7 @@ public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext, String clientHostname, int clientPort, String trackingUrl, + long customAppIdIdentifier, Configuration conf) { // Accepting configuration here to allow setting up fields as final super(DaemonTaskSchedulerService.class.getName()); @@ -88,7 +81,8 @@ public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext this.appClientDelegate = createAppCallbackDelegate(appClient); this.appContext = appContext; this.serviceHosts = new LinkedList(); - this.containerFactory = new ContainerFactory(appContext); + this.serviceHostSet = new HashSet<>(); + this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier); this.memoryPerInstance = conf .getInt(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); @@ -104,7 +98,6 @@ public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance); int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance); this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor); - this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler()); String[] hosts = conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_AM_SERVICE_HOSTS); if (hosts == null || hosts.length == 0) { @@ -112,6 +105,7 @@ public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext } for (String host : hosts) { serviceHosts.add(host); + serviceHostSet.add(host); } LOG.info("Running with configuration: " + @@ -125,35 +119,15 @@ public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext @Override public void serviceInit(Configuration conf) { - amRmClient.init(conf); } @Override public void serviceStart() { - amRmClient.start(); - RegisterApplicationMasterResponse response; - try { - amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl); - } catch (YarnException e) { - throw new TezUncheckedException(e); - } catch (IOException e) { - throw new TezUncheckedException(e); - } } @Override public void serviceStop() { if (!this.isStopped.getAndSet(true)) { - - try { - TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus(); - amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage, - status.postCompletionTrackingUrl); - } catch (YarnException e) { - throw new TezUncheckedException(e); - } catch (IOException e) { - throw new TezUncheckedException(e); - } appCallbackExecutor.shutdownNow(); } } @@ -257,8 +231,23 @@ private String selectHost(String[] requestedHosts) { if (requestedHosts != null && requestedHosts.length > 0) { Arrays.sort(requestedHosts); host = requestedHosts[0]; - LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(requestedHosts)); - } else { + if (serviceHostSet.contains(host)) { + LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(requestedHosts)); + } else { + LOG.info("Preferred host: " + host + " not present. Attempting to select another one"); + host = null; + for (String h : requestedHosts) { + if (serviceHostSet.contains(h)) { + host = h; + break; + } + } + if (host == null) { + LOG.info("Requested hosts: " + Arrays.toString(requestedHosts) + " not present. Randomizing the host"); + } + } + } + if (host == null) { host = serviceHosts.get(random.nextInt(serviceHosts.size())); LOG.info("Selected random host: " + host + " since the request contained no host information"); } @@ -266,17 +255,19 @@ private String selectHost(String[] requestedHosts) { } static class ContainerFactory { - final AppContext appContext; + final ApplicationAttemptId customAppAttemptId; AtomicInteger nextId; - public ContainerFactory(AppContext appContext) { - this.appContext = appContext; + public ContainerFactory(AppContext appContext, long appIdLong) { this.nextId = new AtomicInteger(1); + ApplicationId appId = ApplicationId + .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId()); + this.customAppAttemptId = ApplicationAttemptId + .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId()); } public Container createContainer(Resource capability, Priority priority, String hostname) { - ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId(); - ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement()); + ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement()); NodeId nodeId = NodeId.newInstance(hostname, 0); String nodeHttpAddress = "hostname:0"; @@ -290,37 +281,4 @@ public Container createContainer(Resource capability, Priority priority, String return container; } } - - private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler { - - @Override - public void onContainersCompleted(List statuses) { - - } - - @Override - public void onContainersAllocated(List containers) { - - } - - @Override - public void onShutdownRequest() { - - } - - @Override - public void onNodesUpdated(List updatedNodes) { - - } - - @Override - public float getProgress() { - return 0; - } - - @Override - public void onError(Throwable e) { - - } - } }