diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index e947cc0..dc99702 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -30,11 +30,11 @@
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -79,7 +79,7 @@
     // Read all credentials into the credentials instance stored in JobConf.
     JobConf jobConf = new JobConf(conf);
-    jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
+    ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
     InputSplitInfoMem inputSplitInfo = null;
     String realInputFormatName = userPayloadProto.getInputFormatName();
diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index 5c19ee5..c45c962 100644
--- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -86,6 +86,7 @@
   /**
    * Returns a shim to wrap MiniMrCluster
    */
+  @Override
   public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
       String nameNode, int numDir) throws IOException {
     return new MiniMrShim(conf, numberOfTaskTrackers, nameNode, numDir);
@@ -125,6 +126,7 @@ public void setupConfiguration(Configuration conf) {
     }
   }
+  @Override
   public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
       int numDataNodes,
       boolean format,
@@ -143,15 +145,18 @@ public MiniDFSShim(MiniDFSCluster cluster) {
       this.cluster = cluster;
     }
+    @Override
     public FileSystem getFileSystem() throws IOException {
       return cluster.getFileSystem();
     }
+    @Override
     public void shutdown() {
       cluster.shutdown();
     }
   }
+  @Override
   public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
     return new CombineFileInputFormatShim() {
       @Override
       public RecordReader getRecordReader(InputSplit split,
@@ -162,6 +167,7 @@
     };
   }
+  @Override
   public void setTotalOrderPartitionFile(JobConf jobConf, Path partitionFile){
     TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
   }
@@ -255,6 +261,7 @@ public void write(DataOutput out) throws IOException {
     protected boolean isShrinked;
     protected long shrinkedLength;
+    @Override
     public boolean next(K key, V value) throws IOException {
       while ((curReader == null)
@@ -267,11 +274,13 @@ public boolean next(K key, V value) throws IOException {
       return true;
     }
+    @Override
     public K createKey() {
       K newKey = curReader.createKey();
       return (K)(new CombineHiveKey(newKey));
     }
+    @Override
     public V createValue() {
       return curReader.createValue();
     }
@@ -279,10 +288,12 @@ public V createValue() {
     /**
      * Return the amount of data processed.
      */
+    @Override
     public long getPos() throws IOException {
       return progress;
     }
+    @Override
     public void close() throws IOException {
       if (curReader != null) {
         curReader.close();
@@ -293,6 +304,7 @@ public void close() throws IOException {
     /**
      * Return progress based on the amount of data processed so far.
      */
+    @Override
     public float getProgress() throws IOException {
       long subprogress = 0;  // bytes processed in current split
       if (null != curReader) {
@@ -396,6 +408,7 @@ protected boolean initNextRecordReader(K key) throws IOException {
       CombineFileInputFormat<K, V>
       implements HadoopShims.CombineFileInputFormatShim<K, V> {
+    @Override
     public Path[] getInputPathsShim(JobConf conf) {
       try {
         return FileInputFormat.getInputPaths(conf);
@@ -436,10 +449,12 @@ public void createPool(JobConf conf, PathFilter... filters) {
       return isplits;
     }
+    @Override
     public InputSplitShim getInputSplitShim() throws IOException {
       return new InputSplitShim();
     }
+    @Override
     public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split,
         Reporter reporter,
         Class<RecordReader<K, V>> rrClass)
@@ -450,6 +465,7 @@ public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim spli
     }
+    @Override
     public String getInputFormatClassName() {
       return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
     }
@@ -479,6 +495,7 @@ public int createHadoopArchive(Configuration conf, Path sourceDir, Path destDir,
    * compared against the one used by Hadoop 1.0 (within HadoopShimsSecure)
    * where a relative path is stored within the archive.
    */
+  @Override
   public URI getHarUri (URI original, URI base, URI originalBase) throws URISyntaxException {
     URI relative = null;
@@ -511,6 +528,7 @@
     public void commitTask(TaskAttemptContext taskContext) { }
     public void abortTask(TaskAttemptContext taskContext) { }
   }
+  @Override
   public void prepareJobOutput(JobConf conf) {
     conf.setOutputCommitter(Hadoop20Shims.NullOutputCommitter.class);
@@ -641,6 +659,7 @@ public void authorizeProxyAccess(String proxyUser, UserGroupInformation realUser
     // This hadoop version doesn't have proxy verification
   }
+  @Override
   public boolean isSecurityEnabled() {
     return false;
   }
@@ -808,4 +827,9 @@ public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOE
     conf.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", origDisableHDFSCache);
     return fs;
   }
+
+  @Override
+  public void getMergedCredentials(JobConf jobConf) throws IOException {
+    throw new IOException("Merging of credentials not supported in this version of hadoop");
+  }
 }
\ No newline at end of file
diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index 4a0e72d..109aa29 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -172,6 +172,7 @@ public int compare(LongWritable o1, LongWritable o2) {
   /**
    * Returns a shim to wrap MiniMrCluster
    */
+  @Override
   public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
       String nameNode, int numDir) throws IOException {
     return new MiniMrShim(conf, numberOfTaskTrackers, nameNode, numDir);
@@ -226,6 +227,7 @@ public void setupConfiguration(Configuration conf) {
   // incompatibility between hadoop 1 and 2 wrt MiniDFSCluster and we
   // need to have two different shim classes even though they are
   // exactly the same.
+  @Override
   public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
       int numDataNodes,
       boolean format,
@@ -244,10 +246,12 @@ public MiniDFSShim(MiniDFSCluster cluster) {
       this.cluster = cluster;
     }
+    @Override
     public FileSystem getFileSystem() throws IOException {
       return cluster.getFileSystem();
     }
+    @Override
     public void shutdown() {
       cluster.shutdown();
     }
@@ -435,7 +439,7 @@ public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec)
     /* not supported */
     return null;
   }
-
+  @Override
   public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) {
     return context.getConfiguration();
@@ -451,4 +455,9 @@ public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOE
     conf.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", origDisableHDFSCache);
     return fs;
   }
+
+  @Override
+  public void getMergedCredentials(JobConf jobConf) throws IOException {
+    throw new IOException("Merging of credentials not supported in this version of hadoop");
+  }
 }
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index d6336e2..47b4627 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -201,6 +201,7 @@ public int compare(LongWritable o1, LongWritable o2) {
   /**
    * Returns a shim to wrap MiniMrCluster
    */
+  @Override
   public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
       String nameNode, int numDir) throws IOException {
     return new MiniMrShim(conf, numberOfTaskTrackers, nameNode, numDir);
@@ -259,6 +260,7 @@ public void setupConfiguration(Configuration conf) {
   /**
    * Returns a shim to wrap MiniMrTez
    */
+  @Override
   public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
       String nameNode, int numDir) throws IOException {
     return new MiniTezShim(conf, numberOfTaskTrackers, nameNode, numDir);
@@ -327,6 +329,7 @@ public void setupConfiguration(Configuration conf) {
   // incompatibility between hadoop 1 and 2 wrt MiniDFSCluster and we
   // need to have two different shim classes even though they are
   // exactly the same.
+  @Override
   public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
       int numDataNodes,
       boolean format,
@@ -346,10 +349,12 @@ public MiniDFSShim(MiniDFSCluster cluster) {
       this.cluster = cluster;
     }
+    @Override
     public FileSystem getFileSystem() throws IOException {
       return cluster.getFileSystem();
     }
+    @Override
     public void shutdown() {
       cluster.shutdown();
     }
@@ -563,7 +568,7 @@ public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec)
     /* not supported */
     return null;
   }
-
+  @Override
   public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) {
     return context.getConfiguration();
@@ -573,4 +578,9 @@ public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext con
   public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException {
     return FileSystem.newInstance(uri, conf);
   }
+
+  @Override
+  public void getMergedCredentials(JobConf jobConf) throws IOException {
+    jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
+  }
 }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index b8fdd85..fdba9f8 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -633,4 +633,6 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte
   public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException;
+  public void getMergedCredentials(JobConf jobConf) throws IOException;
+
 }
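
For reference, below is a minimal sketch (not part of the patch) of how a caller is expected to pick up the new shim entry point, mirroring the HiveSplitGenerator change above. The class name CredentialMergeExample and the buildJobConf method are illustrative only; ShimLoader, HadoopShims, and getMergedCredentials come from the patch itself.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.mapred.JobConf;

    // Illustrative caller of the new shim method.
    public class CredentialMergeExample {
      public static JobConf buildJobConf(Configuration conf) throws IOException {
        JobConf jobConf = new JobConf(conf);
        // Delegate to the version-specific shim: the Hadoop 0.23 shim merges the
        // current user's UGI credentials into the JobConf, while the 0.20 and
        // 0.20S shims throw IOException because credential merging is not
        // supported on those Hadoop versions.
        ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
        return jobConf;
      }
    }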