diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 980df1c..97664b0 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -702,6 +702,24 @@ public final class HConstants { /** Configuration key for the directory to backup HFiles for a table */ public static final String HFILE_ARCHIVE_DIRECTORY = "hbase.table.archive.directory"; + /* MR-related properties */ + + /** Parameter to toggle if the bulkloadable files must be assigned sequence numbers or not, during bulkload + * to an RS */ + public static String HBASE_MR_BULKLOAD_ASSIGN_SEQUENCENUMBERS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; + /** Default for {@link #HBASE_MR_BULKLOAD_ASSIGN_SEQUENCENUMBERS} */ + public static boolean DEFAULT_HBASE_MR_BULKLOAD_ASSIGN_SEQUENCENUMBERS = true; + + /** Parameter to control the maximum parallelism of the bulkloading client program */ + public static String HBASE_LOADINCREMENTAL_THREADS_MAX = "hbase.loadincremental.threads.max"; + + /** Parameter to control the number of retries the bulkload operation from the + * client may do, upon unsuccessful bulkload in the first round. + */ + public static String HBASE_BULKLOAD_RETRIES_NUMBER = "hbase.bulkload.retries.number"; + /** Default for {@link #HBASE_BULKLOAD_RETRIES_NUMBER} */ + public static int DEFAULT_HBASE_BULKLOAD_RETRIES_NUMBER = 0; + /** * QOS attributes: these attributes are used to demarcate RPC call processing * by different set of handlers. For example, HIGH_QOS tagged methods are diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index eb2c476..5e3a533 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -94,17 +94,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class); static AtomicLong regionCount = new AtomicLong(0); private HBaseAdmin hbAdmin; - private Configuration cfg; public static String NAME = "completebulkload"; - private static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; + private boolean assignSeqIds; - public LoadIncrementalHFiles(Configuration conf) throws Exception { - super(conf); - this.cfg = conf; - this.hbAdmin = new HBaseAdmin(conf); - assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true); + public LoadIncrementalHFiles() throws Exception { + setConf(HBaseConfiguration.create(getConf())); + this.hbAdmin = new HBaseAdmin(getConf()); + assignSeqIds = getConf().getBoolean( + HConstants.HBASE_MR_BULKLOAD_ASSIGN_SEQUENCENUMBERS, + HConstants.DEFAULT_HBASE_MR_BULKLOAD_ASSIGN_SEQUENCENUMBERS); } private void usage() { @@ -191,7 +191,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } // initialize thread pools - int nrThreads = cfg.getInt("hbase.loadincremental.threads.max", + int nrThreads = getConf().getInt(HConstants.HBASE_LOADINCREMENTAL_THREADS_MAX, Runtime.getRuntime().availableProcessors()); ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); builder.setNameFormat("LoadIncrementalHFiles-%1$d"); @@ -224,7 +224,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool { + count + " with " + queue.size() + " files remaining to group or split"); } - int maxRetries = cfg.getInt("hbase.bulkload.retries.number", 0); + int maxRetries = getConf().getInt( + HConstants.HBASE_BULKLOAD_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_BULKLOAD_RETRIES_NUMBER); if (maxRetries != 0 && count >= maxRetries) { LOG.error("Retry attempted " + count + " times without completing, bailing out"); return; @@ -698,14 +700,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool { if (!tableExists) this.createTable(tableName,dirPath); Path hfofDir = new Path(dirPath); - HTable table = new HTable(this.cfg, tableName); + HTable table = new HTable(getConf(), tableName); doBulkLoad(hfofDir, table); return 0; } public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args); + int ret = ToolRunner.run(new LoadIncrementalHFiles(), args); System.exit(ret); }