From 15d9326090dc6890682e568e24587505f1b604c8 Mon Sep 17 00:00:00 2001
From: Nick Dimiduk
Date: Sun, 14 Jun 2015 15:45:10 -0700
Subject: [PATCH] HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path,HTable)
 doesn't handle unmanaged connections when using SecureBulkLoad

---
 .../hbase/mapreduce/LoadIncrementalHFiles.java     | 81 ++++++++++++++--------
 .../hbase/mapreduce/TestLoadIncrementalHFiles.java | 79 ++++++++++++---------
 2 files changed, 96 insertions(+), 64 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 05ac012..8d4a002 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -65,11 +65,13 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.Table;
@@ -288,11 +290,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   public void doBulkLoad(Path hfofDir, final HTable table)
     throws TableNotFoundException, IOException
   {
-    final HConnection conn = table.getConnection();
+    boolean closeConnWhenFinished = false;
+    HConnection conn = table.getConnection();
+    Table t = table;
+
+    if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+      LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
+      // can only use unmanaged connections from here on out.
+      conn = (HConnection) ConnectionFactory.createConnection(table.getConfiguration());
+      t = conn.getTable(table.getName());
+      closeConnWhenFinished = true;
+      if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+        throw new RuntimeException("Failed to create unmanaged connection.");
+      }
+    }
 
-    if (!conn.isTableAvailable(table.getName())) {
+    if (!conn.isTableAvailable(t.getName())) {
       throw new TableNotFoundException("Table " +
-          Bytes.toStringBinary(table.getTableName()) +
+          Bytes.toStringBinary(t.getName().getName()) +
           "is not currently available.");
     }
 
@@ -313,7 +328,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     try {
       discoverLoadQueue(queue, hfofDir);
       // check whether there is invalid family name in HFiles to be bulkloaded
-      Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
+      Collection<HColumnDescriptor> families = t.getTableDescriptor().getFamilies();
       ArrayList<String> familyNames = new ArrayList<String>(families.size());
       for (HColumnDescriptor family : families) {
         familyNames.add(family.getNameAsString());
@@ -331,7 +346,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         String msg =
             "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
                 + unmatchedFamilies + "; valid family names of table "
-                + Bytes.toString(table.getTableName()) + " are: " + familyNames;
+                + Bytes.toString(t.getName().getName()) + " are: " + familyNames;
         LOG.error(msg);
         throw new IOException(msg);
       }
@@ -349,46 +364,48 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       // fs is the source filesystem
       fsDelegationToken.acquireDelegationToken(fs);
       if(isSecureBulkLoadEndpointAvailable()) {
-        bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
+        bulkToken = new SecureBulkLoadClient(t).prepareBulkLoad(t.getName());
       }
 
       // Assumes that region splits can happen while this occurs.
       while (!queue.isEmpty()) {
         // need to reload split keys each iteration.
-        final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
-        if (count != 0) {
-          LOG.info("Split occured while grouping HFiles, retry attempt " +
-              + count + " with " + queue.size() + " files remaining to group or split");
-        }
+        try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
+          final Pair<byte[][], byte[][]> startEndKeys = rl.getStartEndKeys();
+          if (count != 0) {
+            LOG.info("Split occured while grouping HFiles, retry attempt " +
+                +count + " with " + queue.size() + " files remaining to group or split");
+          }
 
-        int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
-        if (maxRetries != 0 && count >= maxRetries) {
-          throw new IOException("Retry attempted " + count +
-              " times without completing, bailing out");
-        }
-        count++;
+          int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
+          if (maxRetries != 0 && count >= maxRetries) {
+            throw new IOException("Retry attempted " + count +
+                " times without completing, bailing out");
+          }
+          count++;
 
-        // Using ByteBuffer for byte[] equality semantics
-        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
-            pool, queue, startEndKeys);
+          // Using ByteBuffer for byte[] equality semantics
+          Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase((HTable) t,
+              pool, queue, startEndKeys);
 
-        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
-          // Error is logged inside checkHFilesCountPerRegionPerFamily.
-          throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
-              + " hfiles to one family of one region");
-        }
+          if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+            // Error is logged inside checkHFilesCountPerRegionPerFamily.
+            throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
+                + " hfiles to one family of one region");
+          }
 
-        bulkLoadPhase(table, conn, pool, queue, regionGroups);
+          bulkLoadPhase(t, conn, pool, queue, regionGroups);
 
-        // NOTE: The next iteration's split / group could happen in parallel to
-        // atomic bulkloads assuming that there are splits and no merges, and
-        // that we can atomically pull out the groups we want to retry.
+          // NOTE: The next iteration's split / group could happen in parallel to
+          // atomic bulkloads assuming that there are splits and no merges, and
+          // that we can atomically pull out the groups we want to retry.
+        }
       }
 
     } finally {
       fsDelegationToken.releaseDelegationToken();
       if(bulkToken != null) {
-        new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
+        new SecureBulkLoadClient(t).cleanupBulkLoad(bulkToken);
       }
       pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
@@ -401,6 +418,10 @@
         }
         LOG.error(err);
       }
+      if (closeConnWhenFinished) {
+        t.close();
+        conn.close();
+      }
     }
 
     if (queue != null && !queue.isEmpty()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index d5a9a86..51ba98a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -232,47 +234,56 @@ public class TestLoadIncrementalHFiles {
 
   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(testName);
-    FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs);
-    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
-
-    int hfileIdx = 0;
-    for (byte[][] range : hfileRanges) {
-      byte[] from = range[0];
-      byte[] to = range[1];
-      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
-          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
-    }
-    int expectedRows = hfileIdx * 1000;
+    for (boolean managed : new boolean[] { true, false }) {
+      Path dir = util.getDataTestDirOnTestFS(testName);
+      FileSystem fs = util.getTestFileSystem();
+      dir = dir.makeQualified(fs);
+      Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+      int hfileIdx = 0;
+      for (byte[][] range : hfileRanges) {
+        byte[] from = range[0];
+        byte[] to = range[1];
+        HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+            + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+      }
+      int expectedRows = hfileIdx * 1000;
 
-    if (preCreateTable) {
-      util.getHBaseAdmin().createTable(htd, tableSplitKeys);
-    }
+      if (preCreateTable) {
+        util.getHBaseAdmin().createTable(htd, tableSplitKeys);
+      }
 
-    final TableName tableName = htd.getTableName();
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
-    String [] args= {dir.toString(), tableName.toString()};
-    loader.run(args);
+      final TableName tableName = htd.getTableName();
+      if (!util.getHBaseAdmin().tableExists(tableName)) {
+        util.getHBaseAdmin().createTable(htd);
+      }
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
 
-    Table table = new HTable(util.getConfiguration(), tableName);
-    try {
-      assertEquals(expectedRows, util.countRows(table));
-    } finally {
-      table.close();
-    }
+      if (managed) {
+        try (HTable table = new HTable(util.getConfiguration(), tableName)) {
+          loader.doBulkLoad(dir, table);
+          assertEquals(expectedRows, util.countRows(table));
+        }
+      } else {
+        try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
+            HTable table = (HTable) conn.getTable(tableName)) {
+          loader.doBulkLoad(dir, table);
+        }
+      }
 
-    // verify staging folder has been cleaned up
-    Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
-    if(fs.exists(stagingBasePath)) {
-      FileStatus[] files = fs.listStatus(stagingBasePath);
-      for(FileStatus file : files) {
-        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
-          file.getPath().getName() != "DONOTERASE");
+      // verify staging folder has been cleaned up
+      Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
+      if (fs.exists(stagingBasePath)) {
+        FileStatus[] files = fs.listStatus(stagingBasePath);
+        for (FileStatus file : files) {
+          assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
+            file.getPath().getName() != "DONOTERASE");
+        }
       }
-    }
 
-    util.deleteTable(tableName);
+      util.deleteTable(tableName);
+    }
   }
 
   /**
-- 
1.9.5 (Apple Git-50.3)
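
For reference, a minimal caller-side sketch of the usage this change enables, modeled on the unmanaged branch of runTest above. The table name and HFile directory below are hypothetical placeholders; with this patch, doBulkLoad accepts an HTable backed by an unmanaged ConnectionFactory connection (including for SecureBulkLoad), and when handed a managed connection it creates its own unmanaged one internally.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical table name and HFile output directory.
    TableName tableName = TableName.valueOf("mytable");
    Path hfofDir = new Path("/tmp/hfiles");

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

    // An unmanaged connection, as exercised by the managed=false test branch:
    // doBulkLoad now uses it as-is instead of assuming a managed HTable connection.
    try (Connection conn = ConnectionFactory.createConnection(conf);
        HTable table = (HTable) conn.getTable(tableName)) {
      loader.doBulkLoad(hfofDir, table);
    }
  }
}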