From 8bb80c43669bb3d8190bd0c352e25f6b24c569f3 Mon Sep 17 00:00:00 2001
From: Rahul Gidwani
Date: Wed, 6 Dec 2017 12:06:19 -0800
Subject: [PATCH] Add more flexibility for input directory structure to LoadIncrementalHFiles

---
 .../hadoop/hbase/tool/LoadIncrementalHFiles.java   | 37 ++++++++++--
 .../hbase/tool/TestLoadIncrementalHFiles.java      | 66 ++++++++++++++++------
 2 files changed, 80 insertions(+), 23 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index c457e224da..010509bf8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -49,6 +49,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.util.FSVisitor;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClientServiceCallable;
@@ -181,10 +184,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   }
 
   private void usage() {
-    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D" +
-        CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
-        "  Note: if you set this to 'no', then the target table must already exist in HBase\n -D" +
-        IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" +
+    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename -loadTable" +
+        "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by " +
+        "this tool\n  Note: if you set this to 'no', then the target table must already exist " +
+        "in HBase\n -loadTable - can be used when the input directory has a depth of 3, i.e. " +
+        "baseDirectory/regionDir/familyDir/hfile; the target table must already exist\n -D" +
+        IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" +
         "\n");
   }
 
@@ -1178,13 +1183,33 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length < 2) {
+    if (args.length != 2 && args.length != 3) {
       usage();
       return -1;
     }
     String dirPath = args[0];
     TableName tableName = TableName.valueOf(args[1]);
-    return !run(dirPath, tableName).isEmpty() ? 0 : -1;
+
+    if (args.length == 2) {
+      return !run(dirPath, tableName).isEmpty() ? 0 : -1;
+    } else {
+      // Depth-3 layout: baseDirectory/regionDir/familyDir/hfile. Walk each
+      // region directory and group the discovered HFiles by column family.
+      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
+      FileSystem fs = FileSystem.get(getConf());
+      for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
+        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
+          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
+          byte[] familyName = Bytes.toBytes(family);
+          if (family2Files.containsKey(familyName)) {
+            family2Files.get(familyName).add(path);
+          } else {
+            family2Files.put(familyName, Lists.newArrayList(path));
+          }
+        });
+      }
+      return !run(family2Files, tableName).isEmpty() ? 0 : -1;
+    }
   }
 
   public static void main(String[] args) throws Exception {
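The new branch in run(String[]) reduces a depth-3 input tree to the existing
map-based entry point, run(Map<byte[], List<Path>>, TableName), so a caller
that already knows which HFiles belong to which column family can invoke that
entry point directly and skip the directory walk. A minimal sketch, assuming a
table named "mytable" with family "cf" and staged HFiles under /staging (all
names illustrative, not from this patch); a TreeMap over Bytes.BYTES_COMPARATOR
is used here because byte[] keys have no value-based equals/hashCode:

    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.util.Bytes;

    import com.google.common.collect.Lists;

    public class MapBasedBulkLoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // Family -> HFile paths; the comparator gives content-based byte[] keys.
        Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        family2Files.put(Bytes.toBytes("cf"), Lists.newArrayList(
            new Path("/staging/cf/hfile-00000"),
            new Path("/staging/cf/hfile-00001")));
        // Same entry point the -loadTable branch above resolves to.
        loader.run(family2Files, TableName.valueOf("mytable"));
      }
    }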
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
index 7e4d40edf3..32bbdf3414 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
@@ -131,10 +132,10 @@ public class TestLoadIncrementalHFiles {
   public void testSimpleLoadWithFileCopy() throws Exception {
     String testName = tn.getMethodName();
     final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
-    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
-        false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE),
+        false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
             new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
-        false, true);
+        false, true, 2);
   }
 
   /**
@@ -257,29 +258,56 @@ public class TestLoadIncrementalHFiles {
     // Run the test bulkloading the table to the default namespace
     final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
     runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
-        useMap);
+        useMap, 2);
+
+    /* Run the test bulk loading the table from a depth-3
+       directory structure:
+       baseDirectory
+           -- regionDir
+                -- familyDir
+                     -- storeFileDir
+     */
+    if (preCreateTable) {
+      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges,
+          false, 3);
+    }
 
     // Run the test bulkloading the table to the specified namespace
     final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
     runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
-        useMap);
+        useMap, 2);
   }
 
   private void runTest(String testName, TableName tableName, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
+      int depth)
       throws Exception {
     TableDescriptor htd = buildHTD(tableName, bloomType);
-    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
+    runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
+  }
+
+  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
+      byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
+      int initRowCount, int factor) throws Exception {
+    return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges,
+        useMap, deleteFile, copyFiles, initRowCount, factor, 2);
   }
 
   public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
       byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
       byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
-      int initRowCount, int factor) throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(testName);
+      int initRowCount, int factor, int depth) throws Exception {
+    Path baseDirectory = util.getDataTestDirOnTestFS(testName);
     FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
-    Path familyDir = new Path(dir, Bytes.toString(fam));
+    baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path parentDir = baseDirectory;
+    if (depth == 3) {
+      assert !useMap;
+      parentDir = new Path(baseDirectory, "someRegion");
+    }
+    Path familyDir = new Path(parentDir, Bytes.toString(fam));
 
     int hfileIdx = 0;
     Map<byte[], List<byte[]>> map = null;
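For reference, the two input layouts the loader now understands, expressed as
the Paths the test builds ("someRegion" is the test's placeholder region
directory; the other names are illustrative):

    import org.apache.hadoop.fs.Path;

    public class BulkLoadLayoutSketch {
      public static void main(String[] args) {
        Path base = new Path("/staging/mytable");

        // Depth 2 (default): baseDirectory/familyDir/hfile
        Path depth2 = new Path(new Path(base, "cf"), "hfile-00000");

        // Depth 3 (-loadTable): baseDirectory/regionDir/familyDir/hfile
        Path depth3 = new Path(new Path(new Path(base, "someRegion"), "cf"), "hfile-00000");

        System.out.println(depth2); // /staging/mytable/cf/hfile-00000
        System.out.println(depth3); // /staging/mytable/someRegion/cf/hfile-00000
      }
    }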
@@ -314,7 +342,11 @@ public class TestLoadIncrementalHFiles {
       conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
     }
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
-    String[] args = { dir.toString(), tableName.toString() };
+    List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
+    if (depth == 3) {
+      args.add("-loadTable");
+    }
+
     if (useMap) {
       if (deleteFile) {
         fs.delete(last, true);
@@ -329,7 +361,7 @@ public class TestLoadIncrementalHFiles {
         }
       }
     } else {
-      loader.run(args);
+      loader.run(args.toArray(new String[]{}));
     }
 
     if (copyFiles) {
@@ -348,11 +380,11 @@ public class TestLoadIncrementalHFiles {
     return expectedRows;
   }
 
-  private void runTest(String testName, TableDescriptor htd, BloomType bloomType,
+  private void runTest(String testName, TableDescriptor htd,
       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
-      boolean copyFiles) throws Exception {
+      boolean copyFiles, int depth) throws Exception {
     loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
-        useMap, true, copyFiles, 0, 1000);
+        useMap, true, copyFiles, 0, 1000, depth);
     final TableName tableName = htd.getTableName();
 
     // verify staging folder has been cleaned up
@@ -430,7 +462,7 @@ public class TestLoadIncrementalHFiles {
         .build();
 
     try {
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
+      runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2);
       assertTrue("Loading into table with non-existent family should have failed", false);
     } catch (Exception e) {
       assertTrue("IOException expected", e instanceof IOException);
-- 
2.15.0
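End to end, the patched tool can be driven in either form through the standard
Tool interface; a minimal driver sketch (the staging directories and table
name are illustrative, and the target table must already exist for the
-loadTable form):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadDriverSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Depth-2 input: baseDirectory/familyDir/hfile (pre-patch behavior).
        int rc = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
            new String[] { "/staging/mytable", "mytable" });

        // Depth-3 input: baseDirectory/regionDir/familyDir/hfile.
        if (rc == 0) {
          rc = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
              new String[] { "/staging/mytable3", "mytable", "-loadTable" });
        }
        System.exit(rc);
      }
    }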