diff --git cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
index d1bc79c..2af40ab 100644
--- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
+++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
@@ -118,6 +118,7 @@ public int processCmd(String cmd) {
     ss.setLastCommand(cmd);
     String callerInfo = ss.getConf().getLogIdVar();
     Thread.currentThread().setName(callerInfo + originalThreadName);
+    ShimLoader.getHadoopShims().setHadoopCallerContext(callerInfo);
     // Flush the print stream, so it doesn't include output from the last command
     ss.err.flush();
     String cmd_trimmed = cmd.trim();
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 0005e21..a093fe3 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -301,6 +301,7 @@ protected synchronized void acquire(boolean userAccess) {
     LOG.info(
         "Prefixing the thread name (" + Thread.currentThread().getName() + ") with " + logPrefix);
     Thread.currentThread().setName(logPrefix + Thread.currentThread().getName());
+    ShimLoader.getHadoopShims().setHadoopCallerContext(logPrefix);
     Hive.set(sessionHive);
   }
 
@@ -335,6 +336,8 @@ protected synchronized void release(boolean userAccess) {
       threadName = names[0];
     }
     Thread.currentThread().setName(threadName);
+    // reset the HDFS caller context.
+    ShimLoader.getHadoopShims().setHadoopCallerContext("");
   }
 
   @Override
diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index f60e8f0..35963c4 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -731,4 +731,11 @@ public void addDelegationTokens(FileSystem fs, Credentials cred, String uname) t
   public long getFileId(FileSystem fs, String path) throws IOException {
     throw new UnsupportedOperationException("Not supported on old version");
   }
+
+  @Override
+  public void setHadoopCallerContext(String callerContext) {
+    /*
+     * do nothing. This is not supported in hadoop-1
+     */
+  }
 }
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 36282a5..984382f 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -21,14 +21,10 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
-import java.net.URL;
 import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.NoSuchAlgorithmException;
@@ -76,6 +72,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -1531,4 +1528,9 @@ public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
     return new FastTextReaderShim(in);
   }
 
+  @Override
+  public void setHadoopCallerContext(String callerContext) {
+    CallerContext.setCurrent(new CallerContext.Builder(callerContext).build());
+  }
+
 }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index dae9a1d..e234b29 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -418,11 +418,11 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus,
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
 
   public Map getHadoopConfNames();
-  
+
   /**
    * Create a shim for DFS storage policy.
    */
-  
+
   public enum StoragePolicyValue {
     MEMORY, /* 1-replica memory */
     SSD, /* 3-replica ssd */
@@ -435,11 +435,11 @@ public static StoragePolicyValue lookup(String name) {
       return StoragePolicyValue.valueOf(name.toUpperCase().trim());
     }
   };
-  
+
   public interface StoragePolicyShim {
     void setStoragePolicy(Path path, StoragePolicyValue policy) throws IOException;
   }
-  
+
   /**
   * obtain a storage policy shim associated with the filesystem.
  * Returns null when the filesystem has no storage policies.
@@ -768,4 +768,9 @@ public void deleteKey(String keyName) throws IOException {
   * which are required for TextReaderShim.read() input.
   */
  public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
+
+  /**
+   * Set up the caller context for HDFS and Yarn.
+   */
+  public void setHadoopCallerContext(String callerContext);
 }
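
For context, the Hadoop23Shims implementation delegates to org.apache.hadoop.ipc.CallerContext, added in Hadoop 2.8 by HDFS-9184. The context is held in a thread-local, which is why acquire() sets it on the session's thread and release() resets it before the thread is reused. Below is a minimal standalone sketch of that API, not part of this patch; it assumes a Hadoop 2.8+ client on the classpath, and "hypotheticalQueryId" is a made-up value standing in for Hive's callerInfo/logPrefix.

import org.apache.hadoop.ipc.CallerContext;

public class CallerContextSketch {
  public static void main(String[] args) {
    // Tag subsequent RPCs issued from this thread with a caller context.
    // "hypotheticalQueryId" is illustrative only.
    CallerContext.setCurrent(new CallerContext.Builder("hypotheticalQueryId").build());

    // The thread-local is readable back on the same thread.
    System.out.println("current caller context: " + CallerContext.getCurrent().getContext());

    // ... HDFS/YARN RPCs issued here would carry the context ...

    // Mirror the release() path above: reset to an empty context so a
    // pooled/reused thread does not leak the previous session's context.
    CallerContext.setCurrent(new CallerContext.Builder("").build());
  }
}

If hadoop.caller.context.enabled is set to true on the NameNode, its audit log records a callerContext=<value> field for each operation issued while the context is set, which is what makes the Hive session/query identifier searchable in HDFS audit logs.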