From 88c76e4a462148e91ce55ba54552ce5119453b74 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Mon, 29 Sep 2014 09:09:28 -0700 Subject: [PATCH 1/2] HBASE-11763 Move TTL handling into ScanQueryMatcher --- .../hadoop/hbase/regionserver/ColumnCount.java | 7 ++++ .../hadoop/hbase/regionserver/ColumnTracker.java | 13 +++----- .../hbase/regionserver/ExplicitColumnTracker.java | 22 ++++++------- .../hbase/regionserver/ScanDeleteTracker.java | 2 ++ .../hbase/regionserver/ScanQueryMatcher.java | 30 +++++++++++------- .../regionserver/ScanWildcardColumnTracker.java | 37 +++++++--------------- .../regionserver/TestExplicitColumnTracker.java | 7 ++-- .../TestScanWildcardColumnTracker.java | 23 ++++++++------ 8 files changed, 70 insertions(+), 71 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java index 71ea1bd..a83f03e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java @@ -101,6 +101,13 @@ public class ColumnCount { } /** + * @return current count + */ + public int getCount() { + return count; + } + + /** * Set the current count to a new count * @param count new count to set */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java index 8568cfc..2a5e5de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java @@ -77,14 +77,14 @@ public interface ColumnTracker { * the {@link #checkColumn(byte[], int, int, byte)} method and perform all the operations in this * checkVersions method. 
* @param type the type of the key value (Put/Delete) - * @param ttl The timeToLive to enforce. + * @param ts the timestamp of the key value * @param ignoreCount indicates if the KV needs to be excluded while counting (used during * compactions. We only count KV's that are older than all the scanners' read points.) * @return the scan query matcher match code instance * @throws IOException in case there is an internal consistency problem caused by a data * corruption. */ - ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length, long ttl, + ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length, long ts, byte type, boolean ignoreCount) throws IOException; /** * Resets the Matcher @@ -118,11 +118,8 @@ public interface ColumnTracker { ); /** - * Give the tracker a chance to declare it's done based on only the timestamp - * to allow an early out. - * - * @param timestamp - * @return true to early out based on timestamp. + * Give the tracker a chance to declare it's done if the minimum number of + * versions have been seen */ - boolean isDone(long timestamp); + boolean hasMinVersions(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java index b42ff25..20bb750 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java @@ -71,7 +71,6 @@ public class ExplicitColumnTracker implements ColumnTracker { /** Keeps track of the latest timestamp included for current column. * Used to eliminate duplicates. 
*/ private long latestTSOfCurrentColumn; - private long oldestStamp; private int skipCount; /** @@ -79,17 +78,14 @@ public class ExplicitColumnTracker implements ColumnTracker { * @param columns columns specified user in query * @param minVersions minimum number of versions to keep * @param maxVersions maximum versions to return per column - * @param oldestUnexpiredTS the oldest timestamp we are interested in, - * based on TTL * @param lookAhead number of KeyValues to look ahead via next before * (re)seeking */ public ExplicitColumnTracker(NavigableSet columns, int minVersions, - int maxVersions, long oldestUnexpiredTS, int lookAhead) { + int maxVersions, int lookAhead) { this.maxVersions = maxVersions; this.minVersions = minVersions; this.lookAhead = lookAhead; - this.oldestStamp = oldestUnexpiredTS; this.columns = new ColumnCount[columns.size()]; int i=0; for(byte [] column : columns) { @@ -176,7 +172,7 @@ public class ExplicitColumnTracker implements ColumnTracker { return ScanQueryMatcher.MatchCode.SKIP; } int count = this.column.increment(); - if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) { + if (count >= maxVersions) { // Done with versions for this column ++this.index; this.skipCount = 0; @@ -218,10 +214,6 @@ public class ExplicitColumnTracker implements ColumnTracker { return timestamp == latestTSOfCurrentColumn; } - private boolean isExpired(long timestamp) { - return timestamp < oldestStamp; - } - /** * This method is used to inform the column tracker that we are done with * this column. 
We may get this information from external filters or @@ -252,6 +244,7 @@ public class ExplicitColumnTracker implements ColumnTracker { } } + @Override public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset, int qualLength) { doneWithColumn(bytes, offset,qualLength); @@ -263,7 +256,12 @@ public class ExplicitColumnTracker implements ColumnTracker { } } - public boolean isDone(long timestamp) { - return minVersions <= 0 && isExpired(timestamp); + @Override + public boolean hasMinVersions() { + // Always have enough versions if minVersions is not set + if (minVersions <= 0) { + return true; + } + return column.getCount() >= minVersions; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java index a5c17fb..a29ef8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java @@ -56,6 +56,7 @@ public class ScanDeleteTracker implements DeleteTracker { /** * Constructor for ScanDeleteTracker + * Takes no arguments; cell expiry (TTL) is checked by ScanQueryMatcher. */ public ScanDeleteTracker() { super(); @@ -111,6 +112,7 @@ public class ScanDeleteTracker implements DeleteTracker { @Override public DeleteResult isDeleted(Cell cell) { long timestamp = cell.getTimestamp(); + int qualifierOffset = cell.getQualifierOffset(); int qualifierLength = cell.getQualifierLength(); if (hasFamilyStamp && timestamp <= familyStamp) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index fae8678..75316b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -24,6 +24,7 @@ import
java.util.NavigableSet; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; @@ -98,6 +99,9 @@ public class ScanQueryMatcher { */ private final long earliestPutTs; + /** The oldest timestamp we are interested in, based on TTL */ + private final long oldestUnexpiredTS; + /** readPoint over which the KVs are unconditionally included */ protected long maxReadPointToTrackVersions; @@ -160,6 +164,7 @@ public class ScanQueryMatcher { scanInfo.getFamily()); this.filter = scan.getFilter(); this.earliestPutTs = earliestPutTs; + this.oldestUnexpiredTS = oldestUnexpiredTS; this.maxReadPointToTrackVersions = readPointToUse; this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes(); @@ -182,8 +187,7 @@ public class ScanQueryMatcher { hasNullColumn = true; // use a specialized scan for wildcard column tracker. - this.columns = new ScanWildcardColumnTracker( - scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS); + this.columns = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions); } else { // whether there is null column in the explicit column query hasNullColumn = (columns.first().length == 0); @@ -192,7 +196,7 @@ public class ScanQueryMatcher { // between rows, not between storefiles. byte[] attr = scan.getAttribute(Scan.HINT_LOOKAHEAD); this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, - oldestUnexpiredTS, attr == null ? 0 : Bytes.toInt(attr)); + attr == null ? 
0 : Bytes.toInt(attr)); } this.isReversed = scan.isReversed(); } @@ -315,10 +319,6 @@ public class ScanQueryMatcher { (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE; long timestamp = Bytes.toLong(bytes, initialOffset + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE); - // check for early out based on timestamp alone - if (columns.isDone(timestamp)) { - return columns.getNextRowOrNextColumn(bytes, offset, qualLength); - } /* * The delete logic is pretty complicated now. @@ -334,7 +334,7 @@ public class ScanQueryMatcher { * they affect */ byte type = bytes[initialOffset + keyLength - 1]; - if (kv.isDelete()) { + if (CellUtil.isDelete(kv)) { if (!keepDeletedCells) { // first ignore delete markers if the scanner can do so, and the // range does not include the marker @@ -377,9 +377,16 @@ public class ScanQueryMatcher { } // note the following next else if... // delete marker are not subject to other delete markers - } else if (!this.deletes.isEmpty()) { - DeleteResult deleteResult = deletes.isDeleted(kv); - switch (deleteResult) { + } else { + // If the cell is expired and we have enough versions, skip + if (columns.hasMinVersions() && HStore.isExpired(kv, oldestUnexpiredTS)) { + return columns.getNextRowOrNextColumn(kv.getQualifierArray(), kv.getQualifierOffset(), + kv.getQualifierLength()); + } + // Check deletes + if (!this.deletes.isEmpty()) { + DeleteResult deleteResult = deletes.isDeleted(kv); + switch (deleteResult) { case FAMILY_DELETED: case COLUMN_DELETED: return columns.getNextRowOrNextColumn(bytes, offset, qualLength); @@ -391,6 +398,7 @@ public class ScanQueryMatcher { default: throw new RuntimeException("UNEXPECTED"); } + } } int timestampComparison = tr.compare(timestamp); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java index 5101584..b6597a0 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java @@ -43,20 +43,14 @@ public class ScanWildcardColumnTracker implements ColumnTracker { private long latestTSOfCurrentColumn; private byte latestTypeOfCurrentColumn; - private long oldestStamp; - /** * Return maxVersions of every row. * @param minVersion Minimum number of versions to keep * @param maxVersion Maximum number of versions to return - * @param oldestUnexpiredTS oldest timestamp that has not expired according - * to the TTL. */ - public ScanWildcardColumnTracker(int minVersion, int maxVersion, - long oldestUnexpiredTS) { + public ScanWildcardColumnTracker(int minVersion, int maxVersion) { this.maxVersions = maxVersion; this.minVersions = minVersion; - this.oldestStamp = oldestUnexpiredTS; } /** @@ -126,11 +120,6 @@ public class ScanWildcardColumnTracker implements ColumnTracker { /** * Check whether this version should be retained. - * There are 4 variables considered: - * If this version is past max versions -> skip it - * If this kv has expired or was deleted, check min versions - * to decide whther to skip it or not. 
- * * Increase the version counter unless this is a delete */ private MatchCode checkVersion(byte type, long timestamp) { @@ -140,14 +129,8 @@ public class ScanWildcardColumnTracker implements ColumnTracker { if (currentCount > maxVersions) { return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col } - // keep the KV if required by minversions or it is not expired, yet - if (currentCount <= minVersions || !isExpired(timestamp)) { - setTSAndType(timestamp, type); - return ScanQueryMatcher.MatchCode.INCLUDE; - } else { - return MatchCode.SEEK_NEXT_COL; - } - + setTSAndType(timestamp, type); + return ScanQueryMatcher.MatchCode.INCLUDE; } @Override @@ -170,10 +153,6 @@ public class ScanWildcardColumnTracker implements ColumnTracker { return timestamp == latestTSOfCurrentColumn && type == latestTypeOfCurrentColumn; } - private boolean isExpired(long timestamp) { - return timestamp < oldestStamp; - } - /** * Used by matcher and scan/get to get a hint of the next column * to seek to after checkColumn() returns SKIP. 
Returns the next interesting @@ -194,12 +173,18 @@ public class ScanWildcardColumnTracker implements ColumnTracker { return false; } + @Override public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset, int qualLength) { return MatchCode.SEEK_NEXT_COL; } - public boolean isDone(long timestamp) { - return minVersions <= 0 && isExpired(timestamp); + @Override + public boolean hasMinVersions() { + // Always have enough versions if minVersions is not set + if (minVersions <= 0) { + return true; + } + return currentCount >= minVersions; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java index 2dbf279..87e3108 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java @@ -47,9 +47,7 @@ public class TestExplicitColumnTracker { TreeSet trackColumns, List scannerColumns, List expected, int lookAhead) throws IOException { - ColumnTracker exp = new ExplicitColumnTracker( - trackColumns, 0, maxVersions, Long.MIN_VALUE, lookAhead); - + ColumnTracker exp = new ExplicitColumnTracker(trackColumns, 0, maxVersions, lookAhead); //Initialize result List result = new ArrayList(); @@ -208,8 +206,7 @@ public class TestExplicitColumnTracker { columns.add(Bytes.toBytes("col"+i)); } - ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions, - Long.MIN_VALUE, 0); + ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions, 0); for (int i = 0; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java index 5b99940..d94c546 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.*; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -26,16 +28,19 @@ import java.util.List; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.util.Bytes; + +import org.junit.Ignore; +import org.junit.Test; import org.junit.experimental.categories.Category; @Category(SmallTests.class) -public class TestScanWildcardColumnTracker extends HBaseTestCase { +public class TestScanWildcardColumnTracker { final static int VERSIONS = 2; + @Test public void testCheckColumn_Ok() throws IOException { - ScanWildcardColumnTracker tracker = - new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE); + ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS); //Create list of qualifiers List qualifiers = new ArrayList(); @@ -66,9 +71,9 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { } } + @Test public void testCheckColumn_EnforceVersions() throws IOException { - ScanWildcardColumnTracker tracker = - new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE); + ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS); //Create list of qualifiers List qualifiers = new ArrayList(); @@ -100,9 +105,10 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { } } - public void DisabledTestCheckColumn_WrongOrder() { - ScanWildcardColumnTracker tracker = - new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE); + @Test + @Ignore + public void 
testCheckColumn_WrongOrder() { + ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS); //Create list of qualifiers List qualifiers = new ArrayList(); @@ -123,6 +129,5 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { assertEquals(true, ok); } - } -- 1.7.12.4 (Apple Git-37)