From 066be41458d52ee3ffb3ce44a156b20723b4527a Mon Sep 17 00:00:00 2001
From: Nick Dimiduk
Date: Sun, 14 Jun 2015 15:44:49 -0700
Subject: [PATCH] HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path,HTable)
 doesn't handle unmanaged connections when using SecureBulkLoad

---
 .../hbase/mapreduce/LoadIncrementalHFiles.java     | 30 ++++++--
 .../hbase/mapreduce/TestLoadIncrementalHFiles.java | 79 ++++++++++++----------
 2 files changed, 69 insertions(+), 40 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 827699b..417deec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -291,15 +292,32 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throws TableNotFoundException, IOException {
     Admin admin = null;
+    Table t = table;
+    Connection conn = table.getConnection();
+    boolean closeConnWhenFinished = false;
     try {
-      try {
-        admin = table.getConnection().getAdmin();
-      } catch (NeedUnmanagedConnectionException ex) {
-        admin = new HBaseAdmin(table.getConfiguration());
+      if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+        LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
+        // can only use unmanaged connections from here on out.
+        conn = ConnectionFactory.createConnection(table.getConfiguration());
+        t = conn.getTable(table.getName());
+        closeConnWhenFinished = true;
+        if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+          throw new RuntimeException("Failed to create unmanaged connection.");
+        }
+        admin = conn.getAdmin();
+      } else {
+        admin = conn.getAdmin();
+      }
+      try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
+        doBulkLoad(hfofDir, admin, t, rl);
       }
-      doBulkLoad(hfofDir, admin, table, table.getRegionLocator());
     } finally {
-      if (admin != null) admin.close();
+      if (admin != null) admin.close();
+      if (closeConnWhenFinished) {
+        t.close();
+        conn.close();
+      }
     }
   }
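The guard at the top of the rewritten try block is the heart of the fix: a managed (pooled) connection cannot drive a secure bulk load, so the loader creates a fresh unmanaged connection, routes the Table and Admin through it, and closes both when the load finishes. A minimal sketch of that same guard as a standalone helper, using only client calls that appear in this patch (the helper class and method names are illustrative, not part of HBase):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Illustrative helper, not part of the patch: applies the same
// managed-connection guard that the fix performs inside doBulkLoad.
final class BulkLoadConnections {
  private BulkLoadConnections() {}

  /**
   * Returns the given connection if it is already unmanaged; otherwise
   * creates a fresh unmanaged connection. When a new connection is
   * returned, the caller owns it and must close it when finished.
   */
  static Connection unmanagedOrSame(Connection conn, Configuration conf) throws IOException {
    if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
      // ConnectionFactory hands back unmanaged connections.
      return ConnectionFactory.createConnection(conf);
    }
    return conn;
  }
}

Note that the patch also re-checks isManaged() on the newly created connection and fails fast with a RuntimeException rather than retrying, since ConnectionFactory is expected to hand back an unmanaged connection.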
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 0b4dc56..21f4e10 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -232,47 +234,56 @@ public class TestLoadIncrementalHFiles {
   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(testName);
-    FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs);
-    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
-    int hfileIdx = 0;
-    for (byte[][] range : hfileRanges) {
-      byte[] from = range[0];
-      byte[] to = range[1];
-      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
-          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
-    }
-    int expectedRows = hfileIdx * 1000;
+    for (boolean managed : new boolean[] { true, false }) {
+      Path dir = util.getDataTestDirOnTestFS(testName);
+      FileSystem fs = util.getTestFileSystem();
+      dir = dir.makeQualified(fs);
+      Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+      int hfileIdx = 0;
+      for (byte[][] range : hfileRanges) {
+        byte[] from = range[0];
+        byte[] to = range[1];
+        HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+            + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+      }
+      int expectedRows = hfileIdx * 1000;
 
-    if (preCreateTable) {
-      util.getHBaseAdmin().createTable(htd, tableSplitKeys);
-    }
+      if (preCreateTable) {
+        util.getHBaseAdmin().createTable(htd, tableSplitKeys);
+      }
 
-    final TableName tableName = htd.getTableName();
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
-    String [] args= {dir.toString(), tableName.toString()};
-    loader.run(args);
+      final TableName tableName = htd.getTableName();
+      if (!util.getHBaseAdmin().tableExists(tableName)) {
+        util.getHBaseAdmin().createTable(htd);
+      }
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
 
-    Table table = new HTable(util.getConfiguration(), tableName);
-    try {
-      assertEquals(expectedRows, util.countRows(table));
-    } finally {
-      table.close();
-    }
+      if (managed) {
+        try (HTable table = new HTable(util.getConfiguration(), tableName)) {
+          loader.doBulkLoad(dir, table);
+          assertEquals(expectedRows, util.countRows(table));
+        }
+      } else {
+        try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
+            HTable table = (HTable) conn.getTable(tableName)) {
+          loader.doBulkLoad(dir, table);
+        }
+      }
 
-    // verify staging folder has been cleaned up
-    Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
-    if(fs.exists(stagingBasePath)) {
-      FileStatus[] files = fs.listStatus(stagingBasePath);
-      for(FileStatus file : files) {
-        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
-          file.getPath().getName() != "DONOTERASE");
+      // verify staging folder has been cleaned up
+      Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
+      if (fs.exists(stagingBasePath)) {
+        FileStatus[] files = fs.listStatus(stagingBasePath);
+        for (FileStatus file : files) {
+          assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
+            !file.getPath().getName().equals("DONOTERASE"));
+        }
       }
-    }
-    util.deleteTable(tableName);
+      util.deleteTable(tableName);
+    }
   }
 
   /**
-- 
1.9.5 (Apple Git-50.3)
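With the fix applied, both invocation styles exercised by the test, a managed HTable and a table obtained from a caller-owned connection, are expected to work. A minimal caller-side sketch of the unmanaged path, assuming the same HBase 1.x client API used in the patch (the table name and HFile directory below are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Path hfofDir = new Path("/tmp/hfiles");             // placeholder HFile output dir
    TableName tableName = TableName.valueOf("mytable"); // placeholder table name
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // The connection is unmanaged: the caller creates it via ConnectionFactory
    // and closes it. With this patch, doBulkLoad uses it as-is instead of
    // failing when SecureBulkLoad is enabled.
    try (Connection conn = ConnectionFactory.createConnection(conf);
        HTable table = (HTable) conn.getTable(tableName)) {
      loader.doBulkLoad(hfofDir, table);
    }
  }
}

Ownership stays with the caller here: the try-with-resources closes the table and the connection, and doBulkLoad no longer has to create anything behind the caller's back.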