diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index ded84c1..3ec542c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -28,6 +29,7 @@
 import org.apache.hadoop.hive.common.UgiFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapOutputFormatService.CloseableGenerator;
 import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
@@ -80,7 +82,7 @@
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 
-public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
+public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler, CloseableGenerator {
 
   // TODO Setup a set of threads to process incoming requests.
   // Make sure requests for a single dag/query are handled by the same thread
@@ -468,4 +470,39 @@
   public Set<String> getExecutorStatus() {
     return executorService.getExecutorsStatus();
   }
+
+  void fragmentCompleteWithQueryCleanup(QueryFragmentInfo fragmentInfo) {
+    queryTracker.fragmentCompleteWithQueryCleanup(fragmentInfo);
+  }
+
+  /**
+   * Generate a Closeable for fragment/query cleanup of fragments submitted via external clients.
+   */
+  @Override
+  public Closeable createCloseableForFragment(String fragmentId) {
+    QueryFragmentInfo fragmentInfo = queryTracker.getFragmentInfoForFragment(fragmentId);
+    if (fragmentInfo == null) {
+      LOG.warn("No fragment info found, skipping cleanup for fragmentId " + fragmentId);
+      return null;
+    }
+    return new ExternalFragmentCleanup(this, fragmentInfo);
+  }
+
+  private static class ExternalFragmentCleanup implements Closeable {
+    private final ContainerRunnerImpl containerRunner;
+    private final QueryFragmentInfo fragmentInfo;
+
+    private ExternalFragmentCleanup(ContainerRunnerImpl cr, QueryFragmentInfo fi) {
+      Preconditions.checkNotNull(cr);
+      Preconditions.checkNotNull(fi);
+
+      this.containerRunner = cr;
+      this.fragmentInfo = fi;
+    }
+
+    @Override
+    public void close() throws IOException {
+      containerRunner.fragmentCompleteWithQueryCleanup(fragmentInfo);
+    }
+  }
 }
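The contract above is the crux of the patch: the daemon hands the output service a `Closeable` whose `close()` forwards to `fragmentCompleteWithQueryCleanup`, and `createCloseableForFragment` may legitimately return null for an unknown fragment. A minimal, self-contained sketch of that contract (the class name and generator body here are hypothetical, not part of the patch):

```java
import java.io.Closeable;
import java.io.IOException;

public class FragmentCleanupSketch {

  interface CloseableGenerator {
    Closeable createCloseableForFragment(String fragmentId);
  }

  // The generator returns null when the fragment is unknown; callers must tolerate that.
  static final CloseableGenerator GENERATOR = fragmentId ->
      fragmentId == null ? null : () -> System.out.println("cleaned up " + fragmentId);

  public static void main(String[] args) throws IOException {
    Closeable cleanup = GENERATOR.createCloseableForFragment("attempt_0_0");
    if (cleanup != null) {
      cleanup.close(); // in the patch, this forwards to fragmentCompleteWithQueryCleanup()
    }
  }
}
```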
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index c7e9d32..8d6fc50 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -355,7 +355,7 @@ public void serviceStart() throws Exception {
     this.shufflePort.set(ShuffleHandler.get().getPort());
     getConfig()
         .setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort());
-    LlapOutputFormatService.initializeAndStart(getConfig(), secretManager);
+    LlapOutputFormatService.initializeAndStart(getConfig(), secretManager, containerRunner);
 
     super.serviceStart();
 
     // Setup the actual ports in the configuration.
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index a965872..f31b5f6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -18,6 +18,8 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
@@ -41,6 +42,7 @@
 import java.io.IOException;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -80,7 +82,7 @@
 
   private final Lock lock = new ReentrantLock();
-  private final ConcurrentMap<QueryIdentifier, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
+  private final ConcurrentMap<QueryIdentifier, DagLock> dagSpecificLocks = new ConcurrentHashMap<>();
 
   // Tracks various maps for dagCompletions. This is setup here since stateChange messages
   // may be processed by a thread which ends up executing before a task.
@@ -94,6 +96,13 @@
   private final ConcurrentHashMap<QueryIdentifier, String> queryIdentifierToHiveQueryId =
       new ConcurrentHashMap<>();
 
+  // Pending query cleanup tasks, only used for fragments submitted by the external API.
+  private final ConcurrentHashMap<QueryIdentifier, CancellableCleanerCallable> pendingQueryCleanerTasks =
+      new ConcurrentHashMap<>();
+  // Mapping of fragmentId to QueryFragmentInfo
+  private final ConcurrentHashMap<String, QueryFragmentInfo> fragmentIdToQueryIdMapping =
+      new ConcurrentHashMap<>();
+
   public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId) {
     super("QueryTracker");
     this.localDirsBase = localDirsBase;
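The two new maps drive the external-client lifecycle: `fragmentIdToQueryIdMapping` lets the output service resolve a fragment id back to its `QueryFragmentInfo`, and `pendingQueryCleanerTasks` lets a newly registered fragment cancel a cleanup that was scheduled when the query last went idle. A rough sketch of that cancel-on-reregister handshake (the registry class and task type are hypothetical stand-ins):

```java
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

class CleanerRegistry {
  interface CancellableTask {
    boolean cancel();
  }

  private final ConcurrentHashMap<String, CancellableTask> pending = new ConcurrentHashMap<>();

  void scheduleCleanup(String queryId, CancellableTask task) {
    pending.put(queryId, task);
  }

  // Called when a new fragment arrives for a query that was about to be cleaned up.
  void onNewFragment(String queryId) throws IOException {
    CancellableTask task = pending.get(queryId);
    if (task != null && !task.cancel()) {
      // Cleanup already started; the fragment cannot safely attach to this query.
      throw new IOException("Unable to cancel pending query cleanup for " + queryId);
    }
  }
}
```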
@@ -120,8 +129,7 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId
       String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
       String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
       String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
-    ReadWriteLock dagLock = getDagLock(queryIdentifier);
-    dagLock.readLock().lock();
+    DagLock dagLock = getAndTakeDagReadLock(queryIdentifier);
     try {
       if (completedDagMap.contains(queryIdentifier)) {
         // Cleanup the dag lock here, since it may have been created after the query completed
@@ -141,6 +149,16 @@
       if (UserGroupInformation.isSecurityEnabled()) {
         Preconditions.checkNotNull(tokenInfo.userName);
       }
+
+      // Cancel any pending query cleanup for this queryId, since we are registering a new fragment with this query.
+      CancellableCleanerCallable queryCleanerTask = pendingQueryCleanerTasks.get(queryIdentifier);
+      if (queryCleanerTask != null) {
+        boolean cancelled = queryCleanerTask.cancel();
+        if (!cancelled) {
+          throw new IOException("Unable to cancel pending query cleanup for " + queryIdentifier);
+        }
+      }
+
       queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
           getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
           tokenInfo.userName, tokenInfo.appId);
@@ -164,8 +182,10 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId
           .registerDag(appIdString, dagIdentifier, appToken, user, queryInfo.getLocalDirs());
 
-      return queryInfo.registerFragment(
+      QueryFragmentInfo queryFragmentInfo = queryInfo.registerFragment(
           vertexName, fragmentNumber, attemptNumber, vertex, fragmentIdString);
+      fragmentIdToQueryIdMapping.put(fragmentIdString, queryFragmentInfo);
+      return queryFragmentInfo;
     } finally {
       dagLock.readLock().unlock();
     }
@@ -184,6 +204,36 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) {
       LOG.info("Ignoring fragmentComplete message for unknown query: {}", qId);
     } else {
       queryInfo.unregisterFragment(fragmentInfo);
+      fragmentIdToQueryIdMapping.remove(fragmentInfo.getFragmentIdentifierString());
     }
   }
 
+  /**
+   * Clean up the query-specific info if there are no outstanding fragments for this query.
+   * Unlike queryComplete, which is driven by the AM, this is triggered from within the daemon.
+   * @param fragmentInfo
+   */
+  void fragmentCompleteWithQueryCleanup(QueryFragmentInfo fragmentInfo) {
+    fragmentComplete(fragmentInfo);
+
+    QueryIdentifier queryIdentifier = fragmentInfo.getQueryInfo().getQueryIdentifier();
+    QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
+    if (queryInfo != null) {
+      // Check if there are no more fragments for this query
+      DagLock dagLock = getAndTakeDagWriteLock(queryIdentifier);
+      try {
+        List<QueryFragmentInfo> queryFragments = queryInfo.getRegisteredFragments();
+        if (queryFragments.size() == 0) {
+          // Set up a delayed query cleanup task, and track it so a new fragment can cancel it.
+          long delaySecs = 3600;
+          CancellableCleanerCallable cleanupTask = new CancellableCleanerCallable(queryIdentifier);
+          ScheduledFuture<Void> cleanupFuture = executorService.schedule(cleanupTask, delaySecs, TimeUnit.SECONDS);
+          cleanupTask.setFuture(cleanupFuture);
+          pendingQueryCleanerTasks.put(queryIdentifier, cleanupTask);
+        }
+      } finally {
+        dagLock.writeLock().unlock();
+      }
+    }
+  }
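`fragmentCompleteWithQueryCleanup` only schedules the cleaner; the callable also needs its own `ScheduledFuture` so that `cancel()` can try to deschedule it. Because `schedule()` returns the future only after the task object already exists, the future is wired in afterwards via `setFuture`. A compact sketch of that two-step wiring (executor, delay, and the task body are placeholders):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DelayedCleanupSketch {
  static class CleanupTask implements Callable<Void> {
    private volatile ScheduledFuture<Void> future;

    void setFuture(ScheduledFuture<Void> future) {
      this.future = future;
    }

    boolean cancel() {
      // Best effort: descheduling may fail if the task is already running.
      return future != null && future.cancel(false);
    }

    @Override
    public Void call() {
      System.out.println("running delayed cleanup");
      return null;
    }
  }

  public static void main(String[] args) throws Exception {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CleanupTask task = new CleanupTask();
    ScheduledFuture<Void> future = executor.schedule(task, 1, TimeUnit.SECONDS);
    task.setFuture(future); // the task can now be cancelled or descheduled later
    Thread.sleep(1500);
    executor.shutdown();
  }
}
```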
@@ -194,15 +244,22 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) {
    */
  List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay, boolean isInternal) throws IOException {
+    // We will remember the completed DAG for an hour to avoid executing out-of-order fragments.
+    long dagDelaySecs = 3600;
+    return queryComplete(queryIdentifier, deleteDelay, dagDelaySecs, isInternal, false);
+  }
+
+  List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay, long dagDelay,
+      boolean isInternal, boolean waitForDeleteTasks) throws IOException {
     if (deleteDelay == -1) {
       deleteDelay = defaultDeleteDelaySeconds;
     }
-    ReadWriteLock dagLock = getDagLock(queryIdentifier);
-    dagLock.writeLock().lock();
+    List<ScheduledFuture<?>> deleteTasks = new LinkedList<>();
+    DagLock dagLock = getAndTakeDagWriteLock(queryIdentifier);
     try {
       QueryInfo queryInfo = isInternal ?
           queryInfoMap.get(queryIdentifier) : checkPermissionsAndGetQuery(queryIdentifier);
-      rememberCompletedDag(queryIdentifier);
+      deleteTasks.add(rememberCompletedDag(queryIdentifier, dagDelay));
       LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds",
           queryIdentifier, deleteDelay);
       queryInfoMap.remove(queryIdentifier);
@@ -213,7 +270,7 @@
       String[] localDirs = queryInfo.getLocalDirsNoCreate();
       if (localDirs != null) {
         for (String localDir : localDirs) {
-          cleanupDir(localDir, deleteDelay);
+          deleteTasks.add(cleanupDir(localDir, deleteDelay));
           ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
         }
       }
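`queryComplete` now collects the `ScheduledFuture`s of everything it kicks off, so that a caller passing `waitForDeleteTasks` can block until the on-disk state is actually gone rather than merely queued for deletion. The blocking idiom is plain `Future.get()`; a small standalone sketch (task bodies and delays are placeholders):

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class WaitForCleanupSketch {
  public static void main(String[] args) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    List<ScheduledFuture<?>> deleteTasks = new LinkedList<>();
    deleteTasks.add(executor.schedule(
        () -> System.out.println("dir removed"), 100, TimeUnit.MILLISECONDS));

    for (ScheduledFuture<?> task : deleteTasks) {
      try {
        task.get(); // blocks until the scheduled deletion has actually run
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) {
        System.err.println("cleanup failed: " + e.getCause());
      }
    }
    executor.shutdown();
  }
}
```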
@@ -224,24 +281,37 @@
       sourceCompletionMap.remove(queryIdentifier);
       String savedQueryId = queryIdentifierToHiveQueryId.remove(queryIdentifier);
       dagSpecificLocks.remove(queryIdentifier);
+      // This lock is no longer valid for this query - invalidate it.
+      dagLock.invalidate();
       if (savedQueryId != null) {
         ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
       }
+      if (waitForDeleteTasks) {
+        for (ScheduledFuture<?> task : deleteTasks) {
+          if (task == null) {
+            continue;
+          }
+          while (!task.isDone()) {
+            try {
+              task.get();
+            } catch (InterruptedException err) {
+              LOG.error("Interrupted cleanup task: ", err);
+            } catch (ExecutionException err) {
+              LOG.error("Cleanup task failed: ", err);
+            }
+          }
+        }
+      }
       return queryInfo.getRegisteredFragments();
     } finally {
       dagLock.writeLock().unlock();
     }
   }
 
-
-  public void rememberCompletedDag(QueryIdentifier queryIdentifier) {
+  public ScheduledFuture<?> rememberCompletedDag(QueryIdentifier queryIdentifier, long delaySecs) {
+    ScheduledFuture<?> retval = null;
     if (completedDagMap.add(queryIdentifier)) {
-      // We will remember completed DAG for an hour to avoid execution out-of-order fragments.
-      executorService.schedule(new DagMapCleanerCallable(queryIdentifier), 1, TimeUnit.HOURS);
+      // We will remember the completed DAG for up to delaySecs to avoid executing out-of-order fragments.
+      retval = executorService.schedule(new DagMapCleanerCallable(queryIdentifier), delaySecs, TimeUnit.SECONDS);
     } else {
       LOG.warn("Couldn't add {} to completed dag set", queryIdentifier);
     }
+    return retval;
   }
 
   /**
@@ -263,12 +333,12 @@ void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceNam
   }
 
-  private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
+  private DagLock getDagLock(QueryIdentifier queryIdentifier) {
     lock.lock();
     try {
-      ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
+      DagLock dagLock = dagSpecificLocks.get(queryIdentifier);
      if (dagLock == null) {
-        dagLock = new ReentrantReadWriteLock();
+        dagLock = new DagLock();
        dagSpecificLocks.put(queryIdentifier, dagLock);
      }
      return dagLock;
@@ -277,6 +347,31 @@ private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
     }
   }
 
+  private DagLock getAndTakeDagReadLock(QueryIdentifier queryIdentifier) {
+    DagLock dagLock = null;
+    while (dagLock == null) {
+      dagLock = getDagLock(queryIdentifier);
+      dagLock.readLock().lock();
+      if (!dagLock.isValid()) {
+        // Lock is no longer valid for this queryId - release it and retry with a fresh lock.
+        dagLock.readLock().unlock();
+        dagLock = null;
+      }
+    }
+    return dagLock;
+  }
+
+  private DagLock getAndTakeDagWriteLock(QueryIdentifier queryIdentifier) {
+    DagLock dagLock = null;
+    while (dagLock == null) {
+      dagLock = getDagLock(queryIdentifier);
+      dagLock.writeLock().lock();
+      if (!dagLock.isValid()) {
+        // Lock is no longer valid for this queryId - release it and retry with a fresh lock.
+        dagLock.writeLock().unlock();
+        dagLock = null;
+      }
+    }
+    return dagLock;
+  }
+
   private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(QueryIdentifier queryIdentifier) {
     ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(queryIdentifier);
     if (dagMap == null) {
@@ -304,9 +399,31 @@ public void serviceStop() {
     LOG.info(getName() + " stopped");
   }
 
-  private void cleanupDir(String dir, long deleteDelay) {
+  private ScheduledFuture<?> cleanupDir(String dir, long deleteDelay) {
     LOG.info("Scheduling deletion of {} after {} seconds", dir, deleteDelay);
-    executorService.schedule(new FileCleanerCallable(dir), deleteDelay, TimeUnit.SECONDS);
+    return executorService.schedule(new FileCleanerCallable(dir), deleteDelay, TimeUnit.SECONDS);
+  }
+
+  /**
+   * A read-write lock that can be marked invalid. This should be checked after taking the lock,
+   * to see whether a previous holder of the lock has invalidated it (during query cleanup).
+   */
+  private static class DagLock extends ReentrantReadWriteLock {
+    private static final long serialVersionUID = 1L;
+
+    private boolean valid = true;
+
+    public DagLock() {
+      super();
+    }
+
+    public boolean isValid() {
+      return valid;
+    }
+
+    public void invalidate() {
+      valid = false;
+    }
   }
 
   private class FileCleanerCallable extends CallableWithNdc<Void> {
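The `DagLock` pattern closes a classic race: a thread can fetch a lock from `dagSpecificLocks` just as `queryComplete` removes that entry, and would otherwise synchronize on a lock no later arrival can see. Marking the removed lock invalid forces latecomers to loop back and create a fresh one. A stripped-down sketch of the validate-after-lock loop (class and method names are illustrative only):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class InvalidatableLockSketch {
  static class VLock extends ReentrantReadWriteLock {
    private static final long serialVersionUID = 1L;
    volatile boolean valid = true;
  }

  private final ConcurrentMap<String, VLock> locks = new ConcurrentHashMap<>();

  VLock takeWriteLock(String key) {
    while (true) {
      VLock l = locks.computeIfAbsent(key, k -> new VLock());
      l.writeLock().lock();
      if (l.valid) {
        return l; // caller must eventually call l.writeLock().unlock()
      }
      // A previous holder removed this lock from the map; retry with a fresh one.
      l.writeLock().unlock();
    }
  }

  void removeAndInvalidate(String key) {
    VLock l = takeWriteLock(key);
    try {
      locks.remove(key);
      l.valid = false;
    } finally {
      l.writeLock().unlock();
    }
  }
}
```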
@@ -345,6 +462,91 @@ protected Void callInternal() {
     }
   }
 
+  enum CleanerState {
+    PENDING,
+    ACTIVE,
+    CANCELLED
+  };
+
+  private class CancellableCleanerCallable extends CallableWithNdc<Void> {
+    private final QueryIdentifier queryIdentifier;
+    private CleanerState cleanerState = CleanerState.PENDING;
+    private ScheduledFuture<Void> future;
+    private final Object cleanerLock = new Object();
+
+    private CancellableCleanerCallable(QueryIdentifier queryIdentifier) {
+      this.queryIdentifier = queryIdentifier;
+    }
+
+    void setFuture(ScheduledFuture<Void> future) {
+      this.future = future;
+    }
+
+    boolean cancel() {
+      // Set the state to cancelled if the cleaner has not already been activated.
+      synchronized (cleanerLock) {
+        switch (cleanerState) {
+        case ACTIVE:
+          // Cleaner has already started, cannot cancel
+          LOG.info("Cleanup for queryID " + queryIdentifier + " has already started and will not be cancelled");
+          return false;
+        case PENDING:
+          cleanerState = CleanerState.CANCELLED;
+          LOG.debug("Cleanup for queryID " + queryIdentifier + " cancelled");
+          break;
+        case CANCELLED:
+          LOG.warn("Attempting to cancel a cleanup which has already been cancelled.");
+          return true;
+        }
+      }
+
+      pendingQueryCleanerTasks.remove(queryIdentifier);
+
+      // Cancel this callable if possible.
+      if (future != null) {
+        boolean cancelled = future.cancel(false);
+        if (!cancelled) {
+          LOG.info("Unable to cancel cleanup callable for " + queryIdentifier);
+        }
+      }
+      // Even if future.cancel() failed, we still treat the cleaner as cancelled.
+      return true;
+    }
+
+    @Override
+    protected Void callInternal() {
+      pendingQueryCleanerTasks.remove(queryIdentifier);
+
+      // Check if this has been cancelled; if it has not been, then set it to active.
+      synchronized (cleanerLock) {
+        switch (cleanerState) {
+        case ACTIVE:
+          throw new RuntimeException("Cleaner for " + queryIdentifier + " was already set to active");
+        case PENDING:
+          cleanerState = CleanerState.ACTIVE;
+          LOG.debug("Running cleanup for queryID " + queryIdentifier);
+          break;
+        case CANCELLED:
+          LOG.debug("Cleaner was cancelled for " + queryIdentifier + ", not running clean operations");
+          return null;
+        }
+      }
+
+      // Submit cleanup tasks for the queryIdentifier.
+      // We will wait for the cleanup tasks to complete, so schedule them immediately.
+      long deleteDelay = 0;
+      boolean waitForDeleteTasks = true;
+      boolean isInternal = true;
+      try {
+        queryComplete(queryIdentifier, deleteDelay, deleteDelay, isInternal, waitForDeleteTasks);
+      } catch (IOException err) {
+        LOG.warn("Ignoring exception while issuing queryComplete", err);
+      }
+
+      return null;
+    }
+  }
+
   private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IOException {
     QueryInfo queryInfo = queryInfoMap.get(queryId);
     if (queryInfo == null) return null;
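`CancellableCleanerCallable` is effectively a three-state machine guarded by a monitor: a cleaner can be cancelled only while still PENDING, and once it flips to ACTIVE the registering thread must give up and fail the new fragment. A condensed sketch of just the state-transition logic (the class name is hypothetical):

```java
class CleanerStateSketch {
  enum State { PENDING, ACTIVE, CANCELLED }

  private State state = State.PENDING;
  private final Object lock = new Object();

  // Returns true if the cleaner will not run (cancelled now, or already cancelled earlier).
  boolean cancel() {
    synchronized (lock) {
      if (state == State.ACTIVE) {
        return false; // too late, the cleanup is already running
      }
      state = State.CANCELLED;
      return true;
    }
  }

  // Returns true if the cleaner won the race and should perform the cleanup.
  boolean tryActivate() {
    synchronized (lock) {
      if (state != State.PENDING) {
        return false;
      }
      state = State.ACTIVE;
      return true;
    }
  }
}
```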
@@ -356,4 +558,11 @@
   public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOException {
     return checkPermissionsAndGetQuery(queryId) != null;
   }
+
+  /**
+   * Get the QueryFragmentInfo corresponding to this fragmentId. Must be looked up while the fragment is still registered.
+   */
+  QueryFragmentInfo getFragmentInfoForFragment(String fragmentId) {
+    return fragmentIdToQueryIdMapping.get(fragmentId);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 825488f..53d8e96 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -18,6 +18,7 @@
 import java.util.Map;
 import java.util.HashMap;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -75,19 +76,24 @@
   private ServerBootstrap serverBootstrap;
   private ChannelFuture listeningChannelFuture;
   private int port;
+  private final SecretManager sm;
   private final long writerTimeoutMs;
+  private final CloseableGenerator closeableGenerator;
 
-  private LlapOutputFormatService(Configuration conf, SecretManager sm) throws IOException {
+  private LlapOutputFormatService(Configuration conf,
+      SecretManager sm, CloseableGenerator closeableGenerator) throws IOException {
     this.sm = sm;
     this.conf = conf;
     this.writerTimeoutMs = HiveConf.getTimeVar(
         conf, ConfVars.LLAP_DAEMON_OUTPUT_STREAM_TIMEOUT, TimeUnit.MILLISECONDS);
+    this.closeableGenerator = closeableGenerator;
   }
 
-  public static void initializeAndStart(Configuration conf, SecretManager sm) throws Exception {
+  public static void initializeAndStart(Configuration conf,
+      SecretManager sm, CloseableGenerator closeableGenerator) throws Exception {
     if (!initing.getAndSet(true)) {
-      INSTANCE = new LlapOutputFormatService(conf, sm);
+      INSTANCE = new LlapOutputFormatService(conf, sm, closeableGenerator);
       INSTANCE.start();
       started.set(true);
     }
@@ -202,8 +208,14 @@ private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBy
       if (!writers.containsKey(id)) {
         isFailed = false;
         writers.put(id, writer);
+
+        Closeable cleanup = null;
+        if (closeableGenerator != null) {
+          cleanup = closeableGenerator.createCloseableForFragment(id);
+        }
+
         // Add listener to handle any cleanup for when the connection is closed
-        ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id));
+        ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id, cleanup));
         lock.notifyAll();
       }
     }
@@ -234,9 +246,11 @@ public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Ex
   protected class LlapOutputFormatChannelCloseListener implements ChannelFutureListener {
     private String id;
+    private Closeable cleanup;
 
-    LlapOutputFormatChannelCloseListener(String id) {
+    LlapOutputFormatChannelCloseListener(String id, Closeable cleanup) {
       this.id = id;
+      this.cleanup = cleanup;
     }
 
     @Override
@@ -249,6 +263,10 @@ public void operationComplete(ChannelFuture future) throws Exception {
       if (writer == null) {
         LOG.warn("Did not find a writer for ID " + id);
       }
+
+      if (cleanup != null) {
+        cleanup.close();
+      }
     }
   }
 
@@ -267,4 +285,8 @@ public void initChannel(SocketChannel ch) throws Exception {
           new LlapOutputFormatServiceHandler(sendBufferSize));
     }
   }
+
+  public interface CloseableGenerator {
+    Closeable createCloseableForFragment(String fragmentId);
+  }
 }
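On the service side, the cleanup hook piggybacks on Netty's per-channel close future, so no extra bookkeeping thread is needed: whatever `Closeable` the generator produced simply rides along with the close listener and fires exactly once when the client connection goes away. A sketch of that wiring in isolation (the helper class is hypothetical; the channel would come from the real pipeline):

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import java.io.Closeable;

class CloseListenerSketch {
  static void watchForClose(Channel channel, Closeable cleanup) {
    channel.closeFuture().addListener((ChannelFutureListener) future -> {
      // Runs once, when the connection closes for any reason (clean or not).
      if (cleanup != null) {
        cleanup.close();
      }
    });
  }
}
```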
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 2288cd4..35e12b6 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -54,7 +54,7 @@ public static void setUp() throws Exception {
     Configuration conf = new Configuration();
     // Pick random avail port
     HiveConf.setIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, 0);
-    LlapOutputFormatService.initializeAndStart(conf, null);
+    LlapOutputFormatService.initializeAndStart(conf, null, null);
     service = LlapOutputFormatService.get();
     LlapProxy.setDaemon(true);
     LOG.debug("Output service up");
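For callers outside the daemon, like this test, both the secret manager and the generator are optional: passing null disables token checks and connection-close cleanup respectively. A sketch of the minimal bootstrap under that assumption, using only the calls visible in this patch:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapOutputFormatService;

class ServiceBootstrapSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Port 0 asks the service to bind an ephemeral port.
    HiveConf.setIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, 0);
    // null SecretManager and null CloseableGenerator: no auth, no cleanup hooks.
    LlapOutputFormatService.initializeAndStart(conf, null, null);
    LlapOutputFormatService service = LlapOutputFormatService.get();
    System.out.println("LlapOutputFormatService started: " + (service != null));
  }
}
```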