From 7f4fd0024dbfe382db0c83afad049b474a9109d6 Mon Sep 17 00:00:00 2001 From: Mike Grimes Date: Fri, 17 Nov 2017 19:47:54 -0800 Subject: [PATCH] HBASE-17165 Make use of retry setting in LoadIncrementalHFiles & fix test --- .../hadoop/hbase/tool/LoadIncrementalHFiles.java | 22 +++++++++++++++++++--- .../TestLoadIncrementalHFilesSplitRecovery.java | 10 +++++----- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java index c457e224da..1ca47117d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java @@ -131,6 +131,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private final FsDelegationToken fsDelegationToken; private final UserProvider userProvider; private final int nrThreads; + private AtomicInteger numRetries; private final RpcControllerFactory rpcControllerFactory; private String bulkToken; @@ -177,6 +178,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); nrThreads = conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors()); + numRetries = new AtomicInteger(0); rpcControllerFactory = new RpcControllerFactory(conf); } @@ -781,8 +783,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { protected List tryAtomicRegionLoad(ClientServiceCallable serviceCallable, final TableName tableName, final byte[] first, final Collection lqis) throws IOException { + List toRetry = new ArrayList<>(); try { - List toRetry = new ArrayList<>(); Configuration conf = getConf(); byte[] region = RpcRetryingCallerFactory.instantiate(conf, null). newCaller() .callWithRetries(serviceCallable, Integer.MAX_VALUE); @@ -796,8 +798,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool { return toRetry; } catch (IOException e) { LOG.error("Encountered unrecoverable error from region server, additional details: " + - serviceCallable.getExceptionMessageAdditionalDetail(), - e); + serviceCallable.getExceptionMessageAdditionalDetail(), + e); + LOG.warn( + "Received a " + e.getClass().getSimpleName() + + " from region server: " + + serviceCallable.getExceptionMessageAdditionalDetail(), e); + if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) + && numRetries.get() < getConf().getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { + LOG.warn("Will attempt to retry loading failed HFiles. Retry #" + + numRetries.incrementAndGet()); + toRetry.addAll(lqis); + return toRetry; + } + LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover"); throw e; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java index bf43982f7d..9be3ecca52 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java @@ -323,7 +323,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { @Test public void testRetryOnIOException() throws Exception { final TableName table = TableName.valueOf(name.getMethodName()); - final AtomicInteger calls = new AtomicInteger(1); + final AtomicInteger calls = new AtomicInteger(0); final Connection conn = ConnectionFactory.createConnection(util.getConfiguration()); util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true); @@ -332,9 +332,8 @@ public class TestLoadIncrementalHFilesSplitRecovery { protected List tryAtomicRegionLoad( ClientServiceCallable serverCallable, TableName tableName, final byte[] first, Collection lqis) throws IOException { - if (calls.getAndIncrement() < util.getConfiguration().getInt( - HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - - 1) { + if (calls.get() < util.getConfiguration().getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { ClientServiceCallable newServerCallable = new ClientServiceCallable(conn, tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @@ -343,6 +342,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { throw new IOException("Error calling something on RegionServer"); } }; + calls.getAndIncrement(); return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis); } else { return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis); @@ -352,8 +352,8 @@ public class TestLoadIncrementalHFilesSplitRecovery { setupTable(conn, table, 10); Path dir = buildBulkFiles(table, 1); lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table)); + assertEquals(calls.get(), 2); util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false); - } private ClusterConnection getMockedConnection(final Configuration conf) -- 2.13.6 (Apple Git-96)