diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 757c60c..79e95cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.Serializable;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1731,7 +1732,8 @@ public int execute(boolean deferClose) throws CommandNeedRetryException {
       SessionState ss = SessionState.get();
       hookContext = new HookContext(plan, queryState, ctx.getPathToCS(), ss.getUserName(),
-          ss.getUserIpAddress(), operationId, ss.getSessionId());
+          ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(),
+          Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger);
       hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
 
       for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 8ee5c04..bccd3e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -17,7 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.hooks;
 
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -29,9 +36,10 @@
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@@ -59,11 +67,23 @@
   private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED };
 
   private enum OtherInfoTypes {
-    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME, VERSION
+    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME, VERSION,
+    CLIENT_IP_ADDRESS, HIVE_ADDRESS, HIVE_INSTANCE_TYPE, HIVE_QUERY_NAME, CONF, PERF,
   };
-  private enum PrimaryFilterTypes { user, requestuser, operationid };
+  private enum ExecutionMode {
+    MR, TEZ, LLAP, SPARK, NONE
+  };
+  private enum PrimaryFilterTypes {
+    user, requestuser, operationid, queryname, executionmode, tablesread, tableswritten, queue
+  };
+
+  private static final int WAIT_TIME = 3;
+
+  private static final String[] PERF_KEYS = new String[] {
+    PerfLogger.PARSE, PerfLogger.COMPILE, PerfLogger.ANALYZE, PerfLogger.OPTIMIZER,
+    PerfLogger.GET_SPLITS, PerfLogger.RUN_TASKS,
+  };
 
   public ATSHook() {
     synchronized(LOCK) {
       if (executor == null) {
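A note on the Driver.java hunk above: InetAddress.getLocalHost() is now called on the main query path every time execute() builds a HookContext, and it throws UnknownHostException on hosts with broken name resolution. ATSHook tolerates a null hiveInstanceAddress and re-resolves it lazily (see the fallback in run() below), so callers that want to avoid failing the query on resolution errors could wrap the lookup. A minimal sketch of such a guard; the class and method names here are illustrative, not part of the patch:

import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: resolves the Hive instance address the way the
// patched Driver does, but falls back to null instead of propagating
// the exception.
public final class HiveInstanceAddress {
  private HiveInstanceAddress() {
  }

  public static String resolveOrNull() {
    try {
      // Same lookup Driver.execute() uses to stamp the HookContext.
      return InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
      // ATSHook.run() resolves the address itself when
      // HookContext.getHiveInstanceAddress() returns null.
      return null;
    }
  }
}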
@@ -139,14 +159,27 @@ public void run() {
               String query = plan.getQueryStr();
               JSONObject explainPlan = explain.getJSONPlan(null, work);
               String logID = conf.getLogIdVar(hookContext.getSessionId());
-              fireAndForget(conf, createPreHookEvent(queryId, query, explainPlan, queryStartTime,
-                  user, requestuser, numMrJobs, numTezJobs, opId, logID));
+              List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
+              List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
+              String queryName = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
+              String executionMode = getExecutionMode(plan).name();
+              String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
+              if (hiveInstanceAddress == null) {
+                hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+              }
+              String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
+              fireAndForget(conf,
+                  createPreHookEvent(queryId, query, explainPlan, queryStartTime,
+                      user, requestuser, numMrJobs, numTezJobs, opId,
+                      hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
+                      logID, hookContext.getThreadId(), queryName, executionMode,
+                      tablesRead, tablesWritten, conf));
               break;
             case POST_EXEC_HOOK:
-              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId));
+              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger()));
               break;
             case ON_FAILURE_HOOK:
-              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId));
+              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger()));
               break;
             default:
               //ignore
@@ -159,13 +192,50 @@ public void run() {
         });
   }
 
+  protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
+    List<String> tableNames = new ArrayList<String>();
+    for (Entity entity : entities) {
+      if (entity.getType() == Entity.Type.TABLE) {
+        tableNames.add(entity.getTable().getDbName() + "." + entity.getTable().getTableName());
+      }
+    }
+    return tableNames;
+  }
+
+  protected ExecutionMode getExecutionMode(QueryPlan plan) {
+    int numMRJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
+    int numSparkJobs = Utilities.getSparkTasks(plan.getRootTasks()).size();
+    int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
+
+    ExecutionMode mode = ExecutionMode.MR;
+    if (0 == (numMRJobs + numSparkJobs + numTezJobs)) {
+      mode = ExecutionMode.NONE;
+    } else if (numSparkJobs > 0) {
+      return ExecutionMode.SPARK;
+    } else if (numTezJobs > 0) {
+      mode = ExecutionMode.TEZ;
+      // Need to go in and check if any of the tasks is running in LLAP mode.
+      for (TezTask tezTask : Utilities.getTezTasks(plan.getRootTasks())) {
+        if (tezTask.getWork().getLlapMode()) {
+          mode = ExecutionMode.LLAP;
+          break;
+        }
+      }
+    }
+
+    return mode;
+  }
+
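The precedence encoded by getExecutionMode() above: NONE when the plan contains no MR, Spark, or Tez tasks at all (for example a metadata-only query); otherwise Spark wins outright; otherwise Tez, upgraded to LLAP if any TezTask has LLAP mode enabled; MR is the fallback. A standalone sketch of that decision table; this demo class is hypothetical and mirrors, rather than reuses, the patch code:

// ExecutionModeDemo.java: replays getExecutionMode()'s branch order on
// plain task counts instead of a Hive QueryPlan.
enum ExecutionMode { MR, TEZ, LLAP, SPARK, NONE }

public final class ExecutionModeDemo {
  static ExecutionMode classify(int mrJobs, int sparkJobs, int tezJobs, boolean anyLlapTask) {
    if (mrJobs + sparkJobs + tezJobs == 0) {
      return ExecutionMode.NONE;      // no execution tasks at all
    } else if (sparkJobs > 0) {
      return ExecutionMode.SPARK;     // Spark is checked before Tez and MR
    } else if (tezJobs > 0) {
      return anyLlapTask ? ExecutionMode.LLAP : ExecutionMode.TEZ;
    }
    return ExecutionMode.MR;
  }

  public static void main(String[] args) {
    System.out.println(classify(0, 0, 0, false)); // NONE
    System.out.println(classify(2, 0, 1, false)); // TEZ: Tez outranks MR
    System.out.println(classify(0, 1, 1, true));  // SPARK: checked first
  }
}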
   TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan,
       long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId,
-      String logID) throws Exception {
+      String clientIpAddress, String hiveInstanceAddress, String hiveInstanceType,
+      String logID, String threadId, String queryName, String executionMode,
+      List<String> tablesRead, List<String> tablesWritten, HiveConf conf) throws Exception {
 
     JSONObject queryObj = new JSONObject(new LinkedHashMap<>());
     queryObj.put("queryText", query);
     queryObj.put("queryPlan", explainPlan);
+    queryObj.put("queryName", queryName);
 
     LOG.info("Received pre-hook notification for :" + queryId);
     if (LOG.isDebugEnabled()) {
@@ -173,16 +243,33 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla
       LOG.debug("Operation id: <" + opId + ">");
     }
 
+    conf.stripHiddenConfigurations(conf);
+    Map<String, String> confMap = new HashMap<String, String>();
+    for (Map.Entry<String, String> setting : conf) {
+      confMap.put(setting.getKey(), setting.getValue());
+    }
+    JSONObject confObj = new JSONObject((Map) confMap);
+
     TimelineEntity atsEntity = new TimelineEntity();
     atsEntity.setEntityId(queryId);
     atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user);
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser);
+    atsEntity.addPrimaryFilter(PrimaryFilterTypes.queryname.name(), queryName);
+    atsEntity.addPrimaryFilter(PrimaryFilterTypes.executionmode.name(), executionMode);
+    atsEntity.addPrimaryFilter(PrimaryFilterTypes.queue.name(), conf.get("mapreduce.job.queuename"));
+
     if (opId != null) {
       atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId);
     }
 
+    for (String tabName : tablesRead) {
+      atsEntity.addPrimaryFilter(PrimaryFilterTypes.tablesread.name(), tabName);
+    }
+    for (String tabName : tablesWritten) {
+      atsEntity.addPrimaryFilter(PrimaryFilterTypes.tableswritten.name(), tabName);
+    }
+
     TimelineEvent startEvt = new TimelineEvent();
     startEvt.setEventType(EventTypes.QUERY_SUBMITTED.name());
     startEvt.setTimestamp(startTime);
@@ -192,13 +279,20 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla
     atsEntity.addOtherInfo(OtherInfoTypes.TEZ.name(), numTezJobs > 0);
     atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), numMrJobs > 0);
     atsEntity.addOtherInfo(OtherInfoTypes.INVOKER_INFO.name(), logID);
-    atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), Thread.currentThread().getName());
+    atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), threadId);
     atsEntity.addOtherInfo(OtherInfoTypes.VERSION.name(), VERSION);
+    if (clientIpAddress != null) {
+      atsEntity.addOtherInfo(OtherInfoTypes.CLIENT_IP_ADDRESS.name(), clientIpAddress);
+    }
+    atsEntity.addOtherInfo(OtherInfoTypes.HIVE_ADDRESS.name(), hiveInstanceAddress);
+    atsEntity.addOtherInfo(OtherInfoTypes.HIVE_INSTANCE_TYPE.name(), hiveInstanceType);
+    atsEntity.addOtherInfo(OtherInfoTypes.CONF.name(), confObj.toString());
+
     return atsEntity;
   }
 
   TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, String requestuser, boolean success,
-      String opId) {
+      String opId, PerfLogger perfLogger) throws Exception {
     LOG.info("Received post-hook notification for :" + queryId);
 
     TimelineEntity atsEntity = new TimelineEntity();
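With the new primary filters attached above, pre-hook events can be looked up by user, query name, execution mode, queue, or table through the YARN Timeline Server's REST API. A rough consumer-side sketch; the host, port, and table name are placeholders, and the filter value is JSON-string-quoted (URL-encoded) per the ATS v1 query convention:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical client: lists HIVE_QUERY_ID entities whose "tablesread"
// primary filter matches a given table.
public final class TimelineQueryExample {
  public static void main(String[] args) throws Exception {
    // "default.my_table", quoted and URL-encoded for the primaryFilter value.
    String filter = "tablesread:%22default.my_table%22";
    URL url = new URL("http://timeline-host:8188/ws/v1/timeline/HIVE_QUERY_ID"
        + "?primaryFilter=" + filter);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line); // JSON list of matching query entities
      }
    }
  }
}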
@@ -217,6 +311,13 @@ TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, S
 
     atsEntity.addOtherInfo(OtherInfoTypes.STATUS.name(), success);
 
+    // Perf times
+    JSONObject perfObj = new JSONObject(new LinkedHashMap<>());
+    for (String key : perfLogger.getEndTimes().keySet()) {
+      perfObj.put(key, perfLogger.getDuration(key));
+    }
+    atsEntity.addOtherInfo(OtherInfoTypes.PERF.name(), perfObj.toString());
+
     return atsEntity;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
index 3b4cc2c..c94100c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
@@ -57,15 +58,19 @@
   private Throwable exception;
   final private Map<String, ContentSummary> inputPathToContentSummary;
   private final String ipAddress;
+  private final String hiveInstanceAddress;
   private final String userName;
   // unique id set for operation when run from HS2, base64 encoded value of
   // TExecuteStatementResp.TOperationHandle.THandleIdentifier.guid
   private final String operationId;
   private final String sessionId;
+  private final String threadId;
+  private boolean isHiveServerQuery;
+  private PerfLogger perfLogger;
 
   public HookContext(QueryPlan queryPlan, QueryState queryState,
-      Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress,
-      String operationId, String sessionId) throws Exception {
+      Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress, String hiveInstanceAddress,
+      String operationId, String sessionId, String threadId, boolean isHiveServerQuery, PerfLogger perfLogger) throws Exception {
     this.queryPlan = queryPlan;
     this.queryState = queryState;
     this.conf = queryState.getConf();
@@ -82,8 +87,12 @@ public HookContext(QueryPlan queryPlan, QueryState queryState,
     }
     this.userName = userName;
     this.ipAddress = ipAddress;
+    this.hiveInstanceAddress = hiveInstanceAddress;
     this.operationId = operationId;
     this.sessionId = sessionId;
+    this.threadId = threadId;
+    this.isHiveServerQuery = isHiveServerQuery;
+    this.perfLogger = perfLogger;
   }
 
   public QueryPlan getQueryPlan() {
@@ -170,6 +179,10 @@ public String getIpAddress() {
     return this.ipAddress;
   }
 
+  public String getHiveInstanceAddress() {
+    return hiveInstanceAddress;
+  }
+
   public void setErrorMessage(String errorMessage) {
     this.errorMessage = errorMessage;
   }
@@ -205,4 +218,25 @@ public QueryState getQueryState() {
   public String getSessionId() {
     return sessionId;
   }
+
+  public String getThreadId() {
+    return threadId;
+  }
+
+  public boolean isHiveServerQuery() {
+    return isHiveServerQuery;
+  }
+
+  public void setHiveServerQuery(boolean isHiveServerQuery) {
+    this.isHiveServerQuery = isHiveServerQuery;
+  }
+
+  public PerfLogger getPerfLogger() {
+    return perfLogger;
+  }
+
+  public void setPerfLogger(PerfLogger perfLogger) {
+    this.perfLogger = perfLogger;
+  }
+
 }
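One last note on the PERF addition in createPostHookEvent(): the timings are serialized as a JSON string in the entity's otherInfo map, keyed by PerfLogger stage names with durations in milliseconds. A hedged sketch of how a consumer might unpack that blob; obtaining the TimelineEntity from a Timeline client is assumed, and only the PERF key itself comes from the patch:

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.json.JSONObject;

// Hypothetical reader for the PERF otherInfo blob written by the post hook.
public final class PerfInfoReader {
  public static void printPerf(TimelineEntity entity) throws Exception {
    Object raw = entity.getOtherInfo().get("PERF");
    if (raw == null) {
      return; // events from pre-patch hooks carry no PERF info
    }
    JSONObject perf = new JSONObject(raw.toString());
    String[] stages = JSONObject.getNames(perf);
    if (stages == null) {
      return; // empty perf object
    }
    for (String stage : stages) {
      System.out.println(stage + " took " + perf.getLong(stage) + " ms");
    }
  }
}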