Index: src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java	(revision 0)
@@ -0,0 +1,288 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all in parallel.
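+ * <p>
+ * A minimal usage sketch (an existing {@code HTable} instance {@code table} is
+ * assumed; results arrive as they become available, NOT in row key order):
+ * <pre>
+ * Scan scan = new Scan();
+ * scan.setCaching(100);
+ * scan.setParallelScaling(1.0); // one thread per involved RegionServer
+ * ResultScanner scanner = table.getScanner(scan);
+ * for (Result result : scanner) {
+ *   // process result
+ * }
+ * scanner.close();
+ * </pre>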
+ */
+public class ParallelClientScanner extends AbstractClientScanner implements Closeable {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  // special marker to indicate when a scanning task has finished
+  protected static final MyResult MARKER = new MyResult();
+
+  // the single reader object to use
+  protected ResultReader reader;
+
+  // the thread pool to be used
+  protected ExecutorService pool;
+
+  /**
+   * If a caller has predetermined split points this constructor can be used.
+   * One thread per split will be used to execute the scan.
+   * Note that the executing threads can block if not all data can be buffered
+   * on the client. This is by design; callers need to be aware of that, for example
+   * when the results are aggregated with a merge sort. In that case at least one
+   * thread per split is needed, which is exactly what this constructor creates.
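+   * <p>
+   * A minimal sketch (an existing {@link HConnection} {@code conn} and the split
+   * rows {@code splitA} and {@code splitB} are assumed):
+   * <pre>
+   * List&lt;Scan&gt; scans = new ArrayList&lt;Scan&gt;();
+   * scans.add(new Scan(HConstants.EMPTY_START_ROW, splitA));
+   * scans.add(new Scan(splitA, splitB));
+   * scans.add(new Scan(splitB, HConstants.EMPTY_END_ROW));
+   * ResultScanner scanner =
+   *     new ParallelClientScanner(conn, scans, Bytes.toBytes("t1"), 300);
+   * </pre>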
+   * @param connection The cluster connection to use
+   * @param scans The {@link Scan} objects defining the split points
+   * @param tableName The table to use
+   * @param bufferSize How many results to buffer
+   *        (should be larger than the individual scans' caching setting)
+   * @throws IOException
+   */
+  public ParallelClientScanner(final HConnection connection, final Collection<Scan> scans,
+      final byte[] tableName, int bufferSize) throws IOException {
+    createDefaultPool(scans.size());
+    ReaderWriter rw = new ReaderWriter(bufferSize, scans.size());
+    this.reader = rw.getReader();
+    for (Scan s : scans) {
+      pool.submit(new Task(s, connection, tableName, rw));
+    }
+  }
+
+  /**
+   * Execute a single scan across multiple threads. Results are returned when they are available,
+   * NOT in rowkey order.
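+   * <p>
+   * A minimal sketch (an existing {@link HConnection} {@code conn} and the rows
+   * {@code startRow}/{@code stopRow} are assumed):
+   * <pre>
+   * Scan scan = new Scan(startRow, stopRow);
+   * scan.setCaching(100);
+   * ResultScanner scanner =
+   *     new ParallelClientScanner(conn, scan, Bytes.toBytes("t1"), 1.0);
+   * </pre>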
+   * @param connection The cluster connection to use
+   * @param scan The scan to execute
+   * @param tableName The table to use
+   * @param scalingFactor How many resources of the cluster to use. The number of threads to
+   *        use is calculated as the number of involved RegionServers * scalingFactor.
+   * @throws IOException
+   */
+  public ParallelClientScanner(final HConnection connection, final Scan scan,
+      final byte[] tableName, double scalingFactor) throws IOException {
+    if (scalingFactor <= 0)
+      throw new IllegalArgumentException("scalingFactor must be > 0");
+
+    // get the region boundaries. A null ExecutorService is fine, we're just using
+    // the table to look up the region cache.
+    HTable t = new HTable(tableName, connection, null);
+    List<HRegionLocation> locs = t.getRegionsInRange(scan.getStartRow(), scan.getStopRow());
+    t.close();
+    LOG.debug("Found "+locs.size()+" region(s).");
+    if (locs.size() == 0) {
+      // no regions in range: use an empty reader so next() reports end-of-scan
+      this.reader = new ReaderWriter(1, 0).getReader();
+      return;
+    }
+
+    Map<HRegionLocation, Queue<Scan>> tasks = new HashMap<HRegionLocation, Queue<Scan>>();
+    int i=0;
+    // organize region locations by region server
+    for (HRegionLocation loc : locs) {
+      Scan s = new Scan(scan);
+      s.setStartRow(i==0?scan.getStartRow() : loc.getRegionInfo().getStartKey());
+      i++;
+      s.setStopRow(i==locs.size()?scan.getStopRow() : loc.getRegionInfo().getEndKey());
+      addToMapOfQueues(tasks, loc, s);
+    }
+
+    int threads = (int)Math.ceil(tasks.size() * scalingFactor);
+    createDefaultPool(threads);
+    ReaderWriter rw = new ReaderWriter(Math.max(scan.getCaching() * threads, threads), locs.size());
+    this.reader = rw.getReader();
+
+    LOG.debug("Scheduling "+threads+" thread(s).");
+    // round robin among region servers
+    while(!tasks.isEmpty()) {
+      for (Iterator<Map.Entry<HRegionLocation, Queue<Scan>>> it = tasks.entrySet().iterator(); it.hasNext();) {
+        Scan next = it.next().getValue().poll();
+        if (next == null) {
+          it.remove();
+        } else {
+          pool.submit(new Task(next, connection, tableName, rw.getWriter()));
+        }
+      }
+    }
+  }
+
+  private <K, V> Queue<V> addToMapOfQueues(Map<K, Queue<V>> map, K key, V value) {
+    Queue<V> values = map.get(key);
+    if (values == null) {
+      values = new ArrayDeque<V>();
+      map.put(key, values);
+    }
+    values.add(value);
+    return values;
+  }
+
+  protected void createDefaultPool(int threads) {
+    pool = new ThreadPoolExecutor(threads, threads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        Threads.newDaemonThreadFactory("parallel-scan"));
+    ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
+  }
+
+  /* actual scanner methods */
+  @Override
+  public Result next() throws IOException {
+    try {
+      return reader.take();
+    } catch (IOException iox) {
+      // kill other threads
+      close();
+      // rethrow
+      throw iox;
+    }
+  }
+
+  /**
+   * Close the parallel scanner; callers are strongly encouraged to call this method.
+   * Does not wait until the thread pool actually shuts down.
+   */
+  @Override
+  public void close() {
+    // interrupt all running threads, don't wait for completion
+    if (pool != null) {
+      pool.shutdownNow();
+    }
+  }
+
+  ///// Helper classes and interfaces /////
+
+  // reader interface
+  public static interface ResultReader {
+    Result take() throws IOException;
+  }
+  // writer interface
+  public static interface ResultWriter {
+    // write a new result
+    void put(Result e) throws InterruptedException;
+    // the writer encountered an exception, pass it on to the reader
+    void exception(Exception x);
+    // a writer thread is done, must be called by each thread
+    // that is writing to this ResultWriter
+    void done();
+  }
+
+  // a single reader, single writer queue, that reads Results as soon as they become available
+  private static class ReaderWriter implements ResultReader, ResultWriter {
+    private BlockingDeque<Result> queue;
+    private int taskCount;
+    public ReaderWriter(int capacity, int taskCount) {
+      queue = new LinkedBlockingDeque<Result>(capacity);
+      this.taskCount = taskCount;
+    }
+
+    public ResultReader getReader() {
+      return this;
+    }
+
+    public ResultWriter getWriter() {
+      return this;
+    }
+
+    // writer --
+    @Override
+    public void put(Result e) throws InterruptedException {
+      queue.put(e);
+    }
+
+    @Override
+    public void exception(Exception x) {
+      try {
+        queue.put(new MyResult(x));
+      } catch (InterruptedException ix) {
+        // ignore
+      }
+    }
+
+    @Override
+    public void done() {
+      try {
+        queue.put(MARKER);
+      } catch (InterruptedException ix) {
+        // ignore
+      }
+    }
+
+    // reader --
+    // pop the next Result as it becomes available
+    @Override
+    public Result take() throws IOException {
+      while(true) {
+        Result r;
+        if (taskCount > 0) {
+          try {
+            // block if there are still tasks running
+            r = queue.take();
+          } catch (InterruptedException x) {
+            throw new IOException(x);
+          }
+          if (r instanceof MyResult) {
+            ((MyResult)r).rethrow();
+            taskCount--;
+            continue;
+          }
+        } else {
+          r = queue.poll();
+        }
+        return r;
+      }
+    }
+  }
+
+  /* marker class, also used to pass an Exception along */
+  protected static class MyResult extends Result {
+    private Exception x = null;
+    public MyResult() {}
+
+    public MyResult(Exception x) {
+      this.x = x;
+    }
+    public void rethrow() throws IOException {
+      if (x != null)
+        throw x instanceof IOException ? (IOException)x : new IOException(x);
+    }
+  }
+
+  /* Task executing a scan request */
+  protected class Task implements Runnable {
+    private Scan s;
+    private ResultWriter writer;
+    private HConnection connection;
+    private byte[] tableName;
+    public Task(Scan s, HConnection connection, byte[] tableName, ResultWriter writer) {
+      this.s = s;
+      this.connection = connection;
+      this.tableName = tableName;
+      this.writer = writer;
+    }
+
+    @Override
+    public void run() {
+      Thread current = Thread.currentThread();
+      try {
+        ClientScanner scanner = new ClientScanner(connection.getConfiguration(), s, tableName, connection);
+        Result r;
+        while((r = scanner.next()) != null && !current.isInterrupted()) {
+          writer.put(r);
+        }
+        scanner.close();
+        writer.done();
+      } catch (Exception x) {
+        // record any exceptions encountered
+        writer.exception(x);
+      }
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java	(revision 1522296)
+++ src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java	(working copy)
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 
 /**
@@ -69,4 +70,28 @@
       }
     };
   }
+
+  /**
+   * Get nbRows rows.
+   * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
+   * setting (or hbase.client.scanner.caching in hbase-site.xml).
+   * @param nbRows number of rows to return
+   * @return Between zero and nbRows RowResults.  Scan is done
+   * if returned array is of zero-length (We never return null).
+   * @throws IOException
+   */
+  @Override
+  public Result [] next(int nbRows) throws IOException {
+    // Collect values to be returned here
+    ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+    for(int i = 0; i < nbRows; i++) {
+      Result next = next();
+      if (next != null) {
+        resultSets.add(next);
+      } else {
+        break;
+      }
+    }
+    return resultSets.toArray(new Result[resultSets.size()]);
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java	(revision 1522296)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java	(working copy)
@@ -671,6 +671,9 @@
     if (scan.getCaching() <= 0) {
       scan.setCaching(getScannerCaching());
     }
+    if (scan.getParallelScaling() > 0) {
+      return new ParallelClientScanner(connection, scan, getTableName(), scan.getParallelScaling());
+    }
     return new ClientScanner(getConfiguration(), scan, getTableName(),
         this.connection);
   }
Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Scan.java	(revision 1522296)
+++ src/main/java/org/apache/hadoop/hbase/client/Scan.java	(working copy)
@@ -109,6 +109,8 @@
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
 
+  // never serialized, only used on the client
+  private double parallelScaling = 0;
   /**
    * Create a Scan operation across all rows.
    */
@@ -692,4 +694,21 @@
     return attr == null ? IsolationLevel.READ_COMMITTED :
       IsolationLevel.fromBytes(attr);
   }
+
+  /**
+   * Execute this scan in parallel.
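+   * <p>
+   * For example (a sketch; an existing {@code HTable} instance {@code table} is assumed):
+   * <pre>
+   * Scan scan = new Scan();
+   * scan.setParallelScaling(0.5); // roughly one thread per two involved RegionServers
+   * ResultScanner scanner = table.getScanner(scan); // parallel, unordered scan
+   * </pre>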
+   * @param scalingFactor Use this many threads per involved RegionServer.
+   *        0 to disable parallel scanning
+   */
+  public void setParallelScaling(double scalingFactor) {
+    this.parallelScaling = scalingFactor;
+  }
+
+  /**
+   * @return the current parallel scaling factor,
+   *         0 if disabled (default)
+   */
+  public double getParallelScaling() {
+    return this.parallelScaling;
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java	(revision 1522296)
+++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java	(working copy)
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
@@ -333,29 +332,6 @@
     return null;
   }
 
-  /**
-   * Get nbRows rows.
-   * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
-   * setting (or hbase.client.scanner.caching in hbase-site.xml).
-   * @param nbRows number of rows to return
-   * @return Between zero and nbRows RowResults.  Scan is done
-   * if returned array is of zero-length (We never return null).
-   * @throws IOException
-   */
-  public Result [] next(int nbRows) throws IOException {
-    // Collect values to be returned here
-    ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
-    for(int i = 0; i < nbRows; i++) {
-      Result next = next();
-      if (next != null) {
-        resultSets.add(next);
-      } else {
-        break;
-      }
-    }
-    return resultSets.toArray(new Result[resultSets.size()]);
-  }
-
   public void close() {
     if (callable != null) {
       callable.setClose();
Index: src/main/ruby/hbase/table.rb
===================================================================
--- src/main/ruby/hbase/table.rb	(revision 1522296)
+++ src/main/ruby/hbase/table.rb	(working copy)
@@ -99,6 +99,7 @@
           org.apache.hadoop.hbase.util.Bytes::toStringBinary(row.getRow))
       end
 
+      scanner.close
       # Return the counter
       return count
     end
@@ -227,6 +228,7 @@
 
      limit = args.delete("LIMIT") || -1
      maxlength = args.delete("MAXLENGTH") || -1
+     parallel = args.delete("PARALLEL") || -1
 
      if args.any?
        filter = args["FILTER"]
@@ -278,6 +280,9 @@
      end
 
      # Start the scanner
+     if parallel > 0
+       scan.parallel_scaling = parallel
+     end
      scanner = @table.getScanner(scan)
      count = 0
      res = {}
@@ -310,7 +315,7 @@
        # One more row processed
        count += 1
      end
-
+     scanner.close
      return ((block_given?) ? count : res)
    end
 
Index: src/main/ruby/hbase.rb
===================================================================
--- src/main/ruby/hbase.rb	(revision 1522296)
+++ src/main/ruby/hbase.rb	(working copy)
@@ -57,6 +57,7 @@
   SPLITS_FILE = 'SPLITS_FILE'
   SPLITALGO = 'SPLITALGO'
   NUMREGIONS = 'NUMREGIONS'
+  PARALLEL = "PARALLEL"
 
   # Load constants from hbase java API
   def self.promote_constants(constants)
Index: src/main/ruby/shell/commands/scan.rb
===================================================================
--- src/main/ruby/shell/commands/scan.rb	(revision 1522296)
+++ src/main/ruby/shell/commands/scan.rb	(working copy)
@@ -26,7 +26,7 @@
 Scan a table; pass table name and optionally a dictionary of scanner
 specifications.  Scanner specifications may include one or more of:
 TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH,
-or COLUMNS, CACHE
+or COLUMNS, CACHE, PARALLEL
 
 If no columns are specified, all columns will be scanned.
 To scan all members of a column family, leave the qualifier empty as in
@@ -37,6 +37,7 @@
 Filter Language document attached to the HBASE-4176 JIRA
 2. Using the entire package name of the filter.
 
+PARALLEL enables parallel, unsorted scanning. The argument is a scaling factor; 1.0 means one thread per involved RegionServer.
 Some examples:
 
   hbase> scan '.META.'
@@ -45,6 +46,7 @@
   hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]}
   hbase> scan 't1', {FILTER => "(PrefixFilter ('row2') AND (QualifierFilter (>=, 'binary:xyz'))) AND (TimestampsFilter ( 123, 456))"}
   hbase> scan 't1', {FILTER => org.apache.hadoop.hbase.filter.ColumnPaginationFilter.new(1, 0)}
+  hbase> scan 't1', {PARALLEL => 1}
 
 For experts, there is an additional option -- CACHE_BLOCKS -- which
 switches block caching for the scanner on (true) or off (false).  By