diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index bfc5172..76794e4 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2220,7 +2220,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null), - "If value is greater than 0 logs in fixed intervals of size n rather than exponentially."); + "If value is greater than 0 logs in fixed intervals of size n rather than exponentially."), + HIVE_SERVER2_LLAP_CONCURRENT_QUERIES("hive.server2.llap.concurrent.queries", 1, + "The number of queries allowed in parallel via llap"); public final String varname; private final String defaultExpr; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 1798201..aee1349 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -42,6 +41,7 @@ private static final Log LOG = LogFactory.getLog(TezSessionPoolManager.class); private BlockingQueue defaultQueuePool; + private BlockingQueue llapQueue; private int blockingQueueLength = -1; private HiveConf initConf = null; @@ -76,11 +76,17 @@ public void setupPool(HiveConf conf) throws InterruptedException { String defaultQueues = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES); int numSessions = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); + int numConcurrentLlapQueries = + conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); // the list of queues is a comma separated list. String defaultQueueList[] = defaultQueues.split(","); defaultQueuePool = new ArrayBlockingQueue(numSessions * defaultQueueList.length); + llapQueue = new ArrayBlockingQueue(numConcurrentLlapQueries); + for (int i = 0; i < numConcurrentLlapQueries; i++) { + llapQueue.add(true); + } this.initConf = conf; /* * with this the ordering of sessions in the queue will be (with 2 sessions 3 queues) @@ -156,8 +162,11 @@ private TezSessionState getNewSessionState(HiveConf conf, return retTezSessionState; } - public void returnSession(TezSessionState tezSessionState) + public void returnSession(TezSessionState tezSessionState, boolean llap) throws Exception { + if (llap) { + llapQueue.put(true); + } if (tezSessionState.isDefault()) { LOG.info("The session " + tezSessionState.getSessionId() + " belongs to the pool. Put it back in"); @@ -195,9 +204,9 @@ protected TezSessionState createSession(String sessionId) { return new TezSessionState(sessionId); } - public TezSessionState getSession( - TezSessionState session, HiveConf conf, boolean doOpen) throws Exception { - return getSession(session, conf, doOpen, false); + public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen, + boolean llap) throws Exception { + return getSession(session, conf, doOpen, false, llap); } /* @@ -256,8 +265,11 @@ private boolean canWorkWithSameSession(TezSessionState session, HiveConf conf) return true; } - public TezSessionState getSession(TezSessionState session, HiveConf conf, - boolean doOpen, boolean forceCreate) throws Exception { + public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen, + boolean forceCreate, boolean llap) throws Exception { + if (llap) { + llapQueue.take(); // blocks if no more llap queries can be submitted. + } if (canWorkWithSameSession(session, conf)) { return session; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 955c8a3..80ecd69 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -114,7 +114,9 @@ public int execute(DriverContext driverContext) { // Need to remove this static hack. But this is the way currently to get a session. SessionState ss = SessionState.get(); session = ss.getTezSession(); - session = TezSessionPoolManager.getInstance().getSession(session, conf, false); + session = + TezSessionPoolManager.getInstance().getSession(session, conf, false, + getWork().getLlapMode()); ss.setTezSession(session); // jobConf will hold all the configuration for hadoop, tez, and hive @@ -173,7 +175,7 @@ public int execute(DriverContext driverContext) { // fetch the counters Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); counters = client.getDAGStatus(statusGetOpts).getDAGCounters(); - TezSessionPoolManager.getInstance().returnSession(session); + TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode()); if (LOG.isInfoEnabled() && counters != null && (conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index 0a22f20..ac42139 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -102,8 +102,8 @@ class LlapDecisionDispatcher implements Dispatcher { - private PhysicalContext pctx; - private HiveConf conf; + private final PhysicalContext pctx; + private final HiveConf conf; public LlapDecisionDispatcher(PhysicalContext pctx) { this.pctx = pctx; @@ -126,6 +126,7 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) private void handleWork(TezWork tezWork, BaseWork work) throws SemanticException { if (evaluateWork(tezWork, work)) { + tezWork.setLlapMode(true); convertWork(tezWork, work); } } @@ -291,6 +292,7 @@ private boolean checkAggregators(Collection aggs) { Map opRules = new LinkedHashMap(); opRules.put(new RuleRegExp("No scripts", ScriptOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { return new Boolean(false); @@ -299,6 +301,7 @@ public Object process(Node n, Stack s, NodeProcessorCtx c, opRules.put(new RuleRegExp("No user code in fil", FilterOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { ExprNodeDesc expr = ((FilterOperator)n).getConf().getPredicate(); @@ -308,6 +311,7 @@ public Object process(Node n, Stack s, NodeProcessorCtx c, opRules.put(new RuleRegExp("No user code in gby", GroupByOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { List aggs = ((GroupByOperator)n).getConf().getAggregators(); @@ -317,6 +321,7 @@ public Object process(Node n, Stack s, NodeProcessorCtx c, opRules.put(new RuleRegExp("No user code in select", SelectOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { List exprs = ((SelectOperator)n).getConf().getColList(); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index 7b91002..3a6b77a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -77,6 +77,8 @@ public static boolean isCustomInputType(VertexType vertex) { new HashMap, TezEdgeProperty>(); private final Map workVertexTypeMap = new HashMap(); + private boolean llapMode = false; + public TezWork(String name) { this.name = name + ":" + (++counter); } @@ -263,7 +265,7 @@ public TezEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) { /* * Dependency is a class used for explain - */ + */ public class Dependency implements Serializable, Comparable { public BaseWork w; public EdgeType type; @@ -272,7 +274,7 @@ public TezEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) { public String getName() { return w.getName(); } - + @Explain(displayName = "Type") public String getType() { return type.toString(); @@ -306,7 +308,7 @@ public int compareTo(Dependency o) { } return result; } - + private static final String MR_JAR_PROPERTY = "tmpjars"; /** * Calls configureJobConf on instances of work that are part of this TezWork. @@ -349,7 +351,7 @@ public int compareTo(Dependency o) { /** * connect adds an edge between a and b. Both nodes have * to be added prior to calling connect. - * @param + * @param */ public void connect(BaseWork a, BaseWork b, TezEdgeProperty edgeProp) { @@ -396,4 +398,12 @@ public void setVertexType(BaseWork w, VertexType incomingVertexType) { public VertexType getVertexType(BaseWork w) { return workVertexTypeMap.get(w); } + + public void setLlapMode(boolean mode) { + this.llapMode = this.llapMode | mode; + } + + public boolean getLlapMode() { + return llapMode; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index 37a84aa..c148aae 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -55,13 +55,13 @@ public void setUp() { public void testGetNonDefaultSession() { poolManager = new TestTezSessionPoolManager(); try { - TezSessionState sessionState = poolManager.getSession(null, conf, true); - TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true); + TezSessionState sessionState = poolManager.getSession(null, conf, true, false); + TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true, false); if (sessionState1 != sessionState) { fail(); } conf.set("tez.queue.name", "nondefault"); - TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true); + TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true, false); if (sessionState2 == sessionState) { fail(); } @@ -81,30 +81,30 @@ public void testSessionPoolGetInOrder() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); poolManager.startPool(); - TezSessionState sessionState = poolManager.getSession(null, conf, true); + TezSessionState sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("a") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true); + sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("b") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true); + sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("c") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true); + sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("a") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); } catch (Exception e) { e.printStackTrace(); @@ -112,8 +112,44 @@ public void testSessionPoolGetInOrder() { } } + @Test + public void testLlapSessionQueuing() { + try { + random = new Random(1000); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2); + poolManager = new TestTezSessionPoolManager(); + poolManager.setupPool(conf); + poolManager.startPool(); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + + List threadList = new ArrayList(); + for (int i = 0; i < 15; i++) { + Thread t = new Thread(new SessionThread(true)); + threadList.add(t); + t.start(); + } + + for (Thread t : threadList) { + try { + t.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + } + } + public class SessionThread implements Runnable { + private boolean llap = false; + + public SessionThread(boolean llap) { + this.llap = llap; + } + @Override public void run() { try { @@ -124,9 +160,9 @@ public void run() { tmpConf.set("tez.queue.name", ""); } - TezSessionState session = poolManager.getSession(null, tmpConf, true); + TezSessionState session = poolManager.getSession(null, tmpConf, true, llap); Thread.sleep((random.nextInt(9) % 10) * 1000); - poolManager.returnSession(session); + poolManager.returnSession(session, llap); } catch (Exception e) { e.printStackTrace(); } @@ -150,7 +186,8 @@ public void testReturn() { } List threadList = new ArrayList(); for (int i = 0; i < 15; i++) { - Thread t = new Thread(new SessionThread()); + Thread t = new Thread(new SessionThread(false)); + threadList.add(t); t.start(); }