From e48f996f77b95712771088d9900320c4ad198c72 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Mon, 1 Dec 2014 23:27:48 -0500 Subject: [PATCH] HBASE-10560 Per cell TTLs --- .../org/apache/hadoop/hbase/client/Delete.java | 5 + .../org/apache/hadoop/hbase/client/Mutation.java | 34 +++ .../src/main/java/org/apache/hadoop/hbase/Tag.java | 19 ++ .../main/java/org/apache/hadoop/hbase/TagType.java | 1 + .../apache/hadoop/hbase/mapreduce/CellCreator.java | 35 ++- .../apache/hadoop/hbase/mapreduce/ImportTsv.java | 55 ++++- .../hadoop/hbase/mapreduce/TextSortReducer.java | 24 +- .../hadoop/hbase/mapreduce/TsvImporterMapper.java | 26 ++- .../regionserver/GetClosestRowBeforeTracker.java | 26 ++- .../apache/hadoop/hbase/regionserver/HRegion.java | 255 +++++++++++++++------ .../apache/hadoop/hbase/regionserver/HStore.java | 37 ++- .../hbase/regionserver/ScanQueryMatcher.java | 30 ++- .../hadoop/hbase/regionserver/StoreScanner.java | 12 +- .../hbase/mapreduce/TestImportTSVWithTTLs.java | 171 ++++++++++++++ .../hadoop/hbase/regionserver/TestHRegion.java | 131 +++++++++++ .../hbase/regionserver/TestQueryMatcher.java | 16 +- .../apache/hadoop/hbase/regionserver/TestTags.java | 18 +- hbase-shell/src/main/ruby/hbase/table.rb | 24 +- hbase-shell/src/test/ruby/hbase/table_test.rb | 13 ++ 19 files changed, 813 insertions(+), 119 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java index f1c954f..8a47e44 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java @@ -323,4 +323,9 @@ public class Delete extends Mutation implements Comparable { map.put("ts", this.ts); return map; } + + @Override + public Delete setTTL(long ttl) { + throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported"); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index 869d940..ec41568 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -78,6 +78,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C */ private static final String CONSUMED_CLUSTER_IDS = "_cs.id"; + /** + * The attribute for storing TTL for the result of the mutation. + */ + private static final String OP_ATTRIBUTE_TTL = "_ttl"; + protected byte [] row = null; protected long ts = HConstants.LATEST_TIMESTAMP; protected Durability durability = Durability.USE_DEFAULT; @@ -206,6 +211,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C if (getId() != null) { map.put("id", getId()); } + // Add the TTL if set + // Long.MAX_VALUE is the default, and is interpreted to mean this attribute + // has not been set. + if (getTTL() != Long.MAX_VALUE) { + map.put("ttl", getTTL()); + } return map; } @@ -470,6 +481,29 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C } /** + * Return the TTL requested for the result of the mutation, in milliseconds. 
+   * @return the TTL requested for the result of the mutation, in milliseconds,
+   * or Long.MAX_VALUE if unset
+   */
+  public long getTTL() {
+    byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
+    if (ttlBytes != null) {
+      return Bytes.toLong(ttlBytes);
+    }
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Set the TTL desired for the result of the mutation, in milliseconds.
+   * @param ttl the TTL desired for the result of the mutation, in milliseconds
+   * @return this
+   */
+  public Mutation setTTL(long ttl) {
+    setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
+    return this;
+  }
+
   /**
    * Subclasses should override this method to add the heap size of their own fields.
    * @return the heap size to add (will be aligned).
    */
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
index f214edb..31aaef0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
@@ -212,6 +212,25 @@ public class Tag {
   }
 
   /**
+   * Write a list of tags into a byte array
+   * @param tags
+   * @return the serialized tag data as bytes
+   */
+  public static byte[] fromList(List<Tag> tags) {
+    int length = 0;
+    for (Tag tag: tags) {
+      length += tag.length;
+    }
+    byte[] b = new byte[length];
+    int pos = 0;
+    for (Tag tag: tags) {
+      System.arraycopy(tag.bytes, tag.offset, b, pos, tag.length);
+      pos += tag.length;
+    }
+    return b;
+  }
+
+  /**
    * Returns the total length of the entire tag entity
    */
   int getLength() {
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index 45c8476..9ed247e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -28,4 +28,5 @@ public final class TagType {
   public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
   public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
   public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
+  public static final byte TTL_TAG_TYPE = (byte)7;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
index b3dfee7..001f64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -69,7 +69,7 @@ public class CellCreator {
       byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
       int vlength) throws IOException {
     return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
-        timestamp, value, voffset, vlength, null);
+        timestamp, value, voffset, vlength, (List<Tag>)null);
   }
 
   /**
@@ -90,6 +90,7 @@ public class CellCreator {
    * @return created Cell
    * @throws IOException
    */
+  @Deprecated
   public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
       byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
       int vlength, String visExpression) throws IOException {
@@ -100,4 +101,36 @@ public class CellCreator {
     return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
         qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
   }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @param tags
+   * @return created Cell
+   * @throws IOException
+   */
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength, List<Tag> tags) throws IOException {
+    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+  }
+
+  /**
+   * @return Visibility expression resolver
+   */
+  public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+    return this.visExpResolver;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index a1f84bb..6c154f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -121,13 +121,20 @@ public class ImportTsv extends Configured implements Tool {
 
     public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
 
+    public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
+
     private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
 
     public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
 
     public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
 
+    public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
+
     private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+
+    private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
+
     /**
      * @param columnsSpecification the list of columns to parse out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -158,14 +165,18 @@ public class ImportTsv extends Configured implements Tool {
           timestampKeyColumnIndex = i;
           continue;
         }
-        if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+        if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
           attrKeyColumnIndex = i;
           continue;
         }
-        if(CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+        if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
           cellVisibilityColumnIndex = i;
           continue;
         }
+        if (CELL_TTL_COLUMN_SPEC.equals(str)) {
+          cellTTLColumnIndex = i;
+          continue;
+        }
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -193,6 +204,10 @@ public class ImportTsv extends Configured implements Tool {
       return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
     }
 
+    public boolean hasCellTTL() {
+      return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
+    }
+
     public int getAttributesKeyColumnIndex() {
       return attrKeyColumnIndex;
     }
@@ -200,9 +215,15 @@ public class ImportTsv extends Configured implements Tool {
     public int getCellVisibilityColumnIndex() {
       return cellVisibilityColumnIndex;
     }
+
+    public int getCellTTLColumnIndex() {
+      return cellTTLColumnIndex;
+    }
+
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
+
     public byte[] getFamily(int idx) {
       return families[idx];
     }
@@ -234,8 +255,10 @@
         throw new BadTsvLineException("No timestamp");
       } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
         throw new BadTsvLineException("No attributes specified");
-      } else if(hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+      } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
         throw new BadTsvLineException("No cell visibility specified");
+      } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
+        throw new BadTsvLineException("No cell TTL specified");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -332,6 +355,31 @@
         }
       }
 
+      public int getCellTTLColumnOffset() {
+        if (hasCellTTL()) {
+          return getColumnOffset(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public int getCellTTLColumnLength() {
+        if (hasCellTTL()) {
+          return getColumnLength(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public long getCellTTL() {
+        if (!hasCellTTL()) {
+          return 0;
+        } else {
+          return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
+              getColumnLength(cellTTLColumnIndex));
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
@@ -489,6 +537,7 @@
       if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
           || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
           || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
+          || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
           || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
         continue;
       // we are only concerned with the first one (in case this is a cf:cq)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 4a0e0fd..b3981a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -18,7
+18,9 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -28,8 +30,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Reducer; @@ -62,6 +67,9 @@ public class TextSortReducer extends /** Cell visibility expr **/ private String cellVisibilityExpr; + /** Cell TTL */ + private long ttl; + private CellCreator kvCreator; public long getTs() { @@ -148,18 +156,30 @@ public class TextSortReducer extends // Retrieve timestamp if exists ts = parsed.getTimestamp(ts); cellVisibilityExpr = parsed.getCellVisibility(); + ttl = parsed.getCellTTL(); for (int i = 0; i < parsed.getColumnCount(); i++) { if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() - || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) { + || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex() + || i == parser.getCellTTLColumnIndex()) { continue; } // Creating the KV which needs to be directly written to HFiles. Using the Facade // KVCreator for creation of kvs. + List tags = new ArrayList(); + if (cellVisibilityExpr != null) { + tags.addAll(kvCreator.getVisibilityExpressionResolver() + .createVisibilityExpTags(cellVisibilityExpr)); + } + // Add TTL directly to the KV so we can vary them when packing more than one KV + // into puts + if (ttl > 0) { + tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); + } Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes, - parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr); + parsed.getColumnOffset(i), parsed.getColumnLength(i), tags); KeyValue kv = KeyValueUtil.ensureKeyValue(cell); kvs.add(kv); curSize += kv.heapSize(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java index ff84081..270de75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java @@ -18,17 +18,22 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; import 
org.apache.hadoop.hbase.security.visibility.CellVisibility; import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; @@ -59,6 +64,8 @@ extends Mapper protected String cellVisibilityExpr; + protected long ttl; + protected CellCreator kvCreator; private String hfileOutPath; @@ -144,11 +151,13 @@ extends Mapper // Retrieve timestamp if exists ts = parsed.getTimestamp(ts); cellVisibilityExpr = parsed.getCellVisibility(); + ttl = parsed.getCellTTL(); Put put = new Put(rowKey.copyBytes()); for (int i = 0; i < parsed.getColumnCount(); i++) { if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() - || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) { + || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex() + || i == parser.getCellTTLColumnIndex()) { continue; } populatePut(lineBytes, parsed, put, i); @@ -192,13 +201,26 @@ extends Mapper // the validation put.setCellVisibility(new CellVisibility(cellVisibilityExpr)); } + if (ttl > 0) { + put.setTTL(ttl); + } } else { // Creating the KV which needs to be directly written to HFiles. Using the Facade // KVCreator for creation of kvs. + List tags = new ArrayList(); + if (cellVisibilityExpr != null) { + tags.addAll(kvCreator.getVisibilityExpressionResolver() + .createVisibilityExpTags(cellVisibilityExpr)); + } + // Add TTL directly to the KV so we can vary them when packing more than one KV + // into puts + if (ttl > 0) { + tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); + } cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i), - parsed.getColumnLength(i), cellVisibilityExpr); + parsed.getColumnLength(i), tags); } put.add(cell); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java index d37aa5a..876aae0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java @@ -24,6 +24,7 @@ import java.util.TreeMap; import java.util.TreeSet; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; @@ -31,7 +32,7 @@ import org.apache.hadoop.hbase.util.Bytes; /** * State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}. - * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not + * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not * implement the {@link DeleteTracker} interface since state spans rows (There * is no update nor reset method). */ @@ -39,7 +40,8 @@ import org.apache.hadoop.hbase.util.Bytes; class GetClosestRowBeforeTracker { private final KeyValue targetkey; // Any cell w/ a ts older than this is expired. 
-  private final long oldestts;
+  private final long now;
+  private final long oldestUnexpiredTs;
   private KeyValue candidate = null;
   private final KVComparator kvcomparator;
   // Flag for whether we're doing getclosest on a metaregion.
@@ -72,20 +74,13 @@ class GetClosestRowBeforeTracker {
           HConstants.DELIMITER) - this.rowoffset;
     }
     this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
-    this.oldestts = System.currentTimeMillis() - ttl;
+    this.now = System.currentTimeMillis();
+    this.oldestUnexpiredTs = now - ttl;
     this.kvcomparator = c;
     KeyValue.RowOnlyComparator rc = new KeyValue.RowOnlyComparator(this.kvcomparator);
     this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(rc);
   }
 
-  /**
-   * @param kv
-   * @return True if this kv is expired.
-   */
-  boolean isExpired(final KeyValue kv) {
-    return HStore.isExpired(kv, this.oldestts);
-  }
-
   /*
    * Add the specified KeyValue to the list of deletes.
    * @param kv
@@ -170,6 +165,15 @@ class GetClosestRowBeforeTracker {
     return false;
   }
 
+  /**
+   * @param cell
+   * @return true if the cell is expired
+   */
+  public boolean isExpired(final Cell cell) {
+    return cell.getTimestamp() < this.oldestUnexpiredTs ||
+        HStore.isCellTTLExpired(cell, this.oldestUnexpiredTs, this.now);
+  }
+
   /*
    * Handle keys whose values hold deletes.
    * Add to the set of deletes and then if the candidate keys contain any that
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b8efc73..871875a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -84,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Append;
@@ -2468,6 +2471,7 @@ public class HRegion implements HeapSize { // , Writable{
         }
         noOfDeletes++;
       }
+      rewriteCellTags(familyMaps[i], mutation);
     }
 
     lock(this.updatesLock.readLock(), numReadyToWrite);
@@ -2880,6 +2884,7 @@ public class HRegion implements HeapSize { // , Writable{
       closeRegionOperation();
     }
   }
+
   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
     // Currently this is only called for puts and deletes, so no nonces.
     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
@@ -2931,6 +2936,59 @@ public class HRegion implements HeapSize { // , Writable{
     }
   }
 
+  /**
+   * Possibly rewrite incoming cell tags.
+   */
+  void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
+    // Check if we have any work to do and early out otherwise
+    // Update these checks as more logic is added here
+
+    if (m.getTTL() == Long.MAX_VALUE) {
+      return;
+    }
+
+    // From this point we know we have some work to do
+
+    for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
+      List<Cell> cells = e.getValue();
+      assert cells instanceof RandomAccess;
+      int listSize = cells.size();
+      for (int i = 0; i < listSize; i++) {
+        Cell cell = cells.get(i);
+        List<Tag> newTags = new ArrayList<Tag>();
+        Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
+          cell.getTagsOffset(), cell.getTagsLengthUnsigned());
+
+        // Carry forward existing tags
+
+        while (tagIterator.hasNext()) {
+
+          // Add any filters or tag specific rewrites here
+
+          newTags.add(tagIterator.next());
+        }
+
+        // Cell TTL handling
+
+        // Check again if we need to add a cell TTL because early out logic
+        // above may change when there are more tag based features in core.
+        if (m.getTTL() != Long.MAX_VALUE) {
+          // Add a cell TTL tag
+          newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
+        }
+
+        // Rewrite the cell with the updated set of tags
+
+        cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+          cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+          cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+          cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+          newTags));
+      }
+    }
+  }
+
   /*
    * Check if resources to support an update.
    *
@@ -5037,6 +5095,9 @@ public class HRegion implements HeapSize { // , Writable{
           processor.preBatchMutate(this, walEdit);
           // 7. Apply to memstore
           for (Mutation m : mutations) {
+            // Handle any tag based cell features
+            rewriteCellTags(m.getFamilyCellMap(), m);
+
             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
               KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
               kv.setMvccVersion(writeEntry.getWriteNumber());
@@ -5163,8 +5224,9 @@ public class HRegion implements HeapSize { // , Writable{
     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
-  // TODO: There's a lot of boiler plate code identical
-  // to increment... See how to better unify that.
+  // TODO: There's a lot of boiler plate code identical to increment.
+  // We should refactor append and increment as local get-mutate-put
+  // transactions, so all stores only go through one code path for puts.
   /**
    * Perform one or more append operations on a row.
    *
@@ -5232,67 +5294,111 @@ public class HRegion implements HeapSize { // , Writable{
       // Iterate the input columns and update existing values if they were
       // found, otherwise add new column initialized to the append value
 
-      // Avoid as much copying as possible. Every byte is copied at most
-      // once.
+      // Avoid as much copying as possible. We may need to rewrite and
+      // consolidate tags. Bytes are only copied once.
// Would be nice if KeyValue had scatter/gather logic int idx = 0; for (Cell cell : family.getValue()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - KeyValue newKV; + KeyValue newKv; KeyValue oldKv = null; if (idx < results.size() - && CellUtil.matchingQualifier(results.get(idx),kv)) { + && CellUtil.matchingQualifier(results.get(idx), kv)) { oldKv = KeyValueUtil.ensureKeyValue(results.get(idx)); - // allocate an empty kv once long ts = Math.max(now, oldKv.getTimestamp()); - newKV = new KeyValue(row.length, kv.getFamilyLength(), + + // Process cell tags + List newTags = new ArrayList(); + + // Make a union of the set of tags in the old and new KVs + + if (oldKv.getTagsLengthUnsigned() > 0) { + Iterator i = CellUtil.tagsIterator(oldKv.getTagsArray(), + oldKv.getTagsOffset(), oldKv.getTagsLengthUnsigned()); + while (i.hasNext()) { + newTags.add(i.next()); + } + } + if (kv.getTagsLengthUnsigned() > 0) { + Iterator i = CellUtil.tagsIterator(kv.getTagsArray(), kv.getTagsOffset(), + kv.getTagsLengthUnsigned()); + while (i.hasNext()) { + newTags.add(i.next()); + } + } + + // Cell TTL handling + + if (append.getTTL() != Long.MAX_VALUE) { + // Add the new TTL tag + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL()))); + } + + // Rebuild tags + byte[] tagBytes = Tag.fromList(newTags); + + // allocate an empty cell once + newKv = new KeyValue(row.length, kv.getFamilyLength(), kv.getQualifierLength(), ts, KeyValue.Type.Put, oldKv.getValueLength() + kv.getValueLength(), - oldKv.getTagsLengthUnsigned() + kv.getTagsLengthUnsigned()); - // copy in the value - System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(), - newKV.getBuffer(), newKV.getValueOffset(), - oldKv.getValueLength()); - System.arraycopy(kv.getBuffer(), kv.getValueOffset(), - newKV.getBuffer(), - newKV.getValueOffset() + oldKv.getValueLength(), - kv.getValueLength()); - // copy in the tags - System.arraycopy(oldKv.getBuffer(), oldKv.getTagsOffset(), newKV.getBuffer(), - newKV.getTagsOffset(), oldKv.getTagsLengthUnsigned()); - System.arraycopy(kv.getBuffer(), kv.getTagsOffset(), newKV.getBuffer(), - newKV.getTagsOffset() + oldKv.getTagsLengthUnsigned(), - kv.getTagsLengthUnsigned()); + tagBytes.length); // copy in row, family, and qualifier - System.arraycopy(kv.getBuffer(), kv.getRowOffset(), - newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength()); - System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(), - newKV.getBuffer(), newKV.getFamilyOffset(), - kv.getFamilyLength()); - System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(), - newKV.getBuffer(), newKV.getQualifierOffset(), - kv.getQualifierLength()); + System.arraycopy(kv.getRowArray(), kv.getRowOffset(), + newKv.getRowArray(), newKv.getRowOffset(), kv.getRowLength()); + System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(), + newKv.getFamilyArray(), newKv.getFamilyOffset(), + kv.getFamilyLength()); + System.arraycopy(kv.getQualifierArray(), kv.getQualifierOffset(), + newKv.getQualifierArray(), newKv.getQualifierOffset(), + kv.getQualifierLength()); + // copy in the value + System.arraycopy(oldKv.getValueArray(), oldKv.getValueOffset(), + newKv.getValueArray(), newKv.getValueOffset(), + oldKv.getValueLength()); + System.arraycopy(kv.getValueArray(), kv.getValueOffset(), + newKv.getValueArray(), + newKv.getValueOffset() + oldKv.getValueLength(), + kv.getValueLength()); + // Copy in tag data + System.arraycopy(tagBytes, 0, newKv.getTagsArray(), newKv.getTagsOffset(), + tagBytes.length); idx++; } else { - newKV = kv; // Append's 
KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP, // so only need to update the timestamp to 'now' - newKV.updateLatestStamp(Bytes.toBytes(now)); + kv.updateLatestStamp(Bytes.toBytes(now)); + + // Cell TTL handling + + if (append.getTTL() != Long.MAX_VALUE) { + List newTags = new ArrayList(1); + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL()))); + // Add the new TTL tag + newKv = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), + kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), + kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), + kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()), + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(), + newTags); + } else { + newKv = kv; + } } - newKV.setMvccVersion(w.getWriteNumber()); + newKv.setMvccVersion(w.getWriteNumber()); + // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { - newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL( - RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKV)); + newKv = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL( + RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKv)); } - kvs.add(newKV); + kvs.add(newKv); // Append update to WAL if (writeToWAL) { if (walEdits == null) { walEdits = new WALEdit(); } - walEdits.add(newKV); + walEdits.add(newKv); } } @@ -5364,6 +5470,9 @@ public class HRegion implements HeapSize { // , Writable{ return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); } + // TODO: There's a lot of boiler plate code identical to append. + // We should refactor append and increment as local get-mutate-put + // transactions, so all stores only go through one code path for puts. /** * Perform one or more increment operations on a row. 
* @param increment @@ -5432,13 +5541,23 @@ public class HRegion implements HeapSize { // , Writable{ // Iterate the input columns and update existing values if they were // found, otherwise add new column initialized to the increment amount int idx = 0; - for (Cell kv: family.getValue()) { - long amount = Bytes.toLong(CellUtil.cloneValue(kv)); + for (Cell cell: family.getValue()) { + long amount = Bytes.toLong(CellUtil.cloneValue(cell)); boolean noWriteBack = (amount == 0); + List newTags = new ArrayList(); + + // Carry forward any tags that might have been added by a coprocessor + if (cell.getTagsLengthUnsigned() > 0) { + Iterator i = CellUtil.tagsIterator(cell.getTagsArray(), + cell.getTagsOffset(), cell.getTagsLengthUnsigned()); + while (i.hasNext()) { + newTags.add(i.next()); + } + } Cell c = null; long ts = now; - if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) { + if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { c = results.get(idx); ts = Math.max(now, c.getTimestamp()); if(c.getValueLength() == Bytes.SIZEOF_LONG) { @@ -5448,48 +5567,52 @@ public class HRegion implements HeapSize { // , Writable{ throw new org.apache.hadoop.hbase.DoNotRetryIOException( "Attempted to increment field that isn't 64 bits wide"); } + // Carry tags forward from previous version + if (c.getTagsLength() > 0) { + Iterator i = CellUtil.tagsIterator(c.getTagsArray(), + c.getTagsOffset(), c.getTagsLength()); + while (i.hasNext()) { + newTags.add(i.next()); + } + } idx++; } // Append new incremented KeyValue to list - byte[] q = CellUtil.cloneQualifier(kv); + byte[] q = CellUtil.cloneQualifier(cell); byte[] val = Bytes.toBytes(amount); - int oldCellTagsLen = (c == null) ? 0 : c.getTagsLengthUnsigned(); - int incCellTagsLen = kv.getTagsLengthUnsigned(); - KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, ts, - KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen); - System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length); - System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(), - family.getKey().length); - System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length); - // copy in the value - System.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length); - // copy tags - if (oldCellTagsLen > 0) { - System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(), - newKV.getTagsOffset(), oldCellTagsLen); - } - if (incCellTagsLen > 0) { - System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(), - newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen); + + // Add the TTL tag if the mutation carried one + if (increment.getTTL() != Long.MAX_VALUE) { + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL()))); } - newKV.setMvccVersion(w.getWriteNumber()); + + KeyValue newKv = new KeyValue(row, 0, row.length, + family.getKey(), 0, family.getKey().length, + q, 0, q.length, + ts, + KeyValue.Type.Put, + val, 0, val.length, + newTags); + + newKv.setMvccVersion(w.getWriteNumber()); + // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { - newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL( - RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV)); + newKv = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL( + RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKv)); } - allKVs.add(newKV); + allKVs.add(newKv); 
        if (!noWriteBack) {
-          kvs.add(newKV);
+          kvs.add(newKv);
           // Prepare WAL updates
           if (writeToWAL) {
             if (walEdits == null) {
               walEdits = new WALEdit();
             }
-            walEdits.add(newKV);
+            walEdits.add(newKv);
           }
         }
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 39a0677..5ea1a9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
@@ -1597,8 +1600,38 @@ public class HStore implements Store {
     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
   }
 
-  static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
-    return key.getTimestamp() < oldestTimestamp;
+  /**
+   * @param cell
+   * @param oldestTimestamp
+   * @param now
+   * @return true if the cell is expired
+   */
+  static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
+    // Do not create an Iterator or Tag objects unless the cell actually has
+    // tags
+    if (cell.getTagsLengthUnsigned() > 0) {
+      // Look for a TTL tag first. Use it instead of the family setting if
+      // found. If a cell has multiple TTLs, resolve the conflict by using the
+      // first tag encountered.
+      Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLengthUnsigned());
+      while (i.hasNext()) {
+        Tag t = i.next();
+        if (TagType.TTL_TAG_TYPE == t.getType()) {
+          // Unlike in schema cell TTLs are stored in milliseconds, no need
+          // to convert
+          long ts = cell.getTimestamp();
+          assert t.getTagLength() == Bytes.SIZEOF_LONG;
+          long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
+          if (ts + ttl < now) {
+            return true;
+          }
+          // Per cell TTLs cannot extend lifetime beyond family settings, so
+          // fall through to check that
+          break;
+        }
+      }
+    }
+    return false;
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 98921f4..2b56edb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -24,6 +24,7 @@ import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.KeyValue;
@@ -100,6 +101,10 @@ public class ScanQueryMatcher {
   private final long earliestPutTs;
   private final long ttl;
 
+  /** The oldest timestamp we are interested in, based on TTL */
+  private final long oldestUnexpiredTS;
+  private final long now;
+
   /** readPoint over which the KVs are unconditionally included */
   protected long maxReadPointToTrackVersions;
 
@@ -152,7 +157,7 @@ public class ScanQueryMatcher {
    */
   public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
       ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
-      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+      long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
     this.tr = scan.getTimeRange();
     this.rowComparator = scanInfo.getComparator();
     this.regionCoprocessorHost = regionCoprocessorHost;
@@ -162,6 +167,9 @@
         scanInfo.getFamily());
     this.filter = scan.getFilter();
     this.earliestPutTs = earliestPutTs;
+    this.oldestUnexpiredTS = oldestUnexpiredTS;
+    this.now = now;
+
     this.maxReadPointToTrackVersions = readPointToUse;
     this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
     this.ttl = oldestUnexpiredTS;
@@ -216,18 +224,18 @@
    * @param scanInfo The store's immutable scan info
    * @param columns
    * @param earliestPutTs Earliest put seen in any of the store files.
-   * @param oldestUnexpiredTS the oldest timestamp we are interested in,
-   * based on TTL
+   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+   * @param now the current server time
    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
* @param regionCoprocessorHost * @throws IOException */ public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet columns, - long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow, + long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow, byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException { this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs, - oldestUnexpiredTS, regionCoprocessorHost); + oldestUnexpiredTS, now, regionCoprocessorHost); Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null)); this.dropDeletesFromRow = dropDeletesFromRow; this.dropDeletesToRow = dropDeletesToRow; @@ -237,10 +245,10 @@ public class ScanQueryMatcher { * Constructor for tests */ ScanQueryMatcher(Scan scan, ScanInfo scanInfo, - NavigableSet columns, long oldestUnexpiredTS) throws IOException { + NavigableSet columns, long oldestUnexpiredTS, long now) throws IOException { this(scan, scanInfo, columns, ScanType.USER_SCAN, Long.MAX_VALUE, /* max Readpoint to track versions */ - HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null); + HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null); } /** @@ -299,7 +307,6 @@ public class ScanQueryMatcher { } } - // optimize case. if (this.stickyNextRow) return MatchCode.SEEK_NEXT_ROW; @@ -322,8 +329,13 @@ public class ScanQueryMatcher { long timestamp = Bytes.toLong(bytes, initialOffset + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE); // check for early out based on timestamp alone if (columns.isDone(timestamp)) { - return columns.getNextRowOrNextColumn(bytes, offset, qualLength); + return columns.getNextRowOrNextColumn(kv.getQualifierArray(), kv.getQualifierOffset(), + kv.getQualifierLength()); } + // check if the cell is expired by cell TTL + if (HStore.isCellTTLExpired(kv, this.oldestUnexpiredTS, this.now)) { + return MatchCode.SKIP; + } /* * The delete logic is pretty complicated now. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index e8d5f1d..853e1bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -76,6 +76,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected final Scan scan; protected final NavigableSet columns; protected final long oldestUnexpiredTS; + protected final long now; protected final int minVersions; /** @@ -121,7 +122,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner explicitColumnQuery = numCol > 0; this.scan = scan; this.columns = columns; - oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl; + this.now = EnvironmentEdgeManager.currentTimeMillis(); + this.oldestUnexpiredTS = now - ttl; this.minVersions = minVersions; if (store != null && ((HStore)store).getHRegion() != null @@ -171,7 +173,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } matcher = new ScanQueryMatcher(scan, scanInfo, columns, ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP, - oldestUnexpiredTS, store.getCoprocessorHost()); + oldestUnexpiredTS, now, store.getCoprocessorHost()); this.store.addChangedReaderObserver(this); @@ -236,10 +238,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); if (dropDeletesFromRow == null) { matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint, - earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost()); + earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost()); } else { matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs, - oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); + oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); } // Filter the list of scanners using Bloom filters, time range, TTL, etc. @@ -279,7 +281,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), scanInfo.getMinVersions(), readPt); this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, - Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null); + Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null); // In unit tests, the store could be null if (this.store != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java new file mode 100644 index 0000000..062c05a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestImportTSVWithTTLs implements Configurable { + + protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class); + protected static final String NAME = TestImportTsv.class.getSimpleName(); + protected static HBaseTestingUtility util = new HBaseTestingUtility(); + + /** + * Delete the tmp directory after running doMROnTableTest. Boolean. Default is + * false. + */ + protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; + + /** + * Force use of combiner in doMROnTableTest. Boolean. Default is true. + */ + protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; + + private final String FAMILY = "FAM"; + private static Configuration conf; + + @Override + public Configuration getConf() { + return util.getConfiguration(); + } + + @Override + public void setConf(Configuration conf) { + throw new IllegalArgumentException("setConf not supported"); + } + + @BeforeClass + public static void provisionCluster() throws Exception { + conf = util.getConfiguration(); + // We don't check persistence in HFiles in this test, but if we ever do we will + // need this where the default hfile version is not 3 (i.e. 0.98) + conf.setInt("hfile.format.version", 3); + conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName()); + util.startMiniCluster(); + util.startMiniMapReduceCluster(); + } + + @AfterClass + public static void releaseCluster() throws Exception { + util.shutdownMiniMapReduceCluster(); + util.shutdownMiniCluster(); + } + + @Test + public void testMROnTable() throws Exception { + String tableName = "test-" + UUID.randomUUID(); + + // Prepare the arguments required for the test. 
+ String[] args = new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName }; + String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1); + util.deleteTable(tableName); + } + + protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, + String[] args, int valueMultiplier) throws Exception { + TableName table = TableName.valueOf(args[args.length - 1]); + Configuration conf = new Configuration(util.getConfiguration()); + + // populate input file + FileSystem fs = FileSystem.get(conf); + Path inputPath = fs.makeQualified(new Path(util + .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat")); + FSDataOutputStream op = fs.create(inputPath, true); + op.write(Bytes.toBytes(data)); + op.close(); + LOG.debug(String.format("Wrote test data to file: %s", inputPath)); + + if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { + LOG.debug("Forcing combiner."); + conf.setInt("mapreduce.map.combine.minspills", 1); + } + + // run the import + List argv = new ArrayList(Arrays.asList(args)); + argv.add(inputPath.toString()); + Tool tool = new ImportTsv(); + LOG.debug("Running ImportTsv with arguments: " + argv); + try { + // Job will fail if observer rejects entries without TTL + assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); + } finally { + // Clean up + if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { + LOG.debug("Deleting test subdirectory"); + util.cleanupDataTestDirOnTestFS(table.getNameAsString()); + } + } + + return tool; + } + + public static class TTLCheckingObserver extends BaseRegionObserver { + + @Override + public void prePut(ObserverContext e, Put put, WALEdit edit, + Durability durability) throws IOException { + HRegion region = e.getEnvironment().getRegion(); + if (!region.getRegionInfo().isMetaTable() + && !region.getRegionInfo().getTable().isSystemTable()) { + // The put carries the TTL attribute + if (put.getTTL() != Long.MAX_VALUE) { + return; + } + throw new IOException("Operation does not have TTL set"); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index a26aeb5..212e277 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; @@ -88,6 +89,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; @@ -109,6 +111,7 @@ 
import org.apache.hadoop.hbase.filter.NullComparator; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -127,6 +130,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; @@ -5074,6 +5078,133 @@ public class TestHRegion { } } + @Test + public void testCellTTLs() throws IOException { + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + + final byte[] row = Bytes.toBytes("testRow"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs")); + HColumnDescriptor hcd = new HColumnDescriptor(fam1); + hcd.setTimeToLive(10); // 10 seconds + htd.addFamily(hcd); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); + + HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY), + TEST_UTIL.getDataTestDir(), conf, htd); + assertNotNull(region); + try { + long now = EnvironmentEdgeManager.currentTimeMillis(); + // Add a cell that will expire in 5 seconds via cell TTL + region.put(new Put(row).add(new KeyValue(row, fam1, q1, now, + HConstants.EMPTY_BYTE_ARRAY, new Tag[] { + // TTL tags specify ts in milliseconds + new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } ))); + // Add a cell that will expire after 10 seconds via family setting + region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY)); + // Add a cell that will expire in 15 seconds via cell TTL + region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1, + HConstants.EMPTY_BYTE_ARRAY, new Tag[] { + // TTL tags specify ts in milliseconds + new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } ))); + // Add a cell that will expire in 20 seconds via family setting + region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY)); + + // Flush so we are sure store scanning gets this right + region.flushcache(); + + // A query at time T+0 should return all cells + Result r = region.get(new Get(row)); + assertNotNull(r.getValue(fam1, q1)); + assertNotNull(r.getValue(fam1, q2)); + assertNotNull(r.getValue(fam1, q3)); + assertNotNull(r.getValue(fam1, q4)); + + // Increment time to T+5 seconds + edge.incrementTime(5000); + + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + assertNotNull(r.getValue(fam1, q2)); + assertNotNull(r.getValue(fam1, q3)); + assertNotNull(r.getValue(fam1, q4)); + + // Increment time to T+10 seconds + edge.incrementTime(5000); + + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + 
assertNull(r.getValue(fam1, q2)); + assertNotNull(r.getValue(fam1, q3)); + assertNotNull(r.getValue(fam1, q4)); + + // Increment time to T+15 seconds + edge.incrementTime(5000); + + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + assertNull(r.getValue(fam1, q2)); + assertNull(r.getValue(fam1, q3)); + assertNotNull(r.getValue(fam1, q4)); + + // Increment time to T+20 seconds + edge.incrementTime(10000); + + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + assertNull(r.getValue(fam1, q2)); + assertNull(r.getValue(fam1, q3)); + assertNull(r.getValue(fam1, q4)); + + // Fun with disappearing increments + + // Start at 1 + region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L))); + r = region.get(new Get(row)); + byte[] val = r.getValue(fam1, q1); + assertNotNull(val); + assertEquals(Bytes.toLong(val), 1L); + + // Increment with a TTL of 5 seconds + Increment incr = new Increment(row).addColumn(fam1, q1, 1L); + incr.setTTL(5000); + region.increment(incr); // 2 + + // New value should be 2 + r = region.get(new Get(row)); + val = r.getValue(fam1, q1); + assertNotNull(val); + assertEquals(Bytes.toLong(val), 2L); + + // Increment time to T+25 seconds + edge.incrementTime(5000); + + // Value should be back to 1 + r = region.get(new Get(row)); + val = r.getValue(fam1, q1); + assertNotNull(val); + assertEquals(Bytes.toLong(val), 1L); + + // Increment time to T+30 seconds + edge.incrementTime(5000); + + // Original value written at T+20 should be gone now via family TTL + r = region.get(new Get(row)); + assertNull(r.getValue(fam1, q1)); + + } finally { + HRegion.closeHRegion(region); + } + } + private static HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families) throws IOException { return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java index aafa710..2ef9838 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java @@ -91,11 +91,12 @@ public class TestQueryMatcher extends HBaseTestCase { } - private void _testMatch_ExplicitColumns(Scan scan, List expected) throws IOException { - // 2,4,5 + private void _testMatch_ExplicitColumns(Scan scan, List expected) throws IOException { + long now = EnvironmentEdgeManager.currentTimeMillis(); + // 2,4,5 ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2), - EnvironmentEdgeManager.currentTimeMillis() - ttl); + now - ttl, now); List memstore = new ArrayList(); memstore.add(new KeyValue(row1, fam2, col1, 1, data)); @@ -175,9 +176,10 @@ public class TestQueryMatcher extends HBaseTestCase { expected.add(ScanQueryMatcher.MatchCode.INCLUDE); expected.add(ScanQueryMatcher.MatchCode.DONE); + long now = EnvironmentEdgeManager.currentTimeMillis(); ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null, - EnvironmentEdgeManager.currentTimeMillis() - ttl); + now - ttl, now); List memstore = new ArrayList(); memstore.add(new KeyValue(row1, fam2, col1, 1, data)); @@ -231,7 +233,7 @@ public class TestQueryMatcher extends HBaseTestCase { long now = EnvironmentEdgeManager.currentTimeMillis(); ScanQueryMatcher qm = new 
       ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
         rowComparator), get.getFamilyMap().get(fam2),
-        now - testTTL);
+        now - testTTL, now);
     KeyValue [] kvs = new KeyValue[] {
         new KeyValue(row1, fam2, col1, now-100, data),
@@ -285,7 +287,7 @@ public class TestQueryMatcher extends HBaseTestCase {
     long now = EnvironmentEdgeManager.currentTimeMillis();
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL,
         KeepDeletedCells.FALSE, 0, rowComparator), null,
-        now - testTTL);
+        now - testTTL, now);
 
     KeyValue [] kvs = new KeyValue[] {
         new KeyValue(row1, fam2, col1, now-100, data),
@@ -343,7 +345,7 @@ public class TestQueryMatcher extends HBaseTestCase {
     NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
 
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
-        HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
+        HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
 
     List<MatchCode> actual = new ArrayList<MatchCode>(rows.length);
     byte[] prevRow = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index f218a43..b00e988 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -414,8 +414,13 @@ public class TestTags {
       tags = TestCoprocessorForTags.tags;
       assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
       assertEquals(2, tags.size());
-      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
-      assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+      // We cannot assume the ordering of tags
+      List<String> tagValues = new ArrayList<String>();
+      for (Tag tag: tags) {
+        tagValues.add(Bytes.toString(tag.getValue()));
+      }
+      assertTrue(tagValues.contains("tag1"));
+      assertTrue(tagValues.contains("tag2"));
 
       TestCoprocessorForTags.checkTagPresence = false;
       TestCoprocessorForTags.tags = null;
@@ -471,8 +476,13 @@ public class TestTags {
       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
       tags = TestCoprocessorForTags.tags;
       assertEquals(2, tags.size());
-      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
-      assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+      // We cannot assume the ordering of tags
+      tagValues.clear();
+      for (Tag tag: tags) {
+        tagValues.add(Bytes.toString(tag.getValue()));
+      }
+      assertTrue(tagValues.contains("tag1"));
+      assertTrue(tagValues.contains("tag2"));
 
       TestCoprocessorForTags.checkTagPresence = false;
       TestCoprocessorForTags.tags = null;
diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb
index 3fe006b..10b754a 100644
--- a/hbase-shell/src/main/ruby/hbase/table.rb
+++ b/hbase-shell/src/main/ruby/hbase/table.rb
@@ -136,17 +136,19 @@ EOF
         set_attributes(p, attributes) if attributes
         visibility = args[VISIBILITY]
         set_cell_visibility(p, visibility) if visibility
+        ttl = args[TTL]
+        set_op_ttl(p, ttl) if ttl
       end
       #Case where attributes are specified without timestamp
       if timestamp.kind_of?(Hash)
         timestamp.each do |k, v|
-          if v.kind_of?(Hash)
-            set_attributes(p, v) if v
-          end
-          if v.kind_of?(String)
-            set_cell_visibility(p, v) if v
-          end
-
+          if k == 'ATTRIBUTES'
+            set_attributes(p, v)
+          elsif k == 'VISIBILITY'
+            set_cell_visibility(p, v)
+          elsif k == 'TTL'
+            set_op_ttl(p, v)
+          end
         end
         timestamp = nil
       end
@@ -214,6 +216,8 @@
       visibility = args[VISIBILITY]
       set_attributes(incr, attributes) if attributes
       set_cell_visibility(incr, visibility) if visibility
+      ttl = args[TTL]
+      set_op_ttl(incr, ttl) if ttl
     end
     incr.addColumn(family, qualifier, value)
     @table.increment(incr)
@@ -232,6 +236,8 @@ EOF
       visibility = args[VISIBILITY]
       set_attributes(append, attributes) if attributes
       set_cell_visibility(append, visibility) if visibility
+      ttl = args[TTL]
+      set_op_ttl(append, ttl) if ttl
     end
     append.add(family, qualifier, value.to_s.to_java_bytes)
     @table.append(append)
@@ -532,6 +538,10 @@ EOF
       auths.to_java(:string)))
     end
 
+    def set_op_ttl(op, ttl)
+      op.setTTL(ttl.to_java(:long))
+    end
+
     #----------------------------
     # Add general administration utilities to the shell
     # each of the names below adds this method name to the table
diff --git a/hbase-shell/src/test/ruby/hbase/table_test.rb b/hbase-shell/src/test/ruby/hbase/table_test.rb
index 7272229..fa2990d 100644
--- a/hbase-shell/src/test/ruby/hbase/table_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/table_test.rb
@@ -530,5 +530,18 @@ module Hbase
       end
     end
 
+    define_test "mutation with TTL should expire" do
+      @test_table.put('ttlTest', 'x:a', 'foo', { TTL => 1000 })
+      begin
+        res = @test_table._get_internal('ttlTest', 'x:a')
+        assert_not_nil(res)
+        sleep 2
+        res = @test_table._get_internal('ttlTest', 'x:a')
+        assert_nil(res)
+      ensure
+        @test_table.delete('ttlTest', 'x:a')
+      end
+    end
+
   end
 end
-- 
1.7.12.4 (Apple Git-37)
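
For reviewers following the tests above, a minimal client-side sketch of the API surface these tests exercise. It is illustrative only and not part of the patch: the table and column names are invented, and it assumes the HTable client API as it stood at the time of this change. As in the tests, TTL values are given in milliseconds.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PerCellTtlExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // "t1", "f", and "q" are invented names for illustration only.
    HTable table = new HTable(conf, "t1");
    try {
      // Write a cell that should be treated as expired 5 seconds after
      // it is written; the column family TTL continues to apply as well,
      // as the family-TTL expiry at the end of the TestHRegion case shows.
      Put put = new Put(Bytes.toBytes("row1"));
      put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
      put.setTTL(5000); // per-mutation TTL, in milliseconds
      table.put(put);

      // Increments can carry a TTL too. Once the incremented cell expires,
      // an older unexpired value becomes visible again, which is the
      // "disappearing increments" behavior the region test exercises.
      Increment incr = new Increment(Bytes.toBytes("row1"));
      incr.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), 1L);
      incr.setTTL(5000);
      table.increment(incr);
    } finally {
      table.close();
    }
  }
}

The shell analogue, per the table_test.rb case above, would be along the lines of: put 't1', 'row1', 'f:q', 'value', { TTL => 5000 }.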