diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index 869d940..f4c39cb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -78,6 +78,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
*/
private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
+ /**
+ * The attribute for storing TTL for the result of the mutation.
+ */
+ private static final String OP_ATTRIBUTE_TTL = "_ttl";
+
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
protected Durability durability = Durability.USE_DEFAULT;
@@ -206,6 +211,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
if (getId() != null) {
map.put("id", getId());
}
+ // Add the TTL if set
+ // Long.MAX_VALUE is the default, and is interpreted to mean this attribute
+ // has not been set.
+ if (getTTL() != Long.MAX_VALUE) {
+ map.put("ttl", getTTL());
+ }
return map;
}
@@ -470,6 +481,37 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
}
/**
+ * Return the TTL requested for the result of the mutation, in milliseconds.
+ * @return the TTL requested for the result of the mutation, in milliseconds,
+ * or Long.MAX_VALUE if unset
+ */
+ public long getTTL() {
+ byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
+ if (ttlBytes != null) {
+ return Bytes.toLong(ttlBytes);
+ }
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Set the TTL desired for the result of the mutation, in milliseconds.
+ *
+ * Note: This will cause all cells carried within the mutation to be
+ * rewritten on the server to carry the operation TTL per cell in a tag.
+ * This overhead can be avoided through client-side construction of cells
+ * which include the cell TTL tag (TagType.TTL_TAG_TYPE) directly. If a
+ * mutation carries both cells with TTL tags and the TTL attribute, cells
+ * in the mutation will be rewritten to carry one TTL tag set to the value
+ * of the attribute.
+ * @param ttl the TTL desired for the result of the mutation, in milliseconds
+ * @return this
+ */
+ public Mutation setTTL(long ttl) {
+ setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
+ return this;
+ }
+
+ /**
* Subclasses should override this method to add the heap size of their own fields.
* @return the heap size to add (will be aligned).
*/
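
For reference, a minimal client-side sketch of the new attribute API (illustrative only; assumes a table handle named "table" is already open):

    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
    p.setTTL(5000L); // the result of this Put expires ~5 seconds after write
    table.put(p);
    // getTTL() reports Long.MAX_VALUE when no TTL attribute has been set
    assert new Put(Bytes.toBytes("row2")).getTTL() == Long.MAX_VALUE;
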
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
index f214edb..31aaef0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
@@ -212,6 +212,25 @@ public class Tag {
}
/**
+ * Write a list of tags into a byte array
+ * @param tags the list of tags to serialize
+ * @return the serialized tag data as bytes
+ */
+ public static byte[] fromList(List<Tag> tags) {
+ int length = 0;
+ for (Tag tag: tags) {
+ length += tag.length;
+ }
+ byte[] b = new byte[length];
+ int pos = 0;
+ for (Tag tag: tags) {
+ System.arraycopy(tag.bytes, tag.offset, b, pos, tag.length);
+ pos += tag.length;
+ }
+ return b;
+ }
+
+ /**
* Returns the total length of the entire tag entity
*/
int getLength() {
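
Tag.fromList pairs with the existing Tag.asList parser; a quick round-trip sketch (assuming asList keeps its current byte[]/offset/length signature):

    List<Tag> tags = new ArrayList<Tag>();
    tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)));
    byte[] serialized = Tag.fromList(tags); // concatenated tag entities
    List<Tag> roundTrip = Tag.asList(serialized, 0, serialized.length);
    assert roundTrip.size() == 1;
    assert roundTrip.get(0).getType() == TagType.TTL_TAG_TYPE;
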
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index 45c8476..9ed247e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -28,4 +28,5 @@ public final class TagType {
public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
+ public static final byte TTL_TAG_TYPE = (byte)7;
}
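
With the tag type now defined, a client can attach the TTL at cell-construction time and avoid the server-side rewrite described in Mutation#setTTL. A sketch of that pattern (the same one the new TestHRegion test uses below; row/family/qualifier/timestamp/value are placeholders):

    // TTL tag values are longs, in milliseconds
    Tag ttlTag = new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L));
    KeyValue kv = new KeyValue(row, family, qualifier, timestamp, value,
        new Tag[] { ttlTag });
    put.add(kv);
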
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
index d37aa5a..0c3291e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
@@ -24,6 +24,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
@@ -31,7 +32,7 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
- * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
* implement the {@link DeleteTracker} interface since state spans rows (There
* is no update nor reset method).
*/
@@ -39,7 +40,8 @@ import org.apache.hadoop.hbase.util.Bytes;
class GetClosestRowBeforeTracker {
private final KeyValue targetkey;
// Any cell w/ a ts older than this is expired.
- private final long oldestts;
+ private final long now;
+ private final long oldestUnexpiredTs;
private KeyValue candidate = null;
private final KVComparator kvcomparator;
// Flag for whether we're doing getclosest on a metaregion.
@@ -72,20 +74,13 @@ class GetClosestRowBeforeTracker {
HConstants.DELIMITER) - this.rowoffset;
}
this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
- this.oldestts = System.currentTimeMillis() - ttl;
+ this.now = System.currentTimeMillis();
+ this.oldestUnexpiredTs = now - ttl;
this.kvcomparator = c;
KeyValue.RowOnlyComparator rc = new KeyValue.RowOnlyComparator(this.kvcomparator);
this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(rc);
}
- /**
- * @param kv
- * @return True if this kv is expired.
- */
- boolean isExpired(final KeyValue kv) {
- return HStore.isExpired(kv, this.oldestts);
- }
-
/*
* Add the specified KeyValue to the list of deletes.
* @param kv
@@ -170,6 +165,14 @@ class GetClosestRowBeforeTracker {
return false;
}
+ /**
+ * @param cell
+ * @return true if the cell is expired
+ */
+ public boolean isExpired(final Cell cell) {
+ return HStore.isCellExpired(cell, this.oldestUnexpiredTs, this.now);
+ }
+
/*
* Handle keys whose values hold deletes.
* Add to the set of deletes and then if the candidate keys contain any that
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 01dda60..4a434b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -68,6 +69,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -84,6 +86,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
@@ -2457,6 +2461,7 @@ public class HRegion implements HeapSize { // , Writable{
}
noOfDeletes++;
}
+ rewriteCellTags(familyMaps[i], mutation);
}
lock(this.updatesLock.readLock(), numReadyToWrite);
@@ -2869,6 +2874,7 @@ public class HRegion implements HeapSize { // , Writable{
closeRegionOperation();
}
}
+
private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
// Currently this is only called for puts and deletes, so no nonces.
OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
@@ -2920,6 +2926,85 @@ public class HRegion implements HeapSize { // , Writable{
}
}
+ /**
+ * Rewrite incoming cell tags.
+ *
+ * Add cell TTLs to the incoming mutation. Drop any incoming TTL tags.
+ */
+ void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
+ // Check if we have any work to do and early out otherwise
+ // Update these checks as more logic is added here
+
+ // If we don't have a TTL on the mutation, and this is a Put, we don't need to filter
+ // TTL tags
+ if (m.getTTL() == Long.MAX_VALUE && m instanceof Put) {
+ return;
+ }
+
+ // Sadly we have to recreate the cell because cell or KV doesn't support
+ // scatter-gather. It will be expensive to do this in multiple places so
+ // this method is envisioned as a one-stop-shop for incoming cell tag
+ // rewrite rules applied during doMiniBatchMutation
+ List<Cell> updatedCells = new ArrayList<Cell>();
+ for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
+ updatedCells.clear(); // Be sure to clear this each time around the loop
+ List<Cell> cells = e.getValue();
+ Iterator<Cell> cellIterator = cells.iterator(); // Not sure what kind of list this is
+ while (cellIterator.hasNext()) {
+ Cell cell = cellIterator.next();
+ // Avoid as much work as possible if the cell doesn't carry tags
+ if (cell.getTagsLengthUnsigned() > 0) {
+ List<Tag> newTags = new ArrayList<Tag>();
+ Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLengthUnsigned());
+ boolean changed = false;
+ while (tagIterator.hasNext()) {
+ Tag t = tagIterator.next();
+
+ // Filter tags
+
+ // Drop incoming TTL tags if the mutation specified a TTL or is not a Put
+ if (t.getType() == TagType.TTL_TAG_TYPE &&
+ (m.getTTL() != Long.MAX_VALUE || !(m instanceof Put))) {
+ changed = true;
+ continue;
+ }
+
+ newTags.add(t);
+ }
+
+ // Add new tags here
+
+ // Add a TTL to Puts
+ if ((m instanceof Put) && m.getTTL() != Long.MAX_VALUE) {
+ // Set the TTL of this cell to that specified by the mutation attr
+ changed = true;
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
+ }
+
+ // Cell is rewritten here
+ if (changed) {
+ updatedCells.add(new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+ cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ newTags));
+ // Remove original after rewrite
+ cellIterator.remove();
+ }
+ }
+ }
+
+ // We may have queued up rewritten cells while iterating, add them now
+ if (!updatedCells.isEmpty()) {
+ cells.addAll(updatedCells);
+ // Maintain sorted order
+ Collections.sort(cells, new CellComparator());
+ }
+ }
+ }
+
/*
* Check if resources to support an update.
*
@@ -5215,66 +5300,103 @@ public class HRegion implements HeapSize { // , Writable{
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the append value
- // Avoid as much copying as possible. Every byte is copied at most
- // once.
+ // Avoid as much copying as possible. We may need to rewrite and
+ // consolidate tags. Bytes are only copied once.
// Would be nice if KeyValue had scatter/gather logic
int idx = 0;
for (Cell cell : family.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- KeyValue newKV;
+ KeyValue newKv;
KeyValue oldKv = null;
+ List<Tag> newTags = new ArrayList<Tag>();
if (idx < results.size()
- && CellUtil.matchingQualifier(results.get(idx),kv)) {
+ && CellUtil.matchingQualifier(results.get(idx), kv)) {
oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
- // allocate an empty kv once
- newKV = new KeyValue(row.length, kv.getFamilyLength(),
+
+ // Process cell tags
+
+ // Make a union of the set of tags in the old and new KVs, but
+ // filter cell TTL tags if the append mutation specifies one.
+
+ if (oldKv.getTagsLengthUnsigned() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(oldKv.getTagsArray(),
+ oldKv.getTagsOffset(), oldKv.getTagsLengthUnsigned());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && append.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
+ if (kv.getTagsLengthUnsigned() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(kv.getTagsArray(), kv.getTagsOffset(),
+ kv.getTagsLengthUnsigned());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && append.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
+
+ if (append.getTTL() != Long.MAX_VALUE) {
+ // Add the new TTL tag
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ }
+
+ // Rebuild tags
+ byte[] tagBytes = Tag.fromList(newTags);
+
+ // allocate an empty cell once
+ newKv = new KeyValue(row.length, kv.getFamilyLength(),
kv.getQualifierLength(), now, KeyValue.Type.Put,
oldKv.getValueLength() + kv.getValueLength(),
- oldKv.getTagsLengthUnsigned() + kv.getTagsLengthUnsigned());
- // copy in the value
- System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
- newKV.getBuffer(), newKV.getValueOffset(),
- oldKv.getValueLength());
- System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
- newKV.getBuffer(),
- newKV.getValueOffset() + oldKv.getValueLength(),
- kv.getValueLength());
- // copy in the tags
- System.arraycopy(oldKv.getBuffer(), oldKv.getTagsOffset(), newKV.getBuffer(),
- newKV.getTagsOffset(), oldKv.getTagsLengthUnsigned());
- System.arraycopy(kv.getBuffer(), kv.getTagsOffset(), newKV.getBuffer(),
- newKV.getTagsOffset() + oldKv.getTagsLengthUnsigned(),
- kv.getTagsLengthUnsigned());
+ tagBytes.length);
// copy in row, family, and qualifier
- System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
- newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
- System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
- newKV.getBuffer(), newKV.getFamilyOffset(),
- kv.getFamilyLength());
- System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
- newKV.getBuffer(), newKV.getQualifierOffset(),
- kv.getQualifierLength());
+ System.arraycopy(kv.getRowArray(), kv.getRowOffset(),
+ newKv.getRowArray(), newKv.getRowOffset(), kv.getRowLength());
+ System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(),
+ newKv.getFamilyArray(), newKv.getFamilyOffset(),
+ kv.getFamilyLength());
+ System.arraycopy(kv.getQualifierArray(), kv.getQualifierOffset(),
+ newKv.getQualifierArray(), newKv.getQualifierOffset(),
+ kv.getQualifierLength());
+ // copy in the value
+ System.arraycopy(oldKv.getValueArray(), oldKv.getValueOffset(),
+ newKv.getValueArray(), newKv.getValueOffset(),
+ oldKv.getValueLength());
+ System.arraycopy(kv.getValueArray(), kv.getValueOffset(),
+ newKv.getValueArray(),
+ newKv.getValueOffset() + oldKv.getValueLength(),
+ kv.getValueLength());
+ // Copy in tag data
+ System.arraycopy(tagBytes, 0, newKv.getTagsArray(), newKv.getTagsOffset(),
+ tagBytes.length);
idx++;
} else {
- newKV = kv;
+ newKv = kv;
// Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
// so only need to update the timestamp to 'now'
- newKV.updateLatestStamp(Bytes.toBytes(now));
+ newKv.updateLatestStamp(Bytes.toBytes(now));
}
- newKV.setMvccVersion(w.getWriteNumber());
+ newKv.setMvccVersion(w.getWriteNumber());
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
- newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
- RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKV));
+ newKv = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
+ RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKv));
}
- kvs.add(newKV);
+ kvs.add(newKv);
// Append update to WAL
if (writeToWAL) {
if (walEdits == null) {
walEdits = new WALEdit();
}
- walEdits.add(newKV);
+ walEdits.add(newKv);
}
}
@@ -5414,12 +5536,27 @@ public class HRegion implements HeapSize { // , Writable{
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount
int idx = 0;
- for (Cell kv: family.getValue()) {
- long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+ for (Cell cell: family.getValue()) {
+ long amount = Bytes.toLong(CellUtil.cloneValue(cell));
boolean noWriteBack = (amount == 0);
+ List<Tag> newTags = new ArrayList<Tag>();
+
+ // Process cell tags from mutation
+ if (cell.getTagsLengthUnsigned() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLengthUnsigned());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && increment.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
Cell c = null;
- if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
+ if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
c = results.get(idx);
if(c.getValueLength() == Bytes.SIZEOF_LONG) {
amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
@@ -5428,48 +5565,57 @@ public class HRegion implements HeapSize { // , Writable{
throw new org.apache.hadoop.hbase.DoNotRetryIOException(
"Attempted to increment field that isn't 64 bits wide");
}
+ // Carry tags forward from previous version
+ if (c.getTagsLengthUnsigned() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
+ c.getTagsOffset(), c.getTagsLengthUnsigned());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ // Strip any existing TTL tags if the mutation specified one
+ if (t.getType() == TagType.TTL_TAG_TYPE && increment.getTTL() != Long.MAX_VALUE) {
+ continue;
+ }
+ newTags.add(t);
+ }
+ }
idx++;
}
// Append new incremented KeyValue to list
- byte[] q = CellUtil.cloneQualifier(kv);
+ byte[] q = CellUtil.cloneQualifier(cell);
byte[] val = Bytes.toBytes(amount);
- int oldCellTagsLen = (c == null) ? 0 : c.getTagsLengthUnsigned();
- int incCellTagsLen = kv.getTagsLengthUnsigned();
- KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
- KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
- System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length);
- System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(),
- family.getKey().length);
- System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length);
- // copy in the value
- System.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length);
- // copy tags
- if (oldCellTagsLen > 0) {
- System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(),
- newKV.getTagsOffset(), oldCellTagsLen);
- }
- if (incCellTagsLen > 0) {
- System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(),
- newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
+
+ // Add the TTL tag if the mutation carried one
+ if (increment.getTTL() != Long.MAX_VALUE) {
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
}
- newKV.setMvccVersion(w.getWriteNumber());
+
+ KeyValue newKv = new KeyValue(row, 0, row.length,
+ family.getKey(), 0, family.getKey().length,
+ q, 0, q.length,
+ now,
+ KeyValue.Type.Put,
+ val, 0, val.length,
+ newTags);
+
+ newKv.setMvccVersion(w.getWriteNumber());
+
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
- newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
- RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
+ newKv = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
+ RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKv));
}
- allKVs.add(newKV);
+ allKVs.add(newKv);
if (!noWriteBack) {
- kvs.add(newKV);
+ kvs.add(newKv);
// Prepare WAL updates
if (writeToWAL) {
if (walEdits == null) {
walEdits = new WALEdit();
}
- walEdits.add(newKV);
+ walEdits.add(newKv);
}
}
}
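
The filter-then-append rule above recurs three times (doMiniBatchMutation, append, increment). Distilled into a standalone helper for clarity — a sketch, not part of the patch:

    // Copy all tags except TTL tags, then append one TTL tag carrying the
    // mutation attribute. Assumes Tag, TagType, and Bytes are imported.
    static List<Tag> applyMutationTtl(List<Tag> existing, long mutationTtl) {
      List<Tag> out = new ArrayList<Tag>();
      for (Tag t : existing) {
        // The mutation attribute wins over any TTL tag already on the cell
        if (t.getType() == TagType.TTL_TAG_TYPE && mutationTtl != Long.MAX_VALUE) {
          continue;
        }
        out.add(t);
      }
      if (mutationTtl != Long.MAX_VALUE) {
        out.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutationTtl)));
      }
      return out;
    }
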
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c6b62d4..8ed2d89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
@@ -1592,8 +1595,45 @@ public class HStore implements Store {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
- static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
- return key.getTimestamp() < oldestTimestamp;
+ /**
+ * @param cell
+ * @param oldestTimestamp
+ * @param now
+ * @return true if the cell is expired
+ */
+ static boolean isCellExpired(final Cell cell, final long oldestTimestamp, final long now) {
+ // Do not create an Iterator or Tag objects unless the cell actually has
+ // tags
+ if (cell.getTagsLengthUnsigned() > 0) {
+ // Look for a TTL tag first. Use it instead of the family setting if
+ // found. If a cell has multiple TTLs, resolve the conflict by using the
+ // first tag encountered.
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLengthUnsigned());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ if (TagType.TTL_TAG_TYPE == t.getType()) {
+ if (t.getTagLength() == Bytes.SIZEOF_LONG) {
+ // Unlike in schema cell TTLs are stored in milliseconds, no need
+ // to convert
+ long ts = cell.getTimestamp();
+ long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
+ if (ts + ttl < now) {
+ return true;
+ }
+ // Per cell TTLs cannot extend lifetime beyond family settings, so
+ // fall through to check that
+ break;
+ } else {
+ LOG.warn("TTL tag for kv " + cell + " has wrong size: have=" + t.getTagLength() +
+ ", want=" + Bytes.SIZEOF_LONG);
+ }
+ }
+ }
+ }
+ if (cell.getTimestamp() < oldestTimestamp) {
+ return true;
+ }
+ return false;
}
@Override
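
Worked numbers for the two expiry rules in isCellExpired (values illustrative):

    long ts = 1000000L;                  // cell timestamp
    long now = ts + 7000L;               // current server time
    long tagTtl = 5000L;                 // from the cell's TTL tag
    long oldestTimestamp = now - 10000L; // family TTL of 10 seconds
    boolean expiredByTag = ts + tagTtl < now;        // true: dead at ts+5s
    boolean expiredByFamily = ts < oldestTimestamp;  // false: family rule alone keeps it
    // Either rule suffices to expire the cell. A tag can shorten but never
    // extend the family-configured lifetime, hence the fall-through above.
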
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index fae8678..2856bec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -24,6 +24,7 @@ import java.util.NavigableSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
@@ -98,6 +99,10 @@ public class ScanQueryMatcher {
*/
private final long earliestPutTs;
+ /** The oldest timestamp we are interested in, based on TTL */
+ private final long oldestUnexpiredTS;
+ private final long now;
+
/** readPoint over which the KVs are unconditionally included */
protected long maxReadPointToTrackVersions;
@@ -150,7 +155,7 @@ public class ScanQueryMatcher {
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
- RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+ long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator();
this.regionCoprocessorHost = regionCoprocessorHost;
@@ -160,6 +165,9 @@ public class ScanQueryMatcher {
scanInfo.getFamily());
this.filter = scan.getFilter();
this.earliestPutTs = earliestPutTs;
+ this.oldestUnexpiredTS = oldestUnexpiredTS;
+ this.now = now;
+
this.maxReadPointToTrackVersions = readPointToUse;
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
@@ -211,18 +219,18 @@ public class ScanQueryMatcher {
* @param scanInfo The store's immutable scan info
* @param columns
* @param earliestPutTs Earliest put seen in any of the store files.
- * @param oldestUnexpiredTS the oldest timestamp we are interested in,
- * based on TTL
+ * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+ * @param now the current server time
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
* @param regionCoprocessorHost
* @throws IOException
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
- long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
+ long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
- oldestUnexpiredTS, regionCoprocessorHost);
+ oldestUnexpiredTS, now, regionCoprocessorHost);
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
this.dropDeletesFromRow = dropDeletesFromRow;
this.dropDeletesToRow = dropDeletesToRow;
@@ -232,10 +240,10 @@ public class ScanQueryMatcher {
* Constructor for tests
*/
ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
- NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
+ NavigableSet columns, long oldestUnexpiredTS, long now) throws IOException {
this(scan, scanInfo, columns, ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */
- HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
+ HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
}
/**
@@ -294,7 +302,6 @@ public class ScanQueryMatcher {
}
}
-
// optimize case.
if (this.stickyNextRow)
return MatchCode.SEEK_NEXT_ROW;
@@ -317,8 +324,13 @@ public class ScanQueryMatcher {
long timestamp = Bytes.toLong(bytes, initialOffset + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE);
// check for early out based on timestamp alone
if (columns.isDone(timestamp)) {
- return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
+ return columns.getNextRowOrNextColumn(kv.getQualifierArray(), kv.getQualifierOffset(),
+ kv.getQualifierLength());
}
+ // check if the cell is expired by cell TTL
+ if (HStore.isCellExpired(kv, this.oldestUnexpiredTS, this.now)) {
+ return MatchCode.SKIP;
+ }
/*
* The delete logic is pretty complicated now.
@@ -334,7 +346,7 @@ public class ScanQueryMatcher {
* they affect
*/
byte type = bytes[initialOffset + keyLength - 1];
- if (kv.isDelete()) {
+ if (CellUtil.isDelete(kv)) {
if (!keepDeletedCells) {
// first ignore delete markers if the scanner can do so, and the
// range does not include the marker
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index d19fe15..845583c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -75,6 +75,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final Scan scan;
protected final NavigableSet<byte[]> columns;
protected final long oldestUnexpiredTS;
+ protected final long now;
protected final int minVersions;
/**
@@ -120,7 +121,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
explicitColumnQuery = numCol > 0;
this.scan = scan;
this.columns = columns;
- oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
+ this.now = EnvironmentEdgeManager.currentTimeMillis();
+ this.oldestUnexpiredTS = now - ttl;
this.minVersions = minVersions;
// We look up row-column Bloom filters for multi-column queries as part of
@@ -162,7 +164,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
- oldestUnexpiredTS, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, store.getCoprocessorHost());
this.store.addChangedReaderObserver(this);
@@ -227,10 +229,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
- earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
+ earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
} else {
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
- oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
}
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -270,7 +272,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
- Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
+ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
// In unit tests, the store could be null
if (this.store != null) {
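
End to end, the read path now captures the clock once per scanner and threads it down to the matcher. Condensed (the surrounding scanner machinery is elided):

    long now = EnvironmentEdgeManager.currentTimeMillis(); // once per StoreScanner
    long oldestUnexpiredTS = now - ttl;                    // family-level cutoff
    // ... later, inside ScanQueryMatcher.match():
    if (HStore.isCellExpired(kv, oldestUnexpiredTS, now)) {
      return MatchCode.SKIP; // hidden from reads; dropped when compaction rewrites
    }
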
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a26aeb5..f63f0eb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -88,6 +89,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
@@ -109,6 +111,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -127,6 +130,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@@ -5074,6 +5078,163 @@ public class TestHRegion {
}
}
+ @Test
+ public void testCellTTLs() throws IOException {
+ IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ final byte[] row = Bytes.toBytes("testRow");
+ final byte[] q1 = Bytes.toBytes("q1");
+ final byte[] q2 = Bytes.toBytes("q2");
+ final byte[] q3 = Bytes.toBytes("q3");
+ final byte[] q4 = Bytes.toBytes("q4");
+
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
+ HColumnDescriptor hcd = new HColumnDescriptor(fam1);
+ hcd.setTimeToLive(10); // 10 seconds
+ htd.addFamily(hcd);
+
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+
+ HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
+ TEST_UTIL.getDataTestDir(), conf, htd);
+ assertNotNull(region);
+ try {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ // Add a cell that will expire in 5 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // TTL tags specify ts in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire after 10 seconds via family setting
+ region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
+ // Add a cell that will expire in 15 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // TTL tags specify ts in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire in 20 seconds via family setting
+ region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
+
+ // Flush so we are sure store scanning gets this right
+ region.flushcache();
+
+ // A query at time T+0 should return all cells
+ Result r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+5 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+10 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+15 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+20 seconds
+ edge.incrementTime(10000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNull(r.getValue(fam1, q4));
+
+ now = edge.currentTimeMillis();
+
+ // If the cell TTL is specified by a mutation attribute, this should
+ // override any TTLs that may have been passed in as tags.
+
+ // Add a cell that will expire in 5 seconds via mutation attribute
+ Put put = new Put(row).add(new KeyValue(row, fam1, q1, now,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // Include an earlier expiration time as tag, this should be ignored
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(1000L)) } ));
+ put.setTTL(5000L);
+ region.put(put);
+ r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+
+ // Increment time to T+21 seconds
+ edge.incrementTime(1000);
+ r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+
+ // Increment time to T+22 seconds
+ edge.incrementTime(1000);
+ r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+
+ // Increment time to T+25 seconds
+ edge.incrementTime(3000);
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+
+ // Fun with disappearing increments
+
+ // Start at 1
+ region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
+ r = region.get(new Get(row));
+ byte[] val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 1L);
+
+ // Increment with a TTL of 5 seconds
+ Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
+ incr.setTTL(5000);
+ region.increment(incr); // 2
+
+ // New value should be 2
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 2L);
+
+ // Increment time to T+30 seconds
+ edge.incrementTime(5000);
+
+ // Value should be back to 1
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 1L);
+
+ // Increment time to T+35 seconds
+ edge.incrementTime(5000);
+
+ // Original value written at T+25 should be gone now via family TTL
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+
+ } finally {
+ HRegion.closeHRegion(region);
+ }
+ }
+
private static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index e8f5a5a..a0ba183 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -90,11 +90,12 @@ public class TestQueryMatcher extends HBaseTestCase {
}
- private void _testMatch_ExplicitColumns(Scan scan, List<ScanQueryMatcher.MatchCode> expected) throws IOException {
- // 2,4,5
+ private void _testMatch_ExplicitColumns(Scan scan, List<ScanQueryMatcher.MatchCode> expected) throws IOException {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ // 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, false, 0, rowComparator), get.getFamilyMap().get(fam2),
- EnvironmentEdgeManager.currentTimeMillis() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -174,9 +175,10 @@ public class TestQueryMatcher extends HBaseTestCase {
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
+ long now = EnvironmentEdgeManager.currentTimeMillis();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, false, 0, rowComparator), null,
- EnvironmentEdgeManager.currentTimeMillis() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -230,7 +232,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTimeMillis();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, false, 0, rowComparator), get.getFamilyMap().get(fam2),
- now - testTTL);
+ now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -284,7 +286,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTimeMillis();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, false, 0, rowComparator), null,
- now - testTTL);
+ now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -342,7 +344,7 @@ public class TestQueryMatcher extends HBaseTestCase {
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
- HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
+ HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
List<ScanQueryMatcher.MatchCode> actual =
new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
byte[] prevRow = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index 3de6ed3..84ada41 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -451,8 +451,13 @@ public class TestTags {
tags = TestCoprocessorForTags.tags;
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ List<String> tagValues = new ArrayList<String>();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
@@ -508,8 +513,13 @@ public class TestTags {
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ tagValues.clear();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
| | | |