From d5632190e79627fda1dcb47beae889d156a3fa3a Mon Sep 17 00:00:00 2001
From: Mike Grimes
Date: Mon, 21 Nov 2016 14:34:07 -0800
Subject: [PATCH] HBASE-17165 Add retry to LoadIncrementalHFiles tool

---
 .../hbase/mapreduce/LoadIncrementalHFiles.java     | 136 ++++++++++++---------
 .../TestLoadIncrementalHFilesSplitRecovery.java    |  69 +++++++++--
 2 files changed, 136 insertions(+), 69 deletions(-)
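This patch makes retrying of failed bulk loads opt-in via a new configuration
key. A minimal caller-side sketch of the intended usage; the class name, path,
and table name below are illustrative, not part of the patch:

    // Caller-side sketch: opt in to retrying bulk loads that fail with an
    // IOException, and bound the number of attempts.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadWithRetries {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // The new switch added below; the RETRY_ON_IO_EXCEPTION constant is
        // package-private, so external callers use the literal key.
        conf.setBoolean("hbase.bulkload.retries.retryOnIOException", true);
        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
        System.exit(ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
            new String[] { "/bulk/output", "myTable" }));
      }
    }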
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 980dcb1..963c4a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
@@ -110,6 +111,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private boolean initalized = false;
 
   public static final String NAME = "completebulkload";
+  static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
   public static final String MAX_FILES_PER_REGION_PER_FAMILY
     = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
@@ -133,6 +135,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private UserProvider userProvider;
   private int nrThreads;
   private RpcControllerFactory rpcControllerFactory;
+  private AtomicInteger numRetries;
 
   private Map<LoadQueueItem, ByteBuffer> retValue = null;
 
@@ -158,6 +161,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     nrThreads = conf.getInt("hbase.loadincremental.threads.max",
       Runtime.getRuntime().availableProcessors());
     initalized = true;
+    numRetries = new AtomicInteger(1);
   }
 
   private void usage() {
@@ -510,6 +514,69 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     return item2RegionMap;
   }
 
+  protected ClientServiceCallable<byte[]> buildClientServiceCallable(final Connection conn,
+      TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
+
+    final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
+    for (LoadQueueItem lqi : lqis) {
+      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+    }
+
+    return new ClientServiceCallable<byte[]>(conn,
+        tableName, first, rpcControllerFactory.newController()) {
+      @Override
+      protected byte[] rpcCall() throws Exception {
+        SecureBulkLoadClient secureClient = null;
+        boolean success = false;
+        try {
+          LOG.debug("Going to connect to server " + getLocation() + " for row "
+              + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          try (Table table = conn.getTable(getTableName())) {
+            secureClient = new SecureBulkLoadClient(getConf(), table);
+            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+                assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+          }
+          return success ? regionName : null;
+        } finally {
+          //Best effort copying of files that might not have been imported
+          //from the staging directory back to original location
+          //in user directory
+          if (secureClient != null && !success) {
+            FileSystem targetFs = FileSystem.get(getConf());
+            // fs is the source filesystem
+            if (fs == null) {
+              fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
+            }
+            // Check to see if the source and target filesystems are the same
+            // If they are the same filesystem, we will try move the files back
+            // because previously we moved them to the staging directory.
+            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
+              for (Pair<byte[], String> el : famPaths) {
+                Path hfileStagingPath = null;
+                Path hfileOrigPath = new Path(el.getSecond());
+                try {
+                  hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
+                      hfileOrigPath.getName());
+                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
+                    LOG.debug("Moved back file " + hfileOrigPath + " from "
+                        + hfileStagingPath);
+                  } else if (targetFs.exists(hfileStagingPath)) {
+                    LOG.debug("Unable to move back file " + hfileOrigPath + " from "
+                        + hfileStagingPath);
+                  }
+                } catch (Exception ex) {
+                  LOG.debug("Unable to move back file " + hfileOrigPath + " from "
+                      + hfileStagingPath, ex);
+                }
+              }
+            }
+          }
+        }
+      }
+    };
+  }
+
   /**
    * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
    * passed directory and validates whether the prepared queue has all the valid table column
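The method above only builds the callable; execution and retry policy live in
the RpcRetryingCaller machinery that tryAtomicRegionLoad (further down) feeds
it into. Pulled out as a standalone sketch, with conf (a Configuration) and
callable (a built ClientServiceCallable<byte[]>) assumed to be in scope:

    // Sketch of the retry loop that drives the callable; this mirrors the
    // call in tryAtomicRegionLoad. Every attempt goes through the same
    // callable, which is why the patch hoists its construction out.
    byte[] regionName = RpcRetryingCallerFactory.instantiate(conf, null)
        .<byte[]> newCaller()
        .callWithRetries(callable, Integer.MAX_VALUE);
    boolean loaded = regionName != null; // null signals a recoverable failure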
@@ -655,11 +722,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       final byte[] first = e.getKey().array();
       final Collection<LoadQueueItem> lqis = e.getValue();
 
+      final ClientServiceCallable<byte[]> serviceCallable =
+          buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);
+
       final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
         @Override
         public List<LoadQueueItem> call() throws Exception {
           List<LoadQueueItem> toRetry =
-              tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile);
+              tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
           return toRetry;
         }
       };
@@ -946,75 +1016,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @return empty list if success, list of items to retry on recoverable
    * failure
    */
-  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
-      boolean copyFile) throws IOException {
+  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
+      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
+      throws IOException {
     final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
       if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
         famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
       }
     }
-    final ClientServiceCallable<byte[]> svrCallable = new ClientServiceCallable<byte[]>(conn,
-        tableName, first, rpcControllerFactory.newController()) {
-      @Override
-      protected byte[] rpcCall() throws Exception {
-        SecureBulkLoadClient secureClient = null;
-        boolean success = false;
-        try {
-          LOG.debug("Going to connect to server " + getLocation() + " for row "
-              + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
-          byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(getConf(), table);
-            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-              assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
-          }
-          return success ? regionName : null;
-        } finally {
-          //Best effort copying of files that might not have been imported
-          //from the staging directory back to original location
-          //in user directory
-          if (secureClient != null && !success) {
-            FileSystem targetFs = FileSystem.get(getConf());
-            // fs is the source filesystem
-            if(fs == null) {
-              fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
-            }
-            // Check to see if the source and target filesystems are the same
-            // If they are the same filesystem, we will try move the files back
-            // because previously we moved them to the staging directory.
-            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
-              for(Pair<byte[], String> el : famPaths) {
-                Path hfileStagingPath = null;
-                Path hfileOrigPath = new Path(el.getSecond());
-                try {
-                  hfileStagingPath= new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
-                      hfileOrigPath.getName());
-                  if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
-                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
-                        hfileStagingPath);
-                  } else if(targetFs.exists(hfileStagingPath)){
-                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-                        hfileStagingPath);
-                  }
-                } catch(Exception ex) {
-                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-                      hfileStagingPath, ex);
-                }
-              }
-            }
-          }
-        }
-      }
-    };
-
     try {
       List<LoadQueueItem> toRetry = new ArrayList<>();
       Configuration conf = getConf();
       byte[] region = RpcRetryingCallerFactory.instantiate(conf,
           null).<byte[]> newCaller()
-          .callWithRetries(svrCallable, Integer.MAX_VALUE);
+          .callWithRetries(serviceCallable, Integer.MAX_VALUE);
       if (region == null) {
         LOG.warn("Attempt to bulk load region containing "
             + Bytes.toStringBinary(first) + " into table "
@@ -1026,7 +1042,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       return toRetry;
     } catch (IOException e) {
       LOG.error("Encountered unrecoverable error from region server, additional details: "
-          + svrCallable.getExceptionMessageAdditionalDetail(), e);
+          + serviceCallable.getExceptionMessageAdditionalDetail(), e);
       throw e;
     }
   }
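The hunks above declare numRetries and the RETRY_ON_IO_EXCEPTION key but this
excerpt does not show the guard that consumes them. Judging from the behavior
the new test below asserts (an IOException from the region server is retried
while the switch is on, bounded by HBASE_CLIENT_RETRIES_NUMBER), that guard is
presumably shaped roughly like the following hypothetical sketch; it is not the
patch's literal code:

    // Hypothetical retry guard, using the fields added above. The enclosing
    // loop and the re-queueing of items are assumed.
    try {
      toRetry.addAll(tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis));
    } catch (IOException e) {
      boolean retryOnIOE = getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false);
      int maxRetries = getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      if (!retryOnIOE || numRetries.incrementAndGet() > maxRetries) {
        throw e; // switch is off, or the retry budget is exhausted
      }
      toRetry.addAll(lqis); // retry with a freshly built callable on the next pass
    }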
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 8337da8..a0bac77 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -57,7 +58,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -275,31 +276,34 @@
    */
   @Test(expected=IOException.class, timeout=120000)
   public void testBulkLoadPhaseFailure() throws Exception {
-    TableName table = TableName.valueOf("bulkLoadPhaseFailure");
+    final TableName table = TableName.valueOf("bulkLoadPhaseFailure");
     final AtomicInteger attmptedCalls = new AtomicInteger();
     final AtomicInteger failedCalls = new AtomicInteger();
     util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
-    try (Connection connection = ConnectionFactory.createConnection(this.util.getConfiguration())) {
+    try (Connection connection = ConnectionFactory.createConnection(util
+        .getConfiguration())) {
       setupTable(connection, table, 10);
-      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+          util.getConfiguration()) {
         @Override
-        protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
-            boolean copyFile) throws IOException {
+        protected List<LoadQueueItem> tryAtomicRegionLoad(
+            ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
+            Collection<LoadQueueItem> lqis) throws IOException {
           int i = attmptedCalls.incrementAndGet();
           if (i == 1) {
             Connection errConn;
             try {
               errConn = getMockedConnection(util.getConfiguration());
+              serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
             } catch (Exception e) {
              LOG.fatal("mocking cruft, should never happen", e);
              throw new RuntimeException("mocking cruft, should never happen");
            }
            failedCalls.incrementAndGet();
-            return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, copyFile);
+            return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
          }
-          return super.tryAtomicRegionLoad(conn, tableName, first, lqis, copyFile);
+          return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
        }
      };
      try {
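testBulkLoadPhaseFailure above injects failures by overriding
tryAtomicRegionLoad and swapping in a callable built against a mocked
connection. The new test below uses the other injection point: a callable
whose rpcCall() simply throws. Pulled out as a standalone sketch, with conn,
tableName, first, and conf assumed to be in scope:

    // Minimal failure-injecting callable: rpcCall() is the only hook a test
    // needs to override to simulate a region-server-side error.
    ClientServiceCallable<byte[]> failing = new ClientServiceCallable<byte[]>(
        conn, tableName, first, new RpcControllerFactory(conf).newController()) {
      @Override
      public byte[] rpcCall() throws Exception {
        throw new IOException("Error calling something on RegionServer");
      }
    };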
@@ -318,6 +322,53 @@
     }
   }
 
+  /**
+   * Test that shows that an exception thrown from the RS side will result in the
+   * expected number of retries, as set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER},
+   * when {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is enabled.
+   */
+  @Test
+  public void testRetryOnIOException() throws Exception {
+    final TableName table = TableName.valueOf("retryOnIOException");
+    final AtomicInteger calls = new AtomicInteger(1);
+    final Connection conn = ConnectionFactory.createConnection(util
+        .getConfiguration());
+    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    util.getConfiguration().setBoolean(
+        LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
+    final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+      @Override
+      protected List<LoadQueueItem> tryAtomicRegionLoad(
+          ClientServiceCallable<byte[]> serverCallable, TableName tableName,
+          final byte[] first, Collection<LoadQueueItem> lqis)
+          throws IOException {
+        if (calls.getAndIncrement() < util.getConfiguration().getInt(
+            HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
+          ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
+              conn, tableName, first, new RpcControllerFactory(
+                  util.getConfiguration()).newController()) {
+            @Override
+            public byte[] rpcCall() throws Exception {
+              throw new IOException("Error calling something on RegionServer");
+            }
+          };
+          return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
+        } else {
+          return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
+        }
+      }
+    };
+    setupTable(conn, table, 10);
+    Path dir = buildBulkFiles(table, 1);
+    lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table),
+        conn.getRegionLocator(table));
+    util.getConfiguration().setBoolean(
+        LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
+
+  }
+
   @SuppressWarnings("deprecation")
   private ClusterConnection getMockedConnection(final Configuration conf)
       throws IOException,
       org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
-- 
2.9.3 (Apple Git-75)
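For reference, the programmatic entry point both tests drive; outside of tests
the same call applies, with the retry switch read from the configuration
backing the connection. The path and table name here are illustrative, and
conn is assumed to be an open org.apache.hadoop.hbase.client.Connection:

    // Drive a bulk load directly rather than via the completebulkload tool.
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conn.getConfiguration());
    TableName tn = TableName.valueOf("myTable");
    loader.doBulkLoad(new Path("/bulk/output"), conn.getAdmin(),
        conn.getTable(tn), conn.getRegionLocator(tn));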