diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java index bcce3dc..bd04cee 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java @@ -16,17 +16,16 @@ package org.apache.hive.jdbc; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import java.util.List; +import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; -import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; import org.apache.hadoop.hive.ql.wm.Trigger; import org.junit.Test; @@ -56,8 +55,13 @@ public void testTriggerTotalTasks() throws Exception { @Override void setupTriggers(final List triggers) throws Exception { - MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class); - when(triggersFetcher.fetch()).thenReturn(triggers); - TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher); + WMPool pool = new WMPool("rp", "tez"); + pool.setQueryParallelism(1); + WMFullResourcePlan rp = new WMFullResourcePlan( + new WMResourcePlan("rp"), null); + for (Trigger trigger : triggers) { + rp.addToTriggers(wmTriggerFromTrigger(trigger)); + } + TezSessionPoolManager.getInstance().updateTriggers(rp); } } \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index b377275..a73c66e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -16,18 +16,17 @@ package org.apache.hive.jdbc; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; -import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; import org.apache.hadoop.hive.ql.wm.Trigger; import org.junit.Test; @@ -245,8 +244,13 @@ public void testMultipleTriggers2() throws Exception { @Override protected void setupTriggers(final List triggers) throws Exception { - MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class); - when(triggersFetcher.fetch()).thenReturn(triggers); - TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher); + WMPool pool = new WMPool("rp", "tez"); + pool.setQueryParallelism(1); + WMFullResourcePlan rp = new WMFullResourcePlan( + new WMResourcePlan("rp"), null); + for (Trigger trigger : triggers) { + rp.addToTriggers(wmTriggerFromTrigger(trigger)); + } + TezSessionPoolManager.getInstance().updateTriggers(rp); } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index e7af5e0..c2e565c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -114,6 +114,7 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo; import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer; @@ -713,47 +714,55 @@ private int alterResourcePlan(Hive db, AlterResourcePlanDesc desc) throws HiveEx resourcePlan.setDefaultPoolPath(desc.getDefaultPoolPath()); } - boolean isActivate = false, isInTest = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST); + final String wmQueue = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); + final boolean wmEnabled = wmQueue != null && !wmQueue.isEmpty(); + boolean isActivate = false; WorkloadManager wm = null; + TezSessionPoolManager pm = null; if (desc.getStatus() != null) { resourcePlan.setStatus(desc.getStatus()); isActivate = desc.getStatus() == WMResourcePlanStatus.ACTIVE; if (isActivate) { - wm = WorkloadManager.getInstance(); - if (wm == null && !isInTest) { - throw new HiveException("Resource plan can only be activated when WM is enabled"); + if (wmEnabled) { + wm = WorkloadManager.getInstance(); + } else { + pm = TezSessionPoolManager.getInstance(); } } } WMFullResourcePlan appliedRp = db.alterResourcePlan( desc.getRpName(), resourcePlan, desc.isEnableActivate()); - if (!isActivate || (wm == null && isInTest)) return 0; - assert wm != null; + if (!isActivate || (wm == null && pm == null)) return 0; if (appliedRp == null) { throw new HiveException("Cannot get a resource plan to apply"); // TODO: shut down HS2? } final String name = (desc.getNewName() != null) ? desc.getNewName() : desc.getRpName(); LOG.info("Activating a new resource plan " + name + ": " + appliedRp); - // Note: as per our current constraints, the behavior of two parallel activates is - // undefined; although only one will succeed and the other will receive exception. - // We need proper (semi-)transactional modifications to support this without hacks. - ListenableFuture future = wm.updateResourcePlanAsync(appliedRp); - boolean isOk = false; - try { - // Note: we may add an async option in future. For now, let the task fail for the user. - future.get(); - isOk = true; - LOG.info("Successfully activated resource plan " + name); - return 0; - } catch (InterruptedException | ExecutionException e) { - throw new HiveException(e); - } finally { - if (!isOk) { - LOG.error("Failed to activate resource plan " + name); - // TODO: shut down HS2? + if (wm != null) { + // Note: as per our current constraints, the behavior of two parallel activates is + // undefined; although only one will succeed and the other will receive exception. + // We need proper (semi-)transactional modifications to support this without hacks. + ListenableFuture future = wm.updateResourcePlanAsync(appliedRp); + boolean isOk = false; + try { + // Note: we may add an async option in future. For now, let the task fail for the user. + future.get(); + isOk = true; + LOG.info("Successfully activated resource plan " + name); + return 0; + } catch (InterruptedException | ExecutionException e) { + throw new HiveException(e); + } finally { + if (!isOk) { + LOG.error("Failed to activate resource plan " + name); + // TODO: shut down HS2? + } } + } else { + pm.updateTriggers(appliedRp); + return 0; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 4e48f15..692317f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -30,10 +30,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; +import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; +import org.apache.hadoop.hive.ql.wm.MetastoreTriggersFetcher; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; @@ -83,13 +86,13 @@ /** This is used to close non-default sessions, and also all sessions when stopping. */ private final List openSessions = new LinkedList<>(); - private MetastoreGlobalTriggersFetcher globalTriggersFetcher; + private final List triggers = new LinkedList<>(); private SessionTriggerProvider sessionTriggerProvider; private TriggerActionHandler triggerActionHandler; private TriggerValidatorRunnable triggerValidatorRunnable; /** Note: this is not thread-safe. */ - public static TezSessionPoolManager getInstance() throws Exception { + public static TezSessionPoolManager getInstance() { TezSessionPoolManager local = instance; if (local == null) { instance = local = new TezSessionPoolManager(); @@ -183,15 +186,10 @@ public TezSessionPoolSession create(TezSessionPoolSession oldSession) { } public void initTriggers(final HiveConf conf) throws HiveException { - if (globalTriggersFetcher == null) { - Hive db = Hive.get(conf); - globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db); - } - if (triggerValidatorRunnable == null) { final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars .HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); - sessionTriggerProvider = new SessionTriggerProvider(openSessions, globalTriggersFetcher.fetch()); + sessionTriggerProvider = new SessionTriggerProvider(openSessions, triggers); triggerActionHandler = new KillTriggerActionHandler(); triggerValidatorRunnable = new TriggerValidatorRunnable(sessionTriggerProvider, triggerActionHandler); startTriggerValidator(triggerValidationIntervalMs); @@ -349,6 +347,10 @@ public void stop() throws Exception { expirationTracker.stop(); } + if (triggerValidatorRunnable != null) { + stopTriggerValidator(); + } + instance = null; } @@ -503,13 +505,23 @@ public void registerOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { openSessions.add(session); } - updateSessionsTriggers(); + updateSessions(); } - private void updateSessionsTriggers() { - if (sessionTriggerProvider != null && globalTriggersFetcher != null) { + private void updateSessions() { + if (sessionTriggerProvider != null) { sessionTriggerProvider.setSessions(Collections.unmodifiableList(openSessions)); - sessionTriggerProvider.setTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch())); + } + } + + public void updateTriggers(final WMFullResourcePlan appliedRp) { + if (sessionTriggerProvider != null) { + List wmTriggers = appliedRp.getTriggers(); + List triggers = new ArrayList<>(); + for (WMTrigger wmTrigger : wmTriggers) { + triggers.add(ExecutionTrigger.fromWMTrigger(wmTrigger)); + } + sessionTriggerProvider.setTriggers(Collections.unmodifiableList(triggers)); } } @@ -525,7 +537,7 @@ public void unregisterOpenSession(TezSessionPoolSession session) { if (defaultSessionPool != null) { defaultSessionPool.notifyClosed(session); } - updateSessionsTriggers(); + updateSessions(); } @VisibleForTesting @@ -534,13 +546,7 @@ public SessionExpirationTracker getExpirationTracker() { } - @VisibleForTesting - public void setGlobalTriggersFetcher(MetastoreGlobalTriggersFetcher metastoreGlobalTriggersFetcher) { - this.globalTriggersFetcher = metastoreGlobalTriggersFetcher; - updateSessionsTriggers(); - } - - public List getTriggerCounterNames() { + List getTriggerCounterNames() { List counterNames = new ArrayList<>(); if (sessionTriggerProvider != null) { List activeTriggers = sessionTriggerProvider.getTriggers(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 769b24a..b3ccd24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -73,14 +73,24 @@ TezSessionState reopen(TezSessionState session, Configuration conf, } public static abstract class AbstractTriggerValidator { + private ScheduledExecutorService scheduledExecutorService = null; abstract Runnable getTriggerValidatorRunnable(); - public void startTriggerValidator(long triggerValidationIntervalMs) { - final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); - Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); - scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, - triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + void startTriggerValidator(long triggerValidationIntervalMs) { + if (scheduledExecutorService == null) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); + Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); + scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, + triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + } + } + + void stopTriggerValidator() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + scheduledExecutorService = null; + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 1fe5859..f5ee19a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -202,13 +202,6 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso wmThread = new Thread(() -> runWmThread(), "Workload management master"); wmThread.setDaemon(true); - - final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); - TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this); - triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler, - triggerValidationIntervalMs); - startTriggerValidator(triggerValidationIntervalMs); } private static int determineQueryParallelism(WMFullResourcePlan plan) { @@ -227,6 +220,13 @@ public void start() throws Exception { allocationManager.start(); wmThread.start(); initRpFuture.get(); // Wait for the initial resource plan to be applied. + + final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); + TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this); + triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler, + triggerValidationIntervalMs); + startTriggerValidator(triggerValidationIntervalMs); } public void stop() throws Exception { @@ -247,6 +247,9 @@ public void stop() throws Exception { workPool.shutdownNow(); timeoutPool.shutdownNow(); + if (triggerValidatorRunnable != null) { + stopTriggerValidator(); + } INSTANCE = null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java deleted file mode 100644 index 87c007f..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.wm; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.metadata.Hive; - -/** - * Fetch global (non-llap) rules from metastore - */ -public class MetastoreGlobalTriggersFetcher { - private static final String GLOBAL_TRIGGER_NAME = "global"; - private Hive db; - - public MetastoreGlobalTriggersFetcher(final Hive db) { - this.db = db; - } - - public List fetch() { - // TODO: this entire class will go away, DDLTask will push RP to TezSessionPoolManager where triggers are available - return new ArrayList<>(); - } -} diff --git a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/WMFullResourcePlan.java b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/WMFullResourcePlan.java index 1d21908..00ce5e1 100644 --- a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/WMFullResourcePlan.java +++ b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/WMFullResourcePlan.java @@ -326,7 +326,7 @@ public void addToTriggers(WMTrigger elem) { } public List getTriggers() { - return this.triggers; + return this.triggers == null ? new ArrayList<>() : this.triggers; } public void setTriggers(List triggers) {