diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 3f91c05..7c29612 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -64,8 +64,8 @@
 import org.apache.hadoop.hive.ql.hooks.PostExecute;
 import org.apache.hadoop.hive.ql.hooks.PreExecute;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.hooks.Redactor;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -153,6 +153,9 @@
   private String userName;
 
+  // HS2 operation handle guid string
+  private String operationId;
+
   private boolean checkConcurrency() {
     boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
     if (!supportConcurrency) {
@@ -1328,7 +1331,8 @@ public int execute() throws CommandNeedRetryException {
       resStream = null;
 
       SessionState ss = SessionState.get();
-      HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS(), ss.getUserName(), ss.getUserIpAddress());
+      HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS(), ss.getUserName(),
+          ss.getUserIpAddress(), operationId);
       hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
 
       for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {
@@ -1784,4 +1788,12 @@ public String getErrorMsg() {
     return errorMessage;
   }
 
+  /**
+   * Set the HS2 operation handle's guid string
+   * @param opId base64 encoded guid string
+   */
+  public void setOperationId(String opId) {
+    this.operationId = opId;
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 4db825b..513a2fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -17,11 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.hooks;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.util.List;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -37,10 +35,9 @@
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
 import org.json.JSONObject;
 
-import static org.apache.hadoop.hive.ql.hooks.HookContext.HookType.*;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * ATSHook sends query + plan info to Yarn App Timeline Server. To enable (hadoop 2.4 and up) set
@@ -55,7 +52,7 @@
   private enum EntityTypes { HIVE_QUERY_ID };
   private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED };
   private enum OtherInfoTypes { QUERY, STATUS, TEZ, MAPRED };
-  private enum PrimaryFilterTypes { user };
+  private enum PrimaryFilterTypes { user, operationid };
   private static final int WAIT_TIME = 3;
 
   public ATSHook() {
@@ -101,6 +98,7 @@ public void run() {
               return;
             }
             String queryId = plan.getQueryId();
+            String opId = hookContext.getOperationId();
             long queryStartTime = plan.getQueryStartTime();
             String user = hookContext.getUgi().getUserName();
             int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
@@ -119,13 +117,13 @@ public void run() {
               JSONObject explainPlan = explain.getJSONPlan(null, null, rootTasks,
                   plan.getFetchTask(), true, false, false);
               fireAndForget(conf, createPreHookEvent(queryId, query,
-                    explainPlan, queryStartTime, user, numMrJobs, numTezJobs));
+                    explainPlan, queryStartTime, user, numMrJobs, numTezJobs, opId));
               break;
             case POST_EXEC_HOOK:
-              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, true));
+              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, true, opId));
               break;
             case ON_FAILURE_HOOK:
-              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, false));
+              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, false, opId));
               break;
             default:
               //ignore
@@ -139,7 +137,7 @@ public void run() {
   }
 
   TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan,
-      long startTime, String user, int numMrJobs, int numTezJobs) throws Exception {
+      long startTime, String user, int numMrJobs, int numTezJobs, String opId) throws Exception {
 
     JSONObject queryObj = new JSONObject();
     queryObj.put("queryText", query);
@@ -148,12 +146,16 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla
     LOG.info("Received pre-hook notification for :" + queryId);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Otherinfo: " + queryObj.toString());
+      LOG.debug("Operation id: <" + opId + ">");
     }
 
     TimelineEntity atsEntity = new TimelineEntity();
     atsEntity.setEntityId(queryId);
     atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user);
+    if (opId != null) {
+      atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId);
+    }
 
     TimelineEvent startEvt = new TimelineEvent();
     startEvt.setEventType(EventTypes.QUERY_SUBMITTED.name());
@@ -166,13 +168,17 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla
     return atsEntity;
   }
 
-  TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, boolean success) {
+  TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, boolean success,
+      String opId) {
     LOG.info("Received post-hook notification for :" + queryId);
 
     TimelineEntity atsEntity = new TimelineEntity();
     atsEntity.setEntityId(queryId);
     atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user);
+    if (opId != null) {
+      atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId);
+    }
 
     TimelineEvent stopEvt = new TimelineEvent();
     stopEvt.setEventType(EventTypes.QUERY_COMPLETED.name());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
index 5a33655..0c6a938 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
@@ -29,7 +29,6 @@
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 /**
@@ -53,9 +52,13 @@
   final private Map<String, ContentSummary> inputPathToContentSummary;
   private final String ipAddress;
   private final String userName;
+  // unique id set for operation when run from HS2, base64 encoded value of
+  // TExecuteStatementResp.TOperationHandle.THandleIdentifier.guid
+  private final String operationId;
 
   public HookContext(QueryPlan queryPlan, HiveConf conf,
-      Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress) throws Exception {
+      Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress,
+      String operationId) throws Exception {
     this.queryPlan = queryPlan;
     this.conf = conf;
     this.inputPathToContentSummary = inputPathToContentSummary;
@@ -67,8 +70,9 @@ public HookContext(QueryPlan queryPlan, HiveConf conf,
     if(SessionState.get() != null){
       linfo = SessionState.get().getLineageState().getLineageInfo();
     }
-    this.ipAddress = ipAddress;
     this.userName = userName;
+    this.ipAddress = ipAddress;
+    this.operationId = operationId;
   }
 
   public QueryPlan getQueryPlan() {
@@ -154,4 +158,8 @@ public String getOperationName() {
   public String getUserName() {
     return this.userName;
   }
+
+  public String getOperationId() {
+    return operationId;
+  }
 }
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 684ed7c..33ee16b 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
@@ -92,6 +93,14 @@ public void prepare(HiveConf sqlOperationConf) throws HiveSQLException {
     try {
       driver = new Driver(sqlOperationConf, getParentSession().getUserName());
+
+      // set the operation handle information in Driver, so that thrift API users
+      // can use the operation handle they receive, to lookup query information in
+      // Yarn ATS
+      String guid64 = Base64.encodeBase64URLSafeString(getHandle().getHandleIdentifier()
+          .toTHandleIdentifier().getGuid()).trim();
+      driver.setOperationId(guid64);
+
       // In Hive server mode, we are not able to retry in the FetchTask
       // case, when calling fetch queries since execute() has returned.
       // For now, we disable the test attempts.
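Reviewer note, not part of the patch: a minimal client-side sketch of how the new operationid primary filter could be used. It recomputes the same URL-safe base64 guid string that SQLOperation.prepare() passes to Driver.setOperationId(), then queries the Timeline Server's v1 REST API for HIVE_QUERY_ID entities tagged with it. The AtsOperationLookup class name, the atsAddress value, and the raw HttpURLConnection lookup are illustrative assumptions; only the guid encoding and the operationid filter name come from the patch.

// Illustrative sketch only -- class name, ATS address, and REST lookup are assumptions, not patch code.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import org.apache.commons.codec.binary.Base64;
import org.apache.hive.service.cli.thrift.TOperationHandle;

public class AtsOperationLookup {

  // Same encoding as SQLOperation.prepare(): URL-safe base64 of the operation handle's guid bytes.
  static String toOperationId(TOperationHandle handle) {
    return Base64.encodeBase64URLSafeString(handle.getOperationId().getGuid()).trim();
  }

  // Fetch the HIVE_QUERY_ID timeline entities tagged with that operation id, e.g.
  // fetchEntities("http://atshost:8188", toOperationId(handle)).
  static String fetchEntities(String atsAddress, String operationId) throws Exception {
    URL url = new URL(atsAddress + "/ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=operationid:"
        + operationId);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    StringBuilder json = new StringBuilder();
    BufferedReader reader = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        json.append(line);
      }
    } finally {
      reader.close();
      conn.disconnect();
    }
    return json.toString(); // JSON listing of matching timeline entities
  }
}

Because the filter value is derived purely from the guid in the TOperationHandle the client already holds, no extra call back to HiveServer2 is needed to correlate a submitted statement with its ATS entity.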