From 1383d18ff24cd5cf956e6f0ac22038eaab3ad99a Mon Sep 17 00:00:00 2001 From: jingyuntian Date: Thu, 18 Jan 2018 19:06:31 +0800 Subject: [PATCH] HBASE-19695 Handle disabled table for async client --- .../hadoop/hbase/AsyncMetaTableAccessor.java | 22 ++++++ .../hbase/client/AsyncBatchRpcRetryingCaller.java | 83 +++++++++++++++------- .../hbase/client/AsyncRpcRetryingCaller.java | 24 +++++++ .../AsyncSingleRequestRpcRetryingCaller.java | 20 ++++-- .../TestAsyncSingleRequestRpcRetryingCaller.java | 14 ++++ 5 files changed, 134 insertions(+), 29 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java index 05e60d4..7636ab4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java @@ -80,6 +80,28 @@ public class AsyncMetaTableAccessor { return getTableState(metaTable, tableName).thenApply(Optional::isPresent); } + public static CompletableFuture isTableDisable(AsyncTable metaTable, + TableName tableName) { + CompletableFuture future = new CompletableFuture<>(); + if (tableName.equals(META_TABLE_NAME)) { + future.complete(false); + return future; + } + getTableState(metaTable, tableName).whenComplete((tableState, throwable) -> { + if(throwable != null){ + future.completeExceptionally(throwable); + return; + } + if (tableState.isPresent()) { + future.complete(tableState.get().inStates(TableState.State.DISABLED)); + } else { + future.completeExceptionally(new TableNotFoundException(tableName)); + } + }); + + return future; + } + public static CompletableFuture> getTableState(AsyncTable metaTable, TableName tableName) { CompletableFuture> future = new CompletableFuture<>(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 62ee0ab..950daf4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -44,11 +44,14 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -274,39 +277,69 @@ class AsyncBatchRpcRetryingCaller { } } - private void onComplete(Map actionsByRegion, int tries, - ServerName serverName, MultiResponse resp) { - List failedActions = new ArrayList<>(); - actionsByRegion.forEach((rn, regionReq) -> { - RegionResult regionResult = resp.getResults().get(rn); - if (regionResult != null) { - regionReq.actions.forEach( + private void syncComplete(int index, MultiResponse resp, + Map actionsByRegion, List regionNames, + int tries, ServerName serverName, List failedActions){ + if(index == regionNames.size()){ + if (!failedActions.isEmpty()) { + tryResubmit(failedActions.stream(), tries); + } + return; + } + + byte[] rn = regionNames.get(index); + RegionResult regionResult = resp.getResults().get(rn); + RegionRequest regionReq = actionsByRegion.get(rn); + if (regionResult != null) { + regionReq.actions.forEach( action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions)); - } else { - Throwable t = resp.getException(rn); - Throwable error; - if (t == null) { - LOG.error( + syncComplete(index + 1, resp, actionsByRegion, regionNames, tries, serverName, failedActions); + } else { + Throwable t = resp.getException(rn); + Throwable error; + if (t == null) { + LOG.error( "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn)); - error = new RuntimeException("Invalid response"); + error = new RuntimeException("Invalid response"); + } else { + error = translateException(t); + logException(tries, () -> Stream.of(regionReq), error, serverName); + conn.getLocator().updateCachedLocation(regionReq.loc, error); + if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { + failAll(regionReq.actions.stream(), tries, error, serverName); + return; + } + if (error instanceof NotServingRegionException || error instanceof RegionOfflineException) { + AsyncMetaTableAccessor.isTableDisable(conn.getTable(TableName.META_TABLE_NAME), tableName) + .whenComplete((disabled, e) -> { + if (e != null) { + failAll(regionReq.actions.stream(), tries, e, null); + return; + } + if (disabled) { + failAll(regionReq.actions.stream(), tries, new TableNotEnabledException(tableName), null); + return; + } + addError(regionReq.actions, error, serverName); + failedActions.addAll(regionReq.actions); + syncComplete(index + 1, resp, actionsByRegion, regionNames, tries, serverName, failedActions); + }); } else { - error = translateException(t); - logException(tries, () -> Stream.of(regionReq), error, serverName); - conn.getLocator().updateCachedLocation(regionReq.loc, error); - if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { - failAll(regionReq.actions.stream(), tries, error, serverName); - return; - } - addError(regionReq.actions, error, serverName); - failedActions.addAll(regionReq.actions); + syncComplete(index + 1, resp, actionsByRegion, regionNames, tries, serverName, failedActions); } } - }); - if (!failedActions.isEmpty()) { - tryResubmit(failedActions.stream(), tries); } } + private void onComplete(Map actionsByRegion, int tries, + ServerName serverName, MultiResponse resp) { + List failedActions = new ArrayList<>(); + List regionNames = new ArrayList<>(); + regionNames.addAll(actionsByRegion.keySet()); + + syncComplete(0,resp, actionsByRegion, regionNames, tries, serverName, failedActions); + } + private void send(Map actionsByServer, int tries) { long remainingNs; if (operationTimeoutNs > 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index d30012f..f6167a1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -32,7 +32,11 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,6 +115,26 @@ public abstract class AsyncRpcRetryingCaller { resetController(controller, callTimeoutNs); } + protected CompletableFuture checkDisable(Throwable error, TableName tableName){ + error = translateException(error); + // error that worth to do a disable check is RegionOfflineException and + // NotServingRegionException, so we exclude other exceptions + if (error instanceof DoNotRetryIOException || + !(error instanceof RegionOfflineException || error instanceof NotServingRegionException)) { + return null; + } + + return AsyncMetaTableAccessor.isTableDisable(conn.getTable(TableName.META_TABLE_NAME), tableName) + .whenComplete((isDisable, e) -> { + if (e != null) { + future.completeExceptionally(e); + } + if (isDisable) { + future.completeExceptionally(new TableNotEnabledException(tableName)); + } + }); + } + protected void onError(Throwable error, Supplier errMsg, Consumer updateCachedLocation) { error = translateException(error); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 56c82fb..ecc9370 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -76,10 +76,22 @@ class AsyncSingleRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { callable.call(controller, loc, stub).whenComplete( (result, error) -> { if (error != null) { - onError(error, - () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " - + loc.getRegion().getEncodedName() + " of " + tableName + " failed", - err -> conn.getLocator().updateCachedLocation(loc, err)); + CompletableFuture checkDisable = checkDisable(error, tableName); + if(checkDisable != null){ + checkDisable.thenAccept(isDisable -> { + if(!isDisable){ + onError(error, + () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + + loc.getRegion().getEncodedName() + " of " + tableName + " failed", + err -> conn.getLocator().updateCachedLocation(loc, err)); + } + }); + } else { + onError(error, + () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + + loc.getRegion().getEncodedName() + " of " + tableName + " failed", + err -> conn.getLocator().updateCachedLocation(loc, err)); + } return; } future.complete(result); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index 7ea69e5..e4c7c08 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -34,6 +34,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -114,6 +115,19 @@ public class TestAsyncSingleRequestRpcRetryingCaller { } @Test + public void testRequestDisableTable() throws IOException, InterruptedException { + TEST_UTIL.getAdmin().disableTable(TABLE_NAME); + try { + AsyncTable table = CONN.getTableBuilder(TABLE_NAME) + .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build(); + table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(TableNotEnabledException.class)); + TEST_UTIL.getAdmin().enableTable(TABLE_NAME); + } + } + + @Test public void testOperationTimeout() throws IOException, InterruptedException { long startNs = System.nanoTime(); try { -- 2.7.4