diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 018c6035dd557ebb812f4224509bcf17bfdf045a..f141c532b87351adae4f1bb3ae067d55197bc78d 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1625,6 +1625,12 @@ HIVE_SERVER2_USE_SSL("hive.server2.use.SSL", false, ""), HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", "", ""), HIVE_SERVER2_SSL_KEYSTORE_PASSWORD("hive.server2.keystore.password", "", ""), + HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE("hive.server2.map.fair.scheduler.queue", true, + "If the YARN fair scheduler is configured and HiveServer2 is running in non-impersonation mode,\n" + + "this setting determines the user for fair scheduler queue mapping.\n" + + "If set to true (default), the logged-in user determines the fair scheduler queue\n" + + "for submitted jobs, so that map reduce resource usage can be tracked by user.\n" + + "If set to false, all Hive jobs go to the 'hive' user's queue."), HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile", "Comma separated list of non-SQL Hive commands users are authorized to execute"), diff --git itests/hive-unit-hadoop2/pom.xml itests/hive-unit-hadoop2/pom.xml index cbdf506f94e2f720543e5932f0af65623fb43dc5..f8ffc0fb62d4a3b9f04b737551f2a3e75183ee88 100644 --- itests/hive-unit-hadoop2/pom.xml +++ itests/hive-unit-hadoop2/pom.xml @@ -118,6 +118,12 @@ test-jar test + + org.apache.hive + hive-it-unit + ${project.version} + test + org.apache.hadoop diff --git itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..79878ba1b9c52773b8ca7baad0cab7db65fcc178 --- /dev/null +++ 
itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashMap; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestSchedulerQueue { + + private MiniHS2 miniHS2 = null; + private static HiveConf conf = new HiveConf(); + private Connection hs2Conn = null; + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + } + + @Before + public void setUp() throws Exception { + DriverManager.setLoginTimeout(0); + miniHS2 = new MiniHS2(conf, true); + miniHS2.setConfProperty(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false"); + 
miniHS2.setConfProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname, + "true"); + miniHS2.setConfProperty(YarnConfiguration.RM_SCHEDULER, + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + miniHS2.start(new HashMap<String, String>()); + } + + @After + public void tearDown() throws Exception { + if (hs2Conn != null) { + hs2Conn.close(); + } + if (miniHS2 != null && miniHS2.isStarted()) { + miniHS2.stop(); + } + System.clearProperty("mapreduce.job.queuename"); + } + + /** + * Verify: + * Test is running with MR2 and queue mapping defaults are set. + * Queue mapping is set for the connected user. + * + * @throws Exception + */ + @Test + public void testFairSchedulerQueueMapping() throws Exception { + hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user1", "bar"); + verifyProperty("mapreduce.framework.name", "yarn"); + verifyProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname, + "true"); + verifyProperty(YarnConfiguration.RM_SCHEDULER, + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + verifyProperty("mapreduce.job.queuename", "root.user1"); + } + + /** + * Verify that the queue refresh doesn't happen when configured to be off. + * + * @throws Exception + */ + @Test + public void testQueueMappingCheckDisabled() throws Exception { + miniHS2.setConfProperty( + HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname, "false"); + hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user1", "bar"); + verifyProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname, + "false"); + verifyProperty("mapreduce.job.queuename", YarnConfiguration.DEFAULT_QUEUE_NAME); + } + + /** + * Verify that the given property contains the expected value.
+ * + * @param propertyName + * @param expectedValue + * @throws Exception + */ + private void verifyProperty(String propertyName, String expectedValue) throws Exception { + Statement stmt = hs2Conn.createStatement(); + ResultSet res = stmt.executeQuery("set " + propertyName); + assertTrue(res.next()); + String[] results = res.getString(1).split("="); + assertEquals("Property should be set", 2, results.length); + assertEquals("Property should be set", expectedValue, results[1]); + } +} diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index f021870cf48c55a3d2dea4c825f1f25d0ab008f3..fa28a6b6a4acb61d8b442ed13b0421e1e0f13368 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.processors.SetProcessor; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.*; @@ -95,6 +96,17 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo this.hiveConf = new HiveConf(serverhiveConf); this.ipAddress = ipAddress; + try { + // In non-impersonation mode, map scheduler queue to current user + // if fair scheduler is configured. + if (!
hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && + hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) { + ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username); + } + } catch (IOException e) { + LOG.warn("Error setting scheduler queue: " + e, e); + } + // Set an explicit session name to control the download directory name hiveConf.set(ConfVars.HIVESESSIONID.varname, sessionHandle.getHandleIdentifier().toString()); diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java index 52f02c1a7915125fedd7b756e1db4611cf81d966..3c7d2afa904bb93c2b4629e9531f203ecaa65157 100644 --- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java +++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java @@ -825,6 +825,11 @@ public short getDefaultReplication(FileSystem fs, Path path) { } @Override + public void refreshDefaultQueue(Configuration conf, String userName) { + // MR1 does not expose API required to set MR queue mapping for user + } + + @Override public String getTokenFileLocEnvName() { throw new UnsupportedOperationException( "Kerberos not supported in current hadoop version"); diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java index 07946df1b0fc9064925f431fe1f0e2c020b41dfc..2626af707d666d4cfb4a5e40e38141925a32d18c 100644 --- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java +++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java @@ -159,6 +159,11 @@ public short getDefaultReplication(FileSystem fs, Path path) { } @Override + public void refreshDefaultQueue(Configuration conf, String userName) { + // MR1 does not expose API required to set MR queue mapping for user + } + + @Override public void setTotalOrderPartitionFile(JobConf jobConf, Path 
partitionFile){ TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile); } diff --git shims/0.23/pom.xml shims/0.23/pom.xml index ad71fccc2f94f4523cbb3572fecc06e54528d630..c41da9d51a462602447240b1c5b6e06e9e880df4 100644 --- shims/0.23/pom.xml +++ shims/0.23/pom.xml @@ -120,6 +120,11 @@ true + org.apache.hadoop + hadoop-yarn-server-resourcemanager + ${hadoop-23.version} + + org.apache.tez tez-tests ${tez.version} diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 9829c1b54ed2c1cb3068a569ff47d897f6284933..6125714643d5ec837b0d029e3fedcba2bc40e969 100644 --- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -75,6 +75,10 @@ import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy; import org.apache.tez.test.MiniTezCluster; import com.google.common.base.Joiner; @@ -86,6 +90,7 @@ * Implemention of shims against Hadoop 0.23.0. */ public class Hadoop23Shims extends HadoopShimsSecure { + private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename"; HadoopShims.MiniDFSShim cluster = null; @@ -221,6 +226,30 @@ public int compare(LongWritable o1, LongWritable o2) { } /** + * Load the fair scheduler queue for given user if available. 
+ */ + @Override + public void refreshDefaultQueue(Configuration conf, String userName) throws IOException { + String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME; + if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) { + AllocationConfiguration allocConf = new AllocationConfiguration(conf); + QueuePlacementPolicy queuePolicy = allocConf.getPlacementPolicy(); + if (queuePolicy != null) { + requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName); + if (StringUtils.isNotBlank(requestedQueue)) { + LOG.debug("Setting queue name to " + requestedQueue + " for user " + userName); + conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue); + } + } + } + } + + private boolean isFairScheduler(Configuration conf) { + return FairScheduler.class.getName(). + equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER)); + } + + /** * Returns a shim to wrap MiniMrCluster */ @Override diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index d56795f7be1cffa13ea7b3ea2cd50db90730c020..99a2bcf99ca46b877fb9f2db25ec04f18c2ca768 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -366,6 +366,15 @@ public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration co public short getDefaultReplication(FileSystem fs, Path path); /** + * Reset the default fair scheduler queue mapping to end user. + * + * @param conf configuration to update with the resolved queue name + * @param userName end user name + */ + public void refreshDefaultQueue(Configuration conf, String userName) + throws IOException; + + /** * Create the proxy ugi for the given userid * @param userName * @return