diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index a75d3b0..4e5343a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -76,6 +76,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
*/
private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
+ /**
+ * The attribute for storing TTL for the result of the mutation.
+ */
+ private static final String OP_ATTRIBUTE_TTL = "_ttl";
+
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
protected Durability durability = Durability.USE_DEFAULT;
@@ -199,6 +204,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
if (getId() != null) {
map.put("id", getId());
}
+ // Add the TTL if set
+ // Long.MAX_VALUE is the default, and is interpreted to mean this attribute
+ // has not been set.
+ if (getTTL() != Long.MAX_VALUE) {
+ map.put("ttl", getTTL());
+ }
return map;
}
@@ -417,6 +428,37 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
}
/**
+ * Return the TTL requested for the result of the mutation, in milliseconds.
+ * @return the TTL requested for the result of the mutation, in milliseconds,
+ * or Long.MAX_VALUE if unset
+ */
+ public long getTTL() {
+ byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
+ if (ttlBytes != null) {
+ return Bytes.toLong(ttlBytes);
+ }
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Set the TTL desired for the result of the mutation, in milliseconds.
+ *
+ * Note: This will cause all cells carried within the mutation to be
+ * rewritten on the server to carry the operation TTL per cell in a tag.
+ * This overhead can be avoided through client side construction of cells
+ * which include the cell TTL tag (TagType.TTL_TAG_TYPE) directly. If a
+ * mutation carries both cells with TTL tags and the TTL attribute, cells
+ * in the mutation will be rewritten to carry one TTL tag set to the value
+ * of the attribute.
+ * @param ttl the TTL desired for the result of the mutation, in milliseconds
+ * @return this
+ */
+ public Mutation setTTL(long ttl) {
+ setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
+ return this;
+ }
+
+ /**
* Subclasses should override this method to add the heap size of their own fields.
* @return the heap size to add (will be aligned).
*/
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
index 8d3c0b9..2095f4e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
@@ -195,6 +195,25 @@ public class Tag {
}
/**
+ * Write a list of tags into a byte array
+ * @param tags the list of tags to serialize
+ * @return the serialized tag data as bytes
+ */
+ public static byte[] fromList(List<Tag> tags) {
+ int length = 0;
+ for (Tag tag: tags) {
+ length += tag.length;
+ }
+ byte[] b = new byte[length];
+ int pos = 0;
+ for (Tag tag: tags) {
+ System.arraycopy(tag.bytes, tag.offset, b, pos, tag.length);
+ pos += tag.length;
+ }
+ return b;
+ }
+
+ /**
* Returns the total length of the entire tag entity
*/
int getLength() {
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index 45c8476..9ed247e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -28,4 +28,5 @@ public final class TagType {
public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
+ public static final byte TTL_TAG_TYPE = (byte)7;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
index fce3b29..4d22c0e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
- * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
* implement the {@link DeleteTracker} interface since state spans rows (There
* is no update nor reset method).
*/
@@ -42,7 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes;
class GetClosestRowBeforeTracker {
private final KeyValue targetkey;
// Any cell w/ a ts older than this is expired.
- private final long oldestts;
+ private final long now;
+ private final long oldestUnexpiredTs;
private Cell candidate = null;
private final KVComparator kvcomparator;
// Flag for whether we're doing getclosest on a metaregion.
@@ -75,19 +76,12 @@ class GetClosestRowBeforeTracker {
HConstants.DELIMITER) - this.rowoffset;
}
this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
- this.oldestts = System.currentTimeMillis() - ttl;
+ this.now = System.currentTimeMillis();
+ this.oldestUnexpiredTs = now - ttl;
this.kvcomparator = c;
this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(new CellComparator.RowComparator());
}
- /**
- * @param kv
- * @return True if this kv is expired.
- */
- boolean isExpired(final Cell kv) {
- return HStore.isExpired(kv, this.oldestts);
- }
-
/*
* Add the specified KeyValue to the list of deletes.
* @param kv
@@ -172,6 +166,15 @@ class GetClosestRowBeforeTracker {
return false;
}
+ /**
+ * @param cell
+ * @return true if the cell is expired
+ */
+ public boolean isExpired(final Cell cell) {
+ return cell.getTimestamp() < this.oldestUnexpiredTs ||
+ HStore.isCellTTLExpired(cell, this.oldestUnexpiredTs, this.now);
+ }
+
/*
* Handle keys whose values hold deletes.
* Add to the set of deletes and then if the candidate keys contain any that
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e362a17..ae73421 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -84,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
@@ -2627,6 +2630,7 @@ public class HRegion implements HeapSize { // , Writable{
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
noOfDeletes++;
}
+ rewriteCellTags(familyMaps[i], mutation);
}
lock(this.updatesLock.readLock(), numReadyToWrite);
@@ -3099,6 +3103,85 @@ public class HRegion implements HeapSize { // , Writable{
}
}
+ /**
+ * Rewrite incoming cell tags.
+ *
+ * Add cell TTLs to the incoming mutation. Drop any incoming TTL tags.
+ */
+ void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
+ // Check if we have any work to do and early out otherwise
+ // Update these checks as more logic is added here
+
+ // If we don't have a TTL on the mutation, and this is a Put, we don't need to filter
+ // TTL tags
+ if (m.getTTL() == Long.MAX_VALUE && m instanceof Put) {
+ return;
+ }
+
+ // Sadly we have to recreate the cell because cell or KV doesn't support
+ // scatter-gather. It will be expensive to do this in multiple places so
+ // this method is envisioned as a one-stop-shop for incoming cell tag
+ // rewrite rules applied during doMiniBatchMutation
+ List<Cell> updatedCells = new ArrayList<Cell>();
+ for (Map.Entry> e: familyMap.entrySet()) {
+ updatedCells.clear(); // Be sure to clear this each time around the loop
+ List<Cell> cells = e.getValue();
+ Iterator<Cell> cellIterator = cells.iterator(); // Iterator is required for safe in-place removal below
+ while (cellIterator.hasNext()) {
+ Cell cell = cellIterator.next();
+ // Avoid as much work as possible if the cell doesn't carry tags
+ if (cell.getTagsLength() > 0) {
+ List<Tag> newTags = new ArrayList<Tag>();
+ Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ boolean changed = false;
+ while (tagIterator.hasNext()) {
+ Tag t = tagIterator.next();
+
+ // Filter tags
+
+ // Drop incoming TTL tags if the mutation specified a TTL or is not a Put
+ if (t.getType() == TagType.TTL_TAG_TYPE &&
+ (m.getTTL() != Long.MAX_VALUE || !(m instanceof Put))) {
+ changed = true;
+ continue;
+ }
+
+ newTags.add(t);
+ }
+
+ // Add new tags here
+
+ // Add a TTL to Puts
+ if ((m instanceof Put) && m.getTTL() != Long.MAX_VALUE) {
+ // Set the TTL of this cell to that specified by the mutation attr
+ changed = true;
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
+ }
+
+ // Cell is rewritten here
+ if (changed) {
+ updatedCells.add(new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+ cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ newTags));
+ // Remove original after rewrite
+ cellIterator.remove();
+ }
+ }
+ }
+
+ // We may have queued up rewritten cells while iterating, add them now
+ if (!updatedCells.isEmpty()) {
+ cells.addAll(updatedCells);
+ // Maintain sorted order
+ Collections.sort(cells, new CellComparator());
+ }
+ }
+ }
+
/*
* Check if resources to support an update.
*
@@ -5369,50 +5452,88 @@ public class HRegion implements HeapSize { // , Writable{
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the append value
- // Avoid as much copying as possible. Every byte is copied at most
- // once.
+ // Avoid as much copying as possible. We may need to rewrite and
+ // consolidate tags. Bytes are only copied once.
// Would be nice if KeyValue had scatter/gather logic
int idx = 0;
for (Cell cell : family.getValue()) {
Cell newCell;
Cell oldCell = null;
+ List<Tag> newTags = new ArrayList<Tag>();
if (idx < results.size()
&& CellUtil.matchingQualifier(results.get(idx), cell)) {
oldCell = results.get(idx);
- // allocate an empty kv once
+
+ // Process cell tags
+
+ // Make a union of the set of tags in the old and new KVs, but
+ // filter cell TTL tags if the append mutation specifies one.
+
+ if (oldCell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
+ oldCell.getTagsOffset(), oldCell.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && append.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
+ if (cell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && append.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
+
+ if (append.getTTL() != Long.MAX_VALUE) {
+ // Add the new TTL tag
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ }
+
+ // Rebuild tags
+ byte[] tagBytes = Tag.fromList(newTags);
+
+ // allocate an empty cell once
newCell = new KeyValue(row.length, cell.getFamilyLength(),
cell.getQualifierLength(), now, KeyValue.Type.Put,
oldCell.getValueLength() + cell.getValueLength(),
- oldCell.getTagsLength() + cell.getTagsLength());
- // copy in the value
- System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
- newCell.getValueArray(), newCell.getValueOffset(),
- oldCell.getValueLength());
- System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
- newCell.getValueArray(),
- newCell.getValueOffset() + oldCell.getValueLength(),
- cell.getValueLength());
- // copy in the tags
- System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(),
- newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength());
- System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(),
- newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength());
+ tagBytes.length);
// copy in row, family, and qualifier
System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
- newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
+ newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
- newCell.getFamilyArray(), newCell.getFamilyOffset(),
- cell.getFamilyLength());
+ newCell.getFamilyArray(), newCell.getFamilyOffset(),
+ cell.getFamilyLength());
System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
- newCell.getQualifierArray(), newCell.getQualifierOffset(),
- cell.getQualifierLength());
+ newCell.getQualifierArray(), newCell.getQualifierOffset(),
+ cell.getQualifierLength());
+ // copy in the value
+ System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
+ newCell.getValueArray(), newCell.getValueOffset(),
+ oldCell.getValueLength());
+ System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
+ newCell.getValueArray(),
+ newCell.getValueOffset() + oldCell.getValueLength(),
+ cell.getValueLength());
+ // Copy in tag data
+ System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
+ tagBytes.length);
idx++;
} else {
// Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
// so only need to update the timestamp to 'now'
CellUtil.updateLatestStamp(cell, now);
newCell = cell;
- }
+ }
CellUtil.setSequenceId(newCell, mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
@@ -5587,12 +5708,27 @@ public class HRegion implements HeapSize { // , Writable{
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount
int idx = 0;
- for (Cell kv: family.getValue()) {
- long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+ for (Cell cell: family.getValue()) {
+ long amount = Bytes.toLong(CellUtil.cloneValue(cell));
boolean noWriteBack = (amount == 0);
+ List<Tag> newTags = new ArrayList<Tag>();
+
+ // Process cell tags from mutation
+ if (cell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && increment.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
Cell c = null;
- if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
+ if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
c = results.get(idx);
if(c.getValueLength() == Bytes.SIZEOF_LONG) {
amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
@@ -5601,32 +5737,41 @@ public class HRegion implements HeapSize { // , Writable{
throw new org.apache.hadoop.hbase.DoNotRetryIOException(
"Attempted to increment field that isn't 64 bits wide");
}
+ // Carry tags forward from previous version
+ if (c.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
+ c.getTagsOffset(), c.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && increment.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
idx++;
}
// Append new incremented KeyValue to list
- byte[] q = CellUtil.cloneQualifier(kv);
+ byte[] q = CellUtil.cloneQualifier(cell);
byte[] val = Bytes.toBytes(amount);
- int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
- int incCellTagsLen = kv.getTagsLength();
- Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
- KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
- System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
- System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
- family.getKey().length);
- System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
- // copy in the value
- System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
- // copy tags
- if (oldCellTagsLen > 0) {
- System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
- newKV.getTagsOffset(), oldCellTagsLen);
- }
- if (incCellTagsLen > 0) {
- System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
- newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
+
+ // Add the TTL tag if the mutation carried one
+ if (increment.getTTL() != Long.MAX_VALUE) {
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
}
+
+ Cell newKV = new KeyValue(row, 0, row.length,
+ family.getKey(), 0, family.getKey().length,
+ q, 0, q.length,
+ now,
+ KeyValue.Type.Put,
+ val, 0, val.length,
+ newTags);
+
CellUtil.setSequenceId(newKV, mvccNum);
+
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = coprocessorHost.postMutationBeforeWAL(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7a331b1..5f10294 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
@@ -1668,8 +1670,42 @@ public class HStore implements Store {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
- static boolean isExpired(final Cell key, final long oldestTimestamp) {
- return key.getTimestamp() < oldestTimestamp;
+ /**
+ * @param cell
+ * @param oldestTimestamp
+ * @return true if the cell is expired
+ */
+ static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
+ // Do not create an Iterator or Tag objects unless the cell actually has
+ // tags
+ if (cell.getTagsLength() > 0) {
+ // Look for a TTL tag first. Use it instead of the family setting if
+ // found. If a cell has multiple TTLs, resolve the conflict by using the
+ // first tag encountered.
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ if (TagType.TTL_TAG_TYPE == t.getType()) {
+ if (t.getTagLength() == Bytes.SIZEOF_LONG) {
+ // Unlike in schema cell TTLs are stored in milliseconds, no need
+ // to convert
+ long ts = cell.getTimestamp();
+ long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
+ if (ts + ttl < now) {
+ return true;
+ }
+ // Per cell TTLs cannot extend lifetime beyond family settings, so
+ // fall through to check that
+ break;
+ } else {
+ LOG.warn("TTL tag for cell " + cell + " has wrong size: have=" + t.getTagLength() +
+ ", want=" + Bytes.SIZEOF_LONG);
+ }
+ }
+ }
+ }
+ return false;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 964fad8..1160c7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -100,6 +100,10 @@ public class ScanQueryMatcher {
*/
private final long earliestPutTs;
+ /** The oldest timestamp we are interested in, based on TTL */
+ private final long oldestUnexpiredTS;
+ private final long now;
+
/** readPoint over which the KVs are unconditionally included */
protected long maxReadPointToTrackVersions;
@@ -152,7 +156,7 @@ public class ScanQueryMatcher {
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
- RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+ long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator();
this.regionCoprocessorHost = regionCoprocessorHost;
@@ -162,6 +166,9 @@ public class ScanQueryMatcher {
scanInfo.getFamily());
this.filter = scan.getFilter();
this.earliestPutTs = earliestPutTs;
+ this.oldestUnexpiredTS = oldestUnexpiredTS;
+ this.now = now;
+
this.maxReadPointToTrackVersions = readPointToUse;
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
@@ -213,18 +220,18 @@ public class ScanQueryMatcher {
* @param scanInfo The store's immutable scan info
* @param columns
* @param earliestPutTs Earliest put seen in any of the store files.
- * @param oldestUnexpiredTS the oldest timestamp we are interested in,
- * based on TTL
+ * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+ * @param now the current server time
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
* @param regionCoprocessorHost
* @throws IOException
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
- long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
+ long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
- oldestUnexpiredTS, regionCoprocessorHost);
+ oldestUnexpiredTS, now, regionCoprocessorHost);
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
this.dropDeletesFromRow = dropDeletesFromRow;
this.dropDeletesToRow = dropDeletesToRow;
@@ -234,10 +241,10 @@ public class ScanQueryMatcher {
* Constructor for tests
*/
ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
- NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
+ NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
this(scan, scanInfo, columns, ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */
- HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
+ HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
}
/**
@@ -295,12 +302,17 @@ public class ScanQueryMatcher {
int qualifierOffset = cell.getQualifierOffset();
int qualifierLength = cell.getQualifierLength();
+
long timestamp = cell.getTimestamp();
// check for early out based on timestamp alone
if (columns.isDone(timestamp)) {
return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
qualifierLength);
}
+ // check if the cell is expired by cell TTL
+ if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
+ return MatchCode.SKIP;
+ }
/*
* The delete logic is pretty complicated now.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 0a4e1ed..73eb031 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -76,6 +76,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final Scan scan;
protected final NavigableSet<byte[]> columns;
protected final long oldestUnexpiredTS;
+ protected final long now;
protected final int minVersions;
protected final long maxRowSize;
@@ -122,7 +123,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
explicitColumnQuery = numCol > 0;
this.scan = scan;
this.columns = columns;
- oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl;
+ this.now = EnvironmentEdgeManager.currentTime();
+ this.oldestUnexpiredTS = now - ttl;
this.minVersions = minVersions;
if (store != null && ((HStore)store).getHRegion() != null
@@ -172,7 +174,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
- oldestUnexpiredTS, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, store.getCoprocessorHost());
this.store.addChangedReaderObserver(this);
@@ -237,10 +239,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
- earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
+ earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
} else {
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
- oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
}
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -280,7 +282,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
- Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
+ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
// In unit tests, the store could be null
if (this.store != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index b4e94bf..d9dc80b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
@@ -111,6 +113,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -136,6 +139,7 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@@ -5672,6 +5676,163 @@ public class TestHRegion {
}
}
+ @Test
+ public void testCellTTLs() throws IOException {
+ IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ final byte[] row = Bytes.toBytes("testRow");
+ final byte[] q1 = Bytes.toBytes("q1");
+ final byte[] q2 = Bytes.toBytes("q2");
+ final byte[] q3 = Bytes.toBytes("q3");
+ final byte[] q4 = Bytes.toBytes("q4");
+
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
+ HColumnDescriptor hcd = new HColumnDescriptor(fam1);
+ hcd.setTimeToLive(10); // 10 seconds
+ htd.addFamily(hcd);
+
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+
+ HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
+ TEST_UTIL.getDataTestDir(), conf, htd);
+ assertNotNull(region);
+ try {
+ long now = EnvironmentEdgeManager.currentTime();
+ // Add a cell that will expire in 5 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // TTL tags specify the TTL in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire after 10 seconds via family setting
+ region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
+ // Add a cell that will expire in 15 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // TTL tags specify the TTL in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire in 20 seconds via family setting
+ region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
+
+ // Flush so we are sure store scanning gets this right
+ region.flushcache();
+
+ // A query at time T+0 should return all cells
+ Result r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+5 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+10 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+15 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+20 seconds
+ edge.incrementTime(10000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNull(r.getValue(fam1, q4));
+
+ now = edge.currentTime();
+
+ // If the cell TTL is specified by a mutation attribute, this should
+ // override any TTLs that may have been passed in as tags.
+
+ // Add a cell that will expire in 5 seconds via mutation attribute
+ Put put = new Put(row).add(new KeyValue(row, fam1, q1, now,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // Include an earlier expiration time as tag, this should be ignored
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(1000L)) } ));
+ put.setTTL(5000L);
+ region.put(put);
+ r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+
+ // Increment time to T+21 seconds
+ edge.incrementTime(1000);
+ r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+
+ // Increment time to T+22 seconds
+ edge.incrementTime(1000);
+ r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+
+ // Increment time to T+25 seconds
+ edge.incrementTime(3000);
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+
+ // Fun with disappearing increments
+
+ // Start at 1
+ region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
+ r = region.get(new Get(row));
+ byte[] val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 1L);
+
+ // Increment with a TTL of 5 seconds
+ Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
+ incr.setTTL(5000);
+ region.increment(incr); // 2
+
+ // New value should be 2
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 2L);
+
+ // Increment time to T+30 seconds
+ edge.incrementTime(5000);
+
+ // Value should be back to 1
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 1L);
+
+ // Increment time to T+35 seconds
+ edge.incrementTime(5000);
+
+ // Original value written at T+25 should be gone now via family TTL
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+
+ } finally {
+ HRegion.closeHRegion(region);
+ }
+ }
+
private static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index 2b1f11f..a2b7383 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -91,11 +91,12 @@ public class TestQueryMatcher extends HBaseTestCase {
}
- private void _testMatch_ExplicitColumns(Scan scan, List<ScanQueryMatcher.MatchCode> expected) throws IOException {
- // 2,4,5
+ private void _testMatch_ExplicitColumns(Scan scan, List<ScanQueryMatcher.MatchCode> expected) throws IOException {
+ long now = EnvironmentEdgeManager.currentTime();
+ // 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, false, 0, rowComparator), get.getFamilyMap().get(fam2),
- EnvironmentEdgeManager.currentTime() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -175,9 +176,10 @@ public class TestQueryMatcher extends HBaseTestCase {
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
+ long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, false, 0, rowComparator), null,
- EnvironmentEdgeManager.currentTime() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -231,7 +233,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, false, 0, rowComparator), get.getFamilyMap().get(fam2),
- now - testTTL);
+ now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -285,7 +287,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, false, 0, rowComparator), null,
- now - testTTL);
+ now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -343,7 +345,7 @@ public class TestQueryMatcher extends HBaseTestCase {
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
- HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
+ HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
List<ScanQueryMatcher.MatchCode> actual =
new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
byte[] prevRow = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index a613319..17fe754 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -455,8 +455,13 @@ public class TestTags {
tags = TestCoprocessorForTags.tags;
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ List<String> tagValues = new ArrayList<String>();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
@@ -512,8 +517,13 @@ public class TestTags {
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ tagValues.clear();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
| | | | |