commit 5ef0133e92e2163d6666e2a347ac75c8aece863c Author: Reuben Kuhnert Date: Sat May 14 12:22:26 2016 -0500 HIVE-13696: Modify FairSchedulerShim to dynamically reload changes to fair-scheduler.xml. This patch dynamically reloads changes to fair-scheduler.xml by attaching a FileSystemWatcher object onto the file. When the file changes the YARN queue routing will be automatically updated and jobs will be reconfigured to send to the correct location based on the fair-scheduler config. Change-Id: I78aa99834ebd3aa1b72c101b79ff4e1742b71579 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 14f221a..554568d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -107,6 +107,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.session.YarnFairScheduling; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import org.apache.hadoop.hive.shims.Utils; @@ -461,6 +462,8 @@ public void run() { plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext()); } + configureScheduling(conf, userName); + //do the authorization check if (!sem.skipAuthorization() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) { @@ -524,6 +527,14 @@ public void run() { } } + private HiveConf configureScheduling(HiveConf configuration, String forUser) throws IOException, HiveException { + if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(configuration)) { + YarnFairScheduling.validateYarnQueue(configuration, forUser); + } + + return configuration; + } + private ImmutableMap dumpMetaCallTimingWithoutEx(String phase) { try { return Hive.get().dumpAndClearMetaCallTiming(phase); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java new file mode 100644 index 0000000..e3ba47c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.session; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.shims.ShimLoader; + +import java.io.IOException; + +/** + * A front handle for managing job submission to Yarn-FairScheduler. + */ +public class YarnFairScheduling { + /** + * Determine if jobs can be configured for YARN fair scheduling. + * @param conf - the current HiveConf configuration. + * @return Returns true when impersonation mode is disabled and fair-scheduling is enabled. + */ + public static boolean usingNonImpersonationModeWithFairScheduling(HiveConf conf) { + return (conf != null) + && (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) + && (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE))); + } + + /** + * Configure the default YARN queue for the user. + * @param conf - The current HiveConf configuration. + * @param forUser - The user to configure scheduling for. + * @throws IOException + * @throws HiveException + */ + public static void setDefaultJobQueue(HiveConf conf, String forUser) throws IOException, HiveException { + Preconditions.checkState(usingNonImpersonationModeWithFairScheduling(conf), + "Unable to map job to fair-scheduler because either impersonation is on or fair-scheduling is disabled."); + + ShimLoader.getSchedulerShims().refreshDefaultQueue(conf, forUser); + } + + /** + * Validate the current YARN queue for the current user. + * @param conf - The current HiveConf configuration. + * @param forUser - The user to configure scheduling for. + * @throws IOException + * @throws HiveException + */ + public static void validateYarnQueue(HiveConf conf, String forUser) throws IOException, HiveException { + Preconditions.checkState(usingNonImpersonationModeWithFairScheduling(conf), + "Unable to map job to fair-scheduler because either impersonation is on or fair-scheduling is disabled."); + + ShimLoader.getSchedulerShims().validateQueueConfiguration(conf, forUser); + } +} \ No newline at end of file diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 72ad86c..d70b750 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.processors.SetProcessor; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.YarnFairScheduling; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; import org.apache.hadoop.hive.shims.ShimLoader; @@ -130,11 +131,10 @@ public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, try { // In non-impersonation mode, map scheduler queue to current user // if fair scheduler is configured. - if (! sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && - sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) { - ShimLoader.getHadoopShims().refreshDefaultQueue(sessionConf, username); + if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(sessionConf)) { + YarnFairScheduling.setDefaultJobQueue(sessionConf, username); } - } catch (IOException e) { + } catch (Exception e) { LOG.warn("Error setting scheduler queue: " + e, e); } // Set an explicit session name to control the download directory name diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java index 63803b8..f88e192 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java @@ -34,4 +34,6 @@ */ public void refreshDefaultQueue(Configuration conf, String userName) throws IOException; + + public void validateQueueConfiguration(Configuration configuration, String forUser) throws IOException; } diff --git a/shims/scheduler/pom.xml b/shims/scheduler/pom.xml index 9141c1e..4242973 100644 --- a/shims/scheduler/pom.xml +++ b/shims/scheduler/pom.xml @@ -77,7 +77,12 @@ ${hadoop.version} true - + + org.mockito + mockito-all + ${mockito-all.version} + + org.apache.hadoop hadoop-yarn-server-tests ${hadoop.version} diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java new file mode 100644 index 0000000..0e32ff0 --- /dev/null +++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.schshim; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +public class FairSchedulerQueueAllocator implements QueueAllocator { + private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerQueueAllocator.class); + private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file"; + + private String currentlyWatching; + private AllocationFileLoaderService loaderService; + private final AtomicReference allocationConfiguration + = new AtomicReference(); + + /** + * Generates a Yarn FairScheduler queue resolver based on 'fair-scheduler.xml'. + * @param config The HiveConf configuration. + * @param username The user to configure the job for. + * @return Returns a configured allocation resolver. + * @throws IOException + */ + public synchronized AtomicReference makeConfigurationFor(Configuration config, String username) throws IOException { + updateWatcher(config); + + return allocationConfiguration; + } + + public synchronized void refresh(Configuration config) { + updateWatcher(config); + } + + @VisibleForTesting + public String getCurrentlyWatchingFile() { + return this.currentlyWatching; + } + + private void updateWatcher(Configuration config) { + if (this.loaderService != null && StringUtils.equals(currentlyWatching, config.get(YARN_SCHEDULER_FILE_PROPERTY))) return; + + this.currentlyWatching = config.get(YARN_SCHEDULER_FILE_PROPERTY); + + if (this.loaderService != null) { + this.loaderService.stop(); + } + + this.loaderService = new AllocationFileLoaderService(); + this.loaderService.init(config); + this.loaderService.setReloadListener(new AllocationFileLoaderService.Listener() { + @Override + public void onReload(AllocationConfiguration allocs) { + allocationConfiguration.set(allocs); + } + }); + + try { + this.loaderService.reloadAllocations(); + } catch (Exception ex) { + LOG.error("Failed to load queue allocations", ex); + } + + if (allocationConfiguration.get() == null) { + allocationConfiguration.set(new AllocationConfiguration(config)); + } + + this.loaderService.start(); + } +} diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java index 372244d..4856c10 100644 --- a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java +++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java @@ -17,54 +17,90 @@ */ package org.apache.hadoop.hive.schshim; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.shims.SchedulerShim; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy; +import java.io.IOException; + +/* + * FairSchedulerShim monitors changes in fair-scheduler.xml (if it exists) to allow for dynamic + * reloading and queue resolution. When changes to the fair-scheduler.xml file are detected, the + * cached queue resolution policies for each user are cleared, and then re-cached/validated on job-submit. + */ + public class FairSchedulerShim implements SchedulerShim { private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerShim.class); private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename"; + private final QueueAllocator queueAllocator; + + @VisibleForTesting + public FairSchedulerShim(QueueAllocator queueAllocator) { + this.queueAllocator = queueAllocator; + } + + public FairSchedulerShim() { + this(new FairSchedulerQueueAllocator()); + } + + /** + * Applies the default YARN fair scheduler queue for a user. + * @param conf - the current HiveConf configuration. + * @param forUser - the user to configure the default queue for. + * @throws IOException + */ @Override - public void refreshDefaultQueue(Configuration conf, String userName) - throws IOException { - String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME; - final AtomicReference allocConf = new AtomicReference(); + public synchronized void refreshDefaultQueue(Configuration conf, String forUser) + throws IOException { + setJobQueueForUserInternal(conf, YarnConfiguration.DEFAULT_QUEUE_NAME, forUser); + } - AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService(); - allocsLoader.init(conf); - allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() { - @Override - public void onReload(AllocationConfiguration allocs) { - allocConf.set(allocs); - } - }); - try { - allocsLoader.reloadAllocations(); - } catch (Exception ex) { - throw new IOException("Failed to load queue allocations", ex); - } - if (allocConf.get() == null) { - allocConf.set(new AllocationConfiguration(conf)); + /** + * Validates the YARN fair scheduler queue configuration. + * @param conf - the current HiveConf configuration. + * @param forUser - the user to configure the default queue for. + * @throws IOException + */ + @Override + public synchronized void validateQueueConfiguration(Configuration conf, String forUser) throws IOException { + // Currently, "validation" is just to ensure that the client can still set the same queue that they + // could previously. In almost all situations, this should be essentially a no-op (unless the fair-scheduler.xml + // file changes in such a way as this is disallowed). Currently this implementation is just inteded to allow us + // to validate that the user's configuration is at least reasonable on a per-request basis beyond from the already- + // occurring per session setup. + + // TODO: Build out ACL enforcement. + + String currentJobQueue = conf.get(MR2_JOB_QUEUE_PROPERTY); + if (currentJobQueue != null && !currentJobQueue.isEmpty()) { + setJobQueueForUserInternal(conf, currentJobQueue, forUser); + } else { + refreshDefaultQueue(conf, forUser); } - QueuePlacementPolicy queuePolicy = allocConf.get().getPlacementPolicy(); + } + + public QueueAllocator getQueueAllocator() { + return this.queueAllocator; + } + + private void setJobQueueForUserInternal(Configuration conf, String queueName, String forUser) throws IOException { + QueuePlacementPolicy queuePolicy = queueAllocator.makeConfigurationFor(conf, forUser).get().getPlacementPolicy(); + if (queuePolicy != null) { - requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName); + String requestedQueue = queuePolicy.assignAppToQueue(queueName, forUser); if (StringUtils.isNotBlank(requestedQueue)) { - LOG.debug("Setting queue name to " + requestedQueue + " for user " - + userName); + LOG.info("Setting queue name to: '{}' for user '{}'", requestedQueue, forUser); conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue); + } else { + LOG.warn("Unable to set queue: {} for user: {}, resetting to user's default queue.", requestedQueue, forUser); + conf.set(MR2_JOB_QUEUE_PROPERTY, queuePolicy.assignAppToQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, forUser)); } } } - -} +} \ No newline at end of file diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java new file mode 100644 index 0000000..daf02da --- /dev/null +++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.schshim; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +public interface QueueAllocator { + /** + * Generates a queue resolver for a given configuration and username. + * @param configuration The HiveConf configuration. + * @param username The user to configure the job for. + * @return Returns the queue allocation configuration. + * @throws IOException + */ + AtomicReference makeConfigurationFor(Configuration configuration, String username) throws IOException; + void refresh(Configuration configuration); +} diff --git a/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java b/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java new file mode 100644 index 0000000..58465e4 --- /dev/null +++ b/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.schshim; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestFairSchedulerQueueAllocator { + private static final String EMPTY = ""; + private static final int USERNAME_ARGUMENT_INDEX = 1; + private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file"; + private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename"; + + @Test + public void testChangingLastUsedHiveConfigurationStringDirectly() throws Exception { + Configuration configuration = new Configuration(); + FairSchedulerShim shim = new FairSchedulerShim(); + FairSchedulerQueueAllocator allocator = (FairSchedulerQueueAllocator) shim.getQueueAllocator(); + + // On initialization should be uncached. + assertNull(allocator.getCurrentlyWatchingFile()); + + // Per job submission the location of fair-scheduler should be updated. + for (String location : new String[] { "/first", "/second", "third/fourth" }){ + for (String user : new String[] { "firstUser", "secondUser", "thirdUser" }) { + configuration.set(YARN_SCHEDULER_FILE_PROPERTY, location); + shim.refreshDefaultQueue(configuration, user); + assertEquals(allocator.getCurrentlyWatchingFile(), location); + } + } + } + + @Test + public void testNeverBeforeSeenUsersEffectOnLastUsedHiveConfigurationString() throws Exception { + final Configuration configuration = new Configuration(); + FairSchedulerShim shim = new FairSchedulerShim(); + FairSchedulerQueueAllocator allocator = (FairSchedulerQueueAllocator) shim.getQueueAllocator(); + + // Per job submission the location of fair-scheduler should be updated. + configuration.set(YARN_SCHEDULER_FILE_PROPERTY, "/some/unchanging/location"); + for (String user : new String[] { "first", "second", "third", "fourth", "fifth" }) { + shim.refreshDefaultQueue(configuration, user); + assertEquals(allocator.getCurrentlyWatchingFile(), "/some/unchanging/location"); + } + } + + @Test + public void testQueueAllocation() throws Exception { + Configuration configuration = new Configuration(); + QueueAllocator allocator = mock(QueueAllocator.class); + + when(allocator.makeConfigurationFor(any(Configuration.class), any(String.class))) + .thenAnswer(new Answer>() { + @Override + public AtomicReference answer(InvocationOnMock invocationOnMock) throws Throwable { + // Capture which user is causing the reset for verification purposes. + final String username = (String) invocationOnMock.getArguments()[USERNAME_ARGUMENT_INDEX]; + + AllocationConfiguration allocationConfiguration = mock(AllocationConfiguration.class); + when(allocationConfiguration.getPlacementPolicy()) + .thenAnswer(new Answer() { + @Override + public QueuePlacementPolicy answer(InvocationOnMock invocationOnMock) throws Throwable { + QueuePlacementPolicy placementPolicy = mock(QueuePlacementPolicy.class); + when(placementPolicy.assignAppToQueue(any(String.class), any(String.class))) + .thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocationOnMock) throws Throwable { + return String.format("queue.for.%s", username); + } + }); + + return placementPolicy; + } + }); + + return new AtomicReference<>(allocationConfiguration); + } + }); + + FairSchedulerShim shim = new FairSchedulerShim(allocator); + + // Per job submission the location of fair-scheduler should be updated. + configuration.set(YARN_SCHEDULER_FILE_PROPERTY, "/some/file/location"); + for (String user : new String[] { "first", "second", "third", "fourth", "fifth" }) { + shim.refreshDefaultQueue(configuration, user); + + String queueName = String.format("queue.for.%s", user); + assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), queueName); + } + } +} \ No newline at end of file