diff --git shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java index 117e0a5..2237fde 100644 --- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java +++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.ProxyFileSystem; import org.apache.hadoop.fs.Trash; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; @@ -743,4 +744,8 @@ public HCatHadoopShims getHCatShim() { public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException { throw new UnsupportedOperationException("WebHCat does not support Hadoop 0.20.x"); } + @Override + public FileSystem createProxyFileSystem(FileSystem fs, URI uri) { + return new ProxyFileSystem(fs, uri); + } } diff --git shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java index cf5c175..7589b58 100644 --- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java +++ shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java @@ -21,11 +21,13 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; +import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ProxyFileSystem; import org.apache.hadoop.fs.Trash; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.JobTracker; @@ -330,4 +332,8 @@ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException { public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) 
throws IOException { return new WebHCatJTShim20S(conf, ugi);//this has state, so can't be cached } + @Override + public FileSystem createProxyFileSystem(FileSystem fs, URI uri) { + return new ProxyFileSystem(fs, uri); + } } diff --git shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 9351411..74ad95f 100644 --- shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -23,12 +23,17 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.Map; +import java.net.URI; +import java.io.FileNotFoundException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Trash; +import org.apache.hadoop.fs.ProxyFileSystem; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.ClusterStatus; @@ -339,4 +344,41 @@ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException { public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException { return new WebHCatJTShim23(conf, ugi);//this has state, so can't be cached } + + class ProxyFileSystem23 extends ProxyFileSystem { + public ProxyFileSystem23(FileSystem fs) { + super(fs); + } + public ProxyFileSystem23(FileSystem fs, URI uri) { + super(fs, uri); + } + + @Override + public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f) + throws FileNotFoundException, IOException { + return new RemoteIterator<LocatedFileStatus>() { + private RemoteIterator<LocatedFileStatus> stats = + ProxyFileSystem23.super.listLocatedStatus( + ProxyFileSystem23.super.swizzleParamPath(f)); + + @Override + public boolean hasNext() throws IOException { + return 
stats.hasNext(); + } + + @Override + public LocatedFileStatus next() throws IOException { + LocatedFileStatus result = stats.next(); + return new LocatedFileStatus( + ProxyFileSystem23.super.swizzleFileStatus(result, false), + result.getBlockLocations()); + } + }; + } + } + + @Override + public FileSystem createProxyFileSystem(FileSystem fs, URI uri) { + return new ProxyFileSystem23(fs, uri); + } } diff --git shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java index 28843e0..fd274a6 100644 --- shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java +++ shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java @@ -622,4 +622,7 @@ public void reLoginUserFromKeytab() throws IOException{ @Override abstract public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration conf) throws IOException; + + @Override + abstract public FileSystem createProxyFileSystem(FileSystem fs, URI uri); } diff --git shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java index 28a18f6..cb1e2b7 100644 --- shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java +++ shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java @@ -45,7 +45,7 @@ - private Path swizzleParamPath(Path p) { + protected Path swizzleParamPath(Path p) { String pathUriString = p.toUri().toString(); URI newPathUri = URI.create(pathUriString); return new Path (realScheme, realAuthority, newPathUri.getPath()); @@ -57,7 +57,7 @@ private Path swizzleReturnPath(Path p) { return new Path (myScheme, myAuthority, newPathUri.getPath()); } - private FileStatus swizzleFileStatus(FileStatus orig, boolean isParam) { + protected FileStatus swizzleFileStatus(FileStatus orig, boolean isParam) { FileStatus ret = new FileStatus(orig.getLen(), orig.isDir(), orig.getReplication(), 
orig.getBlockSize(), orig.getModificationTime(), diff --git shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java index 9f35769..228a972 100644 --- shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java +++ shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java @@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.shims.HadoopShims; /**************************************************************** * A Proxy for LocalFileSystem @@ -61,7 +63,9 @@ public void initialize(URI name, Configuration conf) throws IOException { String authority = name.getAuthority() != null ? name.getAuthority() : ""; String proxyUriString = nameUriString + "://" + authority + "/"; - fs = new ProxyFileSystem(localFs, URI.create(proxyUriString)); + + fs = ShimLoader.getHadoopShims().createProxyFileSystem( + localFs, URI.create(proxyUriString)); fs.initialize(name, conf); } diff --git shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java index 5b91267..a599fdd 100644 --- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -536,4 +536,10 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte */ public void close(); } + + /** + * Create a proxy file system that can serve a given scheme/authority using some + * other file system. + */ + public FileSystem createProxyFileSystem(FileSystem fs, URI uri); }