Cancel in-flight multi-get RPCs that are still registered when the wait completes.

Summary of the hunks below:
  * AsyncProcess: each SingleServerRequestRunnable registers its MultiServerCallable
    in a shared callsInProgress set (allocated only when hasAnyReplicaGets is true)
    and removes it in its finally block. A null MultiResponse is treated as
    "cancelled" and run() returns early. The wrapper around
    waitUntilDone(Long.MAX_VALUE) cancels every callable still in the set.
  * MultiServerCallable: implements Cancellable; the RPC controller is now created
    once in the constructor so cancel()/isCancelled() can reach it, and call()
    returns null when the response proto is null.
  * AsyncRpcChannel: the cancel callback removes the call from `calls` instead of
    failing it, and a controller that is already cancelled short-circuits before
    the call is registered.
  * TestReplicasClient: adds testCancelOfMultiGet.

NOTE(review): this patch text was recovered from a copy in which line breaks,
indentation and generic type arguments had been stripped. Hunk line counts below
match the @@ headers, but indentation depth, blank-line placement and the restored
type arguments (<Row>, <R>, <MultiResponse>, <Put>, <Get>, <Object>) are
reconstructed -- confirm against the target tree (or apply with
--ignore-whitespace) before relying on it.

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index c72fb0f..64efde2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -684,21 +685,33 @@ class AsyncProcess {
       private final MultiAction<Row> multiAction;
       private final int numAttempt;
       private final ServerName server;
+      private final Set<MultiServerCallable<Row>> callsInProgress;
 
       private SingleServerRequestRunnable(
-          MultiAction<Row> multiAction, int numAttempt, ServerName server) {
+          MultiAction<Row> multiAction, int numAttempt, ServerName server,
+          Set<MultiServerCallable<Row>> callsInProgress) {
         this.multiAction = multiAction;
         this.numAttempt = numAttempt;
         this.server = server;
+        this.callsInProgress = callsInProgress;
       }
 
       @Override
       public void run() {
         MultiResponse res;
+        MultiServerCallable<Row> callable = null;
         try {
-          MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
+          callable = createCallable(server, tableName, multiAction);
           try {
-            res = createCaller(callable).callWithoutRetries(callable, timeout);
+            RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
+            if (callsInProgress != null) callsInProgress.add(callable);
+            res = caller.callWithoutRetries(callable, timeout);
+
+            if (res == null) {
+              // Cancelled
+              return;
+            }
+
           } catch (IOException e) {
             // The service itself failed . It may be an error coming from the communication
             // layer, but, as well, a functional error raised by the server.
@@ -721,6 +734,9 @@ class AsyncProcess {
           throw new RuntimeException(t);
         } finally {
           decTaskCounters(multiAction.getRegions(), server);
+          if (callsInProgress != null && callable != null) {
+            callsInProgress.remove(callable);
+          }
         }
       }
     }
@@ -729,6 +745,7 @@ class AsyncProcess {
     private final BatchErrors errors;
     private final ConnectionManager.ServerErrorTracker errorsByServer;
     private final ExecutorService pool;
+    private final Set<MultiServerCallable<Row>> callsInProgress;
 
 
     private final TableName tableName;
@@ -813,10 +830,17 @@ class AsyncProcess {
       } else {
         this.replicaGetIndices = null;
       }
+      this.callsInProgress = !hasAnyReplicaGets ? null :
+          Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
+
       this.errorsByServer = createServerErrorTracker();
       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
     }
 
+    public Set<MultiServerCallable<Row>> getCallsInProgress() {
+      return callsInProgress;
+    }
+
     /**
      * Group a list of actions per region servers, and send them.
      *
@@ -979,7 +1003,7 @@ class AsyncProcess {
       // no stats to manage, just do the standard action
       if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
-            new SingleServerRequestRunnable(multiAction, numAttempt, server)));
+            new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
       }
 
       // group the actions by the amount of delay
@@ -1001,7 +1025,8 @@ class AsyncProcess {
       for (DelayingRunner runner : actions.values()) {
         String traceText = "AsyncProcess.sendMultiAction";
         Runnable runnable =
-            new SingleServerRequestRunnable(runner.getActions(), numAttempt, server);
+            new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
+                callsInProgress);
         // use a delay runner only if we need to sleep for some time
         if (runner.getSleepTime() > 0) {
           runner.setRunner(runnable);
@@ -1517,6 +1542,12 @@ class AsyncProcess {
         waitUntilDone(Long.MAX_VALUE);
       } catch (InterruptedException iex) {
         throw new InterruptedIOException(iex.getMessage());
+      } finally {
+        if (callsInProgress != null) {
+          for (MultiServerCallable<Row> clb : callsInProgress) {
+            clb.cancel();
+          }
+        }
       }
     }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 8d63105..9dbebb4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -50,21 +51,21 @@ import com.google.protobuf.ServiceException;
  * {@link RegionServerCallable} that goes against multiple regions.
  * @param <R>
  */
-class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
+class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> implements Cancellable {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
-  private RpcControllerFactory rpcFactory;
+  private final PayloadCarryingRpcController controller;
 
   MultiServerCallable(final ClusterConnection connection, final TableName tableName,
       final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
     super(connection, tableName, null);
-    this.rpcFactory = rpcFactory;
     this.multiAction = multi;
     // RegionServerCallable has HRegionLocation field, but this is a multi-region request.
     // Using region info from parent HRegionLocation would be a mistake for this class; so
     // we will store the server here, and throw if someone tries to obtain location/regioninfo.
     this.location = new HRegionLocation(null, location);
     this.cellBlock = isCellBlock();
+    controller = rpcFactory.newController();
   }
 
   @Override
@@ -119,7 +120,7 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
 
     // Controller optionally carries cell data over the proxy/service boundary and also
     // optionally ferries cell response data back out again.
-    PayloadCarryingRpcController controller = rpcFactory.newController(cells);
+    if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
     controller.setPriority(getTableName());
     controller.setCallTimeout(callTimeout);
     ClientProtos.MultiResponse responseProto;
@@ -129,10 +130,19 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
     } catch (ServiceException e) {
       throw ProtobufUtil.getRemoteException(e);
     }
+    if (responseProto == null) return null; // Occurs on cancel
     return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
   }
 
+  @Override
+  public void cancel() {
+    controller.startCancel();
+  }
 
+  @Override
+  public boolean isCancelled() {
+    return controller.isCanceled();
+  }
 
   /**
    * @return True if we should send data in cellblocks. This is an expensive call. Cache the
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 7069a42..054c9b5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -307,9 +307,14 @@ public class AsyncRpcChannel {
     controller.notifyOnCancel(new RpcCallback<Object>() {
       @Override
       public void run(Object parameter) {
-        failCall(call, new IOException("Canceled connection"));
+        calls.remove(call.id);
       }
     });
+    if (controller.isCanceled()) {
+      // To finish if the call was cancelled before we set the notification (race condition)
+      call.cancel(true);
+      return call;
+    }
 
     calls.put(call.id, call);
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index a8b21ba..66f3c0c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -55,10 +57,12 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -531,6 +535,71 @@ public class TestReplicasClient {
   }
 
   @Test
+  public void testCancelOfMultiGet() throws Exception {
+    openRegion(hriSecondary);
+    try {
+      List<Put> puts = new ArrayList<Put>(2);
+      byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
+      Put p = new Put(b1);
+      p.add(f, b1, b1);
+      puts.add(p);
+
+      byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
+      p = new Put(b2);
+      p.add(f, b2, b2);
+      puts.add(p);
+      table.put(puts);
+      LOG.debug("PUT done");
+      flushRegion(hriPrimary);
+      LOG.info("flush done");
+
+      Thread.sleep(1000 + REFRESH_PERIOD * 2);
+
+      AsyncProcess ap = ((ClusterConnection) HTU.getHBaseAdmin().getConnection())
+          .getAsyncProcess();
+
+      // Make primary slowdown
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+
+      List<Get> gets = new ArrayList<Get>();
+      Get g = new Get(b1);
+      g.setCheckExistenceOnly(true);
+      g.setConsistency(Consistency.TIMELINE);
+      gets.add(g);
+      g = new Get(b2);
+      g.setCheckExistenceOnly(true);
+      g.setConsistency(Consistency.TIMELINE);
+      gets.add(g);
+      Object[] results = new Object[2];
+      AsyncRequestFuture reqs = ap.submitAll(table.getPool(), table.getName(),
+          gets, null, results);
+      reqs.waitUntilDone();
+      // verify we got the right results back
+      for (Object r : results) {
+        Assert.assertTrue(((Result)r).isStale());
+        Assert.assertTrue(((Result)r).getExists());
+      }
+      Set<MultiServerCallable<Row>> set = ((AsyncRequestFutureImpl)reqs).getCallsInProgress();
+      // verify we did cancel unneeded calls
+      Assert.assertTrue(!set.isEmpty());
+      for (MultiServerCallable<Row> m : set) {
+        Assert.assertTrue(m.isCancelled());
+      }
+    } finally {
+      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.sleepTime.set(0);
+      SlowMeCopro.slowDownNext.set(false);
+      SlowMeCopro.countOfNext.set(0);
+      for (int i = 0; i < 2; i++) {
+        byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
+        Delete d = new Delete(b1);
+        table.delete(d);
+      }
+      closeRegion(hriSecondary);
+    }
+  }
+
+  @Test
   public void testScanWithReplicas() throws Exception {
     //simple scan
     runMultipleScansOfOneType(false, false);