diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index c3ddc4b..31346ab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -43,23 +43,24 @@ import org.apache.hadoop.hbase.util.Bytes; /** * Used to perform Get operations on a single row. *

- * To get everything for a row, instantiate a Get object with the row to get. - * To further narrow the scope of what to Get, use the methods below. + * To get everything for a row, instantiate a Get object with the row to get. To further narrow the + * scope of what to Get, use the methods below. *

- * To get all columns from specific families, execute {@link #addFamily(byte[]) addFamily} - * for each family to retrieve. + * To get all columns from specific families, execute {@link #addFamily(byte[]) addFamily} for each + * family to retrieve. *

- * To get specific columns, execute {@link #addColumn(byte[], byte[]) addColumn} - * for each column to retrieve. + * To get specific columns, execute {@link #addColumn(byte[], byte[]) addColumn} for each column to + * retrieve. *
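A quick usage sketch of the two narrowing methods above (the table handle, family, and qualifier names are assumed for illustration; imports and exception handling omitted):

    Get get = new Get(Bytes.toBytes("row1"));
    // All columns of one family...
    get.addFamily(Bytes.toBytes("cf1"));
    // ...plus a single column from another family.
    get.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("qual"));
    Result result = table.get(get);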

- * To only retrieve columns within a specific range of version timestamps, - * execute {@link #setTimeRange(long, long) setTimeRange}. + * To only retrieve columns within a specific range of version timestamps, execute + * {@link #setTimeRange(long, long) setTimeRange}. *

- * To only retrieve columns with a specific timestamp, execute - * {@link #setTimeStamp(long) setTimestamp}. + * To only retrieve columns with a specific timestamp, execute {@link #setTimeStamp(long) + * setTimestamp}. *
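The two time-based narrowing methods above combine with column selection; a minimal sketch, assuming a Table handle named table, with imports and exception handling omitted:

    Get get = new Get(Bytes.toBytes("row1"));
    get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"));
    // Only versions whose timestamps fall in [1000, 2000) are considered.
    get.setTimeRange(1000L, 2000L);
    // Alternatively, pin the read to one exact version timestamp:
    // get.setTimeStamp(1500L);
    Result result = table.get(get);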

- * To limit the number of versions of each column to be returned, execute - * {@link #setMaxVersions(int) setMaxVersions}. + * To limit the number of versions of each column to be returned when no filter is used, execute + * {@link #setMaxVersions(int) setMaxVersions}. If a filter is used, maxVersions is the maximum + * number of versions of each column that will be checked by the filter. *
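The revised wording above changes what setMaxVersions means once a filter is attached: it bounds how many versions of each column the filter gets to check, not how many are returned. A sketch of the consequence, using the ValueFilter and SubstringComparator already exercised by the tests in this patch (names and values illustrative; imports and exception handling omitted):

    Get get = new Get(Bytes.toBytes("row1"));
    get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"));
    get.setMaxVersions(3); // the filter sees at most 3 versions of cf:qual
    get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value")));
    // The result may hold anywhere from 0 to 3 versions, because versions
    // rejected by the filter still consume the version quota.
    Result result = table.get(get);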

* To add a filter, call {@link #setFilter(Filter) setFilter}. */ @@ -273,13 +274,21 @@ public class Get extends Query } /** - * Get up to the specified number of versions of each column. + * If no filter is used, get up to the specified number of versions of each column. + *

+ * If a filter is used, maxVersions is the maximum number of versions of each column that will be + * checked by the filter. So the get may return fewer cells than the value set here, as the filter + * may filter out some cells. If you want a specific number of versions for each column after + * filtering, call setMaxVersions() and use + * {@link org.apache.hadoop.hbase.filter.SpecifiedNumVersionsColumnFilter}. Note that the + * SpecifiedNumVersionsColumnFilter should be placed at the last position in the FilterList to + * make sure it is evaluated last. * @param maxVersions maximum versions for each column * @throws IOException if invalid number of versions * @return this for invocation chaining */ public Get setMaxVersions(int maxVersions) throws IOException { - if(maxVersions <= 0) { + if (maxVersions <= 0) { throw new IOException("maxVersions must be positive"); } this.maxVersions = maxVersions; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 639f43e..e525b71 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -62,8 +62,9 @@ import org.apache.hadoop.hbase.util.Bytes; * To only retrieve columns with a specific timestamp, call {@link #setTimeStamp(long) setTimestamp} * . *

- * To limit the number of versions of each column to be returned, call {@link #setMaxVersions(int) - * setMaxVersions}. + * To limit the number of versions of each column to be returned when no filter is used, call + * {@link #setMaxVersions(int) setMaxVersions}. If a filter is used, maxVersions is the maximum + * number of versions of each column that will be checked by the filter. *
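The FilterList ordering advice in the setMaxVersions javadocs can be made concrete with a sketch. Note that SpecifiedNumVersionsColumnFilter is only referenced by this patch (its implementation is not part of the diff and appears commented out further down), so the last filter here is an assumption about the intended API:

    Scan scan = new Scan();
    scan.setMaxVersions(); // let the filters see every version
    FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    filters.addFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value")));
    // The version-limiting filter goes last so it counts only cells that
    // survived the filters before it.
    filters.addFilter(new SpecifiedNumVersionsColumnFilter(2));
    scan.setFilter(filters);
    ResultScanner scanner = table.getScanner(scan);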

* To limit the maximum number of values returned for each call to next(), call * {@link #setBatch(int) setBatch}. @@ -598,7 +599,15 @@ public class Scan extends Query { } /** - * Get up to the specified number of versions of each column. + * If no filter is used, get up to the specified number of versions of each column. + *

+ * If a filter is used, maxVersions is the maximum number of versions of each column that will be + * checked by the filter. So the scan may return fewer cells than the value set here, as the + * filter may filter out some cells. If you want a specific number of versions for each column + * after filtering, call setMaxVersions() and use + * {@link org.apache.hadoop.hbase.filter.SpecifiedNumVersionsColumnFilter}. Note that the + * SpecifiedNumVersionsColumnFilter should be placed at the last position in the FilterList to + * make sure it is evaluated last. * @param maxVersions maximum versions for each column * @return this */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index ca68de2..2e95601 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -678,24 +678,48 @@ public interface RegionObserver extends Coprocessor { default void preDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final Durability durability) throws IOException {} -/** - * Called before the server updates the timestamp for version delete with latest timestamp. - *

- * Call CoprocessorEnvironment#bypass to skip default actions - *

- * Call CoprocessorEnvironment#complete to skip any subsequent chained - * coprocessors - * @param c the environment provided by the region server - * @param mutation - the parent mutation associated with this delete cell - * @param cell - The deleteColumn with latest version cell - * @param byteNow - timestamp bytes - * @param get - the get formed using the current cell's row. - * Note that the get does not specify the family and qualifier - */ + + /** + * Called before the server updates the timestamp for version delete with latest timestamp. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors + * @param c the environment provided by the region server + * @param mutation - the parent mutation associated with this delete cell + * @param cell - The deleteColumn with latest version cell + * @param byteNow - timestamp bytes + * @param get - the get formed using the current cell's row. Note that the get does not specify + * the family and qualifier + * @deprecated use + * {@link #prePrepareTimeStampForDeleteVersion(ObserverContext, Mutation, Cell, byte[], Get, int)} + * instead + */ + @Deprecated default void prePrepareTimeStampForDeleteVersion( - final ObserverContext c, - final Mutation mutation, final Cell cell, final byte[] byteNow, - final Get get) throws IOException {} + final ObserverContext c, final Mutation mutation, + final Cell cell, final byte[] byteNow, final Get get) throws IOException { + } + + /** + * Called before the server updates the timestamp for version delete with latest timestamp. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors + * @param c the environment provided by the region server + * @param mutation - the parent mutation associated with this delete cell + * @param cell - The deleteColumn with latest version cell + * @param byteNow - timestamp bytes + * @param get - the get formed using the current cell's row. Note that the get does not specify + * the family and qualifier + * @param count - the number of versions being deleted for this column + */ + default void prePrepareTimeStampForDeleteVersion( + final ObserverContext c, final Mutation mutation, + final Cell cell, final byte[] byteNow, final Get get, final int count) throws IOException { + prePrepareTimeStampForDeleteVersion(c, mutation, cell, byteNow, get); + } /** * Called after the client deletes a value. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b460d1a..ebafd16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2905,11 +2905,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi count = kvCount.get(qual); Get get = new Get(CellUtil.cloneRow(cell)); - get.setMaxVersions(count); + get.setMaxVersions(); get.addColumn(family, qual); if (coprocessorHost != null) { if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell, - byteNow, get)) { + byteNow, get, count)) { updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow); } } else { @@ -2931,9 +2931,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi CellUtil.updateLatestStamp(cell, byteNow, 0); return; } - if (result.size() > count) { - throw new RuntimeException("Unexpected size: " + result.size()); - } Cell getCell = result.get(count - 1); CellUtil.setTimestamp(cell, getCell.getTimestamp()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 0abc988..a7ff087 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -892,17 +892,18 @@ public class RegionCoprocessorHost * @param byteNow - current timestamp in bytes * @param get - the get that could be used * Note that the get only does not specify the family and qualifier that should be used + * @param count - the number of versions being deleted for this column * @return true if default processing should be bypassed * @exception IOException * Exception */ public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, - final Cell kv, final byte[] byteNow, final Get get) throws IOException { + final Cell kv, final byte[] byteNow, final Get get, final int count) throws IOException { return execOperation(coprocessors.isEmpty() ?
null : new RegionOperation() { @Override public void call(RegionObserver oserver, ObserverContext ctx) throws IOException { - oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get); + oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get, count); } }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java index 74b9084..68cedf1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java @@ -30,7 +30,7 @@ class ColumnCount { private final byte[] bytes; private final int offset; private final int length; - private int count; + private int count, slack; /** * Constructor @@ -57,10 +57,15 @@ class ColumnCount { * @param count initial count */ public ColumnCount(byte[] column, int offset, int length, int count) { + this(column, offset, length, count, 0); + } + + public ColumnCount(byte[] column, int offset, int length, int count, int slack) { this.bytes = column; this.offset = offset; this.length = length; this.count = count; + this.slack = slack; } /** @@ -91,6 +96,15 @@ class ColumnCount { public int decrement() { return --count; } + public void retract() { + if (slack > 0) { + count--; + slack--; + } + } + public int decrementSlack() { + return --slack; + } /** * Increment the current version count @@ -107,4 +121,7 @@ class ColumnCount { public void setCount(int count) { this.count = count; } + public void setSlack(int slack) { + this.slack = slack; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java index 7a2a1e2..b060bbf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java @@ -126,4 +126,5 @@ public interface ColumnTracker extends ShipperListener { * @return true to early out based on timestamp. */ boolean isDone(long timestamp); + void retract(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java index b4825f0..2a912e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java @@ -52,6 +52,7 @@ public class ExplicitColumnTracker implements ColumnTracker { private final int maxVersions; private final int minVersions; + private int slack; /** * Contains the list of columns that the ExplicitColumnTracker is tracking.
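The slack counter threaded through ColumnCount and the trackers implements a refund scheme: checkVersions tentatively counts a version against maxVersions, and retract() hands the quota back when the filter later rejects that cell, at most slack times. A minimal standalone model of the bookkeeping (class and member names here are illustrative, not the actual HBase types):

    final class VersionQuota {
      private int included = 0; // versions counted for the current column
      private int slack;        // how many refunds remain

      VersionQuota(int slack) {
        this.slack = slack;
      }

      // A version passed the column check and is tentatively counted.
      int increment() {
        return ++included;
      }

      // The filter rejected that version afterwards: give the quota back,
      // but never more than 'slack' times overall.
      void retract() {
        if (slack > 0) {
          included--;
          slack--;
        }
      }
    }

Under the slack formula introduced later in this diff, a family with five schema versions, a scan requesting two, and a filter attached would run with maxVersions = 2 and slack = 3.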
Each ColumnCount @@ -74,17 +75,21 @@ public class ExplicitColumnTracker implements ColumnTracker { * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL */ public ExplicitColumnTracker(NavigableSet columns, int minVersions, int maxVersions, - long oldestUnexpiredTS) { + int slack, long oldestUnexpiredTS) { this.maxVersions = maxVersions; this.minVersions = minVersions; + this.slack = slack; this.oldestStamp = oldestUnexpiredTS; this.columns = new ColumnCount[columns.size()]; int i = 0; for (byte[] column : columns) { - this.columns[i++] = new ColumnCount(column); + this.columns[i++] = new ColumnCount(column, 0, column.length, 0, slack); } reset(); } + public void retract() { + this.column.retract(); + } /** * Done when there are no more columns to match against. @@ -187,6 +192,7 @@ public class ExplicitColumnTracker implements ColumnTracker { this.column = this.columns[this.index]; for (ColumnCount col : this.columns) { col.setCount(0); + col.setSlack(slack); } resetTS(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java index 11dd51f..5edc5d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java @@ -380,7 +380,7 @@ public class LegacyScanQueryMatcher extends ScanQueryMatcher { // whether there is null column in the explicit column query hasNullColumn = columns.first().length == 0; columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, - oldestUnexpiredTS); + 0, oldestUnexpiredTS); } DeleteTracker deletes = instantiateDeleteTracker(regionCoprocessorHost); if (dropDeletesFromRow == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java index a73cc0b..c62bdc5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java @@ -38,6 +38,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker { private int currentCount = 0; private int maxVersions; private int minVersions; + private int slack = 0; /* * Keeps track of the latest timestamp and type included for current column. Used to eliminate * duplicates. @@ -54,9 +55,19 @@ public class ScanWildcardColumnTracker implements ColumnTracker { * @param oldestUnexpiredTS oldest timestamp that has not expired according to the TTL. 
*/ public ScanWildcardColumnTracker(int minVersion, int maxVersion, long oldestUnexpiredTS) { + this(minVersion, maxVersion, 0, oldestUnexpiredTS); + } + public ScanWildcardColumnTracker(int minVersion, int maxVersion, int slack, long oldestUnexpiredTS) { this.maxVersions = maxVersion; this.minVersions = minVersion; this.oldestStamp = oldestUnexpiredTS; + this.slack = slack; + } + public void retract() { + if (slack > 0) { + slack--; + currentCount--; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java index 95563b5..cd1c50c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.querymatcher; import java.io.IOException; import java.util.NavigableSet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo; */ @InterfaceAudience.Private public abstract class UserScanQueryMatcher extends ScanQueryMatcher { + private static final Log LOG = LogFactory.getLog(UserScanQueryMatcher.class); protected final boolean hasNullColumn; @@ -111,53 +114,91 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher { if (colChecker != MatchCode.INCLUDE) { return colChecker; } - ReturnCode filterResponse = ReturnCode.SKIP; - // STEP 2: Yes, the column is part of the requested columns. Check if filter is present + /* + * STEP 2: check the number of versions needed. This method call returns SKIP, SEEK_NEXT_COL, + * INCLUDE, INCLUDE_AND_SEEK_NEXT_COL, or INCLUDE_AND_SEEK_NEXT_ROW. + */ + colChecker = columns.checkVersions(cell, timestamp, typeByte, false); + switch (colChecker) { + case SKIP: + return MatchCode.SKIP; + case SEEK_NEXT_COL: + return MatchCode.SEEK_NEXT_COL; + default: + // It means it is INCLUDE, INCLUDE_AND_SEEK_NEXT_COL or INCLUDE_AND_SEEK_NEXT_ROW. + assert colChecker == MatchCode.INCLUDE || colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_COL + || colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; + break; + } + + /* + * STEP 3: Check if filter is present.
+ */ if (filter != null) { - // STEP 3: Filter the key value and return if it filters out - filterResponse = filter.filterKeyValue(cell); + ReturnCode filterResponse = filter.filterKeyValue(cell); + /* + * ColumnChecker FilterResponse Desired behavior + * INCLUDE SKIP SKIP + * INCLUDE NEXT_COL SEEK_NEXT_COL or SEEK_NEXT_ROW + * INCLUDE NEXT_ROW SEEK_NEXT_ROW + * INCLUDE SEEK_NEXT_USING_HINT SEEK_NEXT_USING_HINT + * INCLUDE INCLUDE INCLUDE + * INCLUDE INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL + * INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_COL SKIP SEEK_NEXT_COL + * INCLUDE_AND_SEEK_NEXT_COL NEXT_COL SEEK_NEXT_COL or SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_COL NEXT_ROW SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_COL SEEK_NEXT_USING_HINT SEEK_NEXT_USING_HINT + * INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL + * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL + * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW SKIP SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW NEXT_COL SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW NEXT_ROW SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW SEEK_NEXT_USING_HINT SEEK_NEXT_USING_HINT + * INCLUDE_AND_SEEK_NEXT_ROW INCLUDE INCLUDE_AND_SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW + */ switch (filterResponse) { case SKIP: - return MatchCode.SKIP; + if (colChecker == MatchCode.INCLUDE) { + columns.retract(); + return MatchCode.SKIP; + } else if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { + return MatchCode.SEEK_NEXT_COL; + } else if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + return MatchCode.SEEK_NEXT_ROW; + } + break; case NEXT_COL: - return columns.getNextRowOrNextColumn(cell); + if (colChecker == MatchCode.INCLUDE || colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { + return columns.getNextRowOrNextColumn(cell); + } else if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + return MatchCode.SEEK_NEXT_ROW; + } + break; case NEXT_ROW: return MatchCode.SEEK_NEXT_ROW; case SEEK_NEXT_USING_HINT: return MatchCode.SEEK_NEXT_USING_HINT; - default: - // It means it is either include or include and seek next + case INCLUDE: + return colChecker; + case INCLUDE_AND_NEXT_COL: + if (colChecker == MatchCode.INCLUDE || colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { + return MatchCode.INCLUDE_AND_SEEK_NEXT_COL; + } else if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; + } break; + case INCLUDE_AND_SEEK_NEXT_ROW: + return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; + default: + throw new RuntimeException("UNEXPECTED"); } } - /* - * STEP 4: Reaching this step means the column is part of the requested columns and either - * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response. - * Now check the number of versions needed. This method call returns SKIP, INCLUDE, - * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL. - * - * FilterResponse ColumnChecker Desired behavior - * INCLUDE SKIP row has already been included, SKIP. - * INCLUDE INCLUDE INCLUDE - * INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW - * INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP. 
- * INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW - * - * In all the above scenarios, we return the column checker return value except for - * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE) - */ - colChecker = columns.checkVersions(cell, timestamp, typeByte, false); - if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) { - if (colChecker != MatchCode.SKIP) { - return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; - } - return MatchCode.SEEK_NEXT_ROW; - } - return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL && colChecker == MatchCode.INCLUDE) - ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL : colChecker; + + return colChecker; } protected abstract boolean isGet(); @@ -186,20 +227,27 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher { RegionCoprocessorHost regionCoprocessorHost) throws IOException { int maxVersions = scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions()); + int slack = scan.isRaw() || scan.getMaxVersions() >= scanInfo.getMaxVersions() ? 0 : + scan.getFilter() == null ? 0 : + Math.max(scan.getMaxVersions(), scanInfo.getMaxVersions()) - maxVersions; + if (slack != 0) { + LOG.debug("slack=" + slack + " scanInfo.max-ver=" + scanInfo.getMaxVersions() + " max-ver=" + + maxVersions + " scan.max-ver=" + scan.getMaxVersions()); + } boolean hasNullColumn; ColumnTracker columnTracker; if (columns == null || columns.isEmpty()) { // there is always a null column in the wildcard column query. hasNullColumn = true; // use a specialized scan for wildcard column tracker. - columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions, + columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions, slack, oldestUnexpiredTS); } else { // We can share the ExplicitColumnTracker, diff is we reset // between rows, not between storefiles.
// whether there is null column in the explicit column query hasNullColumn = columns.first().length == 0; - columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, + columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, slack, oldestUnexpiredTS); } if (scan.isRaw()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index 476921b..0abbac6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterList; +//import org.apache.hadoop.hbase.filter.SpecifiedNumVersionsColumnFilter; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcServer; @@ -413,7 +414,7 @@ public class VisibilityController implements MasterObserver, RegionObserver, @Override public void prePrepareTimeStampForDeleteVersion( ObserverContext ctx, Mutation delete, Cell cell, - byte[] byteNow, Get get) throws IOException { + byte[] byteNow, Get get, int count) throws IOException { // Nothing to do if we are not filtering by visibility if (!authorizationEnabled) { return; @@ -437,20 +438,19 @@ public class VisibilityController implements MasterObserver, RegionObserver, throw new IOException("Invalid cell visibility specified " + labelsExp, e); } } - get.setFilter(new DeleteVersionVisibilityExpressionFilter(visibilityTags, - VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT)); + get.setFilter(//new FilterList( + new DeleteVersionVisibilityExpressionFilter(visibilityTags, + VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT) + //, new SpecifiedNumVersionsColumnFilter(count)) + ); List result = ctx.getEnvironment().getRegion().get(get, false); - if (result.size() < get.getMaxVersions()) { + if (result.size() < count) { // Nothing to delete CellUtil.updateLatestStamp(cell, byteNow, 0); return; } - if (result.size() > get.getMaxVersions()) { - throw new RuntimeException("Unexpected size: " + result.size() - + ". 
Results more than the max versions obtained."); - } - Cell getCell = result.get(get.getMaxVersions() - 1); + Cell getCell = result.get(count - 1); CellUtil.setTimestamp(cell, getCell.getTimestamp()); // We are bypassing here because in the HRegion.updateDeleteLatestVersionTimeStamp we would diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index dd9024a..5174be8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +//import org.apache.hadoop.hbase.filter.SpecifiedNumVersionsColumnFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.BlockCache; @@ -6424,4 +6425,169 @@ public class TestFromClientSide { } } + @Test + public void testDeleteSpecifiedVersionOfSpecifiedColumn() throws Exception { + Admin admin = TEST_UTIL.getAdmin(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + + byte[][] VALUES = makeN(VALUE, 5); + long[] ts = { 1000, 2000, 3000, 4000, 5000 }; + + Table ht = TEST_UTIL.createTable(tableName, FAMILY, 5); + + Put put = new Put(ROW); + // Put versions 1000,2000,3000,4000 of column FAMILY:QUALIFIER + for (int t = 0; t < 4; t++) { + put.addColumn(FAMILY, QUALIFIER, ts[t], VALUES[t]); + } + ht.put(put); + + Delete delete = new Delete(ROW); + // Delete version 3000 of column FAMILY:QUALIFIER + delete.addColumn(FAMILY, QUALIFIER, ts[2]); + ht.delete(delete); + + Get get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + Result result = ht.get(get); + // verify versions 1000,2000,4000 remain for column FAMILY:QUALIFIER + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[3] }, new byte[][] { + VALUES[0], VALUES[1], VALUES[3] }, 0, 2); + + delete = new Delete(ROW); + // Delete version 5000 of column FAMILY:QUALIFIER, which did not exist + delete.addColumn(FAMILY, QUALIFIER, ts[4]); + ht.delete(delete); + + get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + // verify versions 1000,2000,4000 remain for column FAMILY:QUALIFIER + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[3] }, new byte[][] { + VALUES[0], VALUES[1], VALUES[3] }, 0, 2); + + ht.close(); + admin.close(); + } + + @Test + public void testDeleteLatestVersionOfSpecifiedColumn() throws Exception { + Admin admin = TEST_UTIL.getAdmin(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + + byte[][] VALUES = makeN(VALUE, 5); + long[] ts = { 1000, 2000, 3000, 4000, 5000 }; + + Table ht = TEST_UTIL.createTable(tableName, FAMILY, 5); + + Put put = new Put(ROW); + // Put versions 1000,2000,3000,4000 of column FAMILY:QUALIFIER + for (int t = 0; t < 4; t++) { + put.addColumn(FAMILY, QUALIFIER, ts[t], VALUES[t]); + } + ht.put(put); + + Delete delete = new Delete(ROW); + // Delete the latest version of column FAMILY:QUALIFIER + delete.addColumn(FAMILY, QUALIFIER); + ht.delete(delete); + + Get get = new Get(ROW); +
get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + Result result = ht.get(get); + // verify versions 1000,2000,3000 remain for column FAMILY:QUALIFIER + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[2] }, new byte[][] { + VALUES[0], VALUES[1], VALUES[2] }, 0, 2); + + delete = new Delete(ROW); + // Delete the two latest versions of column FAMILY:QUALIFIER + delete.addColumn(FAMILY, QUALIFIER); + delete.addColumn(FAMILY, QUALIFIER); + ht.delete(delete); + + get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + // verify version 1000 remains for column FAMILY:QUALIFIER + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0] }, new byte[][] { VALUES[0] }, + 0, 0); + + put = new Put(ROW); + // Put version 5000 of column FAMILY:QUALIFIER + put.addColumn(FAMILY, QUALIFIER, ts[4], VALUES[4]); + ht.put(put); + + get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + // verify versions 1000,5000 remain for column FAMILY:QUALIFIER + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[4] }, new byte[][] { + VALUES[0], VALUES[4] }, 0, 1); + + ht.close(); + admin.close(); + } + + /* @Test + public void testSpecifiedNumVersionsColumnFilter() throws Exception { + Admin admin = TEST_UTIL.getAdmin(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + + byte[][] VALUES = makeN(VALUE, 5); + long[] ts = { 1000, 2000, 3000, 4000, 5000 }; + + Table ht = TEST_UTIL.createTable(tableName, FAMILY, 5); + + byte [] ROW1 = Bytes.toBytes("testRow1"); + byte [] ROW2 = Bytes.toBytes("testRow2"); + + Put put = new Put(ROW1); + for (int t = 0; t < 5; t++) { + put.addColumn(FAMILY, QUALIFIER, ts[t], VALUES[t]); + } + ht.put(put); + put = new Put(ROW2); + for (int t = 0; t < 5; t++) { + put.addColumn(FAMILY, QUALIFIER, ts[t], VALUES[t]); + } + ht.put(put); + + // Test Get with SpecifiedNumVersionsColumnFilter + Get get = new Get(ROW1); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(); + get.setFilter(new SpecifiedNumVersionsColumnFilter(3)); + + Result result = ht.get(get); + assertNResult(result, ROW1, FAMILY, QUALIFIER, new long[] { ts[2], ts[3], ts[4] }, new byte[][] { + VALUES[2], VALUES[3], VALUES[4] }, 0, 2); + + get.setFilter(new SpecifiedNumVersionsColumnFilter(5)); + result = ht.get(get); + assertNResult(result, ROW1, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[2], ts[3], ts[4] }, new byte[][] { + VALUES[0], VALUES[1], VALUES[2], VALUES[3], VALUES[4] }, 0, 4); + + // Test Scan with SpecifiedNumVersionsColumnFilter + Scan scan = new Scan(); + scan.setMaxVersions(); + scan.setFilter(new SpecifiedNumVersionsColumnFilter(2)); + + ResultScanner scanner = ht.getScanner(scan); + result = scanner.next(); + assertNResult(result, ROW1, FAMILY, QUALIFIER, new long[] { ts[3], ts[4] }, new byte[][] { + VALUES[3], VALUES[4] }, 0, 1); + result = scanner.next(); + assertNResult(result, ROW2, FAMILY, QUALIFIER, new long[] { ts[3], ts[4] }, new byte[][] { + VALUES[3], VALUES[4] }, 0, 1); + result = scanner.next(); + assertNull(result); + + ht.close(); + admin.close(); + }*/ } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index c508b02..8b9f110 100644 ---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -467,6 +467,12 @@ public class SimpleRegionObserver implements RegionObserver { } @Override + public void prePrepareTimeStampForDeleteVersion(ObserverContext e, + Mutation delete, Cell cell, byte[] byteNow, Get get, int count) throws IOException { + ctPrePrepareDeleteTS.incrementAndGet(); + } + + @Override public void postDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final Durability durability) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 4f46c88..9d9b7f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -122,6 +122,8 @@ import org.apache.hadoop.hbase.filter.NullComparator; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.SubstringComparator; +import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -2637,6 +2639,68 @@ public class TestHRegion { } } + @Test + public void testGetWithFilter() throws IOException, InterruptedException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] fam1 = Bytes.toBytes("fam1"); + byte[] col1 = Bytes.toBytes("col1"); + byte[] value1 = Bytes.toBytes("value1"); + byte[] value2 = Bytes.toBytes("value2"); + + final int maxVersions = 3; + HColumnDescriptor hcd = new HColumnDescriptor(fam1); + hcd.setMaxVersions(maxVersions); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker")); + htd.addFamily(hcd); + HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); + Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); + final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + this.region = TEST_UTIL.createLocalHRegion(info, htd, wal); + + try { + // Put 4 versions to memstore + long ts = 0; + Put put = new Put(row1, ts); + put.addColumn(fam1, col1, value1); + region.put(put); + put = new Put(row1, ts + 1); + put.addColumn(fam1, col1, Bytes.toBytes("filter1")); + region.put(put); + put = new Put(row1, ts + 2); + put.addColumn(fam1, col1, Bytes.toBytes("filter2")); + region.put(put); + put = new Put(row1, ts + 3); + put.addColumn(fam1, col1, value2); + region.put(put); + + Get get = new Get(row1); + get.setMaxVersions(); + Result res = region.get(get); + // Get 3 versions; the oldest version is gone from the user view + assertEquals(maxVersions, res.size()); + + get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value"))); + res = region.get(get); + // When using a value filter, the oldest version should still be gone from the user view + // and it should only return one key value + assertEquals(1, res.size()); + assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0])); + assertEquals(ts + 3,
res.rawCells()[0].getTimestamp()); + + region.flush(true); + region.compact(true); + Thread.sleep(1000); + res = region.get(get); + // After flush and compact, the result should be consistent with previous result + assertEquals(1, res.size()); + assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0])); + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + } + } + // //////////////////////////////////////////////////////////////////////////// // Scanner tests // //////////////////////////////////////////////////////////////////////////// diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java index 52b5a40..e8d60e6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java @@ -431,24 +431,27 @@ public class TestMinVersions { tss.add(ts-1); tss.add(ts-2); + // Should only get T2: max versions is 2, so T1 is gone from the user view. Get g = new Get(T1); g.addColumn(c1,c1); g.setFilter(new TimestampsFilter(tss)); g.setMaxVersions(); Result r = region.get(g); - checkResult(r, c1, T2,T1); + checkResult(r, c1, T2); + // Should only get T2: max versions is 2, so T1 is gone from the user view. g = new Get(T1); g.addColumn(c0,c0); g.setFilter(new TimestampsFilter(tss)); g.setMaxVersions(); r = region.get(g); - checkResult(r, c0, T2,T1); + checkResult(r, c0, T2); // now flush/compact region.flush(true); region.compact(true); + // After flush/compact, the result should be consistent with previous result g = new Get(T1); g.addColumn(c1,c1); g.setFilter(new TimestampsFilter(tss)); @@ -456,6 +459,7 @@ public class TestMinVersions { r = region.get(g); checkResult(r, c1, T2); + // After flush/compact, the result should be consistent with previous result g = new Get(T1); g.addColumn(c0,c0); g.setFilter(new TimestampsFilter(tss)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestExplicitColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestExplicitColumnTracker.java index 4e07f80..ccd92a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestExplicitColumnTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestExplicitColumnTracker.java @@ -46,7 +46,7 @@ public class TestExplicitColumnTracker { private void runTest(int maxVersions, TreeSet trackColumns, List scannerColumns, List expected) throws IOException { - ColumnTracker exp = new ExplicitColumnTracker(trackColumns, 0, maxVersions, Long.MIN_VALUE); + ColumnTracker exp = new ExplicitColumnTracker(trackColumns, 0, maxVersions, 0, Long.MIN_VALUE); // Initialize result List result = new ArrayList<>(scannerColumns.size()); @@ -153,7 +153,7 @@ public class TestExplicitColumnTracker { columns.add(Bytes.toBytes("col" + i)); } - ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions, Long.MIN_VALUE); + ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions, 0, Long.MIN_VALUE); for (int i = 0; i < 100000; i += 2) { byte[] col = Bytes.toBytes("col" + i); ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(), diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java index 7ac5f34..6f693df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java @@ -738,7 +738,7 @@ public abstract class TestVisibilityLabels { table.put(put); Scan s = new Scan(); - s.setMaxVersions(1); + s.setMaxVersions(); s.setAuthorizations(new Authorizations(SECRET)); ResultScanner scanner = table.getScanner(s); Result result = scanner.next();
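The commented-out test above exercises a SpecifiedNumVersionsColumnFilter that this diff never defines. For orientation only, here is a minimal sketch of how such a filter might look against the Filter API already used in this patch; this is an assumption about the intended behavior, not the actual implementation, and a production version would also need the serialization hooks (toByteArray/parseFrom) that server-side filters require:

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.filter.FilterBase;

    public class SpecifiedNumVersionsColumnFilter extends FilterBase {
      private final int numVersions;
      private Cell previousCell;
      private int count;

      public SpecifiedNumVersionsColumnFilter(int numVersions) {
        this.numVersions = numVersions;
      }

      @Override
      public ReturnCode filterKeyValue(Cell cell) throws IOException {
        // Restart the per-column count whenever the row or the column changes.
        if (previousCell == null || !CellUtil.matchingRows(cell, previousCell)
            || !CellUtil.matchingColumn(cell, previousCell)) {
          count = 0;
        }
        previousCell = cell;
        // Keep the newest numVersions cells of each column, then move on.
        return ++count <= numVersions ? ReturnCode.INCLUDE : ReturnCode.NEXT_COL;
      }

      @Override
      public void reset() throws IOException {
        count = 0;
        previousCell = null;
      }
    }

Placed last in a MUST_PASS_ALL FilterList, a filter like this would count only cells that earlier filters accepted, which is exactly why the new javadoc insists on its position.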