diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 0cc8de0..0b22201 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3042,6 +3042,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "hive.tez.exec.print.summary", false, "Display breakdown of execution steps, for every query executed by the shell."), + TEZ_SESSION_EVENTS_SUMMARY( + "hive.tez.session.events.print.summary", + false, + "Display summary of all tez sessions related events"), TEZ_EXEC_INPLACE_PROGRESS( "hive.tez.exec.inplace.progress", true, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 97b52b0..b8af7f1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hive.ql.parse.QB; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -150,18 +150,18 @@ */ private Map insertBranchToNamePrefix = new HashMap<>(); private Operation operation = Operation.OTHER; - private TriggerContext triggerContext; + private WMContext wmContext; public void setOperation(Operation operation) { this.operation = operation; } - public TriggerContext getTriggerContext() { - return triggerContext; + public WMContext getWmContext() { + return wmContext; } - public void setTriggerContext(final TriggerContext triggerContext) { - this.triggerContext = triggerContext; + public void setWmContext(final WMContext wmContext) { + this.wmContext = wmContext; } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 389a1a6..5e8063b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -113,7 +113,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; @@ -732,8 +732,8 @@ private void setTriggerContext(final String queryId) { } else { queryStartTime = queryDisplay.getQueryStartTime(); } - TriggerContext triggerContext = new TriggerContext(queryStartTime, queryId); - ctx.setTriggerContext(triggerContext); + WMContext wmContext = new WMContext(queryStartTime, queryId); + ctx.setWmContext(wmContext); } private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java index 132bec6..457e69d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java @@ -18,8 +18,11 @@ package org.apache.hadoop.hive.ql; import com.google.common.collect.ImmutableMap; + +import org.apache.commons.collections.map.HashedMap; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskResult; +import org.apache.hadoop.hive.ql.exec.tez.WMEvent; import org.apache.hadoop.hive.ql.plan.api.StageType; import java.io.Serializable; @@ -46,6 +49,7 @@ private final Map> hmsTimingMap = new HashMap>(); private final Map> perfLogStartMap = new HashMap>(); private final Map> perfLogEndMap = new HashMap>(); + private final Map wmEventMap = new HashMap<>(); private final LinkedHashMap tasks = new LinkedHashMap(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java index 0509cbc..4b35617 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java @@ -18,18 +18,23 @@ package org.apache.hadoop.hive.ql.exec.tez; -import org.apache.hive.common.util.Ref; - -import java.util.concurrent.TimeoutException; - import org.apache.hadoop.security.token.Token; +import org.apache.hive.common.util.Ref; import org.apache.tez.common.security.JobTokenIdentifier; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.annotate.JsonSerialize; +@JsonSerialize public interface AmPluginNode { - public static class AmPluginInfo { + class AmPluginInfo { + @JsonProperty("amPluginPort") public final int amPluginPort; + @JsonIgnore public final Token amPluginToken; + @JsonProperty("amPluginTokenJobId") public final String amPluginTokenJobId; + @JsonProperty("amHost") public final String amHost; AmPluginInfo(String amHost, int amPluginPort, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java index 94b189b..03e1d43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java @@ -43,7 +43,7 @@ public void applyAction(final Map queriesViolated) { switch (entry.getValue().getAction().getType()) { case KILL_QUERY: sessionState = entry.getKey(); - String queryId = sessionState.getTriggerContext().getQueryId(); + String queryId = sessionState.getWmContext().getQueryId(); try { sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg()); } catch (HiveException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java index 8c60b6f..15e054b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java @@ -37,7 +37,7 @@ public void applyAction(final Map queriesViolated) { switch (entry.getValue().getAction().getType()) { case KILL_QUERY: TezSessionState sessionState = entry.getKey(); - String queryId = sessionState.getTriggerContext().getQueryId(); + String queryId = sessionState.getWmContext().getQueryId(); try { KillQuery killQuery = sessionState.getKillQuery(); // if kill query is null then session might have been released to pool or closed already diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 8417ebb..c16098d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.dag.api.TezConfiguration; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 7a7fe15..2f9695f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -60,7 +60,7 @@ import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.security.Credentials; @@ -83,6 +83,9 @@ import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.annotate.JsonSerialize; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; @@ -90,6 +93,7 @@ /** * Holds session state related to Tez */ +@JsonSerialize public class TezSessionState { protected static final Logger LOG = LoggerFactory.getLogger(TezSessionState.class.getName()); @@ -99,26 +103,37 @@ private static final String LLAP_LAUNCHER = LlapContainerLauncher.class.getName(); private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName(); + @JsonIgnore private final HiveConf conf; + @JsonIgnore private Path tezScratchDir; + @JsonIgnore private LocalResource appJarLr; + @JsonIgnore private TezClient session; + @JsonIgnore private Future sessionFuture; + @JsonIgnore /** Console used for user feedback during async session opening. */ private LogHelper console; + @JsonProperty("sessionId") private String sessionId; private final DagUtils utils; + @JsonProperty("queueName") private String queueName; + @JsonProperty("defaultQueue") private boolean defaultQueue = false; + @JsonProperty("user") private String user; private AtomicReference ownerThread = new AtomicReference<>(null); private final Set additionalFilesNotFromConf = new HashSet(); private final Set localizedResources = new HashSet(); + @JsonProperty("doAsEnabled") private boolean doAsEnabled; private boolean isLegacyLlapMode; - private TriggerContext triggerContext; + private WMContext wmContext; private KillQuery killQuery; /** @@ -831,12 +846,12 @@ public void destroy() throws Exception { TezSessionPoolManager.getInstance().destroy(this); } - public TriggerContext getTriggerContext() { - return triggerContext; + public WMContext getWmContext() { + return wmContext; } - public void setTriggerContext(final TriggerContext triggerContext) { - this.triggerContext = triggerContext; + public void setWmContext(final WMContext wmContext) { + this.wmContext = wmContext; } public void setKillQuery(final KillQuery killQuery) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 8087b01..2d3fa03 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.QueryInfo; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; @@ -63,7 +62,7 @@ import org.apache.hadoop.hive.ql.plan.UnionWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; @@ -146,8 +145,8 @@ public int execute(DriverContext driverContext) { // some DDL task that directly executes a TezTask does not setup Context and hence TriggerContext. // Setting queryId is messed up. Some DDL tasks have executionId instead of proper queryId. String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID); - TriggerContext triggerContext = new TriggerContext(System.currentTimeMillis(), queryId); - ctx.setTriggerContext(triggerContext); + WMContext wmContext = new WMContext(System.currentTimeMillis(), queryId); + ctx.setWmContext(wmContext); } // Need to remove this static hack. But this is the way currently to get a session. @@ -166,13 +165,13 @@ public int execute(DriverContext driverContext) { MappingInput mi = (userName == null) ? new MappingInput("anonymous", null) : new MappingInput(ss.getUserName(), UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups()); + WMContext wmContext = ctx.getWmContext(); session = WorkloadManagerFederation.getSession( - session, conf, mi, getWork().getLlapMode(), desiredCounters); + session, conf, mi, getWork().getLlapMode(), desiredCounters, wmContext); - TriggerContext triggerContext = ctx.getTriggerContext(); - triggerContext.setDesiredCounters(desiredCounters); - session.setTriggerContext(triggerContext); - LOG.info("Subscribed to counters: {} for queryId: {}", desiredCounters, triggerContext.getQueryId()); + wmContext.setDesiredCounters(desiredCounters); + session.setWmContext(wmContext); + LOG.info("Subscribed to counters: {} for queryId: {}", desiredCounters, wmContext.getQueryId()); ss.setTezSession(session); try { // jobConf will hold all the configuration for hadoop, tez, and hive diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index 5821659..4cbf39d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,17 +46,17 @@ public void run() { final List sessions = sessionTriggerProvider.getSessions(); final List triggers = sessionTriggerProvider.getTriggers(); for (TezSessionState sessionState : sessions) { - TriggerContext triggerContext = sessionState.getTriggerContext(); - if (triggerContext != null && !triggerContext.isQueryCompleted() - && !triggerContext.getCurrentCounters().isEmpty()) { - Map currentCounters = triggerContext.getCurrentCounters(); + WMContext wmContext = sessionState.getWmContext(); + if (wmContext != null && !wmContext.isQueryCompleted() + && !wmContext.getCurrentCounters().isEmpty()) { + Map currentCounters = wmContext.getCurrentCounters(); for (Trigger currentTrigger : triggers) { String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName(); // there could be interval where desired counter value is not populated by the time we make this check if (currentCounters.containsKey(desiredCounter)) { long currentCounterValue = currentCounters.get(desiredCounter); if (currentTrigger.apply(currentCounterValue)) { - String queryId = sessionState.getTriggerContext().getQueryId(); + String queryId = sessionState.getWmContext().getQueryId(); if (violatedSessions.containsKey(sessionState)) { // session already has a violation Trigger existingTrigger = violatedSessions.get(sessionState); @@ -84,7 +84,7 @@ public void run() { Trigger chosenTrigger = violatedSessions.get(sessionState); if (chosenTrigger != null) { - LOG.info("Query: {}. {}. Applying action.", sessionState.getTriggerContext().getQueryId(), + LOG.info("Query: {}. {}. Applying action.", sessionState.getWmContext().getQueryId(), chosenTrigger.getViolationMsg()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WMEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WMEvent.java new file mode 100644 index 0000000..2a0d2f5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WMEvent.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * Workload Manager events at query level. + */ +public class WMEvent { + enum EventType { + GET, + DESTROY, + RESTART, + RETURN, + MOVE + } + + // snapshot of subset of wm tez session info for printing in events summary + public static class WmTezSessionInfo { + private final String poolName; + private final String sessionId; + private final double clusterPercent; + + WmTezSessionInfo(WmTezSession wmTezSession) { + this.poolName = wmTezSession.getPoolName(); + this.sessionId = wmTezSession.getSessionId(); + this.clusterPercent = wmTezSession.getClusterFraction() * 100.0; + } + + public String getPoolName() { + return poolName; + } + + public String getSessionId() { + return sessionId; + } + + public double getClusterPercent() { + return clusterPercent; + } + } + + @JsonProperty("wmTezSessionInfo") + private final WmTezSessionInfo wmTezSessionInfo; + @JsonProperty("eventTimestamp") + private long eventTimestamp; + @JsonProperty("eventType") + private final EventType eventType; + + WMEvent(final WmTezSession sessionState, final EventType eventType) { + this.wmTezSessionInfo = new WmTezSessionInfo(sessionState); + this.eventTimestamp = System.currentTimeMillis(); + this.eventType = eventType; + } + + public long getEventTimestamp() { + return eventTimestamp; + } + + public EventType getEventType() { + return eventType; + } + + public WmTezSessionInfo getWmTezSessionInfo() { + return wmTezSessionInfo; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index 96d70c9..1507bb1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -27,21 +27,34 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.hive.common.util.Ref; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.annotate.JsonSerialize; +@JsonSerialize public class WmTezSession extends TezSessionPoolSession implements AmPluginNode { + @JsonProperty("poolName") private String poolName; + @JsonProperty("clusterFraction") private double clusterFraction; /** * The reason to kill an AM. Note that this is for the entire session, not just for a query. * Once set, this can never be unset because you can only kill the session once. */ + @JsonProperty("killReason") private String killReason = null; + @JsonIgnore private final Object amPluginInfoLock = new Object(); + @JsonProperty("amPluginInfo") private AmPluginInfo amPluginInfo = null; - private Integer amPluginendpointVersion = null; + @JsonProperty("amPluginEndpointVersion") + private Integer amPluginEndpointVersion = null; + @JsonIgnore private SettableFuture amRegistryFuture = null; + @JsonIgnore private ScheduledFuture timeoutTimer = null; + @JsonProperty("queryId") private String queryId; private final WorkloadManager wmParent; @@ -99,12 +112,12 @@ void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { synchronized (amPluginInfoLock) { // Ignore the outdated updates; for the same version, ignore non-null updates because // we assume that removal is the last thing that happens for any given version. - if ((amPluginendpointVersion != null) && ((amPluginendpointVersion > ephSeqVersion) - || (amPluginendpointVersion == ephSeqVersion && info != null))) { + if ((amPluginEndpointVersion != null) && ((amPluginEndpointVersion > ephSeqVersion) + || (amPluginEndpointVersion == ephSeqVersion && info != null))) { LOG.info("Ignoring an outdated info update {}: {}", ephSeqVersion, si); return; } - this.amPluginendpointVersion = ephSeqVersion; + this.amPluginEndpointVersion = ephSeqVersion; this.amPluginInfo = info; if (info != null) { // Only update someone waiting for info if we have the info. @@ -123,7 +136,7 @@ void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { @Override public AmPluginInfo getAmPluginInfo(Ref version) { synchronized (amPluginInfoLock) { - version.value = amPluginendpointVersion; + version.value = amPluginEndpointVersion; return amPluginInfo; } } @@ -132,7 +145,7 @@ void setPoolName(String poolName) { this.poolName = poolName; } - String getPoolName() { + public String getPoolName() { return poolName; } @@ -146,7 +159,7 @@ void clearWm() { this.queryId = null; } - double getClusterFraction() { + public double getClusterFraction() { return this.clusterFraction; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index a8360bd..c6b3a5e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -18,20 +18,7 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import com.google.common.collect.Lists; - -import java.util.concurrent.ExecutionException; - -import java.util.Collection; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -45,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -71,11 +59,24 @@ import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hive.common.util.Ref; import org.apache.tez.dag.api.TezConfiguration; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** Workload management entry point for HS2. * Note on how this class operates. @@ -95,6 +96,7 @@ private static final char POOL_SEPARATOR = '.'; private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR; + private final ObjectMapper objectMapper; // Various final services, configs, etc. private final HiveConf conf; private final TezSessionPool tezAmPool; @@ -115,6 +117,7 @@ private Map pools; private String rpName, defaultPool; // For information only. private int totalQueryParallelism; + private WMFullResourcePlan initialRP; /** * The queries being killed. This is used to sync between the background kill finishing and the * query finishing and user returning the sessions, which can happen in separate iterations @@ -190,7 +193,7 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso this.yarnQueue = yarnQueue; this.conf = conf; this.totalQueryParallelism = determineQueryParallelism(plan); - this.initRpFuture = this.updateResourcePlanAsync(plan); + this.initialRP = plan; this.amComm = amComm; if (this.amComm != null) { this.amComm.init(conf); @@ -214,6 +217,13 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso wmThread = new Thread(() -> runWmThread(), "Workload management master"); wmThread.setDaemon(true); + + objectMapper = new ObjectMapper(); + objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); + // serialize json based on field annotations only + objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker() + .withGetterVisibility(JsonAutoDetect.Visibility.NONE) + .withSetterVisibility(JsonAutoDetect.Visibility.NONE)); } private static int determineQueryParallelism(WMFullResourcePlan plan) { @@ -234,6 +244,9 @@ public void start() throws Exception { } allocationManager.start(); wmThread.start(); + + // apply initial RP after starting WM thread for proper initialization of user pool mappings + initRpFuture = updateResourcePlanAsync(initialRP); initRpFuture.get(); // Wait for the initial resource plan to be applied. final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, @@ -425,6 +438,7 @@ private void scheduleWork(WmThreadSyncWork context) { try { // Note: sessions in toRestart are always in use, so they cannot expire in parallel. tezAmPool.replaceSession(toRestart, false, null); + addWMEvent(toRestart.getWmContext(), toRestart, WMEvent.EventType.RESTART); } catch (Exception ex) { LOG.error("Failed to restart an old session; ignoring " + ex.getMessage()); } @@ -438,6 +452,7 @@ private void scheduleWork(WmThreadSyncWork context) { workPool.submit(() -> { try { toDestroy.close(false); + addWMEvent(toDestroy.getWmContext(), toDestroy, WMEvent.EventType.DESTROY); } catch (Exception ex) { LOG.error("Failed to close an old session; ignoring " + ex.getMessage()); } @@ -517,6 +532,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw if (!wasReturned) { syncWork.toDestroyNoRestart.add(sessionToReturn); } + addWMEvent(sessionToReturn.getWmContext(), sessionToReturn, WMEvent.EventType.RETURN); break; case NOT_FOUND: syncWork.toRestartInUse.add(sessionToReturn); // Restart if there's an internal error. @@ -563,8 +579,9 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw // We could consider delaying the move (when destination capacity is full) until there is claim in src pool. // May be change command to support ... DELAYED MOVE TO etl ... which will run under src cluster fraction as long // as possible + List pendingMoveEvents = new ArrayList<>(); for (MoveSession moveSession : e.moveSessions) { - handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute); + handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, pendingMoveEvents); } e.moveSessions.clear(); @@ -631,7 +648,14 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw } } - // 13. Notify tests and global async ops. + // 13. Handle pending WM move events. This is handled here as cluster fraction updates happen at step 11 + // and move happens earlier + for (WmTezSession wmTezSession : pendingMoveEvents) { + WMContext wmContext = wmTezSession.getWmContext(); + addWMEvent(wmContext, wmTezSession, WMEvent.EventType.MOVE); + } + + // 14. Notify tests and global async ops. if (e.dumpStateFuture != null) { List result = new ArrayList<>(); result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool); @@ -678,7 +702,8 @@ private void dumpPoolState(PoolState ps, List set) { private void handleMoveSessionOnMasterThread(final MoveSession moveSession, final WmThreadSyncWork syncWork, - final HashSet poolsToRedistribute) { + final HashSet poolsToRedistribute, + final List pendingMoveEvents) { String destPoolName = moveSession.destPool; LOG.info("Handling move session event: {}", moveSession); if (validMove(moveSession.srcSession, destPoolName)) { @@ -692,6 +717,7 @@ private void handleMoveSessionOnMasterThread(final MoveSession moveSession, Boolean added = checkAndAddSessionToAnotherPool(moveSession.srcSession, destPoolName, poolsToRedistribute); if (added != null && added) { moveSession.future.set(true); + pendingMoveEvents.add(moveSession.srcSession); return; } else { LOG.error("Failed to move session: {}. Session is not added to destination.", moveSession); @@ -807,7 +833,7 @@ private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session case OK: // If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK. PoolState pool = pools.get(poolName); - SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId()); + SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId(), session.getWmContext()); // We have just removed the session from the same pool, so don't check concurrency here. pool.initializingSessions.add(sw); ListenableFuture getFuture = tezAmPool.getSessionAsync(); @@ -917,6 +943,14 @@ private void applyNewResourcePlanOnMasterThread( totalQueryParallelism += qp; } } + // TODO: in the current impl, triggers are added to RP. For tez, no pool triggers (mapping between trigger name and + // pool name) will exist which means all triggers applies to tez. For LLAP, pool triggers has to exist for attaching + // triggers to specific pools. + // For usability, + // Provide a way for triggers sharing/inheritance possibly with following modes + // ONLY - only to pool + // INHERIT - child pools inherit from parent + // GLOBAL - all pools inherit if (e.resourcePlanToApply.isSetTriggers() && e.resourcePlanToApply.isSetPoolTriggers()) { Map triggers = new HashMap<>(); for (WMTrigger trigger : e.resourcePlanToApply.getTriggers()) { @@ -1021,6 +1055,7 @@ private void queueGetRequestOnMasterThread( req.sessionToReuse.setPoolName(poolName); req.sessionToReuse.setQueueName(yarnQueue); req.sessionToReuse.setQueryId(req.queryId); + req.sessionToReuse.setWmContext(req.wmContext); pool.sessions.add(req.sessionToReuse); if (pool != oldPool) { poolsToRedistribute.add(poolName); @@ -1057,7 +1092,8 @@ private void processPoolChangesOnMasterThread( // Note that in theory, we are guaranteed to have a session waiting for us here, but // the expiration, failures, etc. may cause one to be missing pending restart. // See SessionInitContext javadoc. - SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId); + SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId, + queueReq.wmContext); ListenableFuture getFuture = tezAmPool.getSessionAsync(); Futures.addCallback(getFuture, sw); // It is possible that all the async methods returned on the same thread because the @@ -1160,7 +1196,7 @@ private Boolean checkAndAddSessionToAnotherPool( poolsToRedistribute.add(destPoolName); return true; } - LOG.error("Session {} was not not added to pool {}", session, destPoolName); + LOG.error("Session {} was not added to pool {}", session, destPoolName); return null; } @@ -1213,15 +1249,18 @@ public int compare(GetRequest o1, GetRequest o2) { private final SettableFuture future; private WmTezSession sessionToReuse; private final String queryId; + private final WMContext wmContext; private GetRequest(MappingInput mappingInput, String queryId, - SettableFuture future, WmTezSession sessionToReuse, long order) { + SettableFuture future, WmTezSession sessionToReuse, long order, + final WMContext wmContext) { assert mappingInput != null; this.mappingInput = mappingInput; this.queryId = queryId; this.future = future; this.sessionToReuse = sessionToReuse; this.order = order; + this.wmContext = wmContext; } @Override @@ -1231,14 +1270,14 @@ public String toString() { } public TezSessionState getSession( - TezSessionState session, MappingInput input, HiveConf conf) throws Exception { + TezSessionState session, MappingInput input, HiveConf conf, final WMContext wmContext) throws Exception { // Note: not actually used for pool sessions; verify some things like doAs are not set. validateConfig(conf); String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID); SettableFuture future = SettableFuture.create(); WmTezSession wmSession = checkSessionForReuse(session); GetRequest req = new GetRequest( - input, queryId, future, wmSession, getRequestVersion.incrementAndGet()); + input, queryId, future, wmSession, getRequestVersion.incrementAndGet(), wmContext); currentLock.lock(); try { current.getRequests.add(req); @@ -1250,7 +1289,17 @@ public TezSessionState getSession( } finally { currentLock.unlock(); } - return future.get(); + WmTezSession sessionState = future.get(); + addWMEvent(wmContext, sessionState, WMEvent.EventType.GET); + return sessionState; + } + + private void addWMEvent(final WMContext wmContext, final WmTezSession sessionState, + final WMEvent.EventType wmEventType) { + if (wmContext != null) { + WMEvent wmEvent = new WMEvent(sessionState, wmEventType); + wmContext.addWMEvent(wmEvent); + } } @Override @@ -1518,7 +1567,8 @@ protected final HiveConf getConf() { return conf; } - public List getTriggerCounterNames(final TezSessionState session) { + public List getTriggerCounterNames(final TezSessionState session, + final WMContext wmContext) { if (session instanceof WmTezSession) { WmTezSession wmTezSession = (WmTezSession) session; String poolName = wmTezSession.getPoolName(); @@ -1526,10 +1576,16 @@ protected final HiveConf getConf() { if (poolState != null) { List counterNames = new ArrayList<>(); List triggers = poolState.getTriggers(); + List triggerStr = new ArrayList<>(); + List triggerNamesStr = new ArrayList<>(); if (triggers != null) { for (Trigger trigger : triggers) { counterNames.add(trigger.getExpression().getCounterLimit().getName()); + triggerStr.add(trigger.toString()); + triggerNamesStr.add(trigger.getName()); } + wmContext.setAppliedTriggers(triggerStr); + wmContext.setAppliedTriggersNames(triggerNamesStr); } return counterNames; } @@ -1679,12 +1735,15 @@ public void setTriggers(final LinkedList triggers) { private SettableFuture future; private SessionInitState state; private String cancelReason; + private WMContext wmContext; - public SessionInitContext(SettableFuture future, String poolName, String queryId) { + public SessionInitContext(SettableFuture future, String poolName, String queryId, + final WMContext wmContext) { this.state = SessionInitState.GETTING; this.future = future; this.poolName = poolName; this.queryId = queryId; + this.wmContext = wmContext; } @Override @@ -1701,6 +1760,7 @@ public void onSuccess(WmTezSession session) { session.setPoolName(poolName); session.setQueueName(yarnQueue); session.setQueryId(queryId); + session.setWmContext(wmContext); this.session = session; this.state = SessionInitState.WAITING_FOR_REGISTRY; break; @@ -1748,6 +1808,7 @@ public void onSuccess(WmTezSession session) { session.setPoolName(null); session.setClusterFraction(0f); session.setQueryId(null); + session.setWmContext(null); tezAmPool.returnSession(session); break; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java index 315a2dc..519b5a0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,19 +16,20 @@ * limitations under the License. */package org.apache.hadoop.hive.ql.exec.tez; -import org.slf4j.LoggerFactory; - -import org.slf4j.Logger; - import java.util.Set; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; +import org.apache.hadoop.hive.ql.wm.WMContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WorkloadManagerFederation { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManagerFederation.class); public static TezSessionState getSession(TezSessionState session, HiveConf conf, - MappingInput input, boolean isUnmanagedLlapMode, Set desiredCounters) throws Exception { + MappingInput input, boolean isUnmanagedLlapMode, Set desiredCounters, + final WMContext wmContext) throws Exception { // 1. If WM is not present just go to unmanaged. WorkloadManager wm = WorkloadManager.getInstance(); if (wm == null) { @@ -47,8 +48,8 @@ public static TezSessionState getSession(TezSessionState session, HiveConf conf, try { // Note: this may just block to wait for a session based on parallelism. LOG.info("Getting a WM session for " + input); - TezSessionState result = wm.getSession(session, input, conf); - desiredCounters.addAll(wm.getTriggerCounterNames(result)); + TezSessionState result = wm.getSession(session, input, conf, wmContext); + desiredCounters.addAll(wm.getTriggerCounterNames(result, wmContext)); return result; } catch (WorkloadManager.NoPoolMappingException ex) { return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode); @@ -56,7 +57,7 @@ public static TezSessionState getSession(TezSessionState session, HiveConf conf, } private static TezSessionState getUnmanagedSession( - TezSessionState session, HiveConf conf, Set desiredCounters, boolean isWorkLlapNode) throws Exception { + TezSessionState session, HiveConf conf, Set desiredCounters, boolean isWorkLlapNode) throws Exception { TezSessionPoolManager pm = TezSessionPoolManager.getInstance(); session = pm.getSession(session, conf, false, isWorkLlapNode); desiredCounters.addAll(pm.getTriggerCounterNames()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java index 5bb6bf1..8414c73 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java @@ -19,6 +19,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; -interface PrintSummary { +public interface PrintSummary { void print(SessionState.LogHelper console); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 3dd4b31..ee87063 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -43,7 +43,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.TimeCounterLimit; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.hive.ql.wm.VertexCounterLimit; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.tez.common.counters.CounterGroup; @@ -156,7 +156,7 @@ public int monitorExecution() { boolean running = false; long checkInterval = MIN_CHECK_INTERVAL; - TriggerContext triggerContext = null; + WMContext wmContext = null; while (true) { try { @@ -167,12 +167,12 @@ public int monitorExecution() { status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS), checkInterval); TezCounters dagCounters = status.getDAGCounters(); vertexProgressMap = status.getVertexProgress(); - triggerContext = context.getTriggerContext(); - if (dagCounters != null && triggerContext != null) { - Set desiredCounters = triggerContext.getDesiredCounters(); + wmContext = context.getWmContext(); + if (dagCounters != null && wmContext != null) { + Set desiredCounters = wmContext.getDesiredCounters(); if (desiredCounters != null && !desiredCounters.isEmpty()) { Map currentCounters = getCounterValues(dagCounters, vertexProgressMap, desiredCounters, done); - triggerContext.setCurrentCounters(currentCounters); + wmContext.setCurrentCounters(currentCounters); } } DAGStatus.State state = status.getState(); @@ -234,8 +234,8 @@ public int monitorExecution() { break; } } - if (triggerContext != null && done) { - triggerContext.setQueryCompleted(true); + if (wmContext != null && done) { + wmContext.setQueryCompleted(true); } } catch (Exception e) { console.printInfo("Exception: " + e.getMessage()); @@ -263,13 +263,13 @@ public int monitorExecution() { } else { console.printInfo("Retrying..."); } - if (triggerContext != null && done) { - triggerContext.setQueryCompleted(true); + if (wmContext != null && done) { + wmContext.setQueryCompleted(true); } } finally { if (done) { - if (triggerContext != null && done) { - triggerContext.setQueryCompleted(true); + if (wmContext != null && done) { + wmContext.setQueryCompleted(true); } if (rc != 0 && status != null) { for (String diag : status.getDiagnostics()) { @@ -286,7 +286,7 @@ public int monitorExecution() { } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); - printSummary(success, vertexProgressMap); + printSummary(success, vertexProgressMap, wmContext); return rc; } @@ -324,7 +324,7 @@ public int monitorExecution() { if (!done) { counterName = TimeCounterLimit.TimeCounter.ELAPSED_TIME.name(); if (desiredCounters.contains(counterName)) { - updatedCounters.put(counterName, context.getTriggerContext().getElapsedTime()); + updatedCounters.put(counterName, context.getWmContext().getElapsedTime()); } counterName = TimeCounterLimit.TimeCounter.EXECUTION_TIME.name(); @@ -336,7 +336,8 @@ public int monitorExecution() { return updatedCounters; } - private void printSummary(boolean success, Map progressMap) { + private void printSummary(boolean success, Map progressMap, + final WMContext wmContext) { if (isProfilingEnabled() && success && progressMap != null) { double duration = (System.currentTimeMillis() - this.executionStartTime) / 1000.0; @@ -351,6 +352,11 @@ private void printSummary(boolean success, Map progressMap) { new LLAPioSummary(progressMap, dagClient).print(console); new FSCountersSummary(progressMap, dagClient).print(console); } + + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY, false) && + wmContext != null) { + wmContext.print(console); + } console.printInfo(""); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java index fae8dfe..3af887f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java @@ -15,12 +15,18 @@ */ package org.apache.hadoop.hive.ql.wm; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.annotate.JsonSerialize; + /** * Custom counters with limits (this will only work if the execution engine exposes this counter) */ +@JsonSerialize public class CustomCounterLimit implements CounterLimit { + @JsonProperty("counterName") private String counterName; + @JsonProperty("limit") private long limit; CustomCounterLimit(final String counterName, final long limit) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java index 05b3d3c..14c4e72 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java @@ -18,6 +18,8 @@ import java.util.Objects; import org.apache.hadoop.hive.metastore.api.WMTrigger; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.annotate.JsonSerialize; /** * Trigger with query level scope that contains a name, trigger expression violating which defined action will be diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java deleted file mode 100644 index 16072c3..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.wm; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Some context information that are required for rule evaluation. - */ -public class TriggerContext { - private Set desiredCounters = new HashSet<>(); - private Map currentCounters = new HashMap<>(); - private String queryId; - private long queryStartTime; - private boolean queryCompleted; - - public TriggerContext(final long queryStartTime, final String queryId) { - this.queryStartTime = queryStartTime; - this.queryId = queryId; - this.queryCompleted = false; - } - - public String getQueryId() { - return queryId; - } - - public void setQueryId(final String queryId) { - this.queryId = queryId; - } - - public Set getDesiredCounters() { - return desiredCounters; - } - - public void setDesiredCounters(final Set desiredCounters) { - this.desiredCounters = desiredCounters; - } - - public Map getCurrentCounters() { - return currentCounters; - } - - public void setCurrentCounters(final Map currentCounters) { - this.currentCounters = currentCounters; - } - - public long getElapsedTime() { - return System.currentTimeMillis() - queryStartTime; - } - - public boolean isQueryCompleted() { - return queryCompleted; - } - - public void setQueryCompleted(final boolean queryCompleted) { - this.queryCompleted = queryCompleted; - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/WMContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/WMContext.java new file mode 100644 index 0000000..ffcb7fa --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/WMContext.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR; + +import java.io.IOException; +import java.text.DecimalFormat; +import java.time.Instant; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.management.MXBean; + +import org.apache.hadoop.hive.ql.exec.tez.WMEvent; +import org.apache.hadoop.hive.ql.exec.tez.WmTezSession; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.PrintSummary; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Some context information that are required for rule evaluation. + */ +@MXBean +public class WMContext implements PrintSummary { + private static final Logger LOG = LoggerFactory.getLogger(WMContext.class); + @JsonProperty("queryId") + private String queryId; + @JsonProperty("queryStartTime") + private long queryStartTime; + @JsonProperty("queryEndTime") + private long queryEndTime; + @JsonProperty("queryCompleted") + private boolean queryCompleted; + @JsonProperty("queryWmEvents") + private List queryWmEvents = new LinkedList<>(); + @JsonProperty("appliedTriggers") + private List appliedTriggers = new LinkedList<>(); + @JsonProperty("appliedTriggersNames") + private List appliedTriggersNames = new LinkedList<>(); + @JsonProperty("desiredCounters") + private Set desiredCounters = new HashSet<>(); + @JsonProperty("currentCounters") + private Map currentCounters = new HashMap<>(); + + public WMContext(final long queryStartTime, final String queryId) { + this.queryStartTime = queryStartTime; + this.queryId = queryId; + this.queryCompleted = false; + } + + public List getAppliedTriggers() { + return appliedTriggers; + } + + public void setAppliedTriggers(final List appliedTriggers) { + this.appliedTriggers = appliedTriggers; + } + + public List getAppliedTriggersNames() { + return appliedTriggersNames; + } + + public void setAppliedTriggersNames(final List appliedTriggersNames) { + this.appliedTriggersNames = appliedTriggersNames; + } + + public String getQueryId() { + return queryId; + } + + public void setQueryId(final String queryId) { + this.queryId = queryId; + } + + public Set getDesiredCounters() { + return desiredCounters; + } + + public void setDesiredCounters(final Set desiredCounters) { + this.desiredCounters = desiredCounters; + } + + public Map getCurrentCounters() { + return currentCounters; + } + + public void setCurrentCounters(final Map currentCounters) { + this.currentCounters = currentCounters; + } + + public long getElapsedTime() { + return System.currentTimeMillis() - queryStartTime; + } + + public boolean isQueryCompleted() { + return queryCompleted; + } + + public void setQueryCompleted(final boolean queryCompleted) { + this.queryCompleted = queryCompleted; + this.queryEndTime = System.currentTimeMillis(); + } + + public void addWMEvent(WMEvent wmEvent) { + queryWmEvents.add(wmEvent); + } + + public long getQueryStartTime() { + return queryStartTime; + } + + public long getQueryEndTime() { + return queryEndTime; + } + + public List getQueryWmEvents() { + return queryWmEvents; + } + + private static final String WM_EVENTS_HEADER_FORMAT = "%24s %7s %36s %9s %10s"; + private static final String WM_EVENTS_TITLE = "Workload Manager Events Summary"; + private static final String WM_EVENTS_TABLE_HEADER = String.format(WM_EVENTS_HEADER_FORMAT, + "EVENT_TIMESTAMP", "EVENT", "SESSIONID", "CLUSTER %", "POOL"); + private final DecimalFormat decimalFormat = new DecimalFormat("#0.00"); + + @Override + public void print(final SessionState.LogHelper console) { + boolean first = false; + console.printInfo(""); + console.printInfo(WM_EVENTS_TITLE); + + console.printInfo(""); + console.printInfo("QueryId: " + queryId); + console.printInfo("Triggers: " + appliedTriggersNames); + for (WMEvent wmEvent : queryWmEvents) { + if (!first) { + console.printInfo(SEPARATOR); + console.printInfo(WM_EVENTS_TABLE_HEADER); + console.printInfo(SEPARATOR); + first = true; + } + WMEvent.WmTezSessionInfo wmTezSessionInfo = wmEvent.getWmTezSessionInfo(); + String row = String.format(WM_EVENTS_HEADER_FORMAT, Instant.ofEpochMilli(wmEvent.getEventTimestamp()).toString(), + wmEvent.getEventType(), + wmTezSessionInfo.getSessionId(), + decimalFormat.format(wmTezSessionInfo.getClusterPercent()), + wmTezSessionInfo.getPoolName()); + console.printInfo(row); + } + console.printInfo(SEPARATOR); + console.printInfo(""); + } + + // TODO: expose all WMContext's via /jmx to use in UI + public String toJsonString() { + String wmContextJson; + try { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); + // serialize json based on field annotations only + objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker() + .withGetterVisibility(JsonAutoDetect.Visibility.NONE) + .withSetterVisibility(JsonAutoDetect.Visibility.NONE)); + wmContextJson = objectMapper.writeValueAsString(this); + } catch (IOException e) { + LOG.warn("Unable to serialize WMContext to json.", e); + wmContextJson = null; + } + return wmContextJson; + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 4cb9172..99e26ef 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.tez.dag.api.TezConfiguration; import org.junit.Test; import org.slf4j.Logger; @@ -86,7 +87,7 @@ public void run() { cdl.countDown(); } try { - session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf)); + session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf, null)); } catch (Throwable e) { error.compareAndSet(null, e); } @@ -172,9 +173,10 @@ protected WmTezSession createSessionObject(String sessionId, HiveConf conf) { @Override public TezSessionState getSession( - TezSessionState session, MappingInput input, HiveConf conf) throws Exception { + TezSessionState session, MappingInput input, HiveConf conf, + final WMContext wmContext) throws Exception { // We want to wait for the iteration to finish and set the cluster fraction. - TezSessionState state = super.getSession(session, input, conf); + TezSessionState state = super.getSession(session, input, conf, null); ensureWm(); return state; } @@ -213,17 +215,17 @@ public void testReuse() throws Exception { TezSessionState nonPool = mock(TezSessionState.class); when(nonPool.getConf()).thenReturn(conf); doNothing().when(nonPool).close(anyBoolean()); - TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf); + TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf, null); verify(nonPool).close(anyBoolean()); assertNotSame(nonPool, session); session.returnToSessionManager(); TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class); when(diffPool.getConf()).thenReturn(conf); doNothing().when(diffPool).returnToSessionManager(); - session = wm.getSession(diffPool, new MappingInput("user", null), conf); + session = wm.getSession(diffPool, new MappingInput("user", null), conf, null); verify(diffPool).returnToSessionManager(); assertNotSame(diffPool, session); - TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf); + TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf, null); assertSame(session, session2); } @@ -235,11 +237,11 @@ public void testQueueName() throws Exception { wm.start(); // The queue should be ignored. conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); - TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf); + TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf, null); assertEquals("test", session.getQueueName()); assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); session.setQueueName("test2"); - session = wm.getSession(session, new MappingInput("user", null), conf); + session = wm.getSession(session, new MappingInput("user", null), conf, null); assertEquals("test", session.getQueueName()); } @@ -255,7 +257,7 @@ public void testReopen() throws Exception { WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); WmTezSession session = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf); + null, new MappingInput("user", null), conf, null); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalled(); WmTezSession session2 = (WmTezSession) session.reopen(conf, null); @@ -273,10 +275,10 @@ public void testDestroyAndReturn() throws Exception { MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); wm.start(); - WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); + WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalled(); - WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); + WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); assertEquals(0.5, session.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); qam.assertWasCalled(); @@ -287,7 +289,7 @@ public void testDestroyAndReturn() throws Exception { qam.assertWasCalled(); // We never lose pool session, so we should still be able to get. - session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); + session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); session.returnToSessionManager(); assertEquals(1.0, session2.getClusterFraction(), EPSILON); assertEquals(0.0, session.getClusterFraction(), EPSILON); @@ -308,20 +310,20 @@ public void testClusterFractions() throws Exception { assertEquals(5, wm.getNumSessions()); // Get all the 5 sessions; validate cluster fractions. WmTezSession session05of06 = (WmTezSession) wm.getSession( - null, new MappingInput("p1", null), conf); + null, new MappingInput("p1", null), conf, null); assertEquals(0.3, session05of06.getClusterFraction(), EPSILON); WmTezSession session03of06 = (WmTezSession) wm.getSession( - null, new MappingInput("p2", null), conf); + null, new MappingInput("p2", null), conf, null); assertEquals(0.18, session03of06.getClusterFraction(), EPSILON); WmTezSession session03of06_2 = (WmTezSession) wm.getSession( - null, new MappingInput("p2", null), conf); + null, new MappingInput("p2", null), conf, null); assertEquals(0.09, session03of06.getClusterFraction(), EPSILON); assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON); WmTezSession session02of06 = (WmTezSession) wm.getSession( - null,new MappingInput("r1", null), conf); + null,new MappingInput("r1", null), conf, null); assertEquals(0.12, session02of06.getClusterFraction(), EPSILON); WmTezSession session04 = (WmTezSession) wm.getSession( - null, new MappingInput("r2", null), conf); + null, new MappingInput("r2", null), conf, null); assertEquals(0.4, session04.getClusterFraction(), EPSILON); session05of06.returnToSessionManager(); session03of06.returnToSessionManager(); @@ -353,7 +355,7 @@ public void testMappings() throws Exception { private static void verifyMapping( WorkloadManager wm, HiveConf conf, MappingInput mi, String result) throws Exception { - WmTezSession session = (WmTezSession) wm.getSession(null, mi, conf); + WmTezSession session = (WmTezSession) wm.getSession(null, mi, conf, null); assertEquals(result, session.getPoolName()); session.returnToSessionManager(); } @@ -367,9 +369,9 @@ public void testQueueing() throws Exception { plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null); final AtomicReference sessionA3 = new AtomicReference<>(), sessionA4 = new AtomicReference<>(); final AtomicReference error = new AtomicReference<>(); @@ -383,7 +385,7 @@ public void testQueueing() throws Exception { assertNull(sessionA4.get()); checkError(error); // While threads are blocked on A, we should still be able to get and return a B session. - WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf); + WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null); sessionB1.returnToSessionManager(); sessionB2.returnToSessionManager(); assertNull(sessionA3.get()); @@ -410,19 +412,19 @@ public void testReuseWithQueueing() throws Exception { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); wm.start(); WmTezSession session1 = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf); + null, new MappingInput("user", null), conf, null); // First, try to reuse from the same pool - should "just work". WmTezSession session1a = (WmTezSession) wm.getSession( - session1, new MappingInput("user", null), conf); + session1, new MappingInput("user", null), conf, null); assertSame(session1, session1a); assertEquals(1.0, session1.getClusterFraction(), EPSILON); // Should still be able to get the 2nd session. WmTezSession session2 = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf); + null, new MappingInput("user", null), conf, null); // Now try to reuse with no other sessions remaining. Should still work. WmTezSession session2a = (WmTezSession) wm.getSession( - session2, new MappingInput("user", null), conf); + session2, new MappingInput("user", null), conf, null); assertSame(session2, session2a); assertEquals(0.5, session1.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); @@ -479,19 +481,19 @@ public void testReuseWithDifferentPool() throws Exception { plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals("A", sessionA1.getPoolName()); assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON); assertEquals("A", sessionA2.getPoolName()); assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON); - WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf); + WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf, null); assertSame(sessionA1, sessionB1); assertEquals("B", sessionB1.getPoolName()); assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A. // Make sure that we can still get a session from A. - WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals("A", sessionA3.getPoolName()); assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); @@ -511,7 +513,7 @@ public void testApplyPlanUserMapping() throws Exception { wm.start(); // One session will be running, the other will be queued in "A" - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null); assertEquals("A", sessionA1.getPoolName()); assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON); final AtomicReference sessionA2 = new AtomicReference<>(); @@ -536,7 +538,7 @@ public void testApplyPlanUserMapping() throws Exception { assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON); // The new session will also go to B now. sessionA2.get().returnToSessionManager(); - WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf); + WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null); assertEquals("B", sessionB1.getPoolName()); assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); sessionA1.returnToSessionManager(); @@ -560,11 +562,11 @@ public void testApplyPlanQpChanges() throws Exception { // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued. // Total: 5/6 running. - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf), - sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf), - sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf), - sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null), + sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null), + sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf, null), + sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf, null); final AtomicReference sessionA2 = new AtomicReference<>(), sessionD2 = new AtomicReference<>(); final AtomicReference error = new AtomicReference<>(); @@ -700,7 +702,7 @@ public void testMoveSessions() throws Exception { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 0] Map allSessionProviders = wm.getAllSessionTriggerProviders(); @@ -724,7 +726,7 @@ public void testMoveSessions() throws Exception { assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON); assertEquals("B", sessionA1.getPoolName()); - WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 1] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -751,7 +753,7 @@ public void testMoveSessions() throws Exception { assertEquals("B", sessionA2.getPoolName()); assertEquals("B", sessionA1.getPoolName()); - WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 2] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -791,7 +793,7 @@ public void testMoveSessionsMultiPool() throws Exception { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0] Map allSessionProviders = wm.getAllSessionTriggerProviders(); @@ -849,7 +851,7 @@ public void testMoveSessionsMultiPool() throws Exception { assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1)); assertEquals("B.x", sessionA1.getPoolName()); - WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 0, B.x: 1, B.y: 0, C: 0] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -948,7 +950,7 @@ public void testAsyncSessionInitFailures() throws Exception { failedWait.setException(new Exception("foo")); theOnlySession.setWaitForAmRegistryFuture(failedWait); try { - TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf); + TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf, null); fail("Expected an error but got " + r); } catch (Exception ex) { // Expected. @@ -999,7 +1001,7 @@ private SampleTezSessionState validatePoolAfterCleanup( assertEquals(0f, oldSession.getClusterFraction(), EPSILON); pool.returnSession(theOnlySession); // Make sure we can actually get a session still - parallelism/etc. should not be affected. - WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals(sessionPoolName, result.getPoolName()); assertEquals(1f, result.getClusterFraction(), EPSILON); result.returnToSessionManager();