diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
index cf24247..d30d6c1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
@@ -59,7 +59,7 @@
   protected static MiniHS2 miniHS2 = null;
   protected static String dataFileDir;
   static Path kvDataFilePath;
-  private static String tableName = "testtab1";
+  protected static String tableName = "testtab1";
   protected static HiveConf conf = null;
   protected Connection hs2Conn = null;
@@ -324,7 +324,7 @@ private void createSleepUDF() throws SQLException {
     stmt.close();
   }
 
-  private void runQueryWithTrigger(final String query, final List<String> setCmds,
+  protected void runQueryWithTrigger(final String query, final List<String> setCmds,
     final String expect) throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index fdb660a..3d30146 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -16,6 +16,8 @@
 package org.apache.hive.jdbc;
 
+import static org.mockito.Mockito.spy;
+
 import java.io.File;
 import java.net.URL;
 import java.util.HashMap;
@@ -27,10 +29,16 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
+import org.apache.hadoop.hive.ql.wm.Expression;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
 import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
 
 public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManager {
@@ -67,6 +75,19 @@ public static void beforeTest() throws Exception {
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
   }
 
+  @Test(timeout = 60000)
+  public void testTriggerSlowQueryPriorityKill() throws Exception {
+    Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
+    Trigger killTrigger = new ExecutionTrigger("slow_query_kill", expression, Trigger.Action.KILL_QUERY);
+    Trigger moveTrigger = new ExecutionTrigger("slow_query_move", expression,
+      Trigger.Action.MOVE_TO_POOL.setPoolName("r2"));
+    setupTriggers(Lists.newArrayList(killTrigger, moveTrigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    // TODO: Make sure slow_query_kill gets invoked after HIVE-17888
+    runQueryWithTrigger(query, null, "Query was cancelled");
+  }
+
   @Override
   protected void setupTriggers(final List<Trigger> triggers) throws Exception {
     WorkloadManager wm = WorkloadManager.getInstance();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
new file mode 100644
index 0000000..000513e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
+import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerPoolTriggerValidatorRunnable implements Runnable {
+  protected static transient Logger LOG = LoggerFactory.getLogger(PerPoolTriggerValidatorRunnable.class);
+  private final WorkloadManager wm;
+  private final TriggerActionHandler triggerActionHandler;
+  private final Map<String, TriggerValidatorRunnable> poolValidators;
+  private final long triggerValidationIntervalMs;
+
+  PerPoolTriggerValidatorRunnable(final WorkloadManager wm, final TriggerActionHandler triggerActionHandler,
+    final long triggerValidationIntervalMs) {
+    this.wm = wm;
+    this.triggerActionHandler = triggerActionHandler;
+    this.poolValidators = new HashMap<>();
+    this.triggerValidationIntervalMs = triggerValidationIntervalMs;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Map<String, SessionTriggerProvider> poolStates = wm.getAllSessionTriggerProviders();
+      ScheduledExecutorService validatorExecutorService = Executors.newScheduledThreadPool(poolStates.size());
+      for (Map.Entry<String, SessionTriggerProvider> entry : poolStates.entrySet()) {
+        String poolName = entry.getKey();
+        if (!poolValidators.containsKey(poolName)) {
+          LOG.info("Creating trigger validator for pool: {}", poolName);
+          TriggerValidatorRunnable poolValidator = new TriggerValidatorRunnable(entry.getValue(), triggerActionHandler);
+          validatorExecutorService.scheduleWithFixedDelay(poolValidator, triggerValidationIntervalMs,
+            triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
+          poolValidators.put(poolName, poolValidator);
+        }
+      }
+    } catch (Throwable t) {
+      // if exception is thrown in scheduled tasks, no further tasks will be scheduled, hence this ugly catch
+      LOG.warn(PerPoolTriggerValidatorRunnable.class.getSimpleName() + " caught exception.", t);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 9b4714f..4baad4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -129,13 +130,14 @@ public void setupPool(HiveConf conf) throws Exception {
     numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
     llapQueue = new Semaphore(numConcurrentLlapQueries, true);
 
-    sessionTriggerProvider = new SessionTriggerProvider();
+    long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
+      HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+    sessionTriggerProvider = new SessionTriggerProvider(new ArrayList<>(), new ArrayList<>());
     triggerActionHandler = new KillTriggerActionHandler();
-    // TODO: update runnable to handle per pool validation
-    triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler());
+    triggerValidatorRunnable = new TriggerValidatorRunnable(sessionTriggerProvider, triggerActionHandler);
     Hive db = Hive.get(conf);
     globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db);
-    startTriggerValidator(conf);
+    startTriggerValidator(triggerValidationIntervalMs);
 
     String queueAllowedStr = HiveConf.getVar(initConf,
       ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED);
@@ -345,16 +347,6 @@ public void destroy(TezSessionState tezSessionState) throws Exception {
   }
 
   @Override
-  SessionTriggerProvider getSessionTriggerProvider() {
-    return sessionTriggerProvider;
-  }
-
-  @Override
-  TriggerActionHandler getTriggerActionHandler() {
-    return triggerActionHandler;
-  }
-
-  @Override
   TriggerValidatorRunnable getTriggerValidatorRunnable() {
     return triggerValidatorRunnable;
   }
@@ -488,8 +480,8 @@ public void registerOpenSession(TezSessionPoolSession session) {
 
   private void updateSessionsTriggers() {
     if (sessionTriggerProvider != null && globalTriggersFetcher != null) {
-      sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions));
-      sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch()));
+      sessionTriggerProvider = new SessionTriggerProvider(Collections.unmodifiableList(openSessions), Collections
+        .unmodifiableList(globalTriggersFetcher.fetch()));
     }
   }
@@ -520,7 +512,7 @@ public void setGlobalTriggersFetcher(MetastoreGlobalTriggersFetcher metastoreGlo
   public List<String> getTriggerCounterNames() {
     List<String> counterNames = new ArrayList<>();
     if (sessionTriggerProvider != null) {
-      List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers();
+      List<Trigger> activeTriggers = sessionTriggerProvider.getTriggers();
       for (Trigger trigger : activeTriggers) {
         counterNames.add(trigger.getExpression().getCounterLimit().getName());
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 6135223..7f484cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -67,18 +67,12 @@ TezSessionState reopen(TezSessionState session, Configuration conf,
   }
 
   public static abstract class AbstractTriggerValidator {
-    abstract SessionTriggerProvider getSessionTriggerProvider();
+    abstract Runnable getTriggerValidatorRunnable();
 
-    abstract TriggerActionHandler getTriggerActionHandler();
-
-    abstract TriggerValidatorRunnable getTriggerValidatorRunnable();
-
-    public void startTriggerValidator(Configuration conf) {
-      long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
-        HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+    public void startTriggerValidator(long triggerValidationIntervalMs) {
       final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
-      TriggerValidatorRunnable triggerValidatorRunnable = getTriggerValidatorRunnable();
+      Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
       scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
         triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index a1b7cfb..e919b50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -162,7 +162,7 @@ public int execute(DriverContext driverContext) {
         // TODO: in future, we may also pass getUserIpAddress.
         // Note: for now this will just block to wait for a session based on parallelism.
         session = wm.getSession(session, ss.getUserName(), conf);
-        desiredCounters.addAll(wm.getTriggerCounterNames());
+        desiredCounters.addAll(wm.getTriggerCounterNames(session));
       } else {
         TezSessionPoolManager pm = TezSessionPoolManager.getInstance();
         session = pm.getSession(session, conf, false, getWork().getLlapMode());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
index b0378eb..5f2ed07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
@@ -42,8 +42,8 @@ public void run() {
     try {
       Map<TezSessionState, Trigger.Action> violatedSessions = new HashMap<>();
-      final List<TezSessionState> sessions = sessionTriggerProvider.getOpenSessions();
-      final List<Trigger> triggers = sessionTriggerProvider.getActiveTriggers();
+      final List<TezSessionState> sessions = sessionTriggerProvider.getSessions();
+      final List<Trigger> triggers = sessionTriggerProvider.getTriggers();
       for (TezSessionState s : sessions) {
         TriggerContext triggerContext = s.getTriggerContext();
         if (triggerContext != null && !triggerContext.isQueryCompleted()) {
@@ -55,9 +55,23 @@ public void run() {
             long currentCounterValue = currentCounters.get(desiredCounter);
             if (t.apply(currentCounterValue)) {
               String queryId = s.getTriggerContext().getQueryId();
-              LOG.info("Query {} violated trigger {}. Current counter value: {}. Going to apply action {}", queryId,
-                t, currentCounterValue, t.getAction());
-              violatedSessions.put(s, t.getAction());
+              Trigger.Action currentAction = t.getAction();
+              if (violatedSessions.containsKey(s)) {
+                // session already has a violation
+                Trigger.Action existingAction = violatedSessions.get(s);
+                // KILL always takes priority over MOVE
+                if (existingAction.name().equals(Trigger.Action.MOVE_TO_POOL.name()) &&
+                  currentAction.name().equals(Trigger.Action.KILL_QUERY.name())) {
+                  violatedSessions.put(s, currentAction);
+                  LOG.info("Query {} violated trigger {}. Going to apply action {} over {}",
+                    queryId, t, currentAction, existingAction);
+                }
+                // if multiple MOVE happens, we will just pick the first move, so nothing to be done
+              } else {
+                // first violation for the session
+                violatedSessions.put(s, currentAction);
+                LOG.info("Query {} violated trigger {}. Going to apply action {}", queryId, t, currentAction);
+              }
             }
           }
         }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
index ce9147f..bd90e73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
@@ -26,13 +26,18 @@ public class TriggerViolationActionHandler implements TriggerActionHandler {
   private static final Logger LOG = LoggerFactory.getLogger(TriggerViolationActionHandler.class);
+  private final WorkloadManager wm;
+  public TriggerViolationActionHandler(final WorkloadManager wm) {
+    this.wm = wm;
+  }
 
   @Override
   public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolated) {
+    TezSessionState sessionState;
     for (Map.Entry<TezSessionState, Trigger.Action> entry : queriesViolated.entrySet()) {
       switch (entry.getValue()) {
         case KILL_QUERY:
-          TezSessionState sessionState = entry.getKey();
+          sessionState = entry.getKey();
           String queryId = sessionState.getTriggerContext().getQueryId();
           try {
             sessionState.getKillQuery().killQuery(queryId);
@@ -40,6 +45,22 @@ public void applyAction(final Map queriesViolat
            LOG.warn("Unable to kill query {} for trigger violation");
           }
           break;
+        case MOVE_TO_POOL:
+          sessionState = entry.getKey();
+          if (sessionState instanceof WmTezSession) {
+            WmTezSession wmTezSession = (WmTezSession) sessionState;
+            String destPoolName = entry.getValue().getPoolName();
+            String srcPoolName = wmTezSession.getPoolName();
+            if (!srcPoolName.equalsIgnoreCase(destPoolName)) {
+              LOG.warn("Moving session {} from {} pool to {} pool", wmTezSession.getSessionId(), srcPoolName, destPoolName);
+              // FIXME: update after HIVE-17841
+              wm.redistributePoolAllocations(destPoolName, wmTezSession, null, false);
+            } else {
+              LOG.warn("Ignoring moving session {} to same pool. srcPoolName: {} destPoolName: {}",
+                wmTezSession.getSessionId(), srcPoolName, destPoolName);
+            }
+          }
+          break;
         default:
           throw new RuntimeException("Unsupported action: " + entry.getValue());
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 35e5710..29b784f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -20,16 +20,17 @@
 import com.google.common.annotations.VisibleForTesting;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -77,9 +78,26 @@
   private int internalPoolsVersion;
   private UserPoolMapping userPoolMapping;
 
-  private SessionTriggerProvider sessionTriggerProvider;
-  private TriggerActionHandler triggerActionHandler;
-  private TriggerValidatorRunnable triggerValidatorRunnable;
+  private PerPoolTriggerValidatorRunnable triggerValidatorRunnable;
+
+  public List<String> getTriggerCounterNames(final TezSessionState session) {
+    if (session instanceof WmTezSession) {
+      WmTezSession wmTezSession = (WmTezSession) session;
+      String poolName = wmTezSession.getPoolName();
+      PoolState poolState = pools.get(poolName);
+      if (poolState != null) {
+        List<String> counterNames = new ArrayList<>();
+        List<Trigger> triggers = poolState.getTriggers();
+        if (triggers != null) {
+          for (Trigger trigger : triggers) {
+            counterNames.add(trigger.getExpression().getCounterLimit().getName());
+          }
+        }
+        return counterNames;
+      }
+    }
+    return null;
+  }
 
   public static class PoolState {
     // Add stuff here as WM is implemented.
@@ -93,12 +111,13 @@
     private final int queryParallelism;
     private List<Trigger> triggers = new ArrayList<>();
 
-    public PoolState(String fullName, int queryParallelism, double fraction) {
+    public PoolState(String fullName, int queryParallelism, double fraction, List<Trigger> triggers) {
       this.fullName = fullName;
      this.queryParallelism = queryParallelism;
       // A fair semaphore to ensure correct queue order.
       this.sessionsClaimed = new Semaphore(queryParallelism, true);
       this.finalFraction = this.finalFractionRemaining = fraction;
+      this.triggers = triggers;
     }
 
     @Override
@@ -106,17 +125,20 @@ public String toString() {
       return "[" + fullName + ", query parallelism " + queryParallelism
         + ", fraction of the cluster " + finalFraction + ", fraction used by child pools "
         + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size()
-        + "]";
+        + ", triggers " + triggers + "]";
     }
 
     @VisibleForTesting
-    // will change in HIVE-17809
     public void setTriggers(final List<Trigger> triggers) {
       this.triggers = triggers;
     }
 
+    public List<TezSessionState> getSessions() {
+      // also casts WmTezSession to TezSessionState
+      return Collections.unmodifiableList(sessions);
+    }
     public List<Trigger> getTriggers() {
-      return triggers;
+      return Collections.unmodifiableList(triggers);
     }
   }
 
@@ -172,11 +194,12 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, TmpResourc
     for (int i = 0; i < numSessions; i++) {
       sessions.addInitialSession(createSession());
     }
-    // TODO: add support for per pool action handler and triggers fetcher (+atomic update to active triggers)
-    sessionTriggerProvider = new SessionTriggerProvider();
-    triggerActionHandler = new TriggerViolationActionHandler();
-    triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler());
-    startTriggerValidator(conf);
+    final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
+      HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+    TriggerActionHandler triggerActionHandler = new TriggerViolationActionHandler(this);
+    triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(this, triggerActionHandler,
+      triggerValidationIntervalMs);
+    startTriggerValidator(triggerValidationIntervalMs);
   }
 
   private int initializeHivePools(TmpResourcePlan plan) {
@@ -208,7 +231,7 @@ private int addHivePool(TmpHivePool pool, PoolState parent) {
       fraction = parent.finalFraction * pool.getResourceFraction();
       parent.finalFractionRemaining -= fraction;
     }
-    PoolState state = new PoolState(fullName, totalQueryParallelism, fraction);
+    PoolState state = new PoolState(fullName, totalQueryParallelism, fraction, pool.getTriggers());
     if (pool.getChildren() != null) {
       for (TmpHivePool child : pool.getChildren()) {
         totalQueryParallelism += addHivePool(child, state);
@@ -338,17 +361,17 @@ protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception {
     return true;
   }
 
-  private void redistributePoolAllocations(
-      String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove,
+  void redistributePoolAllocations(
+      String destPoolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove,
       boolean releaseParallelism) {
     List<WmTezSession> sessionsToUpdate = null;
     double totalAlloc = 0;
-    assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName());
-    assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName());
+    assert sessionToAdd == null || destPoolName.equals(sessionToAdd.getPoolName());
+    assert sessionToRemove == null || destPoolName.equals(sessionToRemove.getPoolName());
     poolsLock.readLock().lock();
     boolean hasRemoveFailed = false;
     try {
-      PoolState pool = pools.get(poolName);
+      PoolState pool = pools.get(destPoolName);
       synchronized (pool.lock) {
         // This should be a 2nd order fn but it's too much pain in Java for one LOC.
         if (sessionToAdd != null) {
@@ -376,7 +399,6 @@ private void redistributePoolAllocations(
       poolsLock.readLock().unlock();
     }
     allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate);
-    updateSessionsTriggers();
     if (hasRemoveFailed) {
       throw new AssertionError("Cannot remove the session from the pool and release "
         + "the query slot; HS2 may fail to accept queries");
     }
@@ -508,7 +530,6 @@ public void registerOpenSession(TezSessionPoolSession session) {
     synchronized (openSessions) {
       openSessions.put(session, null);
     }
-    updateSessionsTriggers();
   }
 
   /** Called by TezSessionPoolSession when closed. */
@@ -517,20 +538,6 @@ public void unregisterOpenSession(TezSessionPoolSession session) {
     synchronized (openSessions) {
       openSessions.remove(session);
     }
-    updateSessionsTriggers();
-  }
-
-  private void updateSessionsTriggers() {
-    if (sessionTriggerProvider != null) {
-      List<TezSessionState> openSessions = new ArrayList<>();
-      List<Trigger> activeTriggers = new ArrayList<>();
-      for (PoolState poolState : pools.values()) {
-        activeTriggers.addAll(poolState.getTriggers());
-        openSessions.addAll(poolState.sessions);
-      }
-      sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions));
-      sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(activeTriggers));
-    }
   }
 
   @VisibleForTesting
@@ -574,17 +581,7 @@ int getNumSessions() {
   }
 
   @Override
-  SessionTriggerProvider getSessionTriggerProvider() {
-    return sessionTriggerProvider;
-  }
-
-  @Override
-  TriggerActionHandler getTriggerActionHandler() {
-    return triggerActionHandler;
-  }
-
-  @Override
-  TriggerValidatorRunnable getTriggerValidatorRunnable() {
+  Runnable getTriggerValidatorRunnable() {
    return triggerValidatorRunnable;
   }
@@ -593,19 +590,27 @@ TriggerValidatorRunnable getTriggerValidatorRunnable() {
     return pools;
   }
 
-  protected final HiveConf getConf() {
-    return conf;
-  }
-
-  public List<String> getTriggerCounterNames() {
-    List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers();
-    List<String> counterNames = new ArrayList<>();
-    for (Trigger trigger : activeTriggers) {
-      counterNames.add(trigger.getExpression().getCounterLimit().getName());
+  public Map<String, SessionTriggerProvider> getAllSessionTriggerProviders() {
+    Map<String, SessionTriggerProvider> result = new HashMap<>();
+    poolsLock.readLock().lock();
+    try {
+      // TODO: After HIVE-17841, this will change (do this only if change happened)
+      for (Map.Entry<String, PoolState> entry : pools.entrySet()) {
+        PoolState poolState = entry.getValue();
+        List<Trigger> triggers = Collections.unmodifiableList(poolState.getTriggers());
+        List<TezSessionState> sessionStates = Collections.unmodifiableList(poolState.getSessions());
+        SessionTriggerProvider sessionTriggerProvider = new SessionTriggerProvider(sessionStates, triggers);
+        result.put(entry.getKey(), sessionTriggerProvider);
+      }
+    } finally {
+      poolsLock.readLock().unlock();
     }
-    return counterNames;
+    return result;
   }
 
+  protected final HiveConf getConf() {
+    return conf;
+  }
   // TODO: temporary until real WM schema is created.
   public static class TmpHivePool {
@@ -613,13 +618,15 @@ protected final HiveConf getConf() {
     private final List<TmpHivePool> children;
     private final int queryParallelism;
     private final double resourceFraction;
+    private final List<Trigger> triggers;
 
-    public TmpHivePool(String name,
-        List<TmpHivePool> children, int queryParallelism, double resourceFraction) {
+    public TmpHivePool(String name, List<TmpHivePool> children, int queryParallelism,
+        double resourceFraction, List<Trigger> triggers) {
       this.name = name;
       this.children = children;
       this.queryParallelism = queryParallelism;
       this.resourceFraction = resourceFraction;
+      this.triggers = triggers;
     }
 
     public String getName() {
@@ -634,6 +641,7 @@ public int getQueryParallelism() {
     public double getResourceFraction() {
       return resourceFraction;
     }
+    public List<Trigger> getTriggers() {return triggers;}
   }
 
   public static enum TmpUserMappingType {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
index 408aa2d..f22b43c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
@@ -15,9 +15,6 @@
  */
 package org.apache.hadoop.hive.ql.wm;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
@@ -26,31 +23,19 @@
  * Implementation for providing current open sessions and active trigger.
  */
 public class SessionTriggerProvider {
-  private List<TezSessionState> openSessions = new ArrayList<>();
-  private List<Trigger> activeTriggers = new ArrayList<>();
-
-  public SessionTriggerProvider() {
-
-  }
+  private final List<TezSessionState> sessions;
+  private final List<Trigger> triggers;
 
   public SessionTriggerProvider(final List<TezSessionState> openSessions, final List<Trigger> triggers) {
-    this.openSessions = openSessions;
-    this.activeTriggers = triggers;
-  }
-
-  public void setOpenSessions(final List<TezSessionState> openSessions) {
-    this.openSessions = openSessions;
-  }
-
-  public void setActiveTriggers(final List<Trigger> activeTriggers) {
-    this.activeTriggers = activeTriggers;
+    this.sessions = openSessions;
+    this.triggers = triggers;
   }
 
-  public List<TezSessionState> getOpenSessions() {
-    return Collections.unmodifiableList(openSessions);
+  public List<TezSessionState> getSessions() {
+    return sessions;
   }
 
-  public List<Trigger> getActiveTriggers() {
-    return Collections.unmodifiableList(activeTriggers);
+  public List<Trigger> getTriggers() {
+    return triggers;
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 258a865..44de9da 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -115,7 +115,7 @@ public WorkloadManagerForTest(String yarnQueue, HiveConf conf,
   private static TmpResourcePlan createDummyPlan(int numSessions) {
     return new TmpResourcePlan(
-        Lists.newArrayList(new TmpHivePool("llap", null, numSessions, 1.0f)),
+        Lists.newArrayList(new TmpHivePool("llap", null, numSessions, 1.0f, null)),
         Lists.newArrayList(new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 0)));
   }
@@ -227,9 +227,9 @@ public void testClusterFractions() throws Exception {
     HiveConf conf = createConf();
     MockQam qam = new MockQam();
     List<TmpHivePool> l2 = Lists.newArrayList(
-        new TmpHivePool("p1", null, 1, 0.5f), new TmpHivePool("p2", null, 2, 0.3f));
+        new TmpHivePool("p1", null, 1, 0.5f, null), new TmpHivePool("p2", null, 2, 0.3f, null));
     TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
-        new TmpHivePool("r1", l2, 1, 0.6f), new TmpHivePool("r2", null, 1, 0.4f)),
+        new TmpHivePool("r1", l2, 1, 0.6f, null), new TmpHivePool("r2", null, 1, 0.4f, null)),
         Lists.newArrayList(create("p1", "r1/p1"), create("p2", "r1/p2"),
         create("r1", "r1"), create("r2", "r2")));
     WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
@@ -259,7 +259,7 @@ public void testQueueing() throws Exception {
     final HiveConf conf = createConf();
     MockQam qam = new MockQam();
     TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
-        new TmpHivePool("A", null, 2, 0.5f), new TmpHivePool("B", null, 2, 0.5f)),
+        new TmpHivePool("A", null, 2, 0.5f, null), new TmpHivePool("B", null, 2, 0.5f, null)),
         Lists.newArrayList(create("A", "A"), create("B", "B")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
@@ -367,7 +367,7 @@ public void testReuseWithDifferentPool() throws Exception {
     final HiveConf conf = createConf();
     MockQam qam = new MockQam();
     TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
-        new TmpHivePool("A", null, 2, 0.6f), new TmpHivePool("B", null, 1, 0.4f)),
+        new TmpHivePool("A", null, 2, 0.6f, null), new TmpHivePool("B", null, 1, 0.4f, null)),
         Lists.newArrayList(create("A", "A"), create("B", "B")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 2c4fe7f..448e818 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -187,7 +187,7 @@ public void run() {
       String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
       if (wmQueue != null && !wmQueue.isEmpty()) {
         wm = WorkloadManager.create(wmQueue, hiveConf, new TmpResourcePlan(
-            Lists.newArrayList(new TmpHivePool("llap", null, 1, 1.0f)),
+            Lists.newArrayList(new TmpHivePool("llap", null, 1, 1.0f, null)),
             Lists.newArrayList(new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 0))));
       } else {
         wm = null;