diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 20ed183..10e7f3b 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -21,8 +21,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -53,6 +51,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implements the scanner interface for the HBase client.
  * If there are multiple regions in a table, this scanner will iterate
@@ -399,6 +399,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
     // it doesn't tell us otherwise. We rely on the size or count of results
     boolean serverHasMoreResults = false;
     boolean allResultsSkipped = false;
+    // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc., we
+    // should make sure that we are not retrying indefinitely.
+    int retriesLeft = getRetries();
     do {
       allResultsSkipped = false;
       try {
@@ -423,8 +426,18 @@ public abstract class ClientScanner extends AbstractClientScanner {
         // An exception was thrown which makes any partial results that we were collecting
         // invalid. The scanner will need to be reset to the beginning of a row.
         clearPartialResults();
-        // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
-        // to reset the scanner and come back in again.
+
+        // Unfortunately, DNRIOE is used with two different semantics.
+        // (1) The first is to close the client scanner and bubble up the exception all the way
+        // to the application. This is preferred when the exception is really un-recoverable
+        // (like CorruptHFileException, etc.). Plain DoNotRetryIOException also usually falls
+        // into this bucket.
+        // (2) The second is to close the current region scanner only, but continue the client
+        // scanner by overriding the exception. This is usually UnknownScannerException,
+        // OutOfOrderScannerNextException, etc., where the region scanner has to be closed, but
+        // the application-level ClientScanner has to continue without bubbling up the exception
+        // to the client. See RSRpcServices to see how it throws DNRIOEs.
+        // See also: HBASE-16604, HBASE-17187
 
         // If exception is any but the list below throw it back to the client; else setup
         // the scanner and retry.
@@ -436,6 +449,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
             e instanceof ScannerResetException) {
           // Pass. It is easier writing the if loop test as list of what is allowed rather than
           // as a list of what is not allowed... so if in here, it means we do not throw.
+          if (retriesLeft-- <= 0) {
+            throw e; // no more retries
+          }
         } else {
           throw e;
         }
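The change above puts a ceiling on the scanner's recovery loop: previously, any exception in the allowed list caused an unconditional re-open and retry, so a region server that failed every reseek would be retried forever. Below is a minimal, self-contained sketch of the bounded-retry pattern, assuming a hypothetical Attempt callback and runWithRetries helper (neither is HBase API; ClientScanner's real loop also re-opens the scanner between attempts):

    // Minimal sketch of the bounded retry loop; Attempt and runWithRetries
    // are illustrative stand-ins, not HBase API.
    import java.io.IOException;

    public final class BoundedRetry {

      @FunctionalInterface
      interface Attempt {
        void run() throws IOException;
      }

      // A budget of maxRetries allows the initial attempt plus maxRetries retries,
      // mirroring the retriesLeft-- check added to ClientScanner above.
      static void runWithRetries(Attempt attempt, int maxRetries) throws IOException {
        int retriesLeft = maxRetries;
        while (true) {
          try {
            attempt.run();
            return;
          } catch (IOException e) {
            if (retriesLeft-- <= 0) {
              throw e; // budget exhausted: surface the failure to the caller
            }
            // transient failure: fall through and try again
          }
        }
      }

      public static void main(String[] args) throws IOException {
        int[] failuresToInject = {2};
        runWithRetries(() -> {
          if (failuresToInject[0]-- > 0) {
            throw new IOException("injected transient failure");
          }
        }, 3);
        System.out.println("recovered within the retry budget");
      }
    }

Decrementing inside the catch means a budget of N surfaces the failure on the (N+1)th consecutive miss, which is the behavior the new testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE below asserts.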
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index a61a9f2..716839a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -95,6 +93,27 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.Leases.Lease;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.regionserver.Region.Operation;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
+import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitResponse;
@@ -106,10 +125,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
@@ -135,9 +154,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -177,18 +193,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
-import org.apache.hadoop.hbase.quotas.OperationQuota;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
-import org.apache.hadoop.hbase.regionserver.Leases.Lease;
-import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
-import org.apache.hadoop.hbase.regionserver.Region.Operation;
-import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
-import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DNS;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -201,12 +205,7 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.zookeeper.KeeperException;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Implements the regionserver RPC services.
@@ -2999,6 +2998,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           // scanner is closed here
           scannerClosed = true;
 
+          // If it is a DoNotRetryIOException already, throw it as is. Unfortunately, DNRIOE is
+          // used with two different semantics.
+          // (1) The first is to close the client scanner and bubble up the exception all the way
+          // to the application. This is preferred when the exception is really un-recoverable
+          // (like CorruptHFileException, etc.). Plain DoNotRetryIOException also usually falls
+          // into this bucket.
+          // (2) The second is to close the current region scanner only, but continue the client
+          // scanner by overriding the exception. This is usually UnknownScannerException,
+          // OutOfOrderScannerNextException, etc., where the region scanner has to be closed, but
+          // the application-level ClientScanner has to continue without bubbling up the exception
+          // to the client. See the ClientScanner code to see how it deals with these exceptions.
+          if (e instanceof DoNotRetryIOException) {
+            throw e;
+          }
+
           // If it is a CorruptHFileException or a FileNotFoundException, throw the
           // DoNotRetryIOException. This can avoid the retry in ClientScanner.
           if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) {
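Server and client now share one contract: RSRpcServices rethrows any DoNotRetryIOException untouched (and upgrades corrupt-file cases to one), while ClientScanner decides which DNRIOE subtypes merely mean "re-open the region scanner and continue". A condensed model of that client-side classification follows; the exception types are real HBase classes, but the helper names are hypothetical and the retryable list is abridged to the types the comments above name:

    // Condensed model of the DNRIOE contract, not the actual ClientScanner code.
    import java.io.IOException;

    import org.apache.hadoop.hbase.UnknownScannerException;
    import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
    import org.apache.hadoop.hbase.exceptions.ScannerResetException;

    final class ScanExceptionPolicy {

      /** Semantics (2): the region scanner is gone, but the client scanner may continue. */
      static boolean isRecoverableByReopen(IOException e) {
        return e instanceof UnknownScannerException
            || e instanceof OutOfOrderScannerNextException
            || e instanceof ScannerResetException;
      }

      /** Rethrow for semantics (1), or once the retry budget is spent; otherwise retry. */
      static void onScanFailure(IOException e, int retriesLeft) throws IOException {
        if (!isRecoverableByReopen(e) || retriesLeft <= 0) {
          throw e; // bubble up to the application
        }
        // caller re-opens the region scanner and retries
      }
    }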
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index ae93e67..ab780de 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -43,6 +44,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -52,6 +54,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -71,6 +74,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -546,6 +550,15 @@ public class TestFromClientSide {
    */
   public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
     static AtomicLong reqCount = new AtomicLong(0);
+    static AtomicBoolean isDoNotRetry = new AtomicBoolean(false); // whether to throw DNRIOE
+    static AtomicBoolean throwOnce = new AtomicBoolean(true); // whether to only throw once
+
+    static void reset() {
+      reqCount.set(0);
+      isDoNotRetry.set(false);
+      throwOnce.set(true);
+    }
+
     class MyStoreScanner extends StoreScanner {
       public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
           long readPt) throws IOException {
@@ -561,8 +574,13 @@ public class TestFromClientSide {
         newScanners.add(new DelegatingKeyValueScanner(scanner) {
           @Override
           public boolean reseek(Cell key) throws IOException {
-            if (reqCount.incrementAndGet() == 1) {
-              throw new IOException("Injected exception");
+            reqCount.incrementAndGet();
+            if (!throwOnce.get() || reqCount.get() == 1) {
+              if (isDoNotRetry.get()) {
+                throw new DoNotRetryIOException("Injected exception");
+              } else {
+                throw new IOException("Injected exception");
+              }
             }
             return super.reseek(key);
           }
@@ -596,6 +614,8 @@ public class TestFromClientSide {
     HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
     htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
     TEST_UTIL.getAdmin().createTable(htd);
+    ExceptionInReseekRegionObserver.reset();
+    ExceptionInReseekRegionObserver.throwOnce.set(true); // throw exceptions only once
     try (Table t = TEST_UTIL.getConnection().getTable(name)) {
       int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
       TEST_UTIL.getAdmin().flush(name);
@@ -605,6 +625,61 @@ public class TestFromClientSide {
     assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
   }
 
+  /**
+   * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation
+   * is that the exception will bubble up to the client scanner instead of being retried.
+   */
+  @Test (timeout = 180000)
+  public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE()
+      throws IOException, InterruptedException {
+    TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
+    TableName name = TableName.valueOf("testClientScannerIsNotRetriedWhenCoprocessorThrowsDNRIOE");
+
+    HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
+    htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
+    TEST_UTIL.getAdmin().createTable(htd);
+    ExceptionInReseekRegionObserver.reset();
+    ExceptionInReseekRegionObserver.isDoNotRetry.set(true);
+    try (Table t = TEST_UTIL.getConnection().getTable(name)) {
+      int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
+      TEST_UTIL.getAdmin().flush(name);
+      int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
+      fail("Should have thrown an exception");
+    } catch (UncheckedIOException expected) {
+      // expected: the DNRIOE is surfaced to the application instead of being retried
+      assertEquals(DoNotRetryIOException.class, expected.getCause().getClass());
+    }
+    assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
+  }
+
+  /**
+   * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation
+   * is that we will keep on retrying, but fail after the retries are exhausted instead of
+   * retrying indefinitely.
+   */
+  @Test (timeout = 180000)
+  public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE()
+      throws IOException, InterruptedException {
+    TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
+    TableName name = TableName.valueOf("testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE");
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+    HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
+    htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
+    TEST_UTIL.getAdmin().createTable(htd);
+    ExceptionInReseekRegionObserver.reset();
+    ExceptionInReseekRegionObserver.throwOnce.set(false); // throw exceptions in every retry
+    try (Table t = TEST_UTIL.getConnection().getTable(name)) {
+      int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
+      TEST_UTIL.getAdmin().flush(name);
+      int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
+      fail("Should have thrown an exception");
+    } catch (UncheckedIOException expected) {
+      // expected: retries are exhausted and the last failure is surfaced
+      assertEquals(ScannerResetException.class, expected.getCause().getClass());
+    }
+    assertTrue(ExceptionInReseekRegionObserver.reqCount.get() >= 3);
+  }
+
   /*
    * @param key
    * @return Scan with RowFilter that does LESS than passed key.
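A note on the budget the second test exercises: the retriesLeft counter in ClientScanner is seeded from ordinary client configuration, which the test caps at 3 before asserting that at least three injected failures were observed. A sketch of setting the same knob in application code, where HConstants.HBASE_CLIENT_RETRIES_NUMBER is the real constant and the helper is hypothetical:

    // Sketch: capping the client retry budget the new counter consumes.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    final class RetryBudgetConfig {

      /** Client configuration whose operations, scans included, give up after N retries. */
      static Configuration withBoundedRetries(int retries) {
        Configuration conf = HBaseConfiguration.create();
        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
        return conf;
      }
    }

Note that this setting bounds all client retries, not only the scanner recovery path touched by this patch.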