Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnection.java	(revision 1088204)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java	(working copy)
@@ -230,6 +230,22 @@
   throws IOException, RuntimeException;
 
   /**
+   * Pass in a List of ServerCallables with your particular bit of logic defined
+   * and this method will manage the process of doing retries with timed waits
+   * and refinds of missing regions.
+   *
+   * @param <T> the type of the return value
+   * @param pool ExecutorService that is used to execute the callables
+   * @param callables List of callables to run
+   * @param results array to receive the results of the callables; null to ignore return values
+   * @throws IOException if a remote or network exception occurs
+   * @throws RuntimeException other unspecified error
+   */
+  public <T> void getRegionServerWithRetries(ExecutorService pool,
+      List<ServerCallable<T>> callables, Object[] results)
+  throws IOException, RuntimeException;
+
+  /**
    * Pass in a ServerCallable with your particular bit of logic defined and
    * this method will pass it to the defined region server.
    * @param <T> the type of the return value
Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java	(revision 1088204)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java	(working copy)
@@ -1021,6 +1021,38 @@
       return null;
     }
 
+    public <T> void getRegionServerWithRetries(ExecutorService pool,
+        List<ServerCallable<T>> callables, Object[] results)
+    throws IOException, RuntimeException {
+      // results must be the same size as callables
+      if (results != null && results.length != callables.size()) {
+        throw new IllegalArgumentException(
+            "argument results must be the same size as argument callables");
+      }
+      List<Future<T>> futures = new ArrayList<Future<T>>(callables.size());
+      for (final ServerCallable<T> svrCallable : callables) {
+        // create a Callable since instantiateServer() must be called before
+        // each invocation of call()
+        Callable<T> callable = new Callable<T>() {
+          public T call() throws Exception {
+            return getRegionServerWithRetries(svrCallable);
+          }
+        };
+        futures.add(pool.submit(callable));
+      }
+      for (int i = 0; i < futures.size(); i++) {
+        Future<T> future = futures.get(i);
+        try {
+          T ret = future.get();
+          if (results != null) {
+            results[i] = ret;
+          }
+        } catch (Exception e) {
+          LOG.warn(e);
+        }
+      }
+    }
+
     public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
     throws IOException, RuntimeException {
       try {
@@ -1188,6 +1220,8 @@
       HServerAddress [] lastServers = new HServerAddress[results.length];
       List<Row> workingList = new ArrayList<Row>(list);
       boolean retry = true;
+      // count that helps presize the actions lists
+      int actionCount = 0;
       Throwable singleRowCause = null;
 
       for (int tries = 0; tries < numRetries && retry; ++tries) {
@@ -1278,6 +1312,7 @@
           // order), so they can be retried.
           retry = false;
           workingList.clear();
+          actionCount = 0;
           for (int i = 0; i < results.length; i++) {
             // if null (fail) or instanceof Throwable && not instanceof DNRIOE
             // then retry that row. else dont.
@@ -1286,11 +1321,14 @@
                 !(results[i] instanceof DoNotRetryIOException))) {
               retry = true;
-
+              actionCount++;
               Row row = list.get(i);
               workingList.add(row);
               deleteCachedLocation(tableName, row.getRow());
             } else {
+              if (results[i] != null && results[i] instanceof Throwable) {
+                actionCount++;
+              }
               // add null to workingList, so the order remains consistent with the original list argument.
               workingList.add(null);
             }
@@ -1305,9 +1343,9 @@
       }
 
-      List<Throwable> exceptions = new ArrayList<Throwable>();
-      List<Row> actions = new ArrayList<Row>();
-      List<HServerAddress> addresses = new ArrayList<HServerAddress>();
+      List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
+      List<Row> actions = new ArrayList<Row>(actionCount);
+      List<HServerAddress> addresses = new ArrayList<HServerAddress>(actionCount);
 
       for (int i = 0 ; i < results.length; i++) {
         if (results[i] == null || results[i] instanceof Throwable) {
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java	(revision 1088204)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java	(working copy)
@@ -1257,13 +1257,13 @@
     }
   }
 
-  static class DaemonThreadFactory implements ThreadFactory {
+  public static class DaemonThreadFactory implements ThreadFactory {
    static final AtomicInteger poolNumber = new AtomicInteger(1);
    final ThreadGroup group;
    final AtomicInteger threadNumber = new AtomicInteger(1);
    final String namePrefix;
 
-    DaemonThreadFactory() {
+    public DaemonThreadFactory() {
      SecurityManager s = System.getSecurityManager();
      group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java	(revision 1088765)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java	(working copy)
@@ -21,10 +21,18 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.Runtime;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Deque;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
-import java.util.ArrayList;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,10 +42,13 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ServerCallable;
@@ -50,12 +61,9 @@
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import java.util.TreeMap;
 
 /**
@@ -67,23 +75,18 @@
   private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
 
   private static final int TABLE_CREATE_MAX_RETRIES = 20;
   private static final long TABLE_CREATE_SLEEP = 60000;
+  static long regionCount = 0;
 
   private HBaseAdmin hbAdmin;
+  private Configuration cfg;
 
   public static String NAME = "completebulkload";
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
+    this.cfg = conf;
     this.hbAdmin = new HBaseAdmin(conf);
   }
 
-  /* This constructor does not add HBase configuration.
-   * Explicit addition is necessary. Do we need this constructor?
-   */
-  public LoadIncrementalHFiles() {
-    super();
-  }
-
-
   private void usage() {
     System.err.println("usage: " + NAME +
         " /path/to/hfileoutputformat-output " +
@@ -165,13 +168,29 @@
     }
 
     Deque<LoadQueueItem> queue = null;
+    int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
+        Runtime.getRuntime().availableProcessors());
+    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        new HTable.DaemonThreadFactory());
+    ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
+    List<ServerCallable<Void>> callables = new ArrayList<ServerCallable<Void>>();
     try {
       queue = discoverLoadQueue(hfofDir);
+      // the outer loop picks up LoadQueueItems re-queued by HFile splits
       while (!queue.isEmpty()) {
-        LoadQueueItem item = queue.remove();
-        tryLoad(item, conn, table.getTableName(), queue);
+        Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
+        // the inner loop groups the callables for one parallel round
+        while (!queue.isEmpty()) {
+          LoadQueueItem item = queue.remove();
+          tryLoad(item, conn, table, queue, startEndKeys, callables);
+        }
+        conn.getRegionServerWithRetries(pool, callables, null);
+        callables.clear();
       }
     } finally {
+      pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
         StringBuilder err = new StringBuilder();
         err.append("-------------------------------------------------\n");
@@ -184,16 +203,50 @@
       }
     }
   }
+
+  // generate a unique file name for the table
+  String getUniqueName(byte[] tableName) {
+    String name = Bytes.toStringBinary(tableName) + "," + regionCount;
+    regionCount++;
+    return name;
+  }
+
+  void splitStoreFileAndRequeue(final LoadQueueItem item,
+      final Deque<LoadQueueItem> queue, final HTable table,
+      byte[] startKey, byte[] splitKey) throws IOException {
+    final Path hfilePath = item.hfilePath;
+
+    // We use a '_' prefix which is ignored when walking directory trees
+    // above.
+    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
+
+    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
+        "region. Splitting...");
+
+    String uniqueName = getUniqueName(table.getTableName());
+    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
+    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
+    Path topOut = new Path(tmpDir, uniqueName + ".top");
+    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
+        botOut, topOut);
+
+    // Add these back at the *front* of the queue, so there's a lower
+    // chance that the region will just split again before we get there.
+    queue.addFirst(new LoadQueueItem(item.family, botOut));
+    queue.addFirst(new LoadQueueItem(item.family, topOut));
+    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+  }
+
   /**
    * Attempt to load the given load queue item into its target region server.
   * If the hfile boundary no longer fits into a region, physically splits
   * the hfile such that the new bottom half will fit, and adds the two
   * resultant hfiles back into the load queue.
   */
-  private void tryLoad(final LoadQueueItem item,
-      HConnection conn, final byte[] table,
-      final Deque<LoadQueueItem> queue)
+  private boolean tryLoad(final LoadQueueItem item,
+      HConnection conn, final HTable table,
+      final Deque<LoadQueueItem> queue, Pair<byte[][], byte[][]> startEndKeys,
+      List<ServerCallable<Void>> callables)
   throws IOException {
     final Path hfilePath = item.hfilePath;
     final FileSystem fs = hfilePath.getFileSystem(getConf());
@@ -213,43 +266,38 @@
     if (first == null || last == null) {
       assert first == null && last == null;
       LOG.info("hfile " + hfilePath + " has no entries, skipping");
-      return;
+      return false;
     }
+    if (Bytes.compareTo(first, last) > 0) {
+      throw new IllegalArgumentException(
+          "Invalid range: " + Bytes.toStringBinary(first) +
+          " > " + Bytes.toStringBinary(last));
+    }
+    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
+        Bytes.BYTES_COMPARATOR);
+    if (idx < 0) {
+      idx = -(idx + 1) - 1;
+    }
+    boolean lastKeyInRange =
+        Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
+        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
+    if (!lastKeyInRange) {
+      splitStoreFileAndRequeue(item, queue, table,
+          startEndKeys.getFirst()[idx], startEndKeys.getSecond()[idx]);
+      return true;
+    }
 
-    // We use a '_' prefix which is ignored when walking directory trees
-    // above.
-    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
-
-    conn.getRegionServerWithRetries(
-      new ServerCallable<Void>(conn, table, first) {
+    callables.add(new ServerCallable<Void>(conn, table.getTableName(), first) {
       @Override
       public Void call() throws Exception {
        LOG.debug("Going to connect to server " + location +
            "for row " + Bytes.toStringBinary(row));
-        HRegionInfo hri = location.getRegionInfo();
-        if (!hri.containsRange(first, last)) {
-          LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
-              "region. Splitting...");
-          HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
-          Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
-          Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
-          splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
-              botOut, topOut);
-
-          // Add these back at the *front* of the queue, so there's a lower
-          // chance that the region will just split again before we get there.
-          queue.addFirst(new LoadQueueItem(item.family, botOut));
-          queue.addFirst(new LoadQueueItem(item.family, topOut));
-          LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
-          return null;
-        }
-
         byte[] regionName = location.getRegionInfo().getRegionName();
         server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
         return null;
       }
     });
+    return false;
   }
 
   /**
@@ -419,7 +467,7 @@
       System.err.println("Caught Socket timeout.. Mostly caused by a slow region assignment by master!");
     }
 
-    HTable table = new HTable(tableName);
+    HTable table = new HTable(this.cfg, tableName);
     HConnection conn = table.getConnection();
     int ctr = 0;
 
@@ -446,7 +494,7 @@
     if (!tableExists) this.createTable(tableName,dirPath);
 
     Path hfofDir = new Path(dirPath);
-    HTable table = new HTable(tableName);
+    HTable table = new HTable(this.cfg, tableName);
 
     doBulkLoad(hfofDir, table);
     return 0;
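
Usage sketch (not part of the patch itself): the snippet below shows how client code might drive the new batched HConnection.getRegionServerWithRetries(pool, callables, results) overload directly, mirroring what the patched doBulkLoad() does with its Void callables. It is a minimal, hypothetical example under stated assumptions: the patch is applied, and the table name "t1", the row keys, and the pool size of 4 are illustrative placeholders, as is the hbase.loadincremental.threads.max setting shown for completeness.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.util.Bytes;

// sketch only: assumes the patch above is applied; table and row names
// are placeholders
public class BatchedCallableSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // optional: cap the bulk loader's pool size (defaults to the number
    // of available processors when unset)
    conf.setInt("hbase.loadincremental.threads.max", 4);

    HConnection conn = HConnectionManager.getConnection(conf);
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      final byte[] tableName = Bytes.toBytes("t1");
      byte[][] rows = { Bytes.toBytes("row1"), Bytes.toBytes("row2") };

      // one ServerCallable per row; each call() runs against the region
      // server hosting that row
      List<ServerCallable<Result>> callables =
          new ArrayList<ServerCallable<Result>>();
      for (final byte[] r : rows) {
        callables.add(new ServerCallable<Result>(conn, tableName, r) {
          public Result call() throws Exception {
            return server.get(location.getRegionInfo().getRegionName(),
                new Get(r));
          }
        });
      }

      // results[i] receives the return value of callables.get(i);
      // pass null instead to discard return values
      Object[] results = new Object[callables.size()];
      conn.getRegionServerWithRetries(pool, callables, results);
    } finally {
      pool.shutdown();
    }
  }
}

Note that a failure of an individual callable is caught inside the batched overload and logged at WARN rather than rethrown, so callers that care about per-item success should inspect the corresponding results slot rather than rely on an exception.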