Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (revision 1519839) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (working copy) @@ -56,7 +56,7 @@ long timestamp = 0; //"Match" for(byte [] col : scannerColumns){ - result.add(exp.checkColumn(col, 0, col.length, ++timestamp, + result.add(ScanQueryMatcher.checkColumn(exp, col, 0, col.length, ++timestamp, KeyValue.Type.Put.getCode(), false)); } @@ -169,14 +169,14 @@ Long.MIN_VALUE); for (int i = 0; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); - explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode(), + ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(), false); } explicit.reset(); for (int i = 1; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); - explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode(), + ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(), false); } } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java (revision 1519839) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java (working copy) @@ -55,7 +55,7 @@ List actual = new ArrayList(); for(byte [] qualifier : qualifiers) { - ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0, + ScanQueryMatcher.MatchCode mc = ScanQueryMatcher.checkColumn(tracker, qualifier, 0, qualifier.length, 1, KeyValue.Type.Put.getCode(), false); actual.add(mc); } @@ -88,7 +88,7 @@ long timestamp = 0; for(byte [] qualifier : qualifiers) { - MatchCode mc = 
tracker.checkColumn(qualifier, 0, qualifier.length, + MatchCode mc = ScanQueryMatcher.checkColumn(tracker, qualifier, 0, qualifier.length, ++timestamp, KeyValue.Type.Put.getCode(), false); actual.add(mc); } @@ -112,7 +112,7 @@ try { for(byte [] qualifier : qualifiers) { - tracker.checkColumn(qualifier, 0, qualifier.length, 1, + ScanQueryMatcher.checkColumn(tracker, qualifier, 0, qualifier.length, 1, KeyValue.Type.Put.getCode(), false); } } catch (Exception e) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1519839) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -340,42 +340,59 @@ return columns.getNextRowOrNextColumn(bytes, offset, qualLength); } - /** - * Filters should be checked before checking column trackers. If we do - * otherwise, as was previously being done, ColumnTracker may increment its - * counter for even that KV which may be discarded later on by Filter. This - * would lead to incorrect results in certain cases. 
- */ - ReturnCode filterResponse = ReturnCode.SKIP; - if (filter != null) { - filterResponse = filter.filterKeyValue(kv); - if (filterResponse == ReturnCode.SKIP) { - return MatchCode.SKIP; - } else if (filterResponse == ReturnCode.NEXT_COL) { - return columns.getNextRowOrNextColumn(bytes, offset, qualLength); - } else if (filterResponse == ReturnCode.NEXT_ROW) { - stickyNextRow = true; - return MatchCode.SEEK_NEXT_ROW; - } else if (filterResponse == ReturnCode.SEEK_NEXT_USING_HINT) { - return MatchCode.SEEK_NEXT_USING_HINT; + // STEP 1: Check if the column is part of the requested columns + MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, type); + if (colChecker == MatchCode.INCLUDE) { + ReturnCode filterResponse = ReturnCode.SKIP; + // STEP 2: Yes, the column is part of the requested columns. Check if filter is present + if (filter != null) { + // STEP 3: Run the filter; return early unless it includes the key value + filterResponse = filter.filterKeyValue(kv); + switch (filterResponse) { + case SKIP: + return MatchCode.SKIP; + case NEXT_COL: + return columns.getNextRowOrNextColumn(bytes, offset, qualLength); + case NEXT_ROW: + stickyNextRow = true; + return MatchCode.SEEK_NEXT_ROW; + case SEEK_NEXT_USING_HINT: + return MatchCode.SEEK_NEXT_USING_HINT; + default: + // INCLUDE or INCLUDE_AND_NEXT_COL: continue with the version check below + break; + } } + /* + * STEP 4: Reaching this step means the column is part of the requested columns and either + * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response. + * Now check the number of versions needed. This method call returns SKIP, INCLUDE, + * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL. + * + * FilterResponse ColumnChecker Desired behavior + * INCLUDE SKIP duplicate version (same timestamp), SKIP. 
+ * INCLUDE INCLUDE INCLUDE + * INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL + * INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW + * INCLUDE_AND_NEXT_COL SKIP duplicate version (same timestamp), SKIP. + * INCLUDE_AND_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL + * INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL + * INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW + * + * In all the above scenarios, we return the column checker return value except for + * FilterResponse (INCLUDE_AND_NEXT_COL) and ColumnChecker (INCLUDE). + */ + colChecker = columns.checkVersions(type, timestamp, + kv.getMemstoreTS() > maxReadPointToTrackVersions); + // INCLUDE_AND_SEEK_NEXT_ROW: remember that the rest of this row can be skipped + stickyNextRow = stickyNextRow || colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; + return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL && + colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL + : colChecker; } - - MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, - timestamp, type, kv.getMemstoreTS() > maxReadPointToTrackVersions); - /* - * According to current implementation, colChecker can only be - * SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return - * the MatchCode. If it is SEEK_NEXT_ROW, also set stickyNextRow. - */ - if (colChecker == MatchCode.SEEK_NEXT_ROW) { - stickyNextRow = true; - } else if (filter != null && colChecker == MatchCode.INCLUDE && - filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL) { - return MatchCode.INCLUDE_AND_SEEK_NEXT_COL; - } + // A column-level SEEK_NEXT_ROW likewise makes skipping the rest of the row sticky 
+ stickyNextRow = stickyNextRow || colChecker == MatchCode.SEEK_NEXT_ROW; return colChecker; - } public boolean moreRowsMayExistAfter(KeyValue kv) { @@ -454,6 +471,16 @@ null, 0, 0); } + // Used only by tests: checkColumn, followed by checkVersions when it returns INCLUDE. + static MatchCode checkColumn(ColumnTracker columnTracker, byte[] bytes, int offset, + int length, long timestamp, byte type, boolean ignoreCount) throws IOException { + MatchCode matchCode = columnTracker.checkColumn(bytes, offset, length, type); + if (matchCode == MatchCode.INCLUDE) { + return columnTracker.checkVersions(type, timestamp, ignoreCount); + } + return matchCode; + } + /** * {@link #match} return codes. These instruct the scanner moving through * memstores and StoreFiles what to do with the current KeyValue. Index: src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (revision 1519839) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (working copy) @@ -106,7 +106,7 @@ */ @Override public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, - int length, long timestamp, byte type, boolean ignoreCount) { + int length, byte type) { // delete markers should never be passed to an // *Explicit*ColumnTracker assert !KeyValue.isDelete(type); @@ -125,34 +125,9 @@ int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), column.getLength(), bytes, offset, length); - // Column Matches. If it is not a duplicate key, increment the version count - // and include. + // Column matches: return INCLUDE. The caller is expected to call checkVersions + // to enforce the version limit. 
if(ret == 0) { - if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; - - //If column matches, check if it is a duplicate timestamp - if (sameAsPreviousTS(timestamp)) { - //If duplicate, skip this Key - return ScanQueryMatcher.MatchCode.SKIP; - } - int count = this.column.increment(); - if(count >= maxVersions || (count >= minVersions && isExpired(timestamp))) { - // Done with versions for this column - ++this.index; - resetTS(); - if (done()) { - // We have served all the requested columns. - this.column = null; - return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; - } else { - // We are done with current column; advance to next column - // of interest. - this.column = this.columns.get(this.index); - return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL; - } - } else { - setTS(timestamp); - } return ScanQueryMatcher.MatchCode.INCLUDE; } @@ -180,6 +155,35 @@ } while(true); } + @Override + public ScanQueryMatcher.MatchCode checkVersions(byte type, long timestamp, + boolean ignoreCount) { + assert !KeyValue.isDelete(type); // delete markers never reach an explicit tracker + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; // included, not counted + // Same timestamp as the previous version of this column: a duplicate + if (sameAsPreviousTS(timestamp)) { + // Skip the duplicate without counting it as a version + return ScanQueryMatcher.MatchCode.SKIP; + } + int count = this.column.increment(); + if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) { + // Version limit reached (or minVersions met and expired): done with this column + ++this.index; + resetTS(); + if (done()) { + // We have served all the requested columns. + this.column = null; + return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; + } + // We are done with current column; advance to next column + // of interest. 
+ this.column = this.columns.get(this.index); + return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL; + } + setTS(timestamp); + return ScanQueryMatcher.MatchCode.INCLUDE; + } + // Called between every row. 
public void reset() { this.index = 0; Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (revision 1519839) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (working copy) @@ -61,30 +61,20 @@ /** * {@inheritDoc} * This receives puts *and* deletes. - * Deletes do not count as a version, but rather take the version - * of the previous put (so eventually all but the last can be reclaimed). */ @Override - public MatchCode checkColumn(byte[] bytes, int offset, int length, - long timestamp, byte type, boolean ignoreCount) throws IOException { - + public MatchCode checkColumn(byte[] bytes, int offset, int length, byte type) + throws IOException { + + if (columnBuffer == null) { // first iteration. resetBuffer(bytes, offset, length); - if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; - // do not count a delete marker as another version - return checkVersion(type, timestamp); + return ScanQueryMatcher.MatchCode.INCLUDE; } int cmp = Bytes.compareTo(bytes, offset, length, columnBuffer, columnOffset, columnLength); if (cmp == 0) { - if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; - - //If column matches, check if it is a duplicate timestamp - if (sameAsPreviousTSAndType(timestamp, type)) { - return ScanQueryMatcher.MatchCode.SKIP; - } - return checkVersion(type, timestamp); + return ScanQueryMatcher.MatchCode.INCLUDE; } resetTSAndType(); @@ -93,8 +83,7 @@ if (cmp > 0) { // switched columns, lets do something.x resetBuffer(bytes, offset, length); - if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; - return checkVersion(type, timestamp); + return ScanQueryMatcher.MatchCode.INCLUDE; } // new col < oldcol @@ -107,6 +96,20 @@ Bytes.toStringBinary(bytes, offset, length)); } + /** + * {@inheritDoc} + * This receives puts *and* 
deletes. Deletes do not count as a version, but rather + * take the version of the previous put (so eventually all but the last can be reclaimed). + */ + @Override + public MatchCode checkVersions(byte type, long timestamp, boolean ignoreCount) { + if (ignoreCount) return MatchCode.INCLUDE; + if (sameAsPreviousTSAndType(timestamp, type)) { + return MatchCode.SKIP; + } + return checkVersion(type, timestamp); + } + private void resetBuffer(byte[] bytes, int offset, int length) { columnBuffer = bytes; columnOffset = offset; Index: src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (revision 1519839) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (working copy) @@ -47,25 +47,45 @@ * This class is NOT thread-safe as queries are never multi-threaded */ public interface ColumnTracker { + /** - * Keeps track of the number of versions for the columns asked for + * Checks whether the column is one of the requested columns and returns the + * corresponding match code. It does not check against the number of + * versions for the columns asked for. To do the version check, one has to + * call {@link #checkVersions(byte, long, boolean)} when this method returns + * {@link MatchCode#INCLUDE}. The values returned by this method are + * {@link MatchCode#INCLUDE}, {@link MatchCode#SEEK_NEXT_COL} and + * {@link MatchCode#SEEK_NEXT_ROW}. + * * @param bytes * @param offset * @param length - * @param ttl The timeToLive to enforce. * @param type The type of the KeyValue - * @param ignoreCount indicates if the KV needs to be excluded while counting - * (used during compactions. We only count KV's that are older than all the - * scanners' read points.) * @return The match code instance. - * @throws IOException in case there is an internal consistency problem - * caused by a data corruption. 
+ * @throws IOException in case there is an internal consistency problem caused by a data + * corruption. */ - public ScanQueryMatcher.MatchCode checkColumn(byte[] bytes, int offset, - int length, long ttl, byte type, boolean ignoreCount) + ScanQueryMatcher.MatchCode checkColumn(byte[] bytes, int offset, int length, byte type) throws IOException; /** + * Keeps track of the number of versions for the columns asked for. It assumes + * that the caller has already called {@link #checkColumn(byte[], int, int, byte)} + * and got {@link MatchCode#INCLUDE}, i.e. the column is one of the requested + * columns. The enum values returned by this method are {@link MatchCode#SKIP}, + * {@link MatchCode#INCLUDE}, {@link MatchCode#INCLUDE_AND_SEEK_NEXT_COL} and + * {@link MatchCode#INCLUDE_AND_SEEK_NEXT_ROW}. + * + * @param type the type of the key value (Put/Delete) + * @param timestamp the timestamp of the KeyValue being checked + * @param ignoreCount indicates if the KV needs to be excluded while counting (used + * during compactions. We only count KV's that are older than all the + * scanners' read points.) + * @return the scan query matcher match code instance + */ + ScanQueryMatcher.MatchCode checkVersions(byte type, long timestamp, boolean ignoreCount); + + /** * Resets the Matcher */ public void reset();