diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index ada2318..0a280a3 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3052,6 +3052,14 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "hive.tez.exec.print.summary", false, "Display breakdown of execution steps, for every query executed by the shell."), + TEZ_SESSION_EVENTS_SUMMARY( + "hive.tez.session.events.print.summary", + false, + "Display summary of all tez sessions related events"), + TEZ_SESSION_EVENTS_SUMMARY_JSON( + "hive.tez.session.events.print.summary.json", + false, + "Display summary of all tez sessions related events as JSON"), TEZ_EXEC_INPLACE_PROGRESS( "hive.tez.exec.inplace.progress", true, diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index ea5b7b9..3a435a8 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -361,7 +361,23 @@ ${tez.version} test - + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + org.apache.tez + tez-runtime-internals + ${tez.version} + test + + org.slf4j + slf4j-log4j12 diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java index 235e6c3..fca971b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java @@ -21,7 +21,9 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.PrintStream; import java.net.URL; import java.sql.Connection; import java.sql.SQLException; @@ -112,7 +114,12 @@ void createSleepUDF() throws SQLException { } void runQueryWithTrigger(final 
String query, final List setCmds, - final String expect) + final String expect) throws Exception { + runQueryWithTrigger(query, setCmds, expect, null); + } + + void runQueryWithTrigger(final String query, final List setCmds, + final String expect, final List errCaptureExpect) throws Exception { Connection con = hs2Conn; @@ -121,31 +128,48 @@ void runQueryWithTrigger(final String query, final List setCmds, final Statement selStmt = con.createStatement(); final Throwable[] throwable = new Throwable[1]; - Thread queryThread = new Thread(() -> { - try { - if (setCmds != null) { - for (String setCmd : setCmds) { - selStmt.execute(setCmd); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setErr(new PrintStream(baos)); // capture stderr + try { + Thread queryThread = new Thread(() -> { + try { + if (setCmds != null) { + for (String setCmd : setCmds) { + selStmt.execute(setCmd); + } } + selStmt.execute(query); + } catch (SQLException e) { + throwable[0] = e; } - selStmt.execute(query); - } catch (SQLException e) { - throwable[0] = e; + }); + queryThread.start(); + + queryThread.join(); + selStmt.close(); + + if (expect == null) { + assertNull("Expected query to succeed", throwable[0]); + } else { + assertNotNull("Expected non-null throwable", throwable[0]); + assertEquals(SQLException.class, throwable[0].getClass()); + assertTrue(expect + " is not contained in " + throwable[0].getMessage(), + throwable[0].getMessage().contains(expect)); } - }); - queryThread.start(); - - queryThread.join(); - selStmt.close(); - - if (expect == null) { - assertNull("Expected query to succeed", throwable[0]); - } else { - assertNotNull("Expected non-null throwable", throwable[0]); - assertEquals(SQLException.class, throwable[0].getClass()); - assertTrue(expect + " is not contained in " + throwable[0].getMessage(), - throwable[0].getMessage().contains(expect)); + + if (errCaptureExpect != null) { + // failure hooks are run after HiveStatement is closed. 
wait sometime for failure hook to execute + Thread.sleep(5000); + baos.flush(); + final String stdErrStr = baos.toString(); + for (String errExpect : errCaptureExpect) { + assertTrue("'" + errExpect + "' expected in STDERR capture, but not found.", stdErrStr.contains(errExpect)); + } + } + } finally { + baos.close(); } + } abstract void setupTriggers(final List triggers) throws Exception; diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java index a983855..15406a7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java @@ -97,7 +97,20 @@ public void testTriggerMoveAndKill() throws Exception { setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList(killTrigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, killTrigger + " violated"); + List setCmds = new ArrayList<>(); + setCmds.add("set hive.tez.session.events.print.summary=true"); + setCmds.add("set hive.tez.session.events.print.summary.json=true"); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + List errCaptureExpect = new ArrayList<>(); + errCaptureExpect.add("Workload Manager Events Summary"); + errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00"); + errCaptureExpect.add("Event: MOVE Pool: ETL Cluster %: 20.00"); + errCaptureExpect.add("Event: KILL Pool: ETL Cluster %: 20.00"); + errCaptureExpect.add("\"eventType\" : \"GET\""); + errCaptureExpect.add("\"eventType\" : \"MOVE\""); + 
errCaptureExpect.add("\"eventType\" : \"KILL\""); + runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect); } @Test(timeout = 60000) @@ -111,7 +124,20 @@ public void testTriggerMoveEscapeKill() throws Exception { setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList()); String query = "select sleep(t1.under_col, 1), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col==t2.under_col"; - runQueryWithTrigger(query, null, null); + List setCmds = new ArrayList<>(); + setCmds.add("set hive.tez.session.events.print.summary=true"); + setCmds.add("set hive.tez.session.events.print.summary.json=true"); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + List errCaptureExpect = new ArrayList<>(); + errCaptureExpect.add("Workload Manager Events Summary"); + errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00"); + errCaptureExpect.add("Event: MOVE Pool: ETL Cluster %: 20.00"); + errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00"); + errCaptureExpect.add("\"eventType\" : \"GET\""); + errCaptureExpect.add("\"eventType\" : \"MOVE\""); + errCaptureExpect.add("\"eventType\" : \"RETURN\""); + runQueryWithTrigger(query, setCmds, null, errCaptureExpect); } @Test(timeout = 60000) @@ -125,7 +151,18 @@ public void testTriggerMoveConflictKill() throws Exception { setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList()); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, killTrigger + " violated"); + List setCmds = new ArrayList<>(); + setCmds.add("set hive.tez.session.events.print.summary=true"); + setCmds.add("set hive.tez.session.events.print.summary.json=true"); + setCmds.add("set 
hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + List errCaptureExpect = new ArrayList<>(); + errCaptureExpect.add("Workload Manager Events Summary"); + errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00"); + errCaptureExpect.add("Event: KILL Pool: BI Cluster %: 80.00"); + errCaptureExpect.add("\"eventType\" : \"GET\""); + errCaptureExpect.add("\"eventType\" : \"KILL\""); + runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 97b52b0..b8af7f1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hive.ql.parse.QB; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -150,18 +150,18 @@ */ private Map insertBranchToNamePrefix = new HashMap<>(); private Operation operation = Operation.OTHER; - private TriggerContext triggerContext; + private WMContext wmContext; public void setOperation(Operation operation) { this.operation = operation; } - public TriggerContext getTriggerContext() { - return triggerContext; + public WMContext getWmContext() { + return wmContext; } - public void setTriggerContext(final TriggerContext triggerContext) { - this.triggerContext = triggerContext; + public void setWmContext(final WMContext wmContext) { + this.wmContext = wmContext; } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java 
index 389a1a6..5e8063b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -113,7 +113,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; @@ -732,8 +732,8 @@ private void setTriggerContext(final String queryId) { } else { queryStartTime = queryDisplay.getQueryStartTime(); } - TriggerContext triggerContext = new TriggerContext(queryStartTime, queryId); - ctx.setTriggerContext(triggerContext); + WMContext wmContext = new WMContext(queryStartTime, queryId); + ctx.setWmContext(wmContext); } private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java index 0509cbc..4b35617 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java @@ -18,18 +18,23 @@ package org.apache.hadoop.hive.ql.exec.tez; -import org.apache.hive.common.util.Ref; - -import java.util.concurrent.TimeoutException; - import org.apache.hadoop.security.token.Token; +import org.apache.hive.common.util.Ref; import org.apache.tez.common.security.JobTokenIdentifier; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.annotate.JsonSerialize; +@JsonSerialize public interface AmPluginNode { - public static class AmPluginInfo { + class AmPluginInfo { + @JsonProperty("amPluginPort") public final 
int amPluginPort; + @JsonIgnore public final Token amPluginToken; + @JsonProperty("amPluginTokenJobId") public final String amPluginTokenJobId; + @JsonProperty("amHost") public final String amHost; AmPluginInfo(String amHost, int amPluginPort, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java index 94b189b..0f74441 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java @@ -21,7 +21,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.slf4j.Logger; @@ -37,37 +36,33 @@ @Override public void applyAction(final Map queriesViolated) { - TezSessionState sessionState; - Map> moveFutures = new HashMap<>(queriesViolated.size()); + Map> futures = new HashMap<>(queriesViolated.size()); for (Map.Entry entry : queriesViolated.entrySet()) { + WmTezSession wmTezSession; + if (entry.getKey() instanceof WmTezSession) { + wmTezSession = (WmTezSession) entry.getKey(); + } else { + throw new RuntimeException("WmTezSession is expected. Got: " + entry.getKey().getClass().getSimpleName() + + ". 
SessionId: " + entry.getKey().getSessionId()); + } switch (entry.getValue().getAction().getType()) { case KILL_QUERY: - sessionState = entry.getKey(); - String queryId = sessionState.getTriggerContext().getQueryId(); - try { - sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg()); - } catch (HiveException e) { - LOG.warn("Unable to kill query {} for trigger violation"); - } + WorkloadManager.KillQueryContext killQueryContext = new WorkloadManager.KillQueryContext(wmTezSession, + entry.getValue().getViolationMsg()); + Future killFuture = wm.applyKillSessionAsync(killQueryContext); + futures.put(wmTezSession, killFuture); break; case MOVE_TO_POOL: - sessionState = entry.getKey(); - if (sessionState instanceof WmTezSession) { - WmTezSession wmTezSession = (WmTezSession) sessionState; - String destPoolName = entry.getValue().getAction().getPoolName(); - Future moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName); - moveFutures.put(wmTezSession, moveFuture); - } else { - throw new RuntimeException("WmTezSession is expected. Got: " + sessionState.getClass().getSimpleName() + - ". 
SessionId: " + sessionState.getSessionId()); - } + String destPoolName = entry.getValue().getAction().getPoolName(); + Future moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName); + futures.put(wmTezSession, moveFuture); break; default: throw new RuntimeException("Unsupported action: " + entry.getValue()); } } - for (Map.Entry> entry : moveFutures.entrySet()) { + for (Map.Entry> entry : futures.entrySet()) { WmTezSession wmTezSession = entry.getKey(); Future moveFuture = entry.getValue(); try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java index 8c60b6f..15e054b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java @@ -37,7 +37,7 @@ public void applyAction(final Map queriesViolated) { switch (entry.getValue().getAction().getType()) { case KILL_QUERY: TezSessionState sessionState = entry.getKey(); - String queryId = sessionState.getTriggerContext().getQueryId(); + String queryId = sessionState.getWmContext().getQueryId(); try { KillQuery killQuery = sessionState.getKillQuery(); // if kill query is null then session might have been released to pool or closed already diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 6fa3724..32a28c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -62,7 +62,7 @@ import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.hive.shims.Utils; 
import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.security.Credentials; @@ -85,6 +85,9 @@ import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.annotate.JsonSerialize; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; @@ -95,6 +98,7 @@ /** * Holds session state related to Tez */ +@JsonSerialize public class TezSessionState { protected static final Logger LOG = LoggerFactory.getLogger(TezSessionState.class.getName()); @@ -104,26 +108,37 @@ private static final String LLAP_LAUNCHER = LlapContainerLauncher.class.getName(); private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName(); + @JsonIgnore private final HiveConf conf; + @JsonIgnore private Path tezScratchDir; + @JsonIgnore private LocalResource appJarLr; + @JsonIgnore private TezClient session; + @JsonIgnore private Future sessionFuture; + @JsonIgnore /** Console used for user feedback during async session opening. 
*/ private LogHelper console; + @JsonProperty("sessionId") private String sessionId; private final DagUtils utils; + @JsonProperty("queueName") private String queueName; + @JsonProperty("defaultQueue") private boolean defaultQueue = false; + @JsonProperty("user") private String user; private AtomicReference ownerThread = new AtomicReference<>(null); private final Set additionalFilesNotFromConf = new HashSet(); private final Set localizedResources = new HashSet(); + @JsonProperty("doAsEnabled") private boolean doAsEnabled; private boolean isLegacyLlapMode; - private TriggerContext triggerContext; + private WMContext wmContext; private KillQuery killQuery; private static final Cache shaCache = CacheBuilder.newBuilder().maximumSize(100).build(); @@ -852,12 +867,12 @@ public void destroy() throws Exception { TezSessionPoolManager.getInstance().destroy(this); } - public TriggerContext getTriggerContext() { - return triggerContext; + public WMContext getWmContext() { + return wmContext; } - public void setTriggerContext(final TriggerContext triggerContext) { - this.triggerContext = triggerContext; + public void setWmContext(final WMContext wmContext) { + this.wmContext = wmContext; } public void setKillQuery(final KillQuery killQuery) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index af77f30..f4b5872 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.QueryInfo; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; @@ -63,7 +62,7 @@ import org.apache.hadoop.hive.ql.plan.UnionWork; import 
org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; @@ -146,8 +145,8 @@ public int execute(DriverContext driverContext) { // some DDL task that directly executes a TezTask does not setup Context and hence TriggerContext. // Setting queryId is messed up. Some DDL tasks have executionId instead of proper queryId. String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID); - TriggerContext triggerContext = new TriggerContext(System.currentTimeMillis(), queryId); - ctx.setTriggerContext(triggerContext); + WMContext wmContext = new WMContext(System.currentTimeMillis(), queryId); + ctx.setWmContext(wmContext); } // Need to remove this static hack. But this is the way currently to get a session. @@ -158,7 +157,6 @@ public int execute(DriverContext driverContext) { if (session != null && !session.isOpen()) { LOG.warn("The session: " + session + " has not been opened"); } - Set desiredCounters = new HashSet<>(); // We only need a username for UGI to use for groups; getGroups will fetch the groups // based on Hadoop configuration, as documented at // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html @@ -166,15 +164,11 @@ public int execute(DriverContext driverContext) { MappingInput mi = (userName == null) ? 
new MappingInput("anonymous", null) : new MappingInput(ss.getUserName(), UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups()); - session = WorkloadManagerFederation.getSession( - session, conf, mi, getWork().getLlapMode(), desiredCounters); + WMContext wmContext = ctx.getWmContext(); + session = WorkloadManagerFederation.getSession(session, conf, mi, getWork().getLlapMode(), wmContext); - TriggerContext triggerContext = ctx.getTriggerContext(); - triggerContext.setDesiredCounters(desiredCounters); - LOG.info("Subscribed to counters: {} for queryId: {}", - desiredCounters, triggerContext.getQueryId()); + LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getDesiredCounters(), wmContext.getQueryId()); ss.setTezSession(session); - session.setTriggerContext(triggerContext); try { // jobConf will hold all the configuration for hadoop, tez, and hive JobConf jobConf = utils.createConfiguration(conf); @@ -262,6 +256,15 @@ public int execute(DriverContext driverContext) { LOG.error("Failed to return session: {} to pool", session, e); throw e; } + + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY, false) && + ctx.getWmContext() != null) { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY_JSON, false)) { + ctx.getWmContext().printJson(console); + } else { + ctx.getWmContext().print(console); + } + } } if (LOG.isInfoEnabled() && counters != null @@ -585,9 +588,9 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: " + Arrays.toString(e.getStackTrace()) + " retrying..."); // TODO: this is temporary, need to refactor how reopen is invoked. 
- TriggerContext oldCtx = sessionState.getTriggerContext(); + WMContext oldCtx = sessionState.getWmContext(); sessionState = sessionState.reopen(conf, inputOutputJars); - sessionState.setTriggerContext(oldCtx); + sessionState.setWmContext(oldCtx); dagClient = sessionState.getSession().submitDAG(dag); } catch (Exception retryException) { // we failed to submit after retrying. Destroy session and bail. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index 5821659..4cbf39d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,17 +46,17 @@ public void run() { final List sessions = sessionTriggerProvider.getSessions(); final List triggers = sessionTriggerProvider.getTriggers(); for (TezSessionState sessionState : sessions) { - TriggerContext triggerContext = sessionState.getTriggerContext(); - if (triggerContext != null && !triggerContext.isQueryCompleted() - && !triggerContext.getCurrentCounters().isEmpty()) { - Map currentCounters = triggerContext.getCurrentCounters(); + WMContext wmContext = sessionState.getWmContext(); + if (wmContext != null && !wmContext.isQueryCompleted() + && !wmContext.getCurrentCounters().isEmpty()) { + Map currentCounters = wmContext.getCurrentCounters(); for (Trigger currentTrigger : triggers) { String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName(); // there could be interval where desired counter value is not populated by the time we 
make this check if (currentCounters.containsKey(desiredCounter)) { long currentCounterValue = currentCounters.get(desiredCounter); if (currentTrigger.apply(currentCounterValue)) { - String queryId = sessionState.getTriggerContext().getQueryId(); + String queryId = sessionState.getWmContext().getQueryId(); if (violatedSessions.containsKey(sessionState)) { // session already has a violation Trigger existingTrigger = violatedSessions.get(sessionState); @@ -84,7 +84,7 @@ public void run() { Trigger chosenTrigger = violatedSessions.get(sessionState); if (chosenTrigger != null) { - LOG.info("Query: {}. {}. Applying action.", sessionState.getTriggerContext().getQueryId(), + LOG.info("Query: {}. {}. Applying action.", sessionState.getWmContext().getQueryId(), chosenTrigger.getViolationMsg()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WMEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WMEvent.java new file mode 100644 index 0000000..1567e31 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WMEvent.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez; + +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +/** + * Workload Manager events at query level. + */ +@JsonSerialize +public class WMEvent { + enum EventType { + GET, // get session + KILL, // kill query + DESTROY, // destroy session + RESTART, // restart session + RETURN, // return session back to pool + MOVE // move session to different pool + } + + // snapshot of subset of wm tez session info for printing in events summary + @JsonSerialize + public static class WmTezSessionInfo { + @JsonProperty("sessionId") + private final String sessionId; + @JsonProperty("poolName") + private final String poolName; + @JsonProperty("clusterPercent") + private final double clusterPercent; + + WmTezSessionInfo(WmTezSession wmTezSession) { + this.poolName = wmTezSession.getPoolName(); + this.sessionId = wmTezSession.getSessionId(); + this.clusterPercent = wmTezSession.getClusterFraction() * 100.0; + } + + public String getPoolName() { + return poolName; + } + + public String getSessionId() { + return sessionId; + } + + public double getClusterPercent() { + return clusterPercent; + } + + @Override + public String toString() { + return "SessionId: " + sessionId + " Pool: " + poolName + " Cluster %: " + clusterPercent; + } + } + + @JsonProperty("wmTezSessionInfo") + private final WmTezSessionInfo wmTezSessionInfo; + @JsonProperty("eventTimestamp") + private long eventTimestamp; + @JsonProperty("eventType") + private final EventType eventType; + + WMEvent(final WmTezSession sessionState, final EventType eventType) { + this.wmTezSessionInfo = new WmTezSessionInfo(sessionState); + this.eventTimestamp = System.currentTimeMillis(); + this.eventType = eventType; + } + + public long getEventTimestamp() { + return eventTimestamp; + } + + public EventType getEventType() { + return eventType; + } + + public WmTezSessionInfo getWmTezSessionInfo() { + return wmTezSessionInfo; + } 
+ + @Override + public String toString() { + return "EventType: " + eventType + " EventTimestamp: " + eventTimestamp + " " + wmTezSessionInfo; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index d61c531..d31b0a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -27,21 +27,34 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.hive.common.util.Ref; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.annotate.JsonSerialize; +@JsonSerialize public class WmTezSession extends TezSessionPoolSession implements AmPluginNode { + @JsonProperty("poolName") private String poolName; + @JsonProperty("clusterFraction") private double clusterFraction; /** * The reason to kill an AM. Note that this is for the entire session, not just for a query. * Once set, this can never be unset because you can only kill the session once. 
*/ + @JsonProperty("killReason") private String killReason = null; + @JsonIgnore private final Object amPluginInfoLock = new Object(); + @JsonProperty("amPluginInfo") private AmPluginInfo amPluginInfo = null; - private Integer amPluginendpointVersion = null; + @JsonProperty("amPluginEndpointVersion") + private Integer amPluginEndpointVersion = null; + @JsonIgnore private SettableFuture amRegistryFuture = null; + @JsonIgnore private ScheduledFuture timeoutTimer = null; + @JsonProperty("queryId") private String queryId; private final WorkloadManager wmParent; @@ -99,12 +112,12 @@ void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { synchronized (amPluginInfoLock) { // Ignore the outdated updates; for the same version, ignore non-null updates because // we assume that removal is the last thing that happens for any given version. - if ((amPluginendpointVersion != null) && ((amPluginendpointVersion > ephSeqVersion) - || (amPluginendpointVersion == ephSeqVersion && info != null))) { + if ((amPluginEndpointVersion != null) && ((amPluginEndpointVersion > ephSeqVersion) + || (amPluginEndpointVersion == ephSeqVersion && info != null))) { LOG.info("Ignoring an outdated info update {}: {}", ephSeqVersion, si); return; } - this.amPluginendpointVersion = ephSeqVersion; + this.amPluginEndpointVersion = ephSeqVersion; this.amPluginInfo = info; if (info != null) { // Only update someone waiting for info if we have the info. 
@@ -123,7 +136,7 @@ void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { @Override public AmPluginInfo getAmPluginInfo(Ref version) { synchronized (amPluginInfoLock) { - version.value = amPluginendpointVersion; + version.value = amPluginEndpointVersion; return amPluginInfo; } } @@ -132,7 +145,7 @@ void setPoolName(String poolName) { this.poolName = poolName; } - String getPoolName() { + public String getPoolName() { return poolName; } @@ -145,7 +158,7 @@ void clearWm() { this.clusterFraction = 0f; } - double getClusterFraction() { + public double getClusterFraction() { return this.clusterFraction; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index ecdcf12..8205a31 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -17,20 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import com.google.common.collect.Lists; - -import java.util.concurrent.ExecutionException; - -import java.util.Collection; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -42,6 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -50,6 +39,7 @@ import 
java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -68,11 +58,24 @@ import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hive.common.util.Ref; import org.apache.tez.dag.api.TezConfiguration; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** Workload management entry point for HS2. * Note on how this class operates. @@ -92,6 +95,7 @@ private static final char POOL_SEPARATOR = '.'; private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR; + private final ObjectMapper objectMapper; // Various final services, configs, etc. private final HiveConf conf; private final TezSessionPool tezAmPool; @@ -112,6 +116,7 @@ private Map pools; private String rpName, defaultPool; // For information only. private int totalQueryParallelism; + /** * The queries being killed. 
This is used to sync between the background kill finishing and the * query finishing and user returning the sessions, which can happen in separate iterations @@ -211,6 +216,13 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso wmThread.start(); updateResourcePlanAsync(plan).get(); // Wait for the initial resource plan to be applied. + + objectMapper = new ObjectMapper(); + objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); + // serialize json based on field annotations only + objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker() + .withGetterVisibility(JsonAutoDetect.Visibility.NONE) + .withSetterVisibility(JsonAutoDetect.Visibility.NONE)); } private static int determineQueryParallelism(WMFullResourcePlan plan) { @@ -397,7 +409,9 @@ private void scheduleWork(WmThreadSyncWork context) { try { kq.killQuery(queryId, reason); addKillQueryResult(toKill, true); + killCtx.killSessionFuture.set(true); LOG.debug("Killed " + queryId); + recordWMEvent(toKill, WMEvent.EventType.KILL); return; } catch (HiveException ex) { LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex); @@ -425,6 +439,7 @@ private void scheduleWork(WmThreadSyncWork context) { try { // Note: sessions in toRestart are always in use, so they cannot expire in parallel. 
tezAmPool.replaceSession(toRestart, false, null); + recordWMEvent(toRestart, WMEvent.EventType.RESTART); } catch (Exception ex) { LOG.error("Failed to restart an old session; ignoring", ex); } @@ -438,6 +453,7 @@ private void scheduleWork(WmThreadSyncWork context) { workPool.submit(() -> { try { toDestroy.close(false); + recordWMEvent(toDestroy, WMEvent.EventType.DESTROY); } catch (Exception ex) { LOG.error("Failed to close an old session; ignoring " + ex.getMessage()); } @@ -516,6 +532,8 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw boolean wasReturned = tezAmPool.returnSessionAsync(sessionToReturn); if (!wasReturned) { syncWork.toDestroyNoRestart.add(sessionToReturn); + } else { + recordWMEvent(sessionToReturn, WMEvent.EventType.RETURN); } break; case NOT_FOUND: @@ -563,8 +581,9 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw // We could consider delaying the move (when destination capacity is full) until there is claim in src pool. // May be change command to support ... DELAYED MOVE TO etl ... which will run under src cluster fraction as long // as possible + List recordMoveEvents = new ArrayList<>(); for (MoveSession moveSession : e.moveSessions) { - handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse); + handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse, recordMoveEvents); } e.moveSessions.clear(); @@ -631,7 +650,12 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw } } - // 13. Notify tests and global async ops. + // 13. To record move events, we need the cluster fraction updates that happen at step 11. + for (WmTezSession wmTezSession : recordMoveEvents) { + recordWMEvent(wmTezSession, WMEvent.EventType.MOVE); + } + + // 14. Notify tests and global async ops.
if (e.dumpStateFuture != null) { List result = new ArrayList<>(); result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool); @@ -676,8 +700,11 @@ private void dumpPoolState(PoolState ps, List set) { } } - private void handleMoveSessionOnMasterThread(MoveSession moveSession, WmThreadSyncWork syncWork, - Set poolsToRedistribute, Map toReuse) { + private void handleMoveSessionOnMasterThread(final MoveSession moveSession, + final WmThreadSyncWork syncWork, + final HashSet poolsToRedistribute, + final Map toReuse, + final List recordMoveEvents) { String destPoolName = moveSession.destPool; LOG.info("Handling move session event: {}", moveSession); if (validMove(moveSession.srcSession, destPoolName)) { @@ -692,6 +719,7 @@ private void handleMoveSessionOnMasterThread(MoveSession moveSession, WmThreadSy moveSession.srcSession, destPoolName, poolsToRedistribute); if (added != null && added) { moveSession.future.set(true); + recordMoveEvents.add(moveSession.srcSession); return; } else { LOG.error("Failed to move session: {}. Session is not added to destination.", moveSession); @@ -808,7 +836,7 @@ private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session case OK: // If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK. PoolState pool = pools.get(poolName); - SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId()); + SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId(), session.getWmContext()); // We have just removed the session from the same pool, so don't check concurrency here. pool.initializingSessions.add(sw); ListenableFuture getFuture = tezAmPool.getSessionAsync(); @@ -918,6 +946,14 @@ private void applyNewResourcePlanOnMasterThread( totalQueryParallelism += qp; } } + // TODO: in the current impl, triggers are added to RP. 
For tez, no pool triggers (mapping between trigger name and + // pool name) will exist which means all triggers apply to tez. For LLAP, pool triggers have to exist for attaching + // triggers to specific pools. + // For usability, + // Provide a way for triggers sharing/inheritance possibly with following modes + // ONLY - only to pool + // INHERIT - child pools inherit from parent + // GLOBAL - all pools inherit if (e.resourcePlanToApply.isSetTriggers() && e.resourcePlanToApply.isSetPoolTriggers()) { Map triggers = new HashMap<>(); for (WMTrigger trigger : e.resourcePlanToApply.getTriggers()) { @@ -1022,6 +1058,7 @@ private void queueGetRequestOnMasterThread( req.sessionToReuse.setPoolName(poolName); req.sessionToReuse.setQueueName(yarnQueue); req.sessionToReuse.setQueryId(req.queryId); + req.sessionToReuse.setWmContext(req.wmContext); pool.sessions.add(req.sessionToReuse); if (pool != oldPool) { poolsToRedistribute.add(poolName); @@ -1058,7 +1095,8 @@ private void processPoolChangesOnMasterThread( // Note that in theory, we are guaranteed to have a session waiting for us here, but // the expiration, failures, etc. may cause one to be missing pending restart. // See SessionInitContext javadoc.
- SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId); + SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId, + queueReq.wmContext); ListenableFuture getFuture = tezAmPool.getSessionAsync(); Futures.addCallback(getFuture, sw); // It is possible that all the async methods returned on the same thread because the @@ -1162,7 +1200,7 @@ private Boolean checkAndAddSessionToAnotherPool( poolsToRedistribute.add(destPoolName); return true; } - LOG.error("Session {} was not not added to pool {}", session, destPoolName); + LOG.error("Session {} was not added to pool {}", session, destPoolName); return null; } @@ -1188,7 +1226,7 @@ private Boolean checkAndAddSessionToAnotherPool( return applyRpFuture; } - public Future applyMoveSessionAsync(WmTezSession srcSession, String destPoolName) { + Future applyMoveSessionAsync(WmTezSession srcSession, String destPoolName) { currentLock.lock(); MoveSession moveSession; try { @@ -1202,6 +1240,18 @@ private Boolean checkAndAddSessionToAnotherPool( return moveSession.future; } + Future applyKillSessionAsync(KillQueryContext killQueryContext) { + currentLock.lock(); + try { + syncWork.toKillQuery.put(killQueryContext.session, killQueryContext); + LOG.info("Queued session for kill: {}", killQueryContext.session); + notifyWmThreadUnderLock(); + } finally { + currentLock.unlock(); + } + return killQueryContext.killSessionFuture; + } + private final static class GetRequest { public static final Comparator ORDER_COMPARATOR = new Comparator() { @Override @@ -1215,15 +1265,18 @@ public int compare(GetRequest o1, GetRequest o2) { private final SettableFuture future; private WmTezSession sessionToReuse; private final String queryId; + private final WMContext wmContext; private GetRequest(MappingInput mappingInput, String queryId, - SettableFuture future, WmTezSession sessionToReuse, long order) { + SettableFuture future, WmTezSession sessionToReuse, long order, + final 
WMContext wmContext) { assert mappingInput != null; this.mappingInput = mappingInput; this.queryId = queryId; this.future = future; this.sessionToReuse = sessionToReuse; this.order = order; + this.wmContext = wmContext; } @Override @@ -1233,14 +1286,14 @@ public String toString() { } public TezSessionState getSession( - TezSessionState session, MappingInput input, HiveConf conf) throws Exception { + TezSessionState session, MappingInput input, HiveConf conf, final WMContext wmContext) throws Exception { // Note: not actually used for pool sessions; verify some things like doAs are not set. validateConfig(conf); String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID); SettableFuture future = SettableFuture.create(); WmTezSession wmSession = checkSessionForReuse(session); GetRequest req = new GetRequest( - input, queryId, future, wmSession, getRequestVersion.incrementAndGet()); + input, queryId, future, wmSession, getRequestVersion.incrementAndGet(), wmContext); currentLock.lock(); try { current.getRequests.add(req); @@ -1252,7 +1305,18 @@ public TezSessionState getSession( } finally { currentLock.unlock(); } - return future.get(); + WmTezSession sessionState = future.get(); + recordWMEvent(sessionState, WMEvent.EventType.GET); + return sessionState; + } + + void recordWMEvent(final WmTezSession sessionState, final WMEvent.EventType wmEventType) { + WMContext wmContext = sessionState.getWmContext(); + if (wmContext != null) { + WMEvent wmEvent = new WMEvent(sessionState, wmEventType); + wmContext.addWMEvent(wmEvent); + LOG.info("Added WMEvent: {}", wmEvent); + } } @Override @@ -1519,7 +1583,8 @@ protected final HiveConf getConf() { return conf; } - public List getTriggerCounterNames(final TezSessionState session) { + public List getTriggerCounterNames(final TezSessionState session, + final WMContext wmContext) { if (session instanceof WmTezSession) { WmTezSession wmTezSession = (WmTezSession) session; String poolName = wmTezSession.getPoolName(); @@ 
-1527,10 +1592,16 @@ protected final HiveConf getConf() { if (poolState != null) { List counterNames = new ArrayList<>(); List triggers = poolState.getTriggers(); + List triggerStr = new ArrayList<>(); + List triggerNamesStr = new ArrayList<>(); if (triggers != null) { for (Trigger trigger : triggers) { counterNames.add(trigger.getExpression().getCounterLimit().getName()); + triggerStr.add(trigger.toString()); + triggerNamesStr.add(trigger.getName()); } + wmContext.setAppliedTriggers(triggerStr); + wmContext.setAppliedTriggersNames(triggerNamesStr); } return counterNames; } @@ -1671,12 +1742,15 @@ public void setTriggers(final LinkedList triggers) { private SettableFuture future; private SessionInitState state; private String cancelReason; + private WMContext wmContext; - public SessionInitContext(SettableFuture future, String poolName, String queryId) { + public SessionInitContext(SettableFuture future, String poolName, String queryId, + final WMContext wmContext) { this.state = SessionInitState.GETTING; this.future = future; this.poolName = poolName; this.queryId = queryId; + this.wmContext = wmContext; } @Override @@ -1693,6 +1767,7 @@ public void onSuccess(WmTezSession session) { session.setPoolName(poolName); session.setQueueName(yarnQueue); session.setQueryId(queryId); + session.setWmContext(wmContext); this.session = session; this.state = SessionInitState.WAITING_FOR_REGISTRY; break; @@ -1740,6 +1815,7 @@ public void onSuccess(WmTezSession session) { session.setPoolName(null); session.setClusterFraction(0f); session.setQueryId(null); + session.setWmContext(null); tezAmPool.returnSession(session); break; } @@ -1858,7 +1934,8 @@ boolean isManaged(MappingInput input) { * like the session even before we kill it, or the kill fails and the user is happily computing * away. This class is to collect and make sense of the state around all this. 
*/ - private static final class KillQueryContext { + static final class KillQueryContext { + private SettableFuture killSessionFuture; private final String reason; private final WmTezSession session; // Note: all the fields are only modified by master thread. @@ -1868,6 +1945,7 @@ boolean isManaged(MappingInput input) { public KillQueryContext(WmTezSession session, String reason) { this.session = session; this.reason = reason; + this.killSessionFuture = SettableFuture.create(); } private void handleKillQueryCallback(boolean hasFailed) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java index 0a9fa72..a25d9e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,19 +16,21 @@ * limitations under the License. 
*/package org.apache.hadoop.hive.ql.exec.tez; -import org.slf4j.LoggerFactory; - -import org.slf4j.Logger; - +import java.util.HashSet; import java.util.Set; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; +import org.apache.hadoop.hive.ql.wm.WMContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WorkloadManagerFederation { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManagerFederation.class); public static TezSessionState getSession(TezSessionState session, HiveConf conf, - MappingInput input, boolean isUnmanagedLlapMode, Set desiredCounters) throws Exception { + MappingInput input, boolean isUnmanagedLlapMode, final WMContext wmContext) throws Exception { + Set desiredCounters = new HashSet<>(); // 1. If WM is not present just go to unmanaged. WorkloadManager wm = WorkloadManager.getInstance(); if (wm == null) { @@ -47,8 +49,10 @@ public static TezSessionState getSession(TezSessionState session, HiveConf conf, try { // Note: this may just block to wait for a session based on parallelism. LOG.info("Getting a WM session for " + input); - TezSessionState result = wm.getSession(session, input, conf); - desiredCounters.addAll(wm.getTriggerCounterNames(result)); + TezSessionState result = wm.getSession(session, input, conf, wmContext); + desiredCounters.addAll(wm.getTriggerCounterNames(result, wmContext)); + wmContext.setDesiredCounters(desiredCounters); + result.setWmContext(wmContext); return result; } catch (WorkloadManager.NoPoolMappingException ex) { LOG.info("NoPoolMappingException thrown. 
Getting an un-managed session.."); @@ -57,7 +61,7 @@ public static TezSessionState getSession(TezSessionState session, HiveConf conf, } private static TezSessionState getUnmanagedSession( - TezSessionState session, HiveConf conf, Set desiredCounters, boolean isWorkLlapNode) throws Exception { + TezSessionState session, HiveConf conf, Set desiredCounters, boolean isWorkLlapNode) throws Exception { TezSessionPoolManager pm = TezSessionPoolManager.getInstance(); session = pm.getSession(session, conf, false, isWorkLlapNode); desiredCounters.addAll(pm.getTriggerCounterNames()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java index 5bb6bf1..8414c73 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java @@ -19,6 +19,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; -interface PrintSummary { +public interface PrintSummary { void print(SessionState.LogHelper console); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 3dd4b31..c133cb7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -43,7 +43,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.TimeCounterLimit; -import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.hadoop.hive.ql.wm.VertexCounterLimit; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.tez.common.counters.CounterGroup; 
@@ -156,7 +156,7 @@ public int monitorExecution() { boolean running = false; long checkInterval = MIN_CHECK_INTERVAL; - TriggerContext triggerContext = null; + WMContext wmContext = null; while (true) { try { @@ -167,12 +167,12 @@ public int monitorExecution() { status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS), checkInterval); TezCounters dagCounters = status.getDAGCounters(); vertexProgressMap = status.getVertexProgress(); - triggerContext = context.getTriggerContext(); - if (dagCounters != null && triggerContext != null) { - Set desiredCounters = triggerContext.getDesiredCounters(); + wmContext = context.getWmContext(); + if (dagCounters != null && wmContext != null) { + Set desiredCounters = wmContext.getDesiredCounters(); if (desiredCounters != null && !desiredCounters.isEmpty()) { Map currentCounters = getCounterValues(dagCounters, vertexProgressMap, desiredCounters, done); - triggerContext.setCurrentCounters(currentCounters); + wmContext.setCurrentCounters(currentCounters); } } DAGStatus.State state = status.getState(); @@ -234,8 +234,8 @@ public int monitorExecution() { break; } } - if (triggerContext != null && done) { - triggerContext.setQueryCompleted(true); + if (wmContext != null && done) { + wmContext.setQueryCompleted(true); } } catch (Exception e) { console.printInfo("Exception: " + e.getMessage()); @@ -263,13 +263,13 @@ public int monitorExecution() { } else { console.printInfo("Retrying..."); } - if (triggerContext != null && done) { - triggerContext.setQueryCompleted(true); + if (wmContext != null && done) { + wmContext.setQueryCompleted(true); } } finally { if (done) { - if (triggerContext != null && done) { - triggerContext.setQueryCompleted(true); + if (wmContext != null && done) { + wmContext.setQueryCompleted(true); } if (rc != 0 && status != null) { for (String diag : status.getDiagnostics()) { @@ -324,7 +324,7 @@ public int monitorExecution() { if (!done) { counterName = 
TimeCounterLimit.TimeCounter.ELAPSED_TIME.name(); if (desiredCounters.contains(counterName)) { - updatedCounters.put(counterName, context.getTriggerContext().getElapsedTime()); + updatedCounters.put(counterName, context.getWmContext().getElapsedTime()); } counterName = TimeCounterLimit.TimeCounter.EXECUTION_TIME.name(); @@ -351,6 +351,7 @@ private void printSummary(boolean success, Map progressMap) { new LLAPioSummary(progressMap, dagClient).print(console); new FSCountersSummary(progressMap, dagClient).print(console); } + console.printInfo(""); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java new file mode 100644 index 0000000..44099e6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.hooks; + +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.exec.tez.WMEvent; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.wm.WMContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Post execution (success or failure) hook to print hive workload manager events summary. + */ +public class PostExecWMEventsSummaryPrinter implements ExecuteWithHookContext { + private static final Logger LOG = LoggerFactory.getLogger(PostExecWMEventsSummaryPrinter.class.getName()); + + @Override + public void run(HookContext hookContext) throws Exception { + assert (hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK || + hookContext.getHookType() == HookContext.HookType.ON_FAILURE_HOOK); + HiveConf conf = hookContext.getConf(); + if (!"tez".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) { + return; + } + + LOG.info("Executing post execution hook to print workload manager events summary.."); + SessionState.LogHelper console = SessionState.getConsole(); + QueryPlan plan = hookContext.getQueryPlan(); + if (plan == null) { + return; + } + + List rootTasks = Utilities.getTezTasks(plan.getRootTasks()); + for (TezTask tezTask : rootTasks) { + WMContext wmContext = tezTask.getDriverContext().getCtx().getWmContext(); + if (wmContext != null) { + 
console.printInfo(WMContext.WM_EVENTS_TITLE, false); + for (WMEvent wmEvent : wmContext.getQueryWmEvents()) { + console.printInfo("Event: " + wmEvent.getEventType() + + " Pool: " + wmEvent.getWmTezSessionInfo().getPoolName() + + " Cluster %: " + WMContext.DECIMAL_FORMAT.format(wmEvent.getWmTezSessionInfo().getClusterPercent())); + } + } + } + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java deleted file mode 100644 index 16072c3..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.wm; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Some context information that are required for rule evaluation. 
- */ -public class TriggerContext { - private Set desiredCounters = new HashSet<>(); - private Map currentCounters = new HashMap<>(); - private String queryId; - private long queryStartTime; - private boolean queryCompleted; - - public TriggerContext(final long queryStartTime, final String queryId) { - this.queryStartTime = queryStartTime; - this.queryId = queryId; - this.queryCompleted = false; - } - - public String getQueryId() { - return queryId; - } - - public void setQueryId(final String queryId) { - this.queryId = queryId; - } - - public Set getDesiredCounters() { - return desiredCounters; - } - - public void setDesiredCounters(final Set desiredCounters) { - this.desiredCounters = desiredCounters; - } - - public Map getCurrentCounters() { - return currentCounters; - } - - public void setCurrentCounters(final Map currentCounters) { - this.currentCounters = currentCounters; - } - - public long getElapsedTime() { - return System.currentTimeMillis() - queryStartTime; - } - - public boolean isQueryCompleted() { - return queryCompleted; - } - - public void setQueryCompleted(final boolean queryCompleted) { - this.queryCompleted = queryCompleted; - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/WMContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/WMContext.java new file mode 100644 index 0000000..6d6ae99 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/WMContext.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR; + +import java.io.IOException; +import java.text.DecimalFormat; +import java.time.Instant; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.management.MXBean; + +import org.apache.hadoop.hive.ql.exec.tez.WMEvent; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.PrintSummary; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Some context information that is required for rule evaluation.
+ */ +@MXBean +public class WMContext implements PrintSummary { + private static final Logger LOG = LoggerFactory.getLogger(WMContext.class); + @JsonProperty("queryId") + private String queryId; + @JsonProperty("queryStartTime") + private long queryStartTime; + @JsonProperty("queryEndTime") + private long queryEndTime; + @JsonProperty("queryCompleted") + private boolean queryCompleted; + @JsonProperty("queryWmEvents") + private List queryWmEvents = new LinkedList<>(); + @JsonProperty("appliedTriggers") // TODO: jsonize trigger objects for json printing + private List appliedTriggers = new LinkedList<>(); + @JsonProperty("appliedTriggersNames") + private List appliedTriggersNames = new LinkedList<>(); + @JsonProperty("desiredCounters") + private Set desiredCounters = new HashSet<>(); + @JsonProperty("currentCounters") + private Map currentCounters = new HashMap<>(); + + public WMContext(final long queryStartTime, final String queryId) { + this.queryStartTime = queryStartTime; + this.queryId = queryId; + this.queryCompleted = false; + } + + public List getAppliedTriggers() { + return appliedTriggers; + } + + public void setAppliedTriggers(final List appliedTriggers) { + this.appliedTriggers = appliedTriggers; + } + + public List getAppliedTriggersNames() { + return appliedTriggersNames; + } + + public void setAppliedTriggersNames(final List appliedTriggersNames) { + this.appliedTriggersNames = appliedTriggersNames; + } + + public String getQueryId() { + return queryId; + } + + public void setQueryId(final String queryId) { + this.queryId = queryId; + } + + public Set getDesiredCounters() { + return desiredCounters; + } + + public void setDesiredCounters(final Set desiredCounters) { + this.desiredCounters = desiredCounters; + } + + public Map getCurrentCounters() { + return currentCounters; + } + + public void setCurrentCounters(final Map currentCounters) { + this.currentCounters = currentCounters; + } + + public long getElapsedTime() { + return 
System.currentTimeMillis() - queryStartTime; + } + + public boolean isQueryCompleted() { + return queryCompleted; + } + + public void setQueryCompleted(final boolean queryCompleted) { + this.queryCompleted = queryCompleted; + this.queryEndTime = System.currentTimeMillis(); + } + + public void addWMEvent(WMEvent wmEvent) { + queryWmEvents.add(wmEvent); + } + + public long getQueryStartTime() { + return queryStartTime; + } + + public long getQueryEndTime() { + return queryEndTime; + } + + public List getQueryWmEvents() { + return queryWmEvents; + } + + private static final String WM_EVENTS_HEADER_FORMAT = "%24s %7s %36s %9s %10s"; + public static final String WM_EVENTS_TITLE = "Workload Manager Events Summary"; + private static final String WM_EVENTS_TABLE_HEADER = String.format(WM_EVENTS_HEADER_FORMAT, + "EVENT_TIMESTAMP", "EVENT", "SESSIONID", "CLUSTER %", "POOL"); + public static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#0.00"); + + @Override + public void print(final SessionState.LogHelper console) { + boolean first = false; + console.printInfo(""); + console.printInfo(WM_EVENTS_TITLE); + + console.printInfo(""); + console.printInfo("QueryId: " + queryId); + console.printInfo("Triggers: " + appliedTriggersNames); + for (WMEvent wmEvent : queryWmEvents) { + if (!first) { + console.printInfo(SEPARATOR); + console.printInfo(WM_EVENTS_TABLE_HEADER); + console.printInfo(SEPARATOR); + first = true; + } + WMEvent.WmTezSessionInfo wmTezSessionInfo = wmEvent.getWmTezSessionInfo(); + String row = String.format(WM_EVENTS_HEADER_FORMAT, Instant.ofEpochMilli(wmEvent.getEventTimestamp()).toString(), + wmEvent.getEventType(), + wmTezSessionInfo.getSessionId(), + DECIMAL_FORMAT.format(wmTezSessionInfo.getClusterPercent()), + wmTezSessionInfo.getPoolName()); + console.printInfo(row); + } + console.printInfo(SEPARATOR); + console.printInfo(""); + } + + // TODO: expose all WMContext's via /jmx to use in UI + public String printJson(final SessionState.LogHelper 
console) { + try { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); + // serialize json based on field annotations only + objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker() + .withGetterVisibility(JsonAutoDetect.Visibility.NONE) + .withSetterVisibility(JsonAutoDetect.Visibility.NONE)); + String wmContextJson = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(this); + console.printInfo(""); + console.printInfo(WM_EVENTS_TITLE); + console.printInfo(SEPARATOR); + console.printInfo(wmContextJson); + console.printInfo(SEPARATOR); + console.printInfo(""); + } catch (IOException e) { + LOG.warn("Unable to serialize WMContext to json.", e); + } + return null; + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 78df962..3b773cf 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; +import org.apache.hadoop.hive.ql.wm.WMContext; import org.apache.tez.dag.api.TezConfiguration; import org.junit.Test; import org.slf4j.Logger; @@ -86,7 +87,7 @@ public void run() { cdl.countDown(); } try { - session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf)); + session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf, null)); } catch (Throwable e) { error.compareAndSet(null, e); } @@ -186,9 +187,10 @@ protected WmTezSession createSessionObject(String sessionId, HiveConf conf) { @Override public TezSessionState getSession( - TezSessionState session, 
MappingInput input, HiveConf conf) throws Exception { + TezSessionState session, MappingInput input, HiveConf conf, + final WMContext wmContext) throws Exception { // We want to wait for the iteration to finish and set the cluster fraction. - TezSessionState state = super.getSession(session, input, conf); + TezSessionState state = super.getSession(session, input, conf, null); ensureWm(); return state; } @@ -227,17 +229,17 @@ public void testReuse() throws Exception { TezSessionState nonPool = mock(TezSessionState.class); when(nonPool.getConf()).thenReturn(conf); doNothing().when(nonPool).close(anyBoolean()); - TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf); + TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf, null); verify(nonPool).close(anyBoolean()); assertNotSame(nonPool, session); session.returnToSessionManager(); TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class); when(diffPool.getConf()).thenReturn(conf); doNothing().when(diffPool).returnToSessionManager(); - session = wm.getSession(diffPool, new MappingInput("user", null), conf); + session = wm.getSession(diffPool, new MappingInput("user", null), conf, null); verify(diffPool).returnToSessionManager(); assertNotSame(diffPool, session); - TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf); + TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf, null); assertSame(session, session2); } @@ -249,11 +251,11 @@ public void testQueueName() throws Exception { wm.start(); // The queue should be ignored. 
conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); - TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf); + TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf, null); assertEquals("test", session.getQueueName()); assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); session.setQueueName("test2"); - session = wm.getSession(session, new MappingInput("user", null), conf); + session = wm.getSession(session, new MappingInput("user", null), conf, null); assertEquals("test", session.getQueueName()); } @@ -269,7 +271,7 @@ public void testReopen() throws Exception { WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); WmTezSession session = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf); + null, new MappingInput("user", null), conf, null); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); WmTezSession session2 = (WmTezSession) session.reopen(conf, null); @@ -287,10 +289,10 @@ public void testDestroyAndReturn() throws Exception { MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); wm.start(); - WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); + WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); - WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); + WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); assertEquals(0.5, session.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); @@ -301,7 +303,7 @@ public void testDestroyAndReturn() throws Exception { qam.assertWasCalledAndReset(); // We never lose pool 
session, so we should still be able to get. - session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); + session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); session.returnToSessionManager(); assertEquals(1.0, session2.getClusterFraction(), EPSILON); assertEquals(0.0, session.getClusterFraction(), EPSILON); @@ -322,20 +324,20 @@ public void testClusterFractions() throws Exception { assertEquals(5, wm.getNumSessions()); // Get all the 5 sessions; validate cluster fractions. WmTezSession session05of06 = (WmTezSession) wm.getSession( - null, new MappingInput("p1", null), conf); + null, new MappingInput("p1", null), conf, null); assertEquals(0.3, session05of06.getClusterFraction(), EPSILON); WmTezSession session03of06 = (WmTezSession) wm.getSession( - null, new MappingInput("p2", null), conf); + null, new MappingInput("p2", null), conf, null); assertEquals(0.18, session03of06.getClusterFraction(), EPSILON); WmTezSession session03of06_2 = (WmTezSession) wm.getSession( - null, new MappingInput("p2", null), conf); + null, new MappingInput("p2", null), conf, null); assertEquals(0.09, session03of06.getClusterFraction(), EPSILON); assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON); WmTezSession session02of06 = (WmTezSession) wm.getSession( - null,new MappingInput("r1", null), conf); + null,new MappingInput("r1", null), conf, null); assertEquals(0.12, session02of06.getClusterFraction(), EPSILON); WmTezSession session04 = (WmTezSession) wm.getSession( - null, new MappingInput("r2", null), conf); + null, new MappingInput("r2", null), conf, null); assertEquals(0.4, session04.getClusterFraction(), EPSILON); session05of06.returnToSessionManager(); session03of06.returnToSessionManager(); @@ -367,7 +369,7 @@ public void testMappings() throws Exception { private static void verifyMapping( WorkloadManager wm, HiveConf conf, MappingInput mi, String result) throws Exception { - WmTezSession session = 
(WmTezSession) wm.getSession(null, mi, conf); + WmTezSession session = (WmTezSession) wm.getSession(null, mi, conf, null); assertEquals(result, session.getPoolName()); session.returnToSessionManager(); } @@ -381,9 +383,9 @@ public void testQueueing() throws Exception { plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null); final AtomicReference sessionA3 = new AtomicReference<>(), sessionA4 = new AtomicReference<>(); final AtomicReference error = new AtomicReference<>(); @@ -397,7 +399,7 @@ public void testQueueing() throws Exception { assertNull(sessionA4.get()); checkError(error); // While threads are blocked on A, we should still be able to get and return a B session. 
- WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf); + WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null); sessionB1.returnToSessionManager(); sessionB2.returnToSessionManager(); assertNull(sessionA3.get()); @@ -425,8 +427,8 @@ public void testClusterChange() throws Exception { plan.getPlan().setDefaultPoolPath("A"); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals(0.5, session1.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); @@ -448,19 +450,19 @@ public void testReuseWithQueueing() throws Exception { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); wm.start(); WmTezSession session1 = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf); + null, new MappingInput("user", null), conf, null); // First, try to reuse from the same pool - should "just work". WmTezSession session1a = (WmTezSession) wm.getSession( - session1, new MappingInput("user", null), conf); + session1, new MappingInput("user", null), conf, null); assertSame(session1, session1a); assertEquals(1.0, session1.getClusterFraction(), EPSILON); // Should still be able to get the 2nd session. WmTezSession session2 = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf); + null, new MappingInput("user", null), conf, null); // Now try to reuse with no other sessions remaining. Should still work. 
WmTezSession session2a = (WmTezSession) wm.getSession( - session2, new MappingInput("user", null), conf); + session2, new MappingInput("user", null), conf, null); assertSame(session2, session2a); assertEquals(0.5, session1.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); @@ -517,19 +519,19 @@ public void testReuseWithDifferentPool() throws Exception { plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals("A", sessionA1.getPoolName()); assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON); assertEquals("A", sessionA2.getPoolName()); assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON); - WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf); + WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf, null); assertSame(sessionA1, sessionB1); assertEquals("B", sessionB1.getPoolName()); assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A. // Make sure that we can still get a session from A. 
- WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals("A", sessionA3.getPoolName()); assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); @@ -549,7 +551,7 @@ public void testApplyPlanUserMapping() throws Exception { wm.start(); // One session will be running, the other will be queued in "A" - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null); assertEquals("A", sessionA1.getPoolName()); assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON); final AtomicReference sessionA2 = new AtomicReference<>(); @@ -574,7 +576,7 @@ public void testApplyPlanUserMapping() throws Exception { assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON); // The new session will also go to B now. sessionA2.get().returnToSessionManager(); - WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf); + WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null); assertEquals("B", sessionB1.getPoolName()); assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); sessionA1.returnToSessionManager(); @@ -598,11 +600,11 @@ public void testApplyPlanQpChanges() throws Exception { // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued. // Total: 5/6 running. 
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf), - sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf), - sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf), - sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null), + sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null), + sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf, null), + sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf, null); final AtomicReference sessionA2 = new AtomicReference<>(), sessionD2 = new AtomicReference<>(); final AtomicReference error = new AtomicReference<>(); @@ -738,7 +740,7 @@ public void testMoveSessions() throws Exception { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 0] Map allSessionProviders = wm.getAllSessionTriggerProviders(); @@ -762,7 +764,7 @@ public void testMoveSessions() throws Exception { assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON); assertEquals("B", sessionA1.getPoolName()); - WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 1] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, 
allSessionProviders.get("A").getSessions().size()); @@ -789,7 +791,7 @@ public void testMoveSessions() throws Exception { assertEquals("B", sessionA2.getPoolName()); assertEquals("B", sessionA1.getPoolName()); - WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 2] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -829,7 +831,7 @@ public void testMoveSessionsMultiPool() throws Exception { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0] Map allSessionProviders = wm.getAllSessionTriggerProviders(); @@ -887,7 +889,7 @@ public void testMoveSessionsMultiPool() throws Exception { assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1)); assertEquals("B.x", sessionA1.getPoolName()); - WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 0, B.x: 1, B.y: 0, C: 0] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -986,7 +988,7 @@ public void testAsyncSessionInitFailures() throws Exception { failedWait.setException(new Exception("foo")); theOnlySession.setWaitForAmRegistryFuture(failedWait); try { - TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf); + TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf, null); fail("Expected an error but got " + r); } catch 
(Exception ex) { // Expected. @@ -1037,7 +1039,7 @@ private SampleTezSessionState validatePoolAfterCleanup( assertEquals(0f, oldSession.getClusterFraction(), EPSILON); pool.returnSession(theOnlySession); // Make sure we can actually get a session still - parallelism/etc. should not be affected. - WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals(sessionPoolName, result.getPoolName()); assertEquals(1f, result.getClusterFraction(), EPSILON); result.returnToSessionManager();