From 6b5d085a3945403195dc6a21c9c3780aa11afd4c Mon Sep 17 00:00:00 2001
From: Wellington Ramos Chevreuil
Date: Sat, 21 Sep 2019 14:39:06 +0100
Subject: [PATCH] HBASE-22380 break circle replication when doing bulkload
 (#494)

(cherry picked from commit 38c8bd37319325f97b1a6fe8a64c0c71683782b9)

Signed-off-by: stack
Signed-off-by: Andrew Purtell
Signed-off-by: Norbert Kalmar
---
 .../hbase/client/SecureBulkLoadClient.java    |  18 +-
 .../hbase/shaded/protobuf/ProtobufUtil.java   |  12 +-
 .../shaded/protobuf/RequestConverter.java     |  11 +-
 .../src/main/protobuf/Client.proto            |   1 +
 .../src/main/protobuf/WAL.proto               |   1 +
 .../hadoop/hbase/regionserver/HRegion.java    |   9 +-
 .../hbase/regionserver/RSRpcServices.java     |  10 +-
 .../regionserver/SecureBulkLoadManager.java   |  10 +-
 .../regionserver/HFileReplicator.java         |   5 +-
 .../regionserver/ReplicationSink.java         |  44 ++-
 .../hbase/tool/LoadIncrementalHFiles.java     |   8 +-
 .../TestReplicationAdminWithClusters.java     |  14 +-
 .../regionserver/TestBulkLoadReplication.java | 311 ++++++++++++++++++
 .../replication/TestNamespaceReplication.java |   4 +-
 .../replication/TestReplicationBase.java      | 132 ++++----
 ...tReplicationChangingPeerRegionservers.java |  12 +-
 .../TestReplicationDisableInactivePeer.java   |   4 +-
 .../TestReplicationDroppedTables.java         |  52 +--
 .../TestReplicationEmptyWALRecovery.java      |  22 +-
 .../replication/TestReplicationEndpoint.java  |  42 +--
 .../TestReplicationKillMasterRS.java          |   2 +-
 ...TestReplicationKillMasterRSCompressed.java |   2 +-
 ...cationKillMasterRSWithSeparateOldWALs.java |   4 +-
 .../replication/TestReplicationKillRS.java    |   6 +-
 .../TestReplicationKillSlaveRS.java           |   2 +-
 ...icationKillSlaveRSWithSeparateOldWALs.java |   4 +-
 .../TestReplicationMetricsforUI.java          |   6 +-
 .../TestReplicationSmallTests.java            |  12 +-
 .../replication/TestReplicationStatus.java    |  10 +-
 .../TestReplicationSyncUpTool.java            |  78 ++---
 ...plicationEndpointWithMultipleAsyncWAL.java |   4 +-
 ...estReplicationEndpointWithMultipleWAL.java |   4 +-
 ...asterRSCompressedWithMultipleAsyncWAL.java |   4 +-
 ...KillMasterRSCompressedWithMultipleWAL.java |   4 +-
 ...icationSyncUpToolWithMultipleAsyncWAL.java |   4 +-
 ...tReplicationSyncUpToolWithMultipleWAL.java |   4 +-
 .../regionserver/TestReplicator.java          |  22 +-
 37 files changed, 637 insertions(+), 257 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 2186271757..7e3166c3f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -115,8 +115,8 @@ public class SecureBulkLoadClient {
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) throws IOException {
-    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-        false);
+    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
+      bulkToken, false, null);
   }
 
   /**
@@ -132,13 +132,23 @@ public class SecureBulkLoadClient {
    * @return true if all are loaded
    * @throws IOException
    */
+  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
+      final List<Pair<byte[], String>> familyPaths,
+      final byte[] regionName, boolean assignSeqNum,
+      final Token<?> userToken, final String bulkToken,
+      boolean copyFiles) throws IOException {
+    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
+      bulkToken, copyFiles, null);
+  }
+
   public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
-      final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
+      final Token<?> userToken, final String bulkToken,
+      boolean copyFiles, List<String> clusterIds) throws IOException {
     BulkLoadHFileRequest request =
         RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
-          userToken, bulkToken, copyFiles);
+          userToken, bulkToken, copyFiles, clusterIds);
 
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index adc855231f..7179272abd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2548,13 +2548,23 @@ public final class ProtobufUtil {
    *          name
    * @return The WAL log marker for bulk loads.
    */
+  public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
+      ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
+      Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+    return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
+      storeFilesSize, bulkloadSeqId, null);
+  }
+
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
         .setTableName(ProtobufUtil.toProtoTableName(tableName))
         .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+    if(clusterIds != null) {
+      desc.addAllClusterIds(clusterIds);
+    }
 
     for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
       WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index a4bf8990c0..a7c4e8ea67 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -562,7 +562,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-        false);
+      false, null);
   }
 
   /**
@@ -577,9 +577,9 @@ public final class RequestConverter {
    * @return a bulk load request
    */
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
-      final List<Pair<byte[], String>> familyPaths,
-      final byte[] regionName, boolean assignSeqNum,
-      final Token<?> userToken, final String bulkToken, boolean copyFiles) {
+      final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
+      final Token<?> userToken, final String bulkToken, boolean copyFiles,
+      List<String> clusterIds) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -617,6 +617,9 @@
       request.setBulkToken(bulkToken);
     }
     request.setCopyFile(copyFiles);
+    if (clusterIds != null) {
+      request.addAllClusterIds(clusterIds);
+    }
     return request.build();
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 14abb085d6..07d8d711a0 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -378,6 +378,7 @@ message BulkLoadHFileRequest {
   optional DelegationToken fs_token = 4;
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
+  repeated string cluster_ids = 7;
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index 08d4741aa4..9020daf615 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -144,6 +144,7 @@ message BulkLoadDescriptor {
   required bytes encoded_region_name = 2;
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
+  repeated string cluster_ids = 5;
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 8e17decb52..09e364dcf7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6042,7 +6042,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
   }
 
   /**
@@ -6087,11 +6087,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
    * file about to be bulk loaded
    * @param copyFile always copy hfiles if true
+   * @param clusterIds ids from clusters that had already handled the given bulkload event.
    * @return Map from family to List of store file paths if successful, null if failed recoverably
    * @throws IOException if failed unrecoverably.
   */
  public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
-      boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
+      boolean assignSeqId, BulkLoadListener bulkLoadListener,
+      boolean copyFile, List<String> clusterIds) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6266,8 +6268,7 @@
           WALProtos.BulkLoadDescriptor loadDescriptor =
             ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
               UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-              storeFiles,
-              storeFilesSizes, seqId);
+              storeFiles, storeFilesSizes, seqId, clusterIds);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
             loadDescriptor, mvcc);
         } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 2b3bec73de..aa54876661 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2305,6 +2305,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
       final BulkLoadHFileRequest request) throws ServiceException {
     long start = EnvironmentEdgeManager.currentTime();
+    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
+    if(clusterIds.contains(this.regionServer.clusterId)){
+      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
+    } else {
+      clusterIds.add(this.regionServer.clusterId);
+    }
     try {
       checkOpen();
       requestCount.increment();
@@ -2337,7 +2343,7 @@
           }
           try {
             map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-              request.getCopyFile());
+              request.getCopyFile(), clusterIds);
           } finally {
             if (region.getCoprocessorHost() != null) {
               region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
@@ -2345,7 +2351,7 @@
         }
       } else {
         // secure bulk load
-        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
+        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
       }
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(map != null);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 6b55744d94..f51608d78a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -212,7 +212,12 @@ public class SecureBulkLoadManager {
   }
 
   public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
-    final BulkLoadHFileRequest request) throws IOException {
+      final BulkLoadHFileRequest request) throws IOException {
+    return secureBulkLoadHFiles(region, request, null);
+  }
+
+  public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
+      final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
     final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
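     // Unpack each protobuf FamilyPath entry into the (family, path) pairs used below.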
     for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
       familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -288,7 +293,8 @@
           //We call bulkLoadHFiles as requesting user
           //To enable access prior to staging
           return region.bulkLoadHFiles(familyPaths, true,
-              new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
+              new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
+            clusterIds);
         } catch (Exception e) {
           LOG.error("Failed to complete bulk load", e);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index ab9a2366a7..c7fed77c1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -87,17 +87,19 @@ public class HFileReplicator {
   private ThreadPoolExecutor exec;
   private int maxCopyThreads;
   private int copiesPerThread;
+  private List<String> sourceClusterIds;
 
   public HFileReplicator(Configuration sourceClusterConf,
       String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
       Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
-      Connection connection) throws IOException {
+      Connection connection, List<String> sourceClusterIds) throws IOException {
     this.sourceClusterConf = sourceClusterConf;
     this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
     this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
     this.bulkLoadHFileMap = tableQueueMap;
     this.conf = conf;
     this.connection = connection;
+    this.sourceClusterIds = sourceClusterIds;
 
     userProvider = UserProvider.instantiate(conf);
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -128,6 +130,7 @@ public class HFileReplicator {
     LoadIncrementalHFiles loadHFiles = null;
     try {
       loadHFiles = new LoadIncrementalHFiles(conf);
+      loadHFiles.setClusterIds(sourceClusterIds);
     } catch (Exception e) {
       LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
           + " data.", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index fb4e0f95b1..8079adc3ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -174,9 +174,7 @@ public class ReplicationSink {
       // invocation of this method per table and cluster id.
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
 
-      // Map of table name Vs list of pair of family and list of hfile paths from its namespace
-      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
-
+      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -204,10 +202,19 @@ public class ReplicationSink {
           Cell cell = cells.current();
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+            if(bulkLoadsPerClusters == null) {
+              bulkLoadsPerClusters = new HashMap<>();
+            }
+            // Map of table name Vs list of pair of family and list of
+            // hfile paths from its namespace
+            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+              bulkLoadsPerClusters.get(bld.getClusterIdsList());
             if (bulkLoadHFileMap == null) {
               bulkLoadHFileMap = new HashMap<>();
+              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
@@ -243,14 +250,26 @@ public class ReplicationSink {
         LOG.debug("Finished replicating mutations.");
       }
 
-      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
-        LOG.debug("Started replicating bulk loaded data.");
-        HFileReplicator hFileReplicator =
-            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+      if(bulkLoadsPerClusters != null) {
+        for (Entry<List<String>, Map<String, List<Pair<byte[], List<String>>>>> entry :
+            bulkLoadsPerClusters.entrySet()) {
+          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
+          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
+                entry.getKey().toString());
+            }
+            HFileReplicator hFileReplicator =
+              new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
-                getConnection());
-        hFileReplicator.replicate();
-        LOG.debug("Finished replicating bulk loaded data.");
+                getConnection(), entry.getKey());
+            hFileReplicator.replicate();
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Finished replicating bulk loaded data from cluster ids: {}",
+                entry.getKey().toString());
+            }
+          }
+        }
       }
 
       int size = entries.size();
@@ -265,8 +284,7 @@
   private void buildBulkLoadHFileMap(
       final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
-      Cell cell) throws IOException {
-    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+      BulkLoadDescriptor bld) throws IOException {
     List<StoreDescriptor> storesList = bld.getStoresList();
     int storesSize = storesList.size();
     for (int j = 0; j < storesSize; j++) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index e4d5dcb21c..950f2a556d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -137,6 +137,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private String bulkToken;
 
+  private List<String> clusterIds = new ArrayList<>();
+
   /**
    * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
   * the case where a region has split during the process of the load. When this happens, the HFile
@@ -539,7 +541,7 @@
       try (Table table = conn.getTable(getTableName())) {
         secureClient = new SecureBulkLoadClient(getConf(), table);
         success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-          assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+          assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
       }
       return success ? regionName : null;
     } finally {
@@ -1251,6 +1253,10 @@
     this.bulkToken = stagingDir;
   }
 
+  public void setClusterIds(List<String> clusterIds) {
+    this.clusterIds = clusterIds;
+  }
+
   /**
    * Infers region boundaries for a new table.
   *

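Taken together, the main-code changes above implement the loop-breaking scheme: BulkLoadHFileRequest and the WAL's BulkLoadDescriptor now carry a repeated cluster_ids field, RSRpcServices.bulkLoadHFile() acknowledges but does not re-apply a load whose cluster-ids list already names the local cluster (appending the local id otherwise), and ReplicationSink groups incoming bulk load events by that list so each HFileReplicator/LoadIncrementalHFiles pass forwards it to the next peer. The following sketch is plain Java, not HBase API, and the peer names are made up; it illustrates only the admission rule added to RSRpcServices:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BulkLoadCycleGuardSketch {
  /**
   * Mirrors the guard in RSRpcServices.bulkLoadHFile(): returns null when the local
   * cluster already appears in the request's cluster ids (the event has come full
   * circle and must only be acked, not re-applied); otherwise returns the list to
   * propagate, with the local cluster id appended.
   */
  static List<String> admit(List<String> requestClusterIds, String localClusterId) {
    List<String> clusterIds = new ArrayList<>(requestClusterIds);
    if (clusterIds.contains(localClusterId)) {
      return null; // already handled here: drop instead of looping forever
    }
    clusterIds.add(localClusterId);
    return clusterIds;
  }

  public static void main(String[] args) {
    // An event born on peer1 in a peer1 <-> peer2 active-active setup:
    List<String> ids = admit(Collections.emptyList(), "peer1"); // [peer1]
    ids = admit(ids, "peer2");                                  // [peer1, peer2]
    System.out.println(ids);
    // peer2 ships the event back to peer1, which now refuses to re-apply it:
    System.out.println(admit(ids, "peer1"));                    // null
  }
}

Under this rule each bulk load event is applied exactly once per cluster in an active-active topology, which is what the new TestBulkLoadReplication below asserts via its 3 x 3 = 9 bulk load count.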
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index 0ff757eaa4..dc2b3a73ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -74,11 +74,11 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @BeforeClass public static void setUpBeforeClass() throws Exception { TestReplicationBase.setUpBeforeClass(); - connection1 = ConnectionFactory.createConnection(conf1); - connection2 = ConnectionFactory.createConnection(conf2); + connection1 = ConnectionFactory.createConnection(CONF1); + connection2 = ConnectionFactory.createConnection(CONF2); admin1 = connection1.getAdmin(); admin2 = connection2.getAdmin(); - adminExt = new ReplicationAdmin(conf1); + adminExt = new ReplicationAdmin(CONF1); } @AfterClass @@ -199,8 +199,8 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope()); } } finally { - utility1.deleteTable(tn); - utility2.deleteTable(tn); + UTIL1.deleteTable(tn); + UTIL2.deleteTable(tn); } } @@ -273,7 +273,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { public void testReplicationPeerConfigUpdateCallback() throws Exception { String peerId = "1"; ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(utility2.getClusterKey()); + rpc.setClusterKey(UTIL2.getClusterKey()); rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName()); rpc.getConfiguration().put("key1", "value1"); @@ -325,7 +325,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @Override public UUID getPeerUUID() { - return utility1.getRandomUUID(); + return UTIL1.getRandomUUID(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java new file mode 100644 index 0000000000..5cf7d77317 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for bulk load replication. Defines two clusters, with two way replication. + * Performs a bulk load on cluster defined by UTIL1 first, asserts the Cell on the bulk loaded file + * gets into the related table in UTIL1, then also validates the same got replicated to cluster + * UTIL2. Then, bulk loads another file into UTIL2, and checks if related values are present on + * UTIL2, and also gets replicated to UTIL1. + * It also defines a preBulkLoad coprocessor that is added to all test table regions on each of the + * clusters, in order to count amount of times bulk load actually gets invoked. This is to certify + * we are not entered in the infinite loop condition addressed by HBASE-22380. 
+ */ +@Category({ ReplicationTests.class, MediumTests.class}) +public class TestBulkLoadReplication extends TestReplicationBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBulkLoadReplication.class); + + protected static final Logger LOG = + LoggerFactory.getLogger(TestBulkLoadReplication.class); + + private static final String PEER1_CLUSTER_ID = "peer1"; + private static final String PEER2_CLUSTER_ID = "peer2"; + private static final String PEER3_CLUSTER_ID = "peer3"; + + private static final String PEER_ID1 = "1"; + private static final String PEER_ID3 = "3"; + + private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0); + private static CountDownLatch BULK_LOAD_LATCH; + + private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility(); + private static final Configuration CONF3 = UTIL3.getConfiguration(); + private static Table htable3; + + @Rule + public TestName name = new TestName(); + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID); + setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID); + setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID); + setupConfig(UTIL3, "/3"); + TestReplicationBase.setUpBeforeClass(); + startThirdCluster(); + } + + private static void startThirdCluster() throws Exception { + LOG.info("Setup Zk to same one from UTIL1 and UTIL2"); + UTIL3.setZkCluster(UTIL1.getZkCluster()); + UTIL3.startMiniCluster(NUM_SLAVES1); + + TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); + + Connection connection3 = ConnectionFactory.createConnection(CONF3); + try (Admin admin3 = connection3.getAdmin()) { + admin3.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + } + UTIL3.waitUntilAllRegionsAssigned(tableName); + htable3 = connection3.getTable(tableName); + } + + @Before + @Override + public void setUpBase() throws Exception { + super.setUpBase(); + ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1); + ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2); + ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3); + //adds cluster1 as a remote peer on cluster2 + UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config); + //adds cluster3 as a remote peer on cluster2 + UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config); + //adds cluster2 as a remote peer on cluster3 + UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config); + setupCoprocessor(UTIL1); + setupCoprocessor(UTIL2); + setupCoprocessor(UTIL3); + } + + private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) { + return ReplicationPeerConfig.newBuilder() + .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build(); + } + + private void setupCoprocessor(HBaseTestingUtility cluster){ + cluster.getHBaseCluster().getRegions(tableName).forEach(r -> { + try { + r.getCoprocessorHost() + .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0, + cluster.getConfiguration()); + } catch (Exception e){ + LOG.error(e.getMessage(), e); + } + }); + } + + @After + @Override + public void tearDownBase() throws Exception { + 
super.tearDownBase(); + UTIL2.getAdmin().removeReplicationPeer(PEER_ID1); + UTIL2.getAdmin().removeReplicationPeer(PEER_ID3); + UTIL3.getAdmin().removeReplicationPeer(PEER_ID2); + } + + private static void setupBulkLoadConfigsForCluster(Configuration config, + String clusterReplicationId) throws Exception { + config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + config.set(REPLICATION_CLUSTER_ID, clusterReplicationId); + File sourceConfigFolder = testFolder.newFolder(clusterReplicationId); + File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + + "/hbase-site.xml"); + config.writeXml(new FileOutputStream(sourceConfigFile)); + config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath()); + } + + @Test + public void testBulkLoadReplicationActiveActive() throws Exception { + Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); + Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); + Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); + byte[] row = Bytes.toBytes("001"); + byte[] value = Bytes.toBytes("v1"); + assertBulkLoadConditions(row, value, UTIL1, peer1TestTable, peer2TestTable, peer3TestTable); + row = Bytes.toBytes("002"); + value = Bytes.toBytes("v2"); + assertBulkLoadConditions(row, value, UTIL2, peer1TestTable, peer2TestTable, peer3TestTable); + row = Bytes.toBytes("003"); + value = Bytes.toBytes("v3"); + assertBulkLoadConditions(row, value, UTIL3, peer1TestTable, peer2TestTable, peer3TestTable); + //Additional wait to make sure no extra bulk load happens + Thread.sleep(400); + //We have 3 bulk load events (1 initiated on each cluster). + //Each event gets 3 counts (the originator cluster, plus the two peers), + //so BULK_LOADS_COUNT expected value is 3 * 3 = 9. 
+    assertEquals(9, BULK_LOADS_COUNT.get());
+  }
+
+  private void assertBulkLoadConditions(byte[] row, byte[] value,
+      HBaseTestingUtility utility, Table... tables) throws Exception {
+    BULK_LOAD_LATCH = new CountDownLatch(3);
+    bulkLoadOnCluster(row, value, utility);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+    assertTableHasValue(tables[0], row, value);
+    assertTableHasValue(tables[1], row, value);
+    assertTableHasValue(tables[2], row, value);
+  }
+
+  private void bulkLoadOnCluster(byte[] row, byte[] value,
+      HBaseTestingUtility cluster) throws Exception {
+    String bulkLoadFile = createHFileForFamilies(row, value, cluster.getConfiguration());
+    Path bulkLoadFilePath = new Path(bulkLoadFile);
+    copyToHdfs(bulkLoadFile, cluster.getDFSCluster());
+    LoadIncrementalHFiles bulkLoadHFilesTool = new LoadIncrementalHFiles(cluster.getConfiguration());
+    Map<byte[], List<Path>> family2Files = new HashMap<>();
+    List<Path> files = new ArrayList<>();
+    files.add(new Path("/bulk_dir/f/" + bulkLoadFilePath.getName()));
+    family2Files.put(Bytes.toBytes("f"), files);
+    bulkLoadHFilesTool.run(family2Files, tableName);
+  }
+
+  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
+    Path bulkLoadDir = new Path("/bulk_dir/f");
+    cluster.getFileSystem().mkdirs(bulkLoadDir);
+    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+  }
+
+  private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
+    Get get = new Get(row);
+    Result result = table.get(get);
+    assertTrue(result.advance());
+    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
+  }
+
+  private String createHFileForFamilies(byte[] row, byte[] value,
+      Configuration clusterConfig) throws IOException {
+    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    cellBuilder.setRow(row)
+      .setFamily(TestReplicationBase.famName)
+      .setQualifier(Bytes.toBytes("1"))
+      .setValue(value)
+      .setType(Cell.Type.Put);
+
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+    // TODO We need a way to do this without creating files
+    File hFileLocation = testFolder.newFile();
+    FSDataOutputStream out =
+      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+    try {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContext());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        writer.append(new KeyValue(cellBuilder.build()));
+      } finally {
+        writer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return hFileLocation.getAbsoluteFile().getAbsolutePath();
+  }
+
+  public static class BulkReplicationTestObserver implements RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(new RegionObserver() {
+        @Override
+        public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            List<Pair<byte[], String>> familyPaths) throws IOException {
+          BULK_LOADS_COUNT.incrementAndGet();
+        }
+
+        @Override
+        public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
+            throws IOException {
+          BULK_LOAD_LATCH.countDown();
+        }
+      });
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
index d8a02c7b44..7dcdf8ccee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java @@ -101,8 +101,8 @@ public class TestNamespaceReplication extends TestReplicationBase { public static void setUpBeforeClass() throws Exception { TestReplicationBase.setUpBeforeClass(); - connection1 = ConnectionFactory.createConnection(conf1); - connection2 = ConnectionFactory.createConnection(conf2); + connection1 = ConnectionFactory.createConnection(CONF1); + connection2 = ConnectionFactory.createConnection(CONF2); admin1 = connection1.getAdmin(); admin2 = connection2.getAdmin(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 72ab246650..df44486735 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -25,15 +25,12 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.NavigableMap; -import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -50,7 +47,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -67,22 +63,21 @@ import org.slf4j.LoggerFactory; public class TestReplicationBase { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class); - protected static Configuration conf1 = HBaseConfiguration.create(); - protected static Configuration conf2; protected static Configuration CONF_WITH_LOCALFS; - protected static ZKWatcher zkw1; - protected static ZKWatcher zkw2; - protected static ReplicationAdmin admin; protected static Admin hbaseAdmin; protected static Table htable1; protected static Table htable2; - protected static NavigableMap scopes; - protected static HBaseTestingUtility utility1; - protected static HBaseTestingUtility utility2; + protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); + protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); + protected static final Configuration CONF1 = UTIL1.getConfiguration(); + protected static final Configuration CONF2 = UTIL2.getConfiguration(); + + protected static final int NUM_SLAVES1 = 2; + protected static final int NUM_SLAVES2 = 4; protected static final int NB_ROWS_IN_BATCH = 100; protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10; @@ -102,12 +97,12 @@ public class TestReplicationBase { protected final void cleanUp() throws IOException, InterruptedException { // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the 
queue - for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() + for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster() .getRegionServerThreads()) { - utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); + UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); } - int rowCount = utility1.countRows(tableName); - utility1.deleteTableData(tableName); + int rowCount = UTIL1.countRows(tableName); + UTIL1.deleteTableData(tableName); // truncating the table will send one Delete per row to the slave cluster // in an async fashion, which is why we cannot just call deleteTableData on // utility2 since late writes could make it to the slave in some way. @@ -169,79 +164,88 @@ public class TestReplicationBase { htable1.put(puts); } - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + protected static void setupConfig(HBaseTestingUtility util, String znodeParent) { + Configuration conf = util.getConfiguration(); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent); // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger // sufficient number of events. But we don't want to go too low because // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want // more than one batch sent to the peer cluster for better testing. - conf1.setInt("replication.source.size.capacity", 102400); - conf1.setLong("replication.source.sleepforretries", 100); - conf1.setInt("hbase.regionserver.maxlogs", 10); - conf1.setLong("hbase.master.logcleaner.ttl", 10); - conf1.setInt("zookeeper.recovery.retry", 1); - conf1.setInt("zookeeper.recovery.retry.intervalmill", 10); - conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); - conf1.setInt("replication.stats.thread.period.seconds", 5); - conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); - conf1.setLong("replication.sleep.before.failover", 2000); - conf1.setInt("replication.source.maxretriesmultiplier", 10); - conf1.setFloat("replication.source.ratio", 1.0f); - conf1.setBoolean("replication.source.eof.autorecovery", true); - conf1.setLong("hbase.serial.replication.waiting.ms", 100); - - utility1 = new HBaseTestingUtility(conf1); - utility1.startMiniZKCluster(); - MiniZooKeeperCluster miniZK = utility1.getZkCluster(); - // Have to reget conf1 in case zk cluster location different - // than default - conf1 = utility1.getConfiguration(); - zkw1 = new ZKWatcher(conf1, "cluster1", null, true); - admin = new ReplicationAdmin(conf1); - LOG.info("Setup first Zk"); + conf.setInt("replication.source.size.capacity", 102400); + conf.setLong("replication.source.sleepforretries", 100); + conf.setInt("hbase.regionserver.maxlogs", 10); + conf.setLong("hbase.master.logcleaner.ttl", 10); + conf.setInt("zookeeper.recovery.retry", 1); + conf.setInt("zookeeper.recovery.retry.intervalmill", 10); + conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf.setInt("replication.stats.thread.period.seconds", 5); + conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf.setLong("replication.sleep.before.failover", 2000); + conf.setInt("replication.source.maxretriesmultiplier", 10); + conf.setFloat("replication.source.ratio", 1.0f); + conf.setBoolean("replication.source.eof.autorecovery", true); + conf.setLong("hbase.serial.replication.waiting.ms", 100); + } + + static void configureClusters(HBaseTestingUtility util1, + HBaseTestingUtility util2) { + setupConfig(util1, "/1"); + 
setupConfig(util2, "/2"); - // Base conf2 on conf1 so it gets the right zk cluster. - conf2 = HBaseConfiguration.create(conf1); + Configuration conf2 = util2.getConfiguration(); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); + } + + protected static void restartHBaseCluster(HBaseTestingUtility util, int numSlaves) + throws Exception { + util.shutdownMiniHBaseCluster(); + util.restartHBaseCluster(numSlaves); + } + + protected static void startClusters() throws Exception { + UTIL1.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = UTIL1.getZkCluster(); + admin = new ReplicationAdmin(CONF1); + LOG.info("Setup first Zk"); - utility2 = new HBaseTestingUtility(conf2); - utility2.setZkCluster(miniZK); - zkw2 = new ZKWatcher(conf2, "cluster2", null, true); + UTIL2.setZkCluster(miniZK); LOG.info("Setup second Zk"); - CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); - utility1.startMiniCluster(2); + CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1); + UTIL1.startMiniCluster(NUM_SLAVES1); // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks // as a component in deciding maximum number of parallel batches to send to the peer cluster. - utility2.startMiniCluster(4); + UTIL2.startMiniCluster(NUM_SLAVES2); - hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin(); + hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin(); TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); - scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (ColumnFamilyDescriptor f : table.getColumnFamilies()) { - scopes.put(f.getName(), f.getScope()); - } - Connection connection1 = ConnectionFactory.createConnection(conf1); - Connection connection2 = ConnectionFactory.createConnection(conf2); + + Connection connection1 = ConnectionFactory.createConnection(CONF1); + Connection connection2 = ConnectionFactory.createConnection(CONF2); try (Admin admin1 = connection1.getAdmin()) { admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } try (Admin admin2 = connection2.getAdmin()) { admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } - utility1.waitUntilAllRegionsAssigned(tableName); - utility2.waitUntilAllRegionsAssigned(tableName); + UTIL1.waitUntilAllRegionsAssigned(tableName); + UTIL2.waitUntilAllRegionsAssigned(tableName); htable1 = connection1.getTable(tableName); htable2 = connection2.getTable(tableName); } + @BeforeClass + public static void setUpBeforeClass() throws Exception { + configureClusters(UTIL1, UTIL2); + startClusters(); + } + private boolean peerExist(String peerId) throws IOException { return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId())); } @@ -250,7 +254,7 @@ public class TestReplicationBase { public void setUpBase() throws Exception { if (!peerExist(PEER_ID2)) { ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() - .setClusterKey(utility2.getClusterKey()).setSerial(isSerialPeer()).build(); + .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build(); hbaseAdmin.addReplicationPeer(PEER_ID2, rpc); } } @@ -266,7 +270,7 @@ public class TestReplicationBase { Put put = new Put(row); 
put.addColumn(famName, row, row); - htable1 = utility1.getConnection().getTable(tableName); + htable1 = UTIL1.getConnection().getTable(tableName); htable1.put(put); Get get = new Get(row); @@ -321,7 +325,7 @@ public class TestReplicationBase { htable2.close(); htable1.close(); admin.close(); - utility2.shutdownMiniCluster(); - utility1.shutdownMiniCluster(); + UTIL2.shutdownMiniCluster(); + UTIL1.shutdownMiniCluster(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java index b94b443dda..4e1d33f9d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java @@ -82,14 +82,14 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas public void setUp() throws Exception { // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue - for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() + for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster() .getRegionServerThreads()) { - utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); + UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); } - utility1.deleteTableData(tableName); + UTIL1.deleteTableData(tableName); // truncating the table will send one Delete per row to the slave cluster // in an async fashion, which is why we cannot just call deleteTableData on - // utility2 since late writes could make it to the slave in some way. + // UTIL2 since late writes could make it to the slave in some way. // Instead, we truncate the first table and wait for all the Deletes to // make it to the slave. 
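     // Poll the slave with full scans until the row count drops to zero, i.e. all Deletes applied.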
Scan scan = new Scan(); @@ -117,7 +117,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas @Test public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException { LOG.info("testSimplePutDelete"); - MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster(); + MiniHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster(); int numRS = peerCluster.getRegionServerThreads().size(); doPutTest(Bytes.toBytes(1)); @@ -144,7 +144,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas put.addColumn(famName, row, row); if (htable1 == null) { - htable1 = utility1.getConnection().getTable(tableName); + htable1 = UTIL1.getConnection().getTable(tableName); } htable1.put(put); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java index 8b795aa423..0a490cd341 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java @@ -53,7 +53,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase { */ @Test public void testDisableInactivePeer() throws Exception { - utility2.shutdownMiniHBaseCluster(); + UTIL2.shutdownMiniHBaseCluster(); byte[] rowkey = Bytes.toBytes("disable inactive peer"); Put put = new Put(rowkey); @@ -65,7 +65,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase { // disable and start the peer admin.disablePeer("2"); - utility2.startMiniHBaseCluster(1, 2); + UTIL2.startMiniHBaseCluster(1, 2); Get get = new Get(rowkey); for (int i = 0; i < NB_RETRIES; i++) { Result res = htable2.get(get); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java index f280c7c193..79302ddd2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java @@ -63,17 +63,17 @@ public class TestReplicationDroppedTables extends TestReplicationBase { public void setUpBase() throws Exception { // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue - for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() + for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster() .getRegionServerThreads()) { - utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); + UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); } // Initialize the peer after wal rolling, so that we will abandon the stuck WALs. super.setUpBase(); - int rowCount = utility1.countRows(tableName); - utility1.deleteTableData(tableName); + int rowCount = UTIL1.countRows(tableName); + UTIL1.deleteTableData(tableName); // truncating the table will send one Delete per row to the slave cluster // in an async fashion, which is why we cannot just call deleteTableData on - // utility2 since late writes could make it to the slave in some way. + // UTIL2 since late writes could make it to the slave in some way. 
// Instead, we truncate the first table and wait for all the Deletes to // make it to the slave. Scan scan = new Scan(); @@ -101,7 +101,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase { // when replicate at sink side, it'll apply to rs group by table name, so the WAL of test table // may apply first, and then test_dropped table, and we will believe that the replication is not // got stuck (HBASE-20475). - conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024); + CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024); } @Test @@ -121,11 +121,11 @@ public class TestReplicationDroppedTables extends TestReplicationBase { @Test public void testEditsDroppedWithDroppedTableNS() throws Exception { // also try with a namespace - Connection connection1 = ConnectionFactory.createConnection(conf1); + Connection connection1 = ConnectionFactory.createConnection(CONF1); try (Admin admin1 = connection1.getAdmin()) { admin1.createNamespace(NamespaceDescriptor.create("NS").build()); } - Connection connection2 = ConnectionFactory.createConnection(conf2); + Connection connection2 = ConnectionFactory.createConnection(CONF2); try (Admin admin2 = connection2.getAdmin()) { admin2.createNamespace(NamespaceDescriptor.create("NS").build()); } @@ -143,13 +143,13 @@ public class TestReplicationDroppedTables extends TestReplicationBase { } private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception { - conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding); - conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1); + CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding); + CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1); // make sure we have a single region server only, so that all // edits for all tables go there - utility1.shutdownMiniHBaseCluster(); - utility1.startMiniHBaseCluster(1, 1); + UTIL1.shutdownMiniHBaseCluster(); + UTIL1.startMiniHBaseCluster(1, 1); TableName tablename = TableName.valueOf(tName); byte[] familyName = Bytes.toBytes("fam"); @@ -161,16 +161,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase { .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .build(); - Connection connection1 = ConnectionFactory.createConnection(conf1); - Connection connection2 = ConnectionFactory.createConnection(conf2); + Connection connection1 = ConnectionFactory.createConnection(CONF1); + Connection connection2 = ConnectionFactory.createConnection(CONF2); try (Admin admin1 = connection1.getAdmin()) { admin1.createTable(table); } try (Admin admin2 = connection2.getAdmin()) { admin2.createTable(table); } - utility1.waitUntilAllRegionsAssigned(tablename); - utility2.waitUntilAllRegionsAssigned(tablename); + UTIL1.waitUntilAllRegionsAssigned(tablename); + UTIL2.waitUntilAllRegionsAssigned(tablename); // now suspend replication try (Admin admin1 = connection1.getAdmin()) { @@ -213,18 +213,18 @@ public class TestReplicationDroppedTables extends TestReplicationBase { verifyReplicationStuck(); } // just to be safe - conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); + CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); } @Test public void testEditsBehindDroppedTableTiming() throws Exception { - conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true); - conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1); + 
+    CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
+    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
     // make sure we have a single region server only, so that all
     // edits for all tables go there
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster(1, 1);
+    UTIL1.shutdownMiniHBaseCluster();
+    UTIL1.startMiniHBaseCluster(1, 1);

     TableName tablename = TableName.valueOf("testdroppedtimed");
     byte[] familyName = Bytes.toBytes("fam");
@@ -236,16 +236,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
             .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
         .build();

-    Connection connection1 = ConnectionFactory.createConnection(conf1);
-    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    Connection connection1 = ConnectionFactory.createConnection(CONF1);
+    Connection connection2 = ConnectionFactory.createConnection(CONF2);
     try (Admin admin1 = connection1.getAdmin()) {
       admin1.createTable(table);
     }
     try (Admin admin2 = connection2.getAdmin()) {
       admin2.createTable(table);
     }
-    utility1.waitUntilAllRegionsAssigned(tablename);
-    utility2.waitUntilAllRegionsAssigned(tablename);
+    UTIL1.waitUntilAllRegionsAssigned(tablename);
+    UTIL2.waitUntilAllRegionsAssigned(tablename);

     // now suspend replication
     try (Admin admin1 = connection1.getAdmin()) {
@@ -290,7 +290,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
       verifyReplicationProceeded();
     }
     // just to be safe
-    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+    CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
   }

   private boolean peerHasAllNormalRows() throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 4effe4149c..c0f22a9ac1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -55,16 +55,16 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
    * @param numRs number of regionservers
    */
   private void waitForLogAdvance(int numRs) throws Exception {
-    Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         for (int i = 0; i < numRs; i++) {
-          HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
+          HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
           RegionInfo regionInfo =
-              utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+              UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
           WAL wal = hrs.getWAL(regionInfo);
           Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
-          Replication replicationService = (Replication) utility1.getHBaseCluster()
+          Replication replicationService = (Replication) UTIL1.getHBaseCluster()
               .getRegionServer(i).getReplicationSourceService();
           for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
               .getSources()) {
@@ -81,19 +81,19 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {

   @Test
   public void testEmptyWALRecovery() throws Exception {
-    final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();

     // for each RS, create an empty wal with same walGroupId
     final List<Path> emptyWalPaths = new ArrayList<>();
     long ts = System.currentTimeMillis();
     for (int i = 0; i < numRs; i++) {
       RegionInfo regionInfo =
-          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
-      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+          UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
       Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
       String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
-      Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
-      utility1.getTestFileSystem().create(emptyWalPath).close();
+      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
+      UTIL1.getTestFileSystem().create(emptyWalPath).close();
       emptyWalPaths.add(emptyWalPath);
     }

@@ -102,12 +102,12 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
     // determine if the file being replicated currently is still opened for write, so just inject a
     // new wal to the replication queue does not mean the previous file is closed.
     for (int i = 0; i < numRs; i++) {
-      HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
+      HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
       Replication replicationService = (Replication) hrs.getReplicationSourceService();
       replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
       replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
       RegionInfo regionInfo =
-          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+          UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
       WAL wal = hrs.getWAL(regionInfo);
       wal.rollWriter(true);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 03fbb59f26..84af61dd49 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -83,7 +83,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TestReplicationBase.setUpBeforeClass();
-    numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
+    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
   }

   @AfterClass
@@ -101,12 +101,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     ReplicationEndpointReturningFalse.replicated.set(false);
     ReplicationEndpointForTest.lastEntries = null;
     final List<RegionServerThread> rsThreads =
-        utility1.getMiniHBaseCluster().getRegionServerThreads();
+        UTIL1.getMiniHBaseCluster().getRegionServerThreads();
     for (RegionServerThread rs : rsThreads) {
-      utility1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
+      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
     }
     // Wait for all log roll to finish
-    utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
+    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         for (RegionServerThread rs : rsThreads) {
@@ -134,18 +134,18 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   public void testCustomReplicationEndpoint() throws Exception {
     // test installing a custom replication endpoint other than the default one.
     admin.addPeer("testCustomReplicationEndpoint",
-        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
             .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()),
         null);

     // check whether the class has been constructed and started
-    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
       }
     });

-    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
@@ -157,7 +157,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     // now replicate some data.
     doPut(Bytes.toBytes("row42"));

-    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         return ReplicationEndpointForTest.replicateCount.get() >= 1;
@@ -176,7 +176,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     int peerCount = admin.getPeersCount();
     final String id = "testReplicationEndpointReturnsFalseOnReplicate";
     admin.addPeer(id,
-        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
            .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()),
        null);
     // This test is flakey and then there is so much stuff flying around in here its, hard to
     // debug.  Peer needs to be up for the edit to make it across. This wait on
@@ -188,7 +188,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     // now replicate some data
     doPut(row);

-    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         // Looks like replication endpoint returns false unless we put more than 10 edits. We
@@ -209,7 +209,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   public void testInterClusterReplication() throws Exception {
     final String id = "testInterClusterReplication";

-    List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
+    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
     int totEdits = 0;

     // Make sure edits are spread across regions because we do region based batching
@@ -228,12 +228,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }

     admin.addPeer(id,
-        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
            .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
        null);

     final int numEdits = totEdits;
-    Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
+    Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
@@ -248,12 +248,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     });

     admin.removePeer("testInterClusterReplication");
-    utility1.deleteTableData(tableName);
+    UTIL1.deleteTableData(tableName);
   }

   @Test
   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
         .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
     //test that we can create mutliple WALFilters reflectively
     rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
         EverythingPassesWALEntryFilter.class.getName() +
             "," + EverythingPassesWALEntryFilterSubclass.class.getName());
     admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
     // now replicate some data.
-    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
+    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
       doPut(connection, Bytes.toBytes("row1"));
       doPut(connection, row);
       doPut(connection, Bytes.toBytes("row2"));
     }

-    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         return ReplicationEndpointForTest.replicateCount.get() >= 1;
@@ -282,7 +282,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {

   @Test (expected=IOException.class)
   public void testWALEntryFilterAddValidation() throws Exception {
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
         .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
     //test that we can create mutliple WALFilters reflectively
     rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
@@ -292,7 +292,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {

   @Test (expected=IOException.class)
   public void testWALEntryFilterUpdateValidation() throws Exception {
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
         .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
     //test that we can create mutliple WALFilters reflectively
     rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
@@ -388,7 +388,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   }

   private void doPut(byte[] row) throws IOException {
-    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
+    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
       doPut(connection, row);
     }
   }
@@ -413,7 +413,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   }

   public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
-    static UUID uuid = utility1.getRandomUUID();
+    static UUID uuid = UTIL1.getRandomUUID();
     static AtomicInteger contructedCount = new AtomicInteger();
     static AtomicInteger startedCount = new AtomicInteger();
     static AtomicInteger stoppedCount = new AtomicInteger();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
index 41cc9bc8d0..ae99eb897a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
@@ -37,6 +37,6 @@ public class TestReplicationKillMasterRS extends TestReplicationKillRS {

   @Test
   public void killOneMasterRS() throws Exception {
-    loadTableAndKillRS(utility1);
+    loadTableAndKillRS(UTIL1);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java
index 6cbae83b9b..e649149ca5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java
@@ -41,7 +41,7 @@ public class TestReplicationKillMasterRSCompressed extends TestReplicationKillMa
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
     TestReplicationBase.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java
index 108f2744e7..aa3aadde51 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java
@@ -36,12 +36,12 @@ public class TestReplicationKillMasterRSWithSeparateOldWALs extends TestReplicat

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
+    CONF1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
     TestReplicationBase.setUpBeforeClass();
   }

   @Test
   public void killOneMasterRS() throws Exception {
-    loadTableAndKillRS(utility1);
+    loadTableAndKillRS(UTIL1);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java
index 5b4fa2af1c..c2457267b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java
@@ -57,10 +57,10 @@ public class TestReplicationKillRS extends TestReplicationBase {
     Thread killer = killARegionServer(util, 5000, rsToKill1);
     Result[] res;
     int initialCount;
-    try (Connection conn = ConnectionFactory.createConnection(conf1)) {
+    try (Connection conn = ConnectionFactory.createConnection(CONF1)) {
       try (Table table = conn.getTable(tableName)) {
         LOG.info("Start loading table");
-        initialCount = utility1.loadTable(table, famName);
+        initialCount = UTIL1.loadTable(table, famName);
         LOG.info("Done loading table");
         killer.join(5000);
         LOG.info("Done waiting for threads");
@@ -86,7 +86,7 @@ public class TestReplicationKillRS extends TestReplicationBase {
     int lastCount = 0;
     final long start = System.currentTimeMillis();
     int i = 0;
-    try (Connection conn = ConnectionFactory.createConnection(conf2)) {
+    try (Connection conn = ConnectionFactory.createConnection(CONF2)) {
       try (Table table = conn.getTable(tableName)) {
         while (true) {
           if (i == NB_RETRIES - 1) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
index 96630b234f..733fa3aa8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
@@ -37,6 +37,6 @@ public class TestReplicationKillSlaveRS extends TestReplicationKillRS {

   @Test
   public void killOneSlaveRS() throws Exception {
-    loadTableAndKillRS(utility2);
+    loadTableAndKillRS(UTIL2);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java
index a852b81c24..abff3e2caf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java
@@ -36,12 +36,12 @@ public class TestReplicationKillSlaveRSWithSeparateOldWALs extends TestReplicati

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
+    CONF1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
     TestReplicationBase.setUpBeforeClass();
   }

   @Test
   public void killOneSlaveRS() throws Exception {
-    loadTableAndKillRS(utility2);
+    loadTableAndKillRS(UTIL2);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
index 8ff4d84dcd..c646a9011c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
@@ -42,7 +42,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {

   @Test
   public void testReplicationMetrics() throws Exception {
-    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+    try (Admin hbaseAdmin = UTIL1.getConnection().getAdmin()) {
       Put p = new Put(Bytes.toBytes("starter"));
       p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
       htable1.put(p);
@@ -52,7 +52,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
       }
       // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
       Thread.sleep(5000);
-      HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
+      HRegionServer rs = UTIL1.getRSForFirstRegionInTable(tableName);
       Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
       Assert.assertEquals("metric size ", 1, metrics.size());
       long lastPosition = 0;
@@ -72,7 +72,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
           .size() == 0) {
         Thread.sleep(500);
       }
-      rs = utility1.getRSForFirstRegionInTable(tableName);
+      rs = UTIL1.getRSForFirstRegionInTable(tableName);
       metrics = rs.getWalGroupsReplicationStatus();
       Path lastPath = null;
       for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 2c8dc4c0b7..b8b96788f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -102,7 +102,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     final byte[] v1 = Bytes.toBytes("v1");
     final byte[] v2 = Bytes.toBytes("v2");
     final byte[] v3 = Bytes.toBytes("v3");
-    htable1 = utility1.getConnection().getTable(tableName);
+    htable1 = UTIL1.getConnection().getTable(tableName);
     long t = EnvironmentEdgeManager.currentTime();
     // create three versions for "row"
@@ -265,7 +265,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       }
     }
     ReplicationPeerConfig rpc =
-        ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
+        ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build();
     hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
     Thread.sleep(SLEEP_TIME);
     rowKey = Bytes.toBytes("do rep");
@@ -363,7 +363,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     final String colFam = "cf1";
     final int numOfTables = 3;

-    Admin hadmin = utility1.getAdmin();
+    Admin hadmin = UTIL1.getAdmin();

     // Create Tables
     for (int i = 0; i < numOfTables; i++) {
@@ -408,15 +408,15 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   public void testReplicationInReplay() throws Exception {
     final TableName tableName = htable1.getName();

-    HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
+    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0);
     RegionInfo hri = region.getRegionInfo();
     NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
       scopes.put(fam, 1);
     }
     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-    int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
-    WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
+    int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
+    WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
     final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
     final byte[] qualifier = Bytes.toBytes("q");
     final byte[] value = Bytes.toBytes("v");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
index aaa843ef3d..d83a822ecb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
@@ -61,7 +61,7 @@ public class TestReplicationStatus extends TestReplicationBase {
   public void testReplicationStatus() throws Exception {
     LOG.info("testReplicationStatus");

-    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+    try (Admin hbaseAdmin = UTIL1.getConnection().getAdmin()) {
       // disable peer
       admin.disablePeer(PEER_ID);

@@ -77,7 +77,7 @@ public class TestReplicationStatus extends TestReplicationBase {
       ClusterStatus status = new ClusterStatus(hbaseAdmin.getClusterMetrics(
          EnumSet.of(Option.LIVE_SERVERS)));

-      for (JVMClusterUtil.RegionServerThread thread : utility1.getHBaseCluster()
+      for (JVMClusterUtil.RegionServerThread thread : UTIL1.getHBaseCluster()
           .getRegionServerThreads()) {
         ServerName server = thread.getRegionServer().getServerName();
         ServerLoad sl = status.getLoad(server);
@@ -96,10 +96,10 @@ public class TestReplicationStatus extends TestReplicationBase {
       }

       // Stop rs1, then the queue of rs1 will be transfered to rs0
-      utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
+      UTIL1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
       Thread.sleep(10000);
       status = new ClusterStatus(hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-      ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+      ServerName server = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
       ServerLoad sl = status.getLoad(server);
       List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
       // check SourceList still only has one entry
@@ -107,7 +107,7 @@ public class TestReplicationStatus extends TestReplicationBase {
       assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID());
     } finally {
       admin.enablePeer(PEER_ID);
-      utility1.getHBaseCluster().getRegionServer(1).start();
+      UTIL1.getHBaseCluster().getRegionServer(1).start();
     }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 745c439168..8b76e01f0f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -190,21 +190,21 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
   }

   protected void setupReplication() throws Exception {
-    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
-    ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
+    ReplicationAdmin admin1 = new ReplicationAdmin(CONF1);
+    ReplicationAdmin admin2 = new ReplicationAdmin(CONF2);

-    Admin ha = utility1.getAdmin();
+    Admin ha = UTIL1.getAdmin();
     ha.createTable(t1_syncupSource);
     ha.createTable(t2_syncupSource);
     ha.close();

-    ha = utility2.getAdmin();
+    ha = UTIL2.getAdmin();
     ha.createTable(t1_syncupTarget);
     ha.createTable(t2_syncupTarget);
     ha.close();

-    Connection connection1 = ConnectionFactory.createConnection(utility1.getConfiguration());
-    Connection connection2 = ConnectionFactory.createConnection(utility2.getConfiguration());
+    Connection connection1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
+    Connection connection2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());

     // Get HTable from Master
     ht1Source = connection1.getTable(t1_su);
@@ -215,10 +215,10 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
     ht2TargetAtPeer1 = connection2.getTable(t2_su);

     /**
-     * set M-S : Master: utility1 Slave1: utility2
+     * set M-S : Master: UTIL1 Slave1: UTIL2
      */
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
-    rpc.setClusterKey(utility2.getClusterKey());
+    rpc.setClusterKey(UTIL2.getClusterKey());
     admin1.addPeer("1", rpc, null);

     admin1.close();
@@ -252,9 +252,9 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
     // ensure replication completed
     Thread.sleep(SLEEP_TIME);
-    int rowCount_ht1Source = utility1.countRows(ht1Source);
+    int rowCount_ht1Source = UTIL1.countRows(ht1Source);
     for (int i = 0; i < NB_RETRIES; i++) {
-      int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+      int rowCount_ht1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
       if (i==NB_RETRIES-1) {
         assertEquals("t1_syncup has 101 rows on source, and 100 on slave1",
             rowCount_ht1Source - 1, rowCount_ht1TargetAtPeer1);
@@ -265,9 +265,9 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
       Thread.sleep(SLEEP_TIME);
     }

-    int rowCount_ht2Source = utility1.countRows(ht2Source);
+    int rowCount_ht2Source = UTIL1.countRows(ht2Source);
     for (int i = 0; i < NB_RETRIES; i++) {
-      int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+      int rowCount_ht2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
       if (i==NB_RETRIES-1) {
         assertEquals("t2_syncup has 201 rows on source, and 200 on slave1",
             rowCount_ht2Source - 1, rowCount_ht2TargetAtPeer1);
@@ -281,7 +281,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {

   private void mimicSyncUpAfterDelete() throws Exception {
     LOG.debug("mimicSyncUpAfterDelete");
-    utility2.shutdownMiniHBaseCluster();
+    UTIL2.shutdownMiniHBaseCluster();

     List<Delete> list = new ArrayList<>();
     // delete half of the rows
@@ -299,37 +299,37 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
     }
     ht2Source.delete(list);

-    int rowCount_ht1Source = utility1.countRows(ht1Source);
+    int rowCount_ht1Source = UTIL1.countRows(ht1Source);
     assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
         rowCount_ht1Source);

-    int rowCount_ht2Source = utility1.countRows(ht2Source);
+    int rowCount_ht2Source = UTIL1.countRows(ht2Source);
     assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
         101, rowCount_ht2Source);

-    utility1.shutdownMiniHBaseCluster();
-    utility2.restartHBaseCluster(1);
+    UTIL1.shutdownMiniHBaseCluster();
+    UTIL2.restartHBaseCluster(1);

     Thread.sleep(SLEEP_TIME);

     // before sync up
-    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
-    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    int rowCount_ht1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+    int rowCount_ht2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
     assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
     assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);

     // After sync up
     for (int i = 0; i < NB_RETRIES; i++) {
-      syncUp(utility1);
-      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
-      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+      syncUp(UTIL1);
+      rowCount_ht1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+      rowCount_ht2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
       if (i == NB_RETRIES - 1) {
         if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) {
           // syncUP still failed. Let's look at the source in case anything wrong there
-          utility1.restartHBaseCluster(1);
-          rowCount_ht1Source = utility1.countRows(ht1Source);
+          UTIL1.restartHBaseCluster(1);
+          rowCount_ht1Source = UTIL1.countRows(ht1Source);
           LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
-          rowCount_ht2Source = utility1.countRows(ht2Source);
+          rowCount_ht2Source = UTIL1.countRows(ht2Source);
           LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
         }
         assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
@@ -351,8 +351,8 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {

   private void mimicSyncUpAfterPut() throws Exception {
     LOG.debug("mimicSyncUpAfterPut");
-    utility1.restartHBaseCluster(1);
-    utility2.shutdownMiniHBaseCluster();
+    UTIL1.restartHBaseCluster(1);
+    UTIL2.shutdownMiniHBaseCluster();

     Put p;
     // another 100 + 1 row to t1_syncup
@@ -377,19 +377,19 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
     p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
     ht2Source.put(p);

-    int rowCount_ht1Source = utility1.countRows(ht1Source);
+    int rowCount_ht1Source = UTIL1.countRows(ht1Source);
     assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
-    int rowCount_ht2Source = utility1.countRows(ht2Source);
+    int rowCount_ht2Source = UTIL1.countRows(ht2Source);
     assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);

-    utility1.shutdownMiniHBaseCluster();
-    utility2.restartHBaseCluster(1);
+    UTIL1.shutdownMiniHBaseCluster();
+    UTIL2.restartHBaseCluster(1);

     Thread.sleep(SLEEP_TIME);

     // before sync up
-    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
-    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    int rowCount_ht1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+    int rowCount_ht2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
     assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
         rowCount_ht1TargetAtPeer1);
     assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
@@ -397,16 +397,16 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {

     // after syun up
     for (int i = 0; i < NB_RETRIES; i++) {
-      syncUp(utility1);
-      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
-      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+      syncUp(UTIL1);
+      rowCount_ht1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+      rowCount_ht2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
       if (i == NB_RETRIES - 1) {
         if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) {
           // syncUP still failed. Let's look at the source in case anything wrong there
-          utility1.restartHBaseCluster(1);
-          rowCount_ht1Source = utility1.countRows(ht1Source);
+          UTIL1.restartHBaseCluster(1);
+          rowCount_ht1Source = UTIL1.countRows(ht1Source);
           LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
-          rowCount_ht2Source = utility1.countRows(ht2Source);
+          rowCount_ht2Source = UTIL1.countRows(ht2Source);
           LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
         }
         assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java
index 594aac0b5c..2aa3ea4b0e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java
@@ -36,8 +36,8 @@ public class TestReplicationEndpointWithMultipleAsyncWAL extends TestReplication

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
-    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
+    CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
     TestReplicationEndpoint.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
index 68b41be457..36c07fd014 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
@@ -36,8 +36,8 @@ public class TestReplicationEndpointWithMultipleWAL extends TestReplicationEndpo

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
-    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
+    CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
     TestReplicationEndpoint.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java
index 4685f24c0d..0f79492839 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java
@@ -37,8 +37,8 @@ public class TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL extends

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
-    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
+    CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
     TestReplicationKillMasterRSCompressed.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
index 82fef3aa58..21f325c1b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
@@ -37,8 +37,8 @@ public class TestReplicationKillMasterRSCompressedWithMultipleWAL extends

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
-    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
+    CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
     TestReplicationKillMasterRSCompressed.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
index 1451499347..103000c457 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
@@ -37,8 +37,8 @@ public class TestReplicationSyncUpToolWithMultipleAsyncWAL extends TestReplicati

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
-    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
+    CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
     TestReplicationBase.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
index e487039dcd..8f02c89170 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
@@ -37,8 +37,8 @@ public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyn

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
-    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
+    CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
     TestReplicationBase.setUpBeforeClass();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index 24329a0fff..bff363f986 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -59,19 +59,19 @@ public class TestReplicator extends TestReplicationBase {

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
-    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
+    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
     TestReplicationBase.setUpBeforeClass();
   }

   @Test
   public void testReplicatorBatching() throws Exception {
     // Clear the tables
-    truncateTable(utility1, tableName);
-    truncateTable(utility2, tableName);
+    truncateTable(UTIL1, tableName);
+    truncateTable(UTIL2, tableName);

     // Replace the peer set up for us by the base class with a wrapper for this test
     admin.addPeer("testReplicatorBatching",
-        new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+        new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
             .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()),
         null);
@@ -92,7 +92,7 @@ public class TestReplicator extends TestReplicationBase {
       }

       // Wait for replication to complete.
-      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
         @Override
         public boolean evaluate() throws Exception {
           LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
@@ -107,7 +107,7 @@ public class TestReplicator extends TestReplicationBase {
       assertEquals("We sent an incorrect number of batches", NUM_ROWS,
           ReplicationEndpointForTest.getBatchCount());
-      assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
+      assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
     } finally {
       admin.removePeer("testReplicatorBatching");
     }
@@ -116,12 +116,12 @@ public class TestReplicator extends TestReplicationBase {
   @Test
   public void testReplicatorWithErrors() throws Exception {
     // Clear the tables
-    truncateTable(utility1, tableName);
-    truncateTable(utility2, tableName);
+    truncateTable(UTIL1, tableName);
+    truncateTable(UTIL2, tableName);

     // Replace the peer set up for us by the base class with a wrapper for this test
     admin.addPeer("testReplicatorWithErrors",
-        new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+        new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
            .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
        null);
@@ -143,7 +143,7 @@ public class TestReplicator extends TestReplicationBase {

       // Wait for replication to complete.
       // We can expect 10 batches
-      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
         @Override
         public boolean evaluate() throws Exception {
           return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
@@ -155,7 +155,7 @@ public class TestReplicator extends TestReplicationBase {
         }
       });

-      assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
+      assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
     } finally {
       admin.removePeer("testReplicatorWithErrors");
     }
-- 
2.17.2 (Apple Git-113)
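
The test changes above all drive cluster-to-cluster replication, where the hazard this change guards against is an edit cycling between peers forever. For readers who want the gist without wading through the diff, here is a minimal, self-contained Java sketch of a provenance-based cycle check, placed after the signature delimiter so it does not affect patch application. It is illustrative only: it assumes a list of visited cluster ids travels with each replicated bulk load event, and LoopAwareBulkLoadSink, shouldApply, and stamp are hypothetical names invented for this sketch, not APIs from this patch (the production logic lives in the region server's replication sink and region code).

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Illustrative sketch only (hypothetical class, not part of the patch):
 * an event carries the list of cluster ids it has already visited, and a
 * sink drops it once its own id appears in that list.
 */
public class LoopAwareBulkLoadSink {

  private final UUID localClusterId;

  public LoopAwareBulkLoadSink(UUID localClusterId) {
    this.localClusterId = localClusterId;
  }

  /** An event should be applied only if this cluster has not seen it before. */
  public boolean shouldApply(List<UUID> clusterIds) {
    // A null or empty list means the event originated locally or predates
    // provenance tracking; apply it and start tracking from here.
    return clusterIds == null || !clusterIds.contains(localClusterId);
  }

  /** Before forwarding, stamp this cluster onto the event's provenance list. */
  public List<UUID> stamp(List<UUID> clusterIds) {
    List<UUID> out = clusterIds == null ? new ArrayList<>() : new ArrayList<>(clusterIds);
    out.add(localClusterId);
    return out;
  }

  public static void main(String[] args) {
    UUID a = UUID.randomUUID();
    UUID b = UUID.randomUUID();
    LoopAwareBulkLoadSink sinkA = new LoopAwareBulkLoadSink(a);

    // An event arriving from cluster B carries only B in its provenance: apply it.
    List<UUID> fromB = new ArrayList<>();
    fromB.add(b);
    System.out.println(sinkA.shouldApply(fromB)); // true

    // Once A stamps and forwards it, a copy routed back to A is dropped.
    List<UUID> roundTripped = sinkA.stamp(fromB);
    System.out.println(sinkA.shouldApply(roundTripped)); // false: cycle broken
  }
}

In a two-cluster A <-> B topology, A stamps its id before shipping; when the same event comes back around, the contains() check trips and the event is dropped instead of being re-applied and re-shipped, which is what stops a bulk load from ping-ponging between clusters indefinitely.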