diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 73e6c21..fc1f0c3 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3219,6 +3219,17 @@ public static String getVar(Configuration conf, ConfVars var, String defaultVal)
     return conf.get(var.varname, defaultVal);
   }
 
+  public String getLogIdVar() {
+    String retval = getVar(ConfVars.HIVE_LOG_TRACE_ID);
+
+    if (retval.length() > LOG_PREFIX_LENGTH) {
+      l4j.warn("The original log id prefix " + retval + " has been truncated to "
+          + retval.substring(0, LOG_PREFIX_LENGTH - 1));
+      retval = retval.substring(0, LOG_PREFIX_LENGTH - 1);
+    }
+    return retval;
+  }
+
   public String getLogIdVar(String defaultValue) {
     String retval = getVar(ConfVars.HIVE_LOG_TRACE_ID);
     if (retval.equals("")) {
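The new zero-argument getLogIdVar() truncates an over-long hive.log.trace.id before it is used as a log/caller-context prefix. A minimal sketch of just that truncation rule, runnable on its own (the constant value 64 and the class/method names are illustrative assumptions; in HiveConf the limit is the LOG_PREFIX_LENGTH class constant):

public class LogIdTruncationSketch {
  // Stand-in for HiveConf.LOG_PREFIX_LENGTH; the real value lives in HiveConf.
  private static final int LOG_PREFIX_LENGTH = 64;

  // Mirrors getLogIdVar(): ids longer than the limit are cut to
  // LOG_PREFIX_LENGTH - 1 characters, shorter ids pass through unchanged.
  static String truncateLogId(String logId) {
    if (logId.length() > LOG_PREFIX_LENGTH) {
      return logId.substring(0, LOG_PREFIX_LENGTH - 1);
    }
    return logId;
  }

  public static void main(String[] args) {
    String longId = new String(new char[100]).replace('\0', 'x');
    System.out.println(truncateLogId("shortTraceId"));   // unchanged
    System.out.println(truncateLogId(longId).length());  // 63
  }
}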
+ LOG.info("We are resetting the hadoop caller context to " + originalCallerContext); + shim.setHadoopCallerContext(originalCallerContext); + LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); } } @@ -1478,8 +1494,12 @@ public int execute() throws CommandNeedRetryException { String queryStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYSTRING); maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER); - + String originalCallerContext = ""; try { + LOG.info("Setting caller context to query id " + queryId); + originalCallerContext = ShimLoader.getHadoopShims().getHadoopCallerContext(); + ShimLoader.getHadoopShims().setHadoopQueryContext(queryId); + LOG.info("Executing command(queryId=" + queryId + "): " + queryStr); // compile and execute can get called from different threads in case of HS2 // so clear timing in this thread's Hive object before proceeding. @@ -1711,6 +1731,8 @@ public int execute() throws CommandNeedRetryException { + org.apache.hadoop.util.StringUtils.stringifyException(e)); return (12); } finally { + LOG.info("Resetting the caller context to " + originalCallerContext); + ShimLoader.getHadoopShims().setHadoopCallerContext(originalCallerContext); if (SessionState.get() != null) { SessionState.get().getHiveHistory().endQuery(queryId); } diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java index 9132a21..2adda35 100644 --- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java +++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java @@ -106,6 +106,9 @@ private transient Long queryStartTime; private final HiveOperation operation; private Boolean autoCommitValue; + private String sessionId; + private String threadName; + private String userProvidedContext; public QueryPlan() { this.reducerTimeStatsPerJobList = new ArrayList(); @@ -113,7 +116,8 @@ public QueryPlan() { } public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId, - HiveOperation operation, Schema resultSchema) { + HiveOperation operation, Schema resultSchema, String sessionId, + String threadName, String userProvidedContext) { this.queryString = queryString; rootTasks = new ArrayList>(); @@ -137,6 +141,9 @@ public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, S this.operation = operation; this.autoCommitValue = sem.getAutoCommitValue(); this.resultSchema = resultSchema; + this.setSessionId(sessionId); + this.setThreadName(threadName); + this.setUserProvidedContext(userProvidedContext); } public String getQueryStr() { @@ -610,7 +617,6 @@ public String toThriftJSONString() throws IOException { try { q.write(oprot); } catch (TException e) { - // TODO Auto-generated catch block e.printStackTrace(); return q.toString(); } @@ -624,7 +630,6 @@ public String toBinaryString() throws IOException { try { q.write(oprot); } catch (TException e) { - // TODO Auto-generated catch block e.printStackTrace(); return q.toString(); } @@ -803,4 +808,28 @@ public HiveOperation getOperation() { public Boolean getAutoCommitValue() { return autoCommitValue; } + + public String getSessionId() { + return sessionId; + } + + public void setSessionId(String sessionId) { + this.sessionId = sessionId; + } + + public String getThreadName() { + return threadName; + } + + public void setThreadName(String threadName) { + this.threadName = threadName; + } + + public String getUserProvidedContext() { + return userProvidedContext; + } + + public void 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 6a7d035..e802309 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -217,6 +218,17 @@ public void open(HiveConf conf, Collection additionalFiles, Path scratch
 
   protected void openInternal(final HiveConf conf, Collection additionalFiles, boolean isAsync,
       LogHelper console, Path scratchDir) throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
+
+    LOG.info("Opening the session with id " + sessionId + " for thread "
+        + Thread.currentThread().getName() + "; log trace id: " + conf.getLogIdVar()
+        + ", query id: " + conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+    String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
+    if (queryId == null || queryId.isEmpty()) {
+      ShimLoader.getHadoopShims().setHadoopSessionContext(sessionId);
+    } else {
+      ShimLoader.getHadoopShims().setHadoopQueryContext(queryId);
+    }
+
     this.conf = conf;
     this.queueName = conf.get("tez.queue.name");
     this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
@@ -334,6 +346,8 @@ public TezClient call() throws Exception {
       this.console = console;
       this.sessionFuture = sessionFuture;
     }
+    // Reset the caller context.
+    ShimLoader.getHadoopShims().setHadoopCallerContext("");
   }
 
   private TezClient startSessionAndContainers(TezClient session, HiveConf conf,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 83defea..c5129b4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -176,9 +176,9 @@ public int execute(DriverContext driverContext) {
       if (driverContext.getCtx() == null) {
         boolean a = false;
       }
-      CallerContext callerContext = CallerContext.create(
-          "HIVE", queryPlan.getQueryId(),
-          "HIVE_QUERY_ID", queryPlan.getQueryStr());
+
+      CallerContext callerContext = CallerContext.create("HIVE",
+          queryPlan.getQueryId(), "HIVE_QUERY_ID", queryPlan.getQueryStr());
       dag.setCallerContext(callerContext);
 
       // Add the extra resources to the dag
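Note that two unrelated CallerContext classes are in play: TezTask above uses org.apache.tez.client.CallerContext to label the DAG, while the shim methods (in the shims hunks below) use org.apache.hadoop.ipc.CallerContext, a thread-local tag that Hadoop RPC forwards to the NameNode and ResourceManager. A minimal sketch of the Hadoop-side API the shim wraps (assumes a Hadoop version that includes HDFS-9184; the sample value is illustrative):

import org.apache.hadoop.ipc.CallerContext;

public class HadoopCallerContextSketch {
  public static void main(String[] args) {
    // Install a thread-local caller context; HDFS RPCs issued from this
    // thread afterwards carry the tag, and the NameNode can record it in its
    // audit log (a trailing callerContext= field) when
    // hadoop.caller.context.enabled=true.
    CallerContext.setCurrent(
        new CallerContext.Builder("HIVE_QUERY_ID:hive_20160101_demo").build());
    System.out.println(CallerContext.getCurrent().getContext());
  }
}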
diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index f490161..5a5b78f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -57,7 +57,7 @@
   private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED };
 
   private enum OtherInfoTypes {
-    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME, VERSION
+    QUERY, STATUS, TEZ, MAPRED, SESSION_ID, THREAD_NAME, VERSION, LOG_TRACE_ID
   };
   private enum PrimaryFilterTypes { user, requestuser, operationid };
   private static final int WAIT_TIME = 3;
@@ -139,9 +139,9 @@ public void run() {
               explain.initialize(conf, plan, null, null);
               String query = plan.getQueryStr();
               JSONObject explainPlan = explain.getJSONPlan(null, work);
-              String logID = conf.getLogIdVar(SessionState.get().getSessionId());
               fireAndForget(conf, createPreHookEvent(queryId, query, explainPlan, queryStartTime,
-                  user, requestuser, numMrJobs, numTezJobs, opId, logID));
+                  user, requestuser, numMrJobs, numTezJobs, opId, plan.getSessionId(),
+                  plan.getThreadName(), plan.getUserProvidedContext()));
               break;
             case POST_EXEC_HOOK:
               fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId));
@@ -162,7 +162,7 @@ public void run() {
 
   TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan,
       long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId,
-      String logID) throws Exception {
+      String sessionId, String threadName, String logTraceId) throws Exception {
 
     JSONObject queryObj = new JSONObject(new LinkedHashMap<>());
     queryObj.put("queryText", query);
@@ -192,8 +192,11 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla
     atsEntity.addOtherInfo(OtherInfoTypes.QUERY.name(), queryObj.toString());
     atsEntity.addOtherInfo(OtherInfoTypes.TEZ.name(), numTezJobs > 0);
     atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), numMrJobs > 0);
-    atsEntity.addOtherInfo(OtherInfoTypes.INVOKER_INFO.name(), logID);
-    atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), Thread.currentThread().getName());
+    atsEntity.addOtherInfo(OtherInfoTypes.SESSION_ID.name(), sessionId);
+    atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), threadName);
+    if (logTraceId != null && !logTraceId.isEmpty()) {
+      atsEntity.addOtherInfo(OtherInfoTypes.LOG_TRACE_ID.name(), logTraceId);
+    }
     atsEntity.addOtherInfo(OtherInfoTypes.VERSION.name(), VERSION);
     return atsEntity;
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
index d795324..a1c10c0 100644
--- ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
+++ ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
@@ -281,7 +281,7 @@ private ReturnInfo parseAndAnalyze(String query, String testName)
     // validate the plan
     sem.validate();
 
-    QueryPlan plan = new QueryPlan(query, sem, 0L, testName, null, null);
+    QueryPlan plan = new QueryPlan(query, sem, 0L, testName, null, null, "", "", "");
 
     return new ReturnInfo(tree, sem, plan);
   }
diff --git ql/src/test/queries/clientpositive/mrr.q ql/src/test/queries/clientpositive/mrr.q
index 324f2b1..60fcbfa 100644
--- ql/src/test/queries/clientpositive/mrr.q
+++ ql/src/test/queries/clientpositive/mrr.q
@@ -19,6 +19,7 @@ set hive.auto.convert.join=true;
 EXPLAIN SELECT s2.key, count(distinct s2.value) as cnt FROM src s1 join src s2 on (s1.key = s2.key) GROUP BY s2.key ORDER BY cnt,s2.key;
 SELECT s2.key, count(distinct s2.value) as cnt FROM src s1 join src s2 on (s1.key = s2.key) GROUP BY s2.key ORDER BY cnt,s2.key;
 
+set hive.log.trace.id=mrrTest;
 set hive.auto.convert.join=false;
 -- query with multiple branches in the task dag
 EXPLAIN
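The LOG_TRACE_ID that ATSHook now publishes comes from hive.log.trace.id, which a client can set per session, as the mrr.q test above does. A sketch of the same idea from a JDBC application (URL, credentials, and query are placeholders, not part of the patch):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class TraceIdTaggingClient {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://localhost:10000/default", "hiveuser", "");
         Statement stmt = conn.createStatement()) {
      // Tag subsequent queries in this session; the value surfaces in the ATS
      // entity's LOG_TRACE_ID field (only when non-empty, per the hook).
      stmt.execute("set hive.log.trace.id=nightly-etl-42");
      stmt.execute("SELECT COUNT(*) FROM src");
    }
  }
}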
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 6aee80c..e3848c1 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -286,6 +286,9 @@ public void setOperationLogSessionDir(File operationLogRootDir) {
     if (isOperationLogEnabled) {
       LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath());
     }
+
+    String logPrefix = getHiveConf().getLogIdVar(sessionState.getSessionId());
+    ShimLoader.getHadoopShims().setHadoopCallerContext(logPrefix);
   }
 
   @Override
@@ -323,17 +326,17 @@ public void setOperationManager(OperationManager operationManager) {
   }
 
   protected synchronized void acquire(boolean userAccess) {
-    // Need to make sure that the this HiveServer2's session's SessionState is
+    // Need to make sure that this HiveServer2's session's SessionState is
     // stored in the thread local for the handler thread.
     SessionState.setCurrentSessionState(sessionState);
     if (userAccess) {
       lastAccessTime = System.currentTimeMillis();
     }
-    // set the thread name with the logging prefix.
-    String logPrefix = getHiveConf().getLogIdVar(sessionState.getSessionId());
-    LOG.info(
-        "Prefixing the thread name (" + Thread.currentThread().getName() + ") with " + logPrefix);
-    Thread.currentThread().setName(logPrefix + Thread.currentThread().getName());
+    // Set the Hadoop caller context for debugging.
+    LOG.info("Setting the Hadoop caller context to " + sessionState.getSessionId()
+        + " for thread " + Thread.currentThread().getName());
+    ShimLoader.getHadoopShims().setHadoopSessionContext(sessionState.getSessionId());
+
     Hive.set(sessionHive);
   }
 
@@ -345,6 +348,11 @@ protected synchronized void acquire(boolean userAccess) {
    * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
    */
   protected synchronized void release(boolean userAccess) {
+    // Reset the Hadoop caller context.
+    LOG.info("Resetting the Hadoop caller context for thread "
+        + Thread.currentThread().getName());
+    ShimLoader.getHadoopShims().setHadoopCallerContext("");
+
     if (sessionState != null) {
       // can be null in-case of junit tests. skip reset.
       // reset thread name at release time.
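acquire() and release() run on whichever pooled HiveServer2 handler thread picks the session up, and the caller context is per-thread state, so the pairing matters: set it on every acquire and clear it on every release, or the tag survives into the next session's work on the same thread. A small sketch of that lifecycle against the shim API (the class and method framing here is illustrative):

import org.apache.hadoop.hive.shims.ShimLoader;

public class SessionCallerContextLifecycle {
  private final String sessionId;

  public SessionCallerContextLifecycle(String sessionId) {
    this.sessionId = sessionId;
  }

  // Called when a handler thread starts working on behalf of this session.
  void acquire() {
    ShimLoader.getHadoopShims().setHadoopSessionContext(sessionId);
  }

  // Called when the handler thread is done; clears the thread-local tag so a
  // pooled thread does not carry this session's id into other work.
  void release() {
    ShimLoader.getHadoopShims().setHadoopCallerContext("");
  }
}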
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 31060a2..7ed2d3b 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -1217,7 +1218,6 @@ public String getShortName() throws IOException {
     }
   }
 
-
   public static class StoragePolicyShim implements HadoopShims.StoragePolicyShim {
 
     private final DistributedFileSystem dfs;
@@ -1503,4 +1503,26 @@ public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
     return new FastTextReaderShim(in);
   }
 
+  @Override
+  public void setHadoopCallerContext(String callerContext) {
+    CallerContext.setCurrent(new CallerContext.Builder(callerContext).build());
+  }
+
+  @Override
+  public void setHadoopQueryContext(String queryId) {
+    setHadoopCallerContext("HIVE_QUERY_ID:" + queryId);
+  }
+
+  @Override
+  public void setHadoopSessionContext(String sessionId) {
+    setHadoopCallerContext("HIVE_SSN_ID:" + sessionId);
+  }
+
+  @Override
+  public String getHadoopCallerContext() {
+    if (CallerContext.getCurrent() == null) {
+      return "";
+    }
+    return CallerContext.getCurrent().getContext();
+  }
 }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 7a5a9b5..a842269 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -746,4 +746,24 @@ public void deleteKey(String keyName) throws IOException {
    * which are required for TextReaderShim.read() input.
    */
   public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
+
+  /**
+   * Set the Hadoop caller context for HDFS and YARN.
+   */
+  public void setHadoopCallerContext(String callerContext);
+
+  /**
+   * Set the caller context to the given query id, using the query prefix.
+   */
+  public void setHadoopQueryContext(String queryId);
+
+  /**
+   * Set the caller context to the given session id, using the session prefix.
+   */
+  public void setHadoopSessionContext(String sessionId);
+
+  /**
+   * Get the current Hadoop caller context for HDFS and YARN.
+   */
+  public String getHadoopCallerContext();
 }
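A round-trip sketch of the new shim surface, showing the prefixes the 0.23 implementation prepends and the getter's empty-string default (the query and session ids are sample values):

import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;

public class ShimCallerContextRoundTrip {
  public static void main(String[] args) {
    HadoopShims shims = ShimLoader.getHadoopShims();

    // Before anything is set, the getter reports an empty context.
    System.out.println(shims.getHadoopCallerContext().isEmpty()); // true

    shims.setHadoopQueryContext("hive_20160101_0001");
    System.out.println(shims.getHadoopCallerContext());
    // expected: HIVE_QUERY_ID:hive_20160101_0001

    shims.setHadoopSessionContext("session-abc");
    System.out.println(shims.getHadoopCallerContext());
    // expected: HIVE_SSN_ID:session-abc
  }
}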