From 855a827ba1093f56871855f0112f9a9ba70414c6 Mon Sep 17 00:00:00 2001
From: Wellington Chevreuil
Date: Mon, 28 Oct 2019 20:17:05 +0000
Subject: [PATCH] HBASE-23136 PartitionedMobFileCompactor bulkloaded files
 shouldn't get replicated (addressing bulkload replication related issue
 raised in HBASE-22380)

---
 .../hbase/client/SecureBulkLoadClient.java    |   8 +-
 .../hbase/shaded/protobuf/ProtobufUtil.java   |  11 +-
 .../shaded/protobuf/RequestConverter.java     |   5 +-
 .../src/main/protobuf/Client.proto            |   1 +
 .../src/main/protobuf/WAL.proto               |   1 +
 .../compactions/PartitionedMobCompactor.java  |   1 +
 .../hadoop/hbase/regionserver/HRegion.java    |   7 +-
 .../hbase/regionserver/RSRpcServices.java     |   2 +-
 .../regionserver/SecureBulkLoadManager.java   |   2 +-
 .../regionserver/ReplicationSink.java         |  24 ++--
 .../hadoop/hbase/tool/BulkLoadHFiles.java     |   1 +
 .../hbase/tool/LoadIncrementalHFiles.java     |  11 +-
 .../regionserver/TestBulkLoadReplication.java | 136 ++++++++++++++----
 13 files changed, 159 insertions(+), 51 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7e3166c3f4..cb258e4b56 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -116,7 +116,7 @@ public class SecureBulkLoadClient {
       final byte[] regionName, boolean assignSeqNum, final Token<?> userToken,
       final String bulkToken) throws IOException {
     return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
-      bulkToken, false, null);
+      bulkToken, false, null, true);
   }
 
   /**
@@ -138,17 +138,17 @@ public class SecureBulkLoadClient {
       final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
     return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
-      bulkToken, false, null);
+      bulkToken, false, null, true);
   }
 
   public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum, final Token<?> userToken,
       final String bulkToken,
-      boolean copyFiles, List<String> clusterIds) throws IOException {
+      boolean copyFiles, List<String> clusterIds, boolean replicate) throws IOException {
     BulkLoadHFileRequest request =
         RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
-          userToken, bulkToken, copyFiles, clusterIds);
+          userToken, bulkToken, copyFiles, clusterIds, replicate);
 
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 0a2063de72..5223551a89 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2582,16 +2582,19 @@ public final class ProtobufUtil {
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
       Map<String, Long> storeFilesSize, long bulkloadSeqId) {
     return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
-      storeFilesSize, bulkloadSeqId, null);
+      storeFilesSize, bulkloadSeqId, null, true);
   }
 
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId,
+      List<String> clusterIds, boolean replicate) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
-        .setTableName(ProtobufUtil.toProtoTableName(tableName))
-        .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+          .setTableName(ProtobufUtil.toProtoTableName(tableName))
+          .setEncodedRegionName(encodedRegionName)
+          .setBulkloadSeqNum(bulkloadSeqId)
+          .setReplicate(replicate);
     if(clusterIds != null) {
       desc.addAllClusterIds(clusterIds);
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 7dc8645d2a..151a454ece 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -566,7 +566,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum, final Token<?> userToken,
       final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-      false, null);
+      false, null, true);
   }
 
   /**
@@ -583,7 +583,7 @@ public final class RequestConverter {
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths, final byte[] regionName,
       boolean assignSeqNum, final Token<?> userToken, final String bulkToken, boolean copyFiles,
-      List<String> clusterIds) {
+      List<String> clusterIds, boolean replicate) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -624,6 +624,7 @@ public final class RequestConverter {
     if (clusterIds != null) {
       request.addAllClusterIds(clusterIds);
     }
+    request.setReplicate(replicate);
     return request.build();
   }
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 07d8d711a0..a22c6237bc 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
   repeated string cluster_ids = 7;
+  optional bool replicate = 8 [default = true];
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index c103075c44..fd622cfc5b 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -151,6 +151,7 @@ message BulkLoadDescriptor {
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
   repeated string cluster_ids = 5;
+  optional bool replicate = 6 [default = true];
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 09d9d98da1..ed299028e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -848,6 +848,7 @@ public class PartitionedMobCompactor extends MobCompactor {
     // bulkload the ref file
     try {
       LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+      bulkload.disableReplication();
       bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
         connection.getRegionLocator(table.getName()));
     } catch (Exception e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5b894a59cc..2423c085ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6099,7 +6099,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
+      null, true);
   }
 
   /**
@@ -6150,7 +6151,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener,
-      boolean copyFile, List<String> clusterIds) throws IOException {
+      boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6325,7 +6326,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         WALProtos.BulkLoadDescriptor loadDescriptor =
           ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
             UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-            storeFiles, storeFilesSizes, seqId, clusterIds);
+            storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
         WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
             loadDescriptor, mvcc);
       } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 00e616915b..32d75f41f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2402,7 +2402,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
       try {
         map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-          request.getCopyFile(), clusterIds);
+          request.getCopyFile(), clusterIds, request.getReplicate());
       } finally {
         if (region.getCoprocessorHost() != null) {
           region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 0badf2a9c8..d0f3943090 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -295,7 +295,7 @@ public class SecureBulkLoadManager {
           //To enable access prior to staging
           return region.bulkLoadHFiles(familyPaths, true,
               new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
-            clusterIds);
+            clusterIds, request.getReplicate());
         } catch (Exception e) {
           LOG.error("Failed to complete bulk load", e);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 6e6876039a..ae0a732499 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,18 +203,20 @@ public class ReplicationSink {
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-            if(bulkLoadsPerClusters == null) {
-              bulkLoadsPerClusters = new HashMap<>();
-            }
-            // Map of table name Vs list of pair of family and list of
-            // hfile paths from its namespace
-            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-              bulkLoadsPerClusters.get(bld.getClusterIdsList());
-            if (bulkLoadHFileMap == null) {
-              bulkLoadHFileMap = new HashMap<>();
-              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+            if(bld.getReplicate()) {
+              if (bulkLoadsPerClusters == null) {
+                bulkLoadsPerClusters = new HashMap<>();
+              }
+              // Map of table name Vs list of pair of family and list of
+              // hfile paths from its namespace
+              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+                bulkLoadsPerClusters.get(bld.getClusterIdsList());
+              if (bulkLoadHFileMap == null) {
+                bulkLoadHFileMap = new HashMap<>();
+                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+              }
+              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index f3d627ab2b..702ed75046 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -97,4 +97,5 @@ public interface BulkLoadHFiles {
   static BulkLoadHFiles create(Configuration conf) {
     return new BulkLoadHFilesTool(conf);
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index c5ba5a7f28..16ed114f6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -157,6 +157,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private List<String> clusterIds = new ArrayList<>();
 
+  private boolean replicate = true;
+
   /**
    * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
    * the case where a region has split during the process of the load. When this happens, the HFile
@@ -547,7 +549,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       try (Table table = conn.getTable(getTableName())) {
         secureClient = new SecureBulkLoadClient(getConf(), table);
         success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-          assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
+          assignSeqIds, fsDelegationToken.getUserToken(),
+          bulkToken, copyFile, clusterIds, replicate);
       }
       return success ? regionName : null;
     } finally {
@@ -1266,6 +1269,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     this.clusterIds = clusterIds;
   }
 
+  /**
+   * Disables replication for these bulkloaded files.
+   */
+  public void disableReplication() {
+    this.replicate = false;
+  }
   /**
    * Infers region boundaries for a new table.
    *
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 028277f56a..6fd7288042 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -24,10 +24,16 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +48,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -54,14 +61,21 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -77,14 +91,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Integration test for bulk load replication. Defines two clusters, with two way replication.
- * Performs a bulk load on cluster defined by UTIL1 first, asserts the Cell on the bulk loaded file
- * gets into the related table in UTIL1, then also validates the same got replicated to cluster
- * UTIL2. Then, bulk loads another file into UTIL2, and checks if related values are present on
- * UTIL2, and also gets replicated to UTIL1.
- * It also defines a preBulkLoad coprocessor that is added to all test table regions on each of the
- * clusters, in order to count amount of times bulk load actually gets invoked. This is to certify
- * we are not entered in the infinite loop condition addressed by HBASE-22380.
+ * Integration test for bulk load replication. Defines three clusters, with the following
+ * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
+ * 2 and 3).
+ *
+ * For each of the defined test clusters, it performs a bulk load, asserting that the values in
+ * the bulk loaded file get replicated to the other two peers. Since three bulk loads are
+ * performed, with the given topology each of them should get replicated exactly once per peer.
+ * To assert this, the test installs a bulk load observer coprocessor on all test table regions,
+ * on each of the clusters. This CP counts the number of times bulk load actually gets invoked,
+ * certifying we are not entering the infinite loop condition addressed by HBASE-22380.
  */
 @Category({ ReplicationTests.class, MediumTests.class})
 public class TestBulkLoadReplication extends TestReplicationBase {
@@ -103,11 +119,14 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   private static final String PEER_ID1 = "1";
   private static final String PEER_ID3 = "3";
 
-  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+  private static AtomicInteger BULK_LOADS_COUNT;
   private static CountDownLatch BULK_LOAD_LATCH;
 
   private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
   private static final Configuration CONF3 = UTIL3.getConfiguration();
+
+  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
   private static Table htable3;
 
   @Rule
@@ -132,7 +151,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     UTIL3.startMiniCluster(NUM_SLAVES1);
 
     TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+        .setMobEnabled(true)
+        .setMobThreshold(4000)
         .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
 
@@ -147,6 +168,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   @Before
   @Override
   public void setUpBase() throws Exception {
+    //"super.setUpBase()" already sets replication from 1->2,
+    //then on the subsequent lines, sets 2->1, 2->3 and 3->2.
+    //So we have the following topology: "1 <-> 2 <-> 3"
     super.setUpBase();
     ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
     ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
@@ -157,9 +181,10 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
     //adds cluster2 as a remote peer on cluster3
     UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
-    setupCoprocessor(UTIL1);
-    setupCoprocessor(UTIL2);
-    setupCoprocessor(UTIL3);
+    setupCoprocessor(UTIL1, "cluster1");
+    setupCoprocessor(UTIL2, "cluster2");
+    setupCoprocessor(UTIL3, "cluster3");
+    BULK_LOADS_COUNT = new AtomicInteger(0);
   }
 
   private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
@@ -167,12 +192,19 @@ public class TestBulkLoadReplication extends TestReplicationBase {
       .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
   }
 
-  private void setupCoprocessor(HBaseTestingUtility cluster){
+  private void setupCoprocessor(HBaseTestingUtility cluster, String name){
     cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
       try {
-        r.getCoprocessorHost()
-          .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
-            cluster.getConfiguration());
+        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
+          findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+        if(cp == null) {
+          r.getCoprocessorHost().
+            load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+              cluster.getConfiguration());
+          cp = r.getCoprocessorHost().
+            findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+          cp.clusterName = cluster.getClusterKey();
+        }
       } catch (Exception e){
         LOG.error(e.getMessage(), e);
       }
@@ -221,6 +253,31 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     assertEquals(9, BULK_LOADS_COUNT.get());
   }
 
+  @Test
+  public void testPartitionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
+    Path path = createMobFiles(UTIL3);
+    ColumnFamilyDescriptor descriptor =
+      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
+    ExecutorService pool = null;
+    try {
+      pool = Executors.newFixedThreadPool(1);
+      PartitionedMobCompactor compactor =
+        new PartitionedMobCompactor(UTIL3.getConfiguration(), UTIL3.getTestFileSystem(), tableName,
+          descriptor, pool);
+      BULK_LOAD_LATCH = new CountDownLatch(1);
+      BULK_LOADS_COUNT.set(0);
+      compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
+      assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
+      Thread.sleep(400);
+      assertEquals(1, BULK_LOADS_COUNT.get());
+    } finally {
+      if(pool != null && !pool.isTerminated()) {
+        pool.shutdownNow();
+      }
+    }
+  }
+
+
   private void assertBulkLoadConditions(byte[] row, byte[] value,
       HBaseTestingUtility utility, Table...tables) throws Exception {
     BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -236,11 +293,11 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
     copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
     BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
-    bulkLoadHFilesTool.bulkLoad(tableName, new Path("/bulk_dir"));
+    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
   }
 
   private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
-    Path bulkLoadDir = new Path("/bulk_dir/f");
+    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f");
     cluster.getFileSystem().mkdirs(bulkLoadDir);
     cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
   }
@@ -281,22 +338,53 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     return hFileLocation.getAbsoluteFile().getAbsolutePath();
   }
 
+  private Path createMobFiles(HBaseTestingUtility util) throws IOException {
+    Path testDir = FSUtils.getRootDir(util.getConfiguration());
+    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+    Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    MobFileName mobFileName = null;
+    byte[] mobFileStartRow = new byte[32];
+    for (byte rowKey : Bytes.toBytes("01234")) {
+      mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
+        UUID.randomUUID().toString().replaceAll("-", ""));
+      StoreFileWriter mobFileWriter =
+        new StoreFileWriter.Builder(util.getConfiguration(),
+          new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
+          .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+      long now = System.currentTimeMillis();
+      try {
+        for (int i = 0; i < 10; i++) {
+          byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
+          byte[] dummyData = new byte[5000];
+          new Random().nextBytes(dummyData);
+          mobFileWriter.append(
+            new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
+        }
+      } finally {
+        mobFileWriter.close();
+      }
+    }
+    return basePath;
+  }
+
   public static class BulkReplicationTestObserver implements RegionCoprocessor {
 
+    String clusterName;
+    AtomicInteger bulkLoadCounts = new AtomicInteger();
+
     @Override
     public Optional<RegionObserver> getRegionObserver() {
       return Optional.of(new RegionObserver() {
-        @Override
-        public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
-          List<Pair<byte[], String>> familyPaths) throws IOException {
-            BULK_LOADS_COUNT.incrementAndGet();
-        }
 
         @Override
         public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
           List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
             throws IOException {
           BULK_LOAD_LATCH.countDown();
+          BULK_LOADS_COUNT.incrementAndGet();
+          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
+            bulkLoadCounts.addAndGet(1));
         }
       });
     }
-- 
2.17.2 (Apple Git-113)
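
Note (after the patch trailer, so it is not part of the diff itself): a minimal sketch of how a
caller can use the new LoadIncrementalHFiles#disableReplication() knob introduced above,
mirroring what PartitionedMobCompactor now does for its compaction-output ref files. The table
name and the HFile directory ("my_table", "/bulk_dir") are hypothetical placeholders; only the
LoadIncrementalHFiles calls shown in the patch are used.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

    public class BulkLoadWithoutReplication {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("my_table"); // hypothetical table
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(tableName)) {
          LoadIncrementalHFiles bulkLoad = new LoadIncrementalHFiles(conf);
          // Marks the resulting BulkLoadDescriptor with replicate=false, so the
          // ReplicationSink on peer clusters skips these files (see the
          // ReplicationSink change in this patch).
          bulkLoad.disableReplication();
          // "/bulk_dir" is a hypothetical directory of HFiles, one subdirectory per family.
          bulkLoad.doBulkLoad(new Path("/bulk_dir"), connection.getAdmin(), table,
            connection.getRegionLocator(tableName));
        }
      }
    }

Without the disableReplication() call, the load behaves exactly as before: the replicate field
defaults to true in both BulkLoadHFileRequest and BulkLoadDescriptor, so existing callers and
peers running older versions are unaffected.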