Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java (revision 1535609)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java (working copy)
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -72,4 +73,28 @@
}
};
}
+
+ /**
+ * Get nbRows rows.
+ * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
+ * setting (or hbase.client.scanner.caching in hbase-site.xml).
+ * @param nbRows number of rows to return
+ * @return Between zero and nbRows Results. The scan is done
+ * if the returned array is of zero length (we never return null).
+ * @throws IOException
+ */
+ @Override
+ public Result [] next(int nbRows) throws IOException {
+ // Collect values to be returned here
+ ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+ for(int i = 0; i < nbRows; i++) {
+ Result next = next();
+ if (next != null) {
+ resultSets.add(next);
+ } else {
+ break;
+ }
+ }
+ return resultSets.toArray(new Result[resultSets.size()]);
+ }
}
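
The next(int) implementation above is pulled up from ClientScanner (whose copy
is removed in the following hunk), so every AbstractClientScanner subclass,
including the new ParallelClientScanner, inherits batched reads. A minimal
caller-side sketch of the batching contract, assuming an already-open HTable
(the class and method names of the sketch itself are illustrative, not part of
the patch):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;

    public class BatchScanSketch {
      // drain a scanner 100 rows at a time; a zero-length array means the scan is done
      static void drain(HTable table) throws IOException {
        ResultScanner scanner = table.getScanner(new Scan());
        try {
          for (Result[] batch = scanner.next(100); batch.length > 0; batch = scanner.next(100)) {
            for (Result r : batch) {
              // process r, e.g. r.getRow()
            }
          }
        } finally {
          scanner.close();
        }
      }
    }
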
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1535609)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy)
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
@@ -429,31 +428,7 @@
return null;
}
- /**
- * Get nbRows rows.
- * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
- * setting (or hbase.client.scanner.caching in hbase-site.xml).
- * @param nbRows number of rows to return
- * @return Between zero and nbRows RowResults. Scan is done
- * if returned array is of zero-length (We never return null).
- * @throws IOException
- */
@Override
- public Result [] next(int nbRows) throws IOException {
- // Collect values to be returned here
- ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
- for(int i = 0; i < nbRows; i++) {
- Result next = next();
- if (next != null) {
- resultSets.add(next);
- } else {
- break;
- }
- }
- return resultSets.toArray(new Result[resultSets.size()]);
- }
-
- @Override
public void close() {
if (!scanMetricsPublished) writeScanMetrics();
if (callable != null) {
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1535609)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -714,6 +714,9 @@
return new ClientSmallScanner(getConfiguration(), scan, getName(),
this.connection);
}
+ if (scan.getParallelScaling() > 0) {
+ return new ParallelClientScanner(connection, scan, getName());
+ }
return new ClientScanner(getConfiguration(), scan,
getName(), this.connection);
}
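
With this hunk applied, HTable.getScanner() dispatches on the new Scan
property: small scans still get a ClientSmallScanner, a positive parallel
scaling factor selects the new ParallelClientScanner, and everything else
falls through to the ordinary ClientScanner. A usage sketch; the row-counting
wrapper and the caching value are illustrative only:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;

    public class ParallelScanSketch {
      static long countRows(HTable table) throws IOException {
        Scan scan = new Scan();
        scan.setCaching(100);          // caching applies per worker thread
        scan.setParallelScaling(2.0);  // > 0: getScanner() returns a ParallelClientScanner
        ResultScanner scanner = table.getScanner(scan);
        long count = 0;
        try {
          for (Result r = scanner.next(); r != null; r = scanner.next()) {
            count++; // rows arrive in no particular order
          }
        } finally {
          scanner.close();
        }
        return count;
      }
    }
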
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java (revision 0)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ParallelClientScanner.java (working copy)
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Implements the scanner interface for the HBase client. If there are
+ * multiple regions in a table, this scanner iterates through them all
+ * in parallel; results are returned in no particular order.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ParallelClientScanner extends AbstractClientScanner implements Closeable {
+ private final Log LOG = LogFactory.getLog(this.getClass());
+
+ // special marker to indicate when a scanning task has finished
+ protected static final DoneMarker MARKER = new DoneMarker();
+
+ // the single reader object to use
+ protected ResultReader reader;
+
+ // the thread pool to be used
+ protected ExecutorService pool;
+
+ /**
+ * If a caller has predetermined split points this constructor can be used.
+ * One thread per split will be used to execute the scan.
+ * @param connection The cluster connection to use
+ * @param scans The {@link Scan} objects defining the split points
+ * @param tableName The table to use
+ * @param bufferSize How many results to buffer
+ * (should be larger than the individual scans' caching setting)
+ * @throws IOException
+ */
+ public ParallelClientScanner(final HConnection connection, final Collection<Scan> scans,
+ final TableName tableName, int bufferSize) throws IOException {
+ createDefaultPool(scans.size());
+ SingleReaderMultiWriterQueue rw = new SingleReaderMultiWriterQueue(bufferSize, scans.size());
+ this.reader = rw;
+ for (Scan s : scans) {
+ pool.submit(new ScanTask(s, connection, tableName, rw));
+ }
+ }
+
+ /**
+ * Execute a single scan across multiple threads. Results are returned when they are available,
+ * NOT in rowkey order.
+ * @param connection The cluster connection to use
+ * @param scan The scan to execute
+ * @param tableName The table to use
+ * @throws IOException
+ */
+ public ParallelClientScanner(final HConnection connection, final Scan scan,
+ final TableName tableName) throws IOException {
+ if (scan.getParallelScaling() <= 0)
+ throw new IllegalArgumentException("scalingFactor must be > 0");
+
+ // get the region boundaries; a null ExecutorService is fine, we only use the table to look up the region cache
+ HTable t = new HTable(tableName, connection, null);
+ List<HRegionLocation> locs;
+ try {
+ locs = t.getRegionsInRange(scan.getStartRow(), scan.getStopRow());
+ } finally {
+ t.close();
+ }
+ LOG.debug("Found "+locs.size()+" region(s).");
+ if (locs.size() == 0) {
+ // nothing to scan: an empty queue with zero pending tasks makes next() return null right away
+ createDefaultPool(1);
+ this.reader = new SingleReaderMultiWriterQueue(1, 0);
+ return;
+ }
+
+ Map<HRegionLocation, Queue<Scan>> tasks = new HashMap<HRegionLocation, Queue<Scan>>(locs.size());
+ int i = 0;
+ // organize region locations by region server
+ for (HRegionLocation loc : locs) {
+ Scan s = new Scan(scan);
+ // i is incremented between the two calls: only the first split keeps the
+ // caller's start row and only the last split keeps the caller's stop row
+ s.setStartRow(i == 0 ? scan.getStartRow() : loc.getRegionInfo().getStartKey());
+ i++;
+ s.setStopRow(i == locs.size() ? scan.getStopRow() : loc.getRegionInfo().getEndKey());
+ addToMapOfQueues(tasks, loc, s);
+ }
+
+ int threads = (int)Math.ceil(tasks.size() * scan.getParallelScaling());
+ createDefaultPool(threads);
+ SingleReaderMultiWriterQueue rw = new SingleReaderMultiWriterQueue(Math.max(scan.getCaching() * threads, threads), locs.size());
+ this.reader = rw;
+
+ LOG.debug("Scheduling "+threads+" thread(s).");
+ // round robin among region servers
+ while(!tasks.isEmpty()) {
+ for (Iterator<Map.Entry<HRegionLocation, Queue<Scan>>> it = tasks.entrySet().iterator(); it.hasNext();) {
+ Scan next = it.next().getValue().poll();
+ if (next == null) {
+ it.remove();
+ } else {
+ pool.submit(new ScanTask(next, connection, tableName, rw));
+ }
+ }
+ }
+ }
+
+ private <K, V> Queue<V> addToMapOfQueues(Map<K, Queue<V>> map, K key, V value) {
+ Queue values = map.get(key);
+ if (values == null) {
+ values = new ArrayDeque<V>();
+ map.put(key, values);
+ }
+ values.add(value);
+ return values;
+ }
+
+ protected void createDefaultPool(int threads) {
+ pool = new ThreadPoolExecutor(threads, threads,
+ 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ Threads.newDaemonThreadFactory("parallel-scan"));
+ ((ThreadPoolExecutor)pool).prestartAllCoreThreads();
+ }
+
+ /* actual scanner methods */
+ @Override
+ public Result next() throws IOException {
+ try {
+ return reader.take();
+ } catch (IOException iox) {
+ // kill other threads
+ close();
+ // rethrow
+ throw iox;
+ }
+ }
+
+ /**
+ * Close the parallel scanner; callers are strongly encouraged to call this method.
+ * Does not wait until the thread pool has actually shut down.
+ */
+ @Override
+ public void close() {
+ // interrupt all running threads, don't wait for completion
+ pool.shutdownNow();
+ }
+
+ ///// Helper classes and interfaces /////
+
+ // reader interface
+ private static interface ResultReader {
+ Result take() throws IOException;
+ }
+ // writer interface
+ private static interface ResultWriter {
+ // write a new result
+ void put(Result e) throws InterruptedException;
+ // the writer encountered an exception; pass it on to the reader
+ void exception(Exception x);
+ // a writer thread is done; must be called by each thread
+ // that is writing to this ResultWriter
+ void done();
+ }
+
+ // a single reader, multiple writer queue that hands out Results as soon as they become available
+ private static class SingleReaderMultiWriterQueue implements ResultReader, ResultWriter {
+ private BlockingDeque<Result> queue;
+ private int taskCount;
+ public SingleReaderMultiWriterQueue(int capacity, int taskCount) {
+ queue = new LinkedBlockingDeque<Result>(capacity);
+ this.taskCount = taskCount;
+ }
+
+ // writer --
+ @Override
+ public void put(Result e) throws InterruptedException {
+ queue.put(e);
+ }
+
+ @Override
+ public void exception(Exception x) {
+ try {
+ queue.put(new DoneMarker(x));
+ } catch (InterruptedException ix) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void done() {
+ try {
+ queue.put(MARKER);
+ } catch (InterruptedException ix) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ // reader --
+ // pop the next Result as it becomes available
+ @Override
+ public Result take() throws IOException {
+ while(true) {
+ Result r;
+ if (taskCount > 0) {
+ try {
+ // block if there are still tasks running
+ r = queue.take();
+ } catch (InterruptedException x) {
+ throw new IOException(x);
+ }
+ if (r instanceof DoneMarker) {
+ ((DoneMarker)r).rethrow();
+ taskCount--;
+ continue;
+ }
+ } else {
+ r = queue.poll();
+ }
+ return r;
+ }
+ }
+ }
+
+ /* marker class, also used to pass Exception along */
+ protected static class DoneMarker extends Result {
+ private Exception x = null;
+ public DoneMarker() {}
+
+ public DoneMarker(Exception x) {
+ this.x = x;
+ }
+ public void rethrow() throws IOException {
+ if (x != null)
+ throw x instanceof IOException ? (IOException)x : new IOException(x);
+ }
+ }
+
+ /* Task executing a scan request */
+ protected class ScanTask implements Runnable {
+ private Scan s;
+ private ResultWriter writer;
+ private HConnection connection;
+ private TableName tableName;
+ public ScanTask(Scan s, HConnection connection, TableName tableName, ResultWriter writer) {
+ this.s = s;
+ this.connection = connection;
+ this.tableName = tableName;
+ this.writer = writer;
+ }
+
+ @Override
+ public void run() {
+ Thread current = Thread.currentThread();
+ try {
+ ClientScanner scanner = new ClientScanner(connection.getConfiguration(), s, tableName, connection);
+ Result r; // a distinct name: the field 's' already holds the Scan
+ while ((r = scanner.next()) != null && !current.isInterrupted()) {
+ writer.put(r);
+ }
+ scanner.close();
+ writer.done();
+ } catch (Exception x) {
+ // record any exceptions encountered
+ writer.exception(x);
+ }
+ }
+ }
+}
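
Besides the single-Scan constructor exercised by HTable.getScanner(), the
class exposes a constructor that takes caller-chosen split points, one Scan
(and one worker thread) per key range. A sketch of that entry point, with
hypothetical split keys and buffer size:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.ParallelClientScanner;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PresplitScanSketch {
      static void scanRanges(HConnection conn, TableName table) throws IOException {
        byte[][] bounds = { Bytes.toBytes("a"), Bytes.toBytes("m"), Bytes.toBytes("t") };
        List<Scan> scans = new ArrayList<Scan>();
        for (int i = 0; i < bounds.length; i++) {
          Scan s = new Scan();
          s.setStartRow(bounds[i]);
          if (i + 1 < bounds.length) s.setStopRow(bounds[i + 1]);
          scans.add(s);
        }
        // buffer 1000 results; should exceed the individual scans' caching setting
        ParallelClientScanner scanner = new ParallelClientScanner(conn, scans, table, 1000);
        try {
          for (Result r = scanner.next(); r != null; r = scanner.next()) {
            // results from all ranges, interleaved in arrival order
          }
        } finally {
          scanner.close();
        }
      }
    }
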
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1535609)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy)
@@ -116,6 +116,8 @@
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
private Boolean loadColumnFamiliesOnDemand = null;
+ // never serialized, only used on the client
+ private transient double parallelScaling = 0;
/**
* Set it true for small scan to get better performance
*
@@ -755,4 +757,21 @@
public boolean isSmall() {
return small;
}
+
+ /**
+ * Execute this scan in parallel.
+ * @param scalingFactor use this many threads per involved RegionServer;
+ * 0 (the default) disables parallel scanning
+ */
+ public void setParallelScaling(double scalingFactor) {
+ this.parallelScaling = scalingFactor;
+ }
+
+ /**
+ * @return the current parallel scaling factor;
+ * 0 if disabled (the default)
+ */
+ public double getParallelScaling() {
+ return this.parallelScaling;
+ }
}
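
A worked example of how the factor feeds the thread-count computation in
ParallelClientScanner above: a scan whose key range touches 4 region
locations, combined with setParallelScaling(1.5), schedules
ceil(4 * 1.5) = 6 worker threads; setParallelScaling(0), the default, keeps
the scan on the ordinary single-threaded ClientScanner path.
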
Index: hbase-shell/src/main/ruby/hbase/table.rb
===================================================================
--- hbase-shell/src/main/ruby/hbase/table.rb (revision 1535609)
+++ hbase-shell/src/main/ruby/hbase/table.rb (working copy)
@@ -336,6 +336,7 @@
limit = args.delete("LIMIT") || -1
maxlength = args.delete("MAXLENGTH") || -1
+ parallel = args.delete("PARALLEL") || -1
@converters.clear()
if args.any?
@@ -389,6 +390,10 @@
scan = org.apache.hadoop.hbase.client.Scan.new
end
+ if parallel > 0
+ scan.parallel_scaling = parallel
+ end
+
# Start the scanner
scanner = @table.getScanner(scan)
count = 0
@@ -423,6 +428,7 @@
count += 1
end
+ scanner.close
return ((block_given?) ? count : res)
end
Index: hbase-shell/src/main/ruby/hbase.rb
===================================================================
--- hbase-shell/src/main/ruby/hbase.rb (revision 1535609)
+++ hbase-shell/src/main/ruby/hbase.rb (working copy)
@@ -56,6 +56,7 @@
SPLITS_FILE = 'SPLITS_FILE'
SPLITALGO = 'SPLITALGO'
NUMREGIONS = 'NUMREGIONS'
+ PARALLEL = "PARALLEL"
CONFIGURATION = org.apache.hadoop.hbase.HConstants::CONFIGURATION
ATTRIBUTES="ATTRIBUTES"
Index: hbase-shell/src/main/ruby/shell/commands/scan.rb
===================================================================
--- hbase-shell/src/main/ruby/shell/commands/scan.rb (revision 1535609)
+++ hbase-shell/src/main/ruby/shell/commands/scan.rb (working copy)
@@ -25,7 +25,7 @@
Scan a table; pass table name and optionally a dictionary of scanner
specifications. Scanner specifications may include one or more of:
TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH,
-or COLUMNS, CACHE
+or COLUMNS, CACHE, PARALLEL
If no columns are specified, all columns will be scanned.
To scan all members of a column family, leave the qualifier empty as in
@@ -48,6 +48,11 @@
org.apache.hadoop.hbase.filter.ColumnPaginationFilter.new(1, 0)}
For setting the Operation Attributes
hbase> scan 't1', { COLUMNS => ['c1', 'c2'], ATTRIBUTES => {'mykey' => 'myvalue'}}
+PARALLEL enables parallel, unsorted scanning. The argument is a scaling factor;
+1.0 means one thread per involved RegionServer.
+
+ hbase> scan 't1', {PARALLEL => 2.0}
+
For experts, there is an additional option -- CACHE_BLOCKS -- which
switches block caching for the scanner on (true) or off (false). By
default it is enabled. Examples: