diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 0a6abd8..2dd4078 100644 --- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -77,10 +76,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy; import org.apache.tez.test.MiniTezCluster; import com.google.common.base.Joiner; @@ -92,7 +87,6 @@ * Implemention of shims against Hadoop 0.23.0. */ public class Hadoop23Shims extends HadoopShimsSecure { - private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename"; HadoopShims.MiniDFSShim cluster = null; @@ -232,39 +226,13 @@ public int compare(LongWritable o1, LongWritable o2) { */ @Override public void refreshDefaultQueue(Configuration conf, String userName) throws IOException { - String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME; if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) { - final AtomicReference allocConf = new AtomicReference(); - - AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService(); - allocsLoader.init(conf); - allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() { - @Override - public void onReload(AllocationConfiguration allocs) { - allocConf.set(allocs); - } - }); - try { - allocsLoader.reloadAllocations(); - } catch (Exception ex) { - throw new IOException("Failed to load queue allocations", ex); - } - if (allocConf.get() == null) { - allocConf.set(new AllocationConfiguration(conf)); - } - QueuePlacementPolicy queuePolicy = allocConf.get().getPlacementPolicy(); - if (queuePolicy != null) { - requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName); - if (StringUtils.isNotBlank(requestedQueue)) { - LOG.debug("Setting queue name to " + requestedQueue + " for user " + userName); - conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue); - } - } + ShimLoader.getSchedulerShims().refreshDefaultQueue(conf, userName); } } private boolean isFairScheduler (Configuration conf) { - return FairScheduler.class.getName(). + return "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler". equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER)); } diff --git shims/aggregator/pom.xml shims/aggregator/pom.xml index 5f76e84..4dd3dd7 100644 --- shims/aggregator/pom.xml +++ shims/aggregator/pom.xml @@ -63,5 +63,11 @@ ${project.version} runtime + + org.apache.hive.shims + hive-shims-scheduler + ${project.version} + runtime + diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java new file mode 100644 index 0000000..63803b8 --- /dev/null +++ shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.shims; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +/** + * Shim for Fair scheduler + * HiveServer2 uses fair scheduler API to resolve the queue mapping for non-impersonation + * mode. This shim is avoid direct dependency of yarn fair scheduler on Hive. + */ +public interface SchedulerShim { + /** + * Reset the default fair scheduler queue mapping to end user. + * @param conf + * @param userName end user name + */ + public void refreshDefaultQueue(Configuration conf, String userName) + throws IOException; +} diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java index f354fb7..b84f1f2 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java +++ shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.hadoop.util.VersionInfo; import org.apache.log4j.AppenderSkeleton; @@ -33,6 +34,7 @@ private static JettyShims jettyShims; private static AppenderSkeleton eventCounter; private static HadoopThriftAuthBridge hadoopThriftAuthBridge; + private static SchedulerShim schedulerShim; /** * The names of the classes for shimming Hadoop for each major version. @@ -87,6 +89,9 @@ } + private static final String SCHEDULER_SHIM_CLASSE = + "org.apache.hadoop.hive.schshim.FairSchedulerShim"; + /** * Factory method to get an instance of HadoopShims based on the * version of Hadoop on the classpath. @@ -124,6 +129,13 @@ public static synchronized HadoopThriftAuthBridge getHadoopThriftAuthBridge() { return hadoopThriftAuthBridge; } + public static synchronized SchedulerShim getSchedulerShims() { + if (schedulerShim == null) { + schedulerShim = createShim(SCHEDULER_SHIM_CLASSE, SchedulerShim.class); + } + return schedulerShim; + } + private static T loadShims(Map classMap, Class xface) { String vers = getMajorVersion(); String className = classMap.get(vers); diff --git shims/pom.xml shims/pom.xml index 7027126..d43086f 100644 --- shims/pom.xml +++ shims/pom.xml @@ -37,6 +37,7 @@ common-secure 0.20S 0.23 + scheduler aggregator diff --git shims/scheduler/pom.xml shims/scheduler/pom.xml new file mode 100644 index 0000000..794cf89 --- /dev/null +++ shims/scheduler/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + org.apache.hive + hive + 0.15.0-SNAPSHOT + ../../pom.xml + + + org.apache.hive.shims + hive-shims-scheduler + jar + Hive Shims Scheduler + + + ../.. + + + + + + + org.apache.hive.shims + hive-shims-common-secure + ${project.version} + + + + commons-logging + commons-logging + ${commons-logging.version} + + + org.apache.hadoop + hadoop-common + ${hadoop-23.version} + true + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop-23.version} + true + + + org.apache.hadoop + hadoop-yarn-api + ${hadoop-23.version} + true + + + org.apache.hadoop + hadoop-yarn-common + ${hadoop-23.version} + true + + + org.apache.hadoop + hadoop-yarn-client + ${hadoop-23.version} + true + + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + ${hadoop-23.version} + + + org.apache.hadoop + hadoop-yarn-server-tests + ${hadoop-23.version} + true + test-jar + + + diff --git shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java new file mode 100644 index 0000000..41c34aa --- /dev/null +++ shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.schshim; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.shims.SchedulerShim; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy; + +public class FairSchedulerShim implements SchedulerShim { + private static final Log LOG = LogFactory.getLog(FairSchedulerShim.class); + private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename"; + + @Override + public void refreshDefaultQueue(Configuration conf, String userName) + throws IOException { + String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME; + final AtomicReference allocConf = new AtomicReference(); + + AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService(); + allocsLoader.init(conf); + allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() { + @Override + public void onReload(AllocationConfiguration allocs) { + allocConf.set(allocs); + } + }); + try { + allocsLoader.reloadAllocations(); + } catch (Exception ex) { + throw new IOException("Failed to load queue allocations", ex); + } + if (allocConf.get() == null) { + allocConf.set(new AllocationConfiguration(conf)); + } + QueuePlacementPolicy queuePolicy = allocConf.get().getPlacementPolicy(); + if (queuePolicy != null) { + requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName); + if (StringUtils.isNotBlank(requestedQueue)) { + LOG.debug("Setting queue name to " + requestedQueue + " for user " + + userName); + conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue); + } + } + } + +}