diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6a1a5f0542ca0723e9e415863ca74c8b43bcc96c..b26234c44eb8703d6b265834b2b1b5f3decc7582 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1622,6 +1622,11 @@
HIVE_SERVER2_USE_SSL("hive.server2.use.SSL", false, ""),
HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", "", ""),
HIVE_SERVER2_SSL_KEYSTORE_PASSWORD("hive.server2.keystore.password", "", ""),
+ HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE("hive.server2.map.fair.scheduler.queue", true,
+ "This setting is only applicable if fair scheduler is configured.\n" +
+ "In non-impersonation mode, where all MR jobs run as the hive system user,\n" +
+ "this setting allows for setting fair scheduler queue mapping for current\n" +
+ "user, so that MR resource usage can be tracked by user."),
HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
"Comma separated list of non-SQL Hive commands users are authorized to execute"),
diff --git itests/hive-unit/pom.xml itests/hive-unit/pom.xml
index 077631a7d809e6d5fd79815fca8882898ee172e9..f117d751e4b70ccbccf8caba334a19ea6bdbb4f9 100644
--- itests/hive-unit/pom.xml
+++ itests/hive-unit/pom.xml
@@ -243,6 +243,20 @@
test
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <version>${hadoop-23.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.sun.jersey</groupId>
       <artifactId>jersey-servlet</artifactId>
       <scope>test</scope>
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..79878ba1b9c52773b8ca7baad0cab7db65fcc178
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Integration test for HiveServer2 fair-scheduler queue mapping
+ * (hive.server2.map.fair.scheduler.queue) in non-impersonation mode.
+ */
+public class TestSchedulerQueue {
+
+  private MiniHS2 miniHS2 = null;
+  private static HiveConf conf = new HiveConf();
+  private Connection hs2Conn = null;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    DriverManager.setLoginTimeout(0);
+    miniHS2 = new MiniHS2(conf, true);
+    // Non-impersonation mode with queue mapping enabled and fair scheduler configured.
+    miniHS2.setConfProperty(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+    miniHS2.setConfProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname,
+        "true");
+    miniHS2.setConfProperty(YarnConfiguration.RM_SCHEDULER,
+        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+    miniHS2.start(new HashMap<String, String>());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (hs2Conn != null) {
+      hs2Conn.close();
+    }
+    if (miniHS2 != null && miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+    System.clearProperty("mapreduce.job.queuename");
+  }
+
+  /**
+   * Verify:
+   * Test is running with MR2 and queue mapping defaults are set.
+   * Queue mapping is set for the connected user.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testFairSchedulerQueueMapping() throws Exception {
+    hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user1", "bar");
+    verifyProperty("mapreduce.framework.name", "yarn");
+    verifyProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname,
+        "true");
+    verifyProperty(YarnConfiguration.RM_SCHEDULER,
+        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+    // Default fair-scheduler placement maps the connected user to root.<user>.
+    verifyProperty("mapreduce.job.queuename", "root.user1");
+  }
+
+  /**
+   * Verify that the queue refresh doesn't happen when configured to be off.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testQueueMappingCheckDisabled() throws Exception {
+    miniHS2.setConfProperty(
+        HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname, "false");
+    hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user1", "bar");
+    verifyProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname,
+        "false");
+    verifyProperty("mapreduce.job.queuename", YarnConfiguration.DEFAULT_QUEUE_NAME);
+  }
+
+  /**
+   * Verify that the given property contains the expected value.
+   *
+   * @param propertyName name of the property, fetched via the "set" command
+   * @param expectedValue expected value of the property
+   * @throws Exception
+   */
+  private void verifyProperty(String propertyName, String expectedValue) throws Exception {
+    Statement stmt = hs2Conn.createStatement();
+    try {
+      ResultSet res = stmt.executeQuery("set " + propertyName);
+      assertTrue(res.next());
+      // Result comes back as a single "name=value" string.
+      String[] results = res.getString(1).split("=");
+      assertEquals("Property should be set", 2, results.length);
+      assertEquals("Property should be set", expectedValue, results[1]);
+    } finally {
+      stmt.close();
+    }
+  }
+}
diff --git pom.xml pom.xml
index f5a25e5c86b0801d382abd66597be8d7ac69e426..f5e6c495cfb139979fa805dc75683d6433913f37 100644
--- pom.xml
+++ pom.xml
@@ -762,6 +762,7 @@
               <exclude>**/ql/exec/vector/udf/generic/*.java</exclude>
               <exclude>**/TestHiveServer2Concurrency.java</exclude>
               <exclude>**/TestHiveMetaStore.java</exclude>
+              <exclude>${excluded.tests}</exclude>
             </excludes>
             <redirectTestOutputToFile>true</redirectTestOutputToFile>
             <reuseForks>false</reuseForks>
@@ -957,6 +958,9 @@
       <id>hadoop-1</id>
+      <properties>
+        <excluded.tests>**/TestSchedulerQueue.java</excluded.tests>
+      </properties>
@@ -1033,6 +1037,11 @@
         <dependency>
           <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+          <version>${hadoop-23.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-minikdc</artifactId>
           <version>${hadoop-23.version}</version>
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index f021870cf48c55a3d2dea4c825f1f25d0ab008f3..7c6e99bf6b57c5321abeeb6397b63998b60499e3 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.*;
@@ -95,6 +96,17 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo
this.hiveConf = new HiveConf(serverhiveConf);
this.ipAddress = ipAddress;
+ try {
+ // In non-impersonation mode, map scheduler queue to current user
+ // if fair scheduler is configured.
+ if (! hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+ hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
+ ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username);
+ }
+ } catch (IOException e) {
+ LOG.warn("Error setting scheduler queue ", e);
+ }
+
// Set an explicit session name to control the download directory name
hiveConf.set(ConfVars.HIVESESSIONID.varname,
sessionHandle.getHandleIdentifier().toString());
diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index 52f02c1a7915125fedd7b756e1db4611cf81d966..5cb07f40b0c1ccaef03c39c42537d4e91ecdc39a 100644
--- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -825,6 +825,12 @@ public short getDefaultReplication(FileSystem fs, Path path) {
}
@Override
+ public void refreshDefaultQueue(Configuration conf, String userName) {
+ throw new UnsupportedOperationException(
+ "Current version of hadoop does not expose API required to set MR queue mapping for user.");
+ }
+
+ @Override
public String getTokenFileLocEnvName() {
throw new UnsupportedOperationException(
"Kerberos not supported in current hadoop version");
diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index 07946df1b0fc9064925f431fe1f0e2c020b41dfc..29aa102783b112c11dcbf04ff009b007bc922c74 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -159,6 +159,12 @@ public short getDefaultReplication(FileSystem fs, Path path) {
}
@Override
+ public void refreshDefaultQueue(Configuration conf, String userName) {
+ throw new UnsupportedOperationException(
+ "Current version of hadoop does not expose API required to set MR queue mapping for user.");
+ }
+
+ @Override
public void setTotalOrderPartitionFile(JobConf jobConf, Path partitionFile){
TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
}
diff --git shims/0.23/pom.xml shims/0.23/pom.xml
index ad71fccc2f94f4523cbb3572fecc06e54528d630..c41da9d51a462602447240b1c5b6e06e9e880df4 100644
--- shims/0.23/pom.xml
+++ shims/0.23/pom.xml
@@ -120,6 +120,11 @@
       <optional>true</optional>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+      <version>${hadoop-23.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-tests</artifactId>
       <version>${tez.version}</version>
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 9829c1b54ed2c1cb3068a569ff47d897f6284933..6125714643d5ec837b0d029e3fedcba2bc40e969 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -75,6 +75,10 @@
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
import org.apache.tez.test.MiniTezCluster;
import com.google.common.base.Joiner;
@@ -86,6 +90,7 @@
* Implemention of shims against Hadoop 0.23.0.
*/
public class Hadoop23Shims extends HadoopShimsSecure {
+ private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
HadoopShims.MiniDFSShim cluster = null;
@@ -221,6 +226,30 @@ public int compare(LongWritable o1, LongWritable o2) {
}
/**
+ * Load the fair scheduler queue for given user if available.
+ */
+ @Override
+ public void refreshDefaultQueue(Configuration conf, String userName) throws IOException {
+ String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
+ if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) {
+ AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+ QueuePlacementPolicy queuePolicy = allocConf.getPlacementPolicy();
+ if (queuePolicy != null) {
+ requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
+ if (StringUtils.isNotBlank(requestedQueue)) {
+ LOG.debug("Setting queue name to " + requestedQueue + " for user " + userName);
+ conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
+ }
+ }
+ }
+ }
+
+  // True iff YARN is configured to use the fair scheduler (required for queue mapping).
+  private boolean isFairScheduler(Configuration conf) {
+    return FairScheduler.class.getName().
+        equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER));
+  }
+
+ /**
* Returns a shim to wrap MiniMrCluster
*/
@Override
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index d56795f7be1cffa13ea7b3ea2cd50db90730c020..d856260bce39ad807c680ef1451880e83f57de09 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -366,6 +366,15 @@ public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration co
public short getDefaultReplication(FileSystem fs, Path path);
/**
+   * Reset the default MR scheduler queue mapping for the given user, if applicable.
+   * @param conf configuration to update with the resolved queue name
+   * @param userName user whose scheduler queue mapping should be applied
+   * @throws IOException if the queue mapping cannot be refreshed
+   */
+  public void refreshDefaultQueue(Configuration conf, String userName)
+      throws IOException;
+
+ /**
* Create the proxy ugi for the given userid
* @param userName
* @return