For MapReduce jobs that use HBase tables as a source, if there is a pattern where the "slow" map tasks seem to
Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1576414)
+++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy)
@@ -83,7 +83,25 @@
private static final String RAW_ATTR = "_raw_";
private static final String ONDEMAND_ATTR = "_ondemand_";
private static final String ISOLATION_LEVEL = "_isolationlevel_";
+
+ /** Scan Hints */
private static final String SMALL_ATTR = "_small_";
+ /**
+ * EXPERT ONLY.
+ * An integer (not long) indicating to the scanner logic how many times we attempt to retrieve the
+ * next KV before we schedule a reseek.
+ * The right value depends on the size of the average KV. A reseek is more efficient when
+ * it can skip 5-10 KVs or 512B-1KB, or when the next KV is likely found in another HFile block.
+ * This setting only has an effect when columns were added with
+ * {@link #addColumn(byte[], byte[])}, for example:
+ * <pre>{@code
+ * Scan s = new Scan(...);
+ * s.addColumn(...);
+ * s.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(2));
+ * }</pre>
+ * Default is 0 (always reseek).
+ */
+ public static final String HINT_LOOKAHEAD = "_look_ahead_";
private static final byte SCAN_VERSION = (byte)2;
private byte [] startRow = HConstants.EMPTY_START_ROW;
Index: src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (revision 1576414)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (working copy)
@@ -54,6 +54,10 @@
private final int maxVersions;
private final int minVersions;
+ // hint for the tracker about how many KVs we will attempt to search via next()
+ // before we schedule a (re)seek operation
+ private final int lookAhead;
+
/**
* Contains the list of columns that the ExplicitColumnTracker is tracking.
* Each ColumnCount instance also tracks how many versions of the requested
@@ -66,6 +70,7 @@
* Used to eliminate duplicates. */
private long latestTSOfCurrentColumn;
private long oldestStamp;
+ private int skipCount;
/**
* Default constructor.
@@ -74,12 +79,14 @@
* @param maxVersions maximum versions to return per column
* @param oldestUnexpiredTS the oldest timestamp we are interested in,
* based on TTL
- * @param ttl The timeToLive to enforce
+ * @param lookAhead number of KeyValues to look ahead via next before
+ * (re)seeking
*/
public ExplicitColumnTracker(NavigableSet columns, int minVersions,
- int maxVersions, long oldestUnexpiredTS) {
+ int maxVersions, long oldestUnexpiredTS, int lookAhead) {
this.maxVersions = maxVersions;
this.minVersions = minVersions;
+ this.lookAhead = lookAhead;
this.oldestStamp = oldestUnexpiredTS;
this.columns = new ColumnCount[columns.size()];
int i=0;
@@ -135,7 +142,8 @@
if (ret > 0) {
// The current KV is smaller than the column the ExplicitColumnTracker
// is interested in, so seek to that column of interest.
- return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
+ return this.skipCount++ < this.lookAhead ? ScanQueryMatcher.MatchCode.SKIP
+ : ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
}
// The current KV is bigger than the column the ExplicitColumnTracker
@@ -144,6 +152,7 @@
// column of interest, and check again.
if (ret <= -1) {
++this.index;
+ this.skipCount = 0;
if (done()) {
// No more to match, do not include, done with this row.
return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
@@ -168,6 +177,7 @@
if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
// Done with versions for this column
++this.index;
+ this.skipCount = 0;
resetTS();
if (done()) {
// We have served all the requested columns.
@@ -186,6 +196,7 @@
// Called between every row.
public void reset() {
this.index = 0;
+ this.skipCount = 0;
this.column = this.columns[this.index];
for(ColumnCount col : this.columns) {
col.setCount(0);
@@ -225,6 +236,7 @@
resetTS();
if (compare <= 0) {
++this.index;
+ this.skipCount = 0;
if (done()) {
// Will not hit any more columns in this storefile
this.column = null;
Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1576414)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy)
@@ -176,8 +176,9 @@
// We can share the ExplicitColumnTracker, diff is we reset
// between rows, not between storefiles.
- this.columns = new ExplicitColumnTracker(columns,
- scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
+ byte[] attr = scan.getAttribute(Scan.HINT_LOOKAHEAD);
+ this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
+ oldestUnexpiredTS, attr == null ? 0 : Bytes.toInt(attr));
}
}
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (revision 1576414)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (working copy)
@@ -45,9 +45,9 @@
private void runTest(int maxVersions,
TreeSet trackColumns,
List scannerColumns,
- List expected) throws IOException {
+ List expected, int lookAhead) throws IOException {
ColumnTracker exp = new ExplicitColumnTracker(
- trackColumns, 0, maxVersions, Long.MIN_VALUE);
+ trackColumns, 0, maxVersions, Long.MIN_VALUE, lookAhead);
//Initialize result
@@ -96,7 +96,7 @@
scanner.add(col4);
scanner.add(col5);
- runTest(maxVersions, columns, scanner, expected);
+ runTest(maxVersions, columns, scanner, expected, 0);
}
public void testGet_MultiVersion() throws IOException{
@@ -151,10 +151,64 @@
scanner.add(col5);
//Initialize result
- runTest(maxVersions, columns, scanner, expected);
+ runTest(maxVersions, columns, scanner, expected, 0);
}
+ public void testGet_MultiVersionWithLookAhead() throws IOException{
+ if(PRINT){
+ System.out.println("\nMultiVersionWithLookAhead");
+ }
+ //Columns the tracker should match; runTest is given a look-ahead of 2
+ TreeSet columns = new TreeSet(Bytes.BYTES_COMPARATOR);
+ //Looking for every other column: col2 and col4
+ columns.add(col2);
+ columns.add(col4);
+
+ List expected = new ArrayList();
+ expected.add(ScanQueryMatcher.MatchCode.SKIP); // col1; skipCount 0 < 2
+ expected.add(ScanQueryMatcher.MatchCode.SKIP); // col1; skipCount 1 < 2
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col1; look-ahead used up
+
+ expected.add(ScanQueryMatcher.MatchCode.INCLUDE); // col2; 1st version
+ expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL); // col2; 2nd version
+ expected.add(ScanQueryMatcher.MatchCode.SKIP); // col2 3rd KV, now before col4; skipCount 0 < 2
+
+ expected.add(ScanQueryMatcher.MatchCode.SKIP); // col3; skipCount carried over, 1 < 2
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col3; look-ahead used up
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col3
+
+ expected.add(ScanQueryMatcher.MatchCode.INCLUDE); // col4; 1st version
+ expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW); // col4; 2nd version
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col4 3rd KV; all columns served
+
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5; tracker is done
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5
+ int maxVersions = 2;
+
+ //Create "Scanner": three KVs for each of col1..col5
+ List scanner = new ArrayList();
+ scanner.add(col1);
+ scanner.add(col1);
+ scanner.add(col1);
+ scanner.add(col2);
+ scanner.add(col2);
+ scanner.add(col2);
+ scanner.add(col3);
+ scanner.add(col3);
+ scanner.add(col3);
+ scanner.add(col4);
+ scanner.add(col4);
+ scanner.add(col4);
+ scanner.add(col5);
+ scanner.add(col5);
+ scanner.add(col5);
+
+ //Run the tracker with a look-ahead of 2 and compare against expected
+ runTest(maxVersions, columns, scanner, expected, 2);
+ }
+
/**
* hbase-2259
*/
@@ -166,7 +220,7 @@
}
ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions,
- Long.MIN_VALUE);
+ Long.MIN_VALUE, 0);
for (int i = 0; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(),
@@ -194,7 +248,7 @@
new ScanQueryMatcher.MatchCode[] {
ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
ScanQueryMatcher.MatchCode.SEEK_NEXT_COL });
- runTest(1, columns, scanner, expected);
+ runTest(1, columns, scanner, expected, 0);
}
@org.junit.Rule
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java (revision 1576414)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java (working copy)
@@ -83,22 +83,11 @@
}
- public void testMatch_ExplicitColumns()
- throws IOException {
+ private void _testMatch_ExplicitColumns(Scan scan, List expected) throws IOException {
//Moving up from the Tracker by using Gets and List instead
//of just byte []
- //Expected result
- List expected = new ArrayList();
- expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
- expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
- expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
- expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
- expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
- expected.add(ScanQueryMatcher.MatchCode.DONE);
-
- // 2,4,5
-
+ // 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2,
0, 1, ttl, false, 0, rowComparator), get.getFamilyMap().get(fam2),
EnvironmentEdgeManager.currentTimeMillis() - ttl);
@@ -130,7 +119,43 @@
}
}
+ public void testMatch_ExplicitColumns()
+ throws IOException {
+ //Moving up from the Tracker by using Gets and List instead
+ //of just byte []
+ //Expected result with no Scan.HINT_LOOKAHEAD set (tracker look-ahead is 0)
+ List expected = new ArrayList();
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // look-ahead 0: seek immediately
+ expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // look-ahead 0: seek immediately
+ expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
+ expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
+ expected.add(ScanQueryMatcher.MatchCode.DONE);
+
+ _testMatch_ExplicitColumns(scan, expected);
+ }
+
+ public void testMatch_ExplicitColumnsWithLookAhead()
+ throws IOException {
+ //Moving up from the Tracker by using Gets and List instead
+ //of just byte []
+ //Same match as testMatch_ExplicitColumns, but with Scan.HINT_LOOKAHEAD = 2
+ //Expected result: SKIP where the no-hint variant returns SEEK_NEXT_COL
+ List expected = new ArrayList();
+ expected.add(ScanQueryMatcher.MatchCode.SKIP); // skipCount 0 < look-ahead 2: use next()
+ expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
+ expected.add(ScanQueryMatcher.MatchCode.SKIP); // skipCount was reset when the column advanced
+ expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
+ expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
+ expected.add(ScanQueryMatcher.MatchCode.DONE);
+
+ Scan s = new Scan(scan);
+ s.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(2)); // int-encoded; read back via Bytes.toInt
+ _testMatch_ExplicitColumns(s, expected);
+ }
+
+
public void testMatch_Wildcard()
throws IOException {
//Moving up from the Tracker by using Gets and List instead