Index: src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (revision 1101262) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (working copy) @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Deque; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Set; @@ -85,6 +86,7 @@ private HBaseAdmin hbAdmin; private Configuration cfg; private Set futures = new HashSet(); + private Set futuresForSplittingHFile = new HashSet(); public static String NAME = "completebulkload"; @@ -188,13 +190,27 @@ try { queue = discoverLoadQueue(hfofDir); // outer loop picks up LoadQueueItem due to HFile split - while (!queue.isEmpty()) { + while (!queue.isEmpty() || futuresForSplittingHFile.size() > 0) { Pair startEndKeys = table.getStartEndKeys(); // inner loop groups callables while (!queue.isEmpty()) { LoadQueueItem item = queue.remove(); tryLoad(item, conn, table, queue, startEndKeys, pool); } + Iterator iter = futuresForSplittingHFile.iterator(); + while (iter.hasNext()) { + Future future = iter.next(); + try { + future.get(); + break; + } catch (ExecutionException ee) { + LOG.error(ee); + } catch (InterruptedException ie) { + LOG.error(ie); + } finally { + iter.remove(); + } + } } for (Future future : futures) { try { @@ -247,8 +263,10 @@ // Add these back at the *front* of the queue, so there's a lower // chance that the region will just split again before we get there. - queue.addFirst(new LoadQueueItem(item.family, botOut)); - queue.addFirst(new LoadQueueItem(item.family, topOut)); + synchronized (queue) { + queue.addFirst(new LoadQueueItem(item.family, botOut)); + queue.addFirst(new LoadQueueItem(item.family, topOut)); + } LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); } @@ -260,7 +278,8 @@ */ private boolean tryLoad(final LoadQueueItem item, final HConnection conn, final HTable table, - final Deque queue, Pair startEndKeys, + final Deque queue, + final Pair startEndKeys, ExecutorService pool) throws IOException { final Path hfilePath = item.hfilePath; @@ -292,12 +311,21 @@ if (idx < 0) { idx = -(idx+1)-1; } + final int indexForCallable = idx; boolean lastKeyInRange = Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 || Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY); if (!lastKeyInRange) { - splitStoreFileAndRequeue(item, queue, table, - startEndKeys.getFirst()[idx], startEndKeys.getSecond()[idx]); + Callable callable = new Callable() { + public Void call() throws Exception { + splitStoreFileAndRequeue(item, queue, table, + startEndKeys.getFirst()[indexForCallable], + startEndKeys.getSecond()[indexForCallable]); + return (Void)null; + } + }; + futuresForSplittingHFile.add(pool.submit(callable)); + return true; }