diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2126f6d..ee2177a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -86,6 +86,8 @@ public class ReplicationSource extends Thread private ReplicationQueueInfo replicationQueueInfo; // id of the peer cluster this source replicates to private String peerId; + // id of the wal group this source is handling + private String walGroupId; // The manager of all sources to which we ping back our progress private ReplicationSourceManager manager; // Should we stop everything? @@ -145,6 +147,7 @@ public class ReplicationSource extends Thread * @param clusterId unique UUID for the cluster * @param replicationEndpoint the replication endpoint implementation * @param metrics metrics for replication source + * @param walGroupId wal group this source is handling * @throws IOException */ @Override @@ -152,7 +155,7 @@ public class ReplicationSource extends Thread final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, - final MetricsSource metrics) + final MetricsSource metrics, final String walGroupId) throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); @@ -183,6 +186,7 @@ public class ReplicationSource extends Thread this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); + this.walGroupId = walGroupId; this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = replicationEndpoint; } @@ -196,6 +200,11 @@ public class ReplicationSource extends Thread @Override public void enqueueLog(Path log) { + if (LOG.isTraceEnabled()) { + LOG.trace("ReplicationSource for peer and group [" + peerId + + ReplicationSourceManager.PEER_GROUP_DELIMITER + walGroupId + "] now enqueue log: " + + log); + } this.queue.put(log); int queueSize = queue.size(); this.metrics.setSizeOfLogQueue(queueSize); @@ -880,4 +889,9 @@ public class ReplicationSource extends Thread public MetricsSource getSourceMetrics() { return this.metrics; } + + @Override + public String getWALGroupId() { + return this.walGroupId; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 1e9c714..fa25628 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -46,13 +46,14 @@ public interface ReplicationSourceInterface { * @param stopper the stopper object for this region server * @param peerClusterZnode * @param clusterId + * @param groupId * @throws IOException */ public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, - final MetricsSource metrics) throws IOException; + final MetricsSource metrics, String groupId) throws IOException; /** * Add a log to the list of logs to replicate @@ -105,4 +106,6 @@ public interface ReplicationSourceInterface { */ String getStats(); + String getWALGroupId(); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 0c8f6f9..65eb2eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -23,9 +23,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; @@ -56,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -63,7 +67,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * This class is responsible to manage all the replication * sources. There are two classes of sources: * @@ -91,13 +95,14 @@ public class ReplicationSourceManager implements ReplicationListener { // All about stopping private final Server server; // All logs we are currently tracking - private final Map> walsById; + // Index structure of the map is: peer_id->logPrefix/logGroup->logs + private final Map>> walsById; // Logs for recovered sources we are currently tracking private final Map> walsByIdRecoveredQueues; private final Configuration conf; private final FileSystem fs; - // The path to the latest log we saw, for new coming sources - private Path latestPath; + // The paths to the latest log of each wal group, for new coming peers + private Set latestPaths; // Path to the wals directories private final Path logDir; // Path to the wal archive @@ -109,6 +114,8 @@ public class ReplicationSourceManager implements ReplicationListener { private final Random rand; + public static final String PEER_GROUP_DELIMITER = "@"; + /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -133,7 +140,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; - this.walsById = new HashMap>(); + this.walsById = new HashMap>>(); this.walsByIdRecoveredQueues = new ConcurrentHashMap>(); this.oldsources = new CopyOnWriteArrayList(); this.conf = conf; @@ -158,6 +165,7 @@ public class ReplicationSourceManager implements ReplicationListener { tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); + this.latestPaths = Collections.synchronizedSet(new HashSet()); } /** @@ -196,8 +204,9 @@ public class ReplicationSourceManager implements ReplicationListener { } } else { synchronized (this.walsById) { - SortedSet wals = walsById.get(id); - if (!wals.first().equals(key)) { + String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key); + SortedSet wals = walsById.get(id).get(logPrefix); + if (wals != null && !wals.first().equals(key)) { cleanOldLogs(wals, key, id); } } @@ -218,8 +227,12 @@ public class ReplicationSourceManager implements ReplicationListener { * old region server wal queues */ protected void init() throws IOException, ReplicationException { - for (String id : this.replicationPeers.getPeerIds()) { - addSource(id); + synchronized (this.walsById) { + for (String id : this.replicationPeers.getPeerIds()) { + if (walsById.get(id) == null) { + this.walsById.put(id, new HashMap>()); + } + } } List currentReplicators = this.replicationQueues.getListOfReplicators(); if (currentReplicators == null || currentReplicators.size() == 0) { @@ -238,40 +251,61 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Add a new normal source to this region server + * Add sources for the given peer cluster on this region server. For the newly added peer, we only + * need to enqueue the latest log of each wal group and do replication * @param id the id of the peer cluster - * @return the source that was created * @throws IOException */ - protected ReplicationSourceInterface addSource(String id) throws IOException, + protected void addSource(String id) throws IOException, ReplicationException { - ReplicationPeerConfig peerConfig - = replicationPeers.getReplicationPeerConfig(id); - ReplicationPeer peer = replicationPeers.getPeer(id); - ReplicationSourceInterface src = - getReplicationSource(this.conf, this.fs, this, this.replicationQueues, - this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.walsById) { - this.sources.add(src); - this.walsById.put(id, new TreeSet()); + if (walsById.get(id) == null) { + this.walsById.put(id, new HashMap>()); + } // Add the latest wal to that source's queue - if (this.latestPath != null) { - String name = this.latestPath.getName(); - this.walsById.get(id).add(name); - try { - this.replicationQueues.addLog(src.getPeerClusterZnode(), name); - } catch (ReplicationException e) { - String message = - "Cannot add log to queue when creating a new source, queueId=" - + src.getPeerClusterZnode() + ", filename=" + name; - server.stop(message); - throw e; + synchronized (latestPaths) { + if (this.latestPaths.size() > 0) { + for (Path logPath : latestPaths) { + String name = logPath.getName(); + String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name); + SortedSet logs = new TreeSet(); + logs.add(name); + this.walsById.get(id).put(walPrefix, logs); + try { + this.replicationQueues.addLog(id, name); + } catch (ReplicationException e) { + String message = + "Cannot add log to queue when creating a new source, queueId=" + + id + ", filename=" + name; + server.stop(message); + throw e; + } + ReplicationSourceInterface src = getReplicationSource(id, walPrefix); + src.enqueueLog(logPath); + } } - src.enqueueLog(this.latestPath); } } - src.startup(); - return src; + } + + private ReplicationSourceInterface getReplicationSource(String id, String logPrefix) + throws IOException, ReplicationException { + for (ReplicationSourceInterface src : sources) { + if (src.getPeerClusterZnode().equals(id) && src.getWALGroupId().equals(logPrefix)) { + return src; + } + } + synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for + // the to-be-removed peer + ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); + ReplicationPeer peer = replicationPeers.getPeer(id); + ReplicationSourceInterface src = + getReplicationSource(this.conf, this.fs, this, this.replicationQueues, + this.replicationPeers, server, id, this.clusterId, peerConfig, peer, logPrefix); + sources.add(src); + src.startup(); + return src; + } } /** @@ -302,7 +336,7 @@ public class ReplicationSourceManager implements ReplicationListener { * Get a copy of the wals of the first source on this rs * @return a sorted set of wal names */ - protected Map> getWALs() { + protected Map>> getWALs() { return Collections.unmodifiableMap(walsById); } @@ -331,33 +365,110 @@ public class ReplicationSourceManager implements ReplicationListener { } void preLogRoll(Path newLog) throws IOException { - synchronized (this.walsById) { - String name = newLog.getName(); - for (ReplicationSourceInterface source : this.sources) { + checkAndAddSource(newLog); + String logName = newLog.getName(); + String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName); + synchronized (latestPaths) { + Iterator iterator = latestPaths.iterator(); + while (iterator.hasNext()) { + Path path = iterator.next(); + if (path.getName().contains(logPrefix)) { + iterator.remove(); + break; + } + } + this.latestPaths.add(newLog); + } + } + + /** + * Check and enqueue the given log to the correct source. If there's still no source for the + * group to which the given log belongs, create one + * @param logPath the log path to check and enqueue + * @throws IOException + */ + private void checkAndAddSource(Path logPath) throws IOException { + String logName = logPath.getName(); + String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName); + synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for + // the to-be-removed peer + for (String id : replicationPeers.getPeerIds()) { + // update replication queues try { - this.replicationQueues.addLog(source.getPeerClusterZnode(), name); + this.replicationQueues.addLog(id, logName); } catch (ReplicationException e) { - throw new IOException("Cannot add log to replication queue with id=" - + source.getPeerClusterZnode() + ", filename=" + name, e); + String message = + "Cannot add log to queue when creating a new source, queueId=" + id + ", filename=" + + logName; + server.stop(message); + throw new IOException(message, e); + } + // update sources + boolean sourceExists = false; + for (ReplicationSourceInterface src : sources) { + if (src.getPeerClusterZnode().equals(id) && src.getWALGroupId().equals(logPrefix)) { + sourceExists = true; + break; + } + } + if (!sourceExists) { + ReplicationPeerConfig peerConfig; + try { + peerConfig = replicationPeers.getReplicationPeerConfig(id); + } catch (ReplicationException e) { + LOG.error("Failed to get replication peer config for peer " + id, e); + throw new IOException(e); + } + ReplicationPeer peer = replicationPeers.getPeer(id); + ReplicationSourceInterface src = + getReplicationSource(this.conf, this.fs, this, this.replicationQueues, + this.replicationPeers, server, id, this.clusterId, peerConfig, peer, logPrefix); + sources.add(src); + src.startup(); } } - for (SortedSet wals : this.walsById.values()) { - if (this.sources.isEmpty()) { - // If there's no slaves, don't need to keep the old wals since - // we only consider the last one when a new slave comes in - wals.clear(); + } + // update walsById map + synchronized (walsById) { + for (Map.Entry>> entry : this.walsById.entrySet()) { + String peerId = entry.getKey(); + Map> walsByPrefix = entry.getValue(); + boolean existingPrefix = false; + for (Map.Entry> walsEntry : walsByPrefix.entrySet()) { + SortedSet wals = walsEntry.getValue(); + if (this.sources.isEmpty()) { + // If there's no slaves, don't need to keep the old wals since + // we only consider the last one when a new slave comes in + wals.clear(); + } + if (logPrefix.equals(walsEntry.getKey())) { + wals.add(logName); + existingPrefix = true; + } + } + if (!existingPrefix) { + // The new log belongs to a new group, add it into this peer + LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId); + SortedSet wals = new TreeSet(); + wals.add(logName); + walsByPrefix.put(logPrefix, wals); } - wals.add(name); } } - - this.latestPath = newLog; } void postLogRoll(Path newLog) throws IOException { + String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(newLog.getName()); + boolean logEnqueued = false; // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources) { - source.enqueueLog(newLog); + if (source.getWALGroupId().equals(walPrefix)) { + source.enqueueLog(newLog); + if (!logEnqueued) logEnqueued = true; + } + } + if (!logEnqueued) { + LOG.warn("Log " + newLog + " not enqueued into any existing source"); } } @@ -368,6 +479,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param manager the manager to use * @param server the server object for this region server * @param peerId the id of the peer cluster + * @param groupId the group this source is handling * @return the created source * @throws IOException */ @@ -375,8 +487,8 @@ public class ReplicationSourceManager implements ReplicationListener { final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Server server, final String peerId, final UUID clusterId, - final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer) - throws IOException { + final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer, + final String groupId) throws IOException { RegionServerCoprocessorHost rsServerHost = null; TableDescriptors tableDescriptors = null; if (server instanceof HRegionServer) { @@ -414,14 +526,16 @@ public class ReplicationSourceManager implements ReplicationListener { } } } catch (Exception e) { - LOG.warn("Passed replication endpoint implementation throws errors", e); + LOG.warn("Passed replication endpoint implementation throws errors" + + " while initializing ReplicationSource for peer: " + peerId, e); throw new IOException(e); } + // TODO revisit and decide whether to include walGroupId in the metrics id MetricsSource metrics = new MetricsSource(peerId); // init replication source src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, - clusterId, replicationEndpoint, metrics); + clusterId, replicationEndpoint, metrics, groupId); // init replication endpoint replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), @@ -470,7 +584,7 @@ public class ReplicationSourceManager implements ReplicationListener { + sources.size() + " and another " + oldsources.size() + " that were recovered"); String terminateMessage = "Replication stream was removed by a user"; - ReplicationSourceInterface srcToRemove = null; + List srcToRemove = new ArrayList(); List oldSourcesToDelete = new ArrayList(); // First close all the recovered sources for this peer @@ -486,19 +600,23 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.info("Number of deleted recovered sources for " + id + ": " + oldSourcesToDelete.size()); // Now look for the one on this cluster - for (ReplicationSourceInterface src : this.sources) { - if (id.equals(src.getPeerClusterId())) { - srcToRemove = src; - break; + synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source + // for the to-be-removed peer + for (ReplicationSourceInterface src : this.sources) { + if (id.equals(src.getPeerClusterId())) { + srcToRemove.add(src); + } } + if (srcToRemove.size() == 0) { + LOG.error("The queue we wanted to close is missing " + id); + return; + } + for (ReplicationSourceInterface toRemove : srcToRemove) { + toRemove.terminate(terminateMessage); + this.sources.remove(toRemove); + } + deleteSource(id, true); } - if (srcToRemove == null) { - LOG.error("The queue we wanted to close is missing " + id); - return; - } - srcToRemove.terminate(terminateMessage); - this.sources.remove(srcToRemove); - deleteSource(id, true); } @Override @@ -599,7 +717,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface src = getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, - server, peerId, this.clusterId, peerConfig, peer); + server, peerId, this.clusterId, peerConfig, peer, null); if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) { src.terminate("Recovered queue doesn't belong to any current peer"); break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java index e1417b2..c1ad5d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java @@ -49,19 +49,24 @@ public class BoundedRegionGroupingProvider extends RegionGroupingProvider { public void init(final WALFactory factory, final Configuration conf, final List listeners, final String providerId) throws IOException { super.init(factory, conf, listeners, providerId); - // no need to check for and close down old providers; our parent class will throw on re-invoke delegates = new WALProvider[Math.max(1, conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS))]; - for (int i = 0; i < delegates.length; i++) { - delegates[i] = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners, - providerId + i); - } LOG.info("Configured to run with " + delegates.length + " delegate WAL providers."); } @Override - WALProvider populateCache(final byte[] group) { - final WALProvider temp = delegates[counter.getAndIncrement() % delegates.length]; + WALProvider populateCache(final byte[] group) throws IOException { + int idx = counter.getAndIncrement() % delegates.length; + // there's IO cost to initialize the provider, so we only initialize it when necessary + // no need to check for and close down old providers; our parent class will throw on re-invoke + synchronized (delegates) {// synchronize to avoid duplicated initialization + if (delegates[idx] == null) { + delegates[idx] = + factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners, providerId + + idx); + } + } + final WALProvider temp = delegates[idx]; final WALProvider extant = cached.putIfAbsent(group, temp); // if someone else beat us to initializing, just take what they set. // note that in such a case we skew load away from the provider we picked at first @@ -74,7 +79,7 @@ public class BoundedRegionGroupingProvider extends RegionGroupingProvider { IOException failure = null; for (WALProvider provider : delegates) { try { - provider.shutdown(); + if (provider != null) provider.shutdown(); } catch (IOException exception) { LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage()); LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java index b41bbfb..f9cbd23 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java @@ -363,4 +363,15 @@ public class DefaultWALProvider implements WALProvider { } } + /** + * Get prefix of the log from its name, assuming WAL name in format of + * log_prefix.filenumber.log_suffix @see {@link FSHLog#getCurrentFileName()} + * @param name Name of the WAL to parse + * @return prefix of the log + */ + public static String getWALPrefixFromWALName(String name) { + int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf("."); + return name.substring(0, endIndex); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index eb2c426..1975a96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -123,9 +123,9 @@ class RegionGroupingProvider implements WALProvider { protected RegionGroupingStrategy strategy = null; - private WALFactory factory = null; - private List listeners = null; - private String providerId = null; + protected WALFactory factory = null; + protected List listeners = null; + protected String providerId = null; @Override public void init(final WALFactory factory, final Configuration conf, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index f463f76..e90d7ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -37,15 +37,17 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { ReplicationSourceManager manager; String peerClusterId; Path currentPath; + String groupId; @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, - UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) - throws IOException { + UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics, + String walGroupId) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; + this.groupId = walGroupId; } @Override @@ -89,4 +91,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { public String getStats() { return ""; } + + @Override + public String getWALGroupId() { + return groupId; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 451c39f..c9cad2e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -83,6 +83,8 @@ public class TestReplicationEndpoint extends TestReplicationBase { for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) { utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName()); } + // multiple hlog roll requires more time, wait a while for its completion + Threads.sleep(100); } @Test (timeout=120000) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 571be26..c2923ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -25,6 +25,7 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; @@ -229,7 +230,11 @@ public class TestReplicationSourceManager { } wal.sync(); - assertEquals(6, manager.getWALs().get(slaveId).size()); + int logNumber = 0; + for (Map.Entry> entry : manager.getWALs().get(slaveId).entrySet()) { + logNumber += entry.getValue().size(); + } + assertEquals(6, logNumber); wal.rollWriter(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java index 577f0ba..8586d33 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java @@ -39,8 +39,10 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -75,6 +77,7 @@ public class TestReplicationWALReaderManager { private int nbRows; private int walEditKVs; private final AtomicLong sequenceId = new AtomicLong(1); + @Rule public TestName tn = new TestName(); @Parameters public static Collection parameters() { @@ -127,7 +130,7 @@ public class TestReplicationWALReaderManager { List listeners = new ArrayList(); pathWatcher = new PathWatcher(); listeners.add(pathWatcher); - final WALFactory wals = new WALFactory(conf, listeners, "some server"); + final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName()); log = wals.getWAL(info.getEncodedNameAsBytes()); }