diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index 7cdc1d1..28c80ae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -78,6 +78,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C */ private static final String CONSUMED_CLUSTER_IDS = "_cs.id"; + /** + * The attribute for storing TTL for the result of the mutation. + */ + private static final String OP_ATTRIBUTE_TTL = "_ttl"; + protected byte [] row = null; protected long ts = HConstants.LATEST_TIMESTAMP; protected Durability durability = Durability.USE_DEFAULT; @@ -206,6 +211,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C if (getId() != null) { map.put("id", getId()); } + // Add the TTL if set + // Long.MAX_VALUE is the default, and is interpreted to mean this attribute + // has not been set. + if (getTTL() != Long.MAX_VALUE) { + map.put("ttl", getTTL()); + } return map; } @@ -463,6 +474,37 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C } /** + * Return the TTL requested for the result of the mutation, in milliseconds. + * @return the TTL requested for the result of the mutation, in milliseconds, + * or Long.MAX_VALUE if unset + */ + public long getTTL() { + byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL); + if (ttlBytes != null) { + return Bytes.toLong(ttlBytes); + } + return Long.MAX_VALUE; + } + + /** + * Set the TTL desired for the result of the mutation, in milliseconds. + *
<p>
+ * Note: This will cause all cells carried within the mutation to be + * rewritten on the server to carry the operation TTL per cell in a tag. + * This overhead can be avoided through client side construction of cells + * which include the cell TTL tag (TagType.TTL_TAG_TYPE) directly. If a + * mutation carries both cells with TTL tags and the TTL attribute, cells + * in the mutation will be rewritten to carry one TTL tag set to the value + * of the attribute. + * @param ttl the TTL desired for the result of the mutation, in milliseconds + * @return this + */ + public Mutation setTTL(long ttl) { + setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl)); + return this; + } + + /** * Subclasses should override this method to add the heap size of their own fields. * @return the heap size to add (will be aligned). */ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java index 45c8476..aea0f53 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java @@ -28,4 +28,5 @@ public final class TagType { public static final byte VISIBILITY_TAG_TYPE = (byte) 2; public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4; + public static final byte TTL_TAG_TYPE = (byte)5; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java index 71ea1bd..a83f03e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java @@ -101,6 +101,13 @@ public class ColumnCount { } /** + * @return current count + */ + public int getCount() { + return count; + } + + /** * Set the current count to a new count * @param count new count to set */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java index 8568cfc..2a5e5de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java @@ -77,14 +77,14 @@ public interface ColumnTracker { * the {@link #checkColumn(byte[], int, int, byte)} method and perform all the operations in this * checkVersions method. * @param type the type of the key value (Put/Delete) - * @param ttl The timeToLive to enforce. + * @param ts the timestamp of the key value * @param ignoreCount indicates if the KV needs to be excluded while counting (used during * compactions. We only count KV's that are older than all the scanners' read points.) * @return the scan query matcher match code instance * @throws IOException in case there is an internal consistency problem caused by a data * corruption. */ - ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length, long ttl, + ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length, long ts, byte type, boolean ignoreCount) throws IOException; /** * Resets the Matcher */ @@ -118,11 +118,8 @@ ); /** - * Give the tracker a chance to declare it's done based on only the timestamp - * to allow an early out.
- * - * @param timestamp - * @return true to early out based on timestamp. + * Give the tracker a chance to declare it's done if the minimum number of + * versions has been seen */ - boolean isDone(long timestamp); + boolean hasMinVersions(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index f77a8bd..28e1920 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -410,7 +410,7 @@ public class DefaultMemStore implements MemStore { KeyValue kv = i.next(); // Did we go beyond the target row? If so break. if (state.isTooFar(kv, firstOnRow)) break; - if (state.isExpired(kv)) { + if (state.isExpired(kv) != 0) { i.remove(); continue; } @@ -632,7 +632,7 @@ if (head.isEmpty()) return null; for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) { KeyValue found = i.next(); - if (state.isExpired(found)) { + if (state.isExpired(found) != 0) { i.remove(); continue; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java index 470d36a..abc2cae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java @@ -71,7 +71,6 @@ public class ExplicitColumnTracker implements ColumnTracker { /** Keeps track of the latest timestamp included for current column. * Used to eliminate duplicates. */ private long latestTSOfCurrentColumn; - private long oldestStamp; private int skipCount; /** @@ -79,17 +78,14 @@ * @param columns columns specified user in query * @param minVersions minimum number of versions to keep * @param maxVersions maximum versions to return per column - * @param oldestUnexpiredTS the oldest timestamp we are interested in, - * based on TTL * @param lookAhead number of KeyValues to look ahead via next before * (re)seeking */ public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions, - int maxVersions, long oldestUnexpiredTS, int lookAhead) { + int maxVersions, int lookAhead) { this.maxVersions = maxVersions; this.minVersions = minVersions; this.lookAhead = lookAhead; - this.oldestStamp = oldestUnexpiredTS; this.columns = new ColumnCount[columns.size()]; int i=0; for(byte [] column : columns) { @@ -176,7 +172,7 @@ return ScanQueryMatcher.MatchCode.SKIP; } int count = this.column.increment(); - if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) { + if (count >= maxVersions) { // Done with versions for this column ++this.index; this.skipCount = 0; @@ -218,10 +214,6 @@ return timestamp == latestTSOfCurrentColumn; } - private boolean isExpired(long timestamp) { - return timestamp < oldestStamp; - } - /** * This method is used to inform the column tracker that we are done with * this column.
We may get this information from external filters or @@ -252,6 +244,7 @@ } } + @Override public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset, int qualLength) { doneWithColumn(bytes, offset,qualLength); @@ -263,7 +256,12 @@ } } - public boolean isDone(long timestamp) { - return minVersions <= 0 && isExpired(timestamp); + @Override + public boolean hasMinVersions() { + // Always have enough versions if minVersions is not set + if (minVersions <= 0) { + return true; + } + return column.getCount() >= minVersions; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java index fce3b29..3575adc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes; /** * State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}. - * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not + * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not * implement the {@link DeleteTracker} interface since state spans rows (There * is no update nor reset method). */ @@ -42,7 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes; class GetClosestRowBeforeTracker { private final KeyValue targetkey; // Any cell w/ a ts older than this is expired. - private final long oldestts; + private final long now; + private final long oldestUnexpiredTs; private Cell candidate = null; private final KVComparator kvcomparator; // Flag for whether we're doing getclosest on a metaregion. @@ -75,17 +76,20 @@ HConstants.DELIMITER) - this.rowoffset; } this.tablenamePlusDelimiterLength = metaregion? l + 1: -1; - this.oldestts = System.currentTimeMillis() - ttl; + this.now = System.currentTimeMillis(); + this.oldestUnexpiredTs = now - ttl; this.kvcomparator = c; this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(new CellComparator.RowComparator()); } /** - * @param kv - * @return True if this kv is expired.
+ * @param cell + * @return 1 if the cell is expired and no cells that sort after it can be + * alive; -1 if the cell is expired but other cells may still be alive; + * 0 otherwise */ - boolean isExpired(final Cell kv) { - return HStore.isExpired(kv, this.oldestts); + int isExpired(final Cell cell) { + return HStore.isExpired(cell, this.oldestUnexpiredTs, this.now); } /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 561a3f3..dbc5aad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -68,6 +68,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -84,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.client.Append; @@ -2629,6 +2632,7 @@ public class HRegion implements HeapSize { // , Writable{ prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); noOfDeletes++; } + rewriteCellTags(familyMaps[i], mutation); } lock(this.updatesLock.readLock(), numReadyToWrite); @@ -3101,6 +3105,76 @@ public class HRegion implements HeapSize { // , Writable{ } } + /** + * Rewrite incoming cell tags. + *
<p>
+ * Add cell TTLs to the incoming mutation. Drop any incoming TTL tags. + */ + void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) { + // Sadly we have to recreate the cell because Cell and KeyValue don't support + // scatter-gather. It will be expensive to do this in multiple places so + // this method is envisioned as a one-stop-shop for incoming cell tag + // rewrite rules applied during doMiniBatchMutation + List<Cell> updatedCells = new ArrayList<Cell>(); + for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) { + updatedCells.clear(); // Be sure to clear this each time around the loop + List<Cell> cells = e.getValue(); + Iterator<Cell> cellIterator = cells.iterator(); // Iterate so cells can be removed in place + while (cellIterator.hasNext()) { + Cell cell = cellIterator.next(); + // Avoid as much work as possible if the cell doesn't carry tags + if (cell.getTagsLength() > 0) { + List<Tag> newTags = new ArrayList<Tag>(); + Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + boolean changed = false; + while (tagIterator.hasNext()) { + Tag t = tagIterator.next(); + + // Filter tags + + // Drop incoming TTL tags if the mutation specified a TTL or is not a Put + if (t.getType() == TagType.TTL_TAG_TYPE && + (m.getTTL() != Long.MAX_VALUE || !(m instanceof Put))) { + changed = true; + continue; + } + + newTags.add(t); + } + + // Add new tags here + + // Add a TTL to Puts + if ((m instanceof Put) && m.getTTL() != Long.MAX_VALUE) { + // Set the TTL of this cell to that specified by the mutation attr + changed = true; + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL()))); + } + + // Cell is rewritten here + if (changed) { + updatedCells.add(new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), + cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), + cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), + newTags)); + // Remove original after rewrite + cellIterator.remove(); + } + } + } + + // We may have queued up rewritten cells while iterating, add them now + if (!updatedCells.isEmpty()) { + cells.addAll(updatedCells); + // Maintain sorted order + Collections.sort(cells, new CellComparator()); + } + } + } /* * Check if resources to support an update. * @@ -5371,8 +5445,8 @@ public class HRegion implements HeapSize { // , Writable{ // Iterate the input columns and update existing values if they were // found, otherwise add new column initialized to the append value - // Avoid as much copying as possible. Every byte is copied at most - // once. + // Avoid as much copying as possible. We may need to rewrite and + // consolidate tags. Bytes are only copied once. // Would be nice if KeyValue had scatter/gather logic int idx = 0; for (Cell cell : family.getValue()) { @@ -5380,25 +5454,57 @@ public class HRegion implements HeapSize { // , Writable{ Cell newCell = null; Cell oldCell = null; if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { oldCell = results.get(idx); - // allocate an empty kv once + + // Process cell tags + + // We might be able to reuse code elsewhere if append were + // refactored into a Get-Transform-Put transaction. We could + // apply a common set of tag processing rules on the Put path + + // Make a union of the set of tags in the old and new KVs, but + // filter cell TTL tags if the append mutation specifies one.
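+ // When the append specifies a TTL it is re-added below as the single + // surviving TTL tag.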
+ + List<Tag> newTags = new ArrayList<Tag>(); + int tagsLength = 0; + + if (oldCell.getTagsLength() > 0) { + Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(), + oldCell.getTagsOffset(), oldCell.getTagsLength()); + while (i.hasNext()) { + Tag t = i.next(); + // Strip any existing TTL tags if the mutation specified one + if (t.getType() == TagType.TTL_TAG_TYPE && append.getTTL() != Long.MAX_VALUE) { + continue; + } + newTags.add(t); + tagsLength += Tag.INFRASTRUCTURE_SIZE + t.getTagLength(); + } + } + if (cell.getTagsLength() > 0) { + Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + while (i.hasNext()) { + Tag t = i.next(); + // Strip any existing TTL tags if the mutation specified one + if (t.getType() == TagType.TTL_TAG_TYPE && append.getTTL() != Long.MAX_VALUE) { + continue; + } + newTags.add(t); + tagsLength += Tag.INFRASTRUCTURE_SIZE + t.getTagLength(); + } + } + + if (append.getTTL() != Long.MAX_VALUE) { + // Add the new TTL tag + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL()))); + tagsLength += Tag.INFRASTRUCTURE_SIZE + Bytes.SIZEOF_LONG; + } + + // allocate an empty cell once newCell = new KeyValue(row.length, cell.getFamilyLength(), cell.getQualifierLength(), now, KeyValue.Type.Put, oldCell.getValueLength() + cell.getValueLength(), - oldCell.getTagsLength() + cell.getTagsLength()); - // copy in the value - System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(), - newCell.getValueArray(), newCell.getValueOffset(), - oldCell.getValueLength()); - System.arraycopy(cell.getValueArray(), cell.getValueOffset(), - newCell.getValueArray(), - newCell.getValueOffset() + oldCell.getValueLength(), - cell.getValueLength()); - // copy in the tags - System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(), - newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength()); - System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(), - newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength()); + tagsLength); // copy in row, family, and qualifier System.arraycopy(cell.getRowArray(), cell.getRowOffset(), newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength()); @@ -5408,6 +5514,22 @@ public class HRegion implements HeapSize { // , Writable{ System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(), newCell.getQualifierArray(), newCell.getQualifierOffset(), cell.getQualifierLength()); + // copy in the value + System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(), + newCell.getValueArray(), newCell.getValueOffset(), + oldCell.getValueLength()); + System.arraycopy(cell.getValueArray(), cell.getValueOffset(), + newCell.getValueArray(), + newCell.getValueOffset() + oldCell.getValueLength(), + cell.getValueLength()); + // rebuild tags + int offset = newCell.getTagsOffset() - KeyValue.TAGS_LENGTH_SIZE; + offset = Bytes.putAsShort(newCell.getTagsArray(), offset, tagsLength); + for (Tag t: newTags) { + offset = Bytes.putBytes(newCell.getTagsArray(), offset, t.getBuffer(), + t.getTagOffset() - Tag.INFRASTRUCTURE_SIZE, // t.getOffset is not public + t.getTagLength() + Tag.INFRASTRUCTURE_SIZE); // t.getLength is not public + } idx++; } else { // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP, @@ -5416,7 +5538,7 @@ public class HRegion implements HeapSize { // , Writable{ KeyValue newKV = KeyValueUtil.ensureKeyValue(cell); newKV.updateLatestStamp(Bytes.toBytes(now)); newCell = newKV; - } + }
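+ // Whichever branch was taken, newCell now carries the value and tag layout + // that will be persisted below.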
CellUtil.setSequenceId(newCell, mvccNum); // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newCell = coprocessorHost.postMutationBeforeWAL( @@ -5611,26 +5733,30 @@ public class HRegion implements HeapSize { // , Writable{ // Append new incremented KeyValue to list byte[] q = CellUtil.cloneQualifier(kv); byte[] val = Bytes.toBytes(amount); - int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength(); - int incCellTagsLen = kv.getTagsLength(); - Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, now, - KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen); - System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length); - System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(), - family.getKey().length); - System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length); - // copy in the value - System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length); - // copy tags - if (oldCellTagsLen > 0) { - System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(), - newKV.getTagsOffset(), oldCellTagsLen); - } - if (incCellTagsLen > 0) { - System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(), - newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen); + + // Process cell tags + + // We might be able to reuse code elsewhere if increment were + // refactored into a Get-Transform-Put transaction. We could + // apply a common set of tag processing rules on the Put path + + // An increment result is a synthetic record. Just create a cell + // TTL tag as required if the mutation provided one. + + List<Tag> newTags = new ArrayList<Tag>(); + if (increment.getTTL() != Long.MAX_VALUE) { + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL()))); } + + Cell newKV = new KeyValue(row, 0, row.length, + family.getKey(), 0, family.getKey().length, + q, 0, q.length, + now, + KeyValue.Type.Put, + val, 0, val.length, + newTags); CellUtil.setSequenceId(newKV, mvccNum); + // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newKV = coprocessorHost.postMutationBeforeWAL( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 4813e10..bfddc11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; @@ -1668,8 +1670,47 @@ public class HStore implements Store { return wantedVersions >
maxVersions: wantedVersions; } - static boolean isExpired(final Cell key, final long oldestTimestamp) { - return key.getTimestamp() < oldestTimestamp; + /** + * @param cell + * @param oldestTimestamp + * @param now the current time in milliseconds + * @return 1 if the cell is expired and no cells that sort after it can be + * alive; -1 if the cell is expired but other cells may still be alive; + * 0 otherwise + */ + static int isExpired(final Cell cell, final long oldestTimestamp, final long now) { + // Do not create an Iterator or Tag objects unless the cell actually has + // tags + if (cell.getTagsLength() > 0) { + // Look for a TTL tag first. Use it instead of the family setting if + // found. If a cell has multiple TTLs, resolve the conflict by using the + // first tag encountered. + Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + while (i.hasNext()) { + Tag t = i.next(); + if (TagType.TTL_TAG_TYPE == t.getType()) { + if (t.getTagLength() == Bytes.SIZEOF_LONG) { + // Unlike family TTLs, which the schema specifies in seconds, cell + // TTLs are stored in milliseconds, so no conversion is needed + long ts = cell.getTimestamp(); + long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength()); + if (ts + ttl < now) { + return -1; + } + // Per cell TTLs cannot extend lifetime beyond family settings, so + // fall through to check that + break; + } else { + LOG.warn("TTL tag for cell " + cell + " has wrong size: have=" + t.getTagLength() + + ", want=" + Bytes.SIZEOF_LONG); + } + } + } + } + if (cell.getTimestamp() < oldestTimestamp) { + return 1; + } + return 0; } @Override @@ -1811,7 +1852,7 @@ public class HStore implements Store { if (this.comparator.compareRows(kv, firstOnRow) < 0) continue; // Did we go beyond the target row? If so break. if (state.isTooFar(kv, firstOnRow)) break; - if (state.isExpired(kv)) { + if (state.isExpired(kv) != 0) { continue; } // If we added something, this row is a contender. break.
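The three-valued contract of isExpired() above is easier to see in isolation. The following self-contained sketch is not part of the patch; ToyCell and the constants are illustrative stand-ins. It mirrors the decision order implemented in HStore.isExpired(), assuming TTL tag values are longs in milliseconds as in the code above:

public class TtlCheckSketch {
  static final int NOT_EXPIRED = 0;
  static final int EXPIRED_BY_CELL_TTL = -1;  // only this cell is known to be dead
  static final int EXPIRED_BY_FAMILY_TTL = 1; // this cell and everything older is dead

  /** Illustrative stand-in for a Cell carrying at most one TTL tag. */
  static class ToyCell {
    final long timestamp;
    final Long ttlTagMillis; // null when the cell carries no TTL tag
    ToyCell(long timestamp, Long ttlTagMillis) {
      this.timestamp = timestamp;
      this.ttlTagMillis = ttlTagMillis;
    }
  }

  /** Mirrors the decision order of HStore.isExpired(): cell TTL first, then the family horizon. */
  static int isExpired(ToyCell cell, long oldestUnexpiredTs, long now) {
    if (cell.ttlTagMillis != null && cell.timestamp + cell.ttlTagMillis < now) {
      // A per-cell TTL says nothing about neighboring cells, so the caller
      // may only skip this single cell.
      return EXPIRED_BY_CELL_TTL;
    }
    if (cell.timestamp < oldestUnexpiredTs) {
      // Scanned cells are time-ordered, so everything older is expired too
      // and the caller may seek to the next column or row.
      return EXPIRED_BY_FAMILY_TTL;
    }
    return NOT_EXPIRED;
  }

  public static void main(String[] args) {
    long now = 100000L;
    long oldestUnexpiredTs = now - 50000L; // family TTL of 50 seconds
    System.out.println(isExpired(new ToyCell(90000L, 5000L), oldestUnexpiredTs, now)); // -1
    System.out.println(isExpired(new ToyCell(40000L, null), oldestUnexpiredTs, now));  // 1
    System.out.println(isExpired(new ToyCell(90000L, null), oldestUnexpiredTs, now));  // 0
  }
}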
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java index a5c17fb..a29ef8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java @@ -56,6 +56,7 @@ public class ScanDeleteTracker implements DeleteTracker { /** * Constructor for ScanDeleteTracker */ public ScanDeleteTracker() { super(); } @@ -111,6 +112,7 @@ public class ScanDeleteTracker implements DeleteTracker { @Override public DeleteResult isDeleted(Cell cell) { long timestamp = cell.getTimestamp(); + int qualifierOffset = cell.getQualifierOffset(); int qualifierLength = cell.getQualifierLength(); if (hasFamilyStamp && timestamp <= familyStamp) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index 964fad8..f280645 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -100,6 +100,9 @@ public class ScanQueryMatcher { */ private final long earliestPutTs; + /** The oldest timestamp we are interested in, based on TTL */ + private final long oldestUnexpiredTS; + private final long now; /** readPoint over which the KVs are unconditionally included */ protected long maxReadPointToTrackVersions; @@ -152,7 +155,7 @@ */ public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, - RegionCoprocessorHost regionCoprocessorHost) throws IOException { + long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException { this.tr = scan.getTimeRange(); this.rowComparator = scanInfo.getComparator(); this.regionCoprocessorHost = regionCoprocessorHost; @@ -162,6 +165,8 @@ scanInfo.getFamily()); this.filter = scan.getFilter(); this.earliestPutTs = earliestPutTs; + this.oldestUnexpiredTS = oldestUnexpiredTS; + this.now = now; this.maxReadPointToTrackVersions = readPointToUse; this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes(); @@ -184,8 +189,7 @@ hasNullColumn = true; // use a specialized scan for wildcard column tracker. - this.columns = new ScanWildcardColumnTracker( - scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS); + this.columns = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions); } else { // whether there is null column in the explicit column query hasNullColumn = (columns.first().length == 0); @@ -194,7 +198,7 @@ // between rows, not between storefiles. byte[] attr = scan.getAttribute(Scan.HINT_LOOKAHEAD); this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, - oldestUnexpiredTS, attr == null ? 0 : Bytes.toInt(attr)); + attr == null ? 0 : Bytes.toInt(attr)); } this.isReversed = scan.isReversed(); } @@ -213,18 +217,18 @@ * @param scanInfo The store's immutable scan info * @param columns * @param earliestPutTs Earliest put seen in any of the store files.
- * @param oldestUnexpiredTS the oldest timestamp we are interested in, - * based on TTL + * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL + * @param now the current server time * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. * @param regionCoprocessorHost * @throws IOException */ public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, - long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow, + long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow, byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException { this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs, - oldestUnexpiredTS, regionCoprocessorHost); + oldestUnexpiredTS, now, regionCoprocessorHost); Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null)); this.dropDeletesFromRow = dropDeletesFromRow; this.dropDeletesToRow = dropDeletesToRow; } @@ -234,10 +238,10 @@ * Constructor for tests */ ScanQueryMatcher(Scan scan, ScanInfo scanInfo, - NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException { + NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException { this(scan, scanInfo, columns, ScanType.USER_SCAN, Long.MAX_VALUE, /* max Readpoint to track versions */ - HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null); + HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null); } /** @@ -295,12 +299,6 @@ int qualifierOffset = cell.getQualifierOffset(); int qualifierLength = cell.getQualifierLength(); - long timestamp = cell.getTimestamp(); - // check for early out based on timestamp alone - if (columns.isDone(timestamp)) { - return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset, - qualifierLength); - } /* * The delete logic is pretty complicated now. @@ -317,6 +315,7 @@ */ byte typeByte = cell.getTypeByte(); long mvccVersion = cell.getMvccVersion(); + long timestamp = cell.getTimestamp(); if (CellUtil.isDelete(cell)) { if (!keepDeletedCells) { // first ignore delete markers if the scanner can do so, and the @@ -362,9 +361,23 @@ } // note the following next else if...
// delete marker are not subject to other delete markers - } else if (!this.deletes.isEmpty()) { - DeleteResult deleteResult = deletes.isDeleted(cell); - switch (deleteResult) { + } else { + if (columns.hasMinVersions()) { + int expired = isExpired(cell); + if (expired < 0) { + // If the cell is expired and we have enough versions, skip + return MatchCode.SKIP; + } else if (expired >= 1) { + // If the cell is expired and we have enough versions, and we are + // sure no other cells can be alive, then skip forward + return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset, + qualifierLength); + } + } + // Check deletes + if (!this.deletes.isEmpty()) { + DeleteResult deleteResult = deletes.isDeleted(cell); + switch (deleteResult) { case FAMILY_DELETED: case COLUMN_DELETED: return columns.getNextRowOrNextColumn(cell.getQualifierArray(), @@ -377,6 +390,7 @@ default: throw new RuntimeException("UNEXPECTED"); } + } } int timestampComparison = tr.compare(timestamp); @@ -446,6 +460,16 @@ return colChecker; } + /** + * @param cell + * @return 1 if the cell is expired and no cells that sort after it can be + * alive; -1 if the cell is expired but other cells may still be alive; + * 0 otherwise + */ + int isExpired(final Cell cell) { + return HStore.isExpired(cell, this.oldestUnexpiredTS, this.now); + } + /** Handle partial-drop-deletes. As we match keys in order, when we have a range from which * we can drop deletes, we can set retainDeletesInOutput to false for the duration of this * range only, and maintain consistency. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java index 85b36fb..9bb0dd6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java @@ -43,20 +43,14 @@ public class ScanWildcardColumnTracker implements ColumnTracker { private long latestTSOfCurrentColumn; private byte latestTypeOfCurrentColumn; - private long oldestStamp; - /** * Return maxVersions of every row. * @param minVersion Minimum number of versions to keep * @param maxVersion Maximum number of versions to return - * @param oldestUnexpiredTS oldest timestamp that has not expired according - * to the TTL. */ - public ScanWildcardColumnTracker(int minVersion, int maxVersion, - long oldestUnexpiredTS) { + public ScanWildcardColumnTracker(int minVersion, int maxVersion) { this.maxVersions = maxVersion; this.minVersions = minVersion; - this.oldestStamp = oldestUnexpiredTS; } /** @@ -126,11 +120,6 @@ /** * Check whether this version should be retained. - * There are 4 variables considered: - * If this version is past max versions -> skip it - * If this kv has expired or was deleted, check min versions - * to decide whther to skip it or not.
- * * Increase the version counter unless this is a delete */ private MatchCode checkVersion(byte type, long timestamp) { @@ -140,14 +129,8 @@ if (currentCount > maxVersions) { return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col } - // keep the KV if required by minversions or it is not expired, yet - if (currentCount <= minVersions || !isExpired(timestamp)) { - setTSAndType(timestamp, type); - return ScanQueryMatcher.MatchCode.INCLUDE; - } else { - return MatchCode.SEEK_NEXT_COL; - } - + setTSAndType(timestamp, type); + return ScanQueryMatcher.MatchCode.INCLUDE; } @Override @@ -170,10 +153,6 @@ return timestamp == latestTSOfCurrentColumn && type == latestTypeOfCurrentColumn; } - private boolean isExpired(long timestamp) { - return timestamp < oldestStamp; - } - /** * Used by matcher and scan/get to get a hint of the next column * to seek to after checkColumn() returns SKIP. Returns the next interesting @@ -194,12 +173,18 @@ return false; } + @Override public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset, int qualLength) { return MatchCode.SEEK_NEXT_COL; } - public boolean isDone(long timestamp) { - return minVersions <= 0 && isExpired(timestamp); + @Override + public boolean hasMinVersions() { + // Always have enough versions if minVersions is not set + if (minVersions <= 0) { + return true; + } + return currentCount >= minVersions; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 0a4e1ed..73eb031 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -76,6 +76,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected final Scan scan; protected final NavigableSet<byte[]> columns; protected final long oldestUnexpiredTS; + protected final long now; protected final int minVersions; protected final long maxRowSize; @@ -122,7 +123,8 @@ explicitColumnQuery = numCol > 0; this.scan = scan; this.columns = columns; - oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl; + this.now = EnvironmentEdgeManager.currentTime(); + this.oldestUnexpiredTS = now - ttl; this.minVersions = minVersions; if (store != null && ((HStore)store).getHRegion() != null @@ -172,7 +174,7 @@ } matcher = new ScanQueryMatcher(scan, scanInfo, columns, ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP, - oldestUnexpiredTS, store.getCoprocessorHost()); + oldestUnexpiredTS, now, store.getCoprocessorHost()); this.store.addChangedReaderObserver(this); @@ -237,10 +239,10 @@ ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); if (dropDeletesFromRow == null) { matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint, - earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost()); + earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost()); } else { matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs, -
oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); + oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); } // Filter the list of scanners using Bloom filters, time range, TTL, etc. @@ -280,7 +282,7 @@ this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), scanInfo.getMinVersions(), readPt); this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, - Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null); + Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null); // In unit tests, the store could be null if (this.store != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java index 72d7aa9..ff933a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java @@ -49,9 +49,7 @@ public class TestExplicitColumnTracker { TreeSet<byte[]> trackColumns, List<byte[]> scannerColumns, List<MatchCode> expected, int lookAhead) throws IOException { - ColumnTracker exp = new ExplicitColumnTracker( - trackColumns, 0, maxVersions, Long.MIN_VALUE, lookAhead); - + ColumnTracker exp = new ExplicitColumnTracker(trackColumns, 0, maxVersions, lookAhead); //Initialize result List<MatchCode> result = new ArrayList<MatchCode>(); @@ -210,8 +208,7 @@ columns.add(Bytes.toBytes("col"+i)); } - ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions, - Long.MIN_VALUE, 0); + ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions, 0); for (int i = 0; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 1c1c0a4..16e1542 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; @@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; @@ -136,6 +138,7 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import
org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; @@ -5672,6 +5675,160 @@ public class TestHRegion { } } + @Test + public void testCellTTLs() throws IOException { + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + + final byte[] row = Bytes.toBytes("testRow"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs")); + HColumnDescriptor hcd = new HColumnDescriptor(fam1); + hcd.setTimeToLive(10); // 10 seconds + htd.addFamily(hcd); + + HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY), + TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd); + assertNotNull(region); + try { + long now = EnvironmentEdgeManager.currentTime(); + // Add a cell that will expire in 5 seconds via cell TTL + region.put(new Put(row).add(new KeyValue(row, fam1, q1, now, + HConstants.EMPTY_BYTE_ARRAY, new Tag[] { + // TTL tag values are in milliseconds + new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } ))); + // Add a cell that will expire after 10 seconds via family setting + region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY)); + // Add a cell that will expire in 15 seconds via cell TTL + region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1, + HConstants.EMPTY_BYTE_ARRAY, new Tag[] { + // TTL tag values are in milliseconds + new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } ))); + // Add a cell that will expire in 20 seconds via family setting + region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY)); + + // Flush so we are sure store scanning gets this right + region.flushcache(); + + // A query at time T+0 should return all cells + Result r = region.get(new Get(row)); + assertNotNull(r.getValue(fam1, q1)); + assertNotNull(r.getValue(fam1, q2)); + assertNotNull(r.getValue(fam1, q3)); + assertNotNull(r.getValue(fam1, q4)); + + // Increment time to T+5 seconds + edge.incrementTime(5000); + + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + assertNotNull(r.getValue(fam1, q2)); + assertNotNull(r.getValue(fam1, q3)); + assertNotNull(r.getValue(fam1, q4)); + + // Increment time to T+10 seconds + edge.incrementTime(5000); + + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + assertNull(r.getValue(fam1, q2)); + assertNotNull(r.getValue(fam1, q3)); + assertNotNull(r.getValue(fam1, q4)); + + // Increment time to T+15 seconds + edge.incrementTime(5000); + + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + assertNull(r.getValue(fam1, q2)); + assertNull(r.getValue(fam1, q3)); + assertNotNull(r.getValue(fam1, q4)); + + // Increment time to T+20 seconds + edge.incrementTime(5000); + + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + assertNull(r.getValue(fam1, q2)); + assertNull(r.getValue(fam1, q3)); + assertNull(r.getValue(fam1, q4)); + + now = edge.currentTime(); + + // If the cell TTL is specified by a mutation attribute, this should + // override any TTLs that may have been passed in as tags.
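+ // (Attribute TTLs and tag TTLs are both expressed in milliseconds.)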
+ + // Add a cell that will expire in 5 seconds via mutation attribute + Put put = new Put(row).add(new KeyValue(row, fam1, q1, now, + HConstants.EMPTY_BYTE_ARRAY, new Tag[] { + // Include an earlier expiration time as a tag; this should be ignored + new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(1000L)) } )); + put.setTTL(5000L); + region.put(put); + r = region.get(new Get(row)); + assertNotNull(r.getValue(fam1, q1)); + + // Increment time to T+21 seconds + edge.incrementTime(1000); + r = region.get(new Get(row)); + assertNotNull(r.getValue(fam1, q1)); + + // Increment time to T+22 seconds + edge.incrementTime(1000); + r = region.get(new Get(row)); + assertNotNull(r.getValue(fam1, q1)); + + // Increment time to T+25 seconds + edge.incrementTime(3000); + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + + // Fun with disappearing increments + + // Start at 1 + region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L))); + r = region.get(new Get(row)); + byte[] val = r.getValue(fam1, q1); + assertNotNull(val); + assertEquals(1L, Bytes.toLong(val)); + + // Increment with a TTL of 5 seconds + Increment incr = new Increment(row).addColumn(fam1, q1, 1L); + incr.setTTL(5000); + region.increment(incr); // 2 + + // New value should be 2 + r = region.get(new Get(row)); + val = r.getValue(fam1, q1); + assertNotNull(val); + assertEquals(2L, Bytes.toLong(val)); + + // Increment time to T+30 seconds + edge.incrementTime(5000); + + // Value should be back to 1 + r = region.get(new Get(row)); + val = r.getValue(fam1, q1); + assertNotNull(val); + assertEquals(1L, Bytes.toLong(val)); + + // Increment time to T+35 seconds + edge.incrementTime(5000); + + // Original value written at T+25 should be gone now via family TTL + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + + } finally { + HRegion.closeHRegion(region); + } + } + private static HRegion initHRegion(byte[] tableName, String callingMethod, byte[]...
families) throws IOException { return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java index 2b1f11f..a2b7383 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java @@ -91,11 +91,12 @@ public class TestQueryMatcher extends HBaseTestCase { } - private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException { - // 2,4,5 + private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException { + long now = EnvironmentEdgeManager.currentTime(); + // 2,4,5 ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, ttl, false, 0, rowComparator), get.getFamilyMap().get(fam2), - EnvironmentEdgeManager.currentTime() - ttl); + now - ttl, now); List<KeyValue> memstore = new ArrayList<KeyValue>(); memstore.add(new KeyValue(row1, fam2, col1, 1, data)); @@ -175,9 +176,10 @@ expected.add(ScanQueryMatcher.MatchCode.INCLUDE); expected.add(ScanQueryMatcher.MatchCode.DONE); + long now = EnvironmentEdgeManager.currentTime(); ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, ttl, false, 0, rowComparator), null, - EnvironmentEdgeManager.currentTime() - ttl); + now - ttl, now); List<KeyValue> memstore = new ArrayList<KeyValue>(); memstore.add(new KeyValue(row1, fam2, col1, 1, data)); @@ -231,7 +233,7 @@ long now = EnvironmentEdgeManager.currentTime(); ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, false, 0, rowComparator), get.getFamilyMap().get(fam2), - now - testTTL); + now - testTTL, now); KeyValue [] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now-100, data), @@ -285,7 +287,7 @@ long now = EnvironmentEdgeManager.currentTime(); ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, false, 0, rowComparator), null, - now - testTTL); + now - testTTL, now); KeyValue [] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now-100, data), @@ -343,7 +345,7 @@ NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2); ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE, - HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null); + HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null); List<MatchCode> actual = new ArrayList<MatchCode>(rows.length); byte[] prevRow = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java index c0dcee6..1482ae3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.*; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -28,16 +30,19 @@ import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import
org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; + +import org.junit.Ignore; +import org.junit.Test; import org.junit.experimental.categories.Category; @Category({RegionServerTests.class, SmallTests.class}) -public class TestScanWildcardColumnTracker extends HBaseTestCase { +public class TestScanWildcardColumnTracker { final static int VERSIONS = 2; + @Test public void testCheckColumn_Ok() throws IOException { - ScanWildcardColumnTracker tracker = - new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE); + ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS); //Create list of qualifiers List<byte[]> qualifiers = new ArrayList<byte[]>(); @@ -68,9 +73,9 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { } } + @Test public void testCheckColumn_EnforceVersions() throws IOException { - ScanWildcardColumnTracker tracker = - new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE); + ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS); //Create list of qualifiers List<byte[]> qualifiers = new ArrayList<byte[]>(); @@ -102,9 +107,10 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { } } - public void DisabledTestCheckColumn_WrongOrder() { - ScanWildcardColumnTracker tracker = - new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE); + @Test + @Ignore + public void testCheckColumn_WrongOrder() { + ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS); //Create list of qualifiers List<byte[]> qualifiers = new ArrayList<byte[]>(); @@ -125,6 +131,5 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { assertEquals(true, ok); } - }
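For reference, a minimal client-side sketch of the two ways this patch lets a writer attach a cell TTL. It mirrors the API used in testCellTTLs above; the class name and literal values are illustrative only:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class CellTtlUsageSketch {
  public static void main(String[] args) throws Exception {
    byte[] row = Bytes.toBytes("r1");
    byte[] fam = Bytes.toBytes("f");
    byte[] qual = Bytes.toBytes("q");
    long now = System.currentTimeMillis();

    // Option 1: set an operation TTL on the mutation. The server rewrites
    // every cell of the Put to carry a TTL tag of 5000 ms.
    Put p1 = new Put(row);
    p1.add(fam, qual, Bytes.toBytes("value"));
    p1.setTTL(5000L);

    // Option 2: build the TTL tag client side, avoiding the server-side
    // rewrite. TTL tag values are longs in milliseconds.
    Put p2 = new Put(row);
    p2.add(new KeyValue(row, fam, qual, now, Bytes.toBytes("value"),
        new Tag[] { new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) }));
  }
}

Either Put can then be sent to a table as usual; only Puts keep incoming TTL tags, per the rewriteCellTags() rules above.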