From 46e8be7cf15a9789a125812188291bfa33094737 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 22 Mar 2016 16:14:20 +0800 Subject: [PATCH] HBASE-15360 addendum fix testCoDelScheduling --- .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java | 5 +- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 2 +- .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 135 ++++++++++++++------- 3 files changed, 96 insertions(+), 46 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java index 37e86be..266c6a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading. @@ -77,7 +78,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { private volatile long minDelay; // the moment when current interval ends - private volatile long intervalTime = System.currentTimeMillis(); + private volatile long intervalTime = EnvironmentEdgeManager.currentTime(); // switch to ensure only one threads does interval cutoffs private AtomicBoolean resetDelay = new AtomicBoolean(true); @@ -147,7 +148,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { * and internal queue state (deemed overloaded). */ private boolean needToDrop(CallRunner callRunner) { - long now = System.currentTimeMillis(); + long now = EnvironmentEdgeManager.currentTime(); long callDelay = now - callRunner.getCall().timestamp; long localMinDelay = this.minDelay; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 0cd34bb..431aeeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -203,7 +203,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) { Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches}; - callExecutor = new RWQueueRpcExecutor("B.default", handlerCount, + callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs, AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 6454537..97ef973 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -17,28 +17,49 @@ */ package org.apache.hadoop.hbase.ipc; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.google.protobuf.Message; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.ipc.RpcServer.Call; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.junit.Before; import org.junit.Rule; @@ -48,25 +69,11 @@ import org.junit.rules.TestRule; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.protobuf.Message; @Category({RPCTests.class, SmallTests.class}) public class TestSimpleRpcScheduler { @@ -218,7 +225,7 @@ public class TestSimpleRpcScheduler { scheduler.dispatch(smallCallTask); while (work.size() < 8) { - Threads.sleepWithoutInterrupt(100); + Thread.sleep(100); } int seqSum = 0; @@ -298,7 +305,7 @@ public class TestSimpleRpcScheduler { scheduler.dispatch(scanCallTask); while (work.size() < 6) { - Threads.sleepWithoutInterrupt(100); + Thread.sleep(100); } for (int i = 0; i < work.size() - 2; i += 3) { @@ -326,6 +333,13 @@ public class TestSimpleRpcScheduler { }).when(callTask).run(); } + private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) + throws InterruptedException { + while (scheduler.getGeneralQueueLength() > 0) { + Thread.sleep(100); + } + } + @Test public void testSoftAndHardQueueLimits() throws Exception { Configuration schedConf = HBaseConfiguration.create(); @@ -354,9 +368,7 @@ public class TestSimpleRpcScheduler { schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); scheduler.onConfigurationChange(schedConf); assertFalse(scheduler.dispatch(putCallTask)); - while (scheduler.getGeneralQueueLength() > 0) { - Threads.sleepWithoutInterrupt(100); - } + waitUntilQueueEmpty(scheduler); schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); scheduler.onConfigurationChange(schedConf); assertTrue(scheduler.dispatch(putCallTask)); @@ -365,8 +377,30 @@ public class TestSimpleRpcScheduler { } } + private static final class CoDelEnvironmentEdge implements EnvironmentEdge { + + private final BlockingQueue timeQ = new LinkedBlockingQueue<>(); + + private long offset; + + private final Set threadNamePrefixs = new HashSet<>(); + + @Override + public long currentTime() { + for (String threadNamePrefix : threadNamePrefixs) { + if (Thread.currentThread().getName().startsWith(threadNamePrefix)) { + return timeQ.poll().longValue() + offset; + } + } + return System.currentTimeMillis(); + } + } + @Test public void testCoDelScheduling() throws Exception { + CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); + envEdge.threadNamePrefixs.add("RW.default"); + envEdge.threadNamePrefixs.add("B.default"); Configuration schedConf = HBaseConfiguration.create(); schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, @@ -379,36 +413,51 @@ public class TestSimpleRpcScheduler { HConstants.QOS_THRESHOLD); try { scheduler.start(); - + EnvironmentEdgeManager.injectEdge(envEdge); + envEdge.offset = 5; // calls faster than min delay for (int i = 0; i < 100; i++) { - CallRunner cr = getMockedCallRunner(); + long time = System.currentTimeMillis(); + envEdge.timeQ.put(time); + CallRunner cr = getMockedCallRunner(time); Thread.sleep(5); scheduler.dispatch(cr); } - Thread.sleep(100); // make sure fast calls are handled + // make sure fast calls are handled + waitUntilQueueEmpty(scheduler); + Thread.sleep(100); assertEquals("None of these calls should have been discarded", 0, scheduler.getNumGeneralCallsDropped()); + envEdge.offset = 6; // calls slower than min delay, but not individually slow enough to be dropped for (int i = 0; i < 20; i++) { - CallRunner cr = getMockedCallRunner(); + long time = System.currentTimeMillis(); + envEdge.timeQ.put(time); + CallRunner cr = getMockedCallRunner(time); Thread.sleep(6); scheduler.dispatch(cr); } - Thread.sleep(100); // make sure somewhat slow calls are handled + // make sure somewhat slow calls are handled + waitUntilQueueEmpty(scheduler); + Thread.sleep(100); assertEquals("None of these calls should have been discarded", 0, scheduler.getNumGeneralCallsDropped()); + envEdge.offset = 12; // now slow calls and the ones to be dropped for (int i = 0; i < 20; i++) { - CallRunner cr = getMockedCallRunner(); + long time = System.currentTimeMillis(); + envEdge.timeQ.put(time); + CallRunner cr = getMockedCallRunner(time); Thread.sleep(12); scheduler.dispatch(cr); } - Thread.sleep(100); // make sure somewhat slow calls are handled + // make sure somewhat slow calls are handled + waitUntilQueueEmpty(scheduler); + Thread.sleep(100); assertTrue("There should have been at least 12 calls dropped", scheduler.getNumGeneralCallsDropped() > 12); } finally { @@ -416,7 +465,7 @@ public class TestSimpleRpcScheduler { } } - private CallRunner getMockedCallRunner() throws IOException { + private CallRunner getMockedCallRunner(long timestamp) throws IOException { CallRunner putCallTask = mock(CallRunner.class); RpcServer.Call putCall = mock(RpcServer.Call.class); putCall.param = RequestConverter.buildMutateRequest( @@ -424,7 +473,7 @@ public class TestSimpleRpcScheduler { RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(); when(putCallTask.getCall()).thenReturn(putCall); when(putCall.getHeader()).thenReturn(putHead); - putCall.timestamp = System.currentTimeMillis(); + putCall.timestamp = timestamp; return putCallTask; } } -- 1.9.1