Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java (revision 1563098) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java (working copy) @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; @@ -375,7 +377,76 @@ } } + /** + * unit test for ReplicationSource.applyThrottling + */ + @Test(timeout=100000) + public void testApplyThrottling() { + LOG.info("testApplyThrottling"); + // 1. cyclePushSize is large, need sleep out some following cycles + // to amortize + long now = EnvironmentEdgeManager.currentTimeMillis(); + AtomicLong cycleStartTick = new AtomicLong(now); + AtomicLong cyclePushSize = new AtomicLong(1000); + long sleepTicks = ReplicationSource.applyThrottling(100.0, 20, + cycleStartTick, cyclePushSize); + assertEquals(0, cyclePushSize.get()); + // note: cycleStartTick will reset after the sleep really occurs in shipEdits()! 
+ assertEquals(now, cycleStartTick.get()); + // sleepTicks1 is about 1000/100 = 10 cycles = 1s + assertTrue(sleepTicks >= (1000-1)); + + now = EnvironmentEdgeManager.currentTimeMillis(); + cycleStartTick.set(now); + cyclePushSize.set(1000); + sleepTicks = ReplicationSource.applyThrottling(10.0, 20, + cycleStartTick, cyclePushSize); + assertEquals(0, cyclePushSize.get()); + // note: cycleStartTick will reset after the sleep really occurs in shipEdits()! + assertEquals(now, cycleStartTick.get()); + // sleepTicks2 is about 1000/10 = 100 cycles = 10s + assertTrue(sleepTicks >= (10*1000-1)); + + // 2. current cycle has passed, switch to next cycle, no sleep + now = EnvironmentEdgeManager.currentTimeMillis(); + cycleStartTick = new AtomicLong(now - 150); // a cycle is 100ms + cyclePushSize = new AtomicLong(10); + sleepTicks = ReplicationSource.applyThrottling(100.0, 20, + cycleStartTick, cyclePushSize); + // cyclePushSize reset to 0 + assertEquals(0, cyclePushSize.get()); + // cycleStartTick reset to NOW, so it can no longer be earlier than 'now' + assertTrue(cycleStartTick.get() >= now); + // no sleep + assertEquals(0, sleepTicks); + + // 3.1 cyclePushSize + currentSize < bandwidthPerCycle: no sleep and + // cyclePushSize and cycleStartTick no change in applyThrottling() + now = EnvironmentEdgeManager.currentTimeMillis(); + cycleStartTick = new AtomicLong(now - 50); + cyclePushSize = new AtomicLong(5); + sleepTicks = ReplicationSource.applyThrottling(100.0, 20, + cycleStartTick, cyclePushSize); + assertEquals(5, cyclePushSize.get()); + assertEquals(now - 50, cycleStartTick.get()); + assertEquals(0, sleepTicks); + + // 3.2 cyclePushSize + currentSize > bandwidthPerCycle: delay the + // push(currentSize) until next cycle by sleeping + now = EnvironmentEdgeManager.currentTimeMillis(); + cycleStartTick.set(now - 50); + cyclePushSize.set(5); + sleepTicks = ReplicationSource.applyThrottling(10.0, 20, + cycleStartTick, cyclePushSize); + // cyclePushSize is reset to 0 + 
assertEquals(0, cyclePushSize.get()); + // note: cycleStartTick will reset after the sleep really occurs in shipEdits()! + assertEquals(now - 50, cycleStartTick.get()); + // sleepTicks should be (100-50), the original current cycle has 50ms left + assertTrue(sleepTicks >= (100-50-1)); + } + /** * Do a more intense version testSmallBatch, one that will trigger * hlog rolling and other non-trivial code paths Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1563098) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -28,6 +28,7 @@ import java.util.List; import java.util.NavigableMap; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; @@ -96,6 +98,12 @@ private long replicationQueueSizeCapacity; // Max number of entries in entriesArray private int replicationQueueNbCapacity; + // Throttling per-peer-per-node:enabled if perPeerNodeBandwidth > 0 + // a cycle = 100ms, by throttling we guarantee data pushed to peer + // within each cycle won't exceed perPeerNodeBandwidth/10 (bytes) + private long perPeerNodeBandwidth; + private AtomicLong cyclePushSize; + private AtomicLong cycleStartTick; // Our reader for the current log private HLog.Reader reader; // Last position in the log 
that we sent to ZooKeeper @@ -151,6 +159,8 @@ decorateConf(); this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024*1024*64); + this.perPeerNodeBandwidth = + this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); @@ -164,6 +174,8 @@ // replication and make replication specific settings such as compression or codec to use // passing Cells. this.conn = HConnectionManager.getConnection(this.conf); + this.cyclePushSize = new AtomicLong(0L); + this.cycleStartTick = new AtomicLong(EnvironmentEdgeManager.currentTimeMillis()); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; @@ -642,6 +654,50 @@ } /** + * Apply the per-node per-peer throttling + * @param bandwidthPerCycle max size that may be pushed within one cycle, a cycle is 100ms + * @param currentSize data size that is going to be pushed to peer + * @param cycleStartTick is the start tick of current cycle, can only be reset here + * when current cycle has passed, will delay the reset if need sleep + * @param cyclePushSize is the total push size of current cycle, can be reset here + * @return interval that should be slept out according to throttling + */ + public static long applyThrottling(final double bandwidthPerCycle, final int currentSize, + AtomicLong cycleStartTick, AtomicLong cyclePushSize) { + long sleepTicks = 0; + long now = EnvironmentEdgeManager.currentTimeMillis(); + // 1. 
if cyclePushSize exceeds bandwidthPerCycle, we need to sleep some + // following cycles to amortize, this case can occur when a single push + // exceeds the bandwidthPerCycle + if ((double)cyclePushSize.get() > bandwidthPerCycle) { + double cycles = Math.ceil((double)cyclePushSize.get() / bandwidthPerCycle); + long shouldTillTo = cycleStartTick.get() + (long)(cycles * 100); + if (shouldTillTo > now) { + sleepTicks = shouldTillTo - now; + } else { + // no reset in shipEdits since no sleep, so need reset here! + cycleStartTick.set(now); + } + cyclePushSize.set(0); + } else { + long nextCycleTick = cycleStartTick.get() + 100; + if (now >= nextCycleTick) { + // 2. switch to next cycle if the current cycle has passed + cycleStartTick.set(now); + cyclePushSize.set(0); + } else if (cyclePushSize.get() > 0 && + (double)(cyclePushSize.get() + currentSize) > bandwidthPerCycle) { + // 3. delay the push to next cycle if exceeds throttling bandwidth. + // enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case + // where a cycle's first push size(currentSize) > bandwidthPerCycle + sleepTicks = nextCycleTick - now; + cyclePushSize.set(0); + } + } + return sleepTicks; + } + + /** * Do the shipping logic * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) * written to when this method was called @@ -661,6 +717,22 @@ } SinkPeer sinkPeer = null; try { + // throttling is enabled only when perPeerNodeBandwidth > 0 + if (this.perPeerNodeBandwidth > 0) { + double bandwidthPerCycle = (double)this.perPeerNodeBandwidth / 10.0; + long sleepTicks = applyThrottling(bandwidthPerCycle, currentSize, + this.cycleStartTick, this.cyclePushSize); + if (sleepTicks > 0) { + try { + Thread.sleep(sleepTicks); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // do not swallow: keep the interrupt status set + } + // cycleStartTick needs to reset to (real) NOW whenever a sleep for + // throttling occurs + 
cycleStartTick.set(EnvironmentEdgeManager.currentTimeMillis()); + } + } sinkPeer = replicationSinkMgr.getReplicationSink(); 
BlockingInterface rrs = sinkPeer.getRegionServer(); if (LOG.isTraceEnabled()) { @@ -675,6 +747,9 @@ this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); this.lastLoggedPosition = this.repLogReader.getPosition(); } + if (this.perPeerNodeBandwidth > 0) { + this.cyclePushSize.addAndGet(currentSize); + } this.totalReplicatedEdits += entries.size(); this.totalReplicatedOperations += currentNbOperations; this.metrics.shipBatch(this.currentNbOperations);