diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 32944bd..2c20d51 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2241,7 +2241,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { new StringSet("throw", "skip", "ignore"), "The approach msck should take with HDFS " + "directories that are partition-like but contain unsupported characters. 'throw' (an " + "exception) is the default; 'skip' will skip the invalid directories and still repair the" + - " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"); + " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"), + HIVE_SERVER2_LLAP_CONCURRENT_QUERIES("hive.server2.llap.concurrent.queries", -1, + "The number of queries allowed in parallel via llap. Negative number implies 'infinite'."); public final String varname; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index dfa539f..39cf5b4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -20,6 +20,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Semaphore; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -45,8 +46,10 @@ private static final Log LOG = LogFactory.getLog(TezSessionPoolManager.class); private BlockingQueue defaultQueuePool; + private Semaphore llapQueue; private int blockingQueueLength = -1; private HiveConf initConf = null; + int numConcurrentLlapQueries = -1; private boolean inited = false; @@ -83,11 +86,15 @@ public void setupPool(HiveConf conf) throws InterruptedException { String defaultQueues = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES); int numSessions = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); + numConcurrentLlapQueries = + conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); // the list of queues is a comma separated list. String defaultQueueList[] = defaultQueues.split(","); defaultQueuePool = new ArrayBlockingQueue(numSessions * defaultQueueList.length); + llapQueue = new Semaphore(numConcurrentLlapQueries, true); + this.initConf = conf; /* * with this the ordering of sessions in the queue will be (with 2 sessions 3 queues) @@ -164,8 +171,11 @@ private TezSessionState getNewSessionState(HiveConf conf, return retTezSessionState; } - public void returnSession(TezSessionState tezSessionState) + public void returnSession(TezSessionState tezSessionState, boolean llap) throws Exception { + if (llap) { + llapQueue.release(); + } if (tezSessionState.isDefault()) { LOG.info("The session " + tezSessionState.getSessionId() + " belongs to the pool. Put it back in"); @@ -207,9 +217,9 @@ protected TezSessionState createSession(String sessionId) { return new TezSessionState(sessionId); } - public TezSessionState getSession( - TezSessionState session, HiveConf conf, boolean doOpen) throws Exception { - return getSession(session, conf, doOpen, false); + public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen, + boolean llap) throws Exception { + return getSession(session, conf, doOpen, false, llap); } /* @@ -268,8 +278,11 @@ private boolean canWorkWithSameSession(TezSessionState session, HiveConf conf) return true; } - public TezSessionState getSession(TezSessionState session, HiveConf conf, - boolean doOpen, boolean forceCreate) throws Exception { + public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen, + boolean forceCreate, boolean llap) throws Exception { + if (llap && this.numConcurrentLlapQueries > 0) { + llapQueue.acquire(); // blocks if no more llap queries can be submitted. + } if (canWorkWithSameSession(session, conf)) { return session; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index b2558d1..7f50bea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -114,7 +114,9 @@ public int execute(DriverContext driverContext) { // Need to remove this static hack. But this is the way currently to get a session. SessionState ss = SessionState.get(); session = ss.getTezSession(); - session = TezSessionPoolManager.getInstance().getSession(session, conf, false); + session = + TezSessionPoolManager.getInstance().getSession(session, conf, false, + getWork().getLlapMode()); ss.setTezSession(session); // jobConf will hold all the configuration for hadoop, tez, and hive @@ -173,7 +175,7 @@ public int execute(DriverContext driverContext) { // fetch the counters Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); counters = client.getDAGStatus(statusGetOpts).getDAGCounters(); - TezSessionPoolManager.getInstance().returnSession(session); + TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode()); if (LOG.isInfoEnabled() && counters != null && (conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index 0a22f20..d49d83e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -102,8 +102,8 @@ class LlapDecisionDispatcher implements Dispatcher { - private PhysicalContext pctx; - private HiveConf conf; + private final PhysicalContext pctx; + private final HiveConf conf; public LlapDecisionDispatcher(PhysicalContext pctx) { this.pctx = pctx; @@ -291,6 +291,7 @@ private boolean checkAggregators(Collection aggs) { Map opRules = new LinkedHashMap(); opRules.put(new RuleRegExp("No scripts", ScriptOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { return new Boolean(false); @@ -299,6 +300,7 @@ public Object process(Node n, Stack s, NodeProcessorCtx c, opRules.put(new RuleRegExp("No user code in fil", FilterOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { ExprNodeDesc expr = ((FilterOperator)n).getConf().getPredicate(); @@ -308,6 +310,7 @@ public Object process(Node n, Stack s, NodeProcessorCtx c, opRules.put(new RuleRegExp("No user code in gby", GroupByOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { List aggs = ((GroupByOperator)n).getConf().getAggregators(); @@ -317,6 +320,7 @@ public Object process(Node n, Stack s, NodeProcessorCtx c, opRules.put(new RuleRegExp("No user code in select", SelectOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { List exprs = ((SelectOperator)n).getConf().getColList(); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index 7b91002..17c5ad7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -263,7 +263,7 @@ public TezEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) { /* * Dependency is a class used for explain - */ + */ public class Dependency implements Serializable, Comparable { public BaseWork w; public EdgeType type; @@ -272,7 +272,7 @@ public TezEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) { public String getName() { return w.getName(); } - + @Explain(displayName = "Type") public String getType() { return type.toString(); @@ -306,7 +306,7 @@ public int compareTo(Dependency o) { } return result; } - + private static final String MR_JAR_PROPERTY = "tmpjars"; /** * Calls configureJobConf on instances of work that are part of this TezWork. @@ -349,7 +349,7 @@ public int compareTo(Dependency o) { /** * connect adds an edge between a and b. Both nodes have * to be added prior to calling connect. - * @param + * @param */ public void connect(BaseWork a, BaseWork b, TezEdgeProperty edgeProp) { @@ -396,4 +396,13 @@ public void setVertexType(BaseWork w, VertexType incomingVertexType) { public VertexType getVertexType(BaseWork w) { return workVertexTypeMap.get(w); } + + public boolean getLlapMode() { + for (BaseWork work : getAllWork()) { + if (work.getLlapMode()) { + return true; + } + } + return false; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index 37a84aa..c148aae 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -55,13 +55,13 @@ public void setUp() { public void testGetNonDefaultSession() { poolManager = new TestTezSessionPoolManager(); try { - TezSessionState sessionState = poolManager.getSession(null, conf, true); - TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true); + TezSessionState sessionState = poolManager.getSession(null, conf, true, false); + TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true, false); if (sessionState1 != sessionState) { fail(); } conf.set("tez.queue.name", "nondefault"); - TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true); + TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true, false); if (sessionState2 == sessionState) { fail(); } @@ -81,30 +81,30 @@ public void testSessionPoolGetInOrder() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); poolManager.startPool(); - TezSessionState sessionState = poolManager.getSession(null, conf, true); + TezSessionState sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("a") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true); + sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("b") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true); + sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("c") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true); + sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("a") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); } catch (Exception e) { e.printStackTrace(); @@ -112,8 +112,44 @@ public void testSessionPoolGetInOrder() { } } + @Test + public void testLlapSessionQueuing() { + try { + random = new Random(1000); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2); + poolManager = new TestTezSessionPoolManager(); + poolManager.setupPool(conf); + poolManager.startPool(); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + + List threadList = new ArrayList(); + for (int i = 0; i < 15; i++) { + Thread t = new Thread(new SessionThread(true)); + threadList.add(t); + t.start(); + } + + for (Thread t : threadList) { + try { + t.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + } + } + public class SessionThread implements Runnable { + private boolean llap = false; + + public SessionThread(boolean llap) { + this.llap = llap; + } + @Override public void run() { try { @@ -124,9 +160,9 @@ public void run() { tmpConf.set("tez.queue.name", ""); } - TezSessionState session = poolManager.getSession(null, tmpConf, true); + TezSessionState session = poolManager.getSession(null, tmpConf, true, llap); Thread.sleep((random.nextInt(9) % 10) * 1000); - poolManager.returnSession(session); + poolManager.returnSession(session, llap); } catch (Exception e) { e.printStackTrace(); } @@ -150,7 +186,8 @@ public void testReturn() { } List threadList = new ArrayList(); for (int i = 0; i < 15; i++) { - Thread t = new Thread(new SessionThread()); + Thread t = new Thread(new SessionThread(false)); + threadList.add(t); t.start(); }