diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index c7ca4c7..99e9d02 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -77,6 +77,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
*/
private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
+ /**
+ * The attribute for storing TTL for the result of the mutation.
+ */
+ private static final String OP_ATTRIBUTE_TTL = "_ttl";
+
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
protected Durability durability = Durability.USE_DEFAULT;
@@ -205,6 +210,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
if (getId() != null) {
map.put("id", getId());
}
+ // Add the TTL if set
+ // Long.MAX_VALUE is the default, and is interpreted to mean this attribute
+ // has not been set.
+ if (getTTL() != Long.MAX_VALUE) {
+ map.put("ttl", getTTL());
+ }
return map;
}
@@ -407,6 +418,37 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
}
/**
+ * Return the TTL requested for the result of the mutation, in milliseconds.
+ * @return the TTL requested for the result of the mutation, in milliseconds,
+ * or Long.MAX_VALUE if unset
+ */
+ public long getTTL() {
+ byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
+ if (ttlBytes != null) {
+ return Bytes.toLong(ttlBytes);
+ }
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Set the TTL desired for the result of the mutation, in milliseconds.
+ *
+ * Note: This will cause all cells carried within the mutation to be
+ * rewritten on the server to carry the operation TTL per cell in a tag.
+ * This overhead can be avoided through client side construction of cells
+ * which include the cell TTL tag (TagType.TTL_TAG_TYPE) directly. If a
+ * mutation carries both cells with TTL tags and the TTL attribute, cells
+ * in the mutation will be rewritten to carry one TTL tag set to the value
+ * of the attribute.
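+ *
+ * For example, a minimal usage sketch (names are illustrative):
+ * <pre>
+ * Put put = new Put(row);
+ * put.add(family, qualifier, value);
+ * put.setTTL(5000); // cells from this Put expire 5 seconds after write
+ * table.put(put);
+ * </pre>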
+ * @param ttl the TTL desired for the result of the mutation, in milliseconds
+ * @return this
+ */
+ public Mutation setTTL(long ttl) {
+ setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
+ return this;
+ }
+
+ /**
* Subclasses should override this method to add the heap size of their own fields.
* @return the heap size to add (will be aligned).
*/
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
index 8d3c0b9..2095f4e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
@@ -195,6 +195,25 @@ public class Tag {
}
/**
+ * Write a list of tags into a byte array
+ * @param tags the tags to serialize
+ * @return the serialized tag data as bytes
+ */
+ public static byte[] fromList(List<Tag> tags) {
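+ // First pass: compute the total serialized length of the tags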
+ int length = 0;
+ for (Tag tag: tags) {
+ length += tag.length;
+ }
+ byte[] b = new byte[length];
+ int pos = 0;
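+ // Second pass: copy each tag's backing bytes into the output array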
+ for (Tag tag: tags) {
+ System.arraycopy(tag.bytes, tag.offset, b, pos, tag.length);
+ pos += tag.length;
+ }
+ return b;
+ }
+
+ /**
* Returns the total length of the entire tag entity
*/
int getLength() {
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index 45c8476..9ed247e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -28,4 +28,5 @@ public final class TagType {
public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
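+ // Cell TTL: an 8 byte long (TTL in milliseconds); see Mutation#setTTL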
+ public static final byte TTL_TAG_TYPE = (byte)7;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java
index 71ea1bd..a83f03e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java
@@ -101,6 +101,13 @@ public class ColumnCount {
}
/**
+ * @return current count
+ */
+ public int getCount() {
+ return count;
+ }
+
+ /**
* Set the current count to a new count
* @param count new count to set
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
index 8568cfc..2a5e5de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
@@ -77,14 +77,14 @@ public interface ColumnTracker {
* the {@link #checkColumn(byte[], int, int, byte)} method and perform all the operations in this
* checkVersions method.
* @param type the type of the key value (Put/Delete)
- * @param ttl The timeToLive to enforce.
+ * @param ts the timestamp of the key value
* @param ignoreCount indicates if the KV needs to be excluded while counting (used during
* compactions. We only count KV's that are older than all the scanners' read points.)
* @return the scan query matcher match code instance
* @throws IOException in case there is an internal consistency problem caused by a data
* corruption.
*/
- ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length, long ttl,
+ ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length, long ts,
byte type, boolean ignoreCount) throws IOException;
/**
* Resets the Matcher
@@ -118,11 +118,8 @@ public interface ColumnTracker {
);
/**
- * Give the tracker a chance to declare it's done based on only the timestamp
- * to allow an early out.
- *
- * @param timestamp
- * @return true to early out based on timestamp.
+ * Give the tracker a chance to declare it's done if the minimum number of
+ * versions for the current column has been seen
+ * @return true if the minimum number of versions has been seen, or if no
+ * minimum is configured
*/
- boolean isDone(long timestamp);
+ boolean hasMinVersions();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index fddfdca..18b3f77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -409,7 +409,7 @@ public class DefaultMemStore implements MemStore {
Cell kv = i.next();
// Did we go beyond the target row? If so break.
if (state.isTooFar(kv, firstOnRow)) break;
- if (state.isExpired(kv)) {
+ if (state.isExpired(kv) != 0) {
i.remove();
continue;
}
@@ -630,7 +630,7 @@ public class DefaultMemStore implements MemStore {
if (head.isEmpty()) return null;
for (Iterator<Cell> i = head.descendingIterator(); i.hasNext();) {
Cell found = i.next();
- if (state.isExpired(found)) {
+ if (state.isExpired(found) != 0) {
i.remove();
continue;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
index 470d36a..abc2cae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
@@ -71,7 +71,6 @@ public class ExplicitColumnTracker implements ColumnTracker {
/** Keeps track of the latest timestamp included for current column.
* Used to eliminate duplicates. */
private long latestTSOfCurrentColumn;
- private long oldestStamp;
private int skipCount;
/**
@@ -79,17 +78,14 @@ public class ExplicitColumnTracker implements ColumnTracker {
* @param columns columns specified user in query
* @param minVersions minimum number of versions to keep
* @param maxVersions maximum versions to return per column
- * @param oldestUnexpiredTS the oldest timestamp we are interested in,
- * based on TTL
* @param lookAhead number of KeyValues to look ahead via next before
* (re)seeking
*/
public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions,
- int maxVersions, long oldestUnexpiredTS, int lookAhead) {
+ int maxVersions, int lookAhead) {
this.maxVersions = maxVersions;
this.minVersions = minVersions;
this.lookAhead = lookAhead;
- this.oldestStamp = oldestUnexpiredTS;
this.columns = new ColumnCount[columns.size()];
int i=0;
for(byte [] column : columns) {
@@ -176,7 +172,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
return ScanQueryMatcher.MatchCode.SKIP;
}
int count = this.column.increment();
- if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
+ if (count >= maxVersions) {
// Done with versions for this column
++this.index;
this.skipCount = 0;
@@ -218,10 +214,6 @@ public class ExplicitColumnTracker implements ColumnTracker {
return timestamp == latestTSOfCurrentColumn;
}
- private boolean isExpired(long timestamp) {
- return timestamp < oldestStamp;
- }
-
/**
* This method is used to inform the column tracker that we are done with
* this column. We may get this information from external filters or
@@ -252,6 +244,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
}
}
+ @Override
public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
int qualLength) {
doneWithColumn(bytes, offset,qualLength);
@@ -263,7 +256,12 @@ public class ExplicitColumnTracker implements ColumnTracker {
}
}
- public boolean isDone(long timestamp) {
- return minVersions <= 0 && isExpired(timestamp);
+ @Override
+ public boolean hasMinVersions() {
+ // Always have enough versions if minVersions is not set
+ if (minVersions <= 0) {
+ return true;
+ }
+ return column.getCount() >= minVersions;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
index fce3b29..3575adc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
- * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
* implement the {@link DeleteTracker} interface since state spans rows (There
* is no update nor reset method).
*/
@@ -42,7 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes;
class GetClosestRowBeforeTracker {
private final KeyValue targetkey;
// Any cell w/ a ts older than this is expired.
- private final long oldestts;
+ private final long now;
+ private final long oldestUnexpiredTs;
private Cell candidate = null;
private final KVComparator kvcomparator;
// Flag for whether we're doing getclosest on a metaregion.
@@ -75,17 +76,20 @@ class GetClosestRowBeforeTracker {
HConstants.DELIMITER) - this.rowoffset;
}
this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
- this.oldestts = System.currentTimeMillis() - ttl;
+ this.now = System.currentTimeMillis();
+ this.oldestUnexpiredTs = now - ttl;
this.kvcomparator = c;
this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(new CellComparator.RowComparator());
}
/**
- * @param kv
- * @return True if this kv is expired.
+ * @param cell the cell to check
+ * @return 1 if the cell is expired and no later cells can still be alive;
+ * -1 if the cell is expired but other cells may still be alive;
+ * 0 if the cell is not expired
*/
- boolean isExpired(final Cell kv) {
- return HStore.isExpired(kv, this.oldestts);
+ int isExpired(final Cell cell) {
+ return HStore.isExpired(cell, this.oldestUnexpiredTs, this.now);
}
/*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2fd03de..1862b07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -84,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
@@ -2627,6 +2630,7 @@ public class HRegion implements HeapSize { // , Writable{
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
noOfDeletes++;
}
+ rewriteCellTags(familyMaps[i], mutation);
}
lock(this.updatesLock.readLock(), numReadyToWrite);
@@ -3099,6 +3103,85 @@ public class HRegion implements HeapSize { // , Writable{
}
}
+ /**
+ * Rewrite incoming cell tags.
+ *
+ * Add cell TTLs to the incoming mutation. Drop any incoming TTL tags.
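+ *
+ * For example, a Put with setTTL(5000L) has each of its cells rewritten to
+ * carry a single TTL tag of 5000 ms, replacing any TTL tags the client may
+ * have set on those cells.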
+ */
+ void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
+ // Check if we have any work to do and early out otherwise
+ // Update these checks as more logic is added here
+
+ // If we don't have a TTL on the mutation, and this is a Put, we don't need to filter
+ // TTL tags
+ if (m.getTTL() == Long.MAX_VALUE && m instanceof Put) {
+ return;
+ }
+
+ // Sadly we have to recreate the cell because cell or KV doesn't support
+ // scatter-gather. It will be expensive to do this in multiple places so
+ // this method is envisioned as a one-stop-shop for incoming cell tag
+ // rewrite rules applied during doMiniBatchMutation
+ List<Cell> updatedCells = new ArrayList<Cell>();
+ for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
+ updatedCells.clear(); // Be sure to clear this each time around the loop
+ List<Cell> cells = e.getValue();
+ Iterator<Cell> cellIterator = cells.iterator(); // The concrete list type is not guaranteed
+ while (cellIterator.hasNext()) {
+ Cell cell = cellIterator.next();
+ // Avoid as much work as possible if the cell doesn't carry tags
+ if (cell.getTagsLength() > 0) {
+ List<Tag> newTags = new ArrayList<Tag>();
+ Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ boolean changed = false;
+ while (tagIterator.hasNext()) {
+ Tag t = tagIterator.next();
+
+ // Filter tags
+
+ // Drop incoming TTL tags if the mutation specified a TTL or is not a Put
+ if (t.getType() == TagType.TTL_TAG_TYPE &&
+ (m.getTTL() != Long.MAX_VALUE || !(m instanceof Put))) {
+ changed = true;
+ continue;
+ }
+
+ newTags.add(t);
+ }
+
+ // Add new tags here
+
+ // Add a TTL to Puts
+ if ((m instanceof Put) && m.getTTL() != Long.MAX_VALUE) {
+ // Set the TTL of this cell to that specified by the mutation attr
+ changed = true;
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
+ }
+
+ // Cell is rewritten here
+ if (changed) {
+ updatedCells.add(new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+ cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ newTags));
+ // Remove original after rewrite
+ cellIterator.remove();
+ }
+ }
+ }
+
+ // We may have queued up rewritten cells while iterating, add them now
+ if (!updatedCells.isEmpty()) {
+ cells.addAll(updatedCells);
+ // Maintain sorted order
+ Collections.sort(cells, new CellComparator());
+ }
+ }
+ }
+
/*
* Check if resources to support an update.
*
@@ -5369,50 +5452,88 @@ public class HRegion implements HeapSize { // , Writable{
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the append value
- // Avoid as much copying as possible. Every byte is copied at most
- // once.
+ // Avoid as much copying as possible. We may need to rewrite and
+ // consolidate tags. Bytes are only copied once.
// Would be nice if KeyValue had scatter/gather logic
int idx = 0;
for (Cell cell : family.getValue()) {
Cell newCell;
Cell oldCell = null;
+ List<Tag> newTags = new ArrayList<Tag>();
if (idx < results.size()
&& CellUtil.matchingQualifier(results.get(idx), cell)) {
oldCell = results.get(idx);
- // allocate an empty kv once
+
+ // Process cell tags
+
+ // Make a union of the set of tags in the old and new KVs, but
+ // filter cell TTL tags if the append mutation specifies one.
+
+ if (oldCell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
+ oldCell.getTagsOffset(), oldCell.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && append.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
+ if (cell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && append.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
+
+ if (append.getTTL() != Long.MAX_VALUE) {
+ // Add the new TTL tag
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ }
+
+ // Rebuild tags
+ byte[] tagBytes = Tag.fromList(newTags);
+
+ // allocate an empty cell once
newCell = new KeyValue(row.length, cell.getFamilyLength(),
cell.getQualifierLength(), now, KeyValue.Type.Put,
oldCell.getValueLength() + cell.getValueLength(),
- oldCell.getTagsLength() + cell.getTagsLength());
- // copy in the value
- System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
- newCell.getValueArray(), newCell.getValueOffset(),
- oldCell.getValueLength());
- System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
- newCell.getValueArray(),
- newCell.getValueOffset() + oldCell.getValueLength(),
- cell.getValueLength());
- // copy in the tags
- System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(),
- newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength());
- System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(),
- newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength());
+ tagBytes.length);
// copy in row, family, and qualifier
System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
- newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
+ newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
- newCell.getFamilyArray(), newCell.getFamilyOffset(),
- cell.getFamilyLength());
+ newCell.getFamilyArray(), newCell.getFamilyOffset(),
+ cell.getFamilyLength());
System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
- newCell.getQualifierArray(), newCell.getQualifierOffset(),
- cell.getQualifierLength());
+ newCell.getQualifierArray(), newCell.getQualifierOffset(),
+ cell.getQualifierLength());
+ // copy in the value
+ System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
+ newCell.getValueArray(), newCell.getValueOffset(),
+ oldCell.getValueLength());
+ System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
+ newCell.getValueArray(),
+ newCell.getValueOffset() + oldCell.getValueLength(),
+ cell.getValueLength());
+ // Copy in tag data
+ System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
+ tagBytes.length);
idx++;
} else {
// Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
// so only need to update the timestamp to 'now'
CellUtil.updateLatestStamp(cell, now);
newCell = cell;
- }
+ }
CellUtil.setSequenceId(newCell, mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
@@ -5587,12 +5708,27 @@ public class HRegion implements HeapSize { // , Writable{
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount
int idx = 0;
- for (Cell kv: family.getValue()) {
- long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+ for (Cell cell: family.getValue()) {
+ long amount = Bytes.toLong(CellUtil.cloneValue(cell));
boolean noWriteBack = (amount == 0);
+ List<Tag> newTags = new ArrayList<Tag>();
+
+ // Process cell tags from mutation
+ if (cell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && increment.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
Cell c = null;
- if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
+ if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
c = results.get(idx);
if(c.getValueLength() == Bytes.SIZEOF_LONG) {
amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
@@ -5601,32 +5737,41 @@ public class HRegion implements HeapSize { // , Writable{
throw new org.apache.hadoop.hbase.DoNotRetryIOException(
"Attempted to increment field that isn't 64 bits wide");
}
+ // Carry tags forward from previous version
+ if (c.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
+ c.getTagsOffset(), c.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && increment.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
idx++;
}
// Append new incremented KeyValue to list
- byte[] q = CellUtil.cloneQualifier(kv);
+ byte[] q = CellUtil.cloneQualifier(cell);
byte[] val = Bytes.toBytes(amount);
- int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
- int incCellTagsLen = kv.getTagsLength();
- Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
- KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
- System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
- System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
- family.getKey().length);
- System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
- // copy in the value
- System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
- // copy tags
- if (oldCellTagsLen > 0) {
- System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
- newKV.getTagsOffset(), oldCellTagsLen);
- }
- if (incCellTagsLen > 0) {
- System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
- newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
+
+ // Add the TTL tag if the mutation carried one
+ if (increment.getTTL() != Long.MAX_VALUE) {
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
}
+
+ Cell newKV = new KeyValue(row, 0, row.length,
+ family.getKey(), 0, family.getKey().length,
+ q, 0, q.length,
+ now,
+ KeyValue.Type.Put,
+ val, 0, val.length,
+ newTags);
+
CellUtil.setSequenceId(newKV, mvccNum);
+
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = coprocessorHost.postMutationBeforeWAL(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 4813e10..bfddc11 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
@@ -1668,8 +1670,47 @@ public class HStore implements Store {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
- static boolean isExpired(final Cell key, final long oldestTimestamp) {
- return key.getTimestamp() < oldestTimestamp;
+ /**
+ * @param cell the cell to check
+ * @param oldestTimestamp the oldest timestamp, derived from the family TTL,
+ * below which a cell is expired
+ * @param now the current time in milliseconds
+ * @return 1 if the cell is expired and no later cells can still be alive;
+ * -1 if the cell is expired but other cells may still be alive;
+ * 0 if the cell is not expired
+ */
+ static int isExpired(final Cell cell, final long oldestTimestamp, final long now) {
+ // Do not create an Iterator or Tag objects unless the cell actually has
+ // tags
+ if (cell.getTagsLength() > 0) {
+ // Look for a TTL tag first. Use it instead of the family setting if
+ // found. If a cell has multiple TTLs, resolve the conflict by using the
+ // first tag encountered.
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ if (TagType.TTL_TAG_TYPE == t.getType()) {
+ if (t.getTagLength() == Bytes.SIZEOF_LONG) {
+ // Unlike the family TTL, which the schema expresses in seconds,
+ // cell TTLs are stored in milliseconds, so no conversion is needed
+ long ts = cell.getTimestamp();
+ long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
+ if (ts + ttl < now) {
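+ // Expired by its cell TTL tag; other cells for this column may carry
+ // different TTL tags, so only this cell can be judged expired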
+ return -1;
+ }
+ // Per cell TTLs cannot extend lifetime beyond family settings, so
+ // fall through to check that
+ break;
+ } else {
+ LOG.warn("TTL tag for cell " + cell + " has wrong size: have=" + t.getTagLength() +
+ ", want=" + Bytes.SIZEOF_LONG);
+ }
+ }
+ }
+ }
+ if (cell.getTimestamp() < oldestTimestamp) {
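+ // Expired by the family TTL; later cells in scan order are older, so
+ // none of them can still be alive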
+ return 1;
+ }
+ return 0;
}
@Override
@@ -1811,7 +1852,7 @@ public class HStore implements Store {
if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
// Did we go beyond the target row? If so break.
if (state.isTooFar(kv, firstOnRow)) break;
- if (state.isExpired(kv)) {
+ if (state.isExpired(kv) != 0) {
continue;
}
// If we added something, this row is a contender. break.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
index a5c17fb..a29ef8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
@@ -111,6 +112,7 @@ public class ScanDeleteTracker implements DeleteTracker {
@Override
public DeleteResult isDeleted(Cell cell) {
long timestamp = cell.getTimestamp();
+
int qualifierOffset = cell.getQualifierOffset();
int qualifierLength = cell.getQualifierLength();
if (hasFamilyStamp && timestamp <= familyStamp) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 964fad8..f280645 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -100,6 +100,9 @@ public class ScanQueryMatcher {
*/
private final long earliestPutTs;
+ /** The oldest timestamp we are interested in, based on TTL */
+ private final long oldestUnexpiredTS;
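+ /** The current server time, against which cell TTL tags are evaluated */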
+ private final long now;
/** readPoint over which the KVs are unconditionally included */
protected long maxReadPointToTrackVersions;
@@ -152,7 +155,7 @@ public class ScanQueryMatcher {
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
- RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+ long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator();
this.regionCoprocessorHost = regionCoprocessorHost;
@@ -162,6 +165,8 @@ public class ScanQueryMatcher {
scanInfo.getFamily());
this.filter = scan.getFilter();
this.earliestPutTs = earliestPutTs;
+ this.oldestUnexpiredTS = oldestUnexpiredTS;
+ this.now = now;
this.maxReadPointToTrackVersions = readPointToUse;
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
@@ -184,8 +189,7 @@ public class ScanQueryMatcher {
hasNullColumn = true;
// use a specialized scan for wildcard column tracker.
- this.columns = new ScanWildcardColumnTracker(
- scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
+ this.columns = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions);
} else {
// whether there is null column in the explicit column query
hasNullColumn = (columns.first().length == 0);
@@ -194,7 +198,7 @@ public class ScanQueryMatcher {
// between rows, not between storefiles.
byte[] attr = scan.getAttribute(Scan.HINT_LOOKAHEAD);
this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
- oldestUnexpiredTS, attr == null ? 0 : Bytes.toInt(attr));
+ attr == null ? 0 : Bytes.toInt(attr));
}
this.isReversed = scan.isReversed();
}
@@ -213,18 +217,18 @@ public class ScanQueryMatcher {
* @param scanInfo The store's immutable scan info
* @param columns
* @param earliestPutTs Earliest put seen in any of the store files.
- * @param oldestUnexpiredTS the oldest timestamp we are interested in,
- * based on TTL
+ * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+ * @param now the current server time
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
* @param regionCoprocessorHost
* @throws IOException
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
- long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
+ long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
- oldestUnexpiredTS, regionCoprocessorHost);
+ oldestUnexpiredTS, now, regionCoprocessorHost);
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
this.dropDeletesFromRow = dropDeletesFromRow;
this.dropDeletesToRow = dropDeletesToRow;
@@ -234,10 +238,10 @@ public class ScanQueryMatcher {
* Constructor for tests
*/
ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
- NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
+ NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
this(scan, scanInfo, columns, ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */
- HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
+ HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
}
/**
@@ -295,12 +299,6 @@ public class ScanQueryMatcher {
int qualifierOffset = cell.getQualifierOffset();
int qualifierLength = cell.getQualifierLength();
- long timestamp = cell.getTimestamp();
- // check for early out based on timestamp alone
- if (columns.isDone(timestamp)) {
- return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
- qualifierLength);
- }
/*
* The delete logic is pretty complicated now.
@@ -317,6 +315,7 @@ public class ScanQueryMatcher {
*/
byte typeByte = cell.getTypeByte();
long mvccVersion = cell.getMvccVersion();
+ long timestamp = cell.getTimestamp();
if (CellUtil.isDelete(cell)) {
if (!keepDeletedCells) {
// first ignore delete markers if the scanner can do so, and the
@@ -362,9 +361,23 @@ public class ScanQueryMatcher {
}
// note the following next else if...
// delete marker are not subject to other delete markers
- } else if (!this.deletes.isEmpty()) {
- DeleteResult deleteResult = deletes.isDeleted(cell);
- switch (deleteResult) {
+ } else {
+ if (columns.hasMinVersions()) {
+ // Only consider expiration once the minimum number of versions to
+ // keep has been satisfied
+ int expired = isExpired(cell);
+ if (expired < 0) {
+ // Expired by its cell TTL tag; other cells for this column may
+ // carry different TTLs, so skip just this cell
+ return MatchCode.SKIP;
+ } else if (expired > 0) {
+ // Expired by the family TTL; no later (older) cell for this
+ // column can still be alive, so seek ahead
+ return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
+ qualifierLength);
+ }
+ }
+ // Check deletes
+ if (!this.deletes.isEmpty()) {
+ DeleteResult deleteResult = deletes.isDeleted(cell);
+ switch (deleteResult) {
case FAMILY_DELETED:
case COLUMN_DELETED:
return columns.getNextRowOrNextColumn(cell.getQualifierArray(),
@@ -377,6 +390,7 @@ public class ScanQueryMatcher {
default:
throw new RuntimeException("UNEXPECTED");
}
+ }
}
int timestampComparison = tr.compare(timestamp);
@@ -446,6 +460,16 @@ public class ScanQueryMatcher {
return colChecker;
}
+ /**
+ * @param cell the cell to check
+ * @return 1 if the cell is expired and no later cells can still be alive;
+ * -1 if the cell is expired but other cells may still be alive;
+ * 0 if the cell is not expired
+ */
+ int isExpired(final Cell cell) {
+ return HStore.isExpired(cell, this.oldestUnexpiredTS, this.now);
+ }
+
/** Handle partial-drop-deletes. As we match keys in order, when we have a range from which
* we can drop deletes, we can set retainDeletesInOutput to false for the duration of this
* range only, and maintain consistency. */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
index 85b36fb..9bb0dd6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
@@ -43,20 +43,14 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
private long latestTSOfCurrentColumn;
private byte latestTypeOfCurrentColumn;
- private long oldestStamp;
-
/**
* Return maxVersions of every row.
* @param minVersion Minimum number of versions to keep
* @param maxVersion Maximum number of versions to return
- * @param oldestUnexpiredTS oldest timestamp that has not expired according
- * to the TTL.
*/
- public ScanWildcardColumnTracker(int minVersion, int maxVersion,
- long oldestUnexpiredTS) {
+ public ScanWildcardColumnTracker(int minVersion, int maxVersion) {
this.maxVersions = maxVersion;
this.minVersions = minVersion;
- this.oldestStamp = oldestUnexpiredTS;
}
/**
@@ -126,11 +120,6 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
/**
* Check whether this version should be retained.
- * There are 4 variables considered:
- * If this version is past max versions -> skip it
- * If this kv has expired or was deleted, check min versions
- * to decide whther to skip it or not.
- *
* Increase the version counter unless this is a delete
*/
private MatchCode checkVersion(byte type, long timestamp) {
@@ -140,14 +129,8 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
if (currentCount > maxVersions) {
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col
}
- // keep the KV if required by minversions or it is not expired, yet
- if (currentCount <= minVersions || !isExpired(timestamp)) {
- setTSAndType(timestamp, type);
- return ScanQueryMatcher.MatchCode.INCLUDE;
- } else {
- return MatchCode.SEEK_NEXT_COL;
- }
-
+ setTSAndType(timestamp, type);
+ return ScanQueryMatcher.MatchCode.INCLUDE;
}
@Override
@@ -170,10 +153,6 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
return timestamp == latestTSOfCurrentColumn && type == latestTypeOfCurrentColumn;
}
- private boolean isExpired(long timestamp) {
- return timestamp < oldestStamp;
- }
-
/**
* Used by matcher and scan/get to get a hint of the next column
* to seek to after checkColumn() returns SKIP. Returns the next interesting
@@ -194,12 +173,18 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
return false;
}
+ @Override
public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
int qualLength) {
return MatchCode.SEEK_NEXT_COL;
}
- public boolean isDone(long timestamp) {
- return minVersions <= 0 && isExpired(timestamp);
+ @Override
+ public boolean hasMinVersions() {
+ // Always have enough versions if minVersions is not set
+ if (minVersions <= 0) {
+ return true;
+ }
+ return currentCount >= minVersions;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 0a4e1ed..73eb031 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -76,6 +76,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final Scan scan;
protected final NavigableSet<byte[]> columns;
protected final long oldestUnexpiredTS;
+ protected final long now;
protected final int minVersions;
protected final long maxRowSize;
@@ -122,7 +123,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
explicitColumnQuery = numCol > 0;
this.scan = scan;
this.columns = columns;
- oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl;
+ this.now = EnvironmentEdgeManager.currentTime();
+ this.oldestUnexpiredTS = now - ttl;
this.minVersions = minVersions;
if (store != null && ((HStore)store).getHRegion() != null
@@ -172,7 +174,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
- oldestUnexpiredTS, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, store.getCoprocessorHost());
this.store.addChangedReaderObserver(this);
@@ -237,10 +239,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
- earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
+ earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
} else {
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
- oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
}
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -280,7 +282,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
- Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
+ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
// In unit tests, the store could be null
if (this.store != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
index 72d7aa9..ff933a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
@@ -49,9 +49,7 @@ public class TestExplicitColumnTracker {
TreeSet<byte[]> trackColumns,
List<byte[]> scannerColumns,
List<ScanQueryMatcher.MatchCode> expected, int lookAhead) throws IOException {
- ColumnTracker exp = new ExplicitColumnTracker(
- trackColumns, 0, maxVersions, Long.MIN_VALUE, lookAhead);
-
+ ColumnTracker exp = new ExplicitColumnTracker(trackColumns, 0, maxVersions, lookAhead);
//Initialize result
List<ScanQueryMatcher.MatchCode> result = new ArrayList<ScanQueryMatcher.MatchCode>();
@@ -210,8 +208,7 @@ public class TestExplicitColumnTracker {
columns.add(Bytes.toBytes("col"+i));
}
- ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions,
- Long.MIN_VALUE, 0);
+ ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions, 0);
for (int i = 0; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index b4e94bf..d9dc80b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
@@ -111,6 +113,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -136,6 +139,7 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@@ -5672,6 +5676,163 @@ public class TestHRegion {
}
}
+ @Test
+ public void testCellTTLs() throws IOException {
+ IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ final byte[] row = Bytes.toBytes("testRow");
+ final byte[] q1 = Bytes.toBytes("q1");
+ final byte[] q2 = Bytes.toBytes("q2");
+ final byte[] q3 = Bytes.toBytes("q3");
+ final byte[] q4 = Bytes.toBytes("q4");
+
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
+ HColumnDescriptor hcd = new HColumnDescriptor(fam1);
+ hcd.setTimeToLive(10); // 10 seconds
+ htd.addFamily(hcd);
+
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+
+ HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
+ TEST_UTIL.getDataTestDir(), conf, htd);
+ assertNotNull(region);
+ try {
+ long now = EnvironmentEdgeManager.currentTime();
+ // Add a cell that will expire in 5 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // TTL tags specify ts in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire after 10 seconds via family setting
+ region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
+ // Add a cell that will expire in 15 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // TTL tags specify ts in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire in 20 seconds via family setting
+ region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
+
+ // Flush so we are sure store scanning gets this right
+ region.flushcache();
+
+ // A query at time T+0 should return all cells
+ Result r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+5 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+10 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+15 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+25 seconds
+ edge.incrementTime(10000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNull(r.getValue(fam1, q4));
+
+ now = edge.currentTime();
+
+ // If the cell TTL is specified by a mutation attribute, this should
+ // override any TTLs that may have been passed in as tags.
+
+ // Add a cell that will expire in 5 seconds via mutation attribute
+ Put put = new Put(row).add(new KeyValue(row, fam1, q1, now,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // Include an earlier expiration time as tag, this should be ignored
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(1000L)) } ));
+ put.setTTL(5000L);
+ region.put(put);
+ r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+
+ // Increment time to T+26 seconds
+ edge.incrementTime(1000);
+ r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+
+ // Increment time to T+27 seconds
+ edge.incrementTime(1000);
+ r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+
+ // Increment time to T+30 seconds
+ edge.incrementTime(3000);
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+
+ // Fun with disappearing increments
+
+ // Start at 1
+ region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
+ r = region.get(new Get(row));
+ byte[] val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 1L);
+
+ // Increment with a TTL of 5 seconds
+ Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
+ incr.setTTL(5000);
+ region.increment(incr); // 2
+
+ // New value should be 2
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 2L);
+
+ // Increment time to T+35 seconds
+ edge.incrementTime(5000);
+
+ // Value should be back to 1
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 1L);
+
+ // Increment time to T+40 seconds
+ edge.incrementTime(5000);
+
+ // Original value written at T+30 should be gone now via family TTL
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+
+ } finally {
+ HRegion.closeHRegion(region);
+ }
+ }
+
private static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index 2b1f11f..a2b7383 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -91,11 +91,12 @@ public class TestQueryMatcher extends HBaseTestCase {
}
- private void _testMatch_ExplicitColumns(Scan scan, List<ScanQueryMatcher.MatchCode> expected) throws IOException {
- // 2,4,5
+ private void _testMatch_ExplicitColumns(Scan scan, List<ScanQueryMatcher.MatchCode> expected) throws IOException {
+ long now = EnvironmentEdgeManager.currentTime();
+ // 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, false, 0, rowComparator), get.getFamilyMap().get(fam2),
- EnvironmentEdgeManager.currentTime() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -175,9 +176,10 @@ public class TestQueryMatcher extends HBaseTestCase {
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
+ long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, false, 0, rowComparator), null,
- EnvironmentEdgeManager.currentTime() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -231,7 +233,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, false, 0, rowComparator), get.getFamilyMap().get(fam2),
- now - testTTL);
+ now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -285,7 +287,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, false, 0, rowComparator), null,
- now - testTTL);
+ now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -343,7 +345,7 @@ public class TestQueryMatcher extends HBaseTestCase {
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
- HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
+ HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
List<ScanQueryMatcher.MatchCode> actual =
new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
byte[] prevRow = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
index c0dcee6..1482ae3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.regionserver;
+import static org.junit.Assert.*;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -28,16 +30,19 @@ import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.Ignore;
+import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
-public class TestScanWildcardColumnTracker extends HBaseTestCase {
+public class TestScanWildcardColumnTracker {
final static int VERSIONS = 2;
+ @Test
public void testCheckColumn_Ok() throws IOException {
- ScanWildcardColumnTracker tracker =
- new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
+ ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS);
//Create list of qualifiers
List<byte[]> qualifiers = new ArrayList<byte[]>();
@@ -68,9 +73,9 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
}
}
+ @Test
public void testCheckColumn_EnforceVersions() throws IOException {
- ScanWildcardColumnTracker tracker =
- new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
+ ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS);
//Create list of qualifiers
List<byte[]> qualifiers = new ArrayList<byte[]>();
@@ -102,9 +107,10 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
}
}
- public void DisabledTestCheckColumn_WrongOrder() {
- ScanWildcardColumnTracker tracker =
- new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
+ @Test
+ @Ignore
+ public void testCheckColumn_WrongOrder() {
+ ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS);
//Create list of qualifiers
List<byte[]> qualifiers = new ArrayList<byte[]>();
@@ -125,6 +131,5 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
assertEquals(true, ok);
}
-
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index a613319..17fe754 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -455,8 +455,13 @@ public class TestTags {
tags = TestCoprocessorForTags.tags;
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ List<String> tagValues = new ArrayList<String>();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
@@ -512,8 +517,13 @@ public class TestTags {
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ tagValues.clear();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;