From 11b17a6a0031c95400cdd5c0e63a919b6dae6c20 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Thu, 11 Jun 2015 11:54:57 -0700 Subject: [PATCH] HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path,HTable) doesn't handle unmanaged connections when using SecureBulkLoad (Nick Dimiduk) --- .../hbase/mapreduce/LoadIncrementalHFiles.java | 39 ++++++++++++++++------ 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 05ac012..894d63f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -288,11 +289,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public void doBulkLoad(Path hfofDir, final HTable table) throws TableNotFoundException, IOException { - final HConnection conn = table.getConnection(); + boolean closeConnWhenFinished = false; + HConnection conn = table.getConnection(); + Table t = table; + + if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) { + LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection."); + // can only use unmanaged connections from here on out. + conn = (HConnection) ConnectionFactory.createConnection(table.getConfiguration()); + t = conn.getTable(table.getName()); + closeConnWhenFinished = true; + if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) { + throw new RuntimeException("Failed to create unmanaged connection."); + } + } - if (!conn.isTableAvailable(table.getName())) { + if (!conn.isTableAvailable(t.getName())) { throw new TableNotFoundException("Table " + - Bytes.toStringBinary(table.getTableName()) + + Bytes.toStringBinary(t.getName().getName()) + "is not currently available."); } @@ -313,7 +327,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { discoverLoadQueue(queue, hfofDir); // check whether there is invalid family name in HFiles to be bulkloaded - Collection families = table.getTableDescriptor().getFamilies(); + Collection families = t.getTableDescriptor().getFamilies(); ArrayList familyNames = new ArrayList(families.size()); for (HColumnDescriptor family : families) { familyNames.add(family.getNameAsString()); @@ -331,7 +345,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { String msg = "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " + unmatchedFamilies + "; valid family names of table " - + Bytes.toString(table.getTableName()) + " are: " + familyNames; + + Bytes.toString(t.getName().getName()) + " are: " + familyNames; LOG.error(msg); throw new IOException(msg); } @@ -349,13 +363,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool { // fs is the source filesystem fsDelegationToken.acquireDelegationToken(fs); if(isSecureBulkLoadEndpointAvailable()) { - bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName()); + bulkToken = new SecureBulkLoadClient(t).prepareBulkLoad(t.getName()); } // Assumes that region splits can happen while this occurs. while (!queue.isEmpty()) { // need to reload split keys each iteration. - final Pair startEndKeys = table.getStartEndKeys(); + final Pair startEndKeys = + conn.getRegionLocator(t.getName()).getStartEndKeys(); if (count != 0) { LOG.info("Split occured while grouping HFiles, retry attempt " + + count + " with " + queue.size() + " files remaining to group or split"); @@ -369,7 +384,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { count++; // Using ByteBuffer for byte[] equality semantics - Multimap regionGroups = groupOrSplitPhase(table, + Multimap regionGroups = groupOrSplitPhase((HTable) t, pool, queue, startEndKeys); if (!checkHFilesCountPerRegionPerFamily(regionGroups)) { @@ -378,7 +393,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { + " hfiles to one family of one region"); } - bulkLoadPhase(table, conn, pool, queue, regionGroups); + bulkLoadPhase(t, conn, pool, queue, regionGroups); // NOTE: The next iteration's split / group could happen in parallel to // atomic bulkloads assuming that there are splits and no merges, and @@ -388,7 +403,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } finally { fsDelegationToken.releaseDelegationToken(); if(bulkToken != null) { - new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken); + new SecureBulkLoadClient(t).cleanupBulkLoad(bulkToken); } pool.shutdown(); if (queue != null && !queue.isEmpty()) { @@ -401,6 +416,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } LOG.error(err); } + if (closeConnWhenFinished) { + t.close(); + conn.close(); + } } if (queue != null && !queue.isEmpty()) { -- 1.9.5 (Apple Git-50.3)