diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 812e4bf..ec2fed0 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.htrace.Trace; import com.google.common.annotations.VisibleForTesting; +import java.util.LinkedList; +import java.util.TreeMap; /** * This class allows a continuous flow of requests. It's written to be compatible with a @@ -357,15 +359,25 @@ class AsyncProcess { } throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); } + /** + * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}. + * Uses default ExecutorService for this AP (must have been created with one). + */ + public AsyncRequestFuture submit(TableName tableName, List rows, + boolean atLeastOne, Batch.Callback callback, boolean needResults, long writeBuffer) + throws InterruptedIOException { + return submit(null, tableName, rows, atLeastOne, callback, needResults, writeBuffer); + } /** * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}. * Uses default ExecutorService for this AP (must have been created with one). + * Uses unlimited size of request. */ public AsyncRequestFuture submit(TableName tableName, List rows, boolean atLeastOne, Batch.Callback callback, boolean needResults) throws InterruptedIOException { - return submit(null, tableName, rows, atLeastOne, callback, needResults); + return submit(tableName, rows, atLeastOne, callback, needResults, -1); } /** @@ -382,7 +394,7 @@ class AsyncProcess { */ public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, List rows, boolean atLeastOne, Batch.Callback callback, - boolean needResults) throws InterruptedIOException { + boolean needResults, long writeBuffer) throws InterruptedIOException { if (rows.isEmpty()) { return NO_REQS_RESULT; } @@ -401,10 +413,7 @@ class AsyncProcess { // Wait until there is at least one slot for a new task. waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString()); - // Remember the previous decisions about regions or region servers we put in the - // final multi. - Map regionIncluded = new HashMap(); - Map serverIncluded = new HashMap(); + OperationTaker taker = new OperationTaker(); int posInList = -1; Iterator it = rows.iterator(); @@ -435,8 +444,8 @@ class AsyncProcess { it.remove(); break; // Backward compat: we stop considering actions on location error. } - - if (canTakeOperation(loc, regionIncluded, serverIncluded)) { + long requestSize = (r instanceof Mutation) ? ((Mutation) r).heapSize() : 0; + if (taker.canTakeOperation(loc, requestSize, writeBuffer)) { Action action = new Action(r, ++posInList); setNonce(ng, r, action); retainedActions.add(action); @@ -497,73 +506,6 @@ class AsyncProcess { } /** - * Check if we should send new operations to this region or region server. - * We're taking into account the past decision; if we have already accepted - * operation on a given region, we accept all operations for this region. - * - * @param loc; the region and the server name we want to use. - * @return true if this region is considered as busy. - */ - protected boolean canTakeOperation(HRegionLocation loc, - Map regionsIncluded, - Map serversIncluded) { - HRegionInfo regionInfo = loc.getRegionInfo(); - Boolean regionPrevious = regionsIncluded.get(regionInfo); - - if (regionPrevious != null) { - // We already know what to do with this region. - return regionPrevious; - } - - Boolean serverPrevious = serversIncluded.get(loc.getServerName()); - if (Boolean.FALSE.equals(serverPrevious)) { - // It's a new region, on a region server that we have already excluded. - regionsIncluded.put(regionInfo, Boolean.FALSE); - return false; - } - - AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); - if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { - // Too many tasks on this region already. - regionsIncluded.put(regionInfo, Boolean.FALSE); - return false; - } - - if (serverPrevious == null) { - // The region is ok, but we need to decide for this region server. - int newServers = 0; // number of servers we're going to contact so far - for (Map.Entry kv : serversIncluded.entrySet()) { - if (kv.getValue()) { - newServers++; - } - } - - // Do we have too many total tasks already? - boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks; - - if (ok) { - // If the total is fine, is it ok for this individual server? - AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); - ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer); - } - - if (!ok) { - regionsIncluded.put(regionInfo, Boolean.FALSE); - serversIncluded.put(loc.getServerName(), Boolean.FALSE); - return false; - } - - serversIncluded.put(loc.getServerName(), Boolean.TRUE); - } else { - assert serverPrevious.equals(Boolean.TRUE); - } - - regionsIncluded.put(regionInfo, Boolean.TRUE); - - return true; - } - - /** * See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, Object[])}. * Uses default ExecutorService for this AP (must have been created with one). */ @@ -617,7 +559,95 @@ class AsyncProcess { if (!(r instanceof Append) && !(r instanceof Increment)) return; action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. } + protected class OperationTaker { + // Remember the previous decisions about regions or region servers we put in the + // final multi. + private final Map regionsIncluded = new HashMap<>(); + private final Map serversIncluded = new HashMap<>(); + private final Map serverRequestSizes = new HashMap<>(); + + protected boolean canTakeOperation(HRegionLocation loc, long requestSize, long writeBuffer) { + return checkTaskCount(loc) + && checkTaskSize(loc, requestSize, writeBuffer); + } + protected boolean checkTaskSize(HRegionLocation loc, long requestSize, long writeBuffer) { + if (writeBuffer <= 0) { + return true; + } + // Is it ok for limit of request size? + long requestSizePrevious = serverRequestSizes.getOrDefault(loc.getServerName(), 0L); + // accept at least one request + if (requestSizePrevious == 0 || requestSizePrevious + requestSize <= writeBuffer) { + serverRequestSizes.put(loc.getServerName(), requestSizePrevious + requestSize); + return true; + } + return false; + } + /** + * Check if we should send new operations to this region or region server. + * We're taking into account the past decision; if we have already accepted + * operation on a given region, we accept all operations for this region. + * + * @param loc; the region and the server name we want to use. + * @return true if this region is considered as busy. + */ + protected boolean checkTaskCount(HRegionLocation loc) { + HRegionInfo regionInfo = loc.getRegionInfo(); + Boolean regionPrevious = regionsIncluded.get(regionInfo); + + if (regionPrevious != null) { + // We already know what to do with this region. + return regionPrevious; + } + + Boolean serverPrevious = serversIncluded.get(loc.getServerName()); + if (Boolean.FALSE.equals(serverPrevious)) { + // It's a new region, on a region server that we have already excluded. + regionsIncluded.put(regionInfo, Boolean.FALSE); + return false; + } + + AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); + if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { + // Too many tasks on this region already. + regionsIncluded.put(regionInfo, Boolean.FALSE); + return false; + } + + if (serverPrevious == null) { + // The region is ok, but we need to decide for this region server. + int newServers = 0; // number of servers we're going to contact so far + for (Map.Entry kv : serversIncluded.entrySet()) { + if (kv.getValue()) { + newServers++; + } + } + + // Do we have too many total tasks already? + boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks; + + if (ok) { + // If the total is fine, is it ok for this individual server? + AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); + ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer); + } + if (!ok) { + regionsIncluded.put(regionInfo, Boolean.FALSE); + serversIncluded.put(loc.getServerName(), Boolean.FALSE); + return false; + } + + serversIncluded.put(loc.getServerName(), Boolean.TRUE); + } else { + assert serverPrevious.equals(Boolean.TRUE); + } + + regionsIncluded.put(regionInfo, Boolean.TRUE); + + return true; + } + } /** * The context, and return value, for a single submit/submitAll call. * Note on how this class (one AP submit) works. Initially, all requests are split into groups @@ -751,6 +781,20 @@ class AsyncProcess { this.callsInProgress = callsInProgress; } + @VisibleForTesting + long heapSize() { + long heapSize = 0; + for (Map.Entry>> e: this.multiAction.actions.entrySet()) { + List> actions = e.getValue(); + for (Action action: actions) { + Row row = action.getAction(); + if (row instanceof Mutation) { + heapSize += ((Mutation) row).heapSize(); + } + } + } + return heapSize; + } @Override public void run() { MultiResponse res; @@ -832,7 +876,7 @@ class AsyncProcess { private final long nonceGroup; private PayloadCarryingServerCallable currentCallable; private int currentCallTotalTimeout; - + private final Map> requestHeapSize = new TreeMap<>(); public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, Batch.Callback callback, PayloadCarryingServerCallable callable, int timeout) { @@ -912,6 +956,21 @@ class AsyncProcess { return callsInProgress; } + @VisibleForTesting + Map> getRequestHeapSize() { + return requestHeapSize; + } + + private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server, + SingleServerRequestRunnable runnable) { + List heapCount = requestHeapSize.get(server); + if (heapCount == null) { + heapCount = new LinkedList<>(); + requestHeapSize.put(server, heapCount); + } + heapCount.add(runnable.heapSize()); + return runnable; + } /** * Group a list of actions per region servers, and send them. * @@ -1081,8 +1140,9 @@ class AsyncProcess { if (connection.getConnectionMetrics() != null) { connection.getConnectionMetrics().incrNormalRunners(); } - return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); + SingleServerRequestRunnable runner = addSingleServerRequestHeapSize(server, + new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); + return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runner)); } // group the actions by the amount of delay @@ -1103,9 +1163,8 @@ class AsyncProcess { List toReturn = new ArrayList(actions.size()); for (DelayingRunner runner : actions.values()) { String traceText = "AsyncProcess.sendMultiAction"; - Runnable runnable = - new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, - callsInProgress); + Runnable runnable = addSingleServerRequestHeapSize(server, + new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); // use a delay runner only if we need to sleep for some time if (runner.getSleepTime() > 0) { runner.setRunner(runnable); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index e98ad4e..cecb35e 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -211,12 +211,7 @@ public class BufferedMutatorImpl implements BufferedMutator { // Grab all of the available mutations. Mutation m; - // If there's no buffer size drain everything. If there is a buffersize drain up to twice - // that amount. This should keep the loop from continually spinning if there are threads - // that keep adding more data to the buffer. - while ( - (writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || synchronous) - && (m = writeAsyncBuffer.poll()) != null) { + while ((m = writeAsyncBuffer.poll()) != null) { buffer.add(m); long size = m.heapSize(); dequeuedSize += size; @@ -228,7 +223,7 @@ public class BufferedMutatorImpl implements BufferedMutator { } if (!synchronous) { - ap.submit(tableName, buffer, true, null, false); + ap.submit(tableName, buffer, true, null, false, writeBufferSize); if (ap.hasError()) { LOG.debug(tableName + ": One or more of the operations have failed -" + " waiting for all operation in progress to finish (successfully or not)"); @@ -236,7 +231,7 @@ public class BufferedMutatorImpl implements BufferedMutator { } if (synchronous || ap.hasError()) { while (!buffer.isEmpty()) { - ap.submit(tableName, buffer, true, null, false); + ap.submit(tableName, buffer, true, null, false, writeBufferSize); } RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString()); diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index d943316..28e048a 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.BlockingQueue; @@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; +import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -65,6 +67,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import org.junit.BeforeClass; import org.junit.Rule; @@ -72,6 +75,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.mockito.Mockito; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; @Category({ClientTests.class, MediumTests.class}) public class TestAsyncProcess { @@ -181,10 +186,10 @@ public class TestAsyncProcess { @Override public AsyncRequestFuture submit(TableName tableName, List rows, - boolean atLeastOne, Callback callback, boolean needResults) + boolean atLeastOne, Callback callback, boolean needResults, long writeBuffer) throws InterruptedIOException { // We use results in tests to check things, so override to always save them. - return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); + return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true, writeBuffer); } @Override @@ -435,6 +440,135 @@ public class TestAsyncProcess { } } + private static long calculateRequestCount(long putSizePerServer, long writeBuffer) { + if (putSizePerServer <= writeBuffer) { + return 1; + } else if (putSizePerServer % writeBuffer == 0) { + return putSizePerServer / writeBuffer; + } else { + return putSizePerServer / writeBuffer + 1; + } + } + + @Test + public void testSubmitSameSizeOfRequestAndBuffer() throws Exception { + long writeBuffer = 2 * 1024 * 1024; + long putsHeapSize = writeBuffer; + doSubmitRequest(writeBuffer, putsHeapSize); + } + + @Test + public void testSubmitLargeRequestWithUnlimitedBuffer() throws Exception { + long writeBuffer = -1; + long putsHeapSize = 2 * 1024 * 1024; + doSubmitRequest(writeBuffer, putsHeapSize); + } + + @Test + public void testSubmitRandomSizeRequest() throws Exception { + Random rn = new Random(); + final long limit = 10 * 1024 * 1024; + for (int count = 0; count != 10; ++count) { + long writeBuffer = Math.min(limit, (Math.abs(rn.nextLong()) % limit) + 1); + long putsHeapSize = Math.min(limit, (Math.abs(rn.nextLong()) % limit) + 1); + LOG.info("[testSubmitRandomSizeRequest] writeBuffer=" + writeBuffer + ", putsHeapSize=" + putsHeapSize); + doSubmitRequest(writeBuffer, putsHeapSize); + } + } + + @Test + public void testSubmitSmallRequest() throws Exception { + long writeBuffer = 2 * 1024 * 1024; + long putsHeapSize = 100; + doSubmitRequest(writeBuffer, putsHeapSize); + } + + @Test + public void testSubmitLargeRequest() throws Exception { + long writeBuffer = 2 * 1024 * 1024; + long putsHeapSize = writeBuffer * 2; + doSubmitRequest(writeBuffer, putsHeapSize); + } + + private void doSubmitRequest(long writeBuffer, long putsHeapSize) throws Exception { + ClusterConnection conn = createHConnection(); + BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); + bufferParam.writeBufferSize(writeBuffer); + + int maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); + int maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); + List rows = Arrays.asList(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); + + long sizeCount = 0; + List puts = new ArrayList<>(); + while (sizeCount <= putsHeapSize) { + for (byte[] row : rows) { + Put put = new Put(row); + put.addColumn(row, row, row); + puts.add(put); + sizeCount += put.heapSize(); + } + } + // sn has two regions + long putSizeSN = sizeCount / rows.size() * 2; + long putSizeSN2 = sizeCount / rows.size(); + boolean submitAtOnceSN = writeBuffer <= 0 ? true + : calculateRequestCount(putSizeSN, writeBuffer) <= Math.min(maxConcurrentTasksPerRegion, maxConcurrentTasksPerServer); + boolean submitAtOnceSN2 = writeBuffer <= 0 ? true + : calculateRequestCount(putSizeSN2, writeBuffer) <= Math.min(maxConcurrentTasksPerRegion, maxConcurrentTasksPerServer); + + try (HTable ht = new HTable(conn, bufferParam)) { + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + ht.mutator.ap = ap; + + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + ht.put(puts); + List reqs = ap.allReqs; + + if (submitAtOnceSN && submitAtOnceSN2) { + // 1) submit all data + // 2) flush (no data -> no req) + assertEquals(1, reqs.size()); + } + Map sizePerServers = new HashMap<>(); + for (AsyncRequestFuture req : reqs) { + if (!(req instanceof AsyncRequestFutureImpl)) { + continue; + } + AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + Map> requestHeapSize = ars.getRequestHeapSize(); + for (Map.Entry> entry : requestHeapSize.entrySet()) { + assertEquals(Math.min(maxConcurrentTasksPerRegion, maxConcurrentTasksPerServer), entry.getValue().size()); + long sum = 0; + for (long size : entry.getValue()) { + if (writeBuffer > 0) { + assertEquals(true, size <= writeBuffer); + } + sum += size; + } + if (writeBuffer <= 0) { + if (entry.getKey().equals(sn)) { + assertEquals(putSizeSN, sum); + } + if (entry.getKey().equals(sn2)) { + assertEquals(putSizeSN2, sum); + } + } else { + assertEquals(true, sum <= writeBuffer); + } + sizePerServers.put(entry.getKey(), sizePerServers.getOrDefault(entry.getKey(), 0L) + sum); + } + } + assertEquals(true, sizePerServers.containsKey(sn)); + assertEquals(true, sizePerServers.containsKey(sn2)); + assertEquals(false, sizePerServers.containsKey(sn3)); + assertEquals(putSizeSN, (long) sizePerServers.get(sn)); + assertEquals(putSizeSN2, (long) sizePerServers.get(sn2)); + } + } + @Test public void testSubmit() throws Exception { ClusterConnection hc = createHConnection();