From 15c63a0879c657753042bd60f9a61a45b64ea0d2 Mon Sep 17 00:00:00 2001
From: Wellington Chevreuil
Date: Wed, 30 Oct 2019 08:52:16 +0000
Subject: [PATCH] HBASE-23136 PartitionedMobFileCompactor bulkloaded files shouldn't get replicated (addressing bulkload replication related issue raised in HBASE-22380)

Signed-off-by: Josh Elser

(cherry picked from commit 4d414020bb3bfd7f214d2a599426be700df772b2, then resolved conflicts)
---
 .../hbase/client/SecureBulkLoadClient.java    |   8 +-
 .../hbase/shaded/protobuf/ProtobufUtil.java   |  11 +-
 .../shaded/protobuf/RequestConverter.java     |   5 +-
 .../src/main/protobuf/Client.proto            |   1 +
 .../src/main/protobuf/WAL.proto               |   1 +
 .../compactions/PartitionedMobCompactor.java  |   1 +
 .../hadoop/hbase/regionserver/HRegion.java    |   7 +-
 .../hbase/regionserver/RSRpcServices.java     |   2 +-
 .../regionserver/SecureBulkLoadManager.java   |   2 +-
 .../regionserver/ReplicationSink.java         |  24 ++--
 .../hbase/tool/LoadIncrementalHFiles.java     |  11 +-
 .../regionserver/TestBulkLoadReplication.java | 107 ++++++++++++++++--
 12 files changed, 141 insertions(+), 39 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7e3166c3f4..cb258e4b56 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -116,7 +116,7 @@ public class SecureBulkLoadClient {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) throws IOException {
     return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
-      bulkToken, false, null);
+      bulkToken, false, null, true);
   }
 
   /**
@@ -138,17 +138,17 @@ public class SecureBulkLoadClient {
       final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
     return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
-      bulkToken, false, null);
+      bulkToken, false, null, true);
   }
 
   public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken,
-      boolean copyFiles, List<String> clusterIds) throws IOException {
+      boolean copyFiles, List<String> clusterIds, boolean replicate) throws IOException {
     BulkLoadHFileRequest request =
       RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
-        userToken, bulkToken, copyFiles, clusterIds);
+        userToken, bulkToken, copyFiles, clusterIds, replicate);
 
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 3821fc1036..688d06116d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2553,16 +2553,19 @@ public final class ProtobufUtil {
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
       Map<String, Long> storeFilesSize, long bulkloadSeqId) {
     return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
-      storeFilesSize, bulkloadSeqId, null);
+      storeFilesSize, bulkloadSeqId, null, true);
   }
 
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId,
+      List<String> clusterIds, boolean replicate) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
-        .setTableName(ProtobufUtil.toProtoTableName(tableName))
-        .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+        .setTableName(ProtobufUtil.toProtoTableName(tableName))
+        .setEncodedRegionName(encodedRegionName)
+        .setBulkloadSeqNum(bulkloadSeqId)
+        .setReplicate(replicate);
     if(clusterIds != null) {
       desc.addAllClusterIds(clusterIds);
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index a7c4e8ea67..acf5a572ac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -562,7 +562,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-      false, null);
+      false, null, true);
   }
 
   /**
@@ -579,7 +579,7 @@ public final class RequestConverter {
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths, final byte[] regionName,
       boolean assignSeqNum, final Token<?> userToken, final String bulkToken, boolean copyFiles,
-      List<String> clusterIds) {
+      List<String> clusterIds, boolean replicate) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -620,6 +620,7 @@
     if (clusterIds != null) {
       request.addAllClusterIds(clusterIds);
     }
+    request.setReplicate(replicate);
     return request.build();
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 07d8d711a0..a22c6237bc 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
   repeated string cluster_ids = 7;
+  optional bool replicate = 8 [default = true];
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index 9020daf615..fe44813e32 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -145,6 +145,7 @@ message BulkLoadDescriptor {
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
   repeated string cluster_ids = 5;
+  optional bool replicate = 6 [default = true];
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 09d9d98da1..ed299028e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -848,6 +848,7 @@ public class PartitionedMobCompactor extends MobCompactor {
     // bulkload the ref file
     try {
       LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+      bulkload.disableReplication();
       bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
         connection.getRegionLocator(table.getName()));
     } catch (Exception e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f36fb20e20..0534b9cf50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6051,7 +6051,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
+      null, true);
   }
 
   /**
@@ -6102,7 +6103,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener,
-      boolean copyFile, List<String> clusterIds) throws IOException {
+      boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6277,7 +6278,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           WALProtos.BulkLoadDescriptor loadDescriptor =
             ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
               UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-              storeFiles, storeFilesSizes, seqId, clusterIds);
+              storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
             loadDescriptor, mvcc);
         } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index aa54876661..f69367af58 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2343,7 +2343,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
         try {
           map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-            request.getCopyFile(), clusterIds);
+            request.getCopyFile(), clusterIds, request.getReplicate());
         } finally {
           if (region.getCoprocessorHost() != null) {
             region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index f51608d78a..efc85cfc38 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -294,7 +294,7 @@ public class SecureBulkLoadManager {
           //To enable access prior to staging
           return region.bulkLoadHFiles(familyPaths, true,
             new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
-            clusterIds);
+            clusterIds, request.getReplicate());
         } catch (Exception e) {
           LOG.error("Failed to complete bulk load", e);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 8079adc3ca..e8ae6fe2c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,18 +203,20 @@ public class ReplicationSink {
         // Handle bulk load hfiles replication
         if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
           BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-          if(bulkLoadsPerClusters == null) {
-            bulkLoadsPerClusters = new HashMap<>();
-          }
-          // Map of table name Vs list of pair of family and list of
-          // hfile paths from its namespace
-          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-            bulkLoadsPerClusters.get(bld.getClusterIdsList());
-          if (bulkLoadHFileMap == null) {
-            bulkLoadHFileMap = new HashMap<>();
-            bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+          if(bld.getReplicate()) {
+            if (bulkLoadsPerClusters == null) {
+              bulkLoadsPerClusters = new HashMap<>();
+            }
+            // Map of table name Vs list of pair of family and list of
+            // hfile paths from its namespace
+            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+              bulkLoadsPerClusters.get(bld.getClusterIdsList());
+            if (bulkLoadHFileMap == null) {
+              bulkLoadHFileMap = new HashMap<>();
+              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+            }
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           }
-          buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
         } else {
           // Handle wal replication
           if (isNewRowOrType(previousCell, cell)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 950f2a556d..7123318eaa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -139,6 +139,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private List<String> clusterIds = new ArrayList<>();
 
+  private boolean replicate = true;
+
   /**
    * Represents an HFile waiting to be loaded. An queue is used in this class in order to support
    * the case where a region has split during the process of the load. When this happens, the HFile
@@ -541,7 +543,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       try (Table table = conn.getTable(getTableName())) {
         secureClient = new SecureBulkLoadClient(getConf(), table);
         success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-          assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
+          assignSeqIds, fsDelegationToken.getUserToken(),
+            bulkToken, copyFile, clusterIds, replicate);
       }
       return success ? regionName : null;
     } finally {
@@ -1257,6 +1260,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     this.clusterIds = clusterIds;
   }
 
+  /**
+   * Disables replication for these bulkloaded files.
+   */
+  public void disableReplication(){
+    this.replicate = false;
+  }
   /**
    * Infers region boundaries for a new table.
    *
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 4d69bdf77d..14e3976f91 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -25,11 +25,17 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +51,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -57,14 +64,21 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -109,9 +123,11 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   private static final String PEER_ID3 = "3";
   private static final String PEER_ID4 = "4";
 
-  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+  private static AtomicInteger BULK_LOADS_COUNT;
   private static CountDownLatch BULK_LOAD_LATCH;
 
+  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
   private static HBaseTestingUtility utility3;
   private static HBaseTestingUtility utility4;
   private static Configuration conf3;
@@ -151,7 +167,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     util.startMiniCluster(2);
 
     TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+        .setMobEnabled(true)
+        .setMobThreshold(4000)
         .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
 
@@ -180,6 +198,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     setupCoprocessor(utility1);
     setupCoprocessor(utility4);
     setupCoprocessor(utility3);
+    BULK_LOADS_COUNT = new AtomicInteger(0);
   }
 
   private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
@@ -190,9 +209,16 @@
   private void setupCoprocessor(HBaseTestingUtility cluster){
     cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
       try {
-        r.getCoprocessorHost()
-          .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
-            cluster.getConfiguration());
+        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
+          findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+        if(cp == null) {
+          r.getCoprocessorHost().
+            load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+              cluster.getConfiguration());
+          cp = r.getCoprocessorHost().
+            findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+          cp.clusterName = cluster.getClusterKey();
+        }
       } catch (Exception e){
         LOG.error(e.getMessage(), e);
       }
@@ -206,6 +232,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     utility4.getAdmin().removeReplicationPeer(PEER_ID1);
     utility4.getAdmin().removeReplicationPeer(PEER_ID3);
     utility3.getAdmin().removeReplicationPeer(PEER_ID4);
+    utility1.getAdmin().removeReplicationPeer(PEER_ID4);
   }
 
   private static void setupBulkLoadConfigsForCluster(Configuration config,
@@ -241,6 +268,31 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     assertEquals(9, BULK_LOADS_COUNT.get());
   }
 
+  @Test
+  public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
+    Path path = createMobFiles(utility3);
+    ColumnFamilyDescriptor descriptor =
+      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
+    ExecutorService pool = null;
+    try {
+      pool = Executors.newFixedThreadPool(1);
+      PartitionedMobCompactor compactor =
+        new PartitionedMobCompactor(utility3.getConfiguration(), utility3.getTestFileSystem(),
+          tableName, descriptor, pool);
+      BULK_LOAD_LATCH = new CountDownLatch(1);
+      BULK_LOADS_COUNT.set(0);
+      compactor.compact(Arrays.asList(utility3.getTestFileSystem().listStatus(path)), true);
+      assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
+      Thread.sleep(400);
+      assertEquals(1, BULK_LOADS_COUNT.get());
+    } finally {
+      if(pool != null && !pool.isTerminated()) {
+        pool.shutdownNow();
+      }
+    }
+  }
+
+
   private void assertBulkLoadConditions(byte[] row, byte[] value,
     HBaseTestingUtility utility, Table...tables) throws Exception {
     BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -260,13 +312,13 @@ public class TestBulkLoadReplication extends TestReplicationBase {
       new LoadIncrementalHFiles(cluster.getConfiguration());
     Map<byte[], List<Path>> family2Files = new HashMap<>();
     List<Path> files = new ArrayList<>();
-    files.add(new Path("/bulk_dir/f/" + bulkLoadFilePath.getName()));
+    files.add(new Path(BULK_LOAD_BASE_DIR + "/f/" + bulkLoadFilePath.getName()));
     family2Files.put(Bytes.toBytes("f"), files);
     bulkLoadHFilesTool.run(family2Files, tableName);
   }
 
   private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
-    Path bulkLoadDir = new Path("/bulk_dir/f");
+    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/f/");
     cluster.getFileSystem().mkdirs(bulkLoadDir);
     cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
   }
@@ -307,22 +359,53 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     return hFileLocation.getAbsoluteFile().getAbsolutePath();
   }
 
+  private Path createMobFiles(HBaseTestingUtility util) throws IOException {
+    Path testDir = FSUtils.getRootDir(util.getConfiguration());
+    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+    Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    MobFileName mobFileName = null;
+    byte[] mobFileStartRow = new byte[32];
+    for (byte rowKey : Bytes.toBytes("01234")) {
+      mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
+        UUID.randomUUID().toString().replaceAll("-", ""));
+      StoreFileWriter mobFileWriter =
+        new StoreFileWriter.Builder(util.getConfiguration(),
+          new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
+            .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+      long now = System.currentTimeMillis();
+      try {
+        for (int i = 0; i < 10; i++) {
+          byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
+          byte[] dummyData = new byte[5000];
+          new Random().nextBytes(dummyData);
+          mobFileWriter.append(
+            new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
+        }
+      } finally {
+        mobFileWriter.close();
+      }
+    }
+    return basePath;
+  }
+
   public static class BulkReplicationTestObserver implements RegionCoprocessor {
 
+    String clusterName;
+    AtomicInteger bulkLoadCounts = new AtomicInteger();
+
     @Override
     public Optional<RegionObserver> getRegionObserver() {
       return Optional.of(new RegionObserver() {
-        @Override
-        public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
-          List<Pair<byte[], String>> familyPaths) throws IOException {
-          BULK_LOADS_COUNT.incrementAndGet();
-        }
 
         @Override
         public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
            throws IOException {
           BULK_LOAD_LATCH.countDown();
+          BULK_LOADS_COUNT.incrementAndGet();
+          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
+            bulkLoadCounts.addAndGet(1));
         }
       });
     }
-- 
2.17.2 (Apple Git-113)
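
Usage sketch: with this change, a caller that does not want its bulk load
replicated (as PartitionedMobCompactor now does for the mob ref files it
bulkloads) can call the new LoadIncrementalHFiles.disableReplication()
before loading. That clears the replicate flag on the BulkLoadHFileRequest
and on the resulting bulk load WAL marker, so a peer's ReplicationSink skips
the files; replicate still defaults to true everywhere, so existing callers
and older WAL entries behave as before. A minimal sketch, assuming a running
cluster and HFiles already staged in HDFS; the table name and staging path
below are illustrative, not taken from the patch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

  public class NonReplicatedBulkLoad {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      TableName tableName = TableName.valueOf("my_table"); // illustrative
      try (Connection connection = ConnectionFactory.createConnection(conf);
           Admin admin = connection.getAdmin();
           Table table = connection.getTable(tableName)) {
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // New in this patch: clear the replicate flag so that peer clusters
        // skip these files when applying the bulk load WAL marker.
        loader.disableReplication();
        loader.doBulkLoad(new Path("/staging/my_table"), admin, table,
          connection.getRegionLocator(tableName));
      }
    }
  }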