From b379b1be9e00780cb4571665697225a0a3a89dd3 Mon Sep 17 00:00:00 2001
From: Mike Grimes
Date: Mon, 21 Nov 2016 14:34:07 -0800
Subject: [PATCH] HBASE-17165 Add retry to LoadIncrementalHFiles tool

---
 .../hbase/mapreduce/LoadIncrementalHFiles.java     | 187 +++++++++++++--------
 .../TestLoadIncrementalHFilesSplitRecovery.java    | 108 +++++++++---
 2 files changed, 196 insertions(+), 99 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 9cb417e..9206bca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -106,6 +106,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
@@ -118,6 +119,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private Admin hbAdmin;
 
   public static final String NAME = "completebulkload";
+  static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
   public static final String MAX_FILES_PER_REGION_PER_FAMILY
     = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
@@ -136,6 +138,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private FsDelegationToken fsDelegationToken;
   private String bulkToken;
   private UserProvider userProvider;
+  private AtomicInteger numRetries;
 
   private LoadIncrementalHFiles() {}
 
@@ -157,6 +160,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
       maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
     }
+    numRetries = new AtomicInteger(1);
   }
 
   private void usage() {
@@ -507,11 +511,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       final byte[] first = e.getKey().array();
       final Collection<LoadQueueItem> lqis = e.getValue();
+      final RegionServerCallable<Boolean> svrCallable
+          = buildRegionServerCallable(conn, table.getName(), first, lqis);
 
       final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
         @Override
         public List<LoadQueueItem> call() throws Exception {
           List<LoadQueueItem> toRetry =
-              tryAtomicRegionLoad(conn, table.getName(), first, lqis);
+              tryAtomicRegionLoad(svrCallable, table.getName(), first, lqis);
           return toRetry;
         }
       };
@@ -542,6 +548,84 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }
 
+  /**
+   * Given a connection, a table, the first key, and a list of LoadQueueItems
+   * (usually HFiles) to be loaded, return a {@link RegionServerCallable} which can be
+   * passed into {@link LoadIncrementalHFiles#tryAtomicRegionLoad(RegionServerCallable,
+   * TableName, byte[], Collection)}
+   * @param conn
+   * @param tableName
+   * @param first
+   * @param lqis
+   * @return A {@link RegionServerCallable} which will attempt to load
+   *         a specific {@link LoadIncrementalHFiles.LoadQueueItem}
+   */
+  protected RegionServerCallable<Boolean> buildRegionServerCallable(final Connection conn,
+      TableName tableName, byte[] first, Collection<LoadQueueItem> lqis) {
+
+    final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
+    for (LoadQueueItem lqi : lqis) {
+      famPaths.add(Pair.newPair(lqi.family,
+          lqi.hfilePath.toString()));
+    }
+
+    return new RegionServerCallable<Boolean>(conn, tableName, first) {
+      @Override
+      public Boolean call(int callTimeout) throws Exception {
+        SecureBulkLoadClient secureClient = null;
+        boolean success = false;
+
+        try {
+          LOG.debug("Going to connect to server " + getLocation() + " for row "
+              + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          if (!isSecureBulkLoadEndpointAvailable()) {
+            success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
+          } else {
+            try (Table table = conn.getTable(getTableName())) {
+              secureClient = new SecureBulkLoadClient(table);
+              success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
+                  bulkToken, getLocation().getRegionInfo().getStartKey());
+            }
+          }
+          return success;
+        } catch (Exception e) {
+          LOG.debug("Error connecting to server " + getLocation(), e);
+          throw e;
+        } finally {
+          //Best effort copying of files that might not have been imported
+          //from the staging directory back to original location
+          //in user directory
+          if(secureClient != null && !success) {
+            FileSystem targetFs = FileSystem.get(getConf());
+            // Check to see if the source and target filesystems are the same
+            // If they are the same filesystem, we will try move the files back
+            // because previously we moved them to the staging directory.
+            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
+              for(Pair<byte[], String> el : famPaths) {
+                Path hfileStagingPath = null;
+                Path hfileOrigPath = new Path(el.getSecond());
+                try {
+                  hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
+                      hfileOrigPath.getName());
+                  if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
+                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
+                        hfileStagingPath);
+                  } else if(targetFs.exists(hfileStagingPath)){
+                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+                        hfileStagingPath);
+                  }
+                } catch(Exception ex) {
+                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+                      hfileStagingPath, ex);
+                }
+              }
+            }
+          }
+        }
+      }
+    };
+  }
+
   private boolean checkHFilesCountPerRegionPerFamily(
       final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
     for (Entry

    * (HBASE-9508).
    * This will be removed in HBase 2.0.0.
-   * Use {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection)}.
+   * Use {@link LoadIncrementalHFiles#tryAtomicRegionLoad(RegionServerCallable,
+   * TableName, byte[], Collection)}.
    */
   @Deprecated
   protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
       final byte [] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
           throws IOException {
-    return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis);
+    RegionServerCallable<Boolean> serverCallable
+        = buildRegionServerCallable(conn, TableName.valueOf(tableName), first, lqis);
+    return tryAtomicRegionLoad(serverCallable, TableName.valueOf(tableName), first, lqis);
   }
 
   /**
@@ -780,88 +867,40 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @return empty list if success, list of items to retry on recoverable
    * failure
    */
-  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-      final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
-      throws IOException {
-    final List<Pair<byte[], String>> famPaths =
-        new ArrayList<Pair<byte[], String>>(lqis.size());
-    for (LoadQueueItem lqi : lqis) {
-      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
-    }
-
-    final RegionServerCallable<Boolean> svrCallable =
-        new RegionServerCallable<Boolean>(conn, tableName, first) {
-      @Override
-      public Boolean call(int callTimeout) throws Exception {
-        SecureBulkLoadClient secureClient = null;
-        boolean success = false;
-
-        try {
-          LOG.debug("Going to connect to server " + getLocation() + " for row "
-              + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
-          byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          if (!isSecureBulkLoadEndpointAvailable()) {
-            success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
-          } else {
-            try (Table table = conn.getTable(getTableName())) {
-              secureClient = new SecureBulkLoadClient(table);
-              success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
-                  bulkToken, getLocation().getRegionInfo().getStartKey());
-            }
-          }
-          return success;
-        } finally {
-          //Best effort copying of files that might not have been imported
-          //from the staging directory back to original location
-          //in user directory
-          if(secureClient != null && !success) {
-            FileSystem targetFs = FileSystem.get(getConf());
-            // Check to see if the source and target filesystems are the same
-            // If they are the same filesystem, we will try move the files back
-            // because previously we moved them to the staging directory.
-            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
-              for(Pair<byte[], String> el : famPaths) {
-                Path hfileStagingPath = null;
-                Path hfileOrigPath = new Path(el.getSecond());
-                try {
-                  hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
-                      hfileOrigPath.getName());
-                  if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
-                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
-                        hfileStagingPath);
-                  } else if(targetFs.exists(hfileStagingPath)){
-                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-                        hfileStagingPath);
-                  }
-                } catch(Exception ex) {
-                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-                      hfileStagingPath, ex);
-                }
-              }
-            }
-          }
-        }
-      }
-    };
-
+  protected List<LoadQueueItem> tryAtomicRegionLoad(
+      RegionServerCallable<Boolean> svrCallable, final TableName tableName,
+      final byte[] first, Collection<LoadQueueItem> lqis) throws IOException {
+    List<LoadQueueItem> toRetry = new ArrayList<>();
     try {
-      List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
       Configuration conf = getConf();
-      boolean success = RpcRetryingCallerFactory.instantiate(conf,
-          null).<Boolean> newCaller()
+      boolean success = RpcRetryingCallerFactory.instantiate(conf, null)
+          .<Boolean> newCaller()
           .callWithRetries(svrCallable, Integer.MAX_VALUE);
       if (!success) {
         LOG.warn("Attempt to bulk load region containing "
-            + Bytes.toStringBinary(first) + " into table "
-            + tableName + " with files " + lqis
+            + Bytes.toStringBinary(first) + " into table " + tableName
+            + " with files " + lqis
             + " failed. This is recoverable and they will be retried.");
         toRetry.addAll(lqis); // return lqi's to retry
       }
       // success
       return toRetry;
     } catch (IOException e) {
-      LOG.error("Encountered unrecoverable error from region server, additional details: "
-          + svrCallable.getExceptionMessageAdditionalDetail(), e);
+      LOG.warn(
+          "Received a " + e.getClass().getSimpleName()
+              + " from region server: "
+              + svrCallable.getExceptionMessageAdditionalDetail(), e);
+      if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
+          && numRetries.get() < getConf().getInt(
+              HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+              HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
+        LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
+            + numRetries.get());
+        numRetries.getAndIncrement();
+        toRetry.addAll(lqis);
+        return toRetry;
+      }
+      LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
       throw e;
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 26583f3..9c186a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -70,6 +71,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
+import org.mockito.Spy;
 
 import com.google.common.collect.Multimap;
 import com.google.protobuf.RpcController;
@@ -269,38 +271,44 @@ public class TestLoadIncrementalHFilesSplitRecovery {
    */
   @Test(expected=IOException.class, timeout=120000)
   public void testBulkLoadPhaseFailure() throws Exception {
-    TableName table = TableName.valueOf("bulkLoadPhaseFailure");
+    final TableName table = TableName.valueOf("bulkLoadPhaseFailure");
     final AtomicInteger attmptedCalls = new AtomicInteger();
     final AtomicInteger failedCalls = new AtomicInteger();
     util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
-    try (Connection connection = ConnectionFactory.createConnection(this.util.getConfiguration())) {
+    try (Connection connection = ConnectionFactory.createConnection(util
+        .getConfiguration())) {
       setupTable(connection, table, 10);
-      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+          util.getConfiguration()) {
        @Override
-        protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
-            throws IOException {
+        protected List<LoadQueueItem> tryAtomicRegionLoad(
+            RegionServerCallable<Boolean> serverCallable, TableName tableName,
+            final byte[] first, Collection<LoadQueueItem> lqis)
+            throws IOException {
          int i = attmptedCalls.incrementAndGet();
          if (i == 1) {
-            Connection errConn = null;
+            Connection errConn;
            try {
              errConn = getMockedConnection(util.getConfiguration());
+              serverCallable = this.buildRegionServerCallable(errConn, table,
+                  first, lqis);
            } catch (Exception e) {
              LOG.fatal("mocking cruft, should never happen", e);
              throw new RuntimeException("mocking cruft, should never happen");
            }
            failedCalls.incrementAndGet();
-            return super.tryAtomicRegionLoad((HConnection)errConn, tableName, first, lqis);
+            return super.tryAtomicRegionLoad(serverCallable, tableName, first,
+                lqis);
          }
-
-          return super.tryAtomicRegionLoad((HConnection)conn, tableName, first, lqis);
+          return super.tryAtomicRegionLoad(serverCallable, tableName, first,
+              lqis);
        }
      };
      try {
        // create HFiles for different column families
        Path dir = buildBulkFiles(table, 1);
        try (Table t = connection.getTable(table)) {
-          lih.doBulkLoad(dir, (HTable)t);
+          lih.doBulkLoad(dir, (HTable) t);
        }
      } finally {
        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
@@ -310,26 +318,76 @@ public class TestLoadIncrementalHFilesSplitRecovery {
     }
   }
 
+  /**
+   * Test that shows that an exception thrown from the RS side will result in the
+   * expected number of retries set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}
+   * when {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set
+   */
+  @Test
+  public void testRetryOnIOException() throws Exception {
+    final TableName table = TableName.valueOf("retryOnIOException");
+    final AtomicInteger calls = new AtomicInteger();
+    final Connection conn = ConnectionFactory.createConnection(util
+        .getConfiguration());
+    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    util.getConfiguration().setBoolean(
+        LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
+    final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+      protected List<LoadQueueItem> tryAtomicRegionLoad(
+          RegionServerCallable<Boolean> serverCallable, TableName tableName,
+          final byte[] first, Collection<LoadQueueItem> lqis)
+          throws IOException {
+        if (calls.getAndIncrement() < util.getConfiguration().getInt(
+            HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
+          RegionServerCallable<Boolean> regionServerCallable = new RegionServerCallable<Boolean>(
+              conn, tableName, first) {
+            @Override
+            public Boolean call(int callTimeout) throws Exception {
+              throw new IOException("Error calling something on RegionServer");
+            }
+          };
+          return super.tryAtomicRegionLoad(regionServerCallable, tableName,
+              first, lqis);
+        } else {
+          return super.tryAtomicRegionLoad(serverCallable, tableName, first,
+              lqis);
+        }
+      }
+    };
+    setupTable(conn, table, 10);
+    Path dir = buildBulkFiles(table, 1);
+    lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table),
+        conn.getRegionLocator(table));
+    util.getConfiguration().setBoolean(
+        LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
+
+  }
+
   @SuppressWarnings("deprecation")
   private HConnection getMockedConnection(final Configuration conf)
-  throws IOException, ServiceException {
+      throws IOException, ServiceException {
     HConnection c = Mockito.mock(HConnection.class);
     Mockito.when(c.getConfiguration()).thenReturn(conf);
     Mockito.doNothing().when(c).close();
     // Make it so we return a particular location when asked.
-    final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
-        ServerName.valueOf("example.org", 1234, 0));
-    Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
-        (byte[]) Mockito.any(), Mockito.anyBoolean())).
-      thenReturn(loc);
-    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
-      thenReturn(loc);
-    ClientProtos.ClientService.BlockingInterface hri =
-      Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
-    Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())).
-      thenThrow(new ServiceException(new IOException("injecting bulk load error")));
-    Mockito.when(c.getClient(Mockito.any(ServerName.class))).
-      thenReturn(hri);
+    final HRegionLocation loc = new HRegionLocation(
+        HRegionInfo.FIRST_META_REGIONINFO, ServerName.valueOf("example.org",
+            1234, 0));
+    Mockito.when(
+        c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(),
+            Mockito.anyBoolean())).thenReturn(loc);
+    Mockito.when(
+        c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any()))
+        .thenReturn(loc);
+    ClientProtos.ClientService.BlockingInterface hri = Mockito
+        .mock(ClientProtos.ClientService.BlockingInterface.class);
+    Mockito.when(
+        hri.bulkLoadHFile((RpcController) Mockito.any(),
+            (BulkLoadHFileRequest) Mockito.any())).thenThrow(
+        new ServiceException(new IOException("injecting bulk load error")));
+    Mockito.when(c.getClient(Mockito.any(ServerName.class))).thenReturn(hri);
     return c;
   }
-- 
2.8.4 (Apple Git-73)
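
Usage note (not part of the patch): the switch added here, hbase.bulkload.retries.retryOnIOException, is off by default, and the number of IOException-driven retries is bounded by hbase.client.retries.number. Below is a minimal sketch of how a caller might opt in when driving the tool programmatically; the wrapper class name, output path, and table name are illustrative placeholders, and ToolRunner is just one way to invoke the tool.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadWithRetries {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Opt in to the behavior added by this patch: when the region server call
        // fails with an IOException, re-queue the failed LoadQueueItems instead of
        // aborting the bulk load.
        conf.setBoolean("hbase.bulkload.retries.retryOnIOException", true);
        // numRetries is compared against the standard client retry count, so this
        // also caps how many times the failed items are re-queued.
        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10);
        // The output directory and table name are placeholders for this sketch.
        int exit = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
            new String[] { "/user/hbase/bulkload-output", "my_table" });
        System.exit(exit);
      }
    }

Since LoadIncrementalHFiles implements Tool, the same property can normally also be supplied on the command line, e.g. -Dhbase.bulkload.retries.retryOnIOException=true, when running the completebulkload tool.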