diff --git cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
index 3a80f99..01df9fb 100644
--- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
+++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
@@ -99,6 +99,7 @@
   private final LogHelper console;
   protected ConsoleReader reader;
   private Configuration conf;
+  private final String originalThreadName;

   public CliDriver() {
     SessionState ss = SessionState.get();
@@ -108,11 +109,15 @@ public CliDriver() {
       LOG.debug("CliDriver inited with classpath " + System.getProperty("java.class.path"));
     }
     console = new LogHelper(LOG);
+    originalThreadName = Thread.currentThread().getName();
   }

   public int processCmd(String cmd) {
     CliSessionState ss = (CliSessionState) SessionState.get();
     ss.setLastCommand(cmd);
+
+    String callerInfo = ss.getConf().getLogIdVar(ss.getSessionId());
+    Thread.currentThread().setName(callerInfo + " " + originalThreadName);
     // Flush the print stream, so it doesn't include output from the last command
     ss.err.flush();
     String cmd_trimmed = cmd.trim();
@@ -182,6 +187,7 @@ public int processCmd(String cmd) {
       }
     }

+    Thread.currentThread().setName(originalThreadName);
     return ret;
   }

@@ -699,6 +705,7 @@ public int run(String[] args) throws Exception {
       SessionState.start(ss);
     }

+    Thread.currentThread().setName(conf.getLogIdVar(ss.getSessionId()) + " " + originalThreadName);
     // execute cli driver work
     try {
       return executeDriver(ss, conf, oproc);
diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index dc79415..afb31c4 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -78,6 +78,7 @@
   private Pattern modWhiteListPattern = null;
   private volatile boolean isSparkConfigUpdated = false;
+  private static final int LOG_PREFIX_LENGTH = 64;

   public boolean getSparkConfigUpdated() {
     return isSparkConfigUpdated;
@@ -2380,7 +2381,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", true,
         "Enable memory manager for tez"),
     HIVE_HASH_TABLE_INFLATION_FACTOR("hive.hash.table.inflation.factor", (float) 2.0,
-        "Expected inflation factor between disk/in memory representation of hash tables");
+        "Expected inflation factor between disk/in memory representation of hash tables"),
+    HIVE_LOG_TRACE_ID_PREFIX("hive.log.trace.id", "",
+        "Log tracing id that can be used by upstream clients for tracking respective logs. Truncated to " + LOG_PREFIX_LENGTH + " characters.");

     public final String varname;
@@ -2822,6 +2825,19 @@ public static String getVar(Configuration conf, ConfVars var, String defaultVal)
     return conf.get(var.varname, defaultVal);
   }

+  public String getLogIdVar(String defaultValue) {
+    String retval = getVar(ConfVars.HIVE_LOG_TRACE_ID_PREFIX);
+    if (retval.isEmpty()) {
+      retval = defaultValue;
+    }
+    if (retval.length() > LOG_PREFIX_LENGTH) {
+      String truncated = retval.substring(0, LOG_PREFIX_LENGTH);
+      l4j.info("Log id prefix \"" + retval + "\" truncated to \"" + truncated + "\"");
+      retval = truncated;
+    }
+    return retval;
+  }
+
   public static void setVar(Configuration conf, ConfVars var, String val) {
     assert (var.valClass == String.class) : var.varname;
     conf.set(var.varname, val);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 032a9e6..ab66d5b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.tez.client.CallerContext;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
@@ -163,6 +164,11 @@ public int execute(DriverContext driverContext) {

       // next we translate the TezWork to a Tez DAG
       DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
+      CallerContext callerContext = CallerContext.create("HIVE",
+          conf.getLogIdVar(Thread.currentThread().getName()) + " "
+              + conf.getVar(HiveConf.ConfVars.HIVEQUERYID),
+          "HIVE_QUERY_ID", driverContext.getCtx().getCmd());
+      dag.setCallerContext(callerContext);

       // Add the extra resources to the dag
       addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 2caa7ae..d95dea6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -26,11 +26,13 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@@ -52,7 +54,10 @@
   private static TimelineClient timelineClient;
   private enum EntityTypes { HIVE_QUERY_ID };
   private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED };
-  private enum OtherInfoTypes { QUERY, STATUS, TEZ, MAPRED };
+
+  private enum OtherInfoTypes {
+    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME
+  };
   private enum PrimaryFilterTypes { user, requestuser, operationid };

   private static final int WAIT_TIME = 3;
@@ -104,7 +109,7 @@ public void run() {
             String user = hookContext.getUgi().getUserName();
             String requestuser = hookContext.getUserName();
             if (hookContext.getUserName() == null ){
-              requestuser = hookContext.getUgi().getUserName() ;
+              requestuser = hookContext.getUgi().getUserName();
             }
             int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
             int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
@@ -133,8 +138,9 @@ public void run() {
             explain.initialize(conf, plan, null);
             String query = plan.getQueryStr();
             JSONObject explainPlan = explain.getJSONPlan(null, work);
-            fireAndForget(conf, createPreHookEvent(queryId, query,
-                explainPlan, queryStartTime, user, requestuser, numMrJobs, numTezJobs, opId));
+            String logID = conf.getLogIdVar(SessionState.get().getSessionId());
+            fireAndForget(conf, createPreHookEvent(queryId, query, explainPlan, queryStartTime,
+                user, requestuser, numMrJobs, numTezJobs, opId, logID));
             break;
           case POST_EXEC_HOOK:
             fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId));
@@ -154,7 +160,8 @@ public void run() {
   }

   TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan,
-      long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId) throws Exception {
+      long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId,
+      String logID) throws Exception {

     JSONObject queryObj = new JSONObject(new LinkedHashMap<>());
     queryObj.put("queryText", query);
@@ -171,7 +178,7 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla
     atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user);
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser);
-    
+
     if (opId != null) {
       atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId);
     }
@@ -184,6 +191,8 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla
     atsEntity.addOtherInfo(OtherInfoTypes.QUERY.name(), queryObj.toString());
     atsEntity.addOtherInfo(OtherInfoTypes.TEZ.name(), numTezJobs > 0);
     atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), numMrJobs > 0);
+    atsEntity.addOtherInfo(OtherInfoTypes.INVOKER_INFO.name(), logID);
+    atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), Thread.currentThread().getName());

     return atsEntity;
   }
diff --git ql/src/test/queries/clientpositive/mrr.q ql/src/test/queries/clientpositive/mrr.q
index 2ced4db..a8eddaf 100644
--- ql/src/test/queries/clientpositive/mrr.q
+++ ql/src/test/queries/clientpositive/mrr.q
@@ -1,5 +1,6 @@
 set hive.explain.user=false;
 set hive.auto.convert.join.noconditionaltask.size=60000000;
+set hive.log.trace.id=mrrTest;

 -- simple query with multiple reduce stages
 -- SORT_QUERY_RESULTS
@@ -50,6 +51,7 @@
 WHERE s1.cnt > 1
 ORDER BY s1.key;

+set hive.log.trace.id=Test2;
 set hive.auto.convert.join=true;
 -- query with broadcast join in the reduce stage
 EXPLAIN
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 3eaab9a..dc16b30 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -308,6 +308,11 @@ protected synchronized void acquire(boolean userAccess) {
     if (userAccess) {
       lastAccessTime = System.currentTimeMillis();
     }
+    // set the thread name with the logging prefix.
+    String logPrefix = getHiveConf().getLogIdVar(sessionState.getSessionId());
+    LOG.info(
+        "Prefixing the thread name (" + Thread.currentThread().getName() + ") with " + logPrefix);
+    Thread.currentThread().setName(logPrefix + Thread.currentThread().getName());
     Hive.set(sessionHive);
   }

@@ -319,6 +324,16 @@ protected synchronized void acquire(boolean userAccess) {
   * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
   */
  protected synchronized void release(boolean userAccess) {
+    // Reset the thread name at release time by stripping the prefix that
+    // acquire() prepended. String.split() is deliberately avoided here: the
+    // prefix would be interpreted as a regex, and splitting a string that
+    // exactly equals the prefix yields an empty array.
+    String threadName = Thread.currentThread().getName();
+    String logPrefix = getHiveConf().getLogIdVar(sessionState.getSessionId());
+    if (!logPrefix.isEmpty() && threadName.startsWith(logPrefix)) {
+      threadName = threadName.substring(logPrefix.length());
+    }
+    Thread.currentThread().setName(threadName);
    SessionState.detachSession();
    if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
      ThreadWithGarbageCleanup currentThread =