diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 6a1a5f0542ca0723e9e415863ca74c8b43bcc96c..b26234c44eb8703d6b265834b2b1b5f3decc7582 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1622,6 +1622,11 @@ HIVE_SERVER2_USE_SSL("hive.server2.use.SSL", false, ""), HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", "", ""), HIVE_SERVER2_SSL_KEYSTORE_PASSWORD("hive.server2.keystore.password", "", ""), + HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE("hive.server2.map.fair.scheduler.queue", true, + "This setting is only applicable if fair scheduler is configured.\n" + + "In non-impersonation mode, where all MR jobs run as the hive system user,\n" + + "this setting allows for setting fair scheduler queue mapping for current\n" + + "user, so that MR resource usage can be tracked by user."), HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile", "Comma separated list of non-SQL Hive commands users are authorized to execute"), diff --git itests/hive-unit/pom.xml itests/hive-unit/pom.xml index 077631a7d809e6d5fd79815fca8882898ee172e9..f117d751e4b70ccbccf8caba334a19ea6bdbb4f9 100644 --- itests/hive-unit/pom.xml +++ itests/hive-unit/pom.xml @@ -243,6 +243,20 @@ test + org.apache.hadoop + hadoop-yarn-server-tests + ${hadoop-23.version} + tests + test + + + org.apache.hadoop + hadoop-yarn-server-tests + ${hadoop-23.version} + tests + test + + com.sun.jersey jersey-servlet test diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..79878ba1b9c52773b8ca7baad0cab7db65fcc178 --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java @@ -0,0 +1,120 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashMap; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestSchedulerQueue { + + private MiniHS2 miniHS2 = null; + private static HiveConf conf = new HiveConf(); + private Connection hs2Conn = null; + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + } + + @Before + public void setUp() throws Exception { + DriverManager.setLoginTimeout(0); + miniHS2 = new MiniHS2(conf, true); + miniHS2.setConfProperty(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false"); + miniHS2.setConfProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname, + "true"); + miniHS2.setConfProperty(YarnConfiguration.RM_SCHEDULER, + 
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + miniHS2.start(new HashMap()); + } + + @After + public void tearDown() throws Exception { + if (hs2Conn != null) { + hs2Conn.close(); + } + if (miniHS2 != null && miniHS2.isStarted()) { + miniHS2.stop(); + } + System.clearProperty("mapreduce.job.queuename"); + } + + /** + * Verify: + * Test is running with MR2 and queue mapping defaults are set. + * Queue mapping is set for the connected user. + * + * @throws Exception + */ + @Test + public void testFairSchedulerQueueMapping() throws Exception { + hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user1", "bar"); + verifyProperty("mapreduce.framework.name", "yarn"); + verifyProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname, + "true"); + verifyProperty(YarnConfiguration.RM_SCHEDULER, + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + verifyProperty("mapreduce.job.queuename", "root.user1"); + } + + /** + * Verify that the queue refresh doesn't happen when configured to be off. + * + * @throws Exception + */ + @Test + public void testQueueMappingCheckDisabled() throws Exception { + miniHS2.setConfProperty( + HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname, "false"); + hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user1", "bar"); + verifyProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname, + "false"); + verifyProperty("mapreduce.job.queuename", YarnConfiguration.DEFAULT_QUEUE_NAME); + } + + /** + * Verify that the given property contains the expected value. 
+ * + * @param propertyName + * @param expectedValue + * @throws Exception + */ + private void verifyProperty(String propertyName, String expectedValue) throws Exception { + Statement stmt = hs2Conn .createStatement(); + ResultSet res = stmt.executeQuery("set " + propertyName); + assertTrue(res.next()); + String results[] = res.getString(1).split("="); + assertEquals("Property should be set", results.length, 2); + assertEquals("Property should be set", expectedValue, results[1]); + } +} diff --git pom.xml pom.xml index f5a25e5c86b0801d382abd66597be8d7ac69e426..f5e6c495cfb139979fa805dc75683d6433913f37 100644 --- pom.xml +++ pom.xml @@ -762,6 +762,7 @@ **/ql/exec/vector/udf/generic/*.java **/TestHiveServer2Concurrency.java **/TestHiveMetaStore.java + ${excluded.tests} true false @@ -957,6 +958,9 @@ hadoop-1 + + **/TestSchedulerQueue.java + @@ -1033,6 +1037,11 @@ org.apache.hadoop + hadoop-yarn-server-resourcemanager + ${hadoop-23.version} + + + org.apache.hadoop hadoop-minikdc ${hadoop-23.version} diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index f021870cf48c55a3d2dea4c825f1f25d0ab008f3..7c6e99bf6b57c5321abeeb6397b63998b60499e3 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.processors.SetProcessor; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.*; @@ -95,6 +96,17 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo this.hiveConf = new HiveConf(serverhiveConf); this.ipAddress = ipAddress; + try { + // In 
non-impersonation mode, map scheduler queue to current user + // if fair scheduler is configured. + if (! hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && + hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) { + ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username); + } + } catch (IOException e) { + LOG.warn("Error setting scheduler queue ", e); + } + // Set an explicit session name to control the download directory name hiveConf.set(ConfVars.HIVESESSIONID.varname, sessionHandle.getHandleIdentifier().toString()); diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java index 52f02c1a7915125fedd7b756e1db4611cf81d966..3c7d2afa904bb93c2b4629e9531f203ecaa65157 100644 --- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java +++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java @@ -825,6 +825,11 @@ public short getDefaultReplication(FileSystem fs, Path path) { } @Override + public void refreshDefaultQueue(Configuration conf, String userName) { + // MR1 does not expose API required to set MR queue mapping for user + } + + @Override public String getTokenFileLocEnvName() { throw new UnsupportedOperationException( "Kerberos not supported in current hadoop version"); diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java index 07946df1b0fc9064925f431fe1f0e2c020b41dfc..2626af707d666d4cfb4a5e40e38141925a32d18c 100644 --- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java +++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java @@ -159,6 +159,11 @@ public short getDefaultReplication(FileSystem fs, Path path) { } @Override + public void refreshDefaultQueue(Configuration conf, String userName) { + // MR1 does not expose API required to set MR queue 
mapping for user + } + + @Override public void setTotalOrderPartitionFile(JobConf jobConf, Path partitionFile){ TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile); } diff --git shims/0.23/pom.xml shims/0.23/pom.xml index ad71fccc2f94f4523cbb3572fecc06e54528d630..c41da9d51a462602447240b1c5b6e06e9e880df4 100644 --- shims/0.23/pom.xml +++ shims/0.23/pom.xml @@ -120,6 +120,11 @@ true + org.apache.hadoop + hadoop-yarn-server-resourcemanager + ${hadoop-23.version} + + org.apache.tez tez-tests ${tez.version} diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 9829c1b54ed2c1cb3068a569ff47d897f6284933..6125714643d5ec837b0d029e3fedcba2bc40e969 100644 --- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -75,6 +75,10 @@ import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy; import org.apache.tez.test.MiniTezCluster; import com.google.common.base.Joiner; @@ -86,6 +90,7 @@ * Implemention of shims against Hadoop 0.23.0. */ public class Hadoop23Shims extends HadoopShimsSecure { + private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename"; HadoopShims.MiniDFSShim cluster = null; @@ -221,6 +226,30 @@ public int compare(LongWritable o1, LongWritable o2) { } /** + * Load the fair scheduler queue for given user if available. 
+ */ + @Override + public void refreshDefaultQueue(Configuration conf, String userName) throws IOException { + String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME; + if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) { + AllocationConfiguration allocConf = new AllocationConfiguration(conf); + QueuePlacementPolicy queuePolicy = allocConf.getPlacementPolicy(); + if (queuePolicy != null) { + requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName); + if (StringUtils.isNotBlank(requestedQueue)) { + LOG.debug("Setting queue name to " + requestedQueue + " for user " + userName); + conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue); + } + } + } + } + + private boolean isFairScheduler(Configuration conf) { + return FairScheduler.class.getName(). + equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER)); + } + + /** * Returns a shim to wrap MiniMrCluster */ @Override diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index d56795f7be1cffa13ea7b3ea2cd50db90730c020..d856260bce39ad807c680ef1451880e83f57de09 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -366,6 +366,15 @@ public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration co public short getDefaultReplication(FileSystem fs, Path path); /** + * Reset the default scheduler queue if applicable + * @param conf + * @param userName + * @throws IOException + */ + public void refreshDefaultQueue(Configuration conf, String userName) + throws IOException; + + /** * Create the proxy ugi for the given userid + * @param userName + * @return