Index: src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (revision 1520646)
+++ src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (working copy)
@@ -800,4 +800,8 @@
   public void setWriteBufferSize(long writeBufferSize) throws IOException {
     throw new IOException("setWriteBufferSize not supported");
   }
+  @Override
+  public ResultScanner getParallelScanner(Scan scan, double scalingFactor) throws IOException {
+    throw new IOException("getParallelScanner not supported");
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java (revision 0)
@@ -0,0 +1,290 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all in parallel.
+ */
+public class ParallelClientScanner extends AbstractClientScanner implements Closeable {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  // special marker to indicate when a scanning task has finished
+  protected static final MyResult MARKER = new MyResult();
+
+  // the single reader object to use
+  protected ResultReader reader;
+
+  /**
+   * If a caller has predetermined split points this constructor can be used.
+   * One thread per split will be used to execute the scan.
+   * Note that the executing threads can block if not all data can be buffered
+   * on the client. This is by design; callers need to be aware of that, for example
+   * when the results are aggregated with a merge sort. In that case the number of threads
+   * available in the passed thread pool needs to be larger than the number of splits.
+   * @param connection The cluster connection to use
+   * @param scans The {@link Scan} objects defining the split points
+   * @param tableName The table to use
+   * @param bufferSize How many results to buffer
+   *        (should be larger than the individual scans' caching setting)
+   * @param pool The thread pool to use to schedule the tasks
+   * @throws IOException
+   */
+  public ParallelClientScanner(final HConnection connection, final Collection<Scan> scans,
+      final byte[] tableName, int bufferSize, ExecutorService pool) throws IOException {
+    ReaderWriter rw = new ReaderWriter(bufferSize, scans.size());
+    this.reader = rw.getReader();
+    for (Scan s : scans) {
+      pool.submit(new Task(s, connection, tableName, rw));
+    }
+  }
+
+  /**
+   * Execute a single scan across multiple threads. Results are returned when they are available,
+   * NOT in row-key order.
+   * @param connection The cluster connection to use
+   * @param scan The scan to execute
+   * @param tableName The table to use
+   * @param pool The thread pool to use to schedule the tasks
+   * @param scalingFactor How many resources of the cluster to use. The number of threads to
+   *        use is calculated as the number of involved RegionServers * scalingFactor.
+   * @throws IOException
+   */
+  public ParallelClientScanner(final HConnection connection, final Scan scan,
+      final byte[] tableName, ExecutorService pool, double scalingFactor) throws IOException {
+    if (scalingFactor <= 0)
+      throw new IllegalArgumentException("scalingFactor must be > 0");
+
+    // get the region boundaries; a null ExecutorService is fine, we're just using
+    // the table to look up the region cache
+    HTable t = new HTable(tableName, connection, null);
+    List<HRegionLocation> locs = t.getRegionsInRange(scan.getStartRow(), scan.getStopRow());
+    t.close();
+    LOG.debug("Found " + locs.size() + " regions.");
+    if (locs.size() == 0) return;
+
+    Map<HRegionLocation, Queue<Scan>> tasks = new HashMap<HRegionLocation, Queue<Scan>>();
+    int i = 0;
+    // organize region locations by region server
+    for (HRegionLocation loc : locs) {
+      Scan s = new Scan(scan);
+      s.setStartRow(i == 0 ? scan.getStartRow() : loc.getRegionInfo().getStartKey());
+      i++;
+      s.setStopRow(i == locs.size() ? scan.getStopRow() : loc.getRegionInfo().getEndKey());
+      addToMapOfQueues(tasks, loc, s);
+    }
+
+    ReaderWriter rw = new ReaderWriter(scan.getCaching() * tasks.size(), locs.size());
+    this.reader = rw.getReader();
+
+    int threads = (int) Math.ceil(tasks.size() * scalingFactor);
+    LOG.debug("Scheduling " + threads + " thread(s).");
+    // round robin among region servers
+    Queue<Task> taskList = new ConcurrentLinkedQueue<Task>();
+    while (!tasks.isEmpty()) {
+      for (Iterator<Map.Entry<HRegionLocation, Queue<Scan>>> it = tasks.entrySet().iterator(); it.hasNext();) {
+        Scan next = it.next().getValue().poll();
+        if (next == null) {
+          it.remove();
+        } else {
+          taskList.add(new Task(next, connection, tableName, rw.getWriter()));
+        }
+      }
+    }
+
+    // maintain the list of outstanding tasks ourselves, so that we can use an unbounded pool
+    // (such as the one used by HTable anyway).
+    for (int j = 0; j < threads; j++) {
+      pool.submit(new Runner(taskList));
+    }
+  }
+
+  // helper: add a value to the queue kept in the map under the given key
+  private static <K, V> Queue<V> addToMapOfQueues(Map<K, Queue<V>> map, K key, V value) {
+    Queue<V> values = map.get(key);
+    if (values == null) {
+      values = new ArrayDeque<V>();
+      map.put(key, values);
+    }
+    values.add(value);
+    return values;
+  }
+
+  /* actual scanner methods */
+  @Override
+  public Result next() throws IOException {
+    return reader.take();
+  }
+
+  /**
+   * Close the parallel scanner. Callers are strongly encouraged to call this method.
+   * It does not wait until the thread pool actually shuts down.
+   */
+  @Override
+  public void close() {
+  }
+
+  ///// Helper classes and interfaces /////
+
+  // reader interface
+  public static interface ResultReader {
+    Result take() throws IOException;
+  }
+  // writer interface
+  public static interface ResultWriter {
+    // write a new result
+    void put(Result e);
+    // the writer encountered an exception, pass it on to the reader
+    void exception(Exception x);
+    // a writer thread is done; must be called by each thread
+    // that is writing to this ResultWriter
+    void done();
+  }
+
+  // a single-reader, multi-writer queue that hands out Results as soon as they become available
+  private static class ReaderWriter implements ResultReader, ResultWriter {
+    private BlockingDeque<Result> queue;
+    private int taskCount;
+    public ReaderWriter(int capacity, int taskCount) {
+      queue = new LinkedBlockingDeque<Result>(capacity);
+      this.taskCount = taskCount;
+    }
+
+    public ResultReader getReader() {
+      return this;
+    }
+
+    public ResultWriter getWriter() {
+      return this;
+    }
+
+    // writer --
+    @Override
+    public void put(Result e) {
+      try {
+        queue.put(e);
+      } catch (InterruptedException ix) {
+        // ignore
+      }
+    }
+
+    @Override
+    public void exception(Exception x) {
+      try {
+        queue.put(new MyResult(x));
+      } catch (InterruptedException ix) {
+        // ignore
+      }
+    }
+
+    @Override
+    public void done() {
+      try {
+        queue.put(MARKER);
+      } catch (InterruptedException ix) {
+        // ignore
+      }
+    }
+
+    // reader --
+    // pop the next Result as it becomes available
+    @Override
+    public Result take() throws IOException {
+      while (true) {
+        Result r;
+        if (taskCount > 0) {
+          try {
+            // block if there are still tasks running
+            r = queue.take();
+          } catch (InterruptedException x) {
+            throw new IOException(x);
+          }
+          if (r instanceof MyResult) {
+            ((MyResult) r).rethrow();
+            taskCount--;
+            continue;
+          }
+        } else {
+          r = queue.poll();
+        }
+        return r;
+      }
+    }
+  }
+
+  /* marker class, also used to pass an Exception along */
+  protected static class MyResult extends Result {
+    private Exception x = null;
+    public MyResult() {}
+
+    public MyResult(Exception x) {
+      this.x = x;
+    }
+    public void rethrow() throws IOException {
+      if (x != null)
+        throw x instanceof IOException ?
+            (IOException) x : new IOException(x);
+    }
+  }
+
+  /* Runnable scheduled on the thread pool */
+  protected class Runner implements Runnable {
+    private Queue<Task> tasks;
+    public Runner(Queue<Task> tasks) {
+      this.tasks = tasks;
+    }
+    @Override
+    public void run() {
+      Task task;
+      while ((task = tasks.poll()) != null) {
+        // delegate to the task
+        task.run();
+      }
+    }
+  }
+
+  /* Task executing a scan request */
+  protected class Task implements Runnable {
+    private Scan s;
+    private ResultWriter writer;
+    private HConnection connection;
+    private byte[] tableName;
+    public Task(Scan s, HConnection connection, byte[] tableName, ResultWriter writer) {
+      this.s = s;
+      this.connection = connection;
+      this.tableName = tableName;
+      this.writer = writer;
+    }
+
+    @Override
+    public void run() {
+      try {
+        ClientScanner scanner = new ClientScanner(connection.getConfiguration(), s, tableName, connection);
+        Result r;
+        while ((r = scanner.next()) != null) {
+          writer.put(r);
+        }
+        scanner.close();
+      } catch (Exception x) {
+        // record any exceptions encountered
+        writer.exception(x);
+      } finally {
+        writer.done();
+      }
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java (revision 1520646)
+++ src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java (working copy)
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 
 /**
@@ -69,4 +70,28 @@
       }
     };
   }
+
+  /**
+   * Get nbRows rows.
+   * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
+   * setting (or hbase.client.scanner.caching in hbase-site.xml).
+   * @param nbRows number of rows to return
+   * @return Between zero and nbRows RowResults.  Scan is done
+   *   if returned array is of zero-length (We never return null).
+   * @throws IOException
+   */
+  @Override
+  public Result [] next(int nbRows) throws IOException {
+    // Collect values to be returned here
+    ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+    for(int i = 0; i < nbRows; i++) {
+      Result next = next();
+      if (next != null) {
+        resultSets.add(next);
+      } else {
+        break;
+      }
+    }
+    return resultSets.toArray(new Result[resultSets.size()]);
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (revision 1520646)
+++ src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (working copy)
@@ -529,4 +529,18 @@
    * @throws IOException if a remote or network exception occurs.
    */
   public void setWriteBufferSize(long writeBufferSize) throws IOException;
+
+  /**
+   * Returns a parallel, unordered scanner on the current table as specified by the {@link Scan}
+   * object.
+   * Note that the passed {@link Scan}'s start row and caching properties
+   * may be changed.
+   *
+   * @param scan A configured {@link Scan} object.
+   * @param scalingFactor How many resources of the cluster to use. The number of threads to
+   *        use is calculated as the number of involved RegionServers * scalingFactor.
+   * @return A scanner.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  ResultScanner getParallelScanner(Scan scan, double scalingFactor) throws IOException;
 }
Index: src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (revision 1520646)
+++ src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (working copy)
@@ -539,5 +539,10 @@
     public void setWriteBufferSize(long writeBufferSize) throws IOException {
       table.setWriteBufferSize(writeBufferSize);
     }
+
+    @Override
+    public ResultScanner getParallelScanner(Scan scan, double scalingFactor) throws IOException {
+      return table.getScanner(scan);
+    }
   }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1520646)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -1304,4 +1304,11 @@
     return operationTimeout;
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ResultScanner getParallelScanner(Scan scan, double scalingFactor) throws IOException {
+    return new ParallelClientScanner(connection, scan, getTableName(), getPool(), scalingFactor);
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1520646)
+++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy)
@@ -333,29 +333,6 @@
     return null;
   }
 
-  /**
-   * Get nbRows rows.
-   * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
-   * setting (or hbase.client.scanner.caching in hbase-site.xml).
-   * @param nbRows number of rows to return
-   * @return Between zero and nbRows RowResults.  Scan is done
-   *   if returned array is of zero-length (We never return null).
-   * @throws IOException
-   */
-  public Result [] next(int nbRows) throws IOException {
-    // Collect values to be returned here
-    ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
-    for(int i = 0; i < nbRows; i++) {
-      Result next = next();
-      if (next != null) {
-        resultSets.add(next);
-      } else {
-        break;
-      }
-    }
-    return resultSets.toArray(new Result[resultSets.size()]);
-  }
-
   public void close() {
     if (callable != null) {
       callable.setClose();
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (revision 1520646)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (working copy)
@@ -548,6 +548,11 @@
     public void setWriteBufferSize(long writeBufferSize) throws IOException {
       table.setWriteBufferSize(writeBufferSize);
    }
+
+    @Override
+    public ResultScanner getParallelScanner(Scan scan, double scalingFactor) throws IOException {
+      return table.getScanner(scan);
+    }
   }
 
   /** The coprocessor */
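
For illustration only (not part of the patch): a minimal client-side usage sketch of the new getParallelScanner() API, assuming a 0.94-style client setup. The configuration, the table name "myTable", and the caching value are placeholders.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

public class ParallelScanExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "myTable"); // placeholder table name
    try {
      Scan scan = new Scan();
      // caching applies per scan task; the parallel scanner buffers roughly caching * #tasks results
      scan.setCaching(100);
      // per the javadoc above, a scalingFactor of 1.0 schedules about one thread per involved RegionServer
      ResultScanner scanner = table.getParallelScanner(scan, 1.0);
      try {
        for (Result r : scanner) {
          // results arrive as soon as they are produced, NOT in row-key order
          System.out.println(r);
        }
      } finally {
        scanner.close();
      }
    } finally {
      table.close();
    }
  }
}

Because results from different regions are interleaved, callers that need row-key order should keep using HTableInterface.getScanner(Scan) instead.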