diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 6978e23..94d5483 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -280,12 +280,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }
 
+  /**
+   * Populate the Queue with given HFiles
+   */
+  private void populateLoadQueue(final Deque<LoadQueueItem> ret,
+      Map<byte[], List<Path>> map) throws IOException {
+    for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
+      for (Path p : entry.getValue()) {
+        ret.add(new LoadQueueItem(entry.getKey(), p));
+      }
+    }
+  }
+
   /**
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
   private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
-    final boolean validateHFile) throws IOException {
+      final boolean validateHFile) throws IOException {
     fs = hfofDir.getFileSystem(getConf());
     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
       @Override
@@ -325,6 +337,33 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
-   * Perform a bulk load of the given directory into the given
+   * Perform a bulk load of the given map of family to List of hfiles into the given
    * pre-existing table.  This method is not threadsafe.
    *
+   * @param map map of family to List of hfiles
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @param silence true to ignore unmatched column families
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
+      RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
+    if (!admin.isTableAvailable(regionLocator.getName())) {
+      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
+    }
+    // LQI queue does not need to be threadsafe -- all operations on this queue
+    // happen in this thread
+    Deque<LoadQueueItem> queue = new LinkedList<>();
+    prepareHFileQueue(map, table, queue, false, silence);
+    if (queue.isEmpty()) {
+      LOG.warn("Bulk load operation did not get any files to load");
+      return;
+    }
+    performBulkLoad(admin, table, regionLocator, queue);
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given
+   * pre-existing table.  This method is not threadsafe.
+   *
    * @param hfofDir the directory that was provided as the output path
    *   of a job using HFileOutputFormat
    * @param admin the Admin
@@ -335,41 +374,44 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
       RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
-
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
     }
 
-    ExecutorService pool = createExecutorService();
-
+    /*
+     * Checking hfile format is a time-consuming operation, we should have an option to skip
+     * this step when bulkloading millions of HFiles. See HBASE-13985.
+     */
+    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
+    if (!validateHFile) {
+      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
+          "are not correct. If you fail to read data from your table after using this " +
+          "option, consider removing the files and bulkload again without this option. " +
+          "See HBASE-13985");
+    }
 
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
+    prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
+
+    if (queue.isEmpty()) {
+      LOG.warn("Bulk load operation did not find any files to load in " +
+          "directory " + (hfofDir != null ? hfofDir.toUri() : "") + ". Does it contain files in " +
+          "subdirectories that correspond to column family names?");
+      return;
+    }
+    performBulkLoad(admin, table, regionLocator, queue);
+  }
+
+  void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
+      Deque<LoadQueueItem> queue) throws IOException {
+    ExecutorService pool = createExecutorService();
+    SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
 
     try {
-      /*
-       * Checking hfile format is a time-consuming operation, we should have an option to skip
-       * this step when bulkloading millions of HFiles. See HBASE-13985.
-       */
-      boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
-      if(!validateHFile) {
-        LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
-            "are not correct. If you fail to read data from your table after using this " +
-            "option, consider removing the files and bulkload again without this option. " +
-            "See HBASE-13985");
-      }
-      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
-      int count = 0;
-      if (queue.isEmpty()) {
-        LOG.warn("Bulk load operation did not find any files to load in " +
-            "directory " + hfofDir.toUri() + ". Does it contain files in " +
-            "subdirectories that correspond to column family names?");
-        return;
-      }
-
       if(isSecureBulkLoadEndpointAvailable()) {
         LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
         LOG.warn("Secure bulk load has been integrated into HBase core.");
@@ -465,12 +507,29 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param silence true to ignore unmatched column families
    * @throws IOException If any I/O or network error occurred
    */
-  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
-      boolean validateHFile, boolean silence) throws IOException {
+  public void prepareHFileQueue(Path hfilesDir, Table table,
+      Deque<LoadQueueItem> queue, boolean validateHFile, boolean silence) throws IOException {
     discoverLoadQueue(queue, hfilesDir, validateHFile);
     validateFamiliesInHFiles(table, queue, silence);
   }
 
+  /**
+   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+   * passed map of family to hfiles and validates whether the prepared queue has all the valid
+   * table column families in it.
+   * @param map map of family to List of hfiles
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param validateHFile if true hfiles will be validated for its format
+   * @param silence true to ignore unmatched column families
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
+      Deque<LoadQueueItem> queue, boolean validateHFile, boolean silence) throws IOException {
+    populateLoadQueue(queue, map);
+    validateFamiliesInHFiles(table, queue, silence);
+  }
+
   // Initialize a thread pool
   private ExecutorService createExecutorService() {
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
@@ -1073,27 +1132,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     LOG.info("Table "+ tableName +" is available!!");
   }
 
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage();
-      return -1;
-    }
-
+  public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception {
     initialize();
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
-      String dirPath = args[0];
-      TableName tableName = TableName.valueOf(args[1]);
-
-      boolean tableExists = admin.tableExists(tableName);
-      if (!tableExists) {
-        if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
-          this.createTable(tableName, dirPath, admin);
-        } else {
-          String errorMsg = format("Table '%s' does not exist.", tableName);
-          LOG.error(errorMsg);
-          throw new TableNotFoundException(errorMsg);
+
+      if (dirPath != null) {
+        boolean tableExists = admin.tableExists(tableName);
+        if (!tableExists) {
+          if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
+            this.createTable(tableName, dirPath, admin);
+          } else {
+            String errorMsg = format("Table '%s' does not exist.", tableName);
+            LOG.error(errorMsg);
+            throw new TableNotFoundException(errorMsg);
+          }
         }
       }
 
@@ -1109,6 +1162,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     return 0;
   }
 
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return -1;
+    }
+
+    String dirPath = args[0];
+    TableName tableName = TableName.valueOf(args[1]);
+    return run(dirPath, null, tableName);
+  }
+
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);