Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java	(revision 1572808)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java	(working copy)
@@ -49,8 +49,8 @@
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * View to an on-disk Region.
@@ -403,7 +403,7 @@
     // We can't compare FileSystem instances as equals() includes UGI instance
     // as part of the comparison and won't work when doing SecureBulkLoad
     // TODO deal with viewFS
-    if (!srcFs.getUri().equals(desFs.getUri())) {
+    if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
       LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
           "the destination store. Copying file over to destination filesystem.");
       Path tmpPath = createTempName();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java	(revision 1572808)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java	(working copy)
@@ -22,7 +22,13 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -42,6 +48,9 @@
 public class FSHDFSUtils extends FSUtils {
   private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
 
+  private static Class dfsUtilClazz;
+  private static Method getNNAddressesMethod;
+
   /**
    * Recover the lease from HDFS, retrying multiple times.
   */
@@ -213,6 +222,78 @@
     return false;
   }
 
+  /**
+   * @param fs the DistributedFileSystem to look up namenode addresses for
+   * @param conf the Configuration to read the namenode settings from
+   * @return A set containing all the namenode addresses of fs
+   */
+  public static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs, Configuration conf) {
+    Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
+    String serviceName = fs.getCanonicalServiceName();
+
+    if (serviceName.startsWith("ha-hdfs")) {
+      try {
+        if (dfsUtilClazz == null) {
+          dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
+        }
+        if (getNNAddressesMethod == null) {
+          getNNAddressesMethod =
+              dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
+        }
+
+        Map<String, Map<String, InetSocketAddress>> addressMap =
+            (Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod.invoke(null, conf);
+        for (Map.Entry<String, Map<String, InetSocketAddress>> entry : addressMap.entrySet()) {
+          Map<String, InetSocketAddress> nnMap = entry.getValue();
+          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
+            InetSocketAddress addr = e2.getValue();
+            addresses.add(addr);
+          }
+        }
+      } catch (Exception e) {
+        return addresses;
+      }
+    } else {
+      URI uri = fs.getUri();
+      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), uri.getPort());
+      addresses.add(addr);
+      return addresses;
+    }
+
+    return addresses;
+  }
+
+  /**
+   * @param conf the Configuration of HBase
+   * @param srcFs the source FileSystem
+   * @param desFs the destination FileSystem
+   * @return Whether srcFs and desFs are on the same hdfs or not
+   */
+  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
+    // By getCanonicalServiceName, we can make sure both srcFs and desFs
+    // show a unified format which contains scheme, host and port.
+    String srcServiceName = srcFs.getCanonicalServiceName();
+    String desServiceName = desFs.getCanonicalServiceName();
+
+    if (srcServiceName == null || desServiceName == null) {
+      return false;
+    } else if (srcServiceName.equals(desServiceName)) {
+      return true;
+    } else if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
+
+      // If one serviceName is in HA format while the other is in non-HA format,
+      // they may still refer to the same FileSystem.
+      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port".
+      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
+      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
+      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   void checkIfCancelled(final CancelableProgressable reporter)
   throws InterruptedIOException {
     if (reporter == null) return;
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java	(revision 1572808)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java	(working copy)
@@ -21,8 +21,12 @@
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -36,6 +40,7 @@
  */
 @Category(MediumTests.class)
 public class TestFSHDFSUtils {
+  private static final Log LOG = LogFactory.getLog(TestFSHDFSUtils.class);
   private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
   static {
     Configuration conf = HTU.getConfiguration();
@@ -94,6 +99,51 @@
     Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
   }
 
+  @Test
+  public void testIsSameHdfs() throws IOException {
+    try {
+      Class dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
+      dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
+    } catch (Exception e) {
+      LOG.info("Skip testIsSameHdfs test case because this Hadoop version does not support HA.");
+      return;
+    }
+
+    Configuration conf = HBaseConfiguration.create();
+    Path srcPath = new Path("hdfs://localhost:8020/");
+    Path desPath = new Path("hdfs://127.0.0.1/");
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    FileSystem desFs = desPath.getFileSystem(conf);
+
+    assertTrue(FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    desPath = new Path("hdfs://127.0.0.1:8070/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    desPath = new Path("hdfs://127.0.1.1:8020/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
+    conf.set("dfs.nameservices", "haosong-hadoop");
+    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
+    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
+        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:8020");
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.1.1:8000");
+    desPath = new Path("/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.1.1:8020");
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
+    desPath = new Path("/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+  }
+
   /**
    * Version of DFS that has HDFS-4525 in it.
    */
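Note (illustration only, not part of the patch): the sketch below shows how a caller, such as the bulk-load path in HRegionFileSystem above, is expected to use the new FSHDFSUtils.isSameHdfs. The hostnames, the "my-nameservice" nameservice, the paths, and the IsSameHdfsExample class are all made up for the example; it assumes an HBase client classpath and, for the HA URI to resolve, an HA-enabled client configuration.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.util.FSHDFSUtils;

  public class IsSameHdfsExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Source file addressed through a plain namenode URI; destination rooted
      // at an HA nameservice URI. Despite the differing URIs, both may point at
      // the same cluster, which is exactly the case isSameHdfs handles.
      FileSystem srcFs = new Path("hdfs://nn1.example.com:8020/bulk/hfile").getFileSystem(conf);
      FileSystem desFs = new Path("hdfs://my-nameservice/hbase").getFileSystem(conf);
      if (FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
        // Same HDFS: the bulk-load file can simply be renamed into place.
        System.out.println("same hdfs, rename is enough");
      } else {
        // Different filesystems: the file must be copied over first.
        System.out.println("different filesystems, copy before load");
      }
    }
  }

A plain URI comparison (the old srcFs.getUri().equals(desFs.getUri()) check) would report "different" for the two filesystems above even when the nameservice resolves to nn1, forcing a needless copy; comparing the resolved namenode address sets avoids that.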