diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 6978e23..ac06d08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -280,12 +280,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }
 
+  /*
+   * Populate the Queue with given HFiles
+   */
+  private void populateLoadQueue(final Deque<LoadQueueItem> ret,
+      Map<byte[], List<Path>> map) throws IOException {
+    for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
+      for (Path p : entry.getValue()) {
+        ret.add(new LoadQueueItem(entry.getKey(), p));
+      }
+    }
+  }
+
   /**
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
   private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
-    final boolean validateHFile) throws IOException {
+      final boolean validateHFile) throws IOException {
     fs = hfofDir.getFileSystem(getConf());
     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
       @Override
@@ -318,7 +330,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
       RegionLocator regionLocator) throws TableNotFoundException, IOException {
-    doBulkLoad(hfofDir, admin, table, regionLocator, false);
+    doBulkLoad(hfofDir, null, admin, table, regionLocator, false);
   }
 
   /**
@@ -327,13 +339,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    *
    * @param hfofDir the directory that was provided as the output path
    *   of a job using HFileOutputFormat
+   * @param map map of family to List of hfiles
    * @param admin the Admin
    * @param table the table to load into
    * @param regionLocator region locator
    * @param silence true to ignore unmatched column families
    * @throws TableNotFoundException if table does not yet exist
    */
-  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
+  public void doBulkLoad(Path hfofDir, Map<byte[], List<Path>> map, final Admin admin, Table table,
       RegionLocator regionLocator, boolean silence) throws TableNotFoundException,
       IOException {
     if (!admin.isTableAvailable(regionLocator.getName())) {
@@ -359,13 +372,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           "option, consider removing the files and bulkload again without this option. " +
           "See HBASE-13985");
     }
-    prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
+    prepareHFileQueue(hfofDir, map, table, queue, validateHFile, silence);
 
     int count = 0;
 
     if (queue.isEmpty()) {
       LOG.warn("Bulk load operation did not find any files to load in " +
-        "directory " + hfofDir.toUri() + ". Does it contain files in " +
+        "directory " + (hfofDir != null ? hfofDir.toUri() : "") + ". Does it contain files in " +
         "subdirectories that correspond to column family names?");
       return;
     }
@@ -451,7 +464,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
       boolean validateHFile) throws IOException {
-    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
+    prepareHFileQueue(hfilesDir, null, table, queue, validateHFile, false);
   }
 
   /**
@@ -465,9 +478,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param silence true to ignore unmatched column families
    * @throws IOException If any I/O or network error occurred
    */
-  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
-      boolean validateHFile, boolean silence) throws IOException {
-    discoverLoadQueue(queue, hfilesDir, validateHFile);
+  public void prepareHFileQueue(Path hfilesDir, Map<byte[], List<Path>> map, Table table,
+      Deque<LoadQueueItem> queue, boolean validateHFile, boolean silence) throws IOException {
+    if (map == null) {
+      discoverLoadQueue(queue, hfilesDir, validateHFile);
+    } else {
+      populateLoadQueue(queue, map);
+    }
     validateFamiliesInHFiles(table, queue, silence);
   }
 
@@ -1073,18 +1090,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     LOG.info("Table "+ tableName +" is available!!");
   }
 
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage();
-      return -1;
-    }
-
+  public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception {
     initialize();
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
-      String dirPath = args[0];
-      TableName tableName = TableName.valueOf(args[1]);
-
       boolean tableExists = admin.tableExists(tableName);
       if (!tableExists) {
@@ -1102,13 +1111,25 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       try (Table table = connection.getTable(tableName);
           RegionLocator locator = connection.getRegionLocator(tableName)) {
         boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
-        doBulkLoad(hfofDir, admin, table, locator, silence);
+        doBulkLoad(hfofDir, null, admin, table, locator, silence);
       }
     }
 
     return 0;
   }
 
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return -1;
+    }
+
+    String dirPath = args[0];
+    TableName tableName = TableName.valueOf(args[1]);
+    return run(dirPath, null, tableName);
+  }
+
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
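
A minimal caller sketch of the map-based overload introduced above (not part of the patch). The table name and HFile path are hypothetical, and it assumes a null directory argument is acceptable once a family-to-files map is supplied, which the null check added to the LOG.warn suggests but the patch does not state explicitly.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class MapBasedBulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("example_table");      // hypothetical table

    // Family -> HFiles mapping; Bytes.BYTES_COMPARATOR orders the byte[] keys.
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    List<Path> hfiles = new ArrayList<>();
    hfiles.add(new Path("/bulkload/example_table/cf/hfile-0"));    // hypothetical HFile path
    family2Files.put(Bytes.toBytes("cf"), hfiles);

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        Table table = connection.getTable(tableName);
        RegionLocator locator = connection.getRegionLocator(tableName)) {
      // With a non-null map the queue is populated from the map instead of
      // walking a directory, so the directory argument is passed as null here.
      loader.doBulkLoad(null, family2Files, admin, table, locator, false);
    }
  }
}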