From d634cf539bee36763f788e14f2ffefa5e8373c8a Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 19 Feb 2019 22:08:14 +0800 Subject: [PATCH] HBASE-21930 Deal with ScannerResetException when opening region scanner --- .../hbase/client/AsyncRpcRetryingCaller.java | 9 +- ...syncScanSingleRegionRpcRetryingCaller.java | 11 +- .../client/TestAsyncTableScanException.java | 179 ++++++++++++++++++ 3 files changed, 193 insertions(+), 6 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanException.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 45266e9be2..387b103a26 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FutureUtils; @@ -151,7 +152,13 @@ public abstract class AsyncRpcRetryingCaller { return; } Throwable error = translateException(t); - if (error instanceof DoNotRetryIOException) { + // We use this retrying caller to open a scanner, as it is idempotent, but we may throw + // ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will + // also fetch data when opening a scanner. The intention here is that if we hit a + // ScannerResetException when scanning then we should try to open a new scanner, instead of + // retrying on the old one, so it is declared as a DoNotRetryIOException. But here we are + // exactly trying to open a new scanner, so we should retry on ScannerResetException. + if (error instanceof DoNotRetryIOException && !(error instanceof ScannerResetException)) { future.completeExceptionally(error); return; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 96961afc65..ab37b5ddd5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -393,11 +393,12 @@ class AsyncScanSingleRegionRpcRetryingCaller { " ms", error); } - boolean scannerClosed = error instanceof UnknownScannerException || - error instanceof NotServingRegionException || error instanceof RegionServerStoppedException; + boolean scannerClosed = + error instanceof UnknownScannerException || error instanceof NotServingRegionException || + error instanceof RegionServerStoppedException || error instanceof ScannerResetException; RetriesExhaustedException.ThrowableWithExtraContext qt = - new RetriesExhaustedException.ThrowableWithExtraContext(error, - EnvironmentEdgeManager.currentTime(), ""); + new RetriesExhaustedException.ThrowableWithExtraContext(error, + EnvironmentEdgeManager.currentTime(), ""); exceptions.add(qt); if (tries >= maxAttempts) { completeExceptionally(!scannerClosed); @@ -418,7 +419,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { completeWhenError(false); return; } - if (error instanceof OutOfOrderScannerNextException || error instanceof ScannerResetException) { + if (error instanceof OutOfOrderScannerNextException) { completeWhenError(true); return; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanException.java new file mode 100644 index 0000000000..a1715cca78 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanException.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableScanException { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableScanException.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("scan"); + + private static byte[] FAMILY = Bytes.toBytes("family"); + + private static byte[] QUAL = Bytes.toBytes("qual"); + + private static AsyncConnection CONN; + + private static AtomicInteger REQ_COUNT = new AtomicInteger(); + + private static volatile int ERROR_AT; + + private static volatile boolean ERROR; + + private static volatile boolean DO_NOT_RETRY; + + public static final class ErrorCP implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public boolean postScannerNext(ObserverContext c, + InternalScanner s, List result, int limit, boolean hasNext) throws IOException { + REQ_COUNT.incrementAndGet(); + if ((ERROR_AT == REQ_COUNT.get()) || ERROR) { + if (DO_NOT_RETRY) { + throw new DoNotRetryIOException("Injected exception"); + } else { + throw new IOException("Injected exception"); + } + } + return RegionObserver.super.postScannerNext(c, s, result, limit, hasNext); + } + + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(3); + UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setCoprocessor(ErrorCP.class.getName()).build()); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))); + } + } + CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + Closeables.close(CONN, true); + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUpBeforeTest() { + REQ_COUNT.set(0); + ERROR_AT = 0; + ERROR = false; + DO_NOT_RETRY = false; + } + + @Test(expected = DoNotRetryIOException.class) + public void testDoNotRetryIOException() throws IOException { + ERROR_AT = 1; + DO_NOT_RETRY = true; + try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(FAMILY)) { + scanner.next(); + } + } + + @Test + public void testIOException() throws IOException { + ERROR = true; + try (ResultScanner scanner = + CONN.getTableBuilder(TABLE_NAME).setMaxAttempts(3).build().getScanner(FAMILY)) { + scanner.next(); + fail(); + } catch (RetriesExhaustedException e) { + // expected + assertThat(e.getCause(), instanceOf(ScannerResetException.class)); + } + assertTrue(REQ_COUNT.get() >= 3); + } + + private void count() throws IOException { + try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(1))) { + for (int i = 0; i < 100; i++) { + Result result = scanner.next(); + assertArrayEquals(Bytes.toBytes(i), result.getRow()); + assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUAL)); + } + } + } + + @Test + public void testRecoveryFromScannerResetWhileOpening() throws IOException { + ERROR_AT = 1; + count(); + // we should at least request 1 time otherwise the error will not be triggered, and then we + // need at least one more request to get the remaining results. + assertTrue(REQ_COUNT.get() >= 2); + } + + @Test + public void testRecoveryFromScannerResetInTheMiddle() throws IOException { + ERROR_AT = 2; + count(); + // we should at least request 2 times otherwise the error will not be triggered, and then we + // need at least one more request to get the remaining results. + assertTrue(REQ_COUNT.get() >= 3); + } +} -- 2.17.1