diff --git llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java index 381c16b..2269f06 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java @@ -78,5 +78,15 @@ public LlapConfiguration() { LLAP_DAEMON_PREFIX + "task.scheduler.node.re-enable.timeout.ms"; public static final long LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS_DEFAULT = 2000l; + public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE = + LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size"; + public static final int LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT = 10; + public static final String LLAP_DAEMON_TASK_SCHEDULER_RUN_QUEUE_SIZE = + LLAP_DAEMON_PREFIX + "task.scheduler.run.queue.size"; + public static final int LLAP_DAEMON_TASK_SCHEDULER_RUN_QUEUE_SIZE_DEFAULT = 10; + + public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION = + LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption"; + public static final boolean LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT = false; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/BoundedPriorityBlockingQueue.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/BoundedPriorityBlockingQueue.java new file mode 100644 index 0000000..78d3c6c --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/BoundedPriorityBlockingQueue.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Priority blocking queue of bounded size. Entries are ordered based on the specified
+ * comparator (or natural ordering). If the queue is full, offer() will return false and
+ * add() will throw IllegalStateException. The bound check and insert are synchronized.
+ */
+public class BoundedPriorityBlockingQueue<E> extends PriorityBlockingQueue<E> {
+  // Maximum number of elements this queue may hold; immutable after construction.
+  private final int maxSize;
+
+  public BoundedPriorityBlockingQueue(int maxSize) {
+    this.maxSize = maxSize;
+  }
+
+  public BoundedPriorityBlockingQueue(Comparator<E> comparator, int maxSize) {
+    super(maxSize, comparator);
+    this.maxSize = maxSize;
+  }
+
+  // synchronized: the size() check and the insert must be atomic, otherwise two
+  // concurrent callers can both pass the check and grow the queue past maxSize.
+  @Override
+  public synchronized boolean add(E e) {
+    if (size() >= maxSize) {
+      throw new IllegalStateException("BoundedPriorityBlockingQueue is full");
+    } else {
+      return super.add(e);
+    }
+  }
+
+  // synchronized for the same check-then-act reason as add(); see above.
+  @Override
+  public synchronized boolean offer(E e) {
+    if (size() >= maxSize) {
+      return false;
+    } else {
+      return super.offer(e);
+    }
+  }
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index c142982..ca76616 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -18,24 +18,17 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.HashMap; -import java.util.List; import 
java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; @@ -46,45 +39,22 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; -import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; -import org.apache.hadoop.hive.llap.tezplugins.Converters; import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.log4j.Logger; import org.apache.log4j.NDC; -import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import 
org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.TezException; -import org.apache.tez.mapreduce.input.MRInputLegacy; -import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; -import org.apache.tez.runtime.api.impl.InputSpec; -import org.apache.tez.runtime.api.impl.TaskSpec; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; -import org.apache.tez.runtime.internals.api.TaskReporterInterface; -import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.tez.runtime.task.TezTaskRunner; public class ContainerRunnerImpl extends AbstractService implements ContainerRunner { @@ -93,7 +63,7 @@ private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class); private volatile AMReporter amReporter; - private final ListeningExecutorService executorService; + private final Scheduler executorService; private final AtomicReference localAddress; private final String[] localDirsBase; private final Map localEnv = new HashMap<>(); @@ -101,13 +71,14 @@ private final long memoryPerExecutor; private final LlapDaemonExecutorMetrics metrics; private final Configuration conf; - private final ConfParams confParams; + private final TaskRunnerCallable.ConfParams confParams; // Map of dagId to vertices and associated state. 
private final ConcurrentMap> sourceCompletionMap = new ConcurrentHashMap<>(); // TODO Support for removing queued containers, interrupting / killing specific containers - public ContainerRunnerImpl(Configuration conf, int numExecutors, String[] localDirsBase, int localShufflePort, + public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize, + int runQueueSize, boolean enablePreemption, String[] localDirsBase, int localShufflePort, AtomicReference localAddress, long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics) { super("ContainerRunnerImpl"); @@ -117,9 +88,8 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, String[] localD this.localDirsBase = localDirsBase; this.localAddress = localAddress; - ExecutorService raw = Executors.newFixedThreadPool(numExecutors, - new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build()); - this.executorService = MoreExecutors.listeningDecorator(raw); + this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, runQueueSize, + enablePreemption); AuxiliaryServiceHelper.setServiceDataIntoEnv( TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(localShufflePort), localEnv); @@ -134,7 +104,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, String[] localD } catch (IOException e) { throw new RuntimeException("Failed to setup local filesystem instance", e); } - confParams = new ConfParams( + confParams = new TaskRunnerCallable.ConfParams( conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT), conf.getLong( @@ -230,16 +200,23 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { ConcurrentMap sourceCompletionMap = getSourceCompletionMap(request.getFragmentSpec().getDagName()); TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()), new 
ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, - credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams); - ListenableFuture future = executorService.submit(callable); - Futures.addCallback(future, new TaskRunnerCallback(request, callable)); - metrics.incrExecutorTotalRequestsHandled(); - metrics.incrExecutorNumQueuedRequests(); + credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams, metrics); + try { + executorService.schedule(callable); + metrics.incrExecutorTotalRequestsHandled(); + metrics.incrExecutorNumQueuedRequests(); + } catch (RejectedExecutionException e) { + notifyAMOfRejection(callable); + } } finally { NDC.pop(); } } + private void notifyAMOfRejection(TaskRunnerCallable callable) { + LOG.error("Notifying AM of request rejection is not implemented yet!"); + } + @Override public void sourceStateUpdated(SourceStateUpdatedRequestProto request) { LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request)); @@ -247,273 +224,6 @@ public void sourceStateUpdated(SourceStateUpdatedRequestProto request) { dagMap.put(request.getSrcName(), request.getState()); } - static class TaskRunnerCallable extends CallableWithNdc { - - private final SubmitWorkRequestProto request; - private final Configuration conf; - private final String[] localDirs; - private final Map envMap; - private final String pid = null; - private final ObjectRegistryImpl objectRegistry; - private final ExecutionContext executionContext; - private final Credentials credentials; - private final long memoryAvailable; - private final ConfParams confParams; - private final Token jobToken; - private final AMReporter amReporter; - private final ConcurrentMap sourceCompletionMap; - private final TaskSpec taskSpec; - private volatile TezTaskRunner taskRunner; - private volatile TaskReporterInterface taskReporter; - private volatile ListeningExecutorService executor; - private LlapTaskUmbilicalProtocol umbilical; - 
private volatile long startTime; - private volatile String threadName; - private volatile boolean cancelled = false; - - - - TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf, - ExecutionContext executionContext, Map envMap, - String[] localDirs, Credentials credentials, - long memoryAvailable, AMReporter amReporter, - ConcurrentMap sourceCompletionMap, ConfParams confParams) { - this.request = request; - this.conf = conf; - this.executionContext = executionContext; - this.envMap = envMap; - this.localDirs = localDirs; - this.objectRegistry = new ObjectRegistryImpl(); - this.sourceCompletionMap = sourceCompletionMap; - this.credentials = credentials; - this.memoryAvailable = memoryAvailable; - this.confParams = confParams; - this.jobToken = TokenCache.getSessionToken(credentials); - this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec()); - this.amReporter = amReporter; - // Register with the AMReporter when the callable is setup. Unregister once it starts running. - this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken); - } - - @Override - protected ContainerExecutionResult callInternal() throws Exception { - this.startTime = System.currentTimeMillis(); - this.threadName = Thread.currentThread().getName(); - if (LOG.isDebugEnabled()) { - LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); - } - - - // Unregister from the AMReporter, since the task is now running. - this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort()); - - // TODO This executor seems unnecessary. Here and TezChild - ExecutorService executorReal = Executors.newFixedThreadPool(1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat( - "TezTaskRunner_" + request.getFragmentSpec().getTaskAttemptIdString()) - .build()); - executor = MoreExecutors.listeningDecorator(executorReal); - - // TODO Consolidate this code with TezChild. 
- Stopwatch sw = new Stopwatch().start(); - UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser()); - taskUgi.addCredentials(credentials); - - Map serviceConsumerMetadata = new HashMap<>(); - serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, - TezCommonUtils.convertJobTokenToBytes(jobToken)); - Multimap startedInputsMap = HashMultimap.create(); - - UserGroupInformation taskOwner = - UserGroupInformation.createRemoteUser(request.getTokenIdentifier()); - final InetSocketAddress address = - NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort()); - SecurityUtil.setTokenService(jobToken, address); - taskOwner.addToken(jobToken); - umbilical = taskOwner.doAs(new PrivilegedExceptionAction() { - @Override - public LlapTaskUmbilicalProtocol run() throws Exception { - return RPC.getProxy(LlapTaskUmbilicalProtocol.class, - LlapTaskUmbilicalProtocol.versionID, address, conf); - } - }); - - taskReporter = new LlapTaskReporter( - umbilical, - confParams.amHeartbeatIntervalMsMax, - confParams.amCounterHeartbeatInterval, - confParams.amMaxEventsPerHeartbeat, - new AtomicLong(0), - request.getContainerIdString()); - - taskRunner = new TezTaskRunner(conf, taskUgi, localDirs, - taskSpec, - request.getAppAttemptNumber(), - serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, - pid, - executionContext, memoryAvailable); - - boolean shouldDie; - try { - shouldDie = !taskRunner.run(); - if (shouldDie) { - LOG.info("Got a shouldDie notification via heartbeats. 
Shutting down"); - return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, - "Asked to die by the AM"); - } - } catch (IOException e) { - return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, - e, "TaskExecutionFailure: " + e.getMessage()); - } catch (TezException e) { - return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, - e, "TaskExecutionFailure: " + e.getMessage()); - } finally { - // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now. -// FileSystem.closeAllForUGI(taskUgi); - } - LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + - sw.stop().elapsedMillis()); - if (LOG.isDebugEnabled()) { - LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); - } - - return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, - null); - } - - /** - * Check whether a task can run to completion or may end up blocking on it's sources. - * This currently happens via looking up source state. - * TODO: Eventually, this should lookup the Hive Processor to figure out whether - * it's reached a state where it can finish - especially in cases of failures - * after data has been fetched. - * @return - */ - public boolean canFinish() { - List inputSpecList = taskSpec.getInputs(); - boolean canFinish = true; - if (inputSpecList != null && !inputSpecList.isEmpty()) { - for (InputSpec inputSpec : inputSpecList) { - if (isSourceOfInterest(inputSpec)) { - // Lookup the state in the map. 
- SourceStateProto state = sourceCompletionMap.get(inputSpec.getSourceVertexName()); - if (state != null && state == SourceStateProto.S_SUCCEEDED) { - continue; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot finish due to source: " + inputSpec.getSourceVertexName()); - } - canFinish = false; - break; - } - } - } - } - return canFinish; - } - - private boolean isSourceOfInterest(InputSpec inputSpec) { - String inputClassName = inputSpec.getInputDescriptor().getClassName(); - // MRInput is not of interest since it'll always be ready. - return !inputClassName.equals(MRInputLegacy.class.getName()); - } - - public void shutdown() { - executor.shutdownNow(); - if (taskReporter != null) { - taskReporter.shutdown(); - } - if (umbilical != null) { - RPC.stopProxy(umbilical); - } - } - } - - final class TaskRunnerCallback implements FutureCallback { - - private final SubmitWorkRequestProto request; - private final TaskRunnerCallable taskRunnerCallable; - - TaskRunnerCallback(SubmitWorkRequestProto request, - TaskRunnerCallable taskRunnerCallable) { - this.request = request; - this.taskRunnerCallable = taskRunnerCallable; - } - - // TODO Slightly more useful error handling - @Override - public void onSuccess(ContainerExecutionResult result) { - switch (result.getExitStatus()) { - case SUCCESS: - LOG.info("Successfully finished: " + getTaskIdentifierString(request)); - metrics.incrExecutorTotalSuccess(); - break; - case EXECUTION_FAILURE: - LOG.info("Failed to run: " + getTaskIdentifierString(request)); - metrics.incrExecutorTotalExecutionFailed(); - break; - case INTERRUPTED: - LOG.info("Interrupted while running: " + getTaskIdentifierString(request)); - metrics.incrExecutorTotalInterrupted(); - break; - case ASKED_TO_DIE: - LOG.info("Asked to die while running: " + getTaskIdentifierString(request)); - metrics.incrExecutorTotalAskedToDie(); - break; - } - taskRunnerCallable.shutdown(); - HistoryLogger - .logFragmentEnd(request.getApplicationIdString(), 
request.getContainerIdString(), - localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), - request.getFragmentSpec().getVertexName(), - request.getFragmentSpec().getFragmentNumber(), - request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, - taskRunnerCallable.startTime, true); - metrics.decrExecutorNumQueuedRequests(); - } - - @Override - public void onFailure(Throwable t) { - LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t); - // TODO HIVE-10236 Report a fatal error over the umbilical - taskRunnerCallable.shutdown(); - HistoryLogger - .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), - localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), - request.getFragmentSpec().getVertexName(), - request.getFragmentSpec().getFragmentNumber(), - request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, - taskRunnerCallable.startTime, false); - metrics.decrExecutorNumQueuedRequests(); - } - - private String getTaskIdentifierString(SubmitWorkRequestProto request) { - StringBuilder sb = new StringBuilder(); - sb.append("AppId=").append(request.getApplicationIdString()) - .append(", containerId=").append(request.getContainerIdString()) - .append(", Dag=").append(request.getFragmentSpec().getDagName()) - .append(", Vertex=").append(request.getFragmentSpec().getVertexName()) - .append(", FragmentNum=").append(request.getFragmentSpec().getFragmentNumber()) - .append(", Attempt=").append(request.getFragmentSpec().getAttemptNumber()); - return sb.toString(); - } - } - - private static class ConfParams { - final int amHeartbeatIntervalMsMax; - final long amCounterHeartbeatInterval; - final int amMaxEventsPerHeartbeat; - - public ConfParams(int amHeartbeatIntervalMsMax, long amCounterHeartbeatInterval, - int amMaxEventsPerHeartbeat) { - this.amHeartbeatIntervalMsMax = amHeartbeatIntervalMsMax; - this.amCounterHeartbeatInterval = 
amCounterHeartbeatInterval; - this.amMaxEventsPerHeartbeat = amMaxEventsPerHeartbeat; - } - } - private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) { StringBuilder sb = new StringBuilder(); sb.append("dagName=").append(request.getDagName()) @@ -522,7 +232,7 @@ private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto return sb.toString(); } - private String stringifySubmitRequest(SubmitWorkRequestProto request) { + public static String stringifySubmitRequest(SubmitWorkRequestProto request) { StringBuilder sb = new StringBuilder(); sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort()); sb.append(", user=").append(request.getUser()); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index eb8d64b..7df690b 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -68,8 +68,8 @@ private final AtomicReference address = new AtomicReference(); public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes, - boolean ioEnabled, long ioMemoryBytes, String[] localDirs, int rpcPort, - int shufflePort) { + boolean ioEnabled, long ioMemoryBytes, String[] localDirs, int rpcPort, + int shufflePort) { super("LlapDaemon"); printAsciiArt(); @@ -89,6 +89,15 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor this.numExecutors = numExecutors; this.localDirs = localDirs; + int waitQueueSize = daemonConf.getInt( + LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE, + LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT); + int runQueueSize = daemonConf.getInt( + LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_RUN_QUEUE_SIZE, + 
LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_RUN_QUEUE_SIZE_DEFAULT); + boolean enablePreemption = daemonConf.getBoolean( + LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION, + LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT); LOG.info("Attempting to start LlapDaemonConf with the following configuration: " + "numExecutors=" + numExecutors + ", rpcListenerPort=" + rpcPort + @@ -97,7 +106,10 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor ", executorMemory=" + executorMemoryBytes + ", llapIoEnabled=" + ioEnabled + ", llapIoCacheSize=" + ioMemoryBytes + - ", jvmAvailableMemory=" + maxJvmMemory); + ", jvmAvailableMemory=" + maxJvmMemory + + ", waitQueueSize= " + waitQueueSize + + ", runQueueSize= " + runQueueSize + + ", enablePreemption= " + enablePreemption); long memRequired = executorMemoryBytes + (ioEnabled ? ioMemoryBytes : 0); Preconditions.checkState(maxJvmMemory >= memRequired, @@ -131,9 +143,17 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor LOG.info("Started LlapMetricsSystem with displayName: " + displayName + " sessionId: " + sessionId); - this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, localDirs, shufflePort, address, - executorMemoryBytes, metrics); - + this.containerRunner = new ContainerRunnerImpl(daemonConf, + numExecutors, + waitQueueSize, + runQueueSize, + enablePreemption, + localDirs, + shufflePort, + address, + executorMemoryBytes, + metrics); + this.registry = new LlapRegistryService(); } @@ -202,24 +222,25 @@ public static void main(String[] args) throws Exception { // Cache settings will need to be setup in llap-daemon-site.xml - since the daemons don't read hive-site.xml // Ideally, these properties should be part of LlapDameonConf rather than HiveConf LlapConfiguration daemonConf = new LlapConfiguration(); - int numExecutors = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, - 
LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); - String[] localDirs = - daemonConf.getTrimmedStrings(LlapConfiguration.LLAP_DAEMON_WORK_DIRS); - int rpcPort = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - int shufflePort = daemonConf - .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); - long executorMemoryBytes = daemonConf - .getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, - LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l; - long cacheMemoryBytes = - HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE); - boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED); - llapDaemon = - new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, - cacheMemoryBytes, localDirs, - rpcPort, shufflePort); + int numExecutors = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, + LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + + String[] localDirs = + daemonConf.getTrimmedStrings(LlapConfiguration.LLAP_DAEMON_WORK_DIRS); + int rpcPort = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, + LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + int shufflePort = daemonConf + .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); + long executorMemoryBytes = daemonConf + .getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, + LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l; + long cacheMemoryBytes = + HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE); + boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED); + llapDaemon = + new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, + cacheMemoryBytes, localDirs, + rpcPort, shufflePort); llapDaemon.init(daemonConf); llapDaemon.start(); diff 
--git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java new file mode 100644 index 0000000..c3102f9 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java @@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Task scheduler interface
+ */
+public interface Scheduler<T> {
+
+  /**
+   * Schedule the task or throw RejectedExecutionException if queues are full
+   * @param t - task to schedule
+   * @throws RejectedExecutionException if the task cannot be queued for execution
+   */
+  void schedule(T t) throws RejectedExecutionException;
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java new file mode 100644 index 0000000..58ebc8e --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.tez.runtime.task.TezChild; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * + */ +public class TaskExecutorService implements Scheduler { + private static final Logger LOG = Logger.getLogger(TaskRunnerCallable.class); + private static final boolean isInfoEnabled = LOG.isInfoEnabled(); + private static final boolean isDebugEnabled = LOG.isDebugEnabled(); + private static final boolean isTraceEnabled = LOG.isTraceEnabled(); + private static final String TASK_SCHEDULER_THREAD_NAME = "Task-Scheduler"; + private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME = "Wait-Queue-Scheduler"; + + // some object to lock upon. 
Used by task scheduler to notify wait queue scheduler of new items + // to wait queue + private final Object waitLock; + private ListeningExecutorService executorService; + private ExecutorService waitQueueExecutorService; + private BlockingQueue waitQueue; + private BlockingQueue runQueue; + private Map idToTaskMap; + private Map> premptionMap; + private boolean enablePreemption; + private ThreadPoolExecutor threadPoolExecutor; + + public TaskExecutorService(int numExecutors, int waitQueueSize, int runQueueSize, + boolean enablePreemption) { + this.waitLock = new Object(); + this.waitQueue = new BoundedPriorityBlockingQueue<>(new Comparator() { + @Override + public int compare(TaskRunnerCallable o1, TaskRunnerCallable o2) { + boolean newCanFinish = o1.canFinish(); + boolean oldCanFinish = o2.canFinish(); + if (newCanFinish == true && oldCanFinish == false) { + return -1; + } else if (newCanFinish == false && oldCanFinish == true) { + return 1; + } + return 0; + } + }, waitQueueSize); + this.runQueue = new ArrayBlockingQueue<>(runQueueSize); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat( + TASK_SCHEDULER_THREAD_NAME).build(); + this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size + numExecutors, // max pool size + 1, TimeUnit.MINUTES, runQueue, threadFactory); + this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor); + this.idToTaskMap = new HashMap<>(); + this.premptionMap = new HashMap<>(); + this.enablePreemption = enablePreemption; + + // single threaded scheduler for tasks from wait queue to run queue + this.waitQueueExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat(WAIT_QUEUE_SCHEDULER_THREAD_NAME).build()); + waitQueueExecutorService.execute(new WaitQueueWorker()); + } + + private final class WaitQueueWorker implements Runnable { + TaskRunnerCallable task; + + @Override + public void run() { + try { + if (waitQueue.isEmpty()) { + 
synchronized (waitLock) { + waitLock.wait(); + } + } + + while ((task = waitQueue.take()) != null) { + trySchedule(task); + synchronized (waitLock) { + waitLock.wait(); + } + } + } catch (InterruptedException e) { + // Executor service will create new thread if the current thread gets + // interrupted. We don't + // need to do anything with the exception. + LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME + " thread has been interrupted."); + } + } + } + + @Override + public void schedule(TaskRunnerCallable task) throws RejectedExecutionException { + try { + trySchedule(task); + } catch (RejectedExecutionException e) { + notifyAM(task, true); + throw new RejectedExecutionException(e); + } + } + + private void trySchedule(TaskRunnerCallable task) { + boolean finishable = task.canFinish(); + try { + ListenableFuture future = executorService.submit(task); + FutureCallback wrappedCallback = + new InternalCompletionListener(task.getCallback()); + Futures.addCallback(future, wrappedCallback); + + if (isInfoEnabled) { + LOG.info(task.getRequestId() + " scheduled for execution (added to run queue)."); + } + + // only tasks that cannot finish immediately are pre-emptable (if all inputs to tasks are + // not ready yet) + if (enablePreemption && !finishable) { + + if (isDebugEnabled) { + LOG.debug(task.getRequestId() + " is not finishable and pre-emption is enabled." + + "Adding it to pre-emption list."); + } + addTaskToPreemptionList(task, future); + } + } catch (RejectedExecutionException e) { + + // If executor service rejects a request, check if pre-emption is enabled and preempt the + // first task in pre-emption map. TODO: Ideally, this map should be sorted based on some + // factor for pre-emption (based on the number of pending tasks?). + // If preemption is disabled just add the task to wait queue. 
If wait queue is also full + // throw RejectedExecutionException + if (enablePreemption && finishable && !premptionMap.isEmpty()) { + + Map.Entry> entry = premptionMap.entrySet() + .iterator().next(); + TaskRunnerCallable pRequest = entry.getKey(); + ListenableFuture pFuture = entry.getValue(); + + if (isDebugEnabled) { + LOG.debug(pRequest.getRequestId() + " is chosen for pre-emption."); + } + + if (isTraceEnabled) { + LOG.trace("idToTaskMap: " + idToTaskMap.keySet()); + LOG.trace("preemptionMap: " + premptionMap.keySet()); + } + + pFuture.cancel(true); + // purge the cancelled task from thread pool + threadPoolExecutor.purge(); + removeTaskFromPreemptionList(pRequest, pRequest.getRequestId()); + + if (isDebugEnabled) { + LOG.debug("Pre-emption invoked for " + pRequest.getRequestId() + + " by interrupting the thread."); + } + + // future is cancelled or completed normally, in which case schedule the new request + if (pFuture.isDone()) { + ListenableFuture future = executorService.submit(task); + Futures.addCallback(future, task.getCallback()); + + if (isDebugEnabled) { + LOG.debug("New request " + task.getRequestId() + " submitted to executor service."); + } + + // notify AM of pre-emption + if (pFuture.isCancelled()) { + if (isDebugEnabled) { + LOG.debug(pRequest.getRequestId() + " request preempted by " + task.getRequestId()); + } + + notifyAM(pRequest, false); + } + } + + } else { + + // on rejection of the request by executor service, offer it to wait queue + if (waitQueue.offer(task)) { + if (isDebugEnabled) { + LOG.debug(task.getRequestId() + + " request rejected by executor service. Added to wait queue."); + } + } else { + throw new RejectedExecutionException("Queues are full. 
Rejecting request."); + } + } + } + } + + private void removeTaskFromPreemptionList(TaskRunnerCallable pRequest, String requestId) { + idToTaskMap.remove(requestId); + premptionMap.remove(pRequest); + } + + private void addTaskToPreemptionList(TaskRunnerCallable task, + ListenableFuture future) { + idToTaskMap.put(task.getRequestId(), task); + premptionMap.put(task, future); + } + + private final class InternalCompletionListener implements + FutureCallback { + private TaskRunnerCallable.TaskRunnerCallback wrappedCallback; + + public InternalCompletionListener(TaskRunnerCallable.TaskRunnerCallback wrappedCallback) { + this.wrappedCallback = wrappedCallback; + } + + @Override + public void onSuccess(TezChild.ContainerExecutionResult result) { + updatePreemptionList(true); + wrappedCallback.onSuccess(result); + synchronized (waitLock) { + waitLock.notify(); + } + } + + private void updatePreemptionList(boolean success) { + // if this task was added to pre-emption list, remove it + String taskId = wrappedCallback.getRequestId(); + TaskRunnerCallable task = idToTaskMap.get(taskId); + String state = success ? "succeeded" : "failed"; + if (enablePreemption && task != null) { + removeTaskFromPreemptionList(task, taskId); + if (isDebugEnabled) { + LOG.debug(task.getRequestId() + " request " + state + "! 
Removed from preemption list."); + } + } + } + + @Override + public void onFailure(Throwable t) { + updatePreemptionList(false); + wrappedCallback.onFailure(t); + synchronized (waitLock) { + waitLock.notify(); + } + } + } + + private void notifyAM(TaskRunnerCallable request, boolean rejected) { + // TODO: Report to AM of pre-emption and rejection + if (rejected) { + LOG.info("Notifying to AM of rejection is not implemented yet!"); + } else { + LOG.info("Notifying to AM of preemption is not implemented yet!"); + } + } + + @VisibleForTesting + public BlockingQueue getWaitQueue() { + return waitQueue; + } + + @VisibleForTesting + public int getWaitQueueSize() { + return waitQueue.size(); + } + + @VisibleForTesting + public int getRunQueueSize() { + return runQueue.size(); + } + + @VisibleForTesting + public int getPreemptionListSize() { + return premptionMap.size(); + } + + // TODO: llap daemon should call this to gracefully shutdown the task executor service + public void shutDown(boolean awaitTermination) { + if (awaitTermination) { + if (isDebugEnabled) { + LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + + " service gracefully"); + } + executorService.shutdown(); + try { + if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + } + + waitQueueExecutorService.shutdown(); + try { + if (!waitQueueExecutorService.awaitTermination(1, TimeUnit.MINUTES)) { + waitQueueExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + waitQueueExecutorService.shutdownNow(); + } + } else { + if (isDebugEnabled) { + LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + + " service immediately"); + } + executorService.shutdownNow(); + waitQueueExecutorService.shutdownNow(); + } + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java 
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java new file mode 100644 index 0000000..a830f92 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.CallableWithNdc; +import org.apache.hadoop.hive.llap.daemon.HistoryLogger; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; +import org.apache.hadoop.hive.llap.tezplugins.Converters; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.log4j.Logger; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.runtime.api.ExecutionContext; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; +import 
org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; +import org.apache.tez.runtime.internals.api.TaskReporterInterface; +import org.apache.tez.runtime.task.TezChild; +import org.apache.tez.runtime.task.TezTaskRunner; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * + */ +public class TaskRunnerCallable extends CallableWithNdc { + private static final Logger LOG = Logger.getLogger(TaskRunnerCallable.class); + private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request; + private final Configuration conf; + private final String[] localDirs; + private final Map envMap; + private final String pid = null; + private final ObjectRegistryImpl objectRegistry; + private final ExecutionContext executionContext; + private final Credentials credentials; + private final long memoryAvailable; + private final ConfParams confParams; + private final Token jobToken; + private final AMReporter amReporter; + private final ConcurrentMap sourceCompletionMap; + private final TaskSpec taskSpec; + private volatile TezTaskRunner taskRunner; + private volatile TaskReporterInterface taskReporter; + private volatile ListeningExecutorService executor; + private LlapTaskUmbilicalProtocol umbilical; + private volatile long startTime; + private volatile String threadName; + private LlapDaemonExecutorMetrics metrics; + protected String requestId; + + TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, Configuration conf, + ExecutionContext executionContext, Map envMap, + String[] localDirs, Credentials credentials, + long memoryAvailable, AMReporter amReporter, + ConcurrentMap sourceCompletionMap, + ConfParams 
confParams, LlapDaemonExecutorMetrics metrics) { + this.request = request; + this.conf = conf; + this.executionContext = executionContext; + this.envMap = envMap; + this.localDirs = localDirs; + this.objectRegistry = new ObjectRegistryImpl(); + this.sourceCompletionMap = sourceCompletionMap; + this.credentials = credentials; + this.memoryAvailable = memoryAvailable; + this.confParams = confParams; + this.jobToken = TokenCache.getSessionToken(credentials); + this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec()); + this.amReporter = amReporter; + // Register with the AMReporter when the callable is setup. Unregister once it starts running. + if (jobToken != null) { + this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), + request.getUser(), jobToken); + } + this.metrics = metrics; + this.requestId = getTaskIdentifierString(request); + } + + @Override + protected TezChild.ContainerExecutionResult callInternal() throws Exception { + this.startTime = System.currentTimeMillis(); + this.threadName = Thread.currentThread().getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); + } + + // Unregister from the AMReporter, since the task is now running. + this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort()); + + // TODO This executor seems unnecessary. Here and TezChild + ExecutorService executorReal = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat( + "TezTaskRunner_" + request.getFragmentSpec().getTaskAttemptIdString()) + .build()); + executor = MoreExecutors.listeningDecorator(executorReal); + + // TODO Consolidate this code with TezChild. 
+ Stopwatch sw = new Stopwatch().start(); + UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser()); + taskUgi.addCredentials(credentials); + + Map serviceConsumerMetadata = new HashMap<>(); + serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, + TezCommonUtils.convertJobTokenToBytes(jobToken)); + Multimap startedInputsMap = HashMultimap.create(); + + UserGroupInformation taskOwner = + UserGroupInformation.createRemoteUser(request.getTokenIdentifier()); + final InetSocketAddress address = + NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort()); + SecurityUtil.setTokenService(jobToken, address); + taskOwner.addToken(jobToken); + umbilical = taskOwner.doAs(new PrivilegedExceptionAction() { + @Override + public LlapTaskUmbilicalProtocol run() throws Exception { + return RPC.getProxy(LlapTaskUmbilicalProtocol.class, + LlapTaskUmbilicalProtocol.versionID, address, conf); + } + }); + + taskReporter = new LlapTaskReporter( + umbilical, + confParams.amHeartbeatIntervalMsMax, + confParams.amCounterHeartbeatInterval, + confParams.amMaxEventsPerHeartbeat, + new AtomicLong(0), + request.getContainerIdString()); + + taskRunner = new TezTaskRunner(conf, taskUgi, localDirs, + taskSpec, + request.getAppAttemptNumber(), + serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, + pid, + executionContext, memoryAvailable); + + boolean shouldDie; + try { + shouldDie = !taskRunner.run(); + if (shouldDie) { + LOG.info("Got a shouldDie notification via heartbeats. 
Shutting down"); + return new TezChild.ContainerExecutionResult( + TezChild.ContainerExecutionResult.ExitStatus.SUCCESS, null, + "Asked to die by the AM"); + } + } catch (IOException e) { + return new TezChild.ContainerExecutionResult( + TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, + e, "TaskExecutionFailure: " + e.getMessage()); + } catch (TezException e) { + return new TezChild.ContainerExecutionResult( + TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, + e, "TaskExecutionFailure: " + e.getMessage()); + } finally { + // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now. + // FileSystem.closeAllForUGI(taskUgi); + } + LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + + sw.stop().elapsedMillis()); + if (LOG.isDebugEnabled()) { + LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); + } + + return new TezChild.ContainerExecutionResult( + TezChild.ContainerExecutionResult.ExitStatus.SUCCESS, null, + null); + } + + /** + * Check whether a task can run to completion or may end up blocking on it's sources. + * This currently happens via looking up source state. + * TODO: Eventually, this should lookup the Hive Processor to figure out whether + * it's reached a state where it can finish - especially in cases of failures + * after data has been fetched. + * + * @return + */ + public boolean canFinish() { + List inputSpecList = taskSpec.getInputs(); + boolean canFinish = true; + if (inputSpecList != null && !inputSpecList.isEmpty()) { + for (InputSpec inputSpec : inputSpecList) { + if (isSourceOfInterest(inputSpec)) { + // Lookup the state in the map. 
+ LlapDaemonProtocolProtos.SourceStateProto state = sourceCompletionMap + .get(inputSpec.getSourceVertexName()); + if (state != null && state == LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED) { + continue; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot finish due to source: " + inputSpec.getSourceVertexName()); + } + canFinish = false; + break; + } + } + } + } + return canFinish; + } + + private boolean isSourceOfInterest(InputSpec inputSpec) { + String inputClassName = inputSpec.getInputDescriptor().getClassName(); + // MRInput is not of interest since it'll always be ready. + return !inputClassName.equals(MRInputLegacy.class.getName()); + } + + public void shutdown() { + if (executor != null) { + executor.shutdownNow(); + } + if (taskReporter != null) { + taskReporter.shutdown(); + } + if (umbilical != null) { + RPC.stopProxy(umbilical); + } + } + + @Override + public String toString() { + return requestId; + } + + @Override + public int hashCode() { + return requestId.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof TaskRunnerCallable)) { + return false; + } + return requestId.equals(((TaskRunnerCallable) obj).getRequestId()); + } + + public String getRequestId() { + return requestId; + } + + public TaskRunnerCallback getCallback() { + return new TaskRunnerCallback(request, this); + } + + final class TaskRunnerCallback implements FutureCallback { + + private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request; + private final TaskRunnerCallable taskRunnerCallable; + private final String requestId; + + TaskRunnerCallback(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, + TaskRunnerCallable taskRunnerCallable) { + this.request = request; + this.taskRunnerCallable = taskRunnerCallable; + this.requestId = getTaskIdentifierString(request); + } + + public String getRequestId() { + return requestId; + } + + // TODO Slightly more useful error handling + @Override + public void 
onSuccess(TezChild.ContainerExecutionResult result) { + switch (result.getExitStatus()) { + case SUCCESS: + LOG.info("Successfully finished: " + requestId); + metrics.incrExecutorTotalSuccess(); + break; + case EXECUTION_FAILURE: + LOG.info("Failed to run: " + requestId); + metrics.incrExecutorTotalExecutionFailed(); + break; + case INTERRUPTED: + LOG.info("Interrupted while running: " + requestId); + metrics.incrExecutorTotalInterrupted(); + break; + case ASKED_TO_DIE: + LOG.info("Asked to die while running: " + requestId); + metrics.incrExecutorTotalAskedToDie(); + break; + } + taskRunnerCallable.shutdown(); + HistoryLogger + .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), + executionContext.getHostName(), request.getFragmentSpec().getDagName(), + request.getFragmentSpec().getVertexName(), + request.getFragmentSpec().getFragmentNumber(), + request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, + taskRunnerCallable.startTime, true); + metrics.decrExecutorNumQueuedRequests(); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t); + // TODO HIVE-10236 Report a fatal error over the umbilical + taskRunnerCallable.shutdown(); + HistoryLogger + .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), + executionContext.getHostName(), request.getFragmentSpec().getDagName(), + request.getFragmentSpec().getVertexName(), + request.getFragmentSpec().getFragmentNumber(), + request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, + taskRunnerCallable.startTime, false); + if (metrics != null) { + metrics.decrExecutorNumQueuedRequests(); + } + } + } + + public static class ConfParams { + final int amHeartbeatIntervalMsMax; + final long amCounterHeartbeatInterval; + final int amMaxEventsPerHeartbeat; + + public ConfParams(int amHeartbeatIntervalMsMax, long amCounterHeartbeatInterval, + 
int amMaxEventsPerHeartbeat) { + this.amHeartbeatIntervalMsMax = amHeartbeatIntervalMsMax; + this.amCounterHeartbeatInterval = amCounterHeartbeatInterval; + this.amMaxEventsPerHeartbeat = amMaxEventsPerHeartbeat; + } + } + + public static String getTaskIdentifierString( + LlapDaemonProtocolProtos.SubmitWorkRequestProto request) { + StringBuilder sb = new StringBuilder(); + sb.append("AppId=").append(request.getApplicationIdString()) + .append(", containerId=").append(request.getContainerIdString()) + .append(", Dag=").append(request.getFragmentSpec().getDagName()) + .append(", Vertex=").append(request.getFragmentSpec().getVertexName()) + .append(", FragmentNum=").append(request.getFragmentSpec().getFragmentNumber()) + .append(", Attempt=").append(request.getFragmentSpec().getAttemptNumber()); + return sb.toString(); + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java new file mode 100644 index 0000000..276e264 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.task.TezChild; +import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; +import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult.ExitStatus; +import org.junit.Before; +import org.junit.Test; + +public class TestTaskExecutorService { + private static Configuration conf; + private static Credentials cred = new Credentials(); + + private static class MockRequest extends TaskRunnerCallable { + private int workTime; + private boolean canFinish; + + public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto, + boolean canFinish, int workTime) { + super(requestProto, conf, new ExecutionContextImpl("localhost"), null, null, 
cred, 0, null, + null, null, null); + this.workTime = workTime; + this.canFinish = canFinish; + } + + @Override + protected TezChild.ContainerExecutionResult callInternal() throws Exception { + System.out.println(requestId + " is executing.."); + Thread.sleep(workTime); + return new ContainerExecutionResult(ExitStatus.SUCCESS, null, null); + } + + @Override + public boolean canFinish() { + return canFinish; + } + } + + @Before + public void setup() { + conf = new Configuration(); + } + + private SubmitWorkRequestProto createRequest(int fragmentNumber) { + ApplicationId appId = ApplicationId.newInstance(9999, 72); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID vId = TezVertexID.getInstance(dagId, 35); + TezTaskID tId = TezTaskID.getInstance(vId, 389); + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, 0); + SubmitWorkRequestProto request = SubmitWorkRequestProto + .newBuilder() + .setFragmentSpec( + FragmentSpecProto + .newBuilder() + .setAttemptNumber(0) + .setDagName("MockDag") + .setFragmentNumber(fragmentNumber) + .setVertexName("MockVertex") + .setProcessorDescriptor( + EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) + .setTaskAttemptIdString(taId.toString()).build()).setAmHost("localhost") + .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") + .setContainerIdString("MockContainer_1").setUser("MockUser") + .setTokenIdentifier("MockToken_1").build(); + return request; + } + + @Test(expected = RejectedExecutionException.class) + public void testThreadPoolRejection() throws InterruptedException { + TaskExecutorService scheduler = new TaskExecutorService(2, 2, 2, false); + scheduler.schedule(new MockRequest(createRequest(1), true, 1000)); + scheduler.schedule(new MockRequest(createRequest(2), true, 1000)); + assertEquals(0, scheduler.getWaitQueueSize()); + assertEquals(0, scheduler.getRunQueueSize()); + scheduler.schedule(new MockRequest(createRequest(3), true, 1000)); + 
scheduler.schedule(new MockRequest(createRequest(4), true, 1000)); + assertEquals(0, scheduler.getWaitQueueSize()); + assertEquals(2, scheduler.getRunQueueSize()); + scheduler.schedule(new MockRequest(createRequest(5), true, 1000)); + scheduler.schedule(new MockRequest(createRequest(6), true, 1000)); + assertEquals(2, scheduler.getWaitQueueSize()); + assertEquals(2, scheduler.getRunQueueSize()); + // this request should be rejected + scheduler.schedule(new MockRequest(createRequest(7), true, 1000)); + } + + @Test + public void testPreemption() throws InterruptedException { + TaskExecutorService scheduler = new TaskExecutorService(2, 2, 2, true); + scheduler.schedule(new MockRequest(createRequest(1), false, 100)); + scheduler.schedule(new MockRequest(createRequest(2), false, 100)); + assertEquals(0, scheduler.getWaitQueueSize()); + assertEquals(0, scheduler.getRunQueueSize()); + assertEquals(2, scheduler.getPreemptionListSize()); + scheduler.schedule(new MockRequest(createRequest(3), false, 100)); + scheduler.schedule(new MockRequest(createRequest(4), false, 100)); + assertEquals(0, scheduler.getWaitQueueSize()); + assertEquals(2, scheduler.getRunQueueSize()); + assertEquals(4, scheduler.getPreemptionListSize()); + // these should invoke preemption + scheduler.schedule(new MockRequest(createRequest(5), true, 100)); + assertEquals(3, scheduler.getPreemptionListSize()); + scheduler.schedule(new MockRequest(createRequest(6), true, 100)); + assertEquals(2, scheduler.getPreemptionListSize()); + Thread.sleep(1000); + } + + @Test + public void testBoundedPriorityBlockingQueue() { + TaskExecutorService scheduler = new TaskExecutorService(2, 2, 2, false); + scheduler.schedule(new MockRequest(createRequest(1), true, 100000)); + scheduler.schedule(new MockRequest(createRequest(2), true, 100000)); + assertEquals(0, scheduler.getWaitQueueSize()); + assertEquals(0, scheduler.getRunQueueSize()); + scheduler.schedule(new MockRequest(createRequest(3), true, 1000)); + 
scheduler.schedule(new MockRequest(createRequest(4), true, 1000)); + assertEquals(0, scheduler.getWaitQueueSize()); + assertEquals(2, scheduler.getRunQueueSize()); + MockRequest r5 = new MockRequest(createRequest(5), false, 1000); + scheduler.schedule(r5); + MockRequest r6 = new MockRequest(createRequest(6), true, 1000); + scheduler.schedule(r6); + assertEquals(2, scheduler.getWaitQueueSize()); + assertEquals(2, scheduler.getRunQueueSize()); + + // both r5 and r6 are added to wait queue, but r6 (canFinish = true) has higher priority + // over r5 (canFinish = false). Retrieving from wait queue should return r6 first + BlockingQueue waitQueue = scheduler.getWaitQueue(); + assertEquals(r6, waitQueue.remove()); + assertEquals(r5, waitQueue.remove()); + } +}