From 23bb87b567b8b095f6810da108fdac9f3cb67946 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 8 Feb 2018 11:31:10 +0800 Subject: [PATCH] HBASE-19957 General framework to transit sync replication state --- .../hbase/replication/ReplicationPeerConfig.java | 5 + .../java/org/apache/hadoop/hbase/HConstants.java | 3 + .../src/main/protobuf/MasterProcedure.proto | 14 ++ .../hbase/replication/ReplicationPeerImpl.java | 15 ++ .../hadoop/hbase/replication/ReplicationPeers.java | 10 +- .../org/apache/hadoop/hbase/master/HMaster.java | 4 +- .../master/replication/AbstractPeerProcedure.java | 14 +- .../master/replication/ModifyPeerProcedure.java | 15 +- .../master/replication/RefreshPeerProcedure.java | 18 ++- .../master/replication/ReplicationPeerManager.java | 22 +-- .../TransitPeerSyncReplicationStateProcedure.java | 155 ++++++++++++--------- .../hadoop/hbase/regionserver/HRegionServer.java | 35 ++--- .../regionserver/ReplicationSourceService.java | 11 +- .../regionserver/PeerActionListener.java | 4 +- .../regionserver/PeerProcedureHandler.java | 16 ++- .../regionserver/PeerProcedureHandlerImpl.java | 56 +++++++- .../regionserver/RefreshPeerCallable.java | 6 + .../replication/regionserver/Replication.java | 21 ++- .../regionserver/ReplicationSourceManager.java | 41 ++++-- .../SyncReplicationPeerInfoProvider.java | 43 ++++++ .../SyncReplicationPeerInfoProviderImpl.java | 71 ++++++++++ .../SyncReplicationPeerMappingManager.java | 48 +++++++ .../regionserver/SyncReplicationPeerProvider.java | 35 ----- .../hbase/wal/SyncReplicationWALProvider.java | 28 ++-- .../org/apache/hadoop/hbase/wal/WALFactory.java | 47 ++----- .../regionserver/wal/TestCombinedAsyncWriter.java | 6 + .../hbase/wal/TestSyncReplicationWALProvider.java | 40 ++++-- 27 files changed, 541 insertions(+), 242 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java create mode 100644 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 4c10c46..2c8f993 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -217,6 +218,10 @@ public class ReplicationPeerConfig { return this.remoteWALDir; } + public boolean isSyncReplication() { + return !StringUtils.isBlank(remoteWALDir); + } + public static ReplicationPeerConfigBuilder newBuilder() { return new ReplicationPeerConfigBuilderImpl(); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 1cd6f89..4664778 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1351,6 +1351,9 @@ public final class HConstants { public static final String NOT_IMPLEMENTED = "Not implemented"; + // TODO: need to find a better place to hold it. 
+ public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled"; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 1dffd33..39f97eb 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -374,6 +374,15 @@ enum PeerModificationState { POST_PEER_MODIFICATION = 4; } +enum PeerSyncReplicationStateTransitionState { + PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1; + UPDATE_PEER_SYNC_REPLICATION_STATE_STORAGE = 2; + REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3; + REOPEN_ALL_REGIONS_IN_PEER = 4; + REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 5; + POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 6; +} + message PeerModificationStateData { required string peer_id = 1; } @@ -384,18 +393,23 @@ enum PeerModificationType { ENABLE_PEER = 3; DISABLE_PEER = 4; UPDATE_PEER_CONFIG = 5; + TRANSIT_SYNC_REPLICATION_STATE = 6; } message RefreshPeerStateData { required string peer_id = 1; required PeerModificationType type = 2; required ServerName target_server = 3; + /** We need multiple stages for sync replication state transition **/ + optional uint32 stage = 4 [default = 0]; } message RefreshPeerParameter { required string peer_id = 1; required PeerModificationType type = 2; required ServerName target_server = 3; + /** We need multiple stages for sync replication state transition **/ + optional uint32 stage = 4 [default = 0];; } message PeerProcedureStateData { diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java index 5ec14cd..eced90b 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java +++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -38,6 +38,8 @@ public class ReplicationPeerImpl implements ReplicationPeer { private volatile SyncReplicationState syncReplicationState; + private SyncReplicationState oldSyncReplicationState; + private final List peerConfigListeners; /** @@ -66,6 +68,19 @@ public class ReplicationPeerImpl implements ReplicationPeer { peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig)); } + public void transitSyncReplicationStateBegin(SyncReplicationState newState) { + oldSyncReplicationState = syncReplicationState; + syncReplicationState = newState; + } + + public SyncReplicationState getOldSyncReplicationState() { + return oldSyncReplicationState; + } + + public void transitSyncReplicationStateEnd() { + oldSyncReplicationState = null; + } + /** * Get the identifier of this peer * @return string representation of the id (short) diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index f120dbc..d0f9472 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -79,8 +79,8 @@ public class ReplicationPeers { return true; } - public void removePeer(String peerId) { - peerCache.remove(peerId); + public ReplicationPeerImpl removePeer(String peerId) { + return peerCache.remove(peerId); } /** @@ -105,18 +105,12 @@ public class ReplicationPeers { public PeerState refreshPeerState(String peerId) throws ReplicationException { ReplicationPeerImpl peer = peerCache.get(peerId); - if (peer == null) { - throw new ReplicationException("Peer with id=" + peerId + " is not cached."); - } peer.setPeerState(peerStorage.isPeerEnabled(peerId)); return peer.getPeerState(); } public 
ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException { ReplicationPeerImpl peer = peerCache.get(peerId); - if (peer == null) { - throw new ReplicationException("Peer with id=" + peerId + " is not cached."); - } peer.setPeerConfig(peerStorage.getPeerConfig(peerId)); return peer.getPeerConfig(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index ddffbec..297b1f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -129,10 +129,10 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure; import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure; +import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure; import org.apache.hadoop.hbase.master.replication.AddPeerProcedure; import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; -import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure; @@ -3347,7 +3347,7 @@ public class HMaster extends HRegionServer implements MasterServices { return favoredNodesManager; } - private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException { + private long executePeerProcedure(AbstractPeerProcedure procedure) throws IOException { long procId = procedureExecutor.submitProcedure(procedure); procedure.getLatch().await(); return procId; diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java index 0ad8a63..6679d78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java @@ -46,7 +46,7 @@ public abstract class AbstractPeerProcedure protected AbstractPeerProcedure(String peerId) { this.peerId = peerId; - this.latch = ProcedurePrepareLatch.createLatch(2, 0); + this.latch = ProcedurePrepareLatch.createLatch(2, 1); } public ProcedurePrepareLatch getLatch() { @@ -94,4 +94,16 @@ public abstract class AbstractPeerProcedure super.deserializeStateData(serializer); peerId = serializer.deserialize(PeerProcedureStateData.class).getPeerId(); } + + @Override + protected void rollbackState(MasterProcedureEnv env, TState state) + throws IOException, InterruptedException { + if (state == getInitialState()) { + // Actually the peer-related operations have no rollback, but if we haven't made any + // modifications to the peer storage yet, we can just return.
+ return; + } + throw new UnsupportedOperationException(); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 83c5134..ac5d2af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -105,8 +105,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure new RefreshPeerProcedure(peerId, getPeerOperationType(), sn)) - .toArray(RefreshPeerProcedure[]::new)); + .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn)) + .toArray(RefreshPeerProcedure[]::new)); setNextState(PeerModificationState.POST_PEER_MODIFICATION); return Flow.HAS_MORE_STATE; case POST_PEER_MODIFICATION: @@ -128,17 +128,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure private ServerName targetServer; + private int stage; + private boolean dispatched; private ProcedureEvent event; @@ -62,9 +64,15 @@ public class RefreshPeerProcedure extends Procedure } public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer) { + this(peerId, type, targetServer, 0); + } + + public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer, + int stage) { this.peerId = peerId; this.type = type; this.targetServer = targetServer; + this.stage = stage; } @Override @@ -89,6 +97,8 @@ public class RefreshPeerProcedure extends Procedure return PeerModificationType.DISABLE_PEER; case UPDATE_CONFIG: return PeerModificationType.UPDATE_PEER_CONFIG; + case TRANSIT_SYNC_REPLICATION_STATE: + return PeerModificationType.TRANSIT_SYNC_REPLICATION_STATE; default: throw new IllegalArgumentException("Unknown type: " + type); } @@ -106,6 +116,8 @@ public class RefreshPeerProcedure extends Procedure 
return PeerOperationType.DISABLE; case UPDATE_PEER_CONFIG: return PeerOperationType.UPDATE_CONFIG; + case TRANSIT_SYNC_REPLICATION_STATE: + return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE; default: throw new IllegalArgumentException("Unknown type: " + type); } @@ -116,7 +128,8 @@ public class RefreshPeerProcedure extends Procedure assert targetServer.equals(remote); return new ServerOperation(this, getProcId(), RefreshPeerCallable.class, RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) - .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()); + .setTargetServer(ProtobufUtil.toServerName(remote)).setStage(stage).build() + .toByteArray()); } private void complete(MasterProcedureEnv env, Throwable error) { @@ -191,7 +204,7 @@ public class RefreshPeerProcedure extends Procedure protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { serializer.serialize( RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) - .setTargetServer(ProtobufUtil.toServerName(targetServer)).build()); + .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).build()); } @Override @@ -200,5 +213,6 @@ public class RefreshPeerProcedure extends Procedure peerId = data.getPeerId(); type = toPeerOperationType(data.getType()); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + stage = data.getStage(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 9336fbd..e054027 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.replication; import 
java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.EnumMap; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -50,6 +49,9 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; + /** * Manages and performs all replication admin operations. *

@@ -64,15 +66,11 @@ public class ReplicationPeerManager { private final ConcurrentMap peers; - private final EnumMap> allowedTransition = - new EnumMap>(SyncReplicationState.class) { - { - put(SyncReplicationState.ACTIVE, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE)); - put(SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE)); - put(SyncReplicationState.DOWNGRADE_ACTIVE, - EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)); - } - }; + private final ImmutableMap> allowedTransition = + Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE, + EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.STANDBY, + EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.DOWNGRADE_ACTIVE, + EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE))); ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, ConcurrentMap peers) { @@ -264,6 +262,10 @@ public class ReplicationPeerManager { public void transitPeerSyncReplicationState(String peerId, SyncReplicationState state) throws ReplicationException { ReplicationPeerDescription desc = peers.get(peerId); + if (desc.getSyncReplicationState() == state) { + // this should be a retry, just return + return; + } peerStorage.setPeerSyncReplicationState(peerId, state); peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), state)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index aad3b06..85cb374 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ 
-18,11 +18,12 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; - +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; @@ -32,26 +33,27 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData; /** - * The procedure for transit current cluster state for a synchronous replication peer. + * The procedure to transit the current sync replication state of a synchronous replication peer.
*/ @InterfaceAudience.Private -public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedure { +public class TransitPeerSyncReplicationStateProcedure + extends AbstractPeerProcedure { private static final Logger LOG = LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class); - private SyncReplicationState state; + private SyncReplicationState syncReplicationState; public TransitPeerSyncReplicationStateProcedure() { } public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) { super(peerId); - this.state = state; + this.syncReplicationState = state; } @Override @@ -60,99 +62,124 @@ public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedur } @Override - protected void prePeerModification(MasterProcedureEnv env) - throws IOException, ReplicationException { - MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); - if (cpHost != null) { - cpHost.preTransitReplicationPeerSyncReplicationState(peerId, state); - } - env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, state); + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer + .serialize(TransitPeerSyncReplicationStateStateData.newBuilder().setSyncReplicationState( + ReplicationPeerConfigUtil.toSyncReplicationState(syncReplicationState)).build()); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { - env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, state); + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + TransitPeerSyncReplicationStateStateData data = + serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); + syncReplicationState = + ReplicationPeerConfigUtil.toSyncReplicationState(data.getSyncReplicationState()); } @Override - 
protected void postPeerModification(MasterProcedureEnv env) - throws IOException, ReplicationException { - LOG.info("Successfully transit current cluster state to {} in synchronous replication peer {}", - state, peerId); + protected PeerSyncReplicationStateTransitionState getState(int stateId) { + return PeerSyncReplicationStateTransitionState.forNumber(stateId); + } + + @Override + protected int getStateId(PeerSyncReplicationStateTransitionState state) { + return state.getNumber(); + } + + @Override + protected PeerSyncReplicationStateTransitionState getInitialState() { + return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION; + } + + private void preTransit(MasterProcedureEnv env) throws IOException { MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { - env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId, state); + cpHost.preTransitReplicationPeerSyncReplicationState(peerId, syncReplicationState); } + env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, + syncReplicationState); } - @Override - protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { - super.serializeStateData(serializer); - serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder() - .setSyncReplicationState(ReplicationPeerConfigUtil.toSyncReplicationState(state)).build()); + private void postTransit(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully transit current cluster state to {} for synchronous replication peer {}", + syncReplicationState, peerId); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId, + syncReplicationState); + } } - @Override - protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { - super.deserializeStateData(serializer); 
- TransitPeerSyncReplicationStateStateData data = - serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); - state = ReplicationPeerConfigUtil.toSyncReplicationState(data.getSyncReplicationState()); + private List getRegionsToReopen(MasterProcedureEnv env) { + return env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet() + .stream() + .flatMap(tn -> env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn).stream()) + .collect(Collectors.toList()); } @Override - protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) + protected Flow executeFromState(MasterProcedureEnv env, + PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { switch (state) { - case PRE_PEER_MODIFICATION: + case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION: try { - prePeerModification(env); + preTransit(env); } catch (IOException e) { - LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " + - "mark the procedure as failure and give up", getClass().getName(), peerId, e); - setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); - releaseLatch(); + LOG.warn("Failed to call pre CP hook or the pre check is failed for peer {} " + + "when transiting sync replication peer state to {}, " + + "mark the procedure as failure and give up", peerId, state, e); + setFailure("master-transit-peer-sync-replication-state", e); return Flow.NO_MORE_STATE; - } catch (ReplicationException e) { - LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(), - peerId, e); - throw new ProcedureYieldException(); } - setNextState(PeerModificationState.UPDATE_PEER_STORAGE); + setNextState( + PeerSyncReplicationStateTransitionState.UPDATE_PEER_SYNC_REPLICATION_STATE_STORAGE); return Flow.HAS_MORE_STATE; - case UPDATE_PEER_STORAGE: + case UPDATE_PEER_SYNC_REPLICATION_STATE_STORAGE: try 
{ - updatePeerStorage(env); + env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, + syncReplicationState); } catch (ReplicationException e) { - LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId, - e); + LOG.warn("Failed to update peer storage for peer {} when transiting sync replication " + + "peer state to {}, retry", peerId, syncReplicationState, e); throw new ProcedureYieldException(); } - setNextState(PeerModificationState.REFRESH_PEER_ON_RS); + setNextState( + PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN); return Flow.HAS_MORE_STATE; - case REFRESH_PEER_ON_RS: - // TODO: Need add child procedure for every RegionServer - setNextState(PeerModificationState.POST_PEER_MODIFICATION); + case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN: + addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() + .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0)) + .toArray(RefreshPeerProcedure[]::new)); + setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); return Flow.HAS_MORE_STATE; - case POST_PEER_MODIFICATION: + case REOPEN_ALL_REGIONS_IN_PEER: + addChildProcedure( + env.getAssignmentManager().createReopenProcedures(getRegionsToReopen(env))); + setNextState( + PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END); + return Flow.HAS_MORE_STATE; + case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END: + addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() + .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1)) + .toArray(RefreshPeerProcedure[]::new)); + setNextState( + PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); + case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION: try { - postPeerModification(env); - } catch (ReplicationException e) { - LOG.warn("{} failed 
to call postPeerModification for peer {}, retry", - getClass().getName(), peerId, e); - throw new ProcedureYieldException(); + postTransit(env); } catch (IOException e) { - LOG.warn("{} failed to call post CP hook for peer {}, " + - "ignore since the procedure has already done", getClass().getName(), peerId, e); + LOG.warn( + "Failed to call post CP hook for peer {} when transiting sync replication " + + "peer state to {}, ignore since the procedure has already done", + peerId, syncReplicationState, e); } - releaseLatch(); return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state); } } - private void releaseLatch() { - ProcedurePrepareLatch.releaseLatch(latch, this); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3a93c76..3ed5e87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1785,21 +1785,27 @@ public class HRegionServer extends HasThread implements * be hooked up to WAL. 
*/ private void setupWALAndReplication() throws IOException { + boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster && + (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf)); + if (isMasterNoTableOrSystemTableOnly) { + conf.setBoolean(HConstants.SYNC_REPLICATION_ENABLED, false); + } WALFactory factory = new WALFactory(conf, serverName.toString()); + if (!isMasterNoTableOrSystemTableOnly) { + // TODO Replication make assumptions here based on the default filesystem impl + Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); - // TODO Replication make assumptions here based on the default filesystem impl - Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); - - Path logDir = new Path(walRootDir, logName); - LOG.debug("logDir={}", logDir); - if (this.walFs.exists(logDir)) { - throw new RegionServerRunningException( - "Region server has already created directory at " + this.serverName.toString()); + Path logDir = new Path(walRootDir, logName); + LOG.debug("logDir={}", logDir); + if (this.walFs.exists(logDir)) { + throw new RegionServerRunningException( + "Region server has already created directory at " + this.serverName.toString()); + } + // Instantiate replication if replication enabled. Pass it the log directories. + createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, + factory.getWALProvider()); } - // Instantiate replication if replication enabled. Pass it the log directories. 
- createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, - factory.getWALProvider()); this.walFactory = factory; } @@ -2897,11 +2903,6 @@ public class HRegionServer extends HasThread implements */ private static void createNewReplicationInstance(Configuration conf, HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { - if ((server instanceof HMaster) && - (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { - return; - } - // read in the name of the source replication class from the config file. String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 23ba773..4529943 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -18,17 +18,22 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler; +import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; import org.apache.yetus.audience.InterfaceAudience; /** - * A source for a replication stream has to expose this service. - * This service allows an application to hook into the - * regionserver and watch for new transactions. + * A source for a replication stream has to expose this service. This service allows an application + * to hook into the regionserver and watch for new transactions. */ @InterfaceAudience.Private public interface ReplicationSourceService extends ReplicationService { /** + * Returns an info provider for sync replication peer. 
+ */ + SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider(); + + /** * Returns a Handler to handle peer procedures. */ PeerProcedureHandler getPeerProcedureHandler(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java index 6df2af9..efafd09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java @@ -28,8 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface PeerActionListener { - default void peerRemoved(String peerId) {} + static final PeerActionListener DUMMY = new PeerActionListener() {}; default void peerSyncReplicationStateChange(String peerId, SyncReplicationState from, - SyncReplicationState to) {} + SyncReplicationState to, int stage) {} } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java index 65da9af..52b604b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java @@ -15,11 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; - +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; @@ -29,13 +28,16 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface PeerProcedureHandler { - public void addPeer(String peerId) throws ReplicationException, IOException; + void addPeer(String peerId) throws ReplicationException, IOException; + + void removePeer(String peerId) throws ReplicationException, IOException; - public void removePeer(String peerId) throws ReplicationException, IOException; + void disablePeer(String peerId) throws ReplicationException, IOException; - public void disablePeer(String peerId) throws ReplicationException, IOException; + void enablePeer(String peerId) throws ReplicationException, IOException; - public void enablePeer(String peerId) throws ReplicationException, IOException; + void updatePeerConfig(String peerId) throws ReplicationException, IOException; - public void updatePeerConfig(String peerId) throws ReplicationException, IOException; + void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs) + throws ReplicationException, IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index ce8fdae..a9082fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -19,22 +19,32 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.locks.Lock; +import 
org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; +import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class PeerProcedureHandlerImpl implements PeerProcedureHandler { + private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class); + private final ReplicationSourceManager replicationSourceManager; + private final PeerActionListener peerActionListener; private final KeyLocker peersLock = new KeyLocker<>(); - public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) { + public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager, + PeerActionListener peerActionListener) { this.replicationSourceManager = replicationSourceManager; + this.peerActionListener = peerActionListener; } @Override @@ -60,7 +70,6 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { } private void refreshPeerState(String peerId) throws ReplicationException, IOException { - PeerState newState; Lock peerLock = peersLock.acquireLock(peerId); try { ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); @@ -68,7 +77,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { throw new ReplicationException("Peer with id=" + peerId + " is not cached."); } PeerState oldState = peer.getPeerState(); - newState = 
replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); + PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); // RS need to start work with the new replication state change if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) { replicationSourceManager.refreshSources(peerId); @@ -98,7 +107,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { } ReplicationPeerConfig oldConfig = peer.getPeerConfig(); ReplicationPeerConfig newConfig = - replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); + replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); // RS need to start work with the new replication config change if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) { replicationSourceManager.refreshSources(peerId); @@ -107,4 +116,43 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { peerLock.unlock(); } } + + @Override + public void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs) + throws ReplicationException, IOException { + ReplicationPeers replicationPeers = replicationSourceManager.getReplicationPeers(); + Lock peerLock = peersLock.acquireLock(peerId); + try { + ReplicationPeerImpl peer = replicationPeers.getPeer(peerId); + if (peer == null) { + throw new ReplicationException("Peer with id=" + peerId + " is not cached."); + } + if (!peer.getPeerConfig().isSyncReplication()) { + throw new ReplicationException("Peer with id=" + peerId + " is not synchronous."); + } + if (stage == 0) { + SyncReplicationState oldState = peer.getSyncReplicationState(); + SyncReplicationState newState = + replicationPeers.getPeerStorage().getPeerSyncReplicationState(peerId); + if (oldState == newState) { + return; + } + peer.transitSyncReplicationStateBegin(newState); + peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, 0); + } else { + SyncReplicationState 
oldState = peer.getOldSyncReplicationState(); + SyncReplicationState newState = peer.getSyncReplicationState(); + if (oldState == null) { + LOG.warn("Old state is null when trying to finish the transition of sync replication " + + "state for peer {}, the new state is {}, the region server may have " + + "already been restarted, give up", peerId, newState); + return; + } + peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, 1); + peer.transitSyncReplicationStateEnd(); + } + } finally { + peerLock.unlock(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java index 7ada24b..4f55685 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java @@ -35,12 +35,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R public class RefreshPeerCallable implements RSProcedureCallable { private static final Logger LOG = Logger.getLogger(RefreshPeerCallable.class); + private HRegionServer rs; private String peerId; private PeerModificationType type; + private int stage; + private Exception initError; @Override @@ -67,6 +70,9 @@ public class RefreshPeerCallable implements RSProcedureCallable { case UPDATE_PEER_CONFIG: handler.updatePeerConfig(this.peerId); break; + case TRANSIT_SYNC_REPLICATION_STATE: + handler.transitSyncReplicationPeerState(peerId, stage, rs); + break; default: throw new IllegalArgumentException("Unknown peer modification type: " + type); } @@ -80,6 +86,7 @@ public class RefreshPeerCallable implements RSProcedureCallable { RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter); this.peerId = param.getPeerId(); this.type = param.getType(); + this.stage = param.getStage();
} catch (InvalidProtocolBufferException e) { initError = e; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 7803ac4..d3976a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.yetus.audience.InterfaceAudience; @@ -66,6 +67,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer private ReplicationTracker replicationTracker; private Configuration conf; private ReplicationSink replicationSink; + private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider; // Hosting server private Server server; /** Statistics thread schedule pool */ @@ -120,19 +122,29 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer } catch (KeeperException ke) { throw new IOException("Could not read cluster id", ke); } + SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, - walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); + walProvider != null ? 
walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), + mapping); + this.syncReplicationPeerInfoProvider = + new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping); + PeerActionListener peerActionListener = PeerActionListener.DUMMY; if (walProvider != null) { walProvider .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); + if (walProvider instanceof SyncReplicationWALProvider) { + SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider; + peerActionListener = syncWALProvider; + syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider); + } } this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); this.replicationLoad = new ReplicationLoad(); - this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager); + this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager, peerActionListener); } @Override @@ -270,4 +282,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); } + + @Override + public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() { + return syncReplicationPeerInfoProvider; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 85b2e85..7b8c43b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -54,6 +54,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; @@ -136,6 +137,8 @@ public class ReplicationSourceManager implements ReplicationListener { // For recovered source, the queue id's format is peer_id-servername-* private final ConcurrentMap>> walsByIdRecoveredQueues; + private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager; + private final Configuration conf; private final FileSystem fs; // The paths to the latest log of each wal group, for new coming peers @@ -172,9 +175,8 @@ public class ReplicationSourceManager implements ReplicationListener { public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, - WALFileLengthProvider walFileLengthProvider) throws IOException { - // CopyOnWriteArrayList is thread-safe. - // Generally, reading is more than modifying. 
+ WALFileLengthProvider walFileLengthProvider, + SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException { this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; @@ -187,10 +189,11 @@ public class ReplicationSourceManager implements ReplicationListener { this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; - this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 - // seconds + // 30 seconds + this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); this.clusterId = clusterId; this.walFileLengthProvider = walFileLengthProvider; + this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager; this.replicationTracker.registerListener(this); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. @@ -254,8 +257,11 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * 1. Add peer to replicationPeers 2. Add the normal source and related replication queue 3. Add - * HFile Refs + *

<ol>
+ * <li>Add peer to replicationPeers</li>
+ * <li>Add the normal source and related replication queue</li>
+ * <li>Add HFile Refs</li>
+ * </ol>
* @param peerId the id of replication peer */ public void addPeer(String peerId) throws IOException { @@ -274,13 +280,16 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * 1. Remove peer for replicationPeers 2. Remove all the recovered sources for the specified id - * and related replication queues 3. Remove the normal source and related replication queue 4. - * Remove HFile Refs + *
<ol>
+ * <li>Remove peer for replicationPeers</li>
+ * <li>Remove all the recovered sources for the specified id and related replication queues</li>
+ * <li>Remove the normal source and related replication queue</li>
+ * <li>Remove HFile Refs</li>
+ * </ol>
* @param peerId the id of the replication peer */ public void removePeer(String peerId) { - replicationPeers.removePeer(peerId); + ReplicationPeer peer = replicationPeers.removePeer(peerId); String terminateMessage = "Replication stream was removed by a user"; List oldSourcesToDelete = new ArrayList<>(); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer @@ -311,7 +320,10 @@ public class ReplicationSourceManager implements ReplicationListener { deleteQueue(peerId); this.walsById.remove(peerId); } - + ReplicationPeerConfig peerConfig = peer.getPeerConfig(); + if (peerConfig.isSyncReplication()) { + syncReplicationPeerMappingManager.remove(peerId, peerConfig); + } // Remove HFile Refs abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId)); } @@ -363,6 +375,10 @@ public class ReplicationSourceManager implements ReplicationListener { } } } + ReplicationPeerConfig peerConfig = peer.getPeerConfig(); + if (peerConfig.isSyncReplication()) { + syncReplicationPeerMappingManager.add(peer.getId(), peerConfig); + } src.startup(); return src; } @@ -442,6 +458,7 @@ public class ReplicationSourceManager implements ReplicationListener { // Delete queue from storage and memory deleteQueue(src.getQueueId()); this.walsById.remove(src.getQueueId()); + } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java new file mode 100644 index 0000000..92f2c52 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Optional; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Get the information for a sync replication peer. + */ +@InterfaceAudience.Private +public interface SyncReplicationPeerInfoProvider { + + /** + * Return the peer id and remote WAL directory if the region is synchronously replicated and the + * state is {@link SyncReplicationState#ACTIVE}. + */ + Optional> getPeerIdAndRemoteWALDir(RegionInfo info); + + /** + * Check whether the give region is contained in a sync replication peer which is in the given + * state. 
+ */ + boolean isInState(RegionInfo info, SyncReplicationState state); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java new file mode 100644 index 0000000..32159e6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Optional; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProvider { + + private final ReplicationPeers replicationPeers; + + private final SyncReplicationPeerMappingManager mapping; + + SyncReplicationPeerInfoProviderImpl(ReplicationPeers replicationPeers, + SyncReplicationPeerMappingManager mapping) { + this.replicationPeers = replicationPeers; + this.mapping = mapping; + } + + @Override + public Optional> getPeerIdAndRemoteWALDir(RegionInfo info) { + String peerId = mapping.getPeerId(info); + if (peerId == null) { + return Optional.empty(); + } + ReplicationPeer peer = replicationPeers.getPeer(peerId); + if (peer == null) { + return Optional.empty(); + } + if (peer.getSyncReplicationState() == SyncReplicationState.ACTIVE) { + return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir())); + } else { + return Optional.empty(); + } + } + + @Override + public boolean isInState(RegionInfo info, SyncReplicationState state) { + String peerId = mapping.getPeerId(info); + if (peerId == null) { + return false; + } + ReplicationPeer peer = replicationPeers.getPeer(peerId); + if (peer == null) { + return false; + } + return peer.getSyncReplicationState() == state; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java new file mode 100644 index 0000000..64216cb --- /dev/null +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Used to map region to sync replication peer id. + *
<p>
+ * TODO: now only support include table options. + */ +@InterfaceAudience.Private +class SyncReplicationPeerMappingManager { + + private final ConcurrentMap table2PeerId = new ConcurrentHashMap<>(); + + void add(String peerId, ReplicationPeerConfig peerConfig) { + peerConfig.getTableCFsMap().keySet().forEach(tn -> table2PeerId.put(tn, peerId)); + } + + void remove(String peerId, ReplicationPeerConfig peerConfig) { + peerConfig.getTableCFsMap().keySet().forEach(table2PeerId::remove); + } + + String getPeerId(RegionInfo info) { + return table2PeerId.get(info.getTable()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java deleted file mode 100644 index b97bf7e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.replication.regionserver; - -import java.util.Optional; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Get the peer id and remote root dir if the region is synchronously replicated. - */ -@InterfaceAudience.Private -public interface SyncReplicationPeerProvider { - - /** - * Return the peer id and remote WAL directory if the region is synchronously replicated. - */ - Optional> getPeerIdAndRemoteWALDir(RegionInfo info); -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index bccc842..fe7052a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; -import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider; +import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; @@ -67,7 +67,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen private final WALProvider provider; - private final SyncReplicationPeerProvider peerProvider; + private SyncReplicationPeerInfoProvider peerInfoProvider; private WALFactory factory; @@ -85,9 +85,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen private final KeyLocker 
createLock = new KeyLocker<>(); - SyncReplicationWALProvider(WALProvider provider, SyncReplicationPeerProvider peerProvider) { + SyncReplicationWALProvider(WALProvider provider) { this.provider = provider; - this.peerProvider = peerProvider; + } + + public void setPeerInfoProvider(SyncReplicationPeerInfoProvider peerInfoProvider) { + this.peerInfoProvider = peerInfoProvider; } @Override @@ -99,7 +102,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen this.conf = conf; this.factory = factory; Pair> eventLoopGroupAndChannelClass = - NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); + NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); channelClass = eventLoopGroupAndChannelClass.getSecond(); } @@ -139,7 +142,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen @Override public WAL getWAL(RegionInfo region) throws IOException { Optional> peerIdAndRemoteWALDir = - peerProvider.getPeerIdAndRemoteWALDir(region); + peerInfoProvider.getPeerIdAndRemoteWALDir(region); if (peerIdAndRemoteWALDir.isPresent()) { Pair pair = peerIdAndRemoteWALDir.get(); return getWAL(pair.getFirst(), pair.getSecond()); @@ -221,14 +224,11 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen } @Override - public void peerRemoved(String peerId) { - safeClose(peerId2WAL.remove(peerId)); - } - - @Override public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from, - SyncReplicationState to) { - assert to == SyncReplicationState.DOWNGRADE_ACTIVE; - safeClose(peerId2WAL.remove(peerId)); + SyncReplicationState to, int stage) { + // TODO: consider stage + if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE) { + safeClose(peerId2WAL.remove(peerId)); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 06999ea..202b584 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -24,10 +24,10 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; @@ -143,18 +143,6 @@ public class WALFactory { } /** - * instantiate a provider from a config property. requires conf to have already been set (as well - * as anything the provider might need to read). - */ - private WALProvider getProvider(String key, String defaultValue, String providerId) - throws IOException { - WALProvider provider = createProvider(getProviderClass(key, defaultValue)); - provider.init(this, conf, providerId); - provider.addWALActionsListener(new MetricsWAL()); - return provider; - } - - /** * @param conf must not be null, will keep a reference to read params in later reader/writer * instances. * @param factoryId a unique identifier for this factory. used i.e. 
by filesystem implementations @@ -171,7 +159,13 @@ public class WALFactory { this.factoryId = factoryId; // end required early initialization if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { - provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null); + WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); + if (conf.getBoolean(HConstants.SYNC_REPLICATION_ENABLED, false)) { + provider = new SyncReplicationWALProvider(provider); + } + provider.init(this, conf, null); + provider.addWALActionsListener(new MetricsWAL()); + this.provider = provider; } else { // special handling of existing configuration behavior. LOG.warn("Running with WAL disabled."); @@ -181,26 +175,6 @@ public class WALFactory { } /** - * A temporary constructor for testing synchronous replication. - *
<p>
- * Remove it once we can integrate the synchronous replication logic in RS. - */ - @VisibleForTesting - WALFactory(Configuration conf, String factoryId, SyncReplicationPeerProvider peerProvider) - throws IOException { - timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); - /* TODO Both of these are probably specific to the fs wal provider */ - logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, - AbstractFSWALProvider.Reader.class); - this.conf = conf; - this.factoryId = factoryId; - WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); - this.provider = new SyncReplicationWALProvider(provider, peerProvider); - this.provider.init(this, conf, null); - this.provider.addWALActionsListener(new MetricsWAL()); - } - - /** * Shutdown all WALs and clean up any underlying storage. * Use only when you will not need to replay and edits that have gone to any wals from this * factory. @@ -248,8 +222,9 @@ public class WALFactory { if (provider != null) { return provider; } - provider = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER, - AbstractFSWALProvider.META_WAL_PROVIDER_ID); + provider = createProvider(getProviderClass(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER)); + provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID); + provider.addWALActionsListener(new MetricsWAL()); if (metaProvider.compareAndSet(null, provider)) { return provider; } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java index 36dbe0f..07aa6a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java @@ -23,6 +23,7 @@ import java.util.List; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -51,6 +53,10 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; @Category({ RegionServerTests.class, MediumTests.class }) public class TestCombinedAsyncWriter { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static EventLoopGroup EVENT_LOOP_GROUP; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index 60a9e13..986228c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -25,7 +25,9 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.util.Optional; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.RegionInfo; @@ -34,6 +36,8 @@ import 
org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -41,12 +45,17 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ RegionServerTests.class, MediumTests.class }) public class TestSyncReplicationWALProvider { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static String PEER_ID = "1"; @@ -63,19 +72,30 @@ public class TestSyncReplicationWALProvider { private static WALFactory FACTORY; - private static Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) { - if (info.getTable().equals(TABLE)) { - return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR)); - } else { - return Optional.empty(); + public static final class InfoProvider implements SyncReplicationPeerInfoProvider { + + @Override + public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) { + if (info.getTable().equals(TABLE)) { + return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR)); + } else { + return Optional.empty(); + } + } + + @Override + public boolean isInState(RegionInfo info, SyncReplicationState state) { + // TODO Implement
SyncReplicationPeerInfoProvider.isInState + return false; } } @BeforeClass public static void setUpBeforeClass() throws Exception { + UTIL.getConfiguration().setBoolean(HConstants.SYNC_REPLICATION_ENABLED, true); UTIL.startMiniDFSCluster(3); - FACTORY = new WALFactory(UTIL.getConfiguration(), "test", - TestSyncReplicationWALProvider::getPeerIdAndRemoteWALDir); + FACTORY = new WALFactory(UTIL.getConfiguration(), "test"); + ((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider()); UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID)); } @@ -145,9 +165,9 @@ public class TestSyncReplicationWALProvider { DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION); assertEquals(2, FACTORY.getWALs().size()); testReadWrite(wal); - SyncReplicationWALProvider walProvider = - (SyncReplicationWALProvider) FACTORY.getWALProvider(); - walProvider.peerRemoved(PEER_ID); + SyncReplicationWALProvider walProvider = (SyncReplicationWALProvider) FACTORY.getWALProvider(); + walProvider.peerSyncReplicationStateChange(PEER_ID, SyncReplicationState.ACTIVE, + SyncReplicationState.DOWNGRADE_ACTIVE, 1); assertEquals(1, FACTORY.getWALs().size()); } } -- 2.7.4