diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java new file mode 100644 index 0000000..511347a --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon; + +import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo; + +public interface FragmentCompletionHandler { + + void fragmentComplete(QueryFragmentInfo fragmentInfo); +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 39b3634..ea2d77a 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -55,6 +55,8 @@ */ public class AMReporter extends AbstractService { + // TODO In case of a failure to heartbeat, tasks for the specific DAG should ideally be KILLED + /* registrations and un-registrations will happen as and when tasks are submitted or are removed. reference counting is likely required. 
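For orientation: the FragmentCompletionHandler interface above is the one new contract introduced by this patch; execution-side code reports fragment completion back to the tracking layer. In the hunks that follow, ContainerRunnerImpl implements the handler (delegating to QueryTracker.fragmentComplete), and TaskRunnerCallable's completion callback invokes it on success, failure, and rejected submission. A minimal sketch of the intended call pattern; the SketchFragmentRunner class here is hypothetical and not part of the patch:

import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo;

class SketchFragmentRunner {
  private final FragmentCompletionHandler completionHandler;
  private final QueryFragmentInfo fragmentInfo;

  SketchFragmentRunner(FragmentCompletionHandler completionHandler,
      QueryFragmentInfo fragmentInfo) {
    this.completionHandler = completionHandler;
    this.fragmentInfo = fragmentInfo;
  }

  void runFragment() {
    try {
      // ... execute the fragment ...
    } finally {
      // Report completion exactly once, whatever the outcome, so the tracker
      // can unregister the fragment and release per-query state.
      completionHandler.fragmentComplete(fragmentInfo);
    }
  }
}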
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 3fd7920..a208bdd 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -20,13 +20,13 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; +import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; @@ -34,7 +34,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; @@ -57,11 +56,11 @@ import com.google.common.base.Preconditions; -public class ContainerRunnerImpl extends AbstractService implements ContainerRunner { +// TODO Convert this to a CompositeService +public class ContainerRunnerImpl extends AbstractService implements ContainerRunner, FragmentCompletionHandler { - public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor "; - public static final String THREAD_NAME_FORMAT = THREAD_NAME_FORMAT_PREFIX + "%d"; private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class); + public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor "; private volatile AMReporter amReporter; private final QueryTracker queryTracker; @@ -74,10 +73,6 @@ private final TaskRunnerCallable.ConfParams confParams; private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl(); - // Map of dagId to vertices and associated state. 
- private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>(); - // TODO Support for removing queued containers, interrupting / killing specific containers - public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize, boolean enablePreemption, String[] localDirsBase, int localShufflePort, AtomicReference<InetSocketAddress> localAddress, @@ -114,9 +109,14 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi ); } + public void serviceInit(Configuration conf) { + queryTracker.init(conf); + } + @Override public void serviceStart() { // The node id will only be available at this point, since the server has been started in LlapDaemon + queryTracker.start(); LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort()); this.amReporter = new AMReporter(llapNodeId, conf); @@ -130,7 +130,7 @@ protected void serviceStop() throws Exception { amReporter.stop(); amReporter = null; } - queryTracker.shutdown(); + queryTracker.stop(); super.serviceStop(); } @@ -151,7 +151,7 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { request.getFragmentSpec().getAttemptNumber(); NDC.push(ndcContextString); try { - Map<String, String> env = new HashMap<String, String>(); + Map<String, String> env = new HashMap<>(); // TODO What else is required in this environment map. env.putAll(localEnv); env.put(ApplicationConstants.Environment.USER.name(), request.getUser()); @@ -161,13 +161,13 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { fragmentSpec.getTaskAttemptIdString()); int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); - queryTracker + QueryFragmentInfo fragmentInfo = queryTracker .registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(), dagIdentifier, fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(), - fragmentSpec.getAttemptNumber(), request.getUser()); + fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec()); - String []localDirs = queryTracker.getLocalDirs(null, fragmentSpec.getDagName(), request.getUser()); + String []localDirs = fragmentInfo.getLocalDirs(); Preconditions.checkNotNull(localDirs); if (LOG.isDebugEnabled()) { @@ -190,11 +190,17 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken, request.getUser(), localDirs); - ConcurrentMap<String, SourceStateProto> sourceCompletionMap = getSourceCompletionMap(request.getFragmentSpec().getDagName()); - TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()), - new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, - credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams, metrics, killedTaskHandler); - executorService.schedule(callable); + TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()), + new ExecutionContextImpl(localAddress.get().getHostName()), env, + credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, + this); + try { + executorService.schedule(callable); + } catch (RejectedExecutionException e) { + // Stop tracking the fragment and re-throw the error.
+ fragmentComplete(fragmentInfo); + throw e; + } metrics.incrExecutorTotalRequestsHandled(); metrics.incrExecutorNumQueuedRequests(); } finally { @@ -205,8 +211,8 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { @Override public void sourceStateUpdated(SourceStateUpdatedRequestProto request) { LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request)); - ConcurrentMap<String, SourceStateProto> dagMap = getSourceCompletionMap(request.getDagName()); - dagMap.put(request.getSrcName(), request.getState()); + queryTracker.registerSourceStateChange(request.getDagName(), request.getSrcName(), + request.getState()); } @Override @@ -280,14 +286,9 @@ public static String stringifySubmitRequest(SubmitWorkRequestProto request) { return sb.toString(); } - private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(String dagName) { - ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(dagName); - if (dagMap == null) { - dagMap = new ConcurrentHashMap<>(); - ConcurrentMap<String, SourceStateProto> old = sourceCompletionMap.putIfAbsent(dagName, dagMap); - dagMap = (old != null) ? old : dagMap; - } - return dagMap; + @Override + public void fragmentComplete(QueryFragmentInfo fragmentInfo) { + queryTracker.fragmentComplete(fragmentInfo); } private class KilledTaskHandlerImpl implements KilledTaskHandler { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java index 926835b..ab3a130 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java @@ -65,6 +65,10 @@ public synchronized void remove(E e) { deque.remove(e); } + public synchronized int size() { + return deque.size(); + } + @Override public synchronized String toString() { return deque.toString(); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java new file mode 100644 index 0000000..f6cd8ab --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java @@ -0,0 +1,149 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.io.IOException; +import java.util.List; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; +import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QueryFragmentInfo { + + private static final Logger LOG = LoggerFactory.getLogger(QueryFragmentInfo.class); + + private final QueryInfo queryInfo; + private final String vertexName; + private final int fragmentNumber; + private final int attemptNumber; + private final FragmentSpecProto fragmentSpec; + + public QueryFragmentInfo(QueryInfo queryInfo, String vertexName, int fragmentNumber, + int attemptNumber, + FragmentSpecProto fragmentSpec) { + Preconditions.checkNotNull(queryInfo); + Preconditions.checkNotNull(vertexName); + Preconditions.checkNotNull(fragmentSpec); + this.queryInfo = queryInfo; + this.vertexName = vertexName; + this.fragmentNumber = fragmentNumber; + this.attemptNumber = attemptNumber; + this.fragmentSpec = fragmentSpec; + } + + // Only meant for use by the QueryTracker + QueryInfo getQueryInfo() { + return this.queryInfo; + } + + public FragmentSpecProto getFragmentSpec() { + return fragmentSpec; + } + + public String getVertexName() { + return vertexName; + } + + public int getFragmentNumber() { + return fragmentNumber; + } + + public int getAttemptNumber() { + return attemptNumber; + } + + /** + * Check whether a task can run to completion or may end up blocking on its sources. + * This currently happens via looking up source state. + * TODO: Eventually, this should look up the Hive Processor to figure out whether + * it's reached a state where it can finish - especially in cases of failures + * after data has been fetched. + * + * @return true if the task can finish, false otherwise + */ + public boolean canFinish() { + List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList(); + boolean canFinish = true; + if (inputSpecList != null && !inputSpecList.isEmpty()) { + for (IOSpecProto inputSpec : inputSpecList) { + if (isSourceOfInterest(inputSpec)) { + // Look up the state in the map. + LlapDaemonProtocolProtos.SourceStateProto state = queryInfo.getSourceStateMap() + .get(inputSpec.getConnectedVertexName()); + if (state != null && state == LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED) { + continue; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot finish due to source: " + inputSpec.getConnectedVertexName()); + } + canFinish = false; + break; + } + } + } + } + return canFinish; + } + + /** + * Get, and create if required, local-dirs for a fragment + * @return + * @throws IOException + */ + public String[] getLocalDirs() throws IOException { + return queryInfo.getLocalDirs(); + } + + private boolean isSourceOfInterest(IOSpecProto inputSpec) { + String inputClassName = inputSpec.getIoDescriptor().getClassName(); + // MRInput is not of interest since it'll always be ready.
+ return !inputClassName.equals(MRInputLegacy.class.getName()); + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + QueryFragmentInfo that = (QueryFragmentInfo) o; + + if (fragmentNumber != that.fragmentNumber) { + return false; + } + if (attemptNumber != that.attemptNumber) { + return false; + } + return vertexName.equals(that.vertexName); + + } + + @Override + public int hashCode() { + int result = vertexName.hashCode(); + result = 31 * result + fragmentNumber; + result = 31 * result + attemptNumber; + return result; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java new file mode 100644 index 0000000..efa18cd --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -0,0 +1,128 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; + +public class QueryInfo { + private final String queryId; + private final String appIdString; + private final String dagName; + private final int dagIdentifier; + private final String user; + private final String[] localDirsBase; + private final FileSystem localFs; + private String[] localDirs; + // Map of states for different vertices. 
+ + private final Set<QueryFragmentInfo> knownFragments = + Collections.newSetFromMap(new ConcurrentHashMap<QueryFragmentInfo, Boolean>()); + + private final ConcurrentMap<String, SourceStateProto> sourceStateMap; + + + public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier, + String user, ConcurrentMap<String, SourceStateProto> sourceStateMap, + String[] localDirsBase, FileSystem localFs) { + this.queryId = queryId; + this.appIdString = appIdString; + this.dagName = dagName; + this.dagIdentifier = dagIdentifier; + this.sourceStateMap = sourceStateMap; + this.user = user; + this.localDirsBase = localDirsBase; + this.localFs = localFs; + } + + public String getQueryId() { + return queryId; + } + + public String getAppIdString() { + return appIdString; + } + + public String getDagName() { + return dagName; + } + + public int getDagIdentifier() { + return dagIdentifier; + } + + public String getUser() { + return user; + } + + public ConcurrentMap<String, SourceStateProto> getSourceStateMap() { + return sourceStateMap; + } + + public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber, int attemptNumber, FragmentSpecProto fragmentSpec) { + QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(this, vertexName, fragmentNumber, attemptNumber, + fragmentSpec); + knownFragments.add(fragmentInfo); + return fragmentInfo; + } + + public void unregisterFragment(QueryFragmentInfo fragmentInfo) { + knownFragments.remove(fragmentInfo); + } + + private synchronized void createLocalDirs() throws IOException { + if (localDirs == null) { + localDirs = new String[localDirsBase.length]; + for (int i = 0; i < localDirsBase.length; i++) { + localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], appIdString, user, dagIdentifier); + localFs.mkdirs(new Path(localDirs[i])); + } + } + } + + /** + * Get, and create if required, local-dirs for a query + * @return + * @throws IOException + */ + public synchronized String[] getLocalDirs() throws IOException { + if (localDirs == null) { + createLocalDirs(); + } + return localDirs; + } + + public synchronized String[] getLocalDirsNoCreate() { + return this.localDirs; + } + + private static String createAppSpecificLocalDir(String baseDir, String applicationIdString, + String user, int dagIdentifier) { + // TODO This is broken for secure clusters. The app will not have permission to create these directories. + // May work via Slider - since the directory would already exist. Otherwise may need a custom shuffle handler. + // TODO This should be the process user - and not the user on behalf of whom the query is being submitted.
+ return baseDir + File.separator + "usercache" + File.separator + user + File.separator + + "appcache" + File.separator + applicationIdString + File.separator + dagIdentifier; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 5c8116e..90ad923 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -16,19 +16,28 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; +import org.apache.hadoop.service.CompositeService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + /** * Tracks queries running within a daemon */ -public class QueryTracker { +public class QueryTracker extends CompositeService { private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class); private final QueryFileCleaner queryFileCleaner; @@ -39,107 +48,162 @@ private final String[] localDirsBase; private final FileSystem localFs; + // TODO At the moment there's no way of knowing whether a query is running or not. + // A race is possible between dagComplete and registerFragment - where the registerFragment + // is processed after the dag completes. + // May need to keep track of completed dags for a certain time duration to avoid this. + // Alternatively - send in an explicit dag start message before any other message is processed. + // Multiple threads communicating from a single AM get in the way of this. + + // Keeps track of completed dags. Assumes dag names are unique across AMs. + private final Set<String> completedDagMap = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + + + private final Lock lock = new ReentrantLock(); + private final ConcurrentMap<String, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>(); + + // Tracks various maps for dagCompletions. This is set up here since stateChange messages + // may be processed by a thread which ends up executing before a task.
+ private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>(); + public QueryTracker(Configuration conf, String[] localDirsBase) { + super("QueryTracker"); this.localDirsBase = localDirsBase; try { localFs = FileSystem.getLocal(conf); } catch (IOException e) { throw new RuntimeException("Failed to setup local filesystem instance", e); } + queryFileCleaner = new QueryFileCleaner(conf, localFs); - queryFileCleaner.init(conf); - queryFileCleaner.start(); + addService(queryFileCleaner); } - void registerFragment(String queryId, String appIdString, String dagName, int dagIdentifier, + + /** + * Register a new fragment for a specific query + * @param queryId + * @param appIdString + * @param dagName + * @param dagIdentifier + * @param vertexName + * @param fragmentNumber + * @param attemptNumber + * @param user + * @throws IOException + */ + QueryFragmentInfo registerFragment(String queryId, String appIdString, String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, - String user) throws + String user, FragmentSpecProto fragmentSpec) throws IOException { - QueryInfo queryInfo = queryInfoMap.get(dagName); - if (queryInfo == null) { - queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user); - queryInfoMap.putIfAbsent(dagName, queryInfo); + ReadWriteLock dagLock = getDagLock(dagName); + dagLock.readLock().lock(); + try { + if (!completedDagMap.contains(dagName)) { + QueryInfo queryInfo = queryInfoMap.get(dagName); + if (queryInfo == null) { + queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user, + getSourceCompletionMap(dagName), localDirsBase, localFs); + queryInfoMap.putIfAbsent(dagName, queryInfo); + } + return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec); + } else { + // Cleanup the dag lock here, since it may have been created after the query completed + dagSpecificLocks.remove(dagName); + throw new RuntimeException( + "Dag " + dagName + " already complete. Rejecting fragment [" + vertexName + ", " + fragmentNumber + + ", " + attemptNumber + "]"); + } + } finally { + dagLock.readLock().unlock(); } - // TODO Start tracking individual fragments, so that taskKilled etc messages - // can be routed through this layer to simplify the interfaces. } - String[] getLocalDirs(String queryId, String dagName, String user) throws IOException { + /** + * Indicate to the tracker that a fragment is complete.
This is from internal execution within the daemon. + * @param fragmentInfo + */ + void fragmentComplete(QueryFragmentInfo fragmentInfo) { + String dagName = fragmentInfo.getQueryInfo().getDagName(); QueryInfo queryInfo = queryInfoMap.get(dagName); - return queryInfo.getLocalDirs(); + if (queryInfo == null) { + // Possible because a queryComplete message from the AM can come in first - KILL / SUCCESSFUL, + // before the fragmentComplete is reported + LOG.info("Ignoring fragmentComplete message for unknown query"); + } else { + queryInfo.unregisterFragment(fragmentInfo); + } } + /** + * Register completion for a query + * @param queryId + * @param dagName + * @param deleteDelay + */ void queryComplete(String queryId, String dagName, long deleteDelay) { - LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", dagName, deleteDelay); - QueryInfo queryInfo = queryInfoMap.remove(dagName); - if (queryInfo == null) { - LOG.warn("Ignoring query complete for unknown dag: {}", dagName); - } - String []localDirs = queryInfo.getLocalDirsNoCreate(); - if (localDirs != null) { - for (String localDir : localDirs) { - queryFileCleaner.cleanupDir(localDir, deleteDelay); - ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.dagIdentifier); + ReadWriteLock dagLock = getDagLock(dagName); + dagLock.writeLock().lock(); + try { + completedDagMap.add(dagName); + LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", dagName, + deleteDelay); + QueryInfo queryInfo = queryInfoMap.remove(dagName); + if (queryInfo == null) { + LOG.warn("Ignoring query complete for unknown dag: {}", dagName); + return; } + String[] localDirs = queryInfo.getLocalDirsNoCreate(); + if (localDirs != null) { + for (String localDir : localDirs) { + queryFileCleaner.cleanupDir(localDir, deleteDelay); + ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.getDagIdentifier()); + } + } + sourceCompletionMap.remove(dagName); + dagSpecificLocks.remove(dagName); + // TODO HIVE-10762 Issue a kill message to all running fragments for this container.
+ // TODO HIVE-10535 Cleanup map join cache + } finally { + dagLock.writeLock().unlock(); } - // TODO HIVE-10535 Cleanup map join cache } - void shutdown() { - queryFileCleaner.stop(); + /** + * Register an update to a source within an executing dag + * @param dagName + * @param sourceName + * @param sourceState + */ + void registerSourceStateChange(String dagName, String sourceName, SourceStateProto sourceState) { + getSourceCompletionMap(dagName).put(sourceName, sourceState); + // TODO HIVE-10758 source completion notifications } - private class QueryInfo { - - private final String queryId; - private final String appIdString; - private final String dagName; - private final int dagIdentifier; - private final String user; - private String[] localDirs; - - public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier, - String user) { - this.queryId = queryId; - this.appIdString = appIdString; - this.dagName = dagName; - this.dagIdentifier = dagIdentifier; - this.user = user; - } - - - - - private synchronized void createLocalDirs() throws IOException { - if (localDirs == null) { - localDirs = new String[localDirsBase.length]; - for (int i = 0; i < localDirsBase.length; i++) { - localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], appIdString, user, dagIdentifier); - localFs.mkdirs(new Path(localDirs[i])); - } - } - } - - private synchronized String[] getLocalDirs() throws IOException { - if (localDirs == null) { - createLocalDirs(); + private ReadWriteLock getDagLock(String dagName) { + lock.lock(); + try { + ReadWriteLock dagLock = dagSpecificLocks.get(dagName); + if (dagLock == null) { + dagLock = new ReentrantReadWriteLock(); + dagSpecificLocks.put(dagName, dagLock); } - return localDirs; - } - - private synchronized String[] getLocalDirsNoCreate() { - return this.localDirs; + return dagLock; + } finally { + lock.unlock(); } } - private static String createAppSpecificLocalDir(String baseDir, String applicationIdString, - String user, int dagIdentifier) { - // TODO This is broken for secure clusters. The app will not have permission to create these directories. - // May work via Slider - since the directory would already exist. Otherwise may need a custom shuffle handler. - // TODO This should be the process user - and not the user on behalf of whom the query is being submitted. - return baseDir + File.separator + "usercache" + File.separator + user + File.separator + - "appcache" + File.separator + applicationIdString + File.separator + dagIdentifier; + private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(String dagName) { + ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(dagName); + if (dagMap == null) { + dagMap = new ConcurrentHashMap<>(); + ConcurrentMap<String, SourceStateProto> old = + sourceCompletionMap.putIfAbsent(dagName, dagMap); + dagMap = (old != null) ?
old : dagMap; + } + return dagMap; } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 599c759..18daa75 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -108,6 +108,11 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePr executionCompletionExecutorService = MoreExecutors.listeningDecorator(executionCompletionExecutorServiceRaw); ListenableFuture<?> future = waitQueueExecutorService.submit(new WaitQueueWorker()); Futures.addCallback(future, new WaitQueueWorkerCallback()); + + LOG.info("TaskExecutorService started with parameters: " + + "numExecutors=" + numExecutors + + ", waitQueueSize=" + waitQueueSize + + ", enablePreemption=" + enablePreemption); } /** @@ -134,11 +139,20 @@ public void run() { // if the task cannot finish and if no slots are available then don't schedule it. // TODO: Event notifications that change canFinish state should notify waitLock synchronized (waitLock) { - // KKK Is this a tight loop when there's only finishable tasks available ? - if (!task.canFinish() && numSlotsAvailable.get() == 0) { - waitLock.wait(); + boolean shouldWait = false; + if (task.canFinish()) { + if (numSlotsAvailable.get() == 0 && preemptionQueue.isEmpty()) { + shouldWait = true; + } + } else { + if (numSlotsAvailable.get() == 0) { + shouldWait = true; + } + } + if (shouldWait) { // Another task at a higher priority may have come in during the wait. Lookup the // queue again to pick up the task at the highest priority. + waitLock.wait(); continue; } } @@ -174,7 +188,7 @@ public void onSuccess(Object result) { @Override public void onFailure(Throwable t) { - LOG.error("Wait queue scheduler worker exited with failure!"); + LOG.error("Wait queue scheduler worker exited with failure!", t); } } @@ -237,7 +251,8 @@ private boolean trySchedule(final TaskRunnerCallable task) { if (pRequest != null) { if (isInfoEnabled) { - LOG.info("Invoking kill task for {} due to pre-emption.", pRequest.getRequestId()); + LOG.info("Invoking kill task for {} due to pre-emption to run {}", + pRequest.getRequestId(), task.getRequestId()); } // The task will either be killed or is already in the process of completing, which will diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 94512d6..166dac5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -21,9 +21,7 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -31,9 +29,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.CallableWithNdc; +import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; -import
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; @@ -50,9 +48,7 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.runtime.api.ExecutionContext; -import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; @@ -76,9 +72,8 @@ */ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class); - private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request; + private final SubmitWorkRequestProto request; private final Configuration conf; - private final String[] localDirs; private final Map<String, String> envMap; private final String pid = null; private final ObjectRegistryImpl objectRegistry; @@ -88,16 +83,17 @@ private final ConfParams confParams; private final Token<JobTokenIdentifier> jobToken; private final AMReporter amReporter; - private final ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap; private final TaskSpec taskSpec; + private final QueryFragmentInfo fragmentInfo; private final KilledTaskHandler killedTaskHandler; + private final FragmentCompletionHandler fragmentCompletionHandler; private volatile TezTaskRunner2 taskRunner; private volatile TaskReporterInterface taskReporter; private volatile ListeningExecutorService executor; private LlapTaskUmbilicalProtocol umbilical; private volatile long startTime; private volatile String threadName; - private LlapDaemonExecutorMetrics metrics; + private final LlapDaemonExecutorMetrics metrics; private final String requestId; private boolean shouldRunTask = true; final Stopwatch runtimeWatch = new Stopwatch(); @@ -105,20 +101,20 @@ private final AtomicBoolean isCompleted = new AtomicBoolean(false); private final AtomicBoolean killInvoked = new AtomicBoolean(false); - TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, Configuration conf, - ExecutionContext executionContext, Map<String, String> envMap, - String[] localDirs, Credentials credentials, - long memoryAvailable, AMReporter amReporter, - ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap, - ConfParams confParams, LlapDaemonExecutorMetrics metrics, - KilledTaskHandler killedTaskHandler) { + TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, + Configuration conf, + ExecutionContext executionContext, Map<String, String> envMap, + Credentials credentials, + long memoryAvailable, AMReporter amReporter, + ConfParams confParams, LlapDaemonExecutorMetrics metrics, + KilledTaskHandler killedTaskHandler, + FragmentCompletionHandler fragmentCompleteHandler) { this.request = request; + this.fragmentInfo = fragmentInfo; this.conf = conf; this.executionContext = executionContext; this.envMap = envMap; - this.localDirs = localDirs; this.objectRegistry = new ObjectRegistryImpl(); - this.sourceCompletionMap = sourceCompletionMap; this.credentials = credentials; this.memoryAvailable = memoryAvailable; this.confParams = confParams; @@ -133,6 +129,7 @@ this.metrics = metrics; this.requestId = getTaskAttemptId(request);
+ this.killedTaskHandler = killedTaskHandler; + this.fragmentCompletionHandler = fragmentCompleteHandler; } @Override @@ -189,7 +186,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { synchronized (this) { if (shouldRunTask) { - taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs, + taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(), taskSpec, request.getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, @@ -240,6 +237,7 @@ public void killTask() { boolean killed = taskRunner.killTask(); if (killed) { // Sending a kill message to the AM right here. Don't need to wait for the task to complete. + LOG.info("Kill request for task {} completed. Informing AM", taskSpec.getTaskAttemptID()); reportTaskKilled(); } else { LOG.info("Kill request for task {} did not complete because the task is already complete", @@ -268,43 +266,8 @@ public void reportTaskKilled() { taskSpec.getTaskAttemptID()); } - /** - * Check whether a task can run to completion or may end up blocking on it's sources. - * This currently happens via looking up source state. - * TODO: Eventually, this should lookup the Hive Processor to figure out whether - * it's reached a state where it can finish - especially in cases of failures - * after data has been fetched. - * - * @return - */ public boolean canFinish() { - List<InputSpec> inputSpecList = taskSpec.getInputs(); - boolean canFinish = true; - if (inputSpecList != null && !inputSpecList.isEmpty()) { - for (InputSpec inputSpec : inputSpecList) { - if (isSourceOfInterest(inputSpec)) { - // Lookup the state in the map. - LlapDaemonProtocolProtos.SourceStateProto state = sourceCompletionMap - .get(inputSpec.getSourceVertexName()); - if (state != null && state == LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED) { - continue; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot finish due to source: " + inputSpec.getSourceVertexName()); - } - canFinish = false; - break; - } - } - } - } - return canFinish; - } - - private boolean isSourceOfInterest(InputSpec inputSpec) { - String inputClassName = inputSpec.getInputDescriptor().getClassName(); - // MRInput is not of interest since it'll always be ready. - return !inputClassName.equals(MRInputLegacy.class.getName()); + return fragmentInfo.canFinish(); } private Multimap<String, String> createStartedInputMap(FragmentSpecProto fragmentSpec) { @@ -371,10 +334,10 @@ public SubmitWorkRequestProto getRequest() { final class TaskRunnerCallback implements FutureCallback<TaskRunner2Result> { - private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request; + private final SubmitWorkRequestProto request; private final TaskRunnerCallable taskRunnerCallable; - TaskRunnerCallback(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, + TaskRunnerCallback(SubmitWorkRequestProto request, TaskRunnerCallable taskRunnerCallable) { this.request = request; this.taskRunnerCallable = taskRunnerCallable; @@ -385,6 +348,7 @@ public SubmitWorkRequestProto getRequest() { @Override public void onSuccess(TaskRunner2Result result) { isCompleted.set(true); + switch(result.getEndReason()) { // Only the KILLED case requires a message to be sent out to the AM.
case SUCCESS: @@ -413,6 +377,7 @@ public void onSuccess(TaskRunner2Result result) { metrics.incrExecutorTotalExecutionFailed(); break; } + fragmentCompletionHandler.fragmentComplete(fragmentInfo); taskRunnerCallable.shutdown(); HistoryLogger @@ -427,8 +392,9 @@ public void onSuccess(TaskRunner2Result result) { @Override public void onFailure(Throwable t) { - isCompleted.set(true); LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t); + isCompleted.set(true); + fragmentCompletionHandler.fragmentComplete(fragmentInfo); // TODO HIVE-10236 Report a fatal error over the umbilical taskRunnerCallable.shutdown(); HistoryLogger @@ -458,7 +424,7 @@ public ConfParams(int amHeartbeatIntervalMsMax, long amCounterHeartbeatInterval, } public static String getTaskIdentifierString( - LlapDaemonProtocolProtos.SubmitWorkRequestProto request) { + SubmitWorkRequestProto request) { StringBuilder sb = new StringBuilder(); sb.append("AppId=").append(request.getApplicationIdString()) .append(", containerId=").append(request.getContainerIdString()) diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index f0e53a7..a2e9501 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -25,6 +25,7 @@ import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; @@ -52,15 +53,17 @@ public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto, boolean canFinish, int workTime) { - super(requestProto, conf, new ExecutionContextImpl("localhost"), null, null, cred, 0, null, - null, null, null, mock(KilledTaskHandler.class)); + super(requestProto, mock(QueryFragmentInfo.class), conf, + new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null, + mock(KilledTaskHandler.class), mock( + FragmentCompletionHandler.class)); this.workTime = workTime; this.canFinish = canFinish; } @Override protected TaskRunner2Result callInternal() { - System.out.println(requestId + " is executing.."); + System.out.println(super.getRequestId() + " is executing.."); try { Thread.sleep(workTime); } catch (InterruptedException e) {
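For orientation, the central concurrency change in this patch is QueryTracker's per-DAG ReadWriteLock: registerFragment runs under the read lock (any number of fragments may register concurrently), while queryComplete takes the write lock and records the DAG in completedDagMap, so a late-arriving fragment registration cannot slip in after its DAG has completed. A self-contained sketch of the same guard, with hypothetical names, using putIfAbsent for lock creation where the patch serializes creation with an outer ReentrantLock:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch, not patch code: a per-key completion guard in the style of QueryTracker.
class PerKeyCompletionGuard {
  private final ConcurrentMap<String, ReadWriteLock> locks = new ConcurrentHashMap<>();
  private final Set<String> completed =
      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

  private ReadWriteLock lockFor(String key) {
    ReadWriteLock lock = locks.get(key);
    if (lock == null) {
      ReadWriteLock candidate = new ReentrantReadWriteLock();
      ReadWriteLock previous = locks.putIfAbsent(key, candidate);
      lock = (previous != null) ? previous : candidate;
    }
    return lock;
  }

  // Registrations share the read lock, so they do not serialize against each other.
  boolean tryRegister(String key) {
    ReadWriteLock lock = lockFor(key);
    lock.readLock().lock();
    try {
      return !completed.contains(key); // reject registration once the key is complete
    } finally {
      lock.readLock().unlock();
    }
  }

  // Completion is exclusive: no registration can interleave with marking the key complete.
  void complete(String key) {
    ReadWriteLock lock = lockFor(key);
    lock.writeLock().lock();
    try {
      completed.add(key);
      locks.remove(key); // same cleanup the patch performs in queryComplete
    } finally {
      lock.writeLock().unlock();
    }
  }
}

As in the patch, a registration that races with completion either sees the completed flag under the read lock and is rejected, or finishes registering before the writer acquires the write lock.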