From e2d69b216ed2c526e67db6b03aa1bc3b6574a2b1 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 24 Apr 2018 11:20:56 +0800 Subject: [PATCH] HBASE-20426 Give up replicating anything in S state --- .../src/main/protobuf/MasterProcedure.proto | 11 +- .../master/replication/ReplicationPeerManager.java | 6 +- .../TransitPeerSyncReplicationStateProcedure.java | 29 ++++- .../hadoop/hbase/regionserver/LogRoller.java | 11 +- .../regionserver/PeerProcedureHandlerImpl.java | 24 +++++ .../regionserver/ReplicationSourceManager.java | 120 +++++++++++++++++---- .../TestDrainReplicationQueuesForStandBy.java | 118 ++++++++++++++++++++ 7 files changed, 288 insertions(+), 31 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 53c54b2..feae3d5 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -390,11 +390,12 @@ enum PeerSyncReplicationStateTransitionState { SET_PEER_NEW_SYNC_REPLICATION_STATE = 2; REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3; REPLAY_REMOTE_WAL_IN_PEER = 4; - REOPEN_ALL_REGIONS_IN_PEER = 5; - TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6; - REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7; - CREATE_DIR_FOR_REMOTE_WAL = 8; - POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 9; + REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 5; + REOPEN_ALL_REGIONS_IN_PEER = 6; + TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 7; + REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 8; + CREATE_DIR_FOR_REMOTE_WAL = 9; + POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 10; } message PeerModificationStateData { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 229549e..052b659 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -303,7 +303,7 @@ public class ReplicationPeerManager { } } - public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { + public void removeAllQueues(String peerId) throws ReplicationException { // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still // on-going when the refresh peer config procedure is done, if a RS which has already been // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in @@ -317,6 +317,10 @@ public class ReplicationPeerManager { // unless it has already been removed by others. 
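The two-pass removal described in the comment above can be sketched as follows. This is a minimal illustration, not HBase code: `QueueStorage` is a hypothetical stand-in for `ReplicationQueueStorage`, and the two passes correspond to the two consecutive `ReplicationUtils.removeAllQueues` calls visible in the hunk below.

```java
import java.util.List;

// Hypothetical stand-in for ReplicationQueueStorage, for illustration only.
interface QueueStorage {
  List<String> listAllReplicators();                          // all RSes that own queues
  List<String> listQueues(String replicator, String peerId);  // one RS's queues for a peer
  void removeQueue(String replicator, String queueId);
}

final class TwoPassQueueRemoval {
  /**
   * Remove every queue of the given peer. A single scan can miss a queue that an
   * already-scanned RS claims from a not-yet-scanned RS while the scan is running,
   * so a second full scan catches queues that moved during the first one.
   */
  static void removeAllQueues(QueueStorage storage, String peerId) {
    for (int pass = 0; pass < 2; pass++) {
      for (String replicator : storage.listAllReplicators()) {
        for (String queueId : storage.listQueues(replicator, peerId)) {
          storage.removeQueue(replicator, queueId);
        }
      }
    }
  }
}
```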
ReplicationUtils.removeAllQueues(queueStorage, peerId); ReplicationUtils.removeAllQueues(queueStorage, peerId); + } + + public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { + removeAllQueues(peerId); queueStorage.removePeerFromHFileRefs(peerId); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index 5da2b0c..771596e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -163,16 +163,35 @@ public class TransitPeerSyncReplicationStateProcedure addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0)) .toArray(RefreshPeerProcedure[]::new)); - if (fromState == SyncReplicationState.STANDBY && - toState == SyncReplicationState.DOWNGRADE_ACTIVE) { - setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER); + if (fromState.equals(SyncReplicationState.ACTIVE)) { + setNextState(toState.equals(SyncReplicationState.STANDBY) + ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER + : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); + } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) { + setNextState(toState.equals(SyncReplicationState.STANDBY) + ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER + : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); } else { - setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); + assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE); + setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER); } return Flow.HAS_MORE_STATE; case REPLAY_REMOTE_WAL_IN_PEER: addChildProcedure(new RecoverStandbyProcedure(peerId)); - setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); + setNextState( + PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); return Flow.HAS_MORE_STATE; + case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER: + try { + env.getReplicationPeerManager().removeAllQueues(peerId); + } catch (ReplicationException e) { + LOG.warn("Failed to remove all replication queues for peer {} when starting to transit" + + " sync replication peer state from {} to {}, retry", peerId, fromState, toState, e); + throw new ProcedureYieldException(); + } + setNextState(fromState.equals(SyncReplicationState.ACTIVE) + ?
PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER + : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); return Flow.HAS_MORE_STATE; case REOPEN_ALL_REGIONS_IN_PEER: try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 55c5219..85050a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -234,10 +234,8 @@ public class LogRoller extends HasThread implements Closeable { } /** - * For testing only * @return true if all WAL roll finished */ - @VisibleForTesting public boolean walRollFinished() { for (boolean needRoll : walNeedsRoll.values()) { if (needRoll) { @@ -247,6 +245,15 @@ return true; } + /** + * Wait until all wals have been rolled after calling {@link #requestRollAll()}. + */ + public void waitUntilWalRollFinished() throws InterruptedException { + while (!walRollFinished()) { + Thread.sleep(100); + } + } + @Override public void close() { running = false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index 7fc9f53..6113b0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.concurrent.locks.Lock; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.LogRoller; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -170,6 +172,28 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { "current state is {}, this should be a retry, give up", peerId, newState); return; } + if (newState == SyncReplicationState.STANDBY) { + replicationSourceManager.drainSources(peerId); + // Need to roll the wals and make the ReplicationSource for this peer track the new file. + // If we do not do this, there will be two problems that can not be addressed at the same + // time. First, if we just throw away the current wal file, and later when we transit the + // peer to DA, and the wal has not been rolled yet, then the new data written to the wal + // file will not be replicated and cause data inconsistency. But if we just track the + // current wal file without rolling, it may contain some data written before we transit the + // peer to S, later if we transit the peer to DA, the data will also be replicated and cause + // data inconsistency. So here we need to roll the wal, and let the ReplicationSource + // track the new wal file, and throw the old wal files away.
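The comment above motivates the drain-roll-wait sequence that this hunk continues with below. As a condensed sketch of that ordering (the `*Like` interfaces here are hypothetical stand-ins for the `ReplicationSourceManager` and `LogRoller` methods the patch actually calls):

```java
import java.io.IOException;

final class StandbyTransitionSketch {

  // Hypothetical slices of the real interfaces, for illustration only.
  interface SourceManagerLike {
    void drainSources(String peerId) throws IOException;
  }

  interface LogRollerLike {
    void requestRollAll();
    void waitUntilWalRollFinished() throws InterruptedException;
  }

  /**
   * Order matters: drain first (drop the queued wal files), then roll so every
   * writer switches to a brand-new wal file, then wait for the rolls to finish.
   * Only after that is it safe to flip the peer to STANDBY: the sources now
   * track only fresh wals, so no pre-transition edit can be replicated later.
   */
  static void transitToStandby(SourceManagerLike sources, LogRollerLike roller, String peerId)
      throws IOException, InterruptedException {
    sources.drainSources(peerId);
    roller.requestRollAll();
    roller.waitUntilWalRollFinished();
  }
}
```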
+ LogRoller roller = rs.getWalRoller(); + roller.requestRollAll(); + try { + roller.waitUntilWalRollFinished(); + } catch (InterruptedException e) { + // reset the interrupted flag + Thread.currentThread().interrupt(); + throw (IOException) new InterruptedIOException( + "Interrupted while waiting for wal roll to finish").initCause(e); + } + } SyncReplicationState oldState = peer.getSyncReplicationState(); peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage); peer.transitSyncReplicationState(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index e3c323f..d2ea4f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -53,11 +53,13 @@ import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -388,11 +390,81 @@ public class ReplicationSourceManager implements ReplicationListener { } /**
+ * <p> + * This is used when we transit a sync replication peer to {@link SyncReplicationState#STANDBY}. + * </p> + * <p> + * When transiting to {@link SyncReplicationState#STANDBY}, we can remove all the pending wal + * files for a replication peer as we do not need to replicate them any more. And this is + * necessary, otherwise when we transit back to {@link SyncReplicationState#DOWNGRADE_ACTIVE} + * later, the stale data will be replicated again and cause inconsistency. + * </p> + * <p> + * See HBASE-20426 for more details. + * </p>
+ * @param peerId the id of the sync replication peer + */ + public void drainSources(String peerId) throws IOException, ReplicationException { + String terminateMessage = "Sync replication peer " + peerId + + " is transiting to STANDBY. Will close the previous replication source and open a new one"; + ReplicationPeer peer = replicationPeers.getPeer(peerId); + assert peer.getPeerConfig().isSyncReplication(); + ReplicationSourceInterface src = createSource(peerId, peer); + // synchronized here to avoid race with preLogRoll where we add new log to source and also + // walsById. + ReplicationSourceInterface toRemove; + Map<String, NavigableSet<String>> wals = new HashMap<>(); + synchronized (latestPaths) { + toRemove = sources.put(peerId, src); + if (toRemove != null) { + LOG.info("Terminate replication source for " + toRemove.getPeerId()); + toRemove.terminate(terminateMessage); + } + // Here we make a copy of all the remaining wal files and then delete them from the + // replication queue storage after releasing the lock. It is not safe to just remove the old + // map from walsById since later we may fail to delete them from the replication queue + // storage, and when we retry next time, we can not know the wal files that need to be deleted + // from the replication queue storage. + walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v))); + } + LOG.info("Startup replication source for " + src.getPeerId()); + src.startup(); + for (NavigableSet<String> walsByGroup : wals.values()) { + for (String wal : walsByGroup) { + queueStorage.removeWAL(server.getServerName(), peerId, wal); + } + } + synchronized (walsById) { + Map<String, NavigableSet<String>> oldWals = walsById.get(peerId); + wals.forEach((k, v) -> { + NavigableSet<String> walsByGroup = oldWals.get(k); + if (walsByGroup != null) { + walsByGroup.removeAll(v); + } + }); + } + // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is + // a background task, we will delete the file from replication queue storage under the lock to + // simplify the logic. + synchronized (this.oldsources) { + for (Iterator<ReplicationSourceInterface> iter = oldsources.iterator(); iter.hasNext();) { + ReplicationSourceInterface oldSource = iter.next(); + if (oldSource.getPeerId().equals(peerId)) { + String queueId = oldSource.getQueueId(); + queueStorage.removeQueue(server.getServerName(), queueId); + walsByIdRecoveredQueues.remove(queueId); + oldSource.terminate(terminateMessage); + iter.remove(); + } + } + } + } + + /** * Close the previous replication sources of this peer id and open new sources to trigger the new * replication state changes or new replication config changes.
Here we don't need to change * replication queue storage and only to enqueue all logs to the new replication source * @param peerId the id of the replication peer - * @throws IOException */ public void refreshSources(String peerId) throws IOException { String terminateMessage = "Peer " + peerId + @@ -406,7 +478,7 @@ LOG.info("Terminate replication source for " + toRemove.getPeerId()); toRemove.terminate(terminateMessage); } - for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) { + for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) { walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); } } @@ -779,11 +851,11 @@ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); String actualPeerId = replicationQueueInfo.getPeerId(); - ReplicationPeer peer = replicationPeers.getPeer(actualPeerId); + ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId); if (peer == null) { LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS + ", peer is null"); - abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId)); + deleteQueue(queueId); continue; } if (server instanceof ReplicationSyncUp.DummyServer @@ -793,27 +865,39 @@ actualPeerId); continue; } - // track sources in walsByIdRecoveredQueues - Map<String, NavigableSet<String>> walsByGroup = new HashMap<>(); - walsByIdRecoveredQueues.put(queueId, walsByGroup); - for (String wal : walsSet) { - String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); - NavigableSet<String> wals = walsByGroup.get(walPrefix); - if (wals == null) { - wals = new TreeSet<>(); - walsByGroup.put(walPrefix, wals); - } - wals.add(wal); - } ReplicationSourceInterface src = createSource(queueId, peer); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer synchronized (oldsources) { - if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) { + // The peer has been removed, or it has been recreated with the same peer id + if (replicationPeers.getPeer(actualPeerId) != peer) { src.terminate("Recovered queue doesn't belong to any current peer"); - removeRecoveredSource(src); + deleteQueue(queueId); continue; } + // Do not setup recovered queue if a sync replication peer is in standby state + if (peer.getPeerConfig().isSyncReplication()) { + Pair<SyncReplicationState, SyncReplicationState> stateAndNewState = + peer.getSyncReplicationStateAndNewState(); + if (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) || + stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) { + src.terminate("Sync replication peer is in STANDBY state"); + deleteQueue(queueId); + continue; + } + } + // track sources in walsByIdRecoveredQueues + Map<String, NavigableSet<String>> walsByGroup = new HashMap<>(); + walsByIdRecoveredQueues.put(queueId, walsByGroup); + for (String wal : walsSet) { + String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); + NavigableSet<String> wals = walsByGroup.get(walPrefix); + if (wals == null) { + wals = new TreeSet<>(); + walsByGroup.put(walPrefix, wals); + } + wals.add(wal); + } oldsources.add(src); for (String wal : walsSet) { src.enqueueLog(new Path(oldLogDir, wal));
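The guard added in the hunk above can be isolated as a small predicate. This sketch uses the real `Pair` and `SyncReplicationState` types but is otherwise illustrative, not HBase source: the point is that a claimed queue is dropped when either the current or the pending state of the sync replication peer is STANDBY.

```java
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;

final class StandbyQueueGuard {
  /**
   * A recovered queue for a sync replication peer is useless (and harmful) once
   * the peer is in STANDBY, or is transiting to STANDBY: replaying it could leak
   * stale edits back when the peer later becomes DOWNGRADE_ACTIVE.
   */
  static boolean shouldDropRecoveredQueue(
      Pair<SyncReplicationState, SyncReplicationState> stateAndNewState) {
    return stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
      || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY);
  }
}
```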
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java new file mode 100644 index 0000000..5da7870 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.SyncReplicationTestBase; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDrainReplicationQueuesForStandBy.class); + + @Test + public void test() throws Exception { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.ACTIVE); + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); + write(UTIL1, 0, 100); + + HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME); + String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName( + ((AbstractFSWAL<?>) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build())) + .getCurrentFileName().getName()); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + // transit cluster2 to DA and cluster 1 to S + verify(UTIL2, 0, 100); + + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + // delete the original value, and then major compact + try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) { + for (int i = 0; i < 100; i++) { + table.delete(new
Delete(Bytes.toBytes(i))); + } + } + UTIL2.flush(TABLE_NAME); + UTIL2.compact(TABLE_NAME, true); + // wait until the new values are replicated back to cluster1 + HRegion region = rs.getRegions(TABLE_NAME).get(0); + UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return region.get(new Get(Bytes.toBytes(99))).isEmpty(); + } + + @Override + public String explainFailure() throws Exception { + return "Replication has not caught up yet"; + } + }); + // transit cluster1 to DA and cluster2 to S, then we will start replicating from cluster1 to + // cluster2 + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + UTIL1.getAdmin().enableReplicationPeer(PEER_ID); + + // confirm that we will not replicate the old data which causes inconsistency + ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService()) + .getReplicationManager().getSource(PEER_ID); + UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return !source.workerThreads.containsKey(walGroupId); + } + + @Override + public String explainFailure() throws Exception { + return "Replication has not caught up yet"; + } + }); + HRegion region2 = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); + for (int i = 0; i < 100; i++) { + assertTrue(region2.get(new Get(Bytes.toBytes(i))).isEmpty()); + } + } +} \ No newline at end of file -- 2.7.4
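For context, the state transitions the test drives map onto the public `Admin` API roughly as follows. This is a usage sketch mirroring the ordering at the end of the test (cluster and peer setup omitted, error handling elided); it is not part of the patch.

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.replication.SyncReplicationState;

final class SwapSyncReplicationRoles {
  /**
   * Swap the roles of the two clusters of a sync replication peer: promote the
   * standby side to DOWNGRADE_ACTIVE first, then demote the old active side to
   * STANDBY (which, with this patch, drains its replication queues and rolls
   * its wals), and finally promote the new side to full ACTIVE.
   */
  static void swapRoles(Admin oldActive, Admin oldStandby, String peerId) throws IOException {
    oldStandby.transitReplicationPeerSyncReplicationState(peerId,
      SyncReplicationState.DOWNGRADE_ACTIVE);
    oldActive.transitReplicationPeerSyncReplicationState(peerId,
      SyncReplicationState.STANDBY);
    oldStandby.transitReplicationPeerSyncReplicationState(peerId,
      SyncReplicationState.ACTIVE);
  }
}
```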