diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 3c9a535..e878a9f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
 /**
@@ -402,7 +403,7 @@ public class HRegionFileSystem {
     // We can't compare FileSystem instances as equals() includes UGI instance
     // as part of the comparison and won't work when doing SecureBulkLoad
     // TODO deal with viewFS
-    if (!srcFs.getUri().equals(desFs.getUri())) {
+    if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
       LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
           "the destination store. Copying file over to destination filesystem.");
       Path tmpPath = createTempName();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
index 07d4837..0732e04 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
@@ -22,7 +22,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -41,6 +47,82 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 @InterfaceStability.Evolving
 public class FSHDFSUtils extends FSUtils {
   private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
+  private static Class<?> dfsUtilClazz;
+  private static Method getNNAddressesMethod;
+
+  /**
+   * @param fs the DistributedFileSystem to look up namenode addresses for
+   * @param conf the Configuration to read HA namenode addresses from
+   * @return A set containing all namenode addresses of fs
+   */
+  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
+      Configuration conf) {
+    Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
+    String serviceName = fs.getCanonicalServiceName();
+
+    if (serviceName.startsWith("ha-hdfs")) {
+      try {
+        if (dfsUtilClazz == null) {
+          dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
+        }
+        if (getNNAddressesMethod == null) {
+          getNNAddressesMethod =
+              dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
+        }
+
+        Map<String, Map<String, InetSocketAddress>> addressMap =
+            (Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod
+                .invoke(null, conf);
+        for (Map.Entry<String, Map<String, InetSocketAddress>> entry : addressMap.entrySet()) {
+          Map<String, InetSocketAddress> nnMap = entry.getValue();
+          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
+            InetSocketAddress addr = e2.getValue();
+            addresses.add(addr);
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
+      }
+    } else {
+      URI uri = fs.getUri();
+      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), uri.getPort());
+      addresses.add(addr);
+    }
+
+    return addresses;
+  }
+
+  /**
+   * @param conf the Configuration of HBase
+   * @param srcFs the source FileSystem
+   * @param desFs the destination FileSystem
+   * @return Whether srcFs and desFs are on the same HDFS or not
+   */
+  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
+    // getCanonicalServiceName gives both srcFs and desFs a unified
+    // format that contains scheme, host and port.
+    String srcServiceName = srcFs.getCanonicalServiceName();
+    String desServiceName = desFs.getCanonicalServiceName();
+
+    if (srcServiceName == null || desServiceName == null) {
+      return false;
+    }
+    if (srcServiceName.equals(desServiceName)) {
+      return true;
+    }
+    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
+      // If one serviceName is in HA format while the other is not,
+      // they may still refer to the same FileSystem: for example, srcFs may be
+      // "ha-hdfs://nameservices" while desFs is "hdfs://activeNamenode:port".
+      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
+      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
+      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
+        return true;
+      }
+    }
+
+    return false;
+  }
 
   /**
    * Recover the lease from HDFS, retrying multiple times.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
index 52c199b..5c9208a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
@@ -21,8 +21,12 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -36,6 +40,7 @@ import org.mockito.Mockito;
  */
 @Category(MediumTests.class)
 public class TestFSHDFSUtils {
+  private static final Log LOG = LogFactory.getLog(TestFSHDFSUtils.class);
   private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
   static {
     Configuration conf = HTU.getConfiguration();
@@ -94,6 +99,51 @@ public class TestFSHDFSUtils {
     Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
   }
 
+  @Test
+  public void testIsSameHdfs() throws IOException {
+    try {
+      Class<?> dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
+      dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
+    } catch (Exception e) {
+      LOG.info("Skipping testIsSameHdfs because this Hadoop version does not support HA.");
+      return;
+    }
+
+    Configuration conf = HBaseConfiguration.create();
+    Path srcPath = new Path("hdfs://localhost:8020/");
+    Path desPath = new Path("hdfs://127.0.0.1/");
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    FileSystem desFs = desPath.getFileSystem(conf);
+
+    assertTrue(FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    desPath = new Path("hdfs://127.0.0.1:8070/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    desPath = new Path("hdfs://127.0.1.1:8020/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
+    conf.set("dfs.nameservices", "haosong-hadoop");
+    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
+    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
+        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:8020");
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
+    desPath = new Path("/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:8020");
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
+    desPath = new Path("/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+  }
+
   /**
    * Version of DFS that has HDFS-4525 in it.
    */
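Note for reviewers: the rule the patch applies is that two filesystems count as the same HDFS when the sets of namenode RPC addresses they resolve to overlap, which is what lets an HA nameservice URI match a direct active-namenode URI. The following minimal, self-contained sketch illustrates only that comparison rule; the host names are hypothetical, and plain java.util stands in for Guava's Sets.intersection, so this is an illustration rather than the patch's actual code path.

import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;

public class SameHdfsSketch {
  /** True if the two namenode address sets share at least one address. */
  static boolean overlaps(Set<InetSocketAddress> src, Set<InetSocketAddress> des) {
    Set<InetSocketAddress> common = new HashSet<InetSocketAddress>(src);
    // java.util stand-in for Guava's Sets.intersection(src, des).size() > 0
    common.retainAll(des);
    return !common.isEmpty();
  }

  public static void main(String[] args) {
    // Hypothetical HA nameservice that resolves to two namenodes.
    Set<InetSocketAddress> haAddrs = new HashSet<InetSocketAddress>();
    haAddrs.add(InetSocketAddress.createUnresolved("nn1.example.com", 8020));
    haAddrs.add(InetSocketAddress.createUnresolved("nn2.example.com", 8020));

    // Hypothetical non-HA URI pointing straight at the active namenode.
    Set<InetSocketAddress> directAddrs = new HashSet<InetSocketAddress>();
    directAddrs.add(InetSocketAddress.createUnresolved("nn1.example.com", 8020));

    System.out.println(overlaps(haAddrs, directAddrs)); // true: treated as the same HDFS
  }
}

Running the sketch prints true because the direct address appears in the HA address set, mirroring the HA cases asserted in testIsSameHdfs.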