diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java index 6343af6..02002f9 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java @@ -49,30 +49,39 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** - * Holds back the request if the submitted size or number has reached the - * threshold. + * Holds back the requests if they reach any thresholds. */ @InterfaceAudience.Private @InterfaceStability.Evolving class SimpleRequestController implements RequestController { private static final Log LOG = LogFactory.getLog(SimpleRequestController.class); /** - * The maximum size of single RegionServer. + * The maximum heap size for each request. */ public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize"; /** - * Default value of #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE + * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}. */ @VisibleForTesting static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304; /** + * The maximum number of rows for each request. + */ + public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows"; + /** + * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}. + */ + @VisibleForTesting + static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048; + + /** * The maximum size of submit. */ public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize"; /** - * Default value of #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE + * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}. */ @VisibleForTesting static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE; @@ -89,9 +98,13 @@ class SimpleRequestController implements RequestController { private final int maxTotalConcurrentTasks; /** - * The max heap size of all tasks simultaneously executed on a server. + * The maximum heap size for each request. */ private final long maxHeapSizePerRequest; + /** + * The maximum number of rows for each request. + */ + private final long maxRowsPerRequest; private final long maxHeapSizeSubmit; /** * The number of tasks we run in parallel on a single region. With 1 (the @@ -124,6 +137,8 @@ class SimpleRequestController implements RequestController { HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); + this.maxRowsPerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_ROWS, + DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS); this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE); this.thresholdToLogUndoneTaskDetails = conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS, @@ -143,10 +158,13 @@ class SimpleRequestController implements RequestController { + maxConcurrentTasksPerRegion); } if (this.maxHeapSizePerRequest <= 0) { - throw new IllegalArgumentException("maxHeapSizePerServer=" + throw new IllegalArgumentException("maxHeapSizePerRequest=" + maxHeapSizePerRequest); } - + if (this.maxRowsPerRequest <= 0) { + throw new IllegalArgumentException("maxRowsPerRequest=" + + maxRowsPerRequest); + } if (this.maxHeapSizeSubmit <= 0) { throw new IllegalArgumentException("maxHeapSizeSubmit=" + maxHeapSizeSubmit); @@ -208,15 +226,16 @@ class SimpleRequestController implements RequestController { @Override public Checker newChecker() { - List checkers = new ArrayList<>(3); + List checkers = new ArrayList<>(4); checkers.add(new TaskCountChecker(maxTotalConcurrentTasks, maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion, tasksInProgress, taskCounterPerServer, taskCounterPerRegion)); - checkers.add(new RequestSizeChecker(maxHeapSizePerRequest)); + checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest)); checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit)); + checkers.add(new RequestRowsChecker(maxRowsPerRequest)); return newChecker(checkers); } @@ -454,15 +473,54 @@ class SimpleRequestController implements RequestController { } /** - * limit the request size for each regionserver. + * limit the number of rows for each request. + */ + @VisibleForTesting + static class RequestRowsChecker implements RowChecker { + + private final long maxRowsPerRequest; + private final Map serverRows = new HashMap<>(); + + RequestRowsChecker(final long maxRowsPerRequest) { + this.maxRowsPerRequest = maxRowsPerRequest; + } + + @Override + public void reset() { + serverRows.clear(); + } + + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + long currentRows = serverRows.containsKey(loc.getServerName()) + ? serverRows.get(loc.getServerName()) : 0L; + // accept at least one row + if (currentRows == 0 || currentRows < maxRowsPerRequest) { + return ReturnCode.INCLUDE; + } + return ReturnCode.SKIP; + } + + @Override + public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) { + if (code == ReturnCode.INCLUDE) { + long currentRows = serverRows.containsKey(loc.getServerName()) + ? serverRows.get(loc.getServerName()) : 0L; + serverRows.put(loc.getServerName(), currentRows + 1); + } + } + } + + /** + * limit the heap size for each request. */ @VisibleForTesting - static class RequestSizeChecker implements RowChecker { + static class RequestHeapSizeChecker implements RowChecker { private final long maxHeapSizePerRequest; private final Map serverRequestSizes = new HashMap<>(); - RequestSizeChecker(final long maxHeapSizePerRequest) { + RequestHeapSizeChecker(final long maxHeapSizePerRequest) { this.maxHeapSizePerRequest = maxHeapSizePerRequest; } diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java index b46e572..8de3eb4 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java @@ -56,7 +56,6 @@ public class TestSimpleRequestController { private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes(); private static final ServerName SN = ServerName.valueOf("s1:1,1"); private static final ServerName SN2 = ServerName.valueOf("s2:2,2"); - private static final ServerName SN3 = ServerName.valueOf("s3:3,3"); private static final HRegionInfo HRI1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); private static final HRegionInfo HRI2 @@ -68,7 +67,7 @@ public class TestSimpleRequestController { private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2); @Test - public void testIllegalRequestSize() { + public void testIllegalRequestHeapSize() { testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1); } @@ -87,9 +86,14 @@ public class TestSimpleRequestController { testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1); } + @Test + public void testIllegalRequestRows() { + testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1); + } + private void testIllegalArgument(String key, long value) { Configuration conf = HBaseConfiguration.create(); - conf.setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1); + conf.setLong(key, value); try { SimpleRequestController controller = new SimpleRequestController(conf); fail("The " + key + " must be bigger than zero"); @@ -121,7 +125,7 @@ public class TestSimpleRequestController { tasksInProgress, taskCounterPerServer, taskCounterPerRegion); final long maxHeapSizePerRequest = 2 * 1024 * 1024; // unlimiited - SimpleRequestController.RequestSizeChecker sizeChecker = new SimpleRequestController.RequestSizeChecker(maxHeapSizePerRequest); + SimpleRequestController.RequestHeapSizeChecker sizeChecker = new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest); RequestController.Checker checker = SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker)); ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest)); assertEquals(ReturnCode.INCLUDE, loc1Code); @@ -151,10 +155,10 @@ public class TestSimpleRequestController { } @Test - public void testRequestSizeCheckerr() throws IOException { + public void testRequestHeapSizeChecker() throws IOException { final long maxHeapSizePerRequest = 2 * 1024 * 1024; - SimpleRequestController.RequestSizeChecker checker - = new SimpleRequestController.RequestSizeChecker(maxHeapSizePerRequest); + SimpleRequestController.RequestHeapSizeChecker checker + = new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest); // inner state is unchanged. for (int i = 0; i != 10; ++i) { @@ -193,6 +197,51 @@ public class TestSimpleRequestController { } @Test + public void testRequestRowsChecker() throws IOException { + final long maxRowCount = 100; + SimpleRequestController.RequestRowsChecker checker + = new SimpleRequestController.RequestRowsChecker(maxRowCount); + + final long rowSize = 100; //unused + // inner state is unchanged. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(LOC1, rowSize); + assertEquals(ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(LOC2, rowSize); + assertEquals(ReturnCode.INCLUDE, code); + } + + // accept the data located on LOC1 region. + for (int i = 0; i != maxRowCount; ++i) { + ReturnCode acceptCode = checker.canTakeOperation(LOC1, rowSize); + assertEquals(ReturnCode.INCLUDE, acceptCode); + checker.notifyFinal(acceptCode, LOC1, rowSize); + } + + // the sn server reachs the limit. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(LOC1, rowSize); + assertNotEquals(ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(LOC2, rowSize); + assertNotEquals(ReturnCode.INCLUDE, code); + } + + // the request to sn2 server should be accepted. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(LOC3, rowSize); + assertEquals(ReturnCode.INCLUDE, code); + } + + checker.reset(); + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(LOC1, rowSize); + assertEquals(ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(LOC2, rowSize); + assertEquals(ReturnCode.INCLUDE, code); + } + } + + @Test public void testSubmittedSizeChecker() { final long maxHeapSizeSubmit = 2 * 1024 * 1024; SimpleRequestController.SubmittedSizeChecker checker