commit 7d59b84b3effb4a21fa863a08785e72024d2cbf5 Author: Reuben Kuhnert Date: Mon May 9 20:10:05 2016 -0500 HIVE-13696: Modify FairSchedulerShim to dynamically reload changes to fair-scheduler.xml. This patch dynamically reloads changes to fair-scheduler.xml by attaching a FileSystemWatcher object onto the file. When the file changes, the YARN queue routing is automatically updated and jobs are reconfigured so that they are sent to the correct queue based on the fair-scheduler config. Change-Id: Ia99279ab417a62049dac07960cc03493697d5809 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 926f6e883030b5a01d025994bd02c67f0f5a275c..f80b2e6de46d9fe4b4aa95de29a1e9753f52ea27 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -32,6 +32,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.session.YarnFairScheduling; import org.apache.hadoop.mapreduce.MRJobConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -429,6 +430,8 @@ public int execute(DriverContext driverContext) { TezSessionPoolManager.getInstance().closeIfNotDefault(session, true); } + configureScheduling(conf, ss.getUserName()); + // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); this.jobID = rj.getJobID(); @@ -495,6 +498,18 @@ public int execute(DriverContext driverContext) { return (returnVal); } + private HiveConf configureScheduling(HiveConf configuration, String forUser) throws IOException { + if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(configuration)) { + try { + YarnFairScheduling.validateYarnQueue(configuration, forUser); + } catch (HiveException ex) { + LOG.error("Unable to configure scheduling because fair scheduling is not enabled.", ex); + } + } + + return configuration; + } + private void handleSampling(Context context, MapWork mWork, JobConf job) throws Exception { assert mWork.getAliasToWork().keySet().size() == 1; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java new file mode 100644 index 0000000000000000000000000000000000000000..0ff6d175fc772a3b6c1b699fa8ecef20f5551dde --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.session; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.shims.ShimLoader; + +import java.io.IOException; + +/** + * The YarnFairScheduling class is a front handle for managing job submission to Yarn-FairScheduler. + */ +public class YarnFairScheduling { + // In non-impersonation mode, map scheduler queue to current user if fair scheduler is configured. + public static boolean usingNonImpersonationModeWithFairScheduling(HiveConf configuration) { + return (configuration != null) + && (!configuration.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) + && (configuration.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE))); + } + + public static void configureDefualtSchedulerQueue(HiveConf configuration, String forUser) throws IOException, HiveException { + if (!usingNonImpersonationModeWithFairScheduling(configuration)) { + throw new HiveException("Unable to map job to fair-scheduler because either impersonation is on or fair-scheduling is disabled."); + } + + ShimLoader.getSchedulerShims().refreshDefaultQueue(configuration, forUser); + } + + public static void validateYarnQueue(HiveConf configuration, String forUser) throws IOException, HiveException { + if (!usingNonImpersonationModeWithFairScheduling(configuration)) { + throw new HiveException("Job is not configured to user fair scheduling"); + } + + ShimLoader.getSchedulerShims().validateQueueConfiguration(configuration, forUser); + } +} \ No newline at end of file diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index a0015ebc655931f241b28c53fbb94cfe172841b1..8a61729537839839e7354ca61f566d0e17c99841 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ 
b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -38,8 +37,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.exec.ListSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -48,7 +45,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; -import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.ql.session.YarnFairScheduling; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.FetchOrientation; @@ -126,12 +123,11 @@ public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, S try { // In non-impersonation mode, map scheduler queue to current user // if fair scheduler is configured. - if (! 
sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && - sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) { - ShimLoader.getHadoopShims().refreshDefaultQueue(sessionConf, username); + if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(sessionConf)) { + YarnFairScheduling.configureDefualtSchedulerQueue(sessionConf, username); } - } catch (IOException e) { - LOG.warn("Error setting scheduler queue: " + e, e); + } catch (IOException | HiveException ex) { + LOG.warn("Error setting scheduler queue: " + ex, ex); } // Set an explicit session name to control the download directory name sessionConf.set(ConfVars.HIVESESSIONID.varname, diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/FileSystemWatcher.java b/shims/common/src/main/java/org/apache/hadoop/fs/FileSystemWatcher.java new file mode 100644 index 0000000000000000000000000000000000000000..2871399fcb71c2c7645be7bb21f16795e18e9b06 --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/fs/FileSystemWatcher.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.*; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/* + * The FileSystemWatcher utility class is used to keep track of when a file is modified/created/deleted. + */ + +public class FileSystemWatcher { + private static final Log LOG = LogFactory.getLog(FileSystemWatcher.class); + + private static final int ONE_SECOND = 1000; + + public static abstract class FileChangedCallback { + protected void onFileChanged(WatchEvent event, Path path) { } + protected void onCreate(WatchEvent event, Path path) { } + protected void onModify(WatchEvent event, Path path) { } + protected void onDelete(WatchEvent event, Path path) { } + } + + private final List callbacks; + private final Set files; + private final Thread thread; + + private WatchService watchService; + private boolean shutdown; + + public FileSystemWatcher() throws IOException, InterruptedException { + this.callbacks = new ArrayList<>(); + this.files = Collections.newSetFromMap(new ConcurrentHashMap()); + this.shutdown = false; + this.thread = buildThread(); + } + + // Generate the actual file system watcher. + private synchronized WatchService buildWatcher(Iterable paths) throws IOException, InterruptedException { + if (!paths.iterator().hasNext()) return null; + if (this.watchService != null) { + watchService.close(); + } + + WatchService watchService = FileSystems.getDefault().newWatchService(); + for (File file : paths) { + Path directory = file.isDirectory() ? 
file.toPath() : file.getParentFile().toPath(); + directory.register(watchService, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.OVERFLOW, + StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_DELETE); + } + + return watchService; + } + + // Create the listener thread. + private Thread buildThread() { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + process(); + } + }); + + thread.setDaemon(true); + thread.start(); + return thread; + } + + // Close the FileSystemWatcher (cannot be re-opened to simplify the thread model). + public synchronized void close() { + this.shutdown = true; + } + + // Forever loop to process file system notifications. + @SuppressWarnings("unchecked") + private void process() { + while (!shutdown) { + try { + synchronized (this) { + // Add in a sleep because watchService.take() doesn't block in all JVMs. + Thread.sleep(5 * ONE_SECOND); + WatchKey watchKey = watchService.take(); + for (WatchEvent event : watchKey.pollEvents()) { + launchEvent((WatchEvent) event); + } + } + } catch (InterruptedException ex) { + LOG.debug("FileWatcher service was interrupted."); + } + } + + this.shutdown = false; + } + + // Launches a callback associated with each type of file-system event. + private synchronized void launchEvent(WatchEvent event) { + Path path = event.context(); + WatchEvent.Kind eventKind = event.kind(); + for (FileChangedCallback onNotify : this.callbacks) { + if (files.contains(path.toFile().getAbsoluteFile())) { + onNotify.onFileChanged(event, path); + if (eventKind == StandardWatchEventKinds.ENTRY_CREATE) { + onNotify.onCreate(event, path); + } else if (eventKind == StandardWatchEventKinds.ENTRY_MODIFY) { + onNotify.onModify(event, path); + } else if (eventKind == StandardWatchEventKinds.ENTRY_DELETE) { + onNotify.onDelete(event, path); + } + } + } + } + + // Adds a callback function for a particular type of event. 
+ public synchronized void addHandler(FileChangedCallback callback) { + this.callbacks.add(callback); + } + + // Clears the list of callbacks. + public synchronized void clearCallbacks() { + this.callbacks.clear(); + } + + // Clears the list of file watchers. + public synchronized void clearWatchList() throws IOException, InterruptedException { + this.files.clear(); + this.watchService = buildWatcher(files); + } + + // Add a watch on a particular file and reset the file system watcher. + public synchronized void watch(String path) throws IOException, InterruptedException { + this.files.add(new File(path).getAbsoluteFile()); + this.watchService = buildWatcher(files); + } + + // Remove the watch for a particular file. Also resets the file-system watcher. + public synchronized void unwatch(String path) throws IOException, InterruptedException { + this.files.remove(new File(path).getAbsoluteFile()); + this.watchService = buildWatcher(files); + } +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java index 63803b8b0752745bd2fedaccc5d100befd97093b..f88e1928ecaceeb537f986e728e3383748f16d7d 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java @@ -34,4 +34,6 @@ */ public void refreshDefaultQueue(Configuration conf, String userName) throws IOException; + + public void validateQueueConfiguration(Configuration configuration, String forUser) throws IOException; } diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java index 372244dc3c989d2a3ae2eb2bfb8cd0a235705e18..d7039b389c618ea7a10a0c197c101276fb245e95 100644 --- a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java +++ 
b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java @@ -17,35 +17,141 @@ */ package org.apache.hadoop.hive.schshim; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystemWatcher; import org.apache.hadoop.hive.shims.SchedulerShim; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.WatchEvent; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +/* + * FairSchedulerShim monitors changes in fair-scheduler.xml (if it exists) to allow for dynamic + * reloading and queue resolution. When changes to the fair-scheduler.xml file are detected, the + * cached queue resolution policies for each user are cleared, and then re-cached/validated on job-submit. 
+ */ + public class FairSchedulerShim implements SchedulerShim { - private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerShim.class); + private static final Log LOG = LogFactory.getLog(FairSchedulerShim.class); + private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file"; private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename"; + private static final FileSystemWatcher FILE_SYSTEM_WATCHER = makeFileSystemWatcher(); + private static String LAST_USED_YARN_SITE_LOCATION = StringUtils.EMPTY; + private static final Map> PER_USER_CONFIGURATIONS + = new ConcurrentHashMap>(); + + private static synchronized FileSystemWatcher makeFileSystemWatcher() { + try { + return new FileSystemWatcher(); + } catch (InterruptedException | IOException ex) { + String message = String.format("Unable to start FileSystemWatcher, cause: %s", ex.getMessage()); + LOG.error(message); + } + return null; + } + + public FairSchedulerShim() { + if (FILE_SYSTEM_WATCHER != null) { + FILE_SYSTEM_WATCHER.addHandler(new FileSystemWatcher.FileChangedCallback() { + @Override + protected void onFileChanged(WatchEvent event, Path path) { + clearConfigurations(); + } + }); + } + } + + @Override + public void refreshDefaultQueue(Configuration configuration, String forUser) + throws IOException { + attemptSetScheduleForUser(configuration, YarnConfiguration.DEFAULT_QUEUE_NAME, forUser); + } + @Override - public void refreshDefaultQueue(Configuration conf, String userName) - throws IOException { - String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME; - final AtomicReference allocConf = new AtomicReference(); + public void validateQueueConfiguration(Configuration configuration, String forUser) throws IOException { + String currentJobQueue = StringUtils.defaultString(configuration.get(MR2_JOB_QUEUE_PROPERTY)); + if (!currentJobQueue.isEmpty()) { + attemptSetScheduleForUser(configuration, currentJobQueue, forUser); + } else { + 
refreshDefaultQueue(configuration, forUser); + } + } + + private void attemptSetScheduleForUser(Configuration configuration, String queueName, String forUser) throws IOException { + AtomicReference allocationConfiguration = configurationFor(configuration, forUser); + QueuePlacementPolicy queuePolicy = allocationConfiguration.get().getPlacementPolicy(); + if (queuePolicy != null) { + String requestedQueue = queuePolicy.assignAppToQueue(queueName, forUser); + if (StringUtils.isNotBlank(requestedQueue)) { + LOG.debug(String.format("Setting queue name to: '%s' for user '%s'", requestedQueue, forUser)); + configuration.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue); + } + } + } + + private synchronized AtomicReference configurationFor(Configuration configuration, String username) throws IOException { + // If the location of the fair-scheduler.xml file has changed: + // (1) Clear the current file-system watch. + // (2) Re-add a new file-system watch if the file has moved. + // (3) Clear all cached configurations. 
+ if (!LAST_USED_YARN_SITE_LOCATION.equals(StringUtils.defaultString(configuration.get(YARN_SCHEDULER_FILE_PROPERTY)))) { + LAST_USED_YARN_SITE_LOCATION = StringUtils.defaultString(configuration.get(YARN_SCHEDULER_FILE_PROPERTY)); + + try { + if (FILE_SYSTEM_WATCHER != null) { + FILE_SYSTEM_WATCHER.clearWatchList(); + } + } catch (InterruptedException ex) { + LOG.warn(String.format("Unable to clear watches for file %s", LAST_USED_YARN_SITE_LOCATION)); + } + + if (!StringUtils.isEmpty(LAST_USED_YARN_SITE_LOCATION)) { + try { + if (FILE_SYSTEM_WATCHER != null) { + FILE_SYSTEM_WATCHER.watch(configuration.get(YARN_SCHEDULER_FILE_PROPERTY)); + } + } catch (InterruptedException ex) { + LOG.warn(String.format("Unable to watch file %s", LAST_USED_YARN_SITE_LOCATION)); + } + + LOG.debug("The fair-scheduler.xml file was updated."); + } + + clearConfigurations(); + } + + if (!PER_USER_CONFIGURATIONS.containsKey(username)) { + makeConfigurationWatcherFor(configuration, username); + } + + return PER_USER_CONFIGURATIONS.get(username); + } + + // If the yarn.scheduler.fair.allocation.file property changes, then clean all configurations. 
+ private synchronized void clearConfigurations() { + PER_USER_CONFIGURATIONS.clear(); + } + + private synchronized AtomicReference makeConfigurationWatcherFor(Configuration configuration, String username) throws IOException { + final AtomicReference allocationConfiguration = new AtomicReference(); AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService(); - allocsLoader.init(conf); + allocsLoader.init(configuration); allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() { @Override public void onReload(AllocationConfiguration allocs) { - allocConf.set(allocs); + allocationConfiguration.set(allocs); } }); try { @@ -53,18 +159,13 @@ public void onReload(AllocationConfiguration allocs) { } catch (Exception ex) { throw new IOException("Failed to load queue allocations", ex); } - if (allocConf.get() == null) { - allocConf.set(new AllocationConfiguration(conf)); - } - QueuePlacementPolicy queuePolicy = allocConf.get().getPlacementPolicy(); - if (queuePolicy != null) { - requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName); - if (StringUtils.isNotBlank(requestedQueue)) { - LOG.debug("Setting queue name to " + requestedQueue + " for user " - + userName); - conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue); - } + + if (allocationConfiguration.get() == null) { + allocationConfiguration.set(new AllocationConfiguration(configuration)); } + + PER_USER_CONFIGURATIONS.put(username, allocationConfiguration); + return allocationConfiguration; } -} +} \ No newline at end of file