Index: src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java	(revision 0)
@@ -0,0 +1,290 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all in parallel.
+ */
+public class ParallelClientScanner extends AbstractClientScanner implements Closeable {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  // special marker to indicate when a scanning task has finished
+  protected static final MyResult MARKER = new MyResult();
+
+  // the single reader object to use
+  protected ResultReader reader;
+
+  /**
+   * If a caller has predetermined split points this constructor can be used.
+   * One thread per split will be used to execute the scan.
+   * Note that the executing threads can block if not all data can be buffered
+   * on the client. This is by design; callers need to be aware of it, for example
+   * when the results are aggregated with a merge sort. In that case the number of threads
+   * available in the passed thread pool needs to be larger than the number of splits.
+   * @param connection The cluster connection to use
+   * @param scans The {@link Scan} objects defining the split points
+   * @param tableName The table to use
+   * @param bufferSize How many results to buffer
+   *        (should be larger than the individual scans' caching setting)
+   * @param pool The thread pool to use to schedule the tasks
+   * @throws IOException
+   */
+  public ParallelClientScanner(final HConnection connection, final Collection<Scan> scans,
+      final byte[] tableName, int bufferSize, ExecutorService pool) throws IOException {
+    ReaderWriter rw = new ReaderWriter(bufferSize, scans.size());
+    this.reader = rw.getReader();
+    for (Scan s : scans) {
+      pool.submit(new Task(s, connection, tableName, rw));
+    }
+  }
+
+  /**
+   * Execute a single scan across multiple threads. Results are returned as they become available,
+   * NOT in rowkey order.
+   * @param connection The cluster connection to use
+   * @param scan The scan to execute
+   * @param tableName The table to use
+   * @param pool The thread pool to use to schedule the tasks
+   * @param scalingFactor How many resources of the cluster to use. The number of threads to
+   *        use is calculated as the number of involved RegionServers * scalingFactor.
+   * @throws IOException
+   */
+  public ParallelClientScanner(final HConnection connection, final Scan scan,
+      final byte[] tableName, ExecutorService pool, double scalingFactor) throws IOException {
+    if (scalingFactor <= 0) {
+      throw new IllegalArgumentException("scalingFactor must be > 0");
+    }
+
+    // get the region boundaries. A null ExecutorService is fine here,
+    // we're just using the table to look up the region cache.
+    HTable t = new HTable(tableName, connection, null);
+    List<HRegionLocation> locs = t.getRegionsInRange(scan.getStartRow(), scan.getStopRow());
+    t.close();
+    LOG.debug("Found " + locs.size() + " regions.");
+    if (locs.size() == 0) return;
+
+    Map<HRegionLocation, Queue<Scan>> tasks = new HashMap<HRegionLocation, Queue<Scan>>();
+    int i = 0;
+    // organize region locations by region server
+    for (HRegionLocation loc : locs) {
+      Scan s = new Scan(scan);
+      s.setStartRow(i == 0 ? scan.getStartRow() : loc.getRegionInfo().getStartKey());
+      i++;
+      s.setStopRow(i == locs.size() ? scan.getStopRow() : loc.getRegionInfo().getEndKey());
+      addToMapOfQueues(tasks, loc, s);
+    }
+
+    int threads = (int) Math.ceil(tasks.size() * scalingFactor);
+    ReaderWriter rw = new ReaderWriter(Math.max(scan.getCaching() * threads, threads), locs.size());
+    this.reader = rw.getReader();
+
+    LOG.debug("Scheduling " + threads + " thread(s).");
+    // round robin among region servers
+    Queue<Task> taskList = new ConcurrentLinkedQueue<Task>();
+    while (!tasks.isEmpty()) {
+      for (Iterator<Map.Entry<HRegionLocation, Queue<Scan>>> it = tasks.entrySet().iterator(); it.hasNext();) {
+        Scan next = it.next().getValue().poll();
+        if (next == null) {
+          it.remove();
+        } else {
+          taskList.add(new Task(next, connection, tableName, rw.getWriter()));
+        }
+      }
+    }
+
+    // maintain the list of outstanding tasks ourselves, so that we can use an unlimited pool
+    // (such as the type used by HTable anyway).
+    for (int j = 0; j < threads; j++) {
+      pool.submit(new Runner(taskList));
+    }
+  }
+
+  private static <K, V> Queue<V> addToMapOfQueues(Map<K, Queue<V>> map, K key, V value) {
+    Queue<V> values = map.get(key);
+    if (values == null) {
+      values = new ArrayDeque<V>();
+      map.put(key, values);
+    }
+    values.add(value);
+    return values;
+  }
+
+  /* actual scanner methods */
+  @Override
+  public Result next() throws IOException {
+    // no regions were involved (see the early return above), so the scan is trivially done
+    if (reader == null) return null;
+    return reader.take();
+  }
+
+  /**
+   * Close the parallel scanner. Callers are strongly encouraged to call this method.
+   * Note that it does not wait until the thread pool is actually drained.
+   */
+  @Override
+  public void close() {
+  }
+
+  ///// Helper classes and interfaces /////
+
+  // reader interface
+  public static interface ResultReader {
+    Result take() throws IOException;
+  }
+
+  // writer interface
+  public static interface ResultWriter {
+    // write a new result
+    void put(Result e);
+    // the writer encountered an exception, pass it on to the reader
+    void exception(Exception x);
+    // a writer thread is done, must be called by each thread
+    // that is writing to this ResultWriter
+    void done();
+  }
+
+  // a single reader, single writer queue that hands out Results as soon as they become available
+  private static class ReaderWriter implements ResultReader, ResultWriter {
+    private BlockingDeque<Result> queue;
+    private int taskCount;
+
+    public ReaderWriter(int capacity, int taskCount) {
+      queue = new LinkedBlockingDeque<Result>(capacity);
+      this.taskCount = taskCount;
+    }
+
+    public ResultReader getReader() {
+      return this;
+    }
+
+    public ResultWriter getWriter() {
+      return this;
+    }
+
+    // writer --
+    @Override
+    public void put(Result e) {
+      try {
+        queue.put(e);
+      } catch (InterruptedException ix) {
+        // ignore
+      }
+    }
+
+    @Override
+    public void exception(Exception x) {
+      try {
+        queue.put(new MyResult(x));
+      } catch (InterruptedException ix) {
+        // ignore
+      }
+    }
+
+    @Override
+    public void done() {
+      try {
+        queue.put(MARKER);
+      } catch (InterruptedException ix) {
+        // ignore
+      }
+    }
+
+    // reader --
+    // pop the next Result as it becomes available
+    @Override
+    public Result take() throws IOException {
+      while (true) {
+        Result r;
+        if (taskCount > 0) {
+          try {
+            // block if there are still tasks running
+            r = queue.take();
+          } catch (InterruptedException x) {
+            throw new IOException(x);
+          }
+          if (r instanceof MyResult) {
+            ((MyResult) r).rethrow();
+            taskCount--;
+            continue;
+          }
+        } else {
+          // all tasks are done, just drain what is still buffered
+          r = queue.poll();
+        }
+        return r;
+      }
+    }
+  }
+
+  /* marker class, also used to pass an Exception along */
+  protected static class MyResult extends Result {
+    private Exception x = null;
+
+    public MyResult() {}
+
+    public MyResult(Exception x) {
+      this.x = x;
+    }
+
+    public void rethrow() throws IOException {
+      if (x != null) {
+        throw x instanceof IOException ? (IOException) x : new IOException(x);
+      }
+    }
+  }
+
+  /* runnable scheduled on the thread pool */
+  protected class Runner implements Runnable {
+    private Queue<Task> tasks;
+
+    public Runner(Queue<Task> tasks) {
+      this.tasks = tasks;
+    }
+
+    @Override
+    public void run() {
+      Task task;
+      while ((task = tasks.poll()) != null) {
+        // delegate to the task
+        task.run();
+      }
+    }
+  }
+
+  /* Task executing a scan request */
+  protected class Task implements Runnable {
+    private Scan s;
+    private ResultWriter writer;
+    private HConnection connection;
+    private byte[] tableName;
+
+    public Task(Scan s, HConnection connection, byte[] tableName, ResultWriter writer) {
+      this.s = s;
+      this.connection = connection;
+      this.tableName = tableName;
+      this.writer = writer;
+    }
+
+    @Override
+    public void run() {
+      try {
+        ClientScanner scanner = new ClientScanner(connection.getConfiguration(), s, tableName, connection);
+        Result r;
+        while ((r = scanner.next()) != null) {
+          writer.put(r);
+        }
+        scanner.close();
+      } catch (Exception x) {
+        // record any exceptions encountered
+        writer.exception(x);
+      } finally {
+        writer.done();
+      }
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java	(revision 1521556)
+++ src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java	(working copy)
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 
 /**
@@ -69,4 +70,28 @@
       }
     };
   }
+
+  /**
+   * Get nbRows rows.
+   * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
+   * setting (or hbase.client.scanner.caching in hbase-site.xml).
+   * @param nbRows number of rows to return
+   * @return Between zero and nbRows RowResults.  Scan is done
+   * if returned array is of zero-length (We never return null).
+   * @throws IOException
+   */
+  @Override
+  public Result [] next(int nbRows) throws IOException {
+    // Collect values to be returned here
+    ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+    for (int i = 0; i < nbRows; i++) {
+      Result next = next();
+      if (next != null) {
+        resultSets.add(next);
+      } else {
+        break;
+      }
+    }
+    return resultSets.toArray(new Result[resultSets.size()]);
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java	(revision 1521556)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java	(working copy)
@@ -671,6 +671,9 @@
     if (scan.getCaching() <= 0) {
       scan.setCaching(getScannerCaching());
     }
+    if (scan.getParallelScaling() > 0) {
+      return new ParallelClientScanner(connection, scan, getTableName(), getPool(), scan.getParallelScaling());
+    }
     return new ClientScanner(getConfiguration(), scan, getTableName(), this.connection);
   }
Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Scan.java	(revision 1521556)
+++ src/main/java/org/apache/hadoop/hbase/client/Scan.java	(working copy)
@@ -109,6 +109,8 @@
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+  // never serialized, only used on the client
+  private double parallelScaling = 0;
 
   /**
    * Create a Scan operation across all rows.
   */
@@ -692,4 +694,21 @@
     return attr == null ? IsolationLevel.READ_COMMITTED :
       IsolationLevel.fromBytes(attr);
   }
+
+  /**
+   * Execute this scan in parallel.
+   * @param scalingFactor Use this many threads per involved RegionServer,
+   *        0 to disable parallel scanning
+   */
+  public void setParallelScaling(double scalingFactor) {
+    this.parallelScaling = scalingFactor;
+  }
+
+  /**
+   * @return the current parallel scaling factor,
+   *         0 if disabled (default)
+   */
+  public double getParallelScaling() {
+    return this.parallelScaling;
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java	(revision 1521556)
+++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java	(working copy)
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
@@ -333,29 +332,6 @@
     return null;
   }
 
-  /**
-   * Get nbRows rows.
-   * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
-   * setting (or hbase.client.scanner.caching in hbase-site.xml).
-   * @param nbRows number of rows to return
-   * @return Between zero and nbRows RowResults.  Scan is done
-   * if returned array is of zero-length (We never return null).
-   * @throws IOException
-   */
-  public Result [] next(int nbRows) throws IOException {
-    // Collect values to be returned here
-    ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
-    for (int i = 0; i < nbRows; i++) {
-      Result next = next();
-      if (next != null) {
-        resultSets.add(next);
-      } else {
-        break;
-      }
-    }
-    return resultSets.toArray(new Result[resultSets.size()]);
-  }
-
   public void close() {
     if (callable != null) {
       callable.setClose();
Index: src/main/ruby/hbase/table.rb
===================================================================
--- src/main/ruby/hbase/table.rb	(revision 1521556)
+++ src/main/ruby/hbase/table.rb	(working copy)
@@ -83,22 +83,23 @@
       scan.cache_blocks = false
      scan.caching = caching_rows
       scan.setFilter(org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter.new)
+      scan.setParallelScaling(1)
 
       # Run the scanner
       scanner = @table.getScanner(scan)
       count = 0
-      iter = scanner.iterator
 
       # Iterate results
-      while iter.hasNext
-        row = iter.next
-        count += 1
-        next unless (block_given? && count % interval == 0)
+      begin
+        rows = scanner.next(caching_rows)
+        count += rows.length
+        next unless (block_given? && count % interval == 0 && rows.length > 0)
 
         # Allow command modules to visualize counting process
         yield(count,
-              org.apache.hadoop.hbase.util.Bytes::toStringBinary(row.getRow))
-      end
+              org.apache.hadoop.hbase.util.Bytes::toStringBinary(rows[0].getRow))
+      end while rows.length == caching_rows
+      scanner.close
 
       # Return the counter
       return count
     end
@@ -229,6 +230,7 @@
       maxlength = args.delete("MAXLENGTH") || -1
 
       if args.any?
+        parallel = args["PARALLEL"] || -1
         filter = args["FILTER"]
         startrow = args["STARTROW"] || ''
         stoprow = args["STOPROW"]
@@ -278,6 +280,9 @@
       end
 
       # Start the scanner
+      if parallel > 0
+        scan.setParallelScaling(parallel)
+      end
       scanner = @table.getScanner(scan)
       count = 0
       res = {}
@@ -310,7 +315,7 @@
         # One more row processed
         count += 1
       end
-
+      scanner.close
       return ((block_given?) ? count : res)
     end
Index: src/main/ruby/hbase.rb
===================================================================
--- src/main/ruby/hbase.rb	(revision 1521556)
+++ src/main/ruby/hbase.rb	(working copy)
@@ -57,6 +57,7 @@
   SPLITS_FILE = 'SPLITS_FILE'
   SPLITALGO = 'SPLITALGO'
   NUMREGIONS = 'NUMREGIONS'
+  PARALLEL = 'PARALLEL'
 
   # Load constants from hbase java API
   def self.promote_constants(constants)
Index: src/main/ruby/shell/commands/scan.rb
===================================================================
--- src/main/ruby/shell/commands/scan.rb	(revision 1521556)
+++ src/main/ruby/shell/commands/scan.rb	(working copy)
@@ -26,7 +26,7 @@
 Scan a table; pass table name and optionally a dictionary of scanner
 specifications.  Scanner specifications may include one or more of:
 TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH,
-or COLUMNS, CACHE
+or COLUMNS, CACHE, PARALLEL
 
 If no columns are specified, all columns will be scanned.
 To scan all members of a column family, leave the qualifier empty as in
@@ -37,6 +37,7 @@
    Filter Language document attached to the HBASE-4176 JIRA
 2. Using the entire package name of the filter.
 
+PARALLEL enables parallel, unsorted scanning. The argument is a scaling factor: 1.0 means one thread per involved RegionServer.
 Some examples:
 
   hbase> scan '.META.'
@@ -45,6 +46,7 @@
   hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]}
   hbase> scan 't1', {FILTER => "(PrefixFilter ('row2') AND (QualifierFilter (>=, 'binary:xyz'))) AND (TimestampsFilter ( 123, 456))"}
   hbase> scan 't1', {FILTER => org.apache.hadoop.hbase.filter.ColumnPaginationFilter.new(1, 0)}
+  hbase> scan 't1', {PARALLEL => 1}
 
 For experts, there is an additional option -- CACHE_BLOCKS -- which
 switches block caching for the scanner on (true) or off (false).  By
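
For reviewers who want to try the patch from Java rather than the shell, here is a minimal usage sketch. Only Scan.setParallelScaling() and the HTable.getScanner() dispatch come from the change above; the table name "t1", the caching value, and the surrounding boilerplate are illustrative assumptions, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ParallelScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t1");    // hypothetical table name
    try {
      Scan scan = new Scan();
      scan.setCaching(100);                   // per-scan caching, illustrative value
      scan.setParallelScaling(1.0);           // one thread per involved RegionServer; results arrive unordered
      ResultScanner scanner = table.getScanner(scan); // dispatches to ParallelClientScanner
      try {
        for (Result r : scanner) {
          System.out.println(Bytes.toStringBinary(r.getRow()));
        }
      } finally {
        scanner.close();
      }
    } finally {
      table.close();
    }
  }
}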