diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index d00118c..1900a25 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -685,21 +686,33 @@ class AsyncProcess { private final MultiAction multiAction; private final int numAttempt; private final ServerName server; + private final Set> callsInProgress; private SingleServerRequestRunnable( - MultiAction multiAction, int numAttempt, ServerName server) { + MultiAction multiAction, int numAttempt, ServerName server, + Set> callsInProgress) { this.multiAction = multiAction; this.numAttempt = numAttempt; this.server = server; + this.callsInProgress = callsInProgress; } @Override public void run() { MultiResponse res; + MultiServerCallable callable = null; try { - MultiServerCallable callable = createCallable(server, tableName, multiAction); + callable = createCallable(server, tableName, multiAction); try { - res = createCaller(callable).callWithoutRetries(callable, timeout); + RpcRetryingCaller caller = createCaller(callable); + if (callsInProgress != null) callsInProgress.add(callable); + res = caller.callWithoutRetries(callable, timeout); + + if (res == null) { + // Cancelled + return; + } + } catch (IOException e) { // The service itself failed . It may be an error coming from the communication // layer, but, as well, a functional error raised by the server. @@ -722,6 +735,9 @@ class AsyncProcess { throw new RuntimeException(t); } finally { decTaskCounters(multiAction.getRegions(), server); + if (callsInProgress != null && callable != null) { + callsInProgress.remove(callable); + } } } } @@ -730,6 +746,7 @@ class AsyncProcess { private final BatchErrors errors; private final ConnectionManager.ServerErrorTracker errorsByServer; private final ExecutorService pool; + private final Set> callsInProgress; private final TableName tableName; @@ -814,10 +831,17 @@ class AsyncProcess { } else { this.replicaGetIndices = null; } + this.callsInProgress = !hasAnyReplicaGets ? null : + Collections.newSetFromMap(new ConcurrentHashMap, Boolean>()); + this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? globalErrors : new BatchErrors(); } + public Set> getCallsInProgress() { + return callsInProgress; + } + /** * Group a list of actions per region servers, and send them. * @@ -980,7 +1004,7 @@ class AsyncProcess { // no stats to manage, just do the standard action if (AsyncProcess.this.connection.getStatisticsTracker() == null) { return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server))); + new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); } // group the actions by the amount of delay @@ -1002,7 +1026,8 @@ class AsyncProcess { for (DelayingRunner runner : actions.values()) { String traceText = "AsyncProcess.sendMultiAction"; Runnable runnable = - new SingleServerRequestRunnable(runner.getActions(), numAttempt, server); + new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, + callsInProgress); // use a delay runner only if we need to sleep for some time if (runner.getSleepTime() > 0) { runner.setRunner(runnable); @@ -1520,6 +1545,12 @@ class AsyncProcess { waitUntilDone(Long.MAX_VALUE); } catch (InterruptedException iex) { throw new InterruptedIOException(iex.getMessage()); + } finally { + if (callsInProgress != null) { + for (MultiServerCallable clb : callsInProgress) { + clb.cancel(); + } + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 88e4e22..72ae829 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -50,21 +51,21 @@ import com.google.protobuf.ServiceException; * {@link RegionServerCallable} that goes against multiple regions. * @param */ -class MultiServerCallable extends RegionServerCallable { +class MultiServerCallable extends RegionServerCallable implements Cancellable { private final MultiAction multiAction; private final boolean cellBlock; - private RpcControllerFactory rpcFactory; + private final PayloadCarryingRpcController controller; MultiServerCallable(final ClusterConnection connection, final TableName tableName, final ServerName location, RpcControllerFactory rpcFactory, final MultiAction multi) { super(connection, tableName, null); - this.rpcFactory = rpcFactory; this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so // we will store the server here, and throw if someone tries to obtain location/regioninfo. this.location = new HRegionLocation(null, location); this.cellBlock = isCellBlock(); + controller = rpcFactory.newController(); } @Override @@ -119,7 +120,7 @@ class MultiServerCallable extends RegionServerCallable { // Controller optionally carries cell data over the proxy/service boundary and also // optionally ferries cell response data back out again. - PayloadCarryingRpcController controller = rpcFactory.newController(cells); + if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells)); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); ClientProtos.MultiResponse responseProto; @@ -129,10 +130,19 @@ class MultiServerCallable extends RegionServerCallable { } catch (ServiceException e) { throw ProtobufUtil.getRemoteException(e); } + if (responseProto == null) return null; // Occurs on cancel return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner()); } + @Override + public void cancel() { + controller.startCancel(); + } + @Override + public boolean isCancelled() { + return controller.isCanceled(); + } /** * @return True if we should send data in cellblocks. This is an expensive call. Cache the diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 0be4cd4..8889dc2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.RpcCallback; + import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; @@ -31,6 +32,7 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; @@ -57,6 +59,7 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import javax.security.sasl.SaslException; + import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -307,9 +310,14 @@ public class AsyncRpcChannel { controller.notifyOnCancel(new RpcCallback() { @Override public void run(Object parameter) { - failCall(call, new IOException("Canceled connection")); + calls.remove(call.id); } }); + if (controller.isCanceled()) { + // To finish if the call was cancelled before we set the notification (race condition) + call.cancel(true); + return call; + } calls.put(call.id, call); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index bb2d4db..efc8db2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -20,10 +20,12 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -43,6 +45,8 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; +import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -522,6 +526,71 @@ public class TestReplicasClient { } @Test + public void testCancelOfMultiGet() throws Exception { + openRegion(hriSecondary); + try { + List puts = new ArrayList(2); + byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0); + Put p = new Put(b1); + p.add(f, b1, b1); + puts.add(p); + + byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1); + p = new Put(b2); + p.add(f, b2, b2); + puts.add(p); + table.put(puts); + LOG.debug("PUT done"); + flushRegion(hriPrimary); + LOG.info("flush done"); + + Thread.sleep(1000 + REFRESH_PERIOD * 2); + + AsyncProcess ap = ((ClusterConnection) HTU.getHBaseAdmin().getConnection()) + .getAsyncProcess(); + + // Make primary slowdown + SlowMeCopro.getCdl().set(new CountDownLatch(1)); + + List gets = new ArrayList(); + Get g = new Get(b1); + g.setCheckExistenceOnly(true); + g.setConsistency(Consistency.TIMELINE); + gets.add(g); + g = new Get(b2); + g.setCheckExistenceOnly(true); + g.setConsistency(Consistency.TIMELINE); + gets.add(g); + Object[] results = new Object[2]; + AsyncRequestFuture reqs = ap.submitAll(table.getPool(), table.getName(), + gets, null, results); + reqs.waitUntilDone(); + // verify we got the right results back + for (Object r : results) { + Assert.assertTrue(((Result)r).isStale()); + Assert.assertTrue(((Result)r).getExists()); + } + Set> set = ((AsyncRequestFutureImpl)reqs).getCallsInProgress(); + // verify we did cancel unneeded calls + Assert.assertTrue(!set.isEmpty()); + for (MultiServerCallable m : set) { + Assert.assertTrue(m.isCancelled()); + } + } finally { + SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.sleepTime.set(0); + SlowMeCopro.slowDownNext.set(false); + SlowMeCopro.countOfNext.set(0); + for (int i = 0; i < 2; i++) { + byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i); + Delete d = new Delete(b1); + table.delete(d); + } + closeRegion(hriSecondary); + } + } + + @Test public void testScanWithReplicas() throws Exception { //simple scan runMultipleScansOfOneType(false, false);