diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index 96010d92ec..c76284e8c4 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -59,6 +60,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
   public static void setUpBeforeClass() throws Exception {
     conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+    conf1.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
     conf1.set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
     String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
@@ -82,8 +84,8 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
-     * Prepare 16 random hfile ranges required for creating hfiles
+     * Prepare 24 random hfile ranges required for creating hfiles
      */
     Iterator<String> randomHFileRangeListIterator = null;
-    Set<String> randomHFileRanges = new HashSet<>(16);
-    for (int i = 0; i < 16; i++) {
+    Set<String> randomHFileRanges = new HashSet<>(24);
+    for (int i = 0; i < 24; i++) {
       randomHFileRanges.add(utility1.getRandomUUID().toString());
     }
     List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
@@ -173,12 +175,18 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
       Iterator<String> randomHFileRangeListIterator) throws Exception {
     LOG.debug("loadAndReplicateHFiles");
 
-    // Load 100 + 3 hfiles to t1_syncup.
+    // Load 50 + 50 + 3 hfiles to t1_syncup.
     byte[][][] hfileRanges =
         new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
             Bytes.toBytes(randomHFileRangeListIterator.next()) } };
     loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
-      100);
+      50);
+
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source,
+      hfileRanges, 50);
 
     hfileRanges =
         new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -186,12 +194,18 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
             Bytes.toBytes(randomHFileRangeListIterator.next()) } };
     loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
       hfileRanges, 3);
-    // Load 200 + 3 hfiles to t2_syncup.
+    // Load 100 + 100 + 3 hfiles to t2_syncup.
     hfileRanges =
         new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
             Bytes.toBytes(randomHFileRangeListIterator.next()) } };
     loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
-      200);
+      100);
+
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source,
+      hfileRanges, 100);
 
     hfileRanges =
         new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -230,6 +244,29 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     loader.run(args);
   }
 
+  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row,
+      byte[] fam, Table source,
+      byte[][][] hfileRanges,
+      int numOfRows) throws Exception {
+    Path dir = utility2.getDataTestDirOnTestFS(testName);
+    FileSystem fs = utility2.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      HFileTestUtil.createHFile(utility2.getConfiguration(), fs, new Path(familyDir, "hfile_"
+          + hfileIdx++), fam, row, from, to, numOfRows);
+    }
+
+    final TableName tableName = source.getName();
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
+    String[] args = { dir.toString(), tableName.toString() };
+    loader.run(args);
+  }
+
   private void wait(Table target, int expectedCount, String msg)
       throws IOException, InterruptedException {
     for (int i = 0; i < NB_RETRIES; i++) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 2b3bec73de..0f7334fb8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -2323,7 +2324,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           filePaths.add(familyPath.getPath());
         }
         // Check if the batch of files exceeds the current quota
-        enforcement.checkBulkLoad(regionServer.getFileSystem(), filePaths);
+        enforcement.checkBulkLoad(getFileSystem(filePaths), filePaths);
       }
     }
 
@@ -3569,6 +3570,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
   }
 
+  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
+    if (filePaths.isEmpty()) {
+      // no files staged; fall back to the region server's local filesystem
+      return regionServer.getFileSystem();
+    }
+    // resolve the filesystem from the first path's URI (may be a remote source HDFS)
+    return new Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration());
+  }
+
   @Override
   @QosPriority(priority = HConstants.ADMIN_QOS)
   public ExecuteProceduresResponse executeProcedures(RpcController controller,
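The functional core of the patch is the new RSRpcServices#getFileSystem(List<String>) helper: the space-quota bulk-load check previously always measured the staged HFiles against the region server's own FileSystem, which gives wrong sizes when the files are staged on a different HDFS, the case the new loadFromOtherHDFSAndValidateHFileReplication test exercises. Hadoop resolves a path's owning FileSystem from its URI scheme and authority, falling back to fs.defaultFS for unqualified paths. The standalone sketch below (not part of the patch; the class name and paths are illustrative, and file:// stands in for a remote hdfs://source-nn URI so the sketch runs anywhere) shows the resolution the helper relies on:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsResolutionSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    // A fully qualified path carries its own scheme/authority, so it resolves
    // to that filesystem regardless of fs.defaultFS. A bulk load staged on a
    // source cluster would carry an hdfs://source-nn:8020/... URI here.
    Path qualified = new Path("file:///staging/cf1/hfile_0");
    FileSystem qualifiedFs = qualified.getFileSystem(conf);
    System.out.println("qualified   -> " + qualifiedFs.getUri());

    // An unqualified path falls back to fs.defaultFS, the same filesystem the
    // helper's empty-list branch returns via regionServer.getFileSystem().
    Path unqualified = new Path("/staging/cf1/hfile_0");
    FileSystem unqualifiedFs = unqualified.getFileSystem(conf);
    System.out.println("unqualified -> " + unqualifiedFs.getUri());
  }
}

This also motivates the empty-list fallback in the helper: with no paths to inspect there is nothing to resolve a scheme from, so the region server's own filesystem remains the only sensible default.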