diff --git a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java index 806663df43..2e7adaee6b 100644 --- a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java +++ b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.IOUtils; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.ShutdownHookManager; @@ -236,9 +237,13 @@ int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) { out.println(cmd); } + // Set HDFS CallerContext to queryId and reset back to sessionId after the query is done + ShimLoader.getHadoopShims().setHadoopQueryContext(qp.getQueryState().getQueryId()); ret = qp.run(cmd).getResponseCode(); + if (ret != 0) { qp.close(); + ShimLoader.getHadoopShims().setHadoopSessionContext(ss.getSessionId()); return ret; } @@ -276,6 +281,7 @@ int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) { } qp.close(); + ShimLoader.getHadoopShims().setHadoopSessionContext(ss.getSessionId()); if (out instanceof FetchConverter) { ((FetchConverter) out).fetchFinished(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index dad2035362..5b31dc5955 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -3022,6 +3022,7 @@ public void setOperationId(String opId) { this.operationId = opId; } + @Override public QueryState getQueryState() { return queryState; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java index bab5014778..e44e6a3913 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java @@ -42,6 +42,8 @@ QueryPlan getPlan(); + QueryState getQueryState(); + QueryDisplay getQueryDisplay(); void setOperationId(String guid64); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java index 3bc3b291c9..ab5c66b151 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java @@ -131,6 +131,11 @@ public QueryPlan getPlan() { return coreDriver.getPlan(); } + @Override + public QueryState getQueryState() { + return queryState; + } + @Override public QueryDisplay getQueryDisplay() { return coreDriver.getQueryDisplay(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 71e130b608..c16c66528f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -423,6 +423,8 @@ public SessionState(HiveConf conf, String userName) { resourceDownloader = new ResourceDownloader(conf, HiveConf.getVar(conf, ConfVars.DOWNLOADED_RESOURCES_DIR)); killQuery = new NullKillQuery(); + + ShimLoader.getHadoopShims().setHadoopSessionContext(getSessionId()); } public Map getHiveVariables() { diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 4b9cbd35e2..cb995de94e 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.log.LogDivertAppenderForTest; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.OperationLog; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; @@ -223,6 +224,7 @@ protected void createOperationLog() { * Set up some preconditions, or configurations. */ protected void beforeRun() { + ShimLoader.getHadoopShims().setHadoopQueryContext(queryState.getQueryId()); createOperationLog(); LogUtils.registerLoggingContext(queryState.getConf()); } @@ -233,6 +235,8 @@ protected void beforeRun() { */ protected void afterRun() { LogUtils.unregisterLoggingContext(); + // Reset back to session context after the query is done + ShimLoader.getHadoopShims().setHadoopSessionContext(parentSession.getSessionState().getSessionId()); } /** diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 0e6bd4deb5..fcc80b29bf 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.security.UserGroupInformation; @@ -311,6 +312,8 @@ public Object run() throws HiveSQLException { SessionState.setCurrentSessionState(parentSessionState); PerfLogger.setPerfLogger(SessionState.getPerfLogger()); LogUtils.registerLoggingContext(queryState.getConf()); + ShimLoader.getHadoopShims().setHadoopQueryContext(queryState.getQueryId()); + try { if (asyncPrepare) { prepare(queryState); diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 98c3eef6fa..b6f70ebe63 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -481,6 +482,21 @@ public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTracke return new MiniSparkShim(conf, numberOfTaskTrackers, nameNode, numDir); } + @Override + public void setHadoopCallerContext(String callerContext) { + CallerContext.setCurrent(new CallerContext.Builder(callerContext).build()); + } + + @Override + public void setHadoopQueryContext(String queryId) { + setHadoopCallerContext("hive_queryId_" + queryId); + } + + @Override + public void setHadoopSessionContext(String sessionId) { + setHadoopCallerContext("hive_sessionId_" + sessionId); + } + /** * Shim for MiniSparkOnYARNCluster */ @@ -1048,7 +1064,6 @@ public String getShortName() throws IOException { } } - public static class StoragePolicyShim implements HadoopShims.StoragePolicyShim { private final DistributedFileSystem dfs; diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index 84e6430fcf..c569b242ae 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -96,6 +96,22 @@ public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers, String nameNode, int numDir) throws IOException; + + /** + * Set up the caller context for HDFS and Yarn. + */ + void setHadoopCallerContext(String callerContext); + + /** + * Set up context specific caller context with query prefix. + */ + void setHadoopQueryContext(String queryId); + + /** + * Set up context specific caller context with session prefix. + */ + void setHadoopSessionContext(String sessionId); + /** * Shim for MiniMrCluster */