commit 8deeb5726cbe7abecc6d88d4788eba0d129d6614
Author: Reuben Kuhnert
Date:   Thu May 5 13:00:43 2016 -0500

    HIVE-13696: Validate jobs submitted to fair-scheduler.

    Change-Id: I1b43041b7d291b4c0847cc289bf762984ceb52dd

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 6a610cbcb1deb7f7f55bb8aff58020b057454b31..cdb7a02dbc997cf87071c5f915ff698bc5cb14a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -107,6 +107,7 @@
 import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.session.YarnFairScheduling;
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
 import org.apache.hadoop.hive.shims.Utils;
@@ -492,6 +493,8 @@ public void run() {
         }
       }
     }
+
+    configureScheduling(conf, userName);
     return 0;
   } catch (Exception e) {
     ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
@@ -561,6 +564,18 @@ private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan,
     return ret;
   }
 
+  private HiveConf configureScheduling(HiveConf configuration, String forUser) throws IOException {
+    if (YarnFairScheduling.shouldUseYarnFairScheduling(configuration)) {
+      try {
+        YarnFairScheduling.validateYarnQueue(configuration, forUser);
+      } catch (HiveException ex) {
+        LOG.error("Unable to configure scheduling because fair scheduling is not enabled.", ex);
+      }
+    }
+
+    return configuration;
+  }
+
   /**
    * Do authorization using post semantic analysis information in the semantic analyzer
    * The original command is also passed so that authorization interface can provide
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
new file mode 100644
index 0000000000000000000000000000000000000000..ae61577711d2bc0094f3b8f57dbff7966bf23552
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
@@ -0,0 +1,34 @@
+package org.apache.hadoop.hive.ql.session;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+import java.io.IOException;
+
+/**
+ * Utilities for validating and mapping Hive queries onto YARN fair-scheduler queues.
+ */
+public class YarnFairScheduling {
+  // In non-impersonation mode, map scheduler queue to current user if fair scheduler is configured.
+  public static boolean shouldUseYarnFairScheduling(HiveConf configuration) {
+    return (!configuration.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+        configuration.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE));
+  }
+
+  public static void configureDefaultSchedulerQueue(HiveConf configuration, String forUser) throws IOException, HiveException {
+    if (!shouldUseYarnFairScheduling(configuration)) {
+      throw new HiveException("Cannot map job to use fair scheduler because fair scheduling is not enabled.");
+    }
+
+    ShimLoader.getSchedulerShims().refreshDefaultQueue(configuration, forUser);
+  }
+
+  public static void validateYarnQueue(HiveConf configuration, String forUser) throws IOException, HiveException {
+    if (!shouldUseYarnFairScheduling(configuration)) {
+      throw new HiveException("Job is not configured to use fair scheduling.");
+    }
+
+    ShimLoader.getSchedulerShims().validateQueueConfiguration(configuration, forUser);
+  }
+}
\ No newline at end of file
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index a0015ebc655931f241b28c53fbb94cfe172841b1..09188939ec930b71dee20faed640618a10bf2785 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
+import org.apache.hadoop.hive.ql.session.YarnFairScheduling;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.service.auth.HiveAuthFactory;
@@ -126,12 +127,11 @@ public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, S
     try {
       // In non-impersonation mode, map scheduler queue to current user
       // if fair scheduler is configured.
-      if (! sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
-          sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
-        ShimLoader.getHadoopShims().refreshDefaultQueue(sessionConf, username);
+      if (YarnFairScheduling.shouldUseYarnFairScheduling(sessionConf)) {
+        YarnFairScheduling.configureDefaultSchedulerQueue(sessionConf, username);
       }
-    } catch (IOException e) {
-      LOG.warn("Error setting scheduler queue: " + e, e);
+    } catch (IOException | HiveException ex) {
+      LOG.warn("Error setting scheduler queue: " + ex, ex);
     }
     // Set an explicit session name to control the download directory name
     sessionConf.set(ConfVars.HIVESESSIONID.varname,
diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/FileSystemWatcher.java b/shims/common/src/main/java/org/apache/hadoop/fs/FileSystemWatcher.java
new file mode 100644
index 0000000000000000000000000000000000000000..4044821aac5e8ee2520f23845f69ca01dbcf2c5e
--- /dev/null
+++ b/shims/common/src/main/java/org/apache/hadoop/fs/FileSystemWatcher.java
@@ -0,0 +1,137 @@
+package org.apache.hadoop.fs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.*;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class FileSystemWatcher {
+  private static final Log LOG = LogFactory.getLog(FileSystemWatcher.class);
+
+  private static final int ONE_SECOND = 1000;
+
+  public static abstract class FileChangedCallback {
+    protected void onFileChanged(WatchEvent<Path> event, Path path) { }
+    protected void onCreate(WatchEvent<Path> event, Path path) { }
+    protected void onModify(WatchEvent<Path> event, Path path) { }
+    protected void onDelete(WatchEvent<Path> event, Path path) { }
+  }
+
+  private final List<FileChangedCallback> callbacks;
+  private final Set<File> files;
+  private final Thread thread;
+
+  private WatchService watchService;
+  private boolean shutdown;
+
+  public FileSystemWatcher() throws IOException, InterruptedException {
+    this.callbacks = new ArrayList<>();
+    this.files = Collections.newSetFromMap(new ConcurrentHashMap<File, Boolean>());
+    this.shutdown = false;
+    this.thread = buildThread();
+  }
+
+  private synchronized WatchService buildWatcher(Iterable<File> paths) throws IOException, InterruptedException {
+    if (!paths.iterator().hasNext()) return null;
+    if (this.watchService != null) {
+      watchService.close();
+    }
+
+    WatchService watchService = FileSystems.getDefault().newWatchService();
+    for (File file : paths) {
+      Path directory = file.isDirectory() ? file.toPath() : file.getParentFile().toPath();
+      directory.register(watchService,
+          StandardWatchEventKinds.ENTRY_CREATE,
+          StandardWatchEventKinds.OVERFLOW,
+          StandardWatchEventKinds.ENTRY_MODIFY,
+          StandardWatchEventKinds.ENTRY_DELETE);
+    }
+
+    return watchService;
+  }
+
+  private Thread buildThread() {
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        process();
+      }
+    });
+
+    thread.setDaemon(true);
+    thread.start();
+    return thread;
+  }
+
+  public synchronized void close() {
+    this.shutdown = true;
+  }
+
+  @SuppressWarnings("unchecked")
+  private void process() {
+    while (!shutdown) {
+      try {
+        synchronized (this) {
+          // Add in a sleep because watchService.take() doesn't block in all JVMs.
+          Thread.sleep(5 * ONE_SECOND);
+          WatchKey watchKey = watchService.take();
+          for (WatchEvent<?> event : watchKey.pollEvents()) {
+            launchEvent((WatchEvent<Path>) event);
+          }
+        }
+      } catch (InterruptedException ex) {
+        LOG.debug("FileWatcher service was interrupted.");
+      }
+    }
+
+    this.shutdown = false;
+  }
+
+  private synchronized void launchEvent(WatchEvent<Path> event) {
+    Path path = event.context();
+    WatchEvent.Kind<Path> eventKind = event.kind();
+    for (FileChangedCallback onNotify : this.callbacks) {
+      if (files.contains(path.toFile().getAbsoluteFile())) {
+        onNotify.onFileChanged(event, path);
+        if (eventKind == StandardWatchEventKinds.ENTRY_CREATE) {
+          onNotify.onCreate(event, path);
+        } else if (eventKind == StandardWatchEventKinds.ENTRY_MODIFY) {
+          onNotify.onModify(event, path);
+        } else if (eventKind == StandardWatchEventKinds.ENTRY_DELETE) {
+          onNotify.onDelete(event, path);
+        }
+      }
+    }
+  }
+
+  public synchronized void addHandler(FileChangedCallback callback) {
+    this.callbacks.add(callback);
+  }
+
+  public synchronized void clearCallbacks() {
+    this.callbacks.clear();
+  }
+
+  public synchronized void clearWatchList() throws IOException, InterruptedException {
+    this.files.clear();
+    this.watchService = buildWatcher(files);
+  }
+
+  public synchronized void watch(String path) throws IOException, InterruptedException {
+    this.files.add(new File(path).getAbsoluteFile());
+    this.watchService = buildWatcher(files);
+  }
+
+  public synchronized void unwatch(String path) throws IOException, InterruptedException {
+    this.files.remove(new File(path).getAbsoluteFile());
+    this.watchService = buildWatcher(files);
+  }
+}
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
index 63803b8b0752745bd2fedaccc5d100befd97093b..f88e1928ecaceeb537f986e728e3383748f16d7d 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
@@ -34,4 +34,6 @@
    */
   public void refreshDefaultQueue(Configuration conf, String userName)
       throws IOException;
+
+  public void validateQueueConfiguration(Configuration configuration, String forUser) throws IOException;
 }
diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
index 372244dc3c989d2a3ae2eb2bfb8cd0a235705e18..5e3d7884867f7ba5c5fa4ac09e64cf407c439ba0 100644
--- a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
+++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
@@ -17,35 +17,127 @@
  */
 package org.apache.hadoop.hive.schshim;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemWatcher;
 import org.apache.hadoop.hive.shims.SchedulerShim;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
 
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
 public class FairSchedulerShim implements SchedulerShim {
-  private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerShim.class);
+  private static final Log LOG = LogFactory.getLog(FairSchedulerShim.class);
+
+  private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file";
   private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
 
+  private static final FileSystemWatcher FILE_SYSTEM_WATCHER = makeFileSystemWatcher();
+  private static String LAST_USED_YARN_SITE_LOCATION = StringUtils.EMPTY;
+  private static final Map<String, AtomicReference<AllocationConfiguration>> PER_USER_CONFIGURATIONS
+      = new ConcurrentHashMap<String, AtomicReference<AllocationConfiguration>>();
+
+  private static synchronized FileSystemWatcher makeFileSystemWatcher() {
+    try {
+      return new FileSystemWatcher();
+    } catch (InterruptedException | IOException ex) {
+      String message = String.format("Unable to start FileSystemWatcher, cause: %s", ex.getMessage());
+      LOG.error(message);
+    }
+    return null;
+  }
+
+  public FairSchedulerShim() {
+    if (FILE_SYSTEM_WATCHER != null) {
+      FILE_SYSTEM_WATCHER.addHandler(new FileSystemWatcher.FileChangedCallback() {
+        @Override
+        protected void onFileChanged(WatchEvent<Path> event, Path path) {
+          clearConfigurations();
+        }
+      });
+    }
+  }
+
+  @Override
+  public void refreshDefaultQueue(Configuration configuration, String forUser)
+      throws IOException {
+    attemptSetScheduleForUser(configuration, YarnConfiguration.DEFAULT_QUEUE_NAME, forUser);
+  }
+
   @Override
-  public void refreshDefaultQueue(Configuration conf, String userName)
-      throws IOException {
-    String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
-    final AtomicReference<AllocationConfiguration> allocConf = new AtomicReference<AllocationConfiguration>();
+  public void validateQueueConfiguration(Configuration configuration, String forUser) throws IOException {
+    String currentJobQueue = StringUtils.defaultString(configuration.get(MR2_JOB_QUEUE_PROPERTY));
+    if (!currentJobQueue.isEmpty()) {
+      attemptSetScheduleForUser(configuration, currentJobQueue, forUser);
+    } else {
+      refreshDefaultQueue(configuration, forUser);
+    }
+  }
+
+  private void attemptSetScheduleForUser(Configuration configuration, String queueName, String forUser) throws IOException {
+    AtomicReference<AllocationConfiguration> allocationConfiguration = configurationFor(configuration, forUser);
+    QueuePlacementPolicy queuePolicy = allocationConfiguration.get().getPlacementPolicy();
+    if (queuePolicy != null) {
+      String requestedQueue = queuePolicy.assignAppToQueue(queueName, forUser);
+      if (StringUtils.isNotBlank(requestedQueue)) {
+        LOG.debug(String.format("Setting queue name to: '%s' for user '%s'", requestedQueue, forUser));
+        configuration.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
+      }
+    }
+  }
+
+  private synchronized AtomicReference<AllocationConfiguration> configurationFor(Configuration configuration, String username) throws IOException {
+    if (!LAST_USED_YARN_SITE_LOCATION.equals(StringUtils.defaultString(configuration.get(YARN_SCHEDULER_FILE_PROPERTY)))) {
+      LAST_USED_YARN_SITE_LOCATION = StringUtils.defaultString(configuration.get(YARN_SCHEDULER_FILE_PROPERTY));
+
+      try {
+        FILE_SYSTEM_WATCHER.clearWatchList();
+      } catch (InterruptedException ex) {
+        LOG.warn(String.format("Unable to clear watches for file %s", LAST_USED_YARN_SITE_LOCATION));
+      }
+
+      if (!StringUtils.isEmpty(LAST_USED_YARN_SITE_LOCATION)) {
+        try {
+          FILE_SYSTEM_WATCHER.watch(configuration.get(YARN_SCHEDULER_FILE_PROPERTY));
+        } catch (InterruptedException ex) {
+          LOG.warn(String.format("Unable to watch file %s", LAST_USED_YARN_SITE_LOCATION));
+        }
+
+        LOG.debug("Watching the fair-scheduler allocation file for updates.");
+      }
+
+      clearConfigurations();
+    }
+
+    if (!PER_USER_CONFIGURATIONS.containsKey(username)) {
+      makeConfigurationWatcherFor(configuration, username);
+    }
+
+    return PER_USER_CONFIGURATIONS.get(username);
+  }
+
+  // If the yarn.scheduler.fair.allocation.file property changes, then clean all configurations.
+  private synchronized void clearConfigurations() {
+    PER_USER_CONFIGURATIONS.clear();
+  }
+
+  private synchronized AtomicReference<AllocationConfiguration> makeConfigurationWatcherFor(Configuration configuration, String username) throws IOException {
+    final AtomicReference<AllocationConfiguration> allocationConfiguration = new AtomicReference<AllocationConfiguration>();
     AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService();
-    allocsLoader.init(conf);
+    allocsLoader.init(configuration);
     allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() {
       @Override
       public void onReload(AllocationConfiguration allocs) {
-        allocConf.set(allocs);
+        allocationConfiguration.set(allocs);
       }
     });
     try {
@@ -53,18 +145,13 @@ public void onReload(AllocationConfiguration allocs) {
     } catch (Exception ex) {
       throw new IOException("Failed to load queue allocations", ex);
     }
-    if (allocConf.get() == null) {
-      allocConf.set(new AllocationConfiguration(conf));
-    }
-    QueuePlacementPolicy queuePolicy = allocConf.get().getPlacementPolicy();
-    if (queuePolicy != null) {
-      requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
-      if (StringUtils.isNotBlank(requestedQueue)) {
-        LOG.debug("Setting queue name to " + requestedQueue + " for user "
-            + userName);
-        conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
-      }
+
+    if (allocationConfiguration.get() == null) {
+      allocationConfiguration.set(new AllocationConfiguration(configuration));
     }
+
+    PER_USER_CONFIGURATIONS.put(username, allocationConfiguration);
+    return allocationConfiguration;
   }
 
-}
+}
\ No newline at end of file
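
Usage illustration (not part of the patch): a minimal sketch of how the validation path introduced above could be driven from a standalone caller, assuming impersonation (doAs) is disabled and fair-scheduler queue mapping is enabled. The user name "alice", the queue name "adhoc", and the allocation-file path are hypothetical placeholders; the configuration variables and the YarnFairScheduling methods are the ones added or referenced in this change.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.session.YarnFairScheduling;

    import java.io.IOException;

    public class FairSchedulingValidationExample {
      public static void main(String[] args) throws IOException {
        HiveConf conf = new HiveConf();
        // Fair-scheduler queue mapping only applies when impersonation (doAs) is off.
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE, true);
        // Hypothetical allocation file and client-requested queue.
        conf.set("yarn.scheduler.fair.allocation.file", "/etc/hadoop/conf/fair-scheduler.xml");
        conf.set("mapreduce.job.queuename", "adhoc");

        if (YarnFairScheduling.shouldUseYarnFairScheduling(conf)) {
          try {
            // Delegates to SchedulerShim.validateQueueConfiguration, which resolves the
            // requested queue against the fair-scheduler placement policy for this user
            // and rewrites mapreduce.job.queuename accordingly.
            YarnFairScheduling.validateYarnQueue(conf, "alice");
            System.out.println("Job will run in queue: " + conf.get("mapreduce.job.queuename"));
          } catch (HiveException ex) {
            System.err.println("Queue validation failed: " + ex.getMessage());
          }
        }
      }
    }

When no queue is set on the job, validateQueueConfiguration falls back to refreshDefaultQueue, so the behavior matches the pre-existing session-creation path in HiveSessionImpl.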