Index: src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (revision 1099118)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (working copy)
@@ -21,10 +21,22 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Deque;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.ArrayList;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,10 +46,13 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ServerCallable;
@@ -50,14 +65,13 @@
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import java.util.TreeMap;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
  * @see #usage()
@@ -67,23 +81,19 @@
   private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
   private static final int TABLE_CREATE_MAX_RETRIES = 20;
   private static final long TABLE_CREATE_SLEEP = 60000;
+  static AtomicLong regionCount = new AtomicLong(0);
   private HBaseAdmin hbAdmin;
+  private Configuration cfg;
+  private Set<Future> futures = new HashSet<Future>();
 
   public static String NAME = "completebulkload";
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
+    this.cfg = conf;
     this.hbAdmin = new HBaseAdmin(conf);
   }
 
-  /* This constructor does not add HBase configuration.
-   * Explicit addition is necessary. Do we need this constructor?
-   */
-  public LoadIncrementalHFiles() {
-    super();
-  }
-
-
   private void usage() {
     System.err.println("usage: " + NAME +
         " /path/to/hfileoutputformat-output " +
@@ -165,13 +175,38 @@
     }
 
     Deque<LoadQueueItem> queue = null;
+    int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
+        Runtime.getRuntime().availableProcessors());
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
+
+    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        builder.build());
+    ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
     try {
       queue = discoverLoadQueue(hfofDir);
+      // outer loop picks up LoadQueueItem due to HFile split
      while (!queue.isEmpty()) {
-        LoadQueueItem item = queue.remove();
-        tryLoad(item, conn, table.getTableName(), queue);
+        Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
+        // inner loop groups callables
+        while (!queue.isEmpty()) {
+          LoadQueueItem item = queue.remove();
+          tryLoad(item, conn, table, queue, startEndKeys, pool);
+        }
       }
+      for (Future future : futures) {
+        try {
+          future.get();
+        } catch (ExecutionException ee) {
+          LOG.error(ee);
+        } catch (InterruptedException ie) {
+          LOG.error(ie);
+        }
+      }
     } finally {
+      pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
         StringBuilder err = new StringBuilder();
         err.append("-------------------------------------------------\n");
@@ -185,15 +220,48 @@
     }
   }
 
+  // unique file name for the table
+  String getUniqueName(byte[] tableName) {
+    String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet();
+    return name;
+  }
+
+  void splitStoreFileAndRequeue(final LoadQueueItem item,
+      final Deque<LoadQueueItem> queue, final HTable table,
+      byte[] startKey, byte[] splitKey) throws IOException {
+    final Path hfilePath = item.hfilePath;
+
+    // We use a '_' prefix which is ignored when walking directory trees
+    // above.
+    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
+
+    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
+        "region. Splitting...");
+
+    String uniqueName = getUniqueName(table.getTableName());
+    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
+    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
+    Path topOut = new Path(tmpDir, uniqueName + ".top");
+    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
+        botOut, topOut);
+
+    // Add these back at the *front* of the queue, so there's a lower
+    // chance that the region will just split again before we get there.
+    queue.addFirst(new LoadQueueItem(item.family, botOut));
+    queue.addFirst(new LoadQueueItem(item.family, topOut));
+    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+  }
+
   /**
    * Attempt to load the given load queue item into its target region server.
    * If the hfile boundary no longer fits into a region, physically splits
    * the hfile such that the new bottom half will fit, and adds the two
    * resultant hfiles back into the load queue.
   */
-  private void tryLoad(final LoadQueueItem item,
-      HConnection conn, final byte[] table,
-      final Deque<LoadQueueItem> queue)
+  private boolean tryLoad(final LoadQueueItem item,
+      final HConnection conn, final HTable table,
+      final Deque<LoadQueueItem> queue, Pair<byte[][], byte[][]> startEndKeys,
+      ExecutorService pool)
   throws IOException {
     final Path hfilePath = item.hfilePath;
     final FileSystem fs = hfilePath.getFileSystem(getConf());
@@ -213,43 +281,44 @@
     if (first == null || last == null) {
       assert first == null && last == null;
       LOG.info("hfile " + hfilePath + " has no entries, skipping");
-      return;
+      return false;
     }
 
+    if (Bytes.compareTo(first, last) > 0) {
+      throw new IllegalArgumentException(
+          "Invalid range: " + Bytes.toStringBinary(first) +
+          " > " + Bytes.toStringBinary(last));
+    }
+    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR);
+    if (idx < 0) {
+      idx = -(idx+1)-1;
+    }
+    boolean lastKeyInRange =
+      Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
+      Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
+    if (!lastKeyInRange) {
+      splitStoreFileAndRequeue(item, queue, table,
+          startEndKeys.getFirst()[idx], startEndKeys.getSecond()[idx]);
+      return true;
+    }
-    // We use a '_' prefix which is ignored when walking directory trees
-    // above.
-    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
 
+    final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn, table.getTableName(), first) {
+      @Override
+      public Void call() throws Exception {
+        LOG.debug("Going to connect to server " + location +
+            "for row " + Bytes.toStringBinary(row));
-    conn.getRegionServerWithRetries(
-      new ServerCallable<Void>(conn, table, first) {
-        @Override
-        public Void call() throws Exception {
-          LOG.debug("Going to connect to server " + location +
-              "for row " + Bytes.toStringBinary(row));
-          HRegionInfo hri = location.getRegionInfo();
-          if (!hri.containsRange(first, last)) {
-            LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
-                "region. Splitting...");
-
-            HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
-            Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
-            Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
-            splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
-                botOut, topOut);
-
-            // Add these back at the *front* of the queue, so there's a lower
-            // chance that the region will just split again before we get there.
-            queue.addFirst(new LoadQueueItem(item.family, botOut));
-            queue.addFirst(new LoadQueueItem(item.family, topOut));
-            LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
-            return null;
-          }
-
-          byte[] regionName = location.getRegionInfo().getRegionName();
-          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
-          return null;
-        }
-      });
+        byte[] regionName = location.getRegionInfo().getRegionName();
+        server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
+        return null;
+      }
+    };
+    Callable<Void> callable = new Callable<Void>() {
+      public Void call() throws Exception {
+        return conn.getRegionServerWithRetries(svrCallable);
+      }
+    };
+    futures.add(pool.submit(callable));
+    return false;
   }
 
   /**
@@ -418,9 +487,9 @@
     } catch (java.net.SocketTimeoutException e) {
       System.err.println("Caught Socket timeout.. Mostly caused by a slow region assignment by master!");
     }
-
-    HTable table = new HTable(tableName);
+    HTable table = new HTable(this.cfg, tableName);
+    HConnection conn = table.getConnection();
     int ctr = 0;
     while (!conn.isTableAvailable(table.getTableName()) && (ctr
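
For reference, a minimal driver sketch (not part of the patch) showing how the parallelized loader above might be invoked. The configuration key hbase.loadincremental.threads.max is taken from the patch itself; the class name BulkLoadDriver, the thread count of 8, the output path, and the table name "mytable" are illustrative placeholders, and the two-argument form mirrors what usage() suggests.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Cap the pool used for the concurrent bulkLoadHFile calls; the patch
    // defaults this to the number of available processors.
    conf.setInt("hbase.loadincremental.threads.max", 8);
    // Arguments follow the tool's usage(): HFileOutputFormat output
    // directory, then the target table (both values are placeholders).
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
        new String[] { "/path/to/hfileoutputformat-output", "mytable" });
    System.exit(ret);
  }
}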