diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java
index 7ab9a7b..a820e26 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java
@@ -42,6 +42,9 @@ public LlapDaemonConfiguration() {
   public static final String LLAP_DAEMON_YARN_SHUFFLE_PORT = LLAP_DAEMON_PREFIX + "yarn.shuffle.port";
   public static final int LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT = 15551;
 
+  public static final String LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED = LLAP_DAEMON_PREFIX + "shuffle.dir-watcher.enabled";
+  public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false;
+
   // Section for configs used in AM and executors
   public static final String LLAP_DAEMON_NUM_EXECUTORS = LLAP_DAEMON_PREFIX + "num.executors";
 
@@ -71,4 +74,5 @@ public LlapDaemonConfiguration() {
       LLAP_DAEMON_PREFIX + "task.scheduler.node.re-enable.timeout.ms";
   public static final long LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS_DEFAULT = 2000l;
+
 }
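NOTE: The LlapDaemonConfiguration hunk above adds the daemon-level switch for the new
shuffle dir-watcher; the LlapDaemon hunk further down copies it into the shuffle
handler's config. A minimal sketch of reading the flag, assuming LLAP_DAEMON_PREFIX
resolves to "llap.daemon." (the prefix constant itself is not shown in this patch):

    import org.apache.hadoop.conf.Configuration;

    public class DirWatcherFlagCheck {
      public static void main(String[] args) {
        Configuration daemonConf = new Configuration();
        // Defaults to false: the watcher stays off unless explicitly enabled.
        boolean enabled = daemonConf.getBoolean(
            "llap.daemon.shuffle.dir-watcher.enabled", false);
        System.out.println("shuffle dir-watcher enabled: " + enabled);
      }
    }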
LOG.info("DEBUG: Registering request with the ShuffleHandler"); - ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser()); + ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser(), localDirs); TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()), new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, @@ -422,4 +426,44 @@ private String getTaskIdentifierString(SubmitWorkRequestProto request) { long amCounterHeartbeatInterval; int amMaxEventsPerHeartbeat; } + + private String stringifyRequest(SubmitWorkRequestProto request) { + StringBuilder sb = new StringBuilder(); + sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort()); + sb.append(", user=").append(request.getUser()); + sb.append(", appIdString=").append(request.getApplicationIdString()); + sb.append(", appAttemptNum=").append(request.getAppAttemptNumber()); + sb.append(", containerIdString=").append(request.getContainerIdString()); + FragmentSpecProto fragmentSpec = request.getFragmentSpec(); + sb.append(", dagName=").append(fragmentSpec.getDagName()); + sb.append(", vertexName=").append(fragmentSpec.getVertexName()); + sb.append(", taskInfo=").append(fragmentSpec.getTaskAttemptIdString()); + sb.append(", processor=").append(fragmentSpec.getProcessorDescriptor().getClassName()); + sb.append(", numInputs=").append(fragmentSpec.getInputSpecsCount()); + sb.append(", numOutputs=").append(fragmentSpec.getOutputSpecsCount()); + sb.append(", numGroupedInputs=").append(fragmentSpec.getGroupedInputSpecsCount()); + sb.append(", Inputs={"); + if (fragmentSpec.getInputSpecsCount() > 0) { + for (IOSpecProto ioSpec : fragmentSpec.getInputSpecsList()) { + sb.append("{").append(ioSpec.getConnectedVertexName()).append(",").append(ioSpec.getIoDescriptor().getClassName()).append(",").append(ioSpec.getPhysicalEdgeCount()).append("}"); + } + } + sb.append("}"); + sb.append(", Outputs={"); + if (fragmentSpec.getOutputSpecsCount() > 0) { + for (IOSpecProto ioSpec : fragmentSpec.getOutputSpecsList()) { + sb.append("{").append(ioSpec.getConnectedVertexName()).append(",").append(ioSpec.getIoDescriptor().getClassName()).append(",").append(ioSpec.getPhysicalEdgeCount()).append("}"); + } + } + sb.append("}"); + sb.append(", GroupedInputs={"); + if (fragmentSpec.getGroupedInputSpecsCount() > 0) { + for (GroupInputSpecProto group : fragmentSpec.getGroupedInputSpecsList()) { + sb.append("{").append("groupName=").append(group.getGroupName()).append(", elements=").append(group.getGroupVerticesList()).append("}"); + sb.append(group.getGroupVerticesList()); + } + } + sb.append("}"); + return sb.toString(); + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 0ff255c..a5f7c0e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -109,6 +109,9 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort); this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, StringUtils.arrayToString(localDirs)); + this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED, daemonConf + 
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 0ff255c..a5f7c0e 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -109,6 +109,9 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
     this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort);
     this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS,
         StringUtils.arrayToString(localDirs));
+    this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED, daemonConf
+        .getBoolean(LlapDaemonConfiguration.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED,
+            LlapDaemonConfiguration.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT));
 
     // Less frequently set parameter, not passing in as a param.
     int numHandlers = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS,
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
index b6451be..5ea11fd 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
@@ -64,7 +64,6 @@ public LlapDaemonProtocolServerImpl(int numHandlers,
   public SubmitWorkResponseProto submitWork(RpcController controller,
       LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws ServiceException {
-    LOG.info("DEBUG: Received request: " + request);
     try {
       containerRunner.submitWork(request);
     } catch (IOException e) {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/AttemptRegistrationListener.java llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/AttemptRegistrationListener.java
new file mode 100644
index 0000000..276a7f1
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/AttemptRegistrationListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.shufflehandler;
+
+import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler.AttemptPathIdentifier;
+import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler.AttemptPathInfo;
+
+public interface AttemptRegistrationListener {
+
+  void registerAttemptDirs(AttemptPathIdentifier identifier, AttemptPathInfo pathInfo);
+
+}
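NOTE: DirWatcher (new file below) chases a three-level directory hierarchy using the
java.nio WatchService. Relative to an app base dir registered up front, the layout it
expects (matching the getBaseLocation() convention in the ShuffleHandler changes
further down) is:

    ${local.dir}/usercache/${user}/appcache/${appId}   <- Type.BASE, via registerApplicationDir()
    └── output/                                        <- Type.OUTPUT, watched once it appears
        └── ${attemptId}/                              <- Type.FINAL, watched for the two files below
            ├── file.out                               <- DATA_FILE_NAME
            └── file.out.index                         <- INDEX_FILE_NAME

Once both files for an attempt are seen, the registered AttemptRegistrationListener
(the ShuffleHandler) is informed and all watches for that attempt are cancelled.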
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java
new file mode 100644
index 0000000..08e4787
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.shufflehandler;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
+import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler.AttemptPathIdentifier;
+
+class DirWatcher {
+
+  private static final Log LOG = LogFactory.getLog(DirWatcher.class);
+
+  private static enum Type {
+    BASE,   // App Base Dir
+    OUTPUT, // appBase/output/
+    FINAL,  // appBase/output/attemptDir
+  }
+
+  private static final String OUTPUT = "output";
+
+  private final AttemptRegistrationListener listener;
+
+  private final WatchService watchService;
+  private final AtomicBoolean shutdown = new AtomicBoolean(false);
+  private final WatcherCallable watcherCallable = new WatcherCallable();
+  private final ListeningExecutorService watcherExecutorService;
+  private volatile ListenableFuture<Void> watcherFuture;
+
+  private final DelayQueue<WatchedPathInfo> watchedPathQueue = new DelayQueue<>();
+  private final WatchExpirerCallable expirerCallable = new WatchExpirerCallable();
+  private final ListeningExecutorService expirerExecutorService;
+  private volatile ListenableFuture<Void> expirerFuture;
+
+  private final ConcurrentMap<AttemptPathIdentifier, FoundPathInfo> foundAttempts = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Path, WatchedPathInfo> watchedPaths = new ConcurrentHashMap<>();
+  private final ConcurrentMap<AttemptPathIdentifier, List<WatchKey>> watchesPerAttempt = new ConcurrentHashMap<>();
+
+  DirWatcher(AttemptRegistrationListener listener) throws IOException {
+    this.watchService = FileSystems.getDefault().newWatchService();
+    this.listener = listener;
+    ExecutorService executor1 = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DirWatcher").build());
+    watcherExecutorService = MoreExecutors.listeningDecorator(executor1);
+
+    ExecutorService executor2 = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WatchExpirer").build());
+    expirerExecutorService = MoreExecutors.listeningDecorator(executor2);
+  }
+
+  /**
+   * Register a base dir for an application
+   * @param pathString the full path to the app base dir - ${local.dir}/usercache/${user}/appcache/${appId}
+   * @param appId the appId
+   * @param user the user
+   * @param expiry when to expire the watch - in ms
+   * @throws IOException
+   */
+  void registerApplicationDir(String pathString, String appId, String user, long expiry) throws IOException {
+    Path path = FileSystems.getDefault().getPath(pathString);
+    WatchedPathInfo watchedPathInfo =
+        new WatchedPathInfo(System.currentTimeMillis() + expiry, Type.BASE, appId, user);
+    watchedPaths.put(path, watchedPathInfo);
+    WatchKey watchKey = path.register(watchService, ENTRY_CREATE);
+    watchedPathInfo.setWatchKey(watchKey);
+    watchedPathQueue.add(watchedPathInfo);
+
+    // TODO Watches on the output dirs need to be cancelled at some point. For now - via the expiry.
+  }
+
+  /**
+   * Invoke when a pathIdentifier has been found, or is no longer of interest
+   * @param pathIdentifier
+   */
+  void attemptInfoFound(AttemptPathIdentifier pathIdentifier) {
+    cancelWatchesForAttempt(pathIdentifier);
+  }
+
+  void start() {
+    watcherFuture = watcherExecutorService.submit(watcherCallable);
+    expirerFuture = expirerExecutorService.submit(expirerCallable);
+  }
+
+  void stop() throws IOException {
+    shutdown.set(true);
+    if (watcherFuture != null) {
+      watcherFuture.cancel(true);
+    }
+    if (expirerFuture != null) {
+      expirerFuture.cancel(true);
+    }
+    watchService.close();
+    watcherExecutorService.shutdownNow();
+    expirerExecutorService.shutdownNow();
+  }
+
+  private void registerDir(Path path, WatchedPathInfo watchedPathInfo) {
+    watchedPaths.put(path, watchedPathInfo);
+    try {
+      WatchKey watchKey = path.register(watchService, ENTRY_CREATE);
+      watchedPathInfo.setWatchKey(watchKey);
+      watchedPathQueue.add(watchedPathInfo);
+      if (watchedPathInfo.type == Type.FINAL) {
+        trackWatchForAttempt(watchedPathInfo, watchKey);
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to setup watch for: " + path);
+    }
+  }
+
+  private void trackWatchForAttempt(WatchedPathInfo watchedPathInfo, WatchKey watchKey) {
+    assert watchedPathInfo.pathIdentifier != null;
+    // TODO May be possible to do finer grained locks.
+    synchronized (watchesPerAttempt) {
+      List<WatchKey> list = watchesPerAttempt.get(watchedPathInfo.pathIdentifier);
+      if (list == null) {
+        list = new LinkedList<>();
+        watchesPerAttempt.put(watchedPathInfo.pathIdentifier, list);
+      }
+      list.add(watchKey);
+    }
+  }
+
+  private void cancelWatchesForAttempt(AttemptPathIdentifier pathIdentifier) {
+    // TODO May be possible to do finer grained locks.
+    synchronized (watchesPerAttempt) {
+      List<WatchKey> list = watchesPerAttempt.remove(pathIdentifier);
+      if (list != null) {
+        for (WatchKey watchKey : list) {
+          watchKey.cancel();
+        }
+      }
+    }
+  }
+
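+  /*
+   * Core event loop: blocks on the WatchService and handles ENTRY_CREATE events.
+   * Each watched directory carries a WatchedPathInfo whose Type decides how a newly
+   * created child is treated. OVERFLOW events are ignored; anything missed here is
+   * picked up later by a directory scan or by the ShuffleHandler's cache loader.
+   */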
+  public void watch() {
+    while (!shutdown.get()) {
+      WatchKey watchKey;
+      try {
+        watchKey = watchService.take();
+      } catch (InterruptedException e) {
+        if (shutdown.get()) {
+          LOG.info("Shutting down watcher");
+          break;
+        } else {
+          LOG.error("Watcher interrupted before being shutdown");
+          throw new RuntimeException("Watcher interrupted before being shutdown", e);
+        }
+      }
+      Path watchedPath = (Path) watchKey.watchable();
+      WatchedPathInfo parentWatchedPathInfo = watchedPaths.get(watchedPath);
+      boolean cancelledWatch = false;
+      for (WatchEvent<?> rawEvent : watchKey.pollEvents()) {
+        if (rawEvent.kind().equals(OVERFLOW)) {
+          // Ignoring and continuing to watch for additional elements in the dir.
+          continue;
+        }
+
+        WatchEvent<Path> event = (WatchEvent<Path>) rawEvent;
+        WatchedPathInfo watchedPathInfo;
+        Path resolvedPath;
+
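+        // Watch cascade: BASE waits for the app's output/ dir, OUTPUT waits for
+        // per-attempt directories, FINAL waits for the attempt's data/index files.
+        // Every directory is also scanned once right after its watch is registered,
+        // so entries created before the watch was in place are not missed.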
+        switch (parentWatchedPathInfo.type) {
+          case BASE:
+            // Add the output dir to the watch set, scan it, and cancel the current watch.
+            if (event.context().getFileName().toString().equals(OUTPUT)) {
+              resolvedPath = watchedPath.resolve(event.context());
+              watchedPathInfo = new WatchedPathInfo(parentWatchedPathInfo, Type.OUTPUT, null);
+              registerDir(resolvedPath, watchedPathInfo);
+              // Scan the "output" directory for existing files, and add watches
+              try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(resolvedPath)) {
+                for (Path path : dirStream) {
+                  // This would be an attempt directory. Add a watch, and track it.
+                  if (path.toFile().isDirectory()) {
+                    watchedPathInfo = new WatchedPathInfo(parentWatchedPathInfo, Type.FINAL, path.getFileName().toString());
+                    registerDir(path, watchedPathInfo);
+                    scanForFinalFiles(watchedPathInfo, path);
+                  } else {
+                    LOG.warn("Ignoring unexpected file: " + path);
+                  }
+                }
+              } catch (IOException e) {
+                LOG.warn("Unable to list files under: " + resolvedPath);
+              }
+              // Cancel the watchKey since the output dir has been found.
+              cancelledWatch = true;
+              watchKey.cancel();
+            } else {
+              LOG.warn("DEBUG: Found unexpected directory: " + event.context() + " under " + watchedPath);
+            }
+            break;
+          case OUTPUT:
+            // Add the attemptDir to the watch set, scan it and add to the list of found files
+            resolvedPath = watchedPath.resolve(event.context());
+            // New attempt path created. Add a watch on it, and scan it for existing files.
+            watchedPathInfo = new WatchedPathInfo(parentWatchedPathInfo, Type.FINAL, event.context().getFileName().toString());
+            registerDir(resolvedPath, watchedPathInfo);
+            scanForFinalFiles(watchedPathInfo, resolvedPath);
+            break;
+          case FINAL:
+            resolvedPath = watchedPath.resolve(event.context());
+            if (event.context().getFileName().toString().equals(ShuffleHandler.DATA_FILE_NAME)) {
+              registerFoundAttempt(parentWatchedPathInfo.pathIdentifier, null, resolvedPath);
+            } else if (event.context().getFileName().toString().equals(ShuffleHandler.INDEX_FILE_NAME)) {
+              registerFoundAttempt(parentWatchedPathInfo.pathIdentifier, resolvedPath, null);
+            } else {
+              LOG.warn("Ignoring unexpected file: " + watchedPath.resolve(event.context()));
+            }
+            break;
+        }
+      }
+      if (!cancelledWatch) {
+        boolean valid = watchKey.reset();
+        if (!valid) {
+          LOG.warn("DEBUG: WatchKey: " + watchKey.watchable() + " no longer valid");
+        }
+      }
+    }
+  }
+
+  private void scanForFinalFiles(WatchedPathInfo watchedPathInfo, Path path) {
+    try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(path)) {
+      for (Path p : dirStream) {
+        if (p.getFileName().toString().equals(ShuffleHandler.DATA_FILE_NAME)) {
+          registerFoundAttempt(watchedPathInfo.pathIdentifier, null, p);
+        } else if (p.getFileName().toString().equals(ShuffleHandler.INDEX_FILE_NAME)) {
+          registerFoundAttempt(watchedPathInfo.pathIdentifier, p, null);
+        } else {
+          LOG.warn("Ignoring unknown file: " + p.getFileName());
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to open dir stream for attemptDir: " + path);
+    }
+  }
+
+  private void registerFoundAttempt(AttemptPathIdentifier pathIdentifier, Path indexFile, Path dataFile) {
+    FoundPathInfo pathInfo = foundAttempts.get(pathIdentifier);
+    if (pathInfo == null) {
+      pathInfo = new FoundPathInfo(indexFile, dataFile);
+      foundAttempts.put(pathIdentifier, pathInfo);
+    } else {
+      // Merge the newly found file into the existing record; each invocation
+      // carries either the index file or the data file, never both.
+      if (indexFile != null) {
+        pathInfo.indexPath = indexFile;
+      }
+      if (dataFile != null) {
+        pathInfo.dataPath = dataFile;
+      }
+    }
+    if (pathInfo.isComplete()) {
+      // Inform the shuffle handler
+      listener.registerAttemptDirs(pathIdentifier,
+          new ShuffleHandler.AttemptPathInfo(new org.apache.hadoop.fs.Path(pathInfo.indexPath.toUri()),
+              new org.apache.hadoop.fs.Path(pathInfo.dataPath.toUri())));
+      // Cancel existing watches
+      cancelWatchesForAttempt(pathIdentifier);
+      // Cleanup structures
+      foundAttempts.remove(pathIdentifier);
+    }
+  }
+
+  private class WatcherCallable implements Callable<Void> {
+
+    @Override
+    public Void call() throws Exception {
+      watch();
+      return null;
+    }
+  }
+
+  private class WatchExpirerCallable implements Callable<Void> {
+
+    @Override
+    public Void call() {
+      while (!shutdown.get()) {
+        // Relying on watchService.close to clean up all pending watches
+        WatchedPathInfo pathInfo;
+        try {
+          pathInfo = watchedPathQueue.take();
+        } catch (InterruptedException e) {
+          if (shutdown.get()) {
+            LOG.info("Shutting down WatchExpirer");
+            break;
+          } else {
+            LOG.error("WatchExpirer interrupted before being shutdown");
+            throw new RuntimeException("WatchExpirer interrupted before being shutdown", e);
+          }
+        }
+        WatchKey watchKey = pathInfo.getWatchKey();
+        if (watchKey != null && watchKey.isValid()) {
+          watchKey.cancel();
+        }
+      }
+      return null;
+    }
+  }
+
+  private static class FoundPathInfo {
+    Path indexPath;
+    Path dataPath;
+
+    public FoundPathInfo(Path indexPath, Path dataPath) {
+      this.indexPath = indexPath;
+      this.dataPath = dataPath;
+    }
+
+    boolean isComplete() {
+      return indexPath != null && dataPath != null;
+    }
+  }
+
+  private static class WatchedPathInfo implements Delayed {
+    final long expiry;
+    final Type type;
+    final String appId;
+    final String user;
+    final String attemptId;
+    final AttemptPathIdentifier pathIdentifier;
+    WatchKey watchKey;
+
+    public WatchedPathInfo(long expiry, Type type, String jobId, String user) {
+      this.expiry = expiry;
+      this.type = type;
+      this.appId = jobId;
+      this.user = user;
+      this.attemptId = null;
+      this.pathIdentifier = null;
+    }
+
+    public WatchedPathInfo(WatchedPathInfo other, Type type, String attemptId) {
+      this.expiry = other.expiry;
+      this.appId = other.appId;
+      this.user = other.user;
+      this.type = type;
+      this.attemptId = attemptId;
+      if (attemptId != null) {
+        pathIdentifier = new AttemptPathIdentifier(appId, user, attemptId);
+      } else {
+        pathIdentifier = null;
+      }
+    }
+
+    synchronized void setWatchKey(WatchKey watchKey) {
+      this.watchKey = watchKey;
+    }
+
+    synchronized WatchKey getWatchKey() {
+      return this.watchKey;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      // DelayQueue asks for the delay in nanoseconds; convert from millis.
+      return unit.convert(expiry - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      WatchedPathInfo other = (WatchedPathInfo) o;
+      if (other.expiry > this.expiry) {
+        return -1;
+      } else if (other.expiry < this.expiry) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+  }
+}
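NOTE: In the ShuffleHandler changes below, application registration now also carries
the app's local dirs so the DirWatcher can resolve attempt outputs proactively, and
per-attempt index/data paths move into a Guava LoadingCache shared between the watcher
(which inserts eagerly) and request handling (which loads lazily on a miss). Shuffle
fetch URLs keep the MR-style job_ prefix, hence the id translation in addJobToken():

    // Sketch of the id mapping used for shuffle requests (values hypothetical):
    String appIdString = "application_1430000000000_0001";
    String jobIdString = appIdString.replace("application", "job"); // job_1430000000000_0001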
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index 60e5992..d640b36 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -50,6 +51,12 @@
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -106,7 +113,7 @@ import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
 
-public class ShuffleHandler {
+public class ShuffleHandler implements AttemptRegistrationListener {
 
   private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
 
@@ -117,6 +124,9 @@
   public static final String SHUFFLE_READAHEAD_BYTES =
       "mapreduce.shuffle.readahead.bytes";
   public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+
+  public static final String SHUFFLE_DIR_WATCHER_ENABLED = "llap.shuffle.dir-watcher.enabled";
+  public static final boolean SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false;
 
   // pattern to identify errors related to the client closing the socket early
   // idea borrowed from Netty SslHandler
@@ -130,8 +140,8 @@ protected HttpPipelineFactory pipelineFact;
   private final int sslFileBufferSize;
   private final Configuration conf;
-
-  private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>();
+  private final String[] localDirs;
+  private final DirWatcher dirWatcher;
 
   /**
   * Should the shuffle use posix_fadvise calls to manage the OS cache during
@@ -144,7 +154,10 @@
   private final boolean shuffleTransferToAllowed;
   private final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
-  private Map<String, String> userRsrc;
+  /* List of registered applications */
+  private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>();
+  /* Maps application identifiers (jobIds) to the associated user for the app */
+  private final ConcurrentMap<String, String> userRsrc;
   private JobTokenSecretManager secretManager;
 
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
@@ -165,7 +178,7 @@
   public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
       "mapreduce.shuffle.mapoutput-info.meta.cache.size";
   public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
-      1000;
+      10000;
 
   public static final String CONNECTION_CLOSE = "close";
@@ -191,13 +204,27 @@
   public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
       false;
 
-  final boolean connectionKeepAliveEnabled;
-  final int connectionKeepAliveTimeOut;
-  final int mapOutputMetaInfoCacheSize;
+  static final String DATA_FILE_NAME = "file.out";
+  static final String INDEX_FILE_NAME = "file.out.index";
 
   private static final AtomicBoolean started = new AtomicBoolean(false);
   private static final AtomicBoolean initing = new AtomicBoolean(false);
   private static ShuffleHandler INSTANCE;
+
+  final boolean connectionKeepAliveEnabled;
+  final int connectionKeepAliveTimeOut;
+  final int mapOutputMetaInfoCacheSize;
+  private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(SHUFFLE_HANDLER_LOCAL_DIRS);
+  private final Shuffle shuffle;
+
+  @Override
+  public void registerAttemptDirs(AttemptPathIdentifier identifier,
+      AttemptPathInfo pathInfo) {
+    shuffle.registerAttemptDirs(identifier, pathInfo);
+  }
+
 
   @Metrics(about="Shuffle output metrics", context="mapred")
   static class ShuffleMetrics implements ChannelFutureListener {
     @Metric("Shuffle output in bytes")
@@ -236,6 +263,8 @@ public ShuffleHandler(Configuration conf) {
       maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
     }
 
+    localDirs = conf.getTrimmedStrings(SHUFFLE_HANDLER_LOCAL_DIRS);
+
     shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
         DEFAULT_SHUFFLE_BUFFER_SIZE);
@@ -267,8 +296,22 @@ public ShuffleHandler(Configuration conf) {
         Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
             DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
 
-    userRsrc = new ConcurrentHashMap();
+    userRsrc = new ConcurrentHashMap<>();
     secretManager = new JobTokenSecretManager();
+    shuffle = new Shuffle(conf);
+    if (conf.getBoolean(SHUFFLE_DIR_WATCHER_ENABLED, SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT)) {
+      LOG.info("Attempting to start dirWatcher");
+      DirWatcher localDirWatcher = null;
+      try {
+        localDirWatcher = new DirWatcher(this);
+      } catch (IOException e) {
+        LOG.warn("Unable to start DirWatcher. Active scans disabled");
+      }
+      dirWatcher = localDirWatcher;
+    } else {
+      LOG.info("DirWatcher disabled by config");
+      dirWatcher = null;
+    }
   }
 
@@ -286,6 +329,9 @@ public void start() throws Exception {
     port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
     conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
     pipelineFact.SHUFFLE.setPort(port);
+    if (dirWatcher != null) {
+      dirWatcher.start();
+    }
     LOG.info("LlapShuffleHandler" + " listening on port " + port);
   }
 
@@ -359,16 +405,35 @@ public int getPort() {
     return port;
   }
 
+  /**
+   * Register an application and its associated credentials and user information.
+   * @param applicationIdString
+   * @param appToken
+   * @param user
+   * @param appDirs
+   */
   public void registerApplication(String applicationIdString, Token<JobTokenIdentifier> appToken,
-      String user) {
+      String user, String [] appDirs) {
+    // TODO Fix this. There's a race here, where an app may think everything is registered, finish really fast, send events and the consumer will not find the registration.
     Boolean registered = registeredApps.putIfAbsent(applicationIdString, Boolean.valueOf(true));
     if (registered == null) {
+      LOG.info("DEBUG: Registering watches for AppDirs: appId=" + applicationIdString);
       recordJobShuffleInfo(applicationIdString, user, appToken);
+      if (dirWatcher != null) {
+        for (String appDir : appDirs) {
+          try {
+            dirWatcher.registerApplicationDir(appDir, applicationIdString, user, 5 * 60 * 1000);
+          } catch (IOException e) {
+            LOG.warn("Unable to register dir: " + appDir + " with watcher");
+          }
+        }
+      }
     }
   }
 
   public void unregisterApplication(String applicationIdString) {
     removeJobShuffleInfo(applicationIdString);
+    // TODO Unregister from the dirWatcher
   }
 
@@ -381,15 +445,20 @@ protected void stop() throws Exception {
     if (pipelineFact != null) {
       pipelineFact.destroy();
     }
+    if (dirWatcher != null) {
+      dirWatcher.stop();
+    }
   }
 
   protected Shuffle getShuffle(Configuration conf) {
-    return new Shuffle(conf);
+    return shuffle;
   }
 
   private void addJobToken(String appIdString, String user,
       Token<JobTokenIdentifier> jobToken) {
+    // This is in place to be compatible with the MR ShuffleHandler. Requests from ShuffleInputs
+    // arrive with a job_ prefix.
     String jobIdString = appIdString.replace("application", "job");
     userRsrc.put(jobIdString, user);
     secretManager.addTokenForJob(jobIdString, jobToken);
@@ -450,11 +519,49 @@ public ChannelPipeline getPipeline() throws Exception {
 
   class Shuffle extends SimpleChannelUpstreamHandler {
 
     private final Configuration conf;
+    // TODO Change the indexCache to be a guava loading cache, rather than a custom implementation.
     private final IndexCache indexCache;
-    private final LocalDirAllocator lDirAlloc =
-        new LocalDirAllocator(SHUFFLE_HANDLER_LOCAL_DIRS);
     private int port;
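+    // Path cache policy: entries are weighed by the combined length of their id and
+    // path strings, capped at a total weight of 10MB, and dropped after 300 seconds
+    // without access (softValues lets the GC reclaim them under memory pressure).
+    // A miss falls back to a LocalDirAllocator disk lookup in the loader, which also
+    // tells the dirWatcher the attempt no longer needs to be watched.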
+    private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache =
+        CacheBuilder.newBuilder().expireAfterAccess(300, TimeUnit.SECONDS).softValues()
+            .concurrencyLevel(16)
+            .removalListener(new RemovalListener<AttemptPathIdentifier, AttemptPathInfo>() {
+              @Override
+              public void onRemoval(RemovalNotification<AttemptPathIdentifier, AttemptPathInfo> notification) {
+                LOG.info("DEBUG: PathCacheEviction: " + notification.getKey() + ", Reason=" + notification.getCause());
+              }
+            })
+            .maximumWeight(10 * 1024 * 1024).weigher(
+                new Weigher<AttemptPathIdentifier, AttemptPathInfo>() {
+                  @Override
+                  public int weigh(AttemptPathIdentifier key, AttemptPathInfo value) {
+                    return key.jobId.length() + key.user.length() + key.attemptId.length() +
+                        value.indexPath.toString().length() + value.dataPath.toString().length();
+                  }
+                }).build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>() {
+              @Override
+              public AttemptPathInfo load(AttemptPathIdentifier key) throws Exception {
+                String base = getBaseLocation(key.jobId, key.user);
+                String attemptBase = base + key.attemptId;
+                Path indexFileName =
+                    lDirAlloc.getLocalPathToRead(attemptBase + "/" + INDEX_FILE_NAME, conf);
+                Path mapOutputFileName =
+                    lDirAlloc.getLocalPathToRead(attemptBase + "/" + DATA_FILE_NAME, conf);
+
+                LOG.info("DEBUG: Loaded : " + key + " via loader");
+                if (dirWatcher != null) {
+                  dirWatcher.attemptInfoFound(key);
+                }
+                return new AttemptPathInfo(indexFileName, mapOutputFileName);
+              }
+            });
+
     public Shuffle(Configuration conf) {
       this.conf = conf;
       indexCache = new IndexCache(conf);
 
@@ -465,6 +572,12 @@ public void setPort(int port) {
       this.port = port;
     }
 
+    void registerAttemptDirs(AttemptPathIdentifier identifier,
+        AttemptPathInfo pathInfo) {
+      LOG.info("DEBUG: Registering " + identifier + " via watcher");
+      pathCache.put(identifier, pathInfo);
+    }
+
     private List<String> splitMaps(List<String> mapq) {
       if (null == mapq) {
         return null;
       }
@@ -569,14 +682,9 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
       Channel ch = evt.getChannel();
       String user = userRsrc.get(jobId);
 
-      // $x/$user/appcache/$appId/output/$mapId
-      // TODO: Once Shuffle is out of NM, this can use MR APIs to convert
-      // between App and Job
-      String outputBasePathStr = getBaseLocation(jobId, user);
-
-      try {
-        populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
-            response, keepAliveParam, mapOutputInfoMap);
+      try {
+        populateHeaders(mapIds, jobId, user, reduceId,
+            response, keepAliveParam, mapOutputInfoMap);
       } catch(IOException e) {
         ch.write(response);
         LOG.error("Shuffle error in populating headers :", e);
@@ -590,8 +698,10 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
       for (String mapId : mapIds) {
        try {
          MapOutputInfo info = mapOutputInfoMap.get(mapId);
+          // This will be hit if there's a large number of mapIds in a single request
+          // (Determined by the cache size further up), in which case we go to disk again.
          if (info == null) {
-            info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
+            info = getMapOutputInfo(jobId, mapId, reduceId, user);
          }
          lastMap = sendMapOutput(ctx, ch, user, mapId,
@@ -619,61 +729,56 @@ private String getErrorMessage(Throwable t) {
      return sb.toString();
    }
 
-    private final String USERCACHE_CONSTANT = "usercache";
-    private final String APPCACHE_CONSTANT = "appcache";
-
-    private String getBaseLocation(String jobIdString, String user) {
-      String parts[] = jobIdString.split("_");
-      Preconditions.checkArgument(parts.length == 3, "Invalid jobId. Expecting 3 parts");
-      final ApplicationId appID =
-          ApplicationId.newInstance(Long.parseLong(parts[1]), Integer.parseInt(parts[2]));
-      final String baseStr =
-          USERCACHE_CONSTANT + "/" + user + "/"
-              + APPCACHE_CONSTANT + "/"
-              + ConverterUtils.toString(appID) + "/output" + "/";
-      return baseStr;
-    }
+    protected MapOutputInfo getMapOutputInfo(String jobId, String mapId,
+        int reduce, String user) throws IOException {
+      AttemptPathInfo pathInfo;
+      try {
+        AttemptPathIdentifier identifier = new AttemptPathIdentifier(jobId, user, mapId);
+        pathInfo = pathCache.get(identifier);
+        LOG.info("DEBUG: Retrieved pathInfo for " + identifier +
+            " check for corresponding loaded messages to determine whether it was loaded or cached");
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        } else {
+          throw new RuntimeException(e.getCause());
+        }
+      }
 
-    protected MapOutputInfo getMapOutputInfo(String base, String mapId,
-        int reduce, String user) throws IOException {
-      // Index file
-      Path indexFileName =
-          lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
       TezIndexRecord info =
-          indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+          indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
 
-      Path mapOutputFileName =
-          lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
       if (LOG.isDebugEnabled()) {
-        LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
+        LOG.debug("jobId=" + jobId + ", mapId=" + mapId + ",dataFile=" + pathInfo.dataPath +
+            ", indexFile=" + pathInfo.indexPath);
       }
-      MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info);
+
+      // TODO Get rid of MapOutputInfo if possible
+      MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, info);
       return outputInfo;
     }
 
-    protected void populateHeaders(List<String> mapIds, String outputBaseStr,
-        String user, int reduce, HttpRequest request, HttpResponse response,
+    protected void populateHeaders(List<String> mapIds, String jobId,
+        String user, int reduce, HttpResponse response,
         boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
        throws IOException {
 
+      // Reads the index file for each requested mapId, and figures out the overall
+      // length of the response - which is populated into the response header.
      long contentLength = 0;
      for (String mapId : mapIds) {
-        String base = outputBaseStr + mapId;
-        MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user);
+        MapOutputInfo outputInfo = getMapOutputInfo(jobId, mapId, reduce, user);
+        // mapOutputInfoMap is used to share the lookups with the caller
        if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
          mapOutputInfoMap.put(mapId, outputInfo);
        }
-        // Index file
-        Path indexFileName =
-            lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
-        TezIndexRecord info =
-            indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
        ShuffleHeader header =
-            new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce);
+            new ShuffleHeader(mapId, outputInfo.indexRecord.getPartLength(),
+                outputInfo.indexRecord.getRawLength(), reduce);
        DataOutputBuffer dob = new DataOutputBuffer();
        header.write(dob);
-        contentLength += info.getPartLength();
+        contentLength += outputInfo.indexRecord.getPartLength();
        contentLength += dob.getLength();
      }
@@ -697,8 +802,8 @@ protected void setResponseHeaders(HttpResponse response,
 
     class MapOutputInfo {
-      final Path mapOutputFileName;
-      final TezIndexRecord indexRecord;
+      final Path mapOutputFileName; // 100-200 byte string. Maybe replace with a local-dir-id, and construct on the fly.
+      final TezIndexRecord indexRecord; // 3 longs + reference overheads.
 
       MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord) {
         this.mapOutputFileName = mapOutputFileName;
@@ -842,4 +947,84 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
       }
     }
   }
+
+  private static final String USERCACHE_CONSTANT = "usercache";
+  private static final String APPCACHE_CONSTANT = "appcache";
+
+  private static String getBaseLocation(String jobIdString, String user) {
+    // $x/$user/appcache/$appId/output/$mapId
+    // TODO: Once Shuffle is out of NM, this can use MR APIs to convert
+    // between App and Job
+    String parts[] = jobIdString.split("_");
+    Preconditions.checkArgument(parts.length == 3, "Invalid jobId. Expecting 3 parts");
+    final ApplicationId appID =
+        ApplicationId.newInstance(Long.parseLong(parts[1]), Integer.parseInt(parts[2]));
+    final String baseStr =
+        USERCACHE_CONSTANT + "/" + user + "/"
+            + APPCACHE_CONSTANT + "/"
+            + ConverterUtils.toString(appID) + "/output" + "/";
+    return baseStr;
+  }
+
+  static class AttemptPathInfo {
+    // TODO Change this over to just store local dir indices, instead of the entire path. Far more efficient.
+    private final Path indexPath;
+    private final Path dataPath;
+
+    public AttemptPathInfo(Path indexPath, Path dataPath) {
+      this.indexPath = indexPath;
+      this.dataPath = dataPath;
+    }
+  }
+
+  static class AttemptPathIdentifier {
+    private final String jobId;
+    private final String user;
+    private final String attemptId;
+
+    public AttemptPathIdentifier(String jobId, String user, String attemptId) {
+      this.jobId = jobId;
+      this.user = user;
+      this.attemptId = attemptId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      AttemptPathIdentifier that = (AttemptPathIdentifier) o;
+
+      if (!attemptId.equals(that.attemptId)) {
+        return false;
+      }
+      if (!jobId.equals(that.jobId)) {
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = jobId.hashCode();
+      result = 31 * result + attemptId.hashCode();
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "AttemptPathIdentifier{" +
+          "attemptId='" + attemptId + '\'' +
+          ", jobId='" + jobId + '\'' +
+          '}';
+    }
+  }
+
 }
diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 4fa9fad..895f389 100644
--- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -711,9 +711,9 @@ public long getDelay(TimeUnit unit) {
     public int compareTo(Delayed o) {
       NodeInfo other = (NodeInfo) o;
       if (other.expireTimeMillis > this.expireTimeMillis) {
-        return 1;
-      } else if (other.expireTimeMillis < this.expireTimeMillis) {
         return -1;
+      } else if (other.expireTimeMillis < this.expireTimeMillis) {
+        return 1;
       } else {
         return 0;
       }
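NOTE: The LlapTaskSchedulerService hunk fixes an inverted Delayed ordering: NodeInfo
now compares ascending on expireTimeMillis, so the DelayQueue surfaces the node whose
re-enable timeout expires soonest, consistent with getDelay(). An equivalent, more
compact form of the corrected comparison (sketch, not part of the patch):

    @Override
    public int compareTo(Delayed o) {
      return Long.compare(this.expireTimeMillis, ((NodeInfo) o).expireTimeMillis);
    }

DirWatcher.WatchedPathInfo (above) follows the same ascending-expiry convention.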