From 84a294ebf4f11a35a4374ff2555548e970c8d2b1 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Tue, 6 Sep 2016 18:34:32 +0800 Subject: [PATCH] HBASE-15968 MVCC-sensitive semantics of versions --- .../org/apache/hadoop/hbase/HColumnDescriptor.java | 21 ++ .../apache/hadoop/hbase/regionserver/ScanInfo.java | 18 +- .../hbase/regionserver/compactions/Compactor.java | 3 +- .../regionserver/querymatcher/ColumnTracker.java | 6 +- .../querymatcher/CompactionScanQueryMatcher.java | 18 +- .../regionserver/querymatcher/DeleteTracker.java | 4 +- .../IncludeAllCompactionQueryMatcher.java | 42 +++ .../querymatcher/MvccSensitiveTracker.java | 332 +++++++++++++++++++++ .../querymatcher/NormalUserScanQueryMatcher.java | 5 +- .../querymatcher/ScanQueryMatcher.java | 11 +- .../querymatcher/UserScanQueryMatcher.java | 8 +- .../TestMvccSensitiveSemanticsFromClientSide.java | 196 ++++++++++++ .../querymatcher/TestMvccSensitiveTracker.java | 262 ++++++++++++++++ 13 files changed, 907 insertions(+), 19 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/IncludeAllCompactionQueryMatcher.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MvccSensitiveTracker.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMvccSensitiveSemanticsFromClientSide.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestMvccSensitiveTracker.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index b75e8cd..b6e5ee0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -134,6 +134,9 @@ public class HColumnDescriptor implements Comparable { public static final String DFS_REPLICATION = 
"DFS_REPLICATION"; public static final short DEFAULT_DFS_REPLICATION = 0; + public static final String MVCC_SENSITIVE = "MVCC_SENSITIVE"; + public static final boolean DEFAULT_MVCC_SENSITIVE = false; + /** * Default compression type. */ @@ -273,6 +276,7 @@ public class HColumnDescriptor implements Comparable { DEFAULT_VALUES.put(CACHE_BLOOMS_ON_WRITE, String.valueOf(DEFAULT_CACHE_BLOOMS_ON_WRITE)); DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE)); DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN)); + DEFAULT_VALUES.put(MVCC_SENSITIVE, String.valueOf(DEFAULT_MVCC_SENSITIVE)); for (String s : DEFAULT_VALUES.keySet()) { RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s))); } @@ -727,6 +731,23 @@ public class HColumnDescriptor implements Comparable { } /** + * By default, HBase only consider timestamp in versions. So a previous Delete with higher ts will + * mask a later Put with lower ts. Set this to true to open MVCC_SENSITIVE semantics of versions. + * We also consider mvcc in versions. See HBASE-15968 for details. + */ + public boolean isMvccSensitive() { + String value = getValue(MVCC_SENSITIVE); + if (value != null) { + return Boolean.parseBoolean(value); + } + return DEFAULT_MVCC_SENSITIVE; + } + + public HColumnDescriptor setMvccSensitive(boolean sensitive) { + return setValue(MVCC_SENSITIVE, Boolean.toString(sensitive)); + } + + /** * @return Time-to-live of cell contents, in seconds. 
*/ public int getTimeToLive() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java index 349e166..3a3455e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java @@ -48,6 +48,7 @@ public class ScanInfo { private boolean usePread; private long cellsPerTimeoutCheck; private boolean parallelSeekEnabled; + private boolean mvccSensitive; private final Configuration conf; public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT @@ -65,7 +66,14 @@ public class ScanInfo { public ScanInfo(final Configuration conf, final HColumnDescriptor family, final long ttl, final long timeToPurgeDeletes, final CellComparator comparator) { this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family - .getKeepDeletedCells(), timeToPurgeDeletes, comparator); + .getKeepDeletedCells(), timeToPurgeDeletes, comparator, family.isMvccSensitive()); + } + + public ScanInfo(final Configuration conf, final byte[] family, final int minVersions, + final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells, + final long timeToPurgeDeletes, final CellComparator comparator) { + this(conf, family, minVersions, maxVersions, ttl, keepDeletedCells, timeToPurgeDeletes, + comparator, false); } /** @@ -78,10 +86,11 @@ public class ScanInfo { * be purged during a major compaction. 
* @param keepDeletedCells Store's keepDeletedCells setting * @param comparator The store's comparator + * @param mvccSensitive use mvcc-sensitive semantics of versions */ public ScanInfo(final Configuration conf, final byte[] family, final int minVersions, final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells, - final long timeToPurgeDeletes, final CellComparator comparator) { + final long timeToPurgeDeletes, final CellComparator comparator, boolean mvccSensitive) { this.family = family; this.minVersions = minVersions; this.maxVersions = maxVersions; @@ -99,6 +108,7 @@ public class ScanInfo { perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK; this.parallelSeekEnabled = conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false); + this.mvccSensitive = mvccSensitive; this.conf = conf; } @@ -149,4 +159,8 @@ public class ScanInfo { public CellComparator getComparator() { return comparator; } + + public boolean isMvccSensitive() { + return mvccSensitive; + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index c695788..e439a56 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -304,7 +304,8 @@ public abstract class Compactor { return new ArrayList(); } boolean cleanSeqId = false; - if (fd.minSeqIdToKeep > 0) { + if (fd.minSeqIdToKeep > 0 && !store.getFamily().isMvccSensitive()) { + // For mvcc-sensitive family, we never set mvcc to 0. 
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); cleanSeqId = true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java index 17c6afe..50dbd12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java @@ -77,7 +77,7 @@ public interface ColumnTracker { * the {@link #checkColumn(Cell, byte)} method and perform all the operations in this * checkVersions method. * @param cell - * @param ttl The timeToLive to enforce. + * @param timestamp The timestamp of the cell. * @param type the type of the key value (Put/Delete) * @param ignoreCount indicates if the KV needs to be excluded while counting (used during * compactions. We only count KV's that are older than all the scanners' read points.) @@ -85,8 +85,8 @@ public interface ColumnTracker { * @throws IOException in case there is an internal consistency problem caused by a data * corruption. 
*/ - ScanQueryMatcher.MatchCode checkVersions(Cell cell, long ttl, byte type, boolean ignoreCount) - throws IOException; + ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type, + boolean ignoreCount) throws IOException; /** * Resets the Matcher */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java index d3224dc..a6cf322 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java @@ -45,8 +45,9 @@ public abstract class CompactionScanQueryMatcher extends ScanQueryMatcher { protected CompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes, long readPointToUse, long oldestUnexpiredTS, long now) { - super(HConstants.EMPTY_START_ROW, scanInfo, - new ScanWildcardColumnTracker(scanInfo.getMinVersions(), scanInfo.getMaxVersions(), + super(HConstants.EMPTY_START_ROW, scanInfo, deletes instanceof MvccSensitiveTracker ? + (MvccSensitiveTracker) deletes : + new ScanWildcardColumnTracker(scanInfo.getMinVersions(), scanInfo.getMaxVersions(), oldestUnexpiredTS), oldestUnexpiredTS, now); this.maxReadPointToTrackVersions = readPointToUse; @@ -102,11 +103,18 @@ public abstract class CompactionScanQueryMatcher extends ScanQueryMatcher { long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow, byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException { - DeleteTracker deleteTracker = instantiateDeleteTracker(regionCoprocessorHost); + DeleteTracker deleteTracker = scanInfo.isMvccSensitive() ? 
new MvccSensitiveTracker(null, + scanInfo.getMinVersions(), scanInfo.getMaxVersions(), oldestUnexpiredTS) : + instantiateDeleteTracker(regionCoprocessorHost); if (dropDeletesFromRow == null) { if (scanType == ScanType.COMPACT_RETAIN_DELETES) { - return new MinorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse, - oldestUnexpiredTS, now); + if (scanInfo.isMvccSensitive()) { + return new IncludeAllCompactionQueryMatcher(scanInfo, deleteTracker, readPointToUse, + oldestUnexpiredTS, now); + } else { + return new MinorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse, + oldestUnexpiredTS, now); + } } else { return new MajorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse, earliestPutTs, oldestUnexpiredTS, now); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java index 4e1ba4e..8824cc4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java @@ -95,7 +95,9 @@ public interface DeleteTracker { FAMILY_VERSION_DELETED, // The KeyValue is deleted by a delete family version. COLUMN_DELETED, // The KeyValue is deleted by a delete column. VERSION_DELETED, // The KeyValue is deleted by a version delete. 
- NOT_DELETED + NOT_DELETED, + VERSION_MASKED // The KeyValue is masked by max number of versions which is considered as + // deleted in strong semantics of versions(See MvccTracker) } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/IncludeAllCompactionQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/IncludeAllCompactionQueryMatcher.java new file mode 100644 index 0000000..bb9924f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/IncludeAllCompactionQueryMatcher.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.querymatcher; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.ScanInfo; + +/** + * A compaction query matcher that always return INCLUDE and drops nothing. 
+ */ +@InterfaceAudience.Private +public class IncludeAllCompactionQueryMatcher extends MinorCompactionScanQueryMatcher{ + + public IncludeAllCompactionQueryMatcher(ScanInfo scanInfo, + DeleteTracker deletes, long readPointToUse, long oldestUnexpiredTS, long now) { + super(scanInfo, deletes, readPointToUse, oldestUnexpiredTS, now); + } + + @Override + public MatchCode match(Cell cell) throws IOException { + return MatchCode.INCLUDE; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MvccSensitiveTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MvccSensitiveTracker.java new file mode 100644 index 0000000..1e2c264 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MvccSensitiveTracker.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.querymatcher; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A tracker implementing both ColumnTracker and DeleteTracker, used for mvcc-sensitive scanning. + * We should make sure in one QueryMatcher the ColumnTracker and DeleteTracker are the same instance. + */ +@InterfaceAudience.Private +public class MvccSensitiveTracker implements ColumnTracker, DeleteTracker { + + private NavigableMap versionsDeletesMap = new TreeMap<>(); + + private byte[] lastCqArray; + private int lastCqLength; + private int lastCqOffset; + private long lastCqTs; + private long lastCqMvcc; + private byte lastCqType; + private int maxVersions; + private byte[][] columns; + private int columnIndex; + private int minVersions; + private long oldestStamp; + private int countCurrentCol; + + // This map is constructed and used for each cq. + private NavigableMap delColMap = new TreeMap<>(); // + + // This map is constructed when cq is null and is a snapshot. 
+ // Each time the cq is changed, we should reconstruct delColMap to a deep copy of delFamMap + private NavigableMap delFamMap = new TreeMap<>(); + + /** + * Note maxVersion and minVersion must be set according to cf's conf, not user's scan parameter. + * + * @param minVersion The minimum number of versions to keep (used when TTL is set). + */ + public MvccSensitiveTracker(NavigableSet columns, int minVersion, int maxVersion, + long oldestUnexpiredTS) { + this.maxVersions = maxVersion; + this.minVersions = minVersion; + this.oldestStamp = oldestUnexpiredTS; + if (columns != null && columns.size() > 0) { + this.columns = new byte[columns.size()][]; + int i = 0; + for (byte[] column : columns) { + this.columns[i++] = column; + } + } + reset(); + } + + private class DeleteVersionsNode { + private long ts; + private long mvcc; + private Map> deletesMap = new HashMap<>(); //> + private NavigableMap mvccCountingMap = new TreeMap<>(); // + + private DeleteVersionsNode(long ts, long mvcc) { + this.ts = ts; + this.mvcc = mvcc; + mvccCountingMap.put(Long.MAX_VALUE, 0); + } + + private DeleteVersionsNode() { + this(Long.MIN_VALUE, Long.MAX_VALUE); + } + + private void addVersionDelete(Cell cell) { + SortedSet set = deletesMap.get(cell.getTimestamp()); + if (set == null) { + set = new TreeSet<>(); + deletesMap.put(cell.getTimestamp(), set); + } + set.add(cell.getSequenceId()); + // The init value should be its next node's current value. 
+ int value = mvccCountingMap.ceilingEntry(cell.getSequenceId()).getValue(); + mvccCountingMap.put(cell.getSequenceId(), value); + } + + private DeleteVersionsNode getDeepCopy() { + DeleteVersionsNode node = new DeleteVersionsNode(ts, mvcc); + for (Map.Entry> e : deletesMap.entrySet()) { + SortedSet s = new TreeSet<>(); + for (Long l : e.getValue()) { + s.add(l); + } + node.deletesMap.put(e.getKey(), s); + } + for (Map.Entry e : mvccCountingMap.entrySet()) { + node.mvccCountingMap.put(e.getKey(), e.getValue()); + } + return node; + } + } + + /** + * Reset the map if it is different with the last Cell. + * Save the cq array/offset/length for next Cell. + * + * @return If this put has duplicate ts with last cell, return the mvcc of last cell. + * Else return MAX_VALUE. + */ + private long prepare(Cell cell) { + boolean matchCq = CellUtil.matchingQualifier(cell, lastCqArray, lastCqOffset, lastCqLength); + if (!matchCq) { + // The last cell is family-level delete and this is not, or the cq is changed, + // we should construct delColMap as a deep copy of delFamMap. + delColMap.clear(); + for (Map.Entry e : delFamMap.entrySet()) { + delColMap.put(e.getKey(), e.getValue().getDeepCopy()); + } + countCurrentCol = 0; + } + if (matchCq && !CellUtil.isDelete(lastCqType) && lastCqType == cell.getTypeByte() + && lastCqTs == cell.getTimestamp()) { + // Put with duplicate timestamp, ignore. + return lastCqMvcc; + } + lastCqArray = cell.getQualifierArray(); + lastCqOffset = cell.getQualifierOffset(); + lastCqLength = cell.getQualifierLength(); + lastCqTs = cell.getTimestamp(); + lastCqMvcc = cell.getSequenceId(); + lastCqType = cell.getTypeByte(); + return Long.MAX_VALUE; + } + + // DeleteTracker + @Override + public void add(Cell cell) { + prepare(cell); + byte type = cell.getTypeByte(); + switch (Type.codeToType(type)) { + // By the order of seen. We put null cq at first. 
+ case DeleteFamily: // Delete all versions of all columns of the specified family + delFamMap.put(cell.getSequenceId(), + new DeleteVersionsNode(cell.getTimestamp(), cell.getSequenceId())); + break; + case DeleteFamilyVersion: // Delete all columns of the specified family and specified version + delFamMap.ceilingEntry(cell.getSequenceId()).getValue().addVersionDelete(cell); + break; + + // These two kinds of markers are mix with Puts. + case DeleteColumn: // Delete all versions of the specified column + delColMap.put(cell.getSequenceId(), + new DeleteVersionsNode(cell.getTimestamp(), cell.getSequenceId())); + break; + case Delete: // Delete the specified version of the specified column. + delColMap.ceilingEntry(cell.getSequenceId()).getValue().addVersionDelete(cell); + break; + } + } + + /** + * This method is not idempotent, we will save some info to judge VERSION_MASKED. + * @param cell - current cell to check if deleted by a previously seen delete + * @return We don't distinguish DeleteColumn and DeleteFamily. We only return code for column. 
+ */ + @Override + public DeleteResult isDeleted(Cell cell) { + long duplicateMvcc = prepare(cell); + + for (Map.Entry e : delColMap.tailMap(cell.getSequenceId()) + .entrySet()) { + DeleteVersionsNode node = e.getValue(); + long deleteMvcc = Long.MAX_VALUE; + SortedSet deleteVersionMvccs = node.deletesMap.get(cell.getTimestamp()); + if (deleteVersionMvccs != null) { + SortedSet tail = deleteVersionMvccs.tailSet(cell.getSequenceId()); + if (!tail.isEmpty()) { + deleteMvcc = tail.first(); + } + } + SortedMap subMap = + node.mvccCountingMap + .subMap(cell.getSequenceId(), true, Math.min(duplicateMvcc, deleteMvcc), true); + for (Map.Entry seg : subMap.entrySet()) { + int count = seg.getValue(); + if (count >= maxVersions) { + return DeleteResult.VERSION_MASKED; + } + seg.setValue(count + 1); + } + if (deleteMvcc < Long.MAX_VALUE) { + return DeleteResult.VERSION_DELETED; + } + + if (cell.getTimestamp() <= node.ts) { + return DeleteResult.COLUMN_DELETED; + } + } + if (duplicateMvcc < Long.MAX_VALUE) { + return DeleteResult.VERSION_MASKED; + } + return DeleteResult.NOT_DELETED; + } + + @Override + public boolean isEmpty() { + return delColMap.size() == 1 && delColMap.get(Long.MAX_VALUE).mvccCountingMap.size() == 1 + && delFamMap.size() == 1 && delFamMap.get(Long.MAX_VALUE).mvccCountingMap.size() == 1; + } + + @Override + public void update() { + // ignore + } + + //ColumnTracker + + @Override + public MatchCode checkColumn(Cell cell, byte type) throws IOException { + if (done()) { + // No more columns left, we are done with this query + return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row + } + if (columns != null) { + while (columnIndex < columns.length) { + int c = Bytes.compareTo(columns[columnIndex], 0, columns[columnIndex].length, + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + if (c < 0) { + columnIndex++; + } else if (c == 0) { + // We drop old version in #isDeleted, so here we must return INCLUDE. 
+ return MatchCode.INCLUDE; + } else { + return MatchCode.SKIP; + } + } + return MatchCode.SEEK_NEXT_ROW; + } + return MatchCode.INCLUDE; + } + + @Override + public MatchCode checkVersions(Cell cell, long timestamp, byte type, + boolean ignoreCount) throws IOException { + assert !CellUtil.isDelete(type); + // We drop old version in #isDeleted, so here we won't SKIP because of versioning. But we should + // consider TTL. + if (ignoreCount) { + return MatchCode.INCLUDE; + } + countCurrentCol++; + if (timestamp < this.oldestStamp) { + if (countCurrentCol > minVersions) { + return MatchCode.SEEK_NEXT_COL; + } + } + return MatchCode.INCLUDE; + } + + @Override + public void reset() { + delColMap.clear(); + delFamMap.clear(); + delFamMap.put(Long.MAX_VALUE, new DeleteVersionsNode()); + lastCqArray = null; + lastCqLength = 0; + lastCqOffset = 0; + lastCqTs = Long.MIN_VALUE; + lastCqMvcc = 0; + lastCqType = 0; + columnIndex = 0; + countCurrentCol = 0; + } + + @Override + public boolean done() { + return !(columns == null || lastCqArray == null) && Bytes + .compareTo(lastCqArray, lastCqOffset, lastCqLength, columns[columnIndex - 1], 0, + columns[columnIndex - 1].length) > 0; + } + + @Override + public ColumnCount getColumnHint() { + // TODO maybe we can optimize. + return null; + } + + @Override + public MatchCode getNextRowOrNextColumn(Cell cell) { + // TODO maybe we can optimize. + return MatchCode.SEEK_NEXT_COL; + } + + @Override + public boolean isDone(long timestamp) { + // We can not skip Cells with small ts. 
+ return false; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java index 3942f04..b1abf84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java @@ -40,7 +40,7 @@ public class NormalUserScanQueryMatcher extends UserScanQueryMatcher { private final boolean get; /** whether time range queries can see rows "behind" a delete */ - private final boolean seePastDeleteMarkers; + protected final boolean seePastDeleteMarkers; protected NormalUserScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns, boolean hasNullColumn, DeleteTracker deletes, long oldestUnexpiredTS, long now) { @@ -88,7 +88,8 @@ public class NormalUserScanQueryMatcher extends UserScanQueryMatcher { public static NormalUserScanQueryMatcher create(Scan scan, ScanInfo scanInfo, ColumnTracker columns, boolean hasNullColumn, long oldestUnexpiredTS, long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException { - DeleteTracker deletes = instantiateDeleteTracker(regionCoprocessorHost); + DeleteTracker deletes = columns instanceof MvccSensitiveTracker ? 
+ (MvccSensitiveTracker) columns : instantiateDeleteTracker(regionCoprocessorHost); if (scan.isReversed()) { return new NormalUserScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, deletes, oldestUnexpiredTS, now) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java index b5469d3..b7ff42e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.ScanInfo; @@ -204,16 +203,21 @@ } protected final MatchCode checkDeleted(DeleteTracker deletes, Cell cell) { - if (deletes.isEmpty()) { + if (deletes.isEmpty() && !(deletes instanceof MvccSensitiveTracker)) { return null; } + // MvccSensitiveTracker always needs to check all cells to save some info. 
DeleteResult deleteResult = deletes.isDeleted(cell); switch (deleteResult) { case FAMILY_DELETED: case COLUMN_DELETED: - return columns.getNextRowOrNextColumn(cell); + if (!(deletes instanceof MvccSensitiveTracker)) { + // MvccSensitive can not seek to next because the Put with lower ts may have higher mvcc + return columns.getNextRowOrNextColumn(cell); + } case VERSION_DELETED: case FAMILY_VERSION_DELETED: + case VERSION_MASKED: return MatchCode.SKIP; case NOT_DELETED: return null; @@ -222,6 +226,7 @@ public abstract class ScanQueryMatcher { } } + /** * Determines if the caller should do one of several things: *

    diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java index ec7fc11..a1643b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java @@ -88,7 +88,7 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher { } } - protected final MatchCode matchColumn(Cell cell) throws IOException { + protected MatchCode matchColumn(Cell cell) throws IOException { long timestamp = cell.getTimestamp(); int tsCmp = tr.compare(timestamp); if (tsCmp > 0) { @@ -192,7 +192,11 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher { : Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions()); boolean hasNullColumn; ColumnTracker columnTracker; - if (columns == null || columns.size() == 0) { + if (scanInfo.isMvccSensitive()) { + hasNullColumn = true; + columnTracker = new MvccSensitiveTracker(columns, scanInfo.getMinVersions(), maxVersions, + oldestUnexpiredTS); + } else if (columns == null || columns.size() == 0) { // there is always a null column in the wildcard column query. hasNullColumn = true; // use a specialized scan for wildcard column tracker. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMvccSensitiveSemanticsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMvccSensitiveSemanticsFromClientSide.java new file mode 100644 index 0000000..53c495e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMvccSensitiveSemanticsFromClientSide.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class TestMvccSensitiveSemanticsFromClientSide { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final byte[] ROW = Bytes.toBytes("r"); + private static final byte[] FAMILY = Bytes.toBytes("f"); + private static final byte[] value = Bytes.toBytes("value"); + private static final byte[] col1 = Bytes.toBytes("col1"); + private static final byte[] col2 = Bytes.toBytes("col2"); + private static 
final byte[] col3 = Bytes.toBytes("col3"); + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void setDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private Table createTable() throws IOException { + TableName tableName = TableName.valueOf(name.getMethodName()); + HTableDescriptor table = new HTableDescriptor(tableName); + HColumnDescriptor fam = new HColumnDescriptor(FAMILY); + fam.setMvccSensitive(true); + fam.setMaxVersions(3); + table.addFamily(fam); + TEST_UTIL.getHBaseAdmin().createTable(table); + return TEST_UTIL.getConnection().getTable(tableName); + } + + @Test + public void testPutAndDeleteVersions() throws IOException { + try (Table t = createTable()) { + Delete d = new Delete(ROW); + d.addColumns(FAMILY, col1, 2000000); + t.delete(d); + Put p = new Put(ROW); + p.addColumn(FAMILY, col1, 1000000, value); + t.put(p); + Get g = new Get(ROW); + assertFalse(t.get(g).isEmpty()); + } + } + + @Test + public void testPutMasked() throws IOException { + try (Table t = createTable()) { + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value)); + + t.delete(new Delete(ROW).addColumn(FAMILY, col1, 1000003)); + + Result r = t.get(new Get(ROW).setMaxVersions(3)); + assertEquals(2, r.size()); + } + } + + @Test + public void testSameTs() throws IOException { + try (Table t = createTable()) { + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value)); + 
t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value)); + + Result r = t.get(new Get(ROW).setMaxVersions(3)); + assertEquals(3, r.size()); + assertEquals(1000004, r.rawCells()[0].getTimestamp()); + assertEquals(1000003, r.rawCells()[1].getTimestamp()); + assertEquals(1000002, r.rawCells()[2].getTimestamp()); + } + } + + @Test + public void testSameTsAndDelete() throws IOException { + try (Table t = createTable()) { + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value)); + + t.delete(new Delete(ROW).addColumn(FAMILY, col1, 1000003)); + + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value)); + + Result r = t.get(new Get(ROW).setMaxVersions(3)); + assertEquals(2, r.size()); + assertEquals(1000004, r.rawCells()[0].getTimestamp()); + assertEquals(1000002, r.rawCells()[1].getTimestamp()); + } + } + + @Test + public void testDeleteFamily() throws IOException { + try (Table t = createTable()) { + + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value)); + t.put(new Put(ROW).addColumn(FAMILY, col2, 1000002, value)); + t.put(new Put(ROW).addColumn(FAMILY, col3, 1000001, value)); + + t.delete(new Delete(ROW).addFamily(FAMILY, 2000000)); + + t.put(new Put(ROW).addColumn(FAMILY, col3, 1500002, value)); + t.put(new Put(ROW).addColumn(FAMILY, col2, 1500001, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1500001, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1500002, value)); + + Result r = t.get(new Get(ROW).setMaxVersions(3)); + assertEquals(4, r.size()); + + t.delete(new Delete(ROW).addFamilyVersion(FAMILY, 1500001)); + + r = t.get(new Get(ROW).setMaxVersions(3)); + 
assertEquals(2, r.size()); + + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value)); + t.put(new Put(ROW).addColumn(FAMILY, col2, 1000002, value)); + t.put(new Put(ROW).addColumn(FAMILY, col3, 1000001, value)); + + r = t.get(new Get(ROW).setMaxVersions(3)); + assertEquals(6, r.size()); + } + } + + @Test + public void testTimeRange() throws IOException { + try (Table t = createTable()) { + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000005, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000006, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000007, value)); + t.put(new Put(ROW).addColumn(FAMILY, col1, 1000008, value)); + Result r = t.get(new Get(ROW).setMaxVersions(3).setTimeRange(0, 1000005)); + assertEquals(0, r.size()); + } + + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestMvccSensitiveTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestMvccSensitiveTracker.java new file mode 100644 index 0000000..3a1f140 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestMvccSensitiveTracker.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.querymatcher; + +import java.io.IOException; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestMvccSensitiveTracker { + + private final byte[] col1 = Bytes.toBytes("col1"); + private final byte[] col2 = Bytes.toBytes("col2"); + private final byte[] row = Bytes.toBytes("row"); + private final byte[] family = Bytes.toBytes("family"); + private final byte[] value = Bytes.toBytes("value"); + + @Test + public void testMaxVersionMask() { + MvccSensitiveTracker tracker = new MvccSensitiveTracker(null, 1, 3, 10000); + + KeyValue keyValue = new KeyValue(row, family, col1, 20000, KeyValue.Type.Put, value); + keyValue.setTimestamp(20000); + keyValue.setSequenceId(1000); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + keyValue.setTimestamp(19999); + keyValue.setSequenceId(999); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + keyValue.setTimestamp(19999); + keyValue.setSequenceId(998); + assertEquals(DeleteResult.VERSION_MASKED, 
tracker.isDeleted(keyValue)); + keyValue.setTimestamp(19998); + keyValue.setSequenceId(997); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + keyValue.setTimestamp(19997); + keyValue.setSequenceId(996); + assertEquals(DeleteResult.VERSION_MASKED, tracker.isDeleted(keyValue)); + + keyValue = new KeyValue(row, family, col2, 20000, KeyValue.Type.Put, value); + keyValue.setTimestamp(20000); + keyValue.setSequenceId(1000); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + keyValue.setTimestamp(19999); + keyValue.setSequenceId(1002); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + keyValue.setTimestamp(19999); + keyValue.setSequenceId(1001); + assertEquals(DeleteResult.VERSION_MASKED, tracker.isDeleted(keyValue)); + keyValue.setTimestamp(19998); + keyValue.setSequenceId(1003); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + keyValue.setTimestamp(19997); + keyValue.setSequenceId(1004); + assertEquals(DeleteResult.VERSION_MASKED, tracker.isDeleted(keyValue)); + } + + @Test + public void testVersionsDelete() { + MvccSensitiveTracker tracker = new MvccSensitiveTracker(null, 1, 3, 10000); + KeyValue put = new KeyValue(row, family, col1, 20000, KeyValue.Type.Put, value); + KeyValue delete = new KeyValue(row, family, col1, 20000, KeyValue.Type.DeleteColumn, value); + delete.setSequenceId(1000); + delete.setTimestamp(20000); + tracker.add(delete); + put.setSequenceId(1001); + put.setTimestamp(19999); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(put)); + put.setSequenceId(999); + put.setTimestamp(19998); + assertEquals(DeleteResult.COLUMN_DELETED, tracker.isDeleted(put)); + + delete = new KeyValue(row, family, col2, 20000, KeyValue.Type.DeleteColumn, value); + delete.setSequenceId(1002); + delete.setTimestamp(20000); + tracker.add(delete); + put = new KeyValue(row, family, col2, 20000, KeyValue.Type.Put, value); + put.setSequenceId(1001); + put.setTimestamp(19999); + 
assertEquals(DeleteResult.COLUMN_DELETED, tracker.isDeleted(put)); + put.setSequenceId(999); + put.setTimestamp(19998); + assertEquals(DeleteResult.COLUMN_DELETED, tracker.isDeleted(put)); + } + + @Test + public void testVersionDelete() { + MvccSensitiveTracker tracker = new MvccSensitiveTracker(null, 1, 3, 10000); + KeyValue put = new KeyValue(row, family, col1, 20000, KeyValue.Type.Put, value); + KeyValue delete = new KeyValue(row, family, col1, 20000, KeyValue.Type.Delete, value); + delete.setSequenceId(1000); + delete.setTimestamp(20000); + tracker.add(delete); + put.setSequenceId(1001); + put.setTimestamp(20000); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(put)); + put.setSequenceId(999); + put.setTimestamp(20000); + assertEquals(DeleteResult.VERSION_DELETED, tracker.isDeleted(put)); + + delete = new KeyValue(row, family, col2, 20000, KeyValue.Type.Delete, value); + delete.setSequenceId(1002); + delete.setTimestamp(20000); + tracker.add(delete); + put = new KeyValue(row, family, col2, 20000, KeyValue.Type.Put, value); + put.setSequenceId(1001); + put.setTimestamp(20000); + assertEquals(DeleteResult.VERSION_DELETED, tracker.isDeleted(put)); + put.setSequenceId(999); + put.setTimestamp(20000); + assertEquals(DeleteResult.VERSION_DELETED, tracker.isDeleted(put)); + put.setSequenceId(1002); + put.setTimestamp(19999); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(put)); + put.setSequenceId(998); + put.setTimestamp(19999); + assertEquals(DeleteResult.VERSION_MASKED, tracker.isDeleted(put)); + } + + @Test + public void testFamilyVersionsDelete() { + MvccSensitiveTracker tracker = new MvccSensitiveTracker(null, 1, 3, 10000); + + KeyValue delete = new KeyValue(row, family, null, 20000, KeyValue.Type.DeleteFamily, value); + delete.setSequenceId(1000); + delete.setTimestamp(20000); + + KeyValue put = new KeyValue(row, family, col1, 20000, KeyValue.Type.Put, value); + tracker.add(delete); + put.setSequenceId(1001); + put.setTimestamp(20000); + 
assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(put)); + put.setSequenceId(999); + put.setTimestamp(19998); + assertEquals(DeleteResult.COLUMN_DELETED, tracker.isDeleted(put)); + + put = new KeyValue(row, family, col2, 20000, KeyValue.Type.Put, value); + put.setSequenceId(998); + put.setTimestamp(19999); + assertEquals(DeleteResult.COLUMN_DELETED, tracker.isDeleted(put)); + put.setSequenceId(999); + put.setTimestamp(19998); + assertEquals(DeleteResult.COLUMN_DELETED, tracker.isDeleted(put)); + } + + @Test + public void testFamilyVersionDelete() { + MvccSensitiveTracker tracker = new MvccSensitiveTracker(null, 1, 3, 10000); + + KeyValue delete = new KeyValue(row, family, null, 20000, KeyValue.Type.DeleteFamilyVersion, + value); + delete.setSequenceId(1000); + delete.setTimestamp(20000); + tracker.add(delete); + + KeyValue put = new KeyValue(row, family, col1, 20000, KeyValue.Type.Put, value); + put.setSequenceId(1001); + put.setTimestamp(20000); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(put)); + put.setSequenceId(999); + put.setTimestamp(20000); + assertEquals(DeleteResult.VERSION_DELETED, tracker.isDeleted(put)); + + put = new KeyValue(row, family, col2, 20000, KeyValue.Type.Put, value); + put.setSequenceId(1001); + put.setTimestamp(20000); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(put)); + put.setSequenceId(999); + put.setTimestamp(20000); + assertEquals(DeleteResult.VERSION_DELETED, tracker.isDeleted(put)); + put.setSequenceId(1002); + put.setTimestamp(19999); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(put)); + put.setSequenceId(998); + put.setTimestamp(19999); + assertEquals(DeleteResult.VERSION_MASKED, tracker.isDeleted(put)); + } + + @Test + public void testMinVersionsAndTTL() throws IOException { + MvccSensitiveTracker tracker = new MvccSensitiveTracker(null, 1, 3, 30000); + + KeyValue keyValue = new KeyValue(row, family, col1, 20000, KeyValue.Type.Put, value); + keyValue.setTimestamp(20000); + 
keyValue.setSequenceId(1000); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + assertEquals(MatchCode.INCLUDE, + tracker.checkVersions(keyValue, keyValue.getTimestamp(), keyValue.getTypeByte(), false)); + keyValue.setTimestamp(19999); + keyValue.setSequenceId(999); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + assertEquals( + MatchCode.SEEK_NEXT_COL, + tracker.checkVersions(keyValue, keyValue.getTimestamp(), keyValue.getTypeByte(), false)); + keyValue.setTimestamp(19999); + keyValue.setSequenceId(998); + assertEquals(DeleteResult.VERSION_MASKED, tracker.isDeleted(keyValue)); + assertEquals(MatchCode.SEEK_NEXT_COL, + tracker.checkVersions(keyValue, keyValue.getTimestamp(), keyValue.getTypeByte(), false)); + keyValue.setTimestamp(19998); + keyValue.setSequenceId(997); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + assertEquals(MatchCode.SEEK_NEXT_COL, + tracker.checkVersions(keyValue, keyValue.getTimestamp(), keyValue.getTypeByte(), false)); + keyValue.setTimestamp(19997); + keyValue.setSequenceId(996); + assertEquals(DeleteResult.VERSION_MASKED, tracker.isDeleted(keyValue)); + assertEquals(MatchCode.SEEK_NEXT_COL, + tracker.checkVersions(keyValue, keyValue.getTimestamp(), keyValue.getTypeByte(), false)); + + keyValue = new KeyValue(row, family, col2, 20000, KeyValue.Type.Put, value); + keyValue.setTimestamp(20000); + keyValue.setSequenceId(1000); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + assertEquals(MatchCode.INCLUDE, + tracker.checkVersions(keyValue, keyValue.getTimestamp(), keyValue.getTypeByte(), false)); + keyValue.setTimestamp(19999); + keyValue.setSequenceId(1002); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + assertEquals(MatchCode.SEEK_NEXT_COL, + tracker.checkVersions(keyValue, keyValue.getTimestamp(), keyValue.getTypeByte(), false)); + keyValue.setTimestamp(19999); + keyValue.setSequenceId(1001); + 
assertEquals(DeleteResult.VERSION_MASKED, tracker.isDeleted(keyValue)); + assertEquals(MatchCode.SEEK_NEXT_COL, + tracker.checkVersions(keyValue, keyValue.getTimestamp(), keyValue.getTypeByte(), false)); + keyValue.setTimestamp(19998); + keyValue.setSequenceId(1003); + assertEquals(DeleteResult.NOT_DELETED, tracker.isDeleted(keyValue)); + assertEquals(MatchCode.SEEK_NEXT_COL, + tracker.checkVersions(keyValue, keyValue.getTimestamp(), keyValue.getTypeByte(), false)); + keyValue.setTimestamp(19997); + keyValue.setSequenceId(1004); + assertEquals(DeleteResult.VERSION_MASKED, tracker.isDeleted(keyValue)); + assertEquals(MatchCode.SEEK_NEXT_COL, + tracker.checkVersions(keyValue, keyValue.getTimestamp(), keyValue.getTypeByte(), false)); + } +} -- 2.7.4 (Apple Git-66)