diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 1a53c24..884bce1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -163,6 +163,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // update metrics this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); + replicationSinkMgr.reportSinkSuccess(sinkPeer); return true; } catch (IOException ioe) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java index b186e08..8ce08cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java @@ -140,6 +140,24 @@ public class ReplicationSinkManager { } } + /** + * Report that a {@code SinkPeer} successfully replicated a chunk of data. 
+ * + * @param sinkPeer + * The SinkPeer that had a successful replication attempt on it + */ + public void reportSinkSuccess(SinkPeer sinkPeer) { + ServerName serverName = sinkPeer.getServerName(); + if (badReportCounts.containsKey(serverName)) { + int badReportCount = badReportCounts.get(serverName)-1; + if (badReportCount <= 0) { + badReportCounts.remove(serverName); + } else { + badReportCounts.put(serverName, badReportCount); + } + } + } + void chooseSinks() { List slaveAddresses = endpoint.getRegionServers(); Collections.shuffle(slaveAddresses, random); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java index a2ea258..0058419 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java @@ -111,7 +111,7 @@ public class TestReplicationSinkManager { @Test public void testReportBadSink_PastThreshold() { List serverNames = Lists.newArrayList(); - for (int i = 0; i < 20; i++) { + for (int i = 0; i < 30; i++) { serverNames.add(mock(ServerName.class)); } when(replicationEndpoint.getRegionServers()) @@ -120,18 +120,38 @@ public class TestReplicationSinkManager { sinkManager.chooseSinks(); // Sanity check - assertEquals(2, sinkManager.getSinks().size()); + assertEquals(3, sinkManager.getSinks().size()); ServerName serverName = sinkManager.getSinks().get(0); SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); + sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { sinkManager.reportBadSink(sinkPeer); } // Reporting a bad sink more than the threshold count should remove it // 
from the list of potential sinks + assertEquals(2, sinkManager.getSinks().size()); + + // + // now try a sink that has some successes + // + serverName = sinkManager.getSinks().get(0); + + sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); + for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) { + sinkManager.reportBadSink(sinkPeer); + } + sinkManager.reportSinkSuccess(sinkPeer); // one success + sinkManager.reportBadSink(sinkPeer); + + // did not remove the sink, since we had one successful try + assertEquals(2, sinkManager.getSinks().size()); + + sinkManager.reportBadSink(sinkPeer); + // but we exhausted the tries assertEquals(1, sinkManager.getSinks().size()); }