Index: src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java	(revision 1087961)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java	(working copy)
@@ -21,10 +21,12 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.ArrayList;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,10 +36,13 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ServerCallable;
@@ -50,12 +55,9 @@
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import java.util.TreeMap;
 
 
 /**
@@ -68,22 +70,16 @@
   private static final int TABLE_CREATE_MAX_RETRIES = 20;
   private static final long TABLE_CREATE_SLEEP = 60000;
   private HBaseAdmin hbAdmin;
+  private Configuration cfg;
 
   public static String NAME = "completebulkload";
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
+    this.cfg = conf;
     this.hbAdmin = new HBaseAdmin(conf);
   }
 
-  /* This constructor does not add HBase configuration.
-   * Explicit addition is necessary. Do we need this constructor?
-   */
-  public LoadIncrementalHFiles() {
-    super();
-  }
-
   private void usage() {
     System.err.println("usage: " + NAME +
         " /path/to/hfileoutputformat-output " +
@@ -168,8 +164,11 @@
     try {
       queue = discoverLoadQueue(hfofDir);
       while (!queue.isEmpty()) {
-        LoadQueueItem item = queue.remove();
-        tryLoad(item, conn, table.getTableName(), queue);
+        Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
+        while (!queue.isEmpty()) {
+          LoadQueueItem item = queue.remove();
+          tryLoad(item, conn, table, queue, startEndKeys);
+        }
       }
     } finally {
       if (queue != null && !queue.isEmpty()) {
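
Note on the new doBulkLoad loop above: HTable.getStartEndKeys() returns a Pair of parallel arrays, where getFirst()[i] is region i's start key and getSecond()[i] is its end key (the last region's end key is the empty byte array). Below is a minimal standalone sketch of how the two arrays line up; the table name "mytable" and a reachable cluster are assumptions for illustration only, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class StartEndKeysSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "mytable"); // hypothetical table name
    // Parallel arrays: getFirst()[i] and getSecond()[i] bound region i.
    Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
    for (int i = 0; i < startEndKeys.getFirst().length; i++) {
      System.out.println("region " + i + ": ["
          + Bytes.toStringBinary(startEndKeys.getFirst()[i]) + ", "
          + Bytes.toStringBinary(startEndKeys.getSecond()[i]) + ")");
    }
    table.close();
  }
}
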
@@ -184,16 +183,48 @@
       }
     }
   }
+
+  // region name in similar format to that of HRegionInfo
+  String getRegionName(byte[] tableName, byte[] startKey) {
+    return Bytes.toStringBinary(tableName) + "," +
+        Bytes.toStringBinary(startKey) + "," + System.currentTimeMillis();
+  }
+
+  void splitStoreFileAndRequeue(final LoadQueueItem item,
+      final Deque<LoadQueueItem> queue, final HTable table,
+      byte[] startKey, byte[] splitKey) throws IOException {
+    final Path hfilePath = item.hfilePath;
+
+    // We use a '_' prefix which is ignored when walking directory trees
+    // above.
+    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
+
+    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
+        "region. Splitting...");
+    String encodedName = getRegionName(table.getTableName(), startKey);
+    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
+    Path botOut = new Path(tmpDir, encodedName + ".bottom");
+    Path topOut = new Path(tmpDir, encodedName + ".top");
+    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
+        botOut, topOut);
+
+    // Add these back at the *front* of the queue, so there's a lower
+    // chance that the region will just split again before we get there.
+    queue.addFirst(new LoadQueueItem(item.family, botOut));
+    queue.addFirst(new LoadQueueItem(item.family, topOut));
+    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+  }
 
   /**
    * Attempt to load the given load queue item into its target region server.
    * If the hfile boundary no longer fits into a region, physically splits
    * the hfile such that the new bottom half will fit, and adds the two
    * resultant hfiles back into the load queue.
    */
-  private void tryLoad(final LoadQueueItem item,
-      HConnection conn, final byte[] table,
-      final Deque<LoadQueueItem> queue)
+  private boolean tryLoad(final LoadQueueItem item,
+      HConnection conn, final HTable table,
+      final Deque<LoadQueueItem> queue, Pair<byte[][], byte[][]> startEndKeys)
   throws IOException {
     final Path hfilePath = item.hfilePath;
     final FileSystem fs = hfilePath.getFileSystem(getConf());
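
Note on the addFirst() calls above: they are the reason the load queue is a Deque rather than a plain Queue. The two halves of a split HFile jump to the front, so they are retried before the region boundaries have much chance to drift further. A JDK-only sketch of the resulting ordering (the "hfileA"/"hfileB" names are placeholders):

import java.util.Deque;
import java.util.LinkedList;

public class RequeueOrderSketch {
  public static void main(String[] args) {
    Deque<String> queue = new LinkedList<String>();
    queue.add("hfileA");
    queue.add("hfileB");

    // Suppose hfileA no longer fits in one region and gets split in two.
    String item = queue.remove();        // "hfileA"
    // Push both halves onto the *front* so they are retried next.
    queue.addFirst(item + ".bottom");
    queue.addFirst(item + ".top");

    System.out.println(queue);           // [hfileA.top, hfileA.bottom, hfileB]
  }
}
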
@@ -213,43 +244,39 @@
     if (first == null || last == null) {
       assert first == null && last == null;
       LOG.info("hfile " + hfilePath + " has no entries, skipping");
-      return;
+      return false;
     }
+    if (Bytes.compareTo(first, last) > 0) {
+      throw new IllegalArgumentException(
+          "Invalid range: " + Bytes.toStringBinary(first) +
+          " > " + Bytes.toStringBinary(last));
+    }
+    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
+        Bytes.BYTES_COMPARATOR);
+    if (idx < 0) {
+      idx = -(idx + 1) - 1;
+    }
+    boolean lastKeyInRange =
+        Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
+        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
+    if (!lastKeyInRange) {
+      splitStoreFileAndRequeue(item, queue, table,
+          startEndKeys.getFirst()[idx], startEndKeys.getSecond()[idx]);
+      return true;
+    }
 
-    // We use a '_' prefix which is ignored when walking directory trees
-    // above.
-    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
-
     conn.getRegionServerWithRetries(
-      new ServerCallable<Void>(conn, table, first) {
+      new ServerCallable<Void>(conn, table.getTableName(), first) {
         @Override
         public Void call() throws Exception {
           LOG.debug("Going to connect to server " + location +
               "for row " + Bytes.toStringBinary(row));
-          HRegionInfo hri = location.getRegionInfo();
-          if (!hri.containsRange(first, last)) {
-            LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
-                "region. Splitting...");
-            HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
-            Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
-            Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
-            splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
-                botOut, topOut);
-
-            // Add these back at the *front* of the queue, so there's a lower
-            // chance that the region will just split again before we get there.
-            queue.addFirst(new LoadQueueItem(item.family, botOut));
-            queue.addFirst(new LoadQueueItem(item.family, topOut));
-            LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
-            return null;
-          }
-
           byte[] regionName = location.getRegionInfo().getRegionName();
           server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
           return null;
         }
       });
+    return false;
   }
 
   /**
@@ -419,7 +446,7 @@
       System.err.println("Caught Socket timeout.. Mostly caused by a slow region assignment by master!");
     }
 
-    HTable table = new HTable(tableName);
+    HTable table = new HTable(this.cfg, tableName);
     HConnection conn = table.getConnection();
 
     int ctr = 0;
@@ -446,7 +473,7 @@
     if (!tableExists) this.createTable(tableName,dirPath);
 
     Path hfofDir = new Path(dirPath);
-    HTable table = new HTable(tableName);
+    HTable table = new HTable(this.cfg, tableName);
 
     doBulkLoad(hfofDir, table);
     return 0;
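
Note on the Arrays.binarySearch() idiom in tryLoad() above: on a miss, binarySearch() returns -(insertionPoint) - 1, so idx = -(idx + 1) - 1 recovers the insertion point and steps back one slot, landing on the region whose start key is the greatest key <= first. A JDK-only sketch, with String keys standing in for the byte[] start keys:

import java.util.Arrays;

public class BinarySearchIdxSketch {
  public static void main(String[] args) {
    // Sorted region start keys; "" is the first region's start key.
    String[] startKeys = { "", "ccc", "mmm", "ttt" };

    String first = "hhh";                  // first key of the HFile
    int idx = Arrays.binarySearch(startKeys, first);
    if (idx < 0) {
      // Miss: binarySearch returned -(insertionPoint) - 1.
      // Undo that, then step back one slot to the covering region.
      idx = -(idx + 1) - 1;
    }
    // "hhh" sorts between "ccc" and "mmm", so it lands in region 1.
    System.out.println(idx + " -> region starting at \"" + startKeys[idx] + "\"");
  }
}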