diff --git data/conf/fair-scheduler-test.xml data/conf/fair-scheduler-test.xml
new file mode 100644
index 0000000..96996a4
--- /dev/null
+++ data/conf/fair-scheduler-test.xml
@@ -0,0 +1,16 @@
+
+
+
+ *
+
+
+ *
+
+
+
+
+
+
+
+
+
diff --git itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java
index 79878ba..0c5f8a5 100644
--- itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java
+++ itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java
@@ -21,14 +21,19 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.junit.After;
import org.junit.Before;
@@ -37,6 +42,26 @@
public class TestSchedulerQueue {
+ // hadoop group mapping that maps user to same group
+ public static class HiveTestSimpleGroupMapping implements GroupMappingServiceProvider {
+ public static String primaryTag = "";
+ @Override
+ public List getGroups(String user) throws IOException {
+ List results = new ArrayList();
+ results.add(user + primaryTag);
+ results.add(user + "-group");
+ return results;
+ }
+
+ @Override
+ public void cacheGroupsRefresh() throws IOException {
+ }
+
+ @Override
+ public void cacheGroupsAdd(List groups) throws IOException {
+ }
+ }
+
private MiniHS2 miniHS2 = null;
private static HiveConf conf = new HiveConf();
private Connection hs2Conn = null;
@@ -44,6 +69,8 @@
@BeforeClass
public static void beforeTest() throws Exception {
Class.forName(MiniHS2.getJdbcDriverName());
+ conf.set("hadoop.security.group.mapping",
+ HiveTestSimpleGroupMapping.class.getName());
}
@Before
@@ -56,6 +83,7 @@ public void setUp() throws Exception {
miniHS2.setConfProperty(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
miniHS2.start(new HashMap());
+ HiveTestSimpleGroupMapping.primaryTag = "";
}
@After
@@ -79,6 +107,7 @@ public void tearDown() throws Exception {
@Test
public void testFairSchedulerQueueMapping() throws Exception {
hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user1", "bar");
+ verifyProperty(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
verifyProperty("mapreduce.framework.name", "yarn");
verifyProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname,
"true");
@@ -88,6 +117,31 @@ public void testFairSchedulerQueueMapping() throws Exception {
}
/**
+ * Verify:
+ * Test is running with MR2 and queue mapping are set correctly for primary group rule.
+ * @throws Exception
+ */
+ @Test
+ public void testFairSchedulerPrimaryQueueMapping() throws Exception {
+ miniHS2.setConfProperty(FairSchedulerConfiguration.ALLOCATION_FILE, "fair-scheduler-test.xml");
+ HiveTestSimpleGroupMapping.primaryTag = "-test";
+ hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user2", "bar");
+ verifyProperty("mapreduce.job.queuename", "root.user2" + HiveTestSimpleGroupMapping.primaryTag);
+ }
+
+ /**
+ * Verify:
+ * Test is running with MR2 and queue mapping are set correctly for primary group rule.
+ * @throws Exception
+ */
+ @Test
+ public void testFairSchedulerSecondaryQueueMapping() throws Exception {
+ miniHS2.setConfProperty(FairSchedulerConfiguration.ALLOCATION_FILE, "fair-scheduler-test.xml");
+ hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user3", "bar");
+ verifyProperty("mapreduce.job.queuename", "root.user3-group");
+ }
+
+ /**
* Verify that the queue refresh doesn't happen when configured to be off.
*
* @throws Exception
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 6125714..0a6abd8 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -77,6 +78,7 @@
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
import org.apache.tez.test.MiniTezCluster;
@@ -232,8 +234,25 @@ public int compare(LongWritable o1, LongWritable o2) {
public void refreshDefaultQueue(Configuration conf, String userName) throws IOException {
String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) {
- AllocationConfiguration allocConf = new AllocationConfiguration(conf);
- QueuePlacementPolicy queuePolicy = allocConf.getPlacementPolicy();
+ final AtomicReference allocConf = new AtomicReference();
+
+ AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService();
+ allocsLoader.init(conf);
+ allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() {
+ @Override
+ public void onReload(AllocationConfiguration allocs) {
+ allocConf.set(allocs);
+ }
+ });
+ try {
+ allocsLoader.reloadAllocations();
+ } catch (Exception ex) {
+ throw new IOException("Failed to load queue allocations", ex);
+ }
+ if (allocConf.get() == null) {
+ allocConf.set(new AllocationConfiguration(conf));
+ }
+ QueuePlacementPolicy queuePolicy = allocConf.get().getPlacementPolicy();
if (queuePolicy != null) {
requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
if (StringUtils.isNotBlank(requestedQueue)) {