diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 25a8ff2..ecdcf12 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -25,14 +25,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -140,9 +138,6 @@ private PerPoolTriggerValidatorRunnable triggerValidatorRunnable; private Map perPoolProviders = new ConcurrentHashMap<>(); - private SessionTriggerProvider sessionTriggerProvider; - private TriggerActionHandler triggerActionHandler; - // The master thread and various workers. /** The master thread the processes the events from EventState. */ @VisibleForTesting @@ -152,8 +147,6 @@ /** Used to schedule timeouts for some async operations. */ private final ScheduledExecutorService timeoutPool; - // The initial plan initalization future, to wait for the plan to apply during setup. - private ListenableFuture initRpFuture; private LlapPluginEndpointClientImpl amComm; private static final FutureCallback FATAL_ERROR_CALLBACK = new FutureCallback() { @@ -175,7 +168,8 @@ public static WorkloadManager getInstance() { } /** Called once, when HS2 initializes. */ - public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullResourcePlan plan) { + public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullResourcePlan plan) + throws ExecutionException, InterruptedException { assert INSTANCE == null; // We could derive the expected number of AMs to pass in. LlapPluginEndpointClientImpl amComm = new LlapPluginEndpointClientImpl(conf, null, -1); @@ -185,11 +179,10 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso @VisibleForTesting WorkloadManager(LlapPluginEndpointClientImpl amComm, String yarnQueue, HiveConf conf, - QueryAllocationManager qam, WMFullResourcePlan plan) { + QueryAllocationManager qam, WMFullResourcePlan plan) throws ExecutionException, InterruptedException { this.yarnQueue = yarnQueue; this.conf = conf; this.totalQueryParallelism = determineQueryParallelism(plan); - this.initRpFuture = this.updateResourcePlanAsync(plan); this.allocationManager = qam; this.allocationManager.setClusterChangedCallback(() -> notifyOfClusterStateChange()); @@ -215,6 +208,9 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso wmThread = new Thread(() -> runWmThread(), "Workload management master"); wmThread.setDaemon(true); + wmThread.start(); + + updateResourcePlanAsync(plan).get(); // Wait for the initial resource plan to be applied. } private static int determineQueryParallelism(WMFullResourcePlan plan) { @@ -234,8 +230,6 @@ public void start() throws Exception { amComm.start(); } allocationManager.start(); - wmThread.start(); - initRpFuture.get(); // Wait for the initial resource plan to be applied. final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); @@ -1842,10 +1836,15 @@ public String toString() { boolean isManaged(MappingInput input) { // This is always replaced atomically, so we don't care about concurrency here. - return userPoolMapping.mapSessionToPoolName(input) != null; + if (userPoolMapping != null) { + String mappedPool = userPoolMapping.mapSessionToPoolName(input); + LOG.info("Mapping input: {} mapped to pool: {}", input, mappedPool); + return true; + } + return false; } - private static enum KillQueryResult { + private enum KillQueryResult { OK, RESTART_REQUIRED, IN_PROGRESS diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java index 315a2dc..0a9fa72 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java @@ -51,6 +51,7 @@ public static TezSessionState getSession(TezSessionState session, HiveConf conf, desiredCounters.addAll(wm.getTriggerCounterNames(result)); return result; } catch (WorkloadManager.NoPoolMappingException ex) { + LOG.info("NoPoolMappingException thrown. Getting an un-managed session.."); return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index bceb31d..78df962 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -152,12 +152,12 @@ public static WMMapping mapping(String type, String user, String pool, int order public static class WorkloadManagerForTest extends WorkloadManager { public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, - QueryAllocationManager qam) { + QueryAllocationManager qam) throws ExecutionException, InterruptedException { super(null, yarnQueue, conf, qam, createDummyPlan(numSessions)); } public WorkloadManagerForTest(String yarnQueue, HiveConf conf, - QueryAllocationManager qam, WMFullResourcePlan plan) { + QueryAllocationManager qam, WMFullResourcePlan plan) throws ExecutionException, InterruptedException { super(null, yarnQueue, conf, qam, plan); } diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index c3afa19..717c4d6 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -27,6 +27,7 @@ import java.util.Properties; import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -206,7 +207,11 @@ public void run() { String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); if (wmQueue != null && !wmQueue.isEmpty()) { LOG.info("Initializing workload management"); - wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan); + try { + wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan); + } catch (ExecutionException | InterruptedException e) { + throw new ServiceException("Unable to instantiate Workload Manager", e); + } } tezSessionPoolManager.updateTriggers(resourcePlan); LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName());