diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index a4fd36d..e5fce14 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -141,7 +141,9 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen, private TezSessionState getNewSessionState(HiveConf conf, String queueName, boolean doOpen) throws Exception { TezSessionState retTezSessionState = createSession(TezSessionState.makeSessionId()); - retTezSessionState.setQueueName(queueName); + if (queueName != null) { + conf.set("tez.queue.name", queueName); + } String what = "Created"; if (doOpen) { retTezSessionState.open(conf); @@ -221,29 +223,27 @@ private boolean canWorkWithSameSession(TezSessionState session, HiveConf conf) throw new HiveException(e); } - HiveConf existingConf = session.getConf(); - if (existingConf == null) { - return false; - } - + boolean doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); // either variables will never be null because a default value is returned in case of absence - if (existingConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) != - conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + if (doAsEnabled != conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { return false; } if (!session.isDefault()) { - if (existingConf.get("tez.queue.name") == conf.get("tez.queue.name")) { - // both are null - return true; - } - if ((existingConf.get("tez.queue.name") == null)) { - // doesn't matter if the other conf is null or not. if it is null, above case catches it - return false; + String queueName = session.getQueueName(); + LOG.info("Current queue name is " + queueName + " incoming queue name is " + + conf.get("tez.queue.name")); + if (queueName == null) { + if (conf.get("tez.queue.name") != null) { + // queue names are different + return false; + } else { + return true; + } } - if (!existingConf.get("tez.queue.name").equals(conf.get("tez.queue.name"))) { - // handles the case of incoming conf having a null for tez.queue.name + if (!queueName.equals(conf.get("tez.queue.name"))) { + // the String.equals method handles the case of conf not having the queue name as well. return false; } } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 110b80a..563fb49 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -67,13 +67,14 @@ private LocalResource appJarLr; private TezClient session; private String sessionId; - private DagUtils utils; + private final DagUtils utils; private String queueName; private boolean defaultQueue = false; private String user; private final Set additionalFilesNotFromConf = new HashSet(); private final Set localizedResources = new HashSet(); + private boolean doAsEnabled; private static List openSessions = Collections.synchronizedList(new LinkedList()); @@ -130,6 +131,8 @@ public void open(HiveConf conf) public void open(HiveConf conf, String[] additionalFiles) throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { this.conf = conf; + this.queueName = conf.get("tez.queue.name"); + this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); UserGroupInformation ugi; ugi = ShimLoader.getHadoopShims().getUGIForConf(conf); @@ -392,4 +395,8 @@ public HiveConf getConf() { public String getUser() { return user; } + + public boolean getDoAsEnabled() { + return doAsEnabled; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index c6ac557..b6834be 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -47,117 +47,122 @@ public TezSessionState createSession(String sessionId) { } @Before - public void setUp() { - conf = new HiveConf(); - } + public void setUp() { + conf = new HiveConf(); + } @Test - public void testGetNonDefaultSession() { - poolManager = new TestTezSessionPoolManager(); - try { - TezSessionState sessionState = poolManager.getSession(null, conf, true); - TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true); - if (sessionState1 != sessionState) { - fail(); - } - } catch (Exception e) { - e.printStackTrace(); + public void testGetNonDefaultSession() { + poolManager = new TestTezSessionPoolManager(); + try { + TezSessionState sessionState = poolManager.getSession(null, conf, true); + TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true); + if (sessionState1 != sessionState) { + fail(); + } + conf.set("tez.queue.name", "nondefault"); + TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true); + if (sessionState2 == sessionState) { fail(); } + } catch (Exception e) { + e.printStackTrace(); + fail(); } + } @Test - public void testSessionPoolGetInOrder() { - try { - conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c"); - conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2); - - poolManager = new TestTezSessionPoolManager(); - poolManager.setupPool(conf); - poolManager.startPool(); - TezSessionState sessionState = poolManager.getSession(null, conf, true); - if (sessionState.getQueueName().compareTo("a") != 0) { - fail(); - } - poolManager.returnSession(sessionState); - - sessionState = poolManager.getSession(null, conf, true); - if (sessionState.getQueueName().compareTo("b") != 0) { - fail(); - } - poolManager.returnSession(sessionState); + public void testSessionPoolGetInOrder() { + try { + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c"); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2); - sessionState = poolManager.getSession(null, conf, true); - if (sessionState.getQueueName().compareTo("c") != 0) { - fail(); - } - poolManager.returnSession(sessionState); + poolManager = new TestTezSessionPoolManager(); + poolManager.setupPool(conf); + poolManager.startPool(); + TezSessionState sessionState = poolManager.getSession(null, conf, true); + if (sessionState.getQueueName().compareTo("a") != 0) { + fail(); + } + poolManager.returnSession(sessionState); - sessionState = poolManager.getSession(null, conf, true); - if (sessionState.getQueueName().compareTo("a") != 0) { - fail(); - } + sessionState = poolManager.getSession(null, conf, true); + if (sessionState.getQueueName().compareTo("b") != 0) { + fail(); + } + poolManager.returnSession(sessionState); - poolManager.returnSession(sessionState); + sessionState = poolManager.getSession(null, conf, true); + if (sessionState.getQueueName().compareTo("c") != 0) { + fail(); + } + poolManager.returnSession(sessionState); - } catch (Exception e) { - e.printStackTrace(); + sessionState = poolManager.getSession(null, conf, true); + if (sessionState.getQueueName().compareTo("a") != 0) { fail(); } + + poolManager.returnSession(sessionState); + + } catch (Exception e) { + e.printStackTrace(); + fail(); } + } public class SessionThread implements Runnable { @Override - public void run() { - try { - HiveConf tmpConf = new HiveConf(conf); - if (random.nextDouble() > 0.5) { - tmpConf.set("tez.queue.name", "default"); - } else { - tmpConf.set("tez.queue.name", ""); - } - - TezSessionState session = poolManager.getSession(null, tmpConf, true); - Thread.sleep((random.nextInt(9) % 10) * 1000); - poolManager.returnSession(session); - } catch (Exception e) { - e.printStackTrace(); + public void run() { + try { + HiveConf tmpConf = new HiveConf(conf); + if (random.nextDouble() > 0.5) { + tmpConf.set("tez.queue.name", "default"); + } else { + tmpConf.set("tez.queue.name", ""); } + + TezSessionState session = poolManager.getSession(null, tmpConf, true); + Thread.sleep((random.nextInt(9) % 10) * 1000); + poolManager.returnSession(session); + } catch (Exception e) { + e.printStackTrace(); } + } } @Test - public void testReturn() { - conf.set("tez.queue.name", ""); - random = new Random(1000); - conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c"); - conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2); + public void testReturn() { + conf.set("tez.queue.name", ""); + random = new Random(1000); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c"); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2); + try { + poolManager = new TestTezSessionPoolManager(); + poolManager.setupPool(conf); + poolManager.startPool(); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + List threadList = new ArrayList(); + for (int i = 0; i < 15; i++) { + Thread t = new Thread(new SessionThread()); + t.start(); + } + + for (Thread t : threadList) { try { - poolManager = new TestTezSessionPoolManager(); - poolManager.setupPool(conf); - poolManager.startPool(); - } catch (Exception e) { + t.join(); + } catch (InterruptedException e) { e.printStackTrace(); fail(); } - List threadList = new ArrayList(); - for (int i = 0; i < 15; i++) { - Thread t = new Thread(new SessionThread()); - t.start(); - } - - for (Thread t : threadList) { - try { - t.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - } } + } @Test public void testCloseAndOpenDefault() throws Exception { diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java index 99a7a00..63687eb 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java @@ -37,9 +37,10 @@ public class TestTezSessionState extends TezSessionState { private boolean open; - private String sessionId; + private final String sessionId; private HiveConf hiveConf; private String user; + private boolean doAsEnabled; public TestTezSessionState(String sessionId) { super(sessionId); @@ -47,38 +48,46 @@ public TestTezSessionState(String sessionId) { } @Override - public boolean isOpen() { - return open; - } + public boolean isOpen() { + return open; + } public void setOpen(boolean open) { this.open = open; } @Override - public void open(HiveConf conf) throws IOException, - LoginException, URISyntaxException, TezException { - this.hiveConf = conf; - UserGroupInformation ugi; - ugi = ShimLoader.getHadoopShims().getUGIForConf(conf); - user = ShimLoader.getHadoopShims().getShortUserName(ugi); - } + public void open(HiveConf conf) throws IOException, LoginException, URISyntaxException, + TezException { + this.hiveConf = conf; + UserGroupInformation ugi; + ugi = ShimLoader.getHadoopShims().getUGIForConf(conf); + user = ShimLoader.getHadoopShims().getShortUserName(ugi); + this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); + } @Override - public void close(boolean keepTmpDir) throws TezException, IOException { - open = keepTmpDir; - } + public void close(boolean keepTmpDir) throws TezException, IOException { + open = keepTmpDir; + } + @Override public HiveConf getConf() { return this.hiveConf; } @Override - public String getSessionId() { - return sessionId; - } - + public String getSessionId() { + return sessionId; + } + + @Override public String getUser() { return user; } + + @Override + public boolean getDoAsEnabled() { + return this.doAsEnabled; + } }