diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2126f6d..ee2177a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -86,6 +86,8 @@ public class ReplicationSource extends Thread
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
+ // id of the wal group this source is handling
+ private String walGroupId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
@@ -145,6 +147,7 @@ public class ReplicationSource extends Thread
* @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation
* @param metrics metrics for replication source
+ * @param walGroupId wal group this source is handling
* @throws IOException
*/
@Override
@@ -152,7 +155,7 @@ public class ReplicationSource extends Thread
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
- final MetricsSource metrics)
+ final MetricsSource metrics, final String walGroupId)
throws IOException {
this.stopper = stopper;
this.conf = HBaseConfiguration.create(conf);
@@ -183,6 +186,7 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
+ this.walGroupId = walGroupId;
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
}
@@ -196,6 +200,11 @@ public class ReplicationSource extends Thread
@Override
public void enqueueLog(Path log) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("ReplicationSource for peer and group [" + peerId
+ + ReplicationSourceManager.PEER_GROUP_DELIMITER + walGroupId + "] now enqueue log: "
+ + log);
+ }
this.queue.put(log);
int queueSize = queue.size();
this.metrics.setSizeOfLogQueue(queueSize);
@@ -880,4 +889,9 @@ public class ReplicationSource extends Thread
public MetricsSource getSourceMetrics() {
return this.metrics;
}
+
+ @Override
+ public String getWALGroupId() {
+ return this.walGroupId;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 1e9c714..fa25628 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -46,13 +46,14 @@ public interface ReplicationSourceInterface {
* @param stopper the stopper object for this region server
* @param peerClusterZnode
* @param clusterId
+ * @param groupId
* @throws IOException
*/
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
- final MetricsSource metrics) throws IOException;
+ final MetricsSource metrics, String groupId) throws IOException;
/**
* Add a log to the list of logs to replicate
@@ -105,4 +106,6 @@ public interface ReplicationSourceInterface {
*/
String getStats();
+ String getWALGroupId();
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0c8f6f9..65eb2eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -23,9 +23,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -56,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -63,7 +67,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* This class is responsible to manage all the replication
* sources. There are two classes of sources:
*
- * <li> Normal sources are persistent and one per peer cluster</li>
+ * <li> Normal sources are persistent and one per wal group per peer cluster</li>
* <li> Old sources are recovered from a failed region server and our
* only goal is to finish replicating the WAL queue it had up in ZK</li>
*
@@ -91,13 +95,14 @@ public class ReplicationSourceManager implements ReplicationListener {
// All about stopping
private final Server server;
// All logs we are currently tracking
- private final Map<String, SortedSet<String>> walsById;
+ // Index structure of the map is: peer_id->logPrefix/logGroup->logs
+ private final Map<String, Map<String, SortedSet<String>>> walsById;
// Logs for recovered sources we are currently tracking
private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
private final Configuration conf;
private final FileSystem fs;
- // The path to the latest log we saw, for new coming sources
- private Path latestPath;
+ // The paths to the latest log of each wal group, for new coming peers
+ private Set<Path> latestPaths;
// Path to the wals directories
private final Path logDir;
// Path to the wal archive
@@ -109,6 +114,8 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Random rand;
+ public static final String PEER_GROUP_DELIMITER = "@";
+
/**
* Creates a replication manager and sets the watch on all the other registered region servers
@@ -133,7 +140,7 @@ public class ReplicationSourceManager implements ReplicationListener {
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
- this.walsById = new HashMap<String, SortedSet<String>>();
+ this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
this.conf = conf;
@@ -158,6 +165,7 @@ public class ReplicationSourceManager implements ReplicationListener {
tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build());
this.rand = new Random();
+ this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
}
/**
@@ -196,8 +204,9 @@ public class ReplicationSourceManager implements ReplicationListener {
}
} else {
synchronized (this.walsById) {
- SortedSet<String> wals = walsById.get(id);
- if (!wals.first().equals(key)) {
+ String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
+ SortedSet<String> wals = walsById.get(id).get(logPrefix);
+ if (wals != null && !wals.first().equals(key)) {
cleanOldLogs(wals, key, id);
}
}
@@ -218,8 +227,12 @@ public class ReplicationSourceManager implements ReplicationListener {
* old region server wal queues
*/
protected void init() throws IOException, ReplicationException {
- for (String id : this.replicationPeers.getPeerIds()) {
- addSource(id);
+ synchronized (this.walsById) {
+ for (String id : this.replicationPeers.getPeerIds()) {
+ if (walsById.get(id) == null) {
+ this.walsById.put(id, new HashMap<String, SortedSet<String>>());
+ }
+ }
}
List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
if (currentReplicators == null || currentReplicators.size() == 0) {
@@ -238,40 +251,61 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
- * Add a new normal source to this region server
+ * Add sources for the given peer cluster on this region server. For the newly added peer, we only
+ * need to enqueue the latest log of each wal group and do replication
* @param id the id of the peer cluster
- * @return the source that was created
* @throws IOException
*/
- protected ReplicationSourceInterface addSource(String id) throws IOException,
+ protected void addSource(String id) throws IOException,
ReplicationException {
- ReplicationPeerConfig peerConfig
- = replicationPeers.getReplicationPeerConfig(id);
- ReplicationPeer peer = replicationPeers.getPeer(id);
- ReplicationSourceInterface src =
- getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
- this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
synchronized (this.walsById) {
- this.sources.add(src);
- this.walsById.put(id, new TreeSet<String>());
+ if (walsById.get(id) == null) {
+ this.walsById.put(id, new HashMap<String, SortedSet<String>>());
+ }
// Add the latest wal to that source's queue
- if (this.latestPath != null) {
- String name = this.latestPath.getName();
- this.walsById.get(id).add(name);
- try {
- this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
- } catch (ReplicationException e) {
- String message =
- "Cannot add log to queue when creating a new source, queueId="
- + src.getPeerClusterZnode() + ", filename=" + name;
- server.stop(message);
- throw e;
+ synchronized (latestPaths) {
+ if (this.latestPaths.size() > 0) {
+ for (Path logPath : latestPaths) {
+ String name = logPath.getName();
+ String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
+ SortedSet<String> logs = new TreeSet<String>();
+ logs.add(name);
+ this.walsById.get(id).put(walPrefix, logs);
+ try {
+ this.replicationQueues.addLog(id, name);
+ } catch (ReplicationException e) {
+ String message =
+ "Cannot add log to queue when creating a new source, queueId="
+ + id + ", filename=" + name;
+ server.stop(message);
+ throw e;
+ }
+ ReplicationSourceInterface src = getReplicationSource(id, walPrefix);
+ src.enqueueLog(logPath);
+ }
}
- src.enqueueLog(this.latestPath);
}
}
- src.startup();
- return src;
+ }
+
+ private ReplicationSourceInterface getReplicationSource(String id, String logPrefix)
+ throws IOException, ReplicationException {
+ for (ReplicationSourceInterface src : sources) {
+ if (src.getPeerClusterZnode().equals(id) && src.getWALGroupId().equals(logPrefix)) {
+ return src;
+ }
+ }
+ synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
+ // the to-be-removed peer
+ ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
+ ReplicationPeer peer = replicationPeers.getPeer(id);
+ ReplicationSourceInterface src =
+ getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
+ this.replicationPeers, server, id, this.clusterId, peerConfig, peer, logPrefix);
+ sources.add(src);
+ src.startup();
+ return src;
+ }
}
/**
@@ -302,7 +336,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get a copy of the wals of the first source on this rs
* @return a sorted set of wal names
*/
- protected Map<String, SortedSet<String>> getWALs() {
+ protected Map<String, Map<String, SortedSet<String>>> getWALs() {
return Collections.unmodifiableMap(walsById);
}
@@ -331,33 +365,110 @@ public class ReplicationSourceManager implements ReplicationListener {
}
void preLogRoll(Path newLog) throws IOException {
- synchronized (this.walsById) {
- String name = newLog.getName();
- for (ReplicationSourceInterface source : this.sources) {
+ checkAndAddSource(newLog);
+ String logName = newLog.getName();
+ String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+ synchronized (latestPaths) {
+ Iterator<Path> iterator = latestPaths.iterator();
+ while (iterator.hasNext()) {
+ Path path = iterator.next();
+ if (path.getName().contains(logPrefix)) {
+ iterator.remove();
+ break;
+ }
+ }
+ this.latestPaths.add(newLog);
+ }
+ }
+
+ /**
+ * Check and enqueue the given log to the correct source. If there's still no source for the
+ * group to which the given log belongs, create one
+ * @param logPath the log path to check and enqueue
+ * @throws IOException
+ */
+ private void checkAndAddSource(Path logPath) throws IOException {
+ String logName = logPath.getName();
+ String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+ synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
+ // the to-be-removed peer
+ for (String id : replicationPeers.getPeerIds()) {
+ // update replication queues
try {
- this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
+ this.replicationQueues.addLog(id, logName);
} catch (ReplicationException e) {
- throw new IOException("Cannot add log to replication queue with id="
- + source.getPeerClusterZnode() + ", filename=" + name, e);
+ String message =
+ "Cannot add log to queue when creating a new source, queueId=" + id + ", filename="
+ + logName;
+ server.stop(message);
+ throw new IOException(message, e);
+ }
+ // update sources
+ boolean sourceExists = false;
+ for (ReplicationSourceInterface src : sources) {
+ if (src.getPeerClusterZnode().equals(id) && src.getWALGroupId().equals(logPrefix)) {
+ sourceExists = true;
+ break;
+ }
+ }
+ if (!sourceExists) {
+ ReplicationPeerConfig peerConfig;
+ try {
+ peerConfig = replicationPeers.getReplicationPeerConfig(id);
+ } catch (ReplicationException e) {
+ LOG.error("Failed to get replication peer config for peer " + id, e);
+ throw new IOException(e);
+ }
+ ReplicationPeer peer = replicationPeers.getPeer(id);
+ ReplicationSourceInterface src =
+ getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
+ this.replicationPeers, server, id, this.clusterId, peerConfig, peer, logPrefix);
+ sources.add(src);
+ src.startup();
}
}
- for (SortedSet<String> wals : this.walsById.values()) {
- if (this.sources.isEmpty()) {
- // If there's no slaves, don't need to keep the old wals since
- // we only consider the last one when a new slave comes in
- wals.clear();
+ }
+ // update walsById map
+ synchronized (walsById) {
+ for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
+ String peerId = entry.getKey();
+ Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
+ boolean existingPrefix = false;
+ for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
+ SortedSet<String> wals = walsEntry.getValue();
+ if (this.sources.isEmpty()) {
+ // If there's no slaves, don't need to keep the old wals since
+ // we only consider the last one when a new slave comes in
+ wals.clear();
+ }
+ if (logPrefix.equals(walsEntry.getKey())) {
+ wals.add(logName);
+ existingPrefix = true;
+ }
+ }
+ if (!existingPrefix) {
+ // The new log belongs to a new group, add it into this peer
+ LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
+ SortedSet<String> wals = new TreeSet<String>();
+ wals.add(logName);
+ walsByPrefix.put(logPrefix, wals);
}
- wals.add(name);
}
}
-
- this.latestPath = newLog;
}
void postLogRoll(Path newLog) throws IOException {
+ String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(newLog.getName());
+ boolean logEnqueued = false;
// This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources) {
- source.enqueueLog(newLog);
+ if (source.getWALGroupId().equals(walPrefix)) {
+ source.enqueueLog(newLog);
+ if (!logEnqueued) logEnqueued = true;
+ }
+ }
+ if (!logEnqueued) {
+ LOG.warn("Log " + newLog + " not enqueued into any existing source");
}
}
@@ -368,6 +479,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param manager the manager to use
* @param server the server object for this region server
* @param peerId the id of the peer cluster
+ * @param groupId the group this source is handling
* @return the created source
* @throws IOException
*/
@@ -375,8 +487,8 @@ public class ReplicationSourceManager implements ReplicationListener {
final FileSystem fs, final ReplicationSourceManager manager,
final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
final Server server, final String peerId, final UUID clusterId,
- final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
- throws IOException {
+ final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer,
+ final String groupId) throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
@@ -414,14 +526,16 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
} catch (Exception e) {
- LOG.warn("Passed replication endpoint implementation throws errors", e);
+ LOG.warn("Passed replication endpoint implementation throws errors"
+ + " while initializing ReplicationSource for peer: " + peerId, e);
throw new IOException(e);
}
+ // TODO revisit and decide whether to include walGroupId in the metrics id
MetricsSource metrics = new MetricsSource(peerId);
// init replication source
src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
- clusterId, replicationEndpoint, metrics);
+ clusterId, replicationEndpoint, metrics, groupId);
// init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
@@ -470,7 +584,7 @@ public class ReplicationSourceManager implements ReplicationListener {
+ sources.size() + " and another "
+ oldsources.size() + " that were recovered");
String terminateMessage = "Replication stream was removed by a user";
- ReplicationSourceInterface srcToRemove = null;
+ List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
List<ReplicationSourceInterface> oldSourcesToDelete =
new ArrayList<ReplicationSourceInterface>();
// First close all the recovered sources for this peer
@@ -486,19 +600,23 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.info("Number of deleted recovered sources for " + id + ": "
+ oldSourcesToDelete.size());
// Now look for the one on this cluster
- for (ReplicationSourceInterface src : this.sources) {
- if (id.equals(src.getPeerClusterId())) {
- srcToRemove = src;
- break;
+ synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source
+ // for the to-be-removed peer
+ for (ReplicationSourceInterface src : this.sources) {
+ if (id.equals(src.getPeerClusterId())) {
+ srcToRemove.add(src);
+ }
}
+ if (srcToRemove.size() == 0) {
+ LOG.error("The queue we wanted to close is missing " + id);
+ return;
+ }
+ for (ReplicationSourceInterface toRemove : srcToRemove) {
+ toRemove.terminate(terminateMessage);
+ this.sources.remove(toRemove);
+ }
+ deleteSource(id, true);
}
- if (srcToRemove == null) {
- LOG.error("The queue we wanted to close is missing " + id);
- return;
- }
- srcToRemove.terminate(terminateMessage);
- this.sources.remove(srcToRemove);
- deleteSource(id, true);
}
@Override
@@ -599,7 +717,7 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
- server, peerId, this.clusterId, peerConfig, peer);
+ server, peerId, this.clusterId, peerConfig, peer, null);
if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java
index e1417b2..c1ad5d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java
@@ -49,19 +49,24 @@ public class BoundedRegionGroupingProvider extends RegionGroupingProvider {
public void init(final WALFactory factory, final Configuration conf,
final List<WALActionsListener> listeners, final String providerId) throws IOException {
super.init(factory, conf, listeners, providerId);
- // no need to check for and close down old providers; our parent class will throw on re-invoke
delegates = new WALProvider[Math.max(1, conf.getInt(NUM_REGION_GROUPS,
DEFAULT_NUM_REGION_GROUPS))];
- for (int i = 0; i < delegates.length; i++) {
- delegates[i] = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners,
- providerId + i);
- }
LOG.info("Configured to run with " + delegates.length + " delegate WAL providers.");
}
@Override
- WALProvider populateCache(final byte[] group) {
- final WALProvider temp = delegates[counter.getAndIncrement() % delegates.length];
+ WALProvider populateCache(final byte[] group) throws IOException {
+ int idx = counter.getAndIncrement() % delegates.length;
+ // there's IO cost to initialize the provider, so we only initialize it when necessary
+ // no need to check for and close down old providers; our parent class will throw on re-invoke
+ synchronized (delegates) {// synchronize to avoid duplicated initialization
+ if (delegates[idx] == null) {
+ delegates[idx] =
+ factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners, providerId
+ + idx);
+ }
+ }
+ final WALProvider temp = delegates[idx];
final WALProvider extant = cached.putIfAbsent(group, temp);
// if someone else beat us to initializing, just take what they set.
// note that in such a case we skew load away from the provider we picked at first
@@ -74,7 +79,7 @@ public class BoundedRegionGroupingProvider extends RegionGroupingProvider {
IOException failure = null;
for (WALProvider provider : delegates) {
try {
- provider.shutdown();
+ if (provider != null) provider.shutdown();
} catch (IOException exception) {
LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index b41bbfb..f9cbd23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -363,4 +363,15 @@ public class DefaultWALProvider implements WALProvider {
}
}
+ /**
+ * Get prefix of the log from its name, assuming WAL name in format of
+ * log_prefix.filenumber.log_suffix @see {@link FSHLog#getCurrentFileName()}
+ * @param name Name of the WAL to parse
+ * @return prefix of the log
+ */
+ public static String getWALPrefixFromWALName(String name) {
+ int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
+ return name.substring(0, endIndex);
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index eb2c426..1975a96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -123,9 +123,9 @@ class RegionGroupingProvider implements WALProvider {
protected RegionGroupingStrategy strategy = null;
- private WALFactory factory = null;
- private List<WALActionsListener> listeners = null;
- private String providerId = null;
+ protected WALFactory factory = null;
+ protected List<WALActionsListener> listeners = null;
+ protected String providerId = null;
@Override
public void init(final WALFactory factory, final Configuration conf,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index f463f76..e90d7ad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -37,15 +37,17 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
ReplicationSourceManager manager;
String peerClusterId;
Path currentPath;
+ String groupId;
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
- UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
- throws IOException {
+ UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics,
+ String walGroupId) throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
+ this.groupId = walGroupId;
}
@Override
@@ -89,4 +91,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public String getStats() {
return "";
}
+
+ @Override
+ public String getWALGroupId() {
+ return groupId;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 451c39f..c9cad2e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -83,6 +83,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName());
}
+ // multiple hlog roll requires more time, wait a while for its completion
+ Threads.sleep(100);
}
@Test (timeout=120000)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 571be26..c2923ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -25,6 +25,7 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -229,7 +230,11 @@ public class TestReplicationSourceManager {
}
wal.sync();
- assertEquals(6, manager.getWALs().get(slaveId).size());
+ int logNumber = 0;
+ for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) {
+ logNumber += entry.getValue().size();
+ }
+ assertEquals(6, logNumber);
wal.rollWriter();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
index 577f0ba..8586d33 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
@@ -39,8 +39,10 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -75,6 +77,7 @@ public class TestReplicationWALReaderManager {
private int nbRows;
private int walEditKVs;
private final AtomicLong sequenceId = new AtomicLong(1);
+ @Rule public TestName tn = new TestName();
@Parameters
public static Collection