commit 20c94d200e23eb4287e4568b7063f2d1e436ae3a Author: Reuben Kuhnert Date: Mon May 9 20:10:05 2016 -0500 HIVE-13696: Modify FairSchedulerShim to dynamically reload changes to fair-scheduler.xml. This patch dynamically reloads changes to fair-scheduler.xml by attaching a FileSystemWatcher object onto the file. When the file changes the YARN queue routing will be automatically updated and jobs will be reconfigured to send to the correct location based on the fair-scheduler config. Change-Id: Ia99279ab417a62049dac07960cc03493697d5809 diff --git a/common/src/java/org/apache/hive/common/util/HiveStringUtils.java b/common/src/java/org/apache/hive/common/util/HiveStringUtils.java index 6d28396893532302fbbd66eace53ae32b71848c3..97feb9c38e181dbf7b20590a43f00817b5daf72a 100644 --- a/common/src/java/org/apache/hive/common/util/HiveStringUtils.java +++ b/common/src/java/org/apache/hive/common/util/HiveStringUtils.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.io.Text; -import org.apache.hadoop.util.StringUtils; /** * HiveStringUtils @@ -317,6 +316,15 @@ public static String formatTimeDiff(long finishTime, long startTime){ } /** + * Determine if a string is null or empty. + * @param string The string to check + * @return Returns true is string is null or empty. + */ + public static boolean isNullOrEmpty(String string) { + return string == null || string.isEmpty(); + } + + /** * * Given the time in long milliseconds, returns a * String in the format Xhrs, Ymins, Z sec. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 3fecc5c4ca2a06a031c0c4a711fb49e757c49062..a744c736720fcdb2fc8231d5307cb0a11865fc9e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -107,6 +107,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.session.YarnFairScheduling; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import org.apache.hadoop.hive.shims.Utils; @@ -463,6 +464,8 @@ public void run() { plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext()); } + configureScheduling(conf, userName); + //do the authorization check if (!sem.skipAuthorization() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) { @@ -526,6 +529,14 @@ public void run() { } } + private HiveConf configureScheduling(HiveConf configuration, String forUser) throws IOException, HiveException { + if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(configuration)) { + YarnFairScheduling.validateYarnQueue(configuration, forUser); + } + + return configuration; + } + private ImmutableMap dumpMetaCallTimingWithoutEx(String phase) { try { return Hive.get().dumpAndClearMetaCallTiming(phase); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java new file mode 100644 index 0000000000000000000000000000000000000000..0a65b49c960f7b1c8080ccf24e7b66819124a802 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.session; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.shims.ShimLoader; + +import java.io.IOException; + +/** + * A front handle for managing job submission to Yarn-FairScheduler. + */ +public class YarnFairScheduling { + /** + * Determine if jobs can be configured for YARN fair scheduling. + * @param conf - the current HiveConf configuration. + * @return Returns true when impersonation mode is disabled and fair-scheduling is enabled. + */ + public static boolean usingNonImpersonationModeWithFairScheduling(HiveConf conf) { + return (conf != null) + && (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) + && (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE))); + } + + /** + * Configure the default YARN queue for the user. + * @param conf - The current HiveConf configuration. + * @param forUser - The user to configure scheduling for. + * @throws IOException + * @throws HiveException + */ + public static void setDefaultJobQueue(HiveConf conf, String forUser) throws IOException, HiveException { + if (!usingNonImpersonationModeWithFairScheduling(conf)) { + throw new HiveException("Unable to map job to fair-scheduler because either impersonation is on or fair-scheduling is disabled."); + } + + ShimLoader.getSchedulerShims().refreshDefaultQueue(conf, forUser); + } + + /** + * Validate the current YARN queue for the current user. + * @param conf - The current HiveConf configuration. + * @param forUser - The user to configure scheduling for. + * @throws IOException + * @throws HiveException + */ + public static void validateYarnQueue(HiveConf conf, String forUser) throws IOException, HiveException { + if (!usingNonImpersonationModeWithFairScheduling(conf)) { + throw new HiveException("Job is not configured to user fair scheduling"); + } + + ShimLoader.getSchedulerShims().validateQueueConfiguration(conf, forUser); + } +} \ No newline at end of file diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index a0015ebc655931f241b28c53fbb94cfe172841b1..48569f5b6c88d3f7b7bc2d6b7ab49873a57fab76 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -38,8 +37,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.exec.ListSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -48,7 +45,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; -import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.ql.session.YarnFairScheduling; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.FetchOrientation; @@ -126,12 +123,11 @@ public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, S try { // In non-impersonation mode, map scheduler queue to current user // if fair scheduler is configured. - if (! sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && - sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) { - ShimLoader.getHadoopShims().refreshDefaultQueue(sessionConf, username); + if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(sessionConf)) { + YarnFairScheduling.setDefaultJobQueue(sessionConf, username); } - } catch (IOException e) { - LOG.warn("Error setting scheduler queue: " + e, e); + } catch (IOException | HiveException ex) { + LOG.warn("Error setting scheduler queue: " + ex, ex); } // Set an explicit session name to control the download directory name sessionConf.set(ConfVars.HIVESESSIONID.varname, diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/FileSystemWatcher.java b/shims/common/src/main/java/org/apache/hadoop/fs/FileSystemWatcher.java new file mode 100644 index 0000000000000000000000000000000000000000..f0afe4ea66bd26163a1747e6822d12450e00efe8 --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/fs/FileSystemWatcher.java @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.File; +import java.io.IOException; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.nio.file.FileSystems; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * Utility class to keep track of when a file is modified/created/deleted. + */ +public class FileSystemWatcher { + private static final Logger LOG = LoggerFactory.getLogger(FileSystemWatcher.class); + private static final int ONE_SECOND = 1000; + + public static abstract class FileChangedCallback { + protected void onFileChanged(WatchEvent event, Path path) { } + protected void onCreate(WatchEvent event, Path path) { } + protected void onModify(WatchEvent event, Path path) { } + protected void onDelete(WatchEvent event, Path path) { } + } + + private final List callbacks; + private final Set files; + private final Thread thread; + private WatchService watchService; + private boolean shutdown; + + @VisibleForTesting + public FileSystemWatcher(List callbacks, Set files, Thread thread) { + this.callbacks = callbacks; + this.files = files; + this.thread = thread; + } + + public FileSystemWatcher() throws IOException, InterruptedException { + this.callbacks = new ArrayList<>(); + this.files = Collections.newSetFromMap(new ConcurrentHashMap()); + this.shutdown = false; + this.thread = buildThread(); + } + + /** + * Re-generates the watch service on initialization or when files have been unwatched. + * @param paths The paths of the files to watch. + * @return A configured WatchService object. + * @throws IOException + * @throws InterruptedException + */ + private synchronized WatchService rebuildWatchService(Iterable paths) throws IOException, InterruptedException { + if (!paths.iterator().hasNext()) return null; + + // Rebuilds the watcher service. Although we can incrementally add files to the watch, + // we cannot remove them, so unwatching a file requires rebuilding the service. + if (this.watchService != null) { + watchService.close(); + } + + WatchService watchService = FileSystems.getDefault().newWatchService(); + for (File file : paths) { + registerWatch(watchService, file); + } + + return watchService; + } + + /** + * Register a watch for the given WatchService object. + * @param watchService The WatchService object to configure. + * @param file The file to watch. + * @return Returns a configured WatchService object. + * @throws IOException + * @throws InterruptedException + */ + private synchronized WatchService registerWatch(WatchService watchService, File file) throws IOException, InterruptedException { + if (this.watchService == null) return rebuildWatchService(Collections.singleton(file)); + + getWatchablePath(file).register(watchService, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.OVERFLOW, + StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_DELETE); + + return watchService; + } + + private Path getWatchablePath(File file) { + return file.isDirectory() ? file.toPath() : file.getParentFile().toPath(); + } + + /** + * Generates a configured Thread for concurrently monitoring the file system. + * @return + */ + private Thread buildThread() { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + process(); + } + }); + + thread.setDaemon(true); + thread.start(); + return thread; + } + + /** + * Close the file system watcher service (cannot be re-opened). + */ + public synchronized void close() { + if (this.shutdown) return; + + this.shutdown = true; + if (this.watchService != null) { + try { + this.watchService.close(); + } catch (IOException ex) { + LOG.error("An error occured while closing the WatchService", ex); + } + } + + this.files.clear(); + this.callbacks.clear(); + } + + /** + * Forever loop to process file system notifications. + */ + @SuppressWarnings("unchecked") + private void process() { + while (!shutdown) { + try { + launchEvents(); + } catch (InterruptedException ex) { + LOG.warn("FileWatcher service was interrupted.", ex); + break; + } catch (Exception ex) { + LOG.debug("File system watcher has exited", ex); + break; + } + } + + this.close(); + } + + @SuppressWarnings("unchecked") + private synchronized void launchEvents() throws InterruptedException { + if (this.watchService == null) return; + + WatchKey watchKey = watchService.take(); + for (WatchEvent event : watchKey.pollEvents()) { + launchEvent((WatchEvent) event); + } + } + + /** + * Launches a callback associated with each type of file-system event. + * @param event - the event to process. + */ + @VisibleForTesting + protected synchronized void launchEvent(WatchEvent event) { + Path path = event.context(); + WatchEvent.Kind eventKind = event.kind(); + for (FileChangedCallback onNotify : this.callbacks) { + if (files.contains(path.toFile().getAbsoluteFile())) { + onNotify.onFileChanged(event, path); + if (eventKind == StandardWatchEventKinds.ENTRY_CREATE) { + onNotify.onCreate(event, path); + } else if (eventKind == StandardWatchEventKinds.ENTRY_MODIFY) { + onNotify.onModify(event, path); + } else if (eventKind == StandardWatchEventKinds.ENTRY_DELETE) { + onNotify.onDelete(event, path); + } + } + } + } + + /** + * Determine if the FileSystemWatcher is already monitoring a file. + * @param path The path to the file being monitored. + * @return Returns true if path is already being monitored. + */ + public boolean isWatching(String path) { + return this.files.contains(new File(path)); + } + + /** + * Adds a callback function for a particular type of event. + * @param callback The change notification callback. + */ + public synchronized void addHandler(FileChangedCallback callback) { + if (shutdown) { + throw new RuntimeException("Cannot add callbacks because the FileSystemWatcher is already closed"); + } + + this.callbacks.add(callback); + } + + /** + * Clears the list of callbacks. + */ + public synchronized void clearCallbacks() { + if (shutdown) { + throw new RuntimeException("Cannot clear callbacks because the FileSystemWatcher is already closed"); + } + + this.callbacks.clear(); + } + + /** + * Clears the list of file watchers. + * @throws IOException + * @throws InterruptedException + */ + public synchronized void clearWatchList() throws IOException, InterruptedException { + if (shutdown) { + throw new RuntimeException("Cannot clear watch-list because the FileSystemWatcher is already closed"); + } + + this.files.clear(); + this.watchService = rebuildWatchService(files); + } + + /** + * Add a watch on a particular file and reset the file system watcher. + * @param path The path to watch. + * @throws IOException + * @throws InterruptedException + */ + public synchronized void watch(String path) throws IOException, InterruptedException { + if (shutdown) { + throw new RuntimeException("Cannot watch files because the FileSystemWatcher is already closed"); + } + + if (!isWatching(path)) { + File file = new File(path); + this.files.add(file.getAbsoluteFile()); + this.watchService = registerWatch(this.watchService, file); + } + } + + /** + * Remove the watch for a particular file. Also resets the file-system watcher. + * @param path The path to watch. + * @throws IOException + * @throws InterruptedException + */ + public synchronized void unwatch(String path) throws IOException, InterruptedException { + if (shutdown) { + throw new RuntimeException("Cannot un-watch files because the FileSystemWatcher is already closed"); + } + + this.files.remove(new File(path).getAbsoluteFile()); + this.watchService = rebuildWatchService(files); + } +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java index 63803b8b0752745bd2fedaccc5d100befd97093b..f88e1928ecaceeb537f986e728e3383748f16d7d 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java @@ -34,4 +34,6 @@ */ public void refreshDefaultQueue(Configuration conf, String userName) throws IOException; + + public void validateQueueConfiguration(Configuration configuration, String forUser) throws IOException; } diff --git a/shims/scheduler/pom.xml b/shims/scheduler/pom.xml index b36c12325c588cdb609c6200b1edef73a2f79552..859ccb163c686c2635450869f1f31e05ee099977 100644 --- a/shims/scheduler/pom.xml +++ b/shims/scheduler/pom.xml @@ -77,7 +77,12 @@ ${hadoop.version} true - + + org.mockito + mockito-all + ${mockito-all.version} + + org.apache.hadoop hadoop-yarn-server-tests ${hadoop.version} diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java new file mode 100644 index 0000000000000000000000000000000000000000..4b288902369a112faaa16d5324cd103f24f82057 --- /dev/null +++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.schshim; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +public class FairSchedulerQueueAllocator implements QueueAllocator { + /** + * Generates a Yarn FairScheduler queue resolver based on 'fair-scheduler.xml'. + * @param configuration The HiveConf configuration. + * @param username The user to configure the job for. + * @return Returns a configured allocation resolver. + * @throws IOException + */ + public synchronized AtomicReference makeConfigurationFor(Configuration configuration, String username) throws IOException { + final AtomicReference allocationConfiguration = new AtomicReference(); + + AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService(); + allocsLoader.init(configuration); + allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() { + @Override + public void onReload(AllocationConfiguration allocs) { + allocationConfiguration.set(allocs); + } + }); + + try { + allocsLoader.reloadAllocations(); + } catch (Exception ex) { + throw new RuntimeException("Failed to load queue allocations", ex); + } + + if (allocationConfiguration.get() == null) { + allocationConfiguration.set(new AllocationConfiguration(configuration)); + } + + return allocationConfiguration; + } +} diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java index 372244dc3c989d2a3ae2eb2bfb8cd0a235705e18..5bfca673e53cc08beed7a7cac1cfb8ffde62a93f 100644 --- a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java +++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java @@ -17,54 +17,178 @@ */ package org.apache.hadoop.hive.schshim; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystemWatcher; import org.apache.hadoop.hive.shims.SchedulerShim; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.WatchEvent; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +/* + * FairSchedulerShim monitors changes in fair-scheduler.xml (if it exists) to allow for dynamic + * reloading and queue resolution. When changes to the fair-scheduler.xml file are detected, the + * cached queue resolution policies for each user are cleared, and then re-cached/validated on job-submit. + */ public class FairSchedulerShim implements SchedulerShim { private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerShim.class); + private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file"; private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename"; - @Override - public void refreshDefaultQueue(Configuration conf, String userName) - throws IOException { - String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME; - final AtomicReference allocConf = new AtomicReference(); - - AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService(); - allocsLoader.init(conf); - allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() { + private final Map> perUserConfigurations + = new ConcurrentHashMap>(); + + private String lastUsedYarnSiteLocation = null; + private final FileSystemWatcher fileSystemWatcher; + private final QueueAllocator queueAllocator; + + @VisibleForTesting + public FairSchedulerShim(FileSystemWatcher fileSystemWatcher, QueueAllocator queueAllocator) { + this.fileSystemWatcher = initializeFileSystemWatcher(fileSystemWatcher); + this.queueAllocator = queueAllocator; + } + + public FairSchedulerShim() { + this(makeFileSystemWatcher(), new FairSchedulerQueueAllocator()); + } + + private static FileSystemWatcher makeFileSystemWatcher() { + try { + return new FileSystemWatcher(); + } catch (Exception ex) { + LOG.error("Unable to start FileSystemWatcher.", ex); + } + + return null; + } + + private FileSystemWatcher initializeFileSystemWatcher(FileSystemWatcher watcher) { + if (watcher == null) return null; + + watcher.addHandler(new FileSystemWatcher.FileChangedCallback() { @Override - public void onReload(AllocationConfiguration allocs) { - allocConf.set(allocs); + protected void onFileChanged(WatchEvent event, Path path) { + clearConfigurations(); } }); - try { - allocsLoader.reloadAllocations(); - } catch (Exception ex) { - throw new IOException("Failed to load queue allocations", ex); + + return watcher; + } + + /** + * Applies the default YARN fair scheduler queue for a user. + * @param conf - the current HiveConf configuration. + * @param forUser - the user to configure the default queue for. + * @throws IOException + */ + @Override + public synchronized void refreshDefaultQueue(Configuration conf, String forUser) + throws IOException { + setJobQueueForUserInternal(conf, YarnConfiguration.DEFAULT_QUEUE_NAME, forUser); + } + + /** + * Validates the YARN fair scheduler queue configuration. + * @param conf - the current HiveConf configuration. + * @param forUser - the user to configure the default queue for. + * @throws IOException + */ + @Override + public synchronized void validateQueueConfiguration(Configuration conf, String forUser) throws IOException { + String currentJobQueue = conf.get(MR2_JOB_QUEUE_PROPERTY); + if (currentJobQueue != null && !currentJobQueue.isEmpty()) { + setJobQueueForUserInternal(conf, currentJobQueue, forUser); + } else { + refreshDefaultQueue(conf, forUser); } - if (allocConf.get() == null) { - allocConf.set(new AllocationConfiguration(conf)); + } + + private void setJobQueueForUserInternal(Configuration conf, String queueName, String forUser) throws IOException { + AtomicReference allocationConfiguration = getConfigForUser(conf, forUser); + AllocationConfiguration queueConfiguration = allocationConfiguration.get(); + + if (queueConfiguration != null) { + QueuePlacementPolicy queuePolicy = queueConfiguration.getPlacementPolicy(); + if (queuePolicy != null) { + String requestedQueue = queuePolicy.assignAppToQueue(queueName, forUser); + if (StringUtils.isNotBlank(requestedQueue)) { + LOG.info("Setting queue name to: '{}' for user '{}'", requestedQueue, forUser); + conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue); + } + } } - QueuePlacementPolicy queuePolicy = allocConf.get().getPlacementPolicy(); - if (queuePolicy != null) { - requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName); - if (StringUtils.isNotBlank(requestedQueue)) { - LOG.debug("Setting queue name to " + requestedQueue + " for user " - + userName); - conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue); + } + + private synchronized AtomicReference getConfigForUser(Configuration conf, String username) throws IOException { + // If the location of the fair-scheduler.xml file has changed: + // (1) Clear the current file-system watch. + // (2) Re-add a new file-system watch if the file has moved. + // (3) Clear all cached configurations. + if (!StringUtils.equals(lastUsedYarnSiteLocation, conf.get(YARN_SCHEDULER_FILE_PROPERTY))) { + lastUsedYarnSiteLocation = conf.get(YARN_SCHEDULER_FILE_PROPERTY); + + try { + if (this.fileSystemWatcher != null) { + this.fileSystemWatcher.clearWatchList(); + } + } catch (InterruptedException ex) { + LOG.warn("Unable to clear watches for file {}", lastUsedYarnSiteLocation); } + + if (!StringUtils.isEmpty(lastUsedYarnSiteLocation) && exists(lastUsedYarnSiteLocation)) { + try { + if (this.fileSystemWatcher != null) { + this.fileSystemWatcher.watch(conf.get(YARN_SCHEDULER_FILE_PROPERTY)); + } + } catch (InterruptedException ex) { + LOG.warn("Unable to watch file {}", lastUsedYarnSiteLocation); + } + + LOG.debug("The fair-scheduler.xml file was updated."); + } + + clearConfigurations(); } + + if (!perUserConfigurations.containsKey(username)) { + perUserConfigurations.put(username, queueAllocator.makeConfigurationFor(conf, username)); + } + + return perUserConfigurations.get(username); } -} + private boolean exists(String path) { + if (path != null && !path.isEmpty()) { + return new File(path).exists(); + } else { + return false; + } + } + + /** + * @return Returns the path to the last used yarn site location (yarn.scheduler.fair.allocation.file). + */ + @VisibleForTesting + public String getLastUsedYarnSiteLocation() { + return this.lastUsedYarnSiteLocation; + } + + /** + * If the yarn.scheduler.fair.allocation.file property changes, then clean all configurations. + */ + private synchronized void clearConfigurations() { + perUserConfigurations.clear(); + } +} \ No newline at end of file diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java new file mode 100644 index 0000000000000000000000000000000000000000..31e064fe5eedbd7e4481d2c2c41f439266b910c1 --- /dev/null +++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.schshim; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +public interface QueueAllocator { + /** + * Generates a queue resolver for a given configuration and username. + * @param configuration The HiveConf configuration. + * @param username The user to configure the job for. + * @return Returns the queue allocation configuration. + * @throws IOException + */ + AtomicReference makeConfigurationFor(Configuration configuration, String username) throws IOException; +} diff --git a/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairScheduler.java b/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairScheduler.java new file mode 100644 index 0000000000000000000000000000000000000000..3c4aad0f88a8af0d9e9a34e6a2b332aab411d281 --- /dev/null +++ b/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairScheduler.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.schshim; + +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystemWatcher; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestFairScheduler { + private static final String EMPTY = ""; + private static final int USERNAME_ARGUMENT_INDEX = 1; + private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file"; + private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename"; + + @Test + public void testChangingLastUsedHiveConfigurationStringDirectly() throws Exception { + Configuration configuration = new Configuration(); + + FileSystemWatcher watcher = mock(FileSystemWatcher.class); + QueueAllocator allocator = mock(QueueAllocator.class); + + final Set usersWeShouldSee = new HashSet<>(Arrays.asList("userone", "usertwo", "userthree")); + final Set usersSeenSoFar = new HashSet<>(); + when(allocator.makeConfigurationFor(any(Configuration.class), any(String.class))) + .thenAnswer(new Answer>() { + @Override + public AtomicReference answer(InvocationOnMock invocationOnMock) throws Throwable { + // Capture which user is causing the reset for verification purposes. + String username = (String) invocationOnMock.getArguments()[USERNAME_ARGUMENT_INDEX]; + usersSeenSoFar.add(username); + + return new AtomicReference(); + } + }); + + FairSchedulerShim shim = new FairSchedulerShim(watcher, allocator); + + // On initialization should be uncached. + assertNull(shim.getLastUsedYarnSiteLocation()); + + // Per job submission the location of fair-scheduler should be updated. + for (String location : new String[] { "/first", "/second", "third/fourth" }){ + for (String user : usersWeShouldSee) { + configuration.set(YARN_SCHEDULER_FILE_PROPERTY, location); + shim.refreshDefaultQueue(configuration, user); + assertEquals(shim.getLastUsedYarnSiteLocation(), location); + } + } + + assertEquals(usersSeenSoFar, usersWeShouldSee); + } + + @Test + public void testNeverBeforeSeenUsersEffectOnLastUsedHiveConfigurationString() throws Exception { + Configuration configuration = new Configuration(); + + FileSystemWatcher watcher = mock(FileSystemWatcher.class); + QueueAllocator allocator = mock(QueueAllocator.class); + + final Set usersWeShouldSee = new HashSet<>(Arrays.asList("first", "second", "third", "fourth", "fifth")); + final Set usersSeenSoFar = new HashSet<>(); + when(allocator.makeConfigurationFor(any(Configuration.class), any(String.class))) + .thenAnswer(new Answer>() { + @Override + public AtomicReference answer(InvocationOnMock invocationOnMock) throws Throwable { + // Capture which user is causing the reset for verification purposes. + String username = (String) invocationOnMock.getArguments()[USERNAME_ARGUMENT_INDEX]; + usersSeenSoFar.add(username); + + return new AtomicReference(); + } + }); + + FairSchedulerShim shim = new FairSchedulerShim(watcher, allocator); + + // On initialization should be uncached. + assertNull(shim.getLastUsedYarnSiteLocation()); + + // Per job submission the location of fair-scheduler should be updated. + configuration.set(YARN_SCHEDULER_FILE_PROPERTY, "/some/unchanging/location"); + for (String user : new String[] { "first", "second", "third", "fourth", "fifth" }) { + shim.refreshDefaultQueue(configuration, user); + assertEquals(shim.getLastUsedYarnSiteLocation(), "/some/unchanging/location"); + } + + assertEquals(usersSeenSoFar, usersWeShouldSee); + } + + @Test + public void testQueueAllocation() throws Exception { + Configuration configuration = new Configuration(); + + FileSystemWatcher watcher = mock(FileSystemWatcher.class); + QueueAllocator allocator = mock(QueueAllocator.class); + + when(allocator.makeConfigurationFor(any(Configuration.class), any(String.class))) + .thenAnswer(new Answer>() { + @Override + public AtomicReference answer(InvocationOnMock invocationOnMock) throws Throwable { + // Capture which user is causing the reset for verification purposes. + final String username = (String) invocationOnMock.getArguments()[USERNAME_ARGUMENT_INDEX]; + + AllocationConfiguration allocationConfiguration = mock(AllocationConfiguration.class); + when(allocationConfiguration.getPlacementPolicy()) + .thenAnswer(new Answer() { + @Override + public QueuePlacementPolicy answer(InvocationOnMock invocationOnMock) throws Throwable { + QueuePlacementPolicy placementPolicy = mock(QueuePlacementPolicy.class); + when(placementPolicy.assignAppToQueue(any(String.class), any(String.class))) + .thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocationOnMock) throws Throwable { + return String.format("queue.for.%s", username); + } + }); + + return placementPolicy; + } + }); + + return new AtomicReference<>(allocationConfiguration); + } + }); + + FairSchedulerShim shim = new FairSchedulerShim(watcher, allocator); + + // On initialization should be uncached. + assertNull(shim.getLastUsedYarnSiteLocation()); + + // Per job submission the location of fair-scheduler should be updated. + configuration.set(YARN_SCHEDULER_FILE_PROPERTY, "/some/file/location"); + for (String user : new String[] { "first", "second", "third", "fourth", "fifth" }) { + shim.refreshDefaultQueue(configuration, user); + + String queueName = String.format("queue.for.%s", user); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), queueName); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testFilesystemUpdates() throws Exception { + WatchEvent createEvent = mock(WatchEvent.class); + WatchEvent modifyEvent = mock(WatchEvent.class); + WatchEvent deleteEvent = mock(WatchEvent.class); + + when(createEvent.context()).thenReturn(Paths.get("/some/path")); + when(createEvent.kind()).thenReturn(StandardWatchEventKinds.ENTRY_CREATE); + when(modifyEvent.context()).thenReturn(Paths.get("/some/path")); + when(createEvent.kind()).thenReturn(StandardWatchEventKinds.ENTRY_MODIFY); + when(deleteEvent.context()).thenReturn(Paths.get("/some/path")); + when(createEvent.kind()).thenReturn(StandardWatchEventKinds.ENTRY_DELETE); + + final MutableInt counter = new MutableInt(); + QueueAllocator allocator = mock(QueueAllocator.class); + when(allocator.makeConfigurationFor(any(Configuration.class), any(String.class))) + .thenAnswer(new Answer>() { + @Override + public AtomicReference answer(InvocationOnMock invocationOnMock) throws Throwable { + final String username = (String) invocationOnMock.getArguments()[USERNAME_ARGUMENT_INDEX]; + + counter.increment(); + AllocationConfiguration allocationConfiguration = mock(AllocationConfiguration.class); + when(allocationConfiguration.getPlacementPolicy()) + .thenAnswer(new Answer() { + @Override + public QueuePlacementPolicy answer(InvocationOnMock invocationOnMock) throws Throwable { + QueuePlacementPolicy placementPolicy = mock(QueuePlacementPolicy.class); + when(placementPolicy.assignAppToQueue(any(String.class), any(String.class))) + .thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocationOnMock) throws Throwable { + return String.format("queue.for.%s.%s", username, counter); + } + }); + + return placementPolicy; + } + }); + + return new AtomicReference<>(allocationConfiguration); + } + }); + + MockFileSystemWatcher fileSystemWatcher = new MockFileSystemWatcher(new File("/some/path")); + FairSchedulerShim shim = new FairSchedulerShim(fileSystemWatcher, allocator); + + Configuration configuration = new Configuration(); + assertNull(configuration.get(MR2_JOB_QUEUE_PROPERTY)); + + // Should update for the first pass, but be the same for the second. + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.1"); + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.1"); + + // A file system event should cause an update: + fileSystemWatcher.launchEventWatchEvent(createEvent); + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.2"); + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.2"); + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.2"); + + fileSystemWatcher.launchEventWatchEvent(modifyEvent); + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.3"); + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.3"); + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.3"); + + fileSystemWatcher.launchEventWatchEvent(deleteEvent); + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.4"); + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.4"); + shim.refreshDefaultQueue(configuration, "a_user"); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), "queue.for.a_user.4"); + } + + // Mock class to acess private methods + public static class MockFileSystemWatcher extends FileSystemWatcher { + public MockFileSystemWatcher(File ... paths) throws IOException, InterruptedException { + super(new ArrayList(), new HashSet(Arrays.asList(paths)), new Thread()); + } + + public void launchEventWatchEvent(WatchEvent event) { + super.launchEvent(event); + } + } +} \ No newline at end of file