From 619eaa0ea0956c3489f94d5fed9945efce9dc9b0 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 14 Mar 2018 21:39:13 +0800 Subject: [PATCH] HBASE-20147 Serial replication will be stuck if we create a table with serial replication but add it to a peer after there are region moves --- .../org/apache/hadoop/hbase/MetaTableAccessor.java | 20 +- .../hbase/client/ConnectionImplementation.java | 2 + .../src/main/protobuf/MasterProcedure.proto | 1 + .../hbase/replication/ReplicationQueueStorage.java | 8 + .../replication/TableReplicationQueueStorage.java | 8 + .../replication/ZKReplicationQueueStorage.java | 50 +++-- .../hbase/master/replication/AddPeerProcedure.java | 9 +- .../master/replication/ReplicationPeerManager.java | 54 ++++- .../replication/UpdatePeerConfigProcedure.java | 21 +- .../replication/SerialReplicationTestBase.java | 233 +++++++++++++++++++++ .../TestAddToSerialReplicationPeer.java | 85 ++++++++ .../hbase/replication/TestSerialReplication.java | 187 +---------------- 12 files changed, 466 insertions(+), 212 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 57a0350..e907aec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -1171,8 +1171,9 @@ public class MetaTableAccessor { final List<Result> results = new ArrayList<>(); @Override public boolean visit(Result r) throws IOException { - if (r == null || r.isEmpty()) return true; - add(r); + if (r != null && !r.isEmpty()) { + add(r); + } return true; } @@ -2111,6 +2112,21 @@ public class MetaTableAccessor { } } + public static List<HRegionLocation> getTablePrimaryRegionsAndLocations(Connection conn, + TableName tableName) throws IOException { + List<HRegionLocation> list = new ArrayList<>(); + scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REGION), + getTableStopRowForMeta(tableName, QueryType.REGION), QueryType.REGION, r -> { + RegionLocations locs = getRegionLocations(r); + if (locs == null || locs.size() == 0) { + return true; + } + list.add(locs.getDefaultRegionLocation()); + return true; + }); + return list; + } + private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException { if (!METALOG.isDebugEnabled()) { return;
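As an illustration (not part of the diff), a minimal sketch of how the new MetaTableAccessor helper could be driven; the connection and table arguments and the printing are assumptions made up for this example:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;

public class PrimaryRegionsSketch {
  // Prints each primary region of a table together with the open sequence
  // number recorded in hbase:meta; a region that has never been moved since
  // creation reports no positive seqNum here.
  static void dump(Connection conn, TableName tableName) throws IOException {
    List<HRegionLocation> locs =
        MetaTableAccessor.getTablePrimaryRegionsAndLocations(conn, tableName);
    for (HRegionLocation loc : locs) {
      System.out.println(loc.getRegion().getEncodedName() + " on " + loc.getServerName()
          + ", openSeqNum=" + loc.getSeqNum());
    }
  }
}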
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 6408044..74113fd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -85,6 +86,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 1134bd6..25405e6 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -413,4 +413,5 @@ message AddPeerStateData { message UpdatePeerConfigStateData { required ReplicationPeer peer_config = 1; + required bool set_last_seq_id = 2; } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java index 4c93da6..510f849 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java @@ -79,6 +79,14 @@ public interface ReplicationQueueStorage { long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException; /** + * Set the max sequence ids of a group of regions for a given peer. Will be called when setting up + * a serial replication peer. + * @param peerId peer id + * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication. + */ + void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException; + + /** * Get the current position for a specific WAL in a given queue for a given regionserver. * @param serverName the name of the regionserver * @param queueId a String that identifies the queue
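A usage sketch of the new interface method (illustration only; the storage instance, peer id, and encoded region name are assumptions):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;

public class SetLastSequenceIdsSketch {
  // Marks everything up to the given sequence id as already pushed for peer "1",
  // so the serial replication barrier will not wait for older entries.
  static void record(ReplicationQueueStorage storage) throws ReplicationException {
    Map<String, Long> lastSeqIds = new HashMap<>();
    // hypothetical encoded region name -> last pushed sequence id
    lastSeqIds.put("d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5", 41L);
    storage.setLastSequenceIds("1", lastSeqIds);
  }
}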
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java index abb279d..8dfa447 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java @@ -198,6 +198,14 @@ public class TableReplicationQueueStorage extends TableReplicationStorageBase } } + + @Override + public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) + throws ReplicationException { + // TODO Implement ReplicationQueueStorage.setLastSequenceIds + + } + @Override public long getWALPosition(ServerName serverName, String queueId, String fileName) throws ReplicationException { diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index fa0ff0e..5e13f79 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -189,6 +189,24 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase } } + private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds, + List<ZKUtilOp> listOfOps) throws KeeperException { + for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { + String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId); + String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); + /* + * Make sure that the path + * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> exists. As the javadoc + * of multiOrSequential() says, if we receive a NodeExistsException, all operations will + * fail, so create the path here instead. There is no need to add this operation to + * listOfOps, because we only need the file position and sequence id to be updated atomically. + */ + ZKUtil.createWithParents(zookeeper, path); + // Persist the max sequence id of the region to zookeeper. + listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue()))); + } + } + @Override public void setWALPosition(ServerName serverName, String queueId, String fileName, long position, Map<String, Long> lastSeqIds) throws ReplicationException { @@ -197,22 +215,8 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName), ZKUtil.positionToByteArray(position))); // Persist the max sequence id(s) of regions for serial replication atomically. - if (lastSeqIds != null && lastSeqIds.size() > 0) { - for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { - String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId); - String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); - /* - * Make sure that the path - * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> exists. As the javadoc - * of multiOrSequential() says, if we receive a NodeExistsException, all operations will - * fail, so create the path here instead. There is no need to add this operation to - * listOfOps, because we only need the file position and sequence id to be updated atomically. - */ - ZKUtil.createWithParents(zookeeper, path); - // Persist the max sequence id of the region to zookeeper. - listOfOps - .add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue()))); - } + if (lastSeqIds != null && !lastSeqIds.isEmpty()) { + addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps); } ZKUtil.multiOrSequential(zookeeper, listOfOps, false); } catch (KeeperException e) { @@ -242,6 +246,20 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase } @Override + public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) + throws ReplicationException { + + try { + List<ZKUtilOp> listOfOps = new ArrayList<>(); + addLastSeqIdsToOps(peerId, lastSeqIds, listOfOps); + ZKUtil.multiOrSequential(zookeeper, listOfOps, false); + } catch (KeeperException e) { + throw new ReplicationException("Failed to set last sequence ids (peerId=" + peerId + + ", lastSeqIds.size=" + lastSeqIds.size() + ")", e); + } + } + + @Override public long getWALPosition(ServerName serverName, String queueId, String fileName) throws ReplicationException { byte[] bytes;
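To make the atomicity note above concrete, a sketch (not part of the diff) of the two setData operations that end up in a single multiOrSequential call; the znode path arguments follow the /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> layout and are assumed to have been created already:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;

public class AtomicPositionUpdateSketch {
  // Both updates go into one ZooKeeper multi, so a reader can never observe the
  // new WAL position without the matching last pushed sequence id.
  static void update(ZKWatcher zk, String fileNode, String regionPeerNode, long position,
      long seqId) throws KeeperException {
    List<ZKUtilOp> ops = new ArrayList<>();
    ops.add(ZKUtilOp.setData(fileNode, ZKUtil.positionToByteArray(position)));
    ops.add(ZKUtilOp.setData(regionPeerNode, ZKUtil.positionToByteArray(seqId)));
    ZKUtil.multiOrSequential(zk, ops, false);
  }
}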
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java index f0f7704..6f304c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -63,7 +63,7 @@ public class AddPeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preAddReplicationPeer(peerId, peerConfig); } - env.getReplicationPeerManager().preAddPeer(peerId, peerConfig); + env.getReplicationPeerManager().preAddPeer(peerId, peerConfig, enabled); } @Override @@ -72,7 +72,12 @@ public class AddPeerProcedure extends ModifyPeerProcedure { } @Override - protected void postPeerModification(MasterProcedureEnv env) throws IOException { + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + if (peerConfig.isSerial()) { + env.getReplicationPeerManager().setLastSequenceIdForSerialPeer(peerId, + env.getMasterServices()); + } LOG.info("Successfully added {} peer {}, config {}", enabled ? 
"ENABLED" : "DISABLED", peerId, peerConfig); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 7620638..7cdaca0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -31,8 +32,12 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -85,7 +90,7 @@ public class ReplicationPeerManager { } } - public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) + void preAddPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws DoNotRetryIOException, ReplicationException { if (peerId.contains("-")) { throw new DoNotRetryIOException("Found invalid peer name: " + peerId); @@ -99,6 +104,9 @@ public class ReplicationPeerManager { // have not been cleaned up yet then we should not create the new peer, otherwise the old wal // file may also be replicated. 
checkQueuesDeleted(peerId); + if (peerConfig.isSerial() && enabled) { + throw new DoNotRetryIOException("Can only add serial replication peer in DISABLED state"); + } } private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException { @@ -109,25 +117,25 @@ public class ReplicationPeerManager { return desc; } - public void preRemovePeer(String peerId) throws DoNotRetryIOException { + void preRemovePeer(String peerId) throws DoNotRetryIOException { checkPeerExists(peerId); } - public void preEnablePeer(String peerId) throws DoNotRetryIOException { + void preEnablePeer(String peerId) throws DoNotRetryIOException { ReplicationPeerDescription desc = checkPeerExists(peerId); if (desc.isEnabled()) { throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled"); } } - public void preDisablePeer(String peerId) throws DoNotRetryIOException { + void preDisablePeer(String peerId) throws DoNotRetryIOException { ReplicationPeerDescription desc = checkPeerExists(peerId); if (!desc.isEnabled()) { throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled"); } } - public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { checkPeerConfig(peerConfig); ReplicationPeerDescription desc = checkPeerExists(peerId); @@ -146,6 +154,10 @@ public class ReplicationPeerManager { oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); } + if (!oldPeerConfig.isSerial() && peerConfig.isSerial() && desc.isEnabled()) { + throw new DoNotRetryIOException( + "Can only change peer in DISABLED state to serial replication"); + } } public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) @@ -216,7 +228,7 @@ public class ReplicationPeerManager { return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); } - public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { + void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still // on-going when the refresh peer config procedure is done, if a RS which has already been // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in @@ -233,6 +245,36 @@ public class ReplicationPeerManager { queueStorage.removePeerFromHFileRefs(peerId); } + void setLastSequenceIdForSerialPeer(String peerId, MasterServices master) + throws ReplicationException { + ReplicationPeerConfig peerConfig = peers.get(peerId).getPeerConfig(); + try { + List<TableDescriptor> tds = master.getTableDescriptors().getAll().values().stream() + .filter(TableDescriptor::hasGlobalReplicationScope) + .filter(td -> ReplicationUtils.contains(peerConfig, td.getTableName())) + .collect(Collectors.toList()); + Map<String, Long> lastSeqIds = new HashMap<>(); + for (TableDescriptor td : tds) { + for (HRegionLocation loc : MetaTableAccessor + .getTablePrimaryRegionsAndLocations(master.getConnection(), td.getTableName())) { + // only update if the region has been moved since it was created. + if (loc.getSeqNum() > 0) { + lastSeqIds.put(loc.getRegion().getEncodedName(), loc.getSeqNum() - 1); + if (lastSeqIds.size() >= 1000) { + queueStorage.setLastSequenceIds(peerId, lastSeqIds); + lastSeqIds.clear(); + } + } + } + } + if (!lastSeqIds.isEmpty()) { + queueStorage.setLastSequenceIds(peerId, lastSeqIds); + } + } catch (IOException e) { + throw new ReplicationException(e); + } + } + private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { checkClusterKey(peerConfig.getClusterKey());
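A worked example (illustration only) of the loc.getSeqNum() - 1 bookkeeping above, assuming getSeqNum() returns the openSeqNum recorded in hbase:meta by the most recent region open:

public class LastPushedSeqIdExample {
  // A region is created, written up to WAL entry 41, then moved and reopened
  // with openSeqNum 42. Entries 1..41 were written before the table was added
  // to the serial peer and will never be pushed, so 41 is recorded as "already
  // pushed" and the serial barrier only waits for entries from 42 onwards.
  public static void main(String[] args) {
    long openSeqNum = 42L; // what loc.getSeqNum() reports after the move
    long lastPushed = openSeqNum - 1; // what setLastSequenceIdForSerialPeer records
    System.out.println("record lastPushedSeqId=" + lastPushed + " for openSeqNum=" + openSeqNum);
  }
}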
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java index 3497447..d528e7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java @@ -40,6 +40,8 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { private ReplicationPeerConfig peerConfig; + private boolean setLastSeqId; + public UpdatePeerConfigProcedure() { } @@ -60,6 +62,10 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig); } env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig); + if (peerConfig.isSerial() && + !env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial()) { + setLastSeqId = true; + } } @Override @@ -68,7 +74,12 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { } @Override - protected void postPeerModification(MasterProcedureEnv env) throws IOException { + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + if (setLastSeqId) { + env.getReplicationPeerManager().setLastSequenceIdForSerialPeer(peerId, + env.getMasterServices()); + } LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { @@ -80,13 +91,15 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { super.serializeStateData(serializer); serializer.serialize(UpdatePeerConfigStateData.newBuilder() - .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).build()); + .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setSetLastSeqId(setLastSeqId) + .build()); } @Override protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { super.deserializeStateData(serializer); - peerConfig = ReplicationPeerConfigUtil - .convert(serializer.deserialize(UpdatePeerConfigStateData.class).getPeerConfig()); + UpdatePeerConfigStateData data = serializer.deserialize(UpdatePeerConfigStateData.class); + peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig()); + setLastSeqId = data.getSetLastSeqId(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java new file mode 100644 index 0000000..8bc8229 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.UUID; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TestName; + +/** + * Base class for testing serial replication. + */ +public class SerialReplicationTestBase { + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + protected static String PEER_ID = "1"; + + protected static byte[] CF = Bytes.toBytes("CF"); + + protected static byte[] CQ = Bytes.toBytes("CQ"); + + protected static FileSystem FS; + + protected static Path LOG_DIR; + + protected static WALProvider.Writer WRITER; + + @Rule + public final TestName name = new TestName(); + + protected Path logPath; + + public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { + + private static final UUID PEER_UUID = UUID.randomUUID(); + + @Override + public UUID getPeerUUID() { + return PEER_UUID; + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + synchronized (WRITER) { + try { + for (Entry entry : replicateContext.getEntries()) { + WRITER.append(entry); + } + WRITER.sync(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return true; + } + + @Override + public void start() { + startAsync(); + } + + @Override + public void stop() { + stopAsync(); + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + notifyStopped(); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); + UTIL.startMiniCluster(3); + // disable balancer + UTIL.getAdmin().balancerSwitch(false, true); + LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); + FS = UTIL.getTestFileSystem(); + FS.mkdirs(LOG_DIR); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws Exception { + 
UTIL.getAdmin().removeReplicationPeer(PEER_ID); + rollAllWALs(); + if (WRITER != null) { + WRITER.close(); + WRITER = null; + } + } + + protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception { + UTIL.getAdmin().move(region.getEncodedNameAsBytes(), + Bytes.toBytes(rs.getServerName().getServerName())); + UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return rs.getRegion(region.getEncodedName()) != null; + } + + @Override + public String explainFailure() throws Exception { + return region + " is still not on " + rs; + } + }); + } + + protected static void rollAllWALs() throws Exception { + for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { + t.getRegionServer().getWalRoller().requestRollAll(); + } + UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream() + .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished); + } + + @Override + public String explainFailure() throws Exception { + return "Log roll has not finished yet"; + } + }); + } + + protected final void setupWALWriter() throws IOException { + logPath = new Path(LOG_DIR, name.getMethodName()); + WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); + } + + protected final void waitUntilReplicationDone(int expectedEntries) throws Exception { + UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { + int count = 0; + while (reader.next() != null) { + count++; + } + return count >= expectedEntries; + } catch (IOException e) { + return false; + } + } + + @Override + public String explainFailure() throws Exception { + return "Not enough entries replicated"; + } + }); + } + + protected final void addPeer(boolean enabled) throws IOException { + UTIL.getAdmin().addReplicationPeer(PEER_ID, + ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") + .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true) + .build(), + false); + if (enabled) { + UTIL.getAdmin().enableReplicationPeer(PEER_ID); + } + } + + protected final void checkOrder(int expectedEntries) throws IOException { + try (WAL.Reader reader = + WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { + long seqId = -1L; + int count = 0; + for (Entry entry;;) { + entry = reader.next(); + if (entry == null) { + break; + } + assertTrue( + "Sequence id goes backwards from " + seqId + " to " + entry.getKey().getSequenceId(), + entry.getKey().getSequenceId() >= seqId); + seqId = entry.getKey().getSequenceId(); + count++; + } + assertEquals(expectedEntries, count); + } + } +}
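A sketch of how a concrete test is expected to combine these helpers (illustration only; the region and server arguments and the entry count are assumptions, and table setup is omitted):

import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegionServer;

public class ExampleSerialReplicationTest extends SerialReplicationTestBase {
  // Typical flow: write data, move the region so hbase:meta records a new
  // openSeqNum, roll the WALs, then add the serial peer and verify ordering.
  void exampleFlow(RegionInfo region, HRegionServer rs) throws Exception {
    setupWALWriter(); // WAL file that LocalReplicationEndpoint copies entries into
    moveRegion(region, rs); // bumps the openSeqNum stored in hbase:meta
    rollAllWALs(); // old entries now live only in archived WALs
    addPeer(true); // serial peer, enabled right after it is added
    waitUntilReplicationDone(100); // assumes 100 entries were written
    checkOrder(100); // sequence ids must never go backwards
  }
}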
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java new file mode 100644 index 0000000..66c4960 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Testcase for HBASE-20147. + */ +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class); + + @Before + public void setUp() throws IOException, StreamLacksCapabilityException { + setupWALWriter(); + } + + // make sure that we will start replicating from the sequence id after the move; that is what we + // want to test here. + private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception { + moveRegion(region, rs); + rollAllWALs(); + } + + @Test + public void testAddPeer() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + UTIL.getAdmin().createTable( + TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); + UTIL.waitTableAvailable(tableName); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); + HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); + moveRegionAndArchiveOldWals(region, rs); + addPeer(true); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + waitUntilReplicationDone(100); + checkOrder(100); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java index f8efcf0..6c0bf9f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -23,20 +23,14 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; @@ -47,187 +41,31 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; -import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALProvider; -import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; @Category({ ReplicationTests.class, LargeTests.class }) -public class TestSerialReplication { +public class TestSerialReplication extends SerialReplicationTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestSerialReplication.class); - private static final HBaseTestingUtility UTIL = new 
HBaseTestingUtility(); - - private static String PEER_ID = "1"; - - private static byte[] CF = Bytes.toBytes("CF"); - - private static byte[] CQ = Bytes.toBytes("CQ"); - - private static FileSystem FS; - - private static Path LOG_DIR; - - private static WALProvider.Writer WRITER; - - public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { - - private static final UUID PEER_UUID = UUID.randomUUID(); - - @Override - public UUID getPeerUUID() { - return PEER_UUID; - } - - @Override - public boolean replicate(ReplicateContext replicateContext) { - synchronized (WRITER) { - try { - for (Entry entry : replicateContext.getEntries()) { - WRITER.append(entry); - } - WRITER.sync(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - return true; - } - - @Override - public void start() { - startAsync(); - } - - @Override - public void stop() { - stopAsync(); - } - - @Override - protected void doStart() { - notifyStarted(); - } - - @Override - protected void doStop() { - notifyStopped(); - } - } - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); - UTIL.startMiniCluster(3); - // disable balancer - UTIL.getAdmin().balancerSwitch(false, true); - LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); - FS = UTIL.getTestFileSystem(); - FS.mkdirs(LOG_DIR); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - UTIL.shutdownMiniCluster(); - } - - @Rule - public final TestName name = new TestName(); - - private Path logPath; - @Before public void setUp() throws IOException, StreamLacksCapabilityException { - logPath = new Path(LOG_DIR, name.getMethodName()); - WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); + setupWALWriter(); // add in disabled state, so later when enabling it all sources will start pushing together. 
- UTIL.getAdmin().addReplicationPeer(PEER_ID, - ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") - .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true) - .build(), - false); - } - - @After - public void tearDown() throws Exception { - UTIL.getAdmin().removeReplicationPeer(PEER_ID); - for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { - t.getRegionServer().getWalRoller().requestRollAll(); - } - UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { - - @Override - public boolean evaluate() throws Exception { - return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream() - .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished); - } - - @Override - public String explainFailure() throws Exception { - return "Log roll has not finished yet"; - } - }); - for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { - t.getRegionServer().getWalRoller().requestRollAll(); - } - if (WRITER != null) { - WRITER.close(); - WRITER = null; - } - } - - private void moveRegion(RegionInfo region, HRegionServer rs) throws Exception { - UTIL.getAdmin().move(region.getEncodedNameAsBytes(), - Bytes.toBytes(rs.getServerName().getServerName())); - UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { - - @Override - public boolean evaluate() throws Exception { - return rs.getRegion(region.getEncodedName()) != null; - } - - @Override - public String explainFailure() throws Exception { - return region + " is still not on " + rs; - } - }); + addPeer(false); } private void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception { UTIL.getAdmin().enableReplicationPeer(PEER_ID); - UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { - - @Override - public boolean evaluate() throws Exception { - try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { - int count = 0; - while (reader.next() != null) { - count++; - } - return count >= expectedEntries; - } catch (IOException e) { - return false; - } - } - - @Override - public String explainFailure() throws Exception { - return "Not enough entries replicated"; - } - }); + waitUntilReplicationDone(expectedEntries); } @Test @@ -251,22 +89,7 @@ public class TestSerialReplication { } } enablePeerAndWaitUntilReplicationDone(200); - try (WAL.Reader reader = - WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { - long seqId = -1L; - int count = 0; - for (Entry entry;;) { - entry = reader.next(); - if (entry == null) { - break; - } - assertTrue( - "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(), - entry.getKey().getSequenceId() >= seqId); - count++; - } - assertEquals(200, count); - } + checkOrder(200); } @Test -- 2.7.4