diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 9e5fd37..c0d729b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.PrintStream; import java.io.Serializable; +import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -1729,7 +1730,8 @@ public int execute(boolean deferClose) throws CommandNeedRetryException { SessionState ss = SessionState.get(); hookContext = new HookContext(plan, queryState, ctx.getPathToCS(), ss.getUserName(), - ss.getUserIpAddress(), operationId); + ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(), + Thread.currentThread().getName(), ss.isHiveServerQuery()); hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK); for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java index 7438570..f4cec16 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java @@ -17,7 +17,14 @@ */ package org.apache.hadoop.hive.ql.hooks; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -29,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.ExplainTask; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.parse.ExplainConfiguration; import 
org.apache.hadoop.hive.ql.plan.ExplainWork; import org.apache.hadoop.hive.ql.session.SessionState; @@ -38,6 +46,7 @@ import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hive.common.util.ShutdownHookManager; +import org.json.JSONArray; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,9 +68,16 @@ private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED }; private enum OtherInfoTypes { - QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME, VERSION + QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME, VERSION, + CLIENT_IP_ADDRESS, HIVE_ADDRESS, HIVE_INSTANCE_TYPE, HIVE_QUERY_NAME, CONF, }; - private enum PrimaryFilterTypes { user, requestuser, operationid }; + private enum ExecutionMode { + MR, TEZ, LLAP, SPARK, NONE + }; + private enum PrimaryFilterTypes { + user, requestuser, operationid, queryname, executionmode, tablesread, tableswritten + }; + private static final int WAIT_TIME = 3; public ATSHook() { @@ -138,9 +154,22 @@ public void run() { explain.initialize(queryState, plan, null, null); String query = plan.getQueryStr(); JSONObject explainPlan = explain.getJSONPlan(null, work); - String logID = conf.getLogIdVar(SessionState.get().getSessionId()); - fireAndForget(conf, createPreHookEvent(queryId, query, explainPlan, queryStartTime, - user, requestuser, numMrJobs, numTezJobs, opId, logID)); + String logID = conf.getLogIdVar(hookContext.getSessionId()); + List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs()); + List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs()); + String queryName = conf.getVar(HiveConf.ConfVars.HIVEQUERYID); + String executionMode = getExecutionMode(plan).name(); + String hiveInstanceAddress = hookContext.getHiveInstanceAddress(); + if (hiveInstanceAddress == null) { + hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress(); + } + String hiveInstanceType = 
hookContext.isHiveServerQuery() ? "HS2" : "CLI"; + fireAndForget(conf, + createPreHookEvent(queryId, query, explainPlan, queryStartTime, + user, requestuser, numMrJobs, numTezJobs, opId, + hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType, + logID, hookContext.getThreadId(), queryName, executionMode, + tablesRead, tablesWritten, conf)); break; case POST_EXEC_HOOK: fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId)); break; @@ -159,13 +188,50 @@ }); } + protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) { + List<String> tableNames = new ArrayList<String>(); + for (Entity entity : entities) { + if (entity.getType() == Entity.Type.TABLE) { + tableNames.add(entity.getTable().getDbName() + "." + entity.getTable().getTableName()); + } + } + return tableNames; + } + + protected ExecutionMode getExecutionMode(QueryPlan plan) { + int numMRJobs = Utilities.getMRTasks(plan.getRootTasks()).size(); + int numSparkJobs = Utilities.getSparkTasks(plan.getRootTasks()).size(); + int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size(); + + ExecutionMode mode = ExecutionMode.MR; + if (0 == (numMRJobs + numSparkJobs + numTezJobs)) { + mode = ExecutionMode.NONE; + } else if (numSparkJobs > 0) { + return ExecutionMode.SPARK; + } else if (numTezJobs > 0) { + mode = ExecutionMode.TEZ; + // Need to go in and check if any of the tasks is running in LLAP mode. 
+ for (TezTask tezTask : Utilities.getTezTasks(plan.getRootTasks())) { + if (tezTask.getWork().getLlapMode()) { + mode = ExecutionMode.LLAP; + break; + } + } + } + + return mode; + } + TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan, long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId, - String logID) throws Exception { + String clientIpAddress, String hiveInstanceAddress, String hiveInstanceType, + String logID, String threadId, String queryName, String executionMode, + List<String> tablesRead, List<String> tablesWritten, HiveConf conf) throws Exception { JSONObject queryObj = new JSONObject(new LinkedHashMap<>()); queryObj.put("queryText", query); queryObj.put("queryPlan", explainPlan); + queryObj.put("queryName", queryName); LOG.info("Received pre-hook notification for :" + queryId); if (LOG.isDebugEnabled()) { @@ -173,16 +239,32 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla LOG.debug("Operation id: <" + opId + ">"); } + conf.stripHiddenConfigurations(conf); + Map<String, String> confMap = new HashMap<String, String>(); + for (Map.Entry<String, String> setting : conf) { + confMap.put(setting.getKey(), setting.getValue()); + } + JSONObject confObj = new JSONObject((Map) confMap); + TimelineEntity atsEntity = new TimelineEntity(); atsEntity.setEntityId(queryId); atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name()); atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user); atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser); + atsEntity.addPrimaryFilter(PrimaryFilterTypes.queryname.name(), queryName); + atsEntity.addPrimaryFilter(PrimaryFilterTypes.executionmode.name(), executionMode); if (opId != null) { atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId); } + for (String tabName : tablesRead) { + atsEntity.addPrimaryFilter(PrimaryFilterTypes.tablesread.name(), tabName); + } + for (String tabName : tablesWritten) { + 
atsEntity.addPrimaryFilter(PrimaryFilterTypes.tableswritten.name(), tabName); + } + TimelineEvent startEvt = new TimelineEvent(); startEvt.setEventType(EventTypes.QUERY_SUBMITTED.name()); startEvt.setTimestamp(startTime); @@ -192,8 +274,15 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla atsEntity.addOtherInfo(OtherInfoTypes.TEZ.name(), numTezJobs > 0); atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), numMrJobs > 0); atsEntity.addOtherInfo(OtherInfoTypes.INVOKER_INFO.name(), logID); - atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), Thread.currentThread().getName()); + atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), threadId); atsEntity.addOtherInfo(OtherInfoTypes.VERSION.name(), VERSION); + if (clientIpAddress != null) { + atsEntity.addOtherInfo(OtherInfoTypes.CLIENT_IP_ADDRESS.name(), clientIpAddress); + } + atsEntity.addOtherInfo(OtherInfoTypes.HIVE_ADDRESS.name(), hiveInstanceAddress); + atsEntity.addOtherInfo(OtherInfoTypes.HIVE_INSTANCE_TYPE.name(), hiveInstanceType); + atsEntity.addOtherInfo(OtherInfoTypes.CONF.name(), confObj.toString()); + return atsEntity; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java index 8db0124..6c4b5ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java @@ -57,14 +57,18 @@ private Throwable exception; final private Map<String, ContentSummary> inputPathToContentSummary; private final String ipAddress; + private final String hiveInstanceAddress; private final String userName; // unique id set for operation when run from HS2, base64 encoded value of // TExecuteStatementResp.TOperationHandle.THandleIdentifier.guid private final String operationId; + private final String sessionId; + private final String threadId; + private boolean isHiveServerQuery; public HookContext(QueryPlan queryPlan, QueryState queryState, - Map<String, ContentSummary> 
inputPathToContentSummary, String userName, String ipAddress, - String operationId) throws Exception { + Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress, String hiveInstanceAddress, + String operationId, String sessionId, String threadId, boolean isHiveServerQuery) throws Exception { this.queryPlan = queryPlan; this.queryState = queryState; this.conf = queryState.getConf(); @@ -81,7 +85,11 @@ public HookContext(QueryPlan queryPlan, QueryState queryState, } this.userName = userName; this.ipAddress = ipAddress; + this.hiveInstanceAddress = hiveInstanceAddress; this.operationId = operationId; + this.sessionId = sessionId; + this.threadId = threadId; + this.isHiveServerQuery = isHiveServerQuery; } public QueryPlan getQueryPlan() { @@ -168,6 +176,10 @@ public String getIpAddress() { return this.ipAddress; } + public String getHiveInstanceAddress() { + return hiveInstanceAddress; + } + public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; } @@ -199,4 +211,21 @@ public String getOperationId() { public QueryState getQueryState() { return queryState; } + + public String getSessionId() { + return sessionId; + } + + public String getThreadId() { + return threadId; + } + + public boolean isHiveServerQuery() { + return isHiveServerQuery; + } + + public void setHiveServerQuery(boolean isHiveServerQuery) { + this.isHiveServerQuery = isHiveServerQuery; + } + }