diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 039f991f9d..78b75c4582 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -2315,6 +2315,9 @@ private void execute() throws CommandProcessorResponse { ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger, queryInfo, ctx); hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK); + if (ss != null) { + hookContext.setSessionState(ss); + } hookRunner.runPreHooks(hookContext); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 2633390861..6a5e08d399 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -20,10 +20,7 @@ import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -303,12 +300,25 @@ private TezSessionState getNewSessionState(HiveConf conf, TezSessionPoolSession retTezSessionState = createAndInitSession(queueName, false, conf); if (queueName != null) { conf.set(TezConfiguration.TEZ_QUEUE_NAME, queueName); + } else { + String[] qNameList = HiveConf.getTrimmedStringsVar( + conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES); + if (qNameList.length != 0) { + int idx = 0; + if (qNameList.length > 1) { + // Choose a quename randomly from the default qname list for Tez. + idx = new Random().nextInt(qNameList.length); + } + conf.set("tez.queue.name", qNameList[idx]); + queueName = qNameList[idx]; + } } if (doOpen) { retTezSessionState.open(); LOG.info("Started a new session for queue: " + queueName + " session id: " + retTezSessionState.getSessionId()); } + retTezSessionState.setQueueName(queueName); return retTezSessionState; } diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java index fa69f13965..0a9d399b42 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.ExplainTask; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.parse.ExplainConfiguration; @@ -292,10 +293,12 @@ public void run() { tablesRead, tablesWritten, conf, llapId, domainId)); break; case POST_EXEC_HOOK: - fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, durations, domainId)); + fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, durations, domainId, + getTezQueueName(hookContext, conf))); break; case ON_FAILURE_HOOK: - fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, durations, domainId)); + fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, durations, domainId, + getTezQueueName(hookContext, conf))); break; default: //ignore @@ -310,6 +313,15 @@ public void run() { LOG.warn("Failed to submit to ATS for " + queryId, e); } } + private String getTezQueueName(HookContext hookContext, HiveConf conf) { + String tezQueueName = null; + if (conf.get("hive.execution.engine").equalsIgnoreCase("tez")) { + SessionState ss = hookContext.getSessionState(); + TezSessionState ts = ss != null ? ss.getTezSession() : null; + tezQueueName = ts != null ? ts.getQueueName() : null; + } + return tezQueueName; + } protected List getTablesFromEntitySet(Set entities) { List tableNames = new ArrayList(); @@ -376,7 +388,13 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user); atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser); atsEntity.addPrimaryFilter(PrimaryFilterTypes.executionmode.name(), executionMode); - atsEntity.addPrimaryFilter(PrimaryFilterTypes.queue.name(), conf.get("mapreduce.job.queuename")); + if (conf.get("hive.execution.engine").equalsIgnoreCase("tez")) { + if (!(conf.get("tez.queue.name") == null || conf.get("tez.queue.name").isEmpty())) { + atsEntity.addPrimaryFilter(PrimaryFilterTypes.queue.name(), conf.get("tez.queue.name")); + } + } else { + atsEntity.addPrimaryFilter(PrimaryFilterTypes.queue.name(), conf.get("mapreduce.job.queuename")); + } if (opId != null) { atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId); @@ -416,7 +434,7 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla } TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, String requestuser, boolean success, - String opId, Map durations, String domainId) throws Exception { + String opId, Map durations, String domainId, String tezQueueName) throws Exception { LOG.info("Received post-hook notification for :" + queryId); TimelineEntity atsEntity = new TimelineEntity(); @@ -424,6 +442,9 @@ TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, S atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name()); atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user); atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser); + if (tezQueueName != null) { + atsEntity.addPrimaryFilter(PrimaryFilterTypes.queue.name(), tezQueueName); + } if (opId != null) { atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId); } diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java index eeb1ae8942..5edc283f1e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -63,6 +64,7 @@ private String errorMessage; private Throwable exception; final private Map inputPathToContentSummary; + private SessionState sessionState; private final String ipAddress; private final String hiveInstanceAddress; private final String userName; @@ -239,4 +241,12 @@ public PerfLogger getPerfLogger() { public QueryInfo getQueryInfo() { return queryInfo; } + + public SessionState getSessionState() { + return sessionState; + } + + public void setSessionState(SessionState sessionState) { + this.sessionState = sessionState; + } }