diff --git a/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index dff1af6..f708889 100644
--- a/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
-import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -36,52 +35,63 @@ import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
/**
- * An HBase Key/Value. This is the fundamental HBase Type.
+ * An HBase Key/Value. This is the fundamental HBase Type. It's persisted into
+ * StoreFiles/HFiles in the FileSystem and is what a
+ * {@link org.apache.hadoop.hbase.client.Result} carries from server to HBase client.
*
- *
If being used client-side, the primary methods to access individual fields
+ *
If being used client-side, the primary methods to access KeyValue particles
* are {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()},
* {@link #getTimestamp()}, and {@link #getValue()}. These methods allocate new
* byte arrays and return copies. Avoid their use server-side.
*
- *
Instances of this class are immutable. They do not implement Comparable
- * but Comparators are provided. Comparators change with context,
- * whether user table or a catalog table comparison. Its critical you use the
- * appropriate comparator. There are Comparators for KeyValue instances and
- * then for just the Key portion of a KeyValue used mostly by {@link HFile}.
+ *
Instances of this class are immutable (EXCEPT for the fact that the server
+ * will stamp the KeyValue with an edit sequence number on receipt before the
+ * KV is written to the WAL and added to the memstore). It does not implement
+ * Comparable but Comparators are provided. Context dictates which Comparator
+ * to use. If a user table, use one type. If a catalog table comparison, you'd
+ * use another. It's critical you use the appropriate comparator. There are
+ * also Comparators for KeyValue instances and then for just the Key portion of
+ * a KeyValue used mostly by {@link HFile}.
*
- *
KeyValue wraps a byte array and takes offsets and lengths into passed
+ *
KeyValue takes a byte array with offsets and lengths into passed
* array at where to start interpreting the content as KeyValue. The KeyValue
* format inside a byte array is:
* <keylength> <valuelength> <key> <value>
* Key is further decomposed as:
- * <rowlength> <row> <columnfamilylength> <columnfamily> <columnqualifier> <timestamp> <keytype>
+ * <rowlength> <row> <columnfamilylength> <columnfamily> <columnqualifier> <timestamp> <sequencenumber> <keytype>
* The rowlength maximum is Short.MAX_SIZE,
* column family length maximum is
* Byte.MAX_SIZE, and column qualifier + key length must
- * be < Integer.MAX_SIZE.
- * The column does not contain the family/qualifier delimiter, {@link #COLUMN_FAMILY_DELIMITER}
+ * be < Integer.MAX_SIZE. A KeyValue does not contain the
+ * family/qualifier delimiter, {@link #COLUMN_FAMILY_DELIMITER}.
+ *
+ *
The sequencenumber is an internally maintained edit insertion
+ * order; it's used for breaking ties where all else in the key portion of two keys
+ * are the same; we'll favor the KeyValue that has a higher sequencenumber.
*/
public class KeyValue implements Writable, HeapSize {
static final Log LOG = LogFactory.getLog(KeyValue.class);
- // TODO: Group Key-only comparators and operations into a Key class, just
- // for neatness sake, if can figure what to call it.
/**
* Colon character in UTF-8
*/
public static final char COLUMN_FAMILY_DELIMITER = ':';
- public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
- new byte[]{COLUMN_FAMILY_DELIMITER};
+ /**
+ * {@link #COLUMN_FAMILY_DELIMITER} as a byte array.
+ */
+ public static final byte [] COLUMN_FAMILY_DELIM_ARRAY =
+ new byte[] {COLUMN_FAMILY_DELIMITER};
/**
- * Comparator for plain key/values; i.e. non-catalog table key/values.
+ * Comparator for plain {@link KeyValue}s; i.e. user-space, non-catalog
+ * table KeyValues.
*/
public static KVComparator COMPARATOR = new KVComparator();
/**
- * Comparator for plain key; i.e. non-catalog table key. Works on Key portion
- * of KeyValue only.
+ * Comparator for key portion of plain {@link KeyValue}s; i.e. user-space,
+ * non-catalog table key. Works on Key portion of KeyValue only.
*/
public static KeyComparator KEY_COMPARATOR = new KeyComparator();
@@ -93,7 +103,7 @@ public class KeyValue implements Writable, HeapSize {
/**
* A {@link KVComparator} for .META. catalog table
- * {@link KeyValue} keys.
+ * {@link KeyValue} keys. Works on Key portion of KeyValue only.
*/
public static KeyComparator META_KEY_COMPARATOR = new MetaKeyComparator();
@@ -105,53 +115,66 @@ public class KeyValue implements Writable, HeapSize {
/**
* A {@link KVComparator} for -ROOT- catalog table
- * {@link KeyValue} keys.
+ * {@link KeyValue} keys. Works on Key portion of KeyValue only.
*/
public static KeyComparator ROOT_KEY_COMPARATOR = new RootKeyComparator();
/**
- * Get the appropriate row comparator for the specified table.
- *
- * Hopefully we can get rid of this, I added this here because it's replacing
- * something in HSK. We should move completely off of that.
- *
+ * Get the appropriate KeyValue comparator for the specified table.
* @param tableName The table name.
- * @return The comparator.
+ * @return The KeyValue comparator to use on this table.
*/
public static KeyComparator getRowComparator(byte [] tableName) {
- if(Bytes.equals(HTableDescriptor.ROOT_TABLEDESC.getName(),tableName)) {
+ if (Bytes.equals(HTableDescriptor.ROOT_TABLEDESC.getName(),tableName)) {
return ROOT_COMPARATOR.getRawComparator();
}
- if(Bytes.equals(HTableDescriptor.META_TABLEDESC.getName(), tableName)) {
+ if (Bytes.equals(HTableDescriptor.META_TABLEDESC.getName(), tableName)) {
return META_COMPARATOR.getRawComparator();
}
return COMPARATOR.getRawComparator();
}
- // Size of the timestamp and type byte on end of a key -- a long + a byte.
- public static final int TIMESTAMP_TYPE_SIZE =
+ static final int TIMESTAMP_TYPE_SIZE =
Bytes.SIZEOF_LONG /* timestamp */ +
Bytes.SIZEOF_BYTE /*keytype*/;
- // Size of the length shorts and bytes in key.
- public static final int KEY_INFRASTRUCTURE_SIZE =
- Bytes.SIZEOF_SHORT /*rowlength*/ +
- Bytes.SIZEOF_BYTE /*columnfamilylength*/ +
+ // Size of the timestamp, sequencenumber and the type byte on the end
+ // of the key portion of a KeyValue -- two longs and a byte.
+ static final int TIMESTAMP_SEQID_TYPE_SIZE =
+ Bytes.SIZEOF_LONG /* insertion sequence number */ +
TIMESTAMP_TYPE_SIZE;
// How far into the key the row starts at. First thing to read is the short
- // that says how long the row is.
+ // that says how long the key is, then the data length... then row bytes start.
public static final int ROW_OFFSET =
Bytes.SIZEOF_INT /*keylength*/ +
Bytes.SIZEOF_INT /*valuelength*/;
- // Size of the length ints in a KeyValue datastructure.
+ // Size of the length ints in a KeyValue datastructure; the key and value length ints.
public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
/**
- * Key type.
- * Has space for other key types to be added later. Cannot rely on
- * enum ordinals . They change if item is removed or moved. Do our own codes.
+ * Mask used comparing KeyValue Types without regard to KeyValue version.
+ * KeyValue version is kept in the upper two bits of the Type byte; i.e. only
+ * a maximum of 4 versions allowed -- versions 0-3.
+ */
+ private static final byte VERSION_MASK = (byte)0xC0;
+ private static final byte VERSION_MASK_INVERSE = ~VERSION_MASK;
+
+ /**
+ * This KeyValue's version in bits that can be OR'd into a Type.
+ * This define will change as KeyValue version evolves. This is what you
+ * change when changing the KeyValue version. We are currently version '1'; i.e.
+ * the bit at 0x40 is set. Previous to this there was version '0', none of
+ * the version bits were set.
+ */
+ static final byte VERSION_BITS = (byte)0x40;
+
+ /**
+ * KeyValue type.
+ * Has space for other key types to be added later. Does not rely on
+ * enum ordinals. They change if item is removed or moved. Do our own
+ * explicit codes. Keys are versioned using two most significant bits in Type.
*/
public static enum Type {
Minimum((byte)0),
@@ -162,7 +185,14 @@ public class KeyValue implements Writable, HeapSize {
DeleteFamily((byte)14),
// Maximum is used when searching; you look from maximum on down.
- Maximum((byte)255);
+ Maximum((byte)63);
+
+ // Bits 0x40 and 0x80 are reserved for specifying the KeyValue version. If top
+ // two bits zero, then version is 0. If 7th bit is set -- 0x40 -- then we're
+ // version 1. Version is ignored when KV is compared. We define Version
+ // below just to show that the bits are occupied. The version bit twiddling
+ // and compares are done elsewhere, outside of this enum. See VERSION_MASK
+ // and VERSION_BITS defines.
private final byte code;
@@ -181,12 +211,12 @@ public class KeyValue implements Writable, HeapSize {
* @return Type associated with passed code.
*/
public static Type codeToType(final byte b) {
+ byte bWithVersionStripped = stripVersionFromType(b);
for (Type t : Type.values()) {
- if (t.getCode() == b) {
- return t;
- }
+ // When comparing, do not compare on version.
+ if (t.getCode() == bWithVersionStripped) return t;
}
- throw new RuntimeException("Unknown code " + b);
+ throw new RuntimeException("Unknown code " + stripVersionFromType(b));
}
}
@@ -196,49 +226,41 @@ public class KeyValue implements Writable, HeapSize {
* key can be equal or lower than this one in memstore or in store file.
*/
public static final KeyValue LOWESTKEY =
- new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);
+ new KeyValue(HConstants.EMPTY_BYTE_ARRAY, null, null,
+ HConstants.LATEST_TIMESTAMP, Long.MAX_VALUE, Type.Maximum, null);
+ // Bytes backing this KeyValue.
private byte [] bytes = null;
- private int offset = 0;
- private int length = 0;
-
- // the row cached
- private byte [] rowCache = null;
+ // Offset into bytes.
+ private int offset = 0;
- /** Here be dragons **/
+ // Length KeyValue occupies at offset in bytes.
+ private int length = 0;
- // used to achieve atomic operations in the memstore.
- public long getMemstoreTS() {
- return memstoreTS;
- }
+ // Default value for sequence number.
+ public static final long DEFAULT_SEQUENCE_NUMBER = 0;
- public void setMemstoreTS(long memstoreTS) {
- this.memstoreTS = memstoreTS;
- }
+ // Cache of the sequence number portion of this KV.
+ // The cached items bulk up KV.
+ private long sequenceNumberCache = -1;
- // default value is 0, aka DNC
- private long memstoreTS = 0;
+ // Cache for timestamp long
+ private long timestampCache = -1;
- /** Dragon time over, return to normal business */
+ // Cached reference to the byte array of row content
+ private byte [] rowCache = null;
+ // Cache of keyLength.
+ private int keyLengthCache = -1;
/** Writable Constructor -- DO NOT USE */
public KeyValue() {}
/**
- * Creates a KeyValue from the start of the specified byte array.
- * Presumes bytes content is formatted as a KeyValue blob.
- * @param bytes byte array
- */
- public KeyValue(final byte [] bytes) {
- this(bytes, 0);
- }
-
- /**
* Creates a KeyValue from the specified byte array and offset.
* Presumes bytes content starting at offset is
- * formatted as a KeyValue blob.
+ * formatted as a KeyValue.
* @param bytes byte array
* @param offset offset to start of KeyValue
*/
@@ -248,7 +270,10 @@ public class KeyValue implements Writable, HeapSize {
/**
* Creates a KeyValue from the specified byte array, starting at offset, and
- * for length length.
+ * for length length. Does NOT make a copy of the passed
+ * array.
+ * Presumes bytes content starting at offset is
+ * formatted as a KeyValue.
* @param bytes byte array
* @param offset offset to start of the KeyValue
* @param length length of the KeyValue
@@ -259,29 +284,30 @@ public class KeyValue implements Writable, HeapSize {
this.length = length;
}
- /** Constructors that build a new backing byte array from fields */
-
/**
- * Constructs KeyValue structure filled with null value.
- * Sets type to {@link KeyValue.Type#Maximum}
- * @param row - row key (arbitrary byte array)
+ * Constructs KeyValue filled with a null family, qualifier and
+ * data. Sets type to {@link KeyValue.Type#Maximum}.
+ * @param row Row key
* @param timestamp
+ * Uses {@link #DEFAULT_SEQUENCE_NUMBER} as the edit sequence number.
*/
public KeyValue(final byte [] row, final long timestamp) {
this(row, timestamp, Type.Maximum);
}
/**
- * Constructs KeyValue structure filled with null value.
- * @param row - row key (arbitrary byte array)
+ * Constructs KeyValue filled with a null family, qualifier and
+ * data.
+ * @param row Row key
+ * @param type Key Type to use.
* @param timestamp
*/
public KeyValue(final byte [] row, final long timestamp, Type type) {
- this(row, null, null, timestamp, type, null);
+ this(row, null, null, timestamp, DEFAULT_SEQUENCE_NUMBER, type, null);
}
/**
- * Constructs KeyValue structure filled with null value.
+ * Constructs KeyValue filled with null data.
* Sets type to {@link KeyValue.Type#Maximum}
* @param row - row key (arbitrary byte array)
* @param family family name
@@ -289,73 +315,127 @@ public class KeyValue implements Writable, HeapSize {
*/
public KeyValue(final byte [] row, final byte [] family,
final byte [] qualifier) {
- this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
+ this(row, family, qualifier, HConstants.LATEST_TIMESTAMP,
+ DEFAULT_SEQUENCE_NUMBER, Type.Maximum);
}
/**
- * Constructs KeyValue structure filled with null value.
+ * Constructs KeyValue.
+ * Sets type to {@link KeyValue.Type#Put}
* @param row - row key (arbitrary byte array)
* @param family family name
* @param qualifier column qualifier
*/
public KeyValue(final byte [] row, final byte [] family,
final byte [] qualifier, final byte [] value) {
- this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
+ this(row, family, qualifier, HConstants.LATEST_TIMESTAMP,
+ DEFAULT_SEQUENCE_NUMBER, Type.Put, value);
}
/**
- * Constructs KeyValue structure filled with specified values.
+ * Constructs KeyValue filled with null data.
* @param row row key
* @param family family name
* @param qualifier column qualifier
- * @param timestamp version timestamp
+ * @param timestamp Timestamp
* @param type key type
* @throws IllegalArgumentException
*/
public KeyValue(final byte[] row, final byte[] family,
final byte[] qualifier, final long timestamp, Type type) {
- this(row, family, qualifier, timestamp, type, null);
+ // Used by tests
+ this(row, family, qualifier, timestamp, DEFAULT_SEQUENCE_NUMBER, type);
+ }
+
+ /**
+ * Constructs KeyValue filled with null data.
+ * @param row row key
+ * @param family family name
+ * @param qualifier column qualifier
+ * @param timestamp Timestamp
+ * @param sequenceNumber
+ * @param type key type
+ * @throws IllegalArgumentException
+ */
+ public KeyValue(final byte[] row, final byte[] family,
+ final byte[] qualifier, final long timestamp,
+ final long sequenceNumber, Type type) {
+ this(row, family, qualifier, timestamp, sequenceNumber, type, null);
}
/**
- * Constructs KeyValue structure filled with specified values.
+ * Constructs KeyValue
* @param row row key
* @param family family name
* @param qualifier column qualifier
- * @param timestamp version timestamp
+ * @param timestamp Timestamp
* @param value column value
* @throws IllegalArgumentException
*/
public KeyValue(final byte[] row, final byte[] family,
final byte[] qualifier, final long timestamp, final byte[] value) {
- this(row, family, qualifier, timestamp, Type.Put, value);
+ // Used by tests.
+ this(row, family, qualifier, timestamp, DEFAULT_SEQUENCE_NUMBER, Type.Put, value);
}
/**
- * Constructs KeyValue structure filled with specified values.
+ * Constructs KeyValue
* @param row row key
* @param family family name
* @param qualifier column qualifier
- * @param timestamp version timestamp
- * @param type key type
+ * @param timestamp Timestamp
+ * @param sequenceNumber
* @param value column value
* @throws IllegalArgumentException
*/
public KeyValue(final byte[] row, final byte[] family,
+ final byte[] qualifier, final long timestamp, final long sequenceNumber,
+ final byte[] value) {
+ this(row, family, qualifier, timestamp, sequenceNumber, Type.Put, value);
+ }
+
+ /**
+ * Constructs KeyValue
+ * @param row row key
+ * @param family family name
+ * @param qualifier column qualifier
+ * @param timestamp Timestamp
+ * @param type key type
+ * @param value column value/data.
+ * @throws IllegalArgumentException
+ */
+ public KeyValue(final byte[] row, final byte[] family,
final byte[] qualifier, final long timestamp, Type type,
final byte[] value) {
+ this(row, family, qualifier, timestamp, DEFAULT_SEQUENCE_NUMBER, type, value);
+ }
+
+ /**
+ * Constructs KeyValue
+ * @param row row key
+ * @param family family name
+ * @param qualifier column qualifier
+ * @param timestamp Timestamp
+ * @param type key type
+ * @param value column value/data.
+ * @throws IllegalArgumentException
+ */
+ public KeyValue(final byte[] row, final byte[] family,
+ final byte[] qualifier, final long timestamp, final long sequenceNumber,
+ Type type, final byte[] value) {
this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
- timestamp, type, value, 0, value==null ? 0 : value.length);
+ timestamp, sequenceNumber, type, value, 0, value==null ? 0 : value.length);
}
/**
- * Constructs KeyValue structure filled with specified values.
+ * Constructs KeyValue
* @param row row key
* @param family family name
* @param qualifier column qualifier
* @param qoffset qualifier offset
* @param qlength qualifier length
- * @param timestamp version timestamp
+ * @param timestamp Timestamp
+ * @param sequenceNumber
* @param type key type
* @param value column value
* @param voffset value offset
@@ -363,16 +443,17 @@ public class KeyValue implements Writable, HeapSize {
* @throws IllegalArgumentException
*/
public KeyValue(byte [] row, byte [] family,
- byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
+ byte [] qualifier, int qoffset, int qlength, long timestamp,
+ final long sequenceNumber, Type type,
byte [] value, int voffset, int vlength) {
this(row, 0, row==null ? 0 : row.length,
family, 0, family==null ? 0 : family.length,
- qualifier, qoffset, qlength, timestamp, type,
+ qualifier, qoffset, qlength, timestamp, sequenceNumber, type,
value, voffset, vlength);
}
/**
- * Constructs KeyValue structure filled with specified values.
+ * Constructs KeyValue.
*
* Column is split into two fields, family and qualifier.
* @param row row key
@@ -384,7 +465,8 @@ public class KeyValue implements Writable, HeapSize {
* @param qualifier column qualifier
* @param qoffset qualifier offset
* @param qlength qualifier length
- * @param timestamp version timestamp
+ * @param timestamp Timestamp
+ * @param sequenceNumber
* @param type key type
* @param value column value
* @param voffset value offset
@@ -394,11 +476,11 @@ public class KeyValue implements Writable, HeapSize {
public KeyValue(final byte [] row, final int roffset, final int rlength,
final byte [] family, final int foffset, final int flength,
final byte [] qualifier, final int qoffset, final int qlength,
- final long timestamp, final Type type,
+ final long timestamp, final long sequenceNumber, final Type type,
final byte [] value, final int voffset, final int vlength) {
this.bytes = createByteArray(row, roffset, rlength,
family, foffset, flength, qualifier, qoffset, qlength,
- timestamp, type, value, voffset, vlength);
+ timestamp, sequenceNumber, type, value, voffset, vlength);
this.length = bytes.length;
this.offset = 0;
}
@@ -415,7 +497,8 @@ public class KeyValue implements Writable, HeapSize {
* @param qualifier column qualifier
* @param qoffset qualifier offset
* @param qlength qualifier length
- * @param timestamp version timestamp
+ * @param timestamp Timestamp
+ * @param sequenceNumber
* @param type key type
* @param value column value
* @param voffset value offset
@@ -425,7 +508,7 @@ public class KeyValue implements Writable, HeapSize {
static byte [] createByteArray(final byte [] row, final int roffset,
final int rlength, final byte [] family, final int foffset, int flength,
final byte [] qualifier, final int qoffset, int qlength,
- final long timestamp, final Type type,
+ final long timestamp, final long sequenceNumber, final Type type,
final byte [] value, final int voffset, int vlength) {
if (rlength > Short.MAX_VALUE) {
throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
@@ -443,8 +526,11 @@ public class KeyValue implements Writable, HeapSize {
if (qlength > Integer.MAX_VALUE - rlength - flength) {
throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
}
- // Key length
- long longkeylength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
+ // Key length. KV infrastructure size is KV version particular; presume
+ // that if we are in this code that the version is that of the current state
+ // of KV.
+ long longkeylength = getKeyInfrastructureLength(getVersion(VERSION_BITS)) +
+ rlength + flength + qlength;
if (longkeylength > Integer.MAX_VALUE) {
throw new IllegalArgumentException("keylength " + longkeylength + " > " +
Integer.MAX_VALUE);
@@ -472,8 +558,12 @@ public class KeyValue implements Writable, HeapSize {
if(qlength != 0) {
pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
}
+ // Timestamp
pos = Bytes.putLong(bytes, pos, timestamp);
- pos = Bytes.putByte(bytes, pos, type.getCode());
+ // Sequencenumber
+ pos = Bytes.putLong(bytes, pos, sequenceNumber);
+ // Type with KV version included.
+ pos = Bytes.putByte(bytes, pos, addVersionToType(type.getCode()));
if (value != null && value.length > 0) {
pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
}
@@ -481,58 +571,42 @@ public class KeyValue implements Writable, HeapSize {
}
/**
- * Write KeyValue format into a byte array.
- *
- * Takes column in the form family:qualifier
- * @param row - row key (arbitrary byte array)
- * @param roffset
- * @param rlength
- * @param column
- * @param coffset
- * @param clength
- * @param timestamp
+ * Decorate passed type with this KVs' version.
* @param type
- * @param value
- * @param voffset
- * @param vlength
- * @return The newly created byte array.
+ * @return Type with Version added.
*/
- static byte [] createByteArray(final byte [] row, final int roffset,
- final int rlength,
- final byte [] column, final int coffset, int clength,
- final long timestamp, final Type type,
- final byte [] value, final int voffset, int vlength) {
- // If column is non-null, figure where the delimiter is at.
- int delimiteroffset = 0;
- if (column != null && column.length > 0) {
- delimiteroffset = getFamilyDelimiterIndex(column, coffset, clength);
- if (delimiteroffset > Byte.MAX_VALUE) {
- throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
- }
- } else {
- return createByteArray(row,roffset,rlength,null,0,0,null,0,0,timestamp,
- type,value,voffset,vlength);
- }
- int flength = delimiteroffset-coffset;
- int qlength = clength - flength - 1;
- return createByteArray(row, roffset, rlength, column, coffset,
- flength, column, delimiteroffset+1, qlength, timestamp, type,
- value, voffset, vlength);
+ private static byte addVersionToType(final byte type) {
+ // First blank out any existing version before adding this KVs.
+ byte b = (byte)(((type & VERSION_MASK_INVERSE)) | VERSION_BITS);
+ return b;
+ }
+
+ static byte stripVersionFromType(final byte b) {
+ return (byte)(b & VERSION_MASK_INVERSE);
+ }
+
+ private static int getTimestampToEndOfKeyLength(final int version) {
+ if (version == 1) return TIMESTAMP_SEQID_TYPE_SIZE;
+ else if (version == 0) return TIMESTAMP_TYPE_SIZE;
+ else throw new IllegalArgumentException("Unexpected version=" + version);
+ }
+
+ private static int getKeyInfrastructureLength(final int version) {
+ return getTimestampToEndOfKeyLength(version) +
+ Bytes.SIZEOF_SHORT /*rowlength*/ +
+ Bytes.SIZEOF_BYTE /*columnfamilylength*/;
}
// Needed doing 'contains' on List. Only compares the key portion, not the
// value.
public boolean equals(Object other) {
- if (!(other instanceof KeyValue)) {
- return false;
- }
+ if (!(other instanceof KeyValue)) return false;
KeyValue kv = (KeyValue)other;
// Comparing bytes should be fine doing equals test. Shouldn't have to
// worry about special .META. comparators doing straight equals.
- boolean result = Bytes.BYTES_RAWCOMPARATOR.compare(getBuffer(),
- getKeyOffset(), getKeyLength(),
- kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()) == 0;
- return result;
+ return Bytes.BYTES_RAWCOMPARATOR.compare(getBuffer(),
+ getKeyOffset(), getKeyLength(), kv.getBuffer(), kv.getKeyOffset(),
+ kv.getKeyLength()) == 0;
}
public int hashCode() {
@@ -558,12 +632,7 @@ public class KeyValue implements Writable, HeapSize {
public KeyValue clone() {
byte [] b = new byte[this.length];
System.arraycopy(this.bytes, this.offset, b, 0, this.length);
- KeyValue ret = new KeyValue(b, 0, b.length);
- // Important to clone the memstoreTS as well - otherwise memstore's
- // update-in-place methods (eg increment) will end up creating
- // new entries
- ret.setMemstoreTS(memstoreTS);
- return ret;
+ return new KeyValue(b, 0, b.length);
}
/**
@@ -577,13 +646,11 @@ public class KeyValue implements Writable, HeapSize {
/**
* Creates a shallow copy of this KeyValue, reusing the data byte buffer.
- * http://en.wikipedia.org/wiki/Object_copy
+ * @see <a href="http://en.wikipedia.org/wiki/Object_copy">Object copy</a>
* @return Shallow copy of this KeyValue
*/
public KeyValue shallowCopy() {
- KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
- shallowCopy.setMemstoreTS(this.memstoreTS);
- return shallowCopy;
+ return new KeyValue(this.bytes, this.offset, this.length);
}
//---------------------------------------------------------------------------
@@ -617,24 +684,26 @@ public class KeyValue implements Writable, HeapSize {
*/
public static String keyToString(final byte [] b, final int o, final int l) {
if (b == null) return "";
+ byte type = b[o + l - 1];
+ int version = getVersion(type);
+ int infrastructureSize = getTimestampToEndOfKeyLength(version);
int rowlength = Bytes.toShort(b, o);
String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
int familylength = b[columnoffset - 1];
- int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
+ int columnlength = l - ((columnoffset - o) + infrastructureSize);
String family = familylength == 0? "":
Bytes.toStringBinary(b, columnoffset, familylength);
String qualifier = columnlength == 0? "":
Bytes.toStringBinary(b, columnoffset + familylength,
columnlength - familylength);
- long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
- byte type = b[o + l - 1];
-// return row + "/" + family +
-// (family != null && family.length() > 0? COLUMN_FAMILY_DELIMITER: "") +
-// qualifier + "/" + timestamp + "/" + Type.codeToType(type);
+ long timestamp = Bytes.toLong(b, o + (l - infrastructureSize));
+ long sequenceNumber = version == 0? 0:
+ Bytes.toLong(b, o + (l - infrastructureSize + Bytes.SIZEOF_LONG));
return row + "/" + family +
(family != null && family.length() > 0? ":" :"") +
- qualifier + "/" + timestamp + "/" + Type.codeToType(type);
+ qualifier + "/" + timestamp + "/" + sequenceNumber + "/" +
+ Type.codeToType(type);
}
//---------------------------------------------------------------------------
@@ -664,6 +733,22 @@ public class KeyValue implements Writable, HeapSize {
return length;
}
+ /**
+ * @return This instance's version.
+ */
+ int getVersion() {
+ return getVersion(getTypeByte(getKeyLength()));
+ }
+
+ /**
+ * @param type
+ * @return Version that is in passed type
+ */
+ static int getVersion(final byte type) {
+ // Not public. Version is an internal implementation detail.
+ return (type & VERSION_MASK) >>> 6;
+ }
+
//---------------------------------------------------------------------------
//
// Length and Offset Calculators
@@ -679,8 +764,8 @@ public class KeyValue implements Writable, HeapSize {
*/
private static int getLength(byte [] bytes, int offset) {
return (2 * Bytes.SIZEOF_INT) +
- Bytes.toInt(bytes, offset) +
- Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
+ Bytes.toInt(bytes, offset) +
+ Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
}
/**
@@ -691,19 +776,19 @@ public class KeyValue implements Writable, HeapSize {
}
public String getKeyString() {
+ // Used by tests.
return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
}
- /**
- * @return Length of key portion.
- */
- private int keyLength = 0;
-
public int getKeyLength() {
- if (keyLength == 0) {
- keyLength = Bytes.toInt(this.bytes, this.offset);
+ if (keyLengthCache == -1) {
+ keyLengthCache = getKeyLength(this.bytes, this.offset);
}
- return keyLength;
+ return keyLengthCache;
+ }
+
+ static int getKeyLength(final byte [] bytes, final int offset) {
+ return Bytes.toInt(bytes, offset);
}
/**
@@ -780,15 +865,15 @@ public class KeyValue implements Writable, HeapSize {
* @return Qualifier length
*/
public int getQualifierLength() {
- return getQualifierLength(getRowLength(),getFamilyLength());
+ return getQualifierLength(getRowLength(), getFamilyLength());
}
/**
* @return Qualifier length
*/
- public int getQualifierLength(int rlength, int flength) {
+ public int getQualifierLength(int rowLength, int familyLength) {
return getKeyLength() -
- (KEY_INFRASTRUCTURE_SIZE + rlength + flength);
+ (getKeyInfrastructureLength(getVersion()) + rowLength + familyLength);
}
/**
@@ -797,7 +882,7 @@ public class KeyValue implements Writable, HeapSize {
public int getTotalColumnLength() {
int rlength = getRowLength();
int foffset = getFamilyOffset(rlength);
- return getTotalColumnLength(rlength,foffset);
+ return getTotalColumnLength(rlength, foffset);
}
/**
@@ -805,7 +890,7 @@ public class KeyValue implements Writable, HeapSize {
*/
public int getTotalColumnLength(int rlength, int foffset) {
int flength = getFamilyLength(foffset);
- int qlength = getQualifierLength(rlength,flength);
+ int qlength = getQualifierLength(rlength, flength);
return flength + qlength;
}
@@ -820,8 +905,9 @@ public class KeyValue implements Writable, HeapSize {
* @param keylength Pass if you have it to save on a int creation.
* @return Timestamp offset
*/
- public int getTimestampOffset(final int keylength) {
- return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
+ private int getTimestampOffset(final int keylength) {
+ return getKeyOffset() + keylength -
+ getTimestampToEndOfKeyLength(getVersion());
}
/**
@@ -846,6 +932,22 @@ public class KeyValue implements Writable, HeapSize {
return false;
}
+ /**
+ * @return Edit Sequence Number offset
+ */
+ int getSequenceNumberOffset() {
+ return getSequenceNumberOffset(getKeyLength());
+ }
+
+ /**
+ * @param keylength Pass if you have it to save on a int creation.
+ * @return Edit Sequence Number offset
+ */
+ private int getSequenceNumberOffset(final int keylength) {
+ return getKeyOffset() + keylength -
+ getTimestampToEndOfKeyLength(getVersion()) + Bytes.SIZEOF_LONG;
+ }
+
//---------------------------------------------------------------------------
//
// Methods that return copies of fields
@@ -900,10 +1002,8 @@ public class KeyValue implements Writable, HeapSize {
}
/**
- *
* @return Timestamp
*/
- private long timestampCache = -1;
public long getTimestamp() {
if (timestampCache == -1) {
timestampCache = getTimestamp(getKeyLength());
@@ -921,6 +1021,41 @@ public class KeyValue implements Writable, HeapSize {
}
/**
+ * @return Edit sequence number
+ */
+ public long getSequenceNumber() {
+ if (this.sequenceNumberCache == -1) {
+ this.sequenceNumberCache = getSequenceNumber(getKeyLength());
+ }
+ return sequenceNumberCache;
+ }
+
+ /**
+ * @param keylength Pass if you have it to save on an int creation.
+ * @return Edit sequence number
+ */
+ long getSequenceNumber(final int keylength) {
+ int version = getVersion();
+ if (version == 0) return 0;
+ int offset = getSequenceNumberOffset(keylength);
+ return Bytes.toLong(this.bytes, offset);
+ }
+
+ /**
+ * Set edit sequence number of this KeyValue. Advanced users only. Set by
+ * the server on receipt of an edit before the edit is added to the WAL or up
+ * into the memstore.
+ * @param sequenceNumber Edit number to use.
+ */
+ public void setSequenceNumber(final long sequenceNumber) {
+ assert getVersion() > 0;
+    int offset = getSequenceNumberOffset();
+    Bytes.putLong(this.bytes, offset, sequenceNumber);
+ // Clear cache.
+ this.sequenceNumberCache = -1;
+ }
+
+ /**
* @return Type of this KeyValue.
*/
public byte getType() {
@@ -929,10 +1064,19 @@ public class KeyValue implements Writable, HeapSize {
/**
* @param keylength Pass if you have it to save on a int creation.
- * @return Type of this KeyValue.
+ * @return Type of this KeyValue
*/
byte getType(final int keylength) {
- return this.bytes[this.offset + keylength - 1 + ROW_OFFSET];
+ // Strip version from Type before returning.
+ return stripVersionFromType(getTypeByte(keylength));
+ }
+
+ int getTypeByteOffset(final int keylength) {
+ return this.offset + keylength - 1 + ROW_OFFSET;
+ }
+
+ byte getTypeByte(final int keylength) {
+ return this.bytes[getTypeByteOffset(keylength)];
}
/**
@@ -1015,20 +1159,22 @@ public class KeyValue implements Writable, HeapSize {
public static class SplitKeyValue {
private byte [][] split;
SplitKeyValue() {
- this.split = new byte[6][];
+ this.split = new byte[7][];
}
public void setRow(byte [] value) { this.split[0] = value; }
public void setFamily(byte [] value) { this.split[1] = value; }
public void setQualifier(byte [] value) { this.split[2] = value; }
public void setTimestamp(byte [] value) { this.split[3] = value; }
- public void setType(byte [] value) { this.split[4] = value; }
- public void setValue(byte [] value) { this.split[5] = value; }
+ public void setSequenceNumber(byte [] value) { this.split[4] = value; }
+ public void setType(byte [] value) { this.split[5] = value; }
+ public void setValue(byte [] value) { this.split[6] = value; }
public byte [] getRow() { return this.split[0]; }
public byte [] getFamily() { return this.split[1]; }
public byte [] getQualifier() { return this.split[2]; }
public byte [] getTimestamp() { return this.split[3]; }
- public byte [] getType() { return this.split[4]; }
- public byte [] getValue() { return this.split[5]; }
+ public byte [] getSequenceNumber() { return this.split[4]; }
+ public byte [] getType() { return this.split[5]; }
+ public byte [] getValue() { return this.split[6]; }
}
public SplitKeyValue split() {
@@ -1057,14 +1203,22 @@ public class KeyValue implements Writable, HeapSize {
System.arraycopy(bytes, splitOffset, qualifier, 0, colLen);
splitOffset += colLen;
split.setQualifier(qualifier);
+
byte [] timestamp = new byte[Bytes.SIZEOF_LONG];
System.arraycopy(bytes, splitOffset, timestamp, 0, Bytes.SIZEOF_LONG);
splitOffset += Bytes.SIZEOF_LONG;
split.setTimestamp(timestamp);
+
+ byte [] sequenceNumber = new byte[Bytes.SIZEOF_LONG];
+ System.arraycopy(bytes, splitOffset, sequenceNumber, 0, Bytes.SIZEOF_LONG);
+ splitOffset += Bytes.SIZEOF_LONG;
+ split.setSequenceNumber(sequenceNumber);
+
byte [] type = new byte[1];
type[0] = bytes[splitOffset];
splitOffset += Bytes.SIZEOF_BYTE;
split.setType(type);
+
byte [] value = new byte[valLen];
System.arraycopy(bytes, splitOffset, value, 0, valLen);
split.setValue(value);
@@ -1131,18 +1285,6 @@ public class KeyValue implements Writable, HeapSize {
}
/**
- * @param column Column minus its delimiter
- * @return True if column matches.
- */
- public boolean matchingColumnNoDelimiter(final byte [] column) {
- int rl = getRowLength();
- int o = getFamilyOffset(rl);
- int fl = getFamilyLength(o);
- int l = fl + getQualifierLength(rl,fl);
- return Bytes.compareTo(column, 0, column.length, this.bytes, o, l) == 0;
- }
-
- /**
*
* @param family column family
* @param qualifier column qualifier
@@ -1152,7 +1294,7 @@ public class KeyValue implements Writable, HeapSize {
int rl = getRowLength();
int o = getFamilyOffset(rl);
int fl = getFamilyLength(o);
- int ql = getQualifierLength(rl,fl);
+ int ql = getQualifierLength(rl, fl);
if (Bytes.compareTo(family, 0, family.length, this.bytes, o, family.length)
!= 0) {
return false;
@@ -1398,13 +1540,10 @@ public class KeyValue implements Writable, HeapSize {
}
public int compare(final KeyValue left, final KeyValue right) {
- int ret = getRawComparator().compare(left.getBuffer(),
- left.getOffset() + ROW_OFFSET, left.getKeyLength(),
- right.getBuffer(), right.getOffset() + ROW_OFFSET,
- right.getKeyLength());
- if (ret != 0) return ret;
- // Negate this comparison so later edits show up first
- return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS());
+ return getRawComparator().compare(left.getBuffer(),
+ left.getOffset() + ROW_OFFSET, left.getKeyLength(),
+ right.getBuffer(), right.getOffset() + ROW_OFFSET,
+ right.getKeyLength());
}
public int compareTimestamps(final KeyValue left, final KeyValue right) {
@@ -1595,17 +1734,6 @@ public class KeyValue implements Writable, HeapSize {
}
/**
- * Creates a KeyValue that is last on the specified row id. That is,
- * every other possible KeyValue for the given row would compareTo()
- * less than the result of this call.
- * @param row row key
- * @return Last possible KeyValue on passed row
- */
- public static KeyValue createLastOnRow(final byte[] row) {
- return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
- }
-
- /**
* Create a KeyValue that is smaller than all other possible KeyValues
* for the given row. That is any (valid) KeyValue on 'row' would sort
* _after_ the result.
@@ -1624,23 +1752,8 @@ public class KeyValue implements Writable, HeapSize {
* @param ts - timestamp
* @return First possible key on passed row and timestamp.
*/
- public static KeyValue createFirstOnRow(final byte [] row,
- final long ts) {
- return new KeyValue(row, null, null, ts, Type.Maximum);
- }
-
- /**
- * @param row - row key (arbitrary byte array)
- * @param c column - {@link #parseColumn(byte[])} is called to split
- * the column.
- * @param ts - timestamp
- * @return First possible key on passed row, column and timestamp
- * @deprecated
- */
- public static KeyValue createFirstOnRow(final byte [] row, final byte [] c,
- final long ts) {
- byte [][] split = parseColumn(c);
- return new KeyValue(row, split[0], split[1], ts, Type.Maximum);
+ public static KeyValue createFirstOnRow(final byte [] row, final long ts) {
+ return new KeyValue(row, ts);
}
/**
@@ -1654,19 +1767,7 @@ public class KeyValue implements Writable, HeapSize {
*/
public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
final byte [] qualifier) {
- return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
- }
-
- /**
- * @param row - row key (arbitrary byte array)
- * @param f - family name
- * @param q - column qualifier
- * @param ts - timestamp
- * @return First possible key on passed row, column and timestamp
- */
- public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
- final byte [] q, final long ts) {
- return new KeyValue(row, f, q, ts, Type.Maximum);
+ return new KeyValue(row, family, qualifier);
}
/**
@@ -1690,8 +1791,21 @@ public class KeyValue implements Writable, HeapSize {
final int foffset, final int flength, final byte [] qualifier,
final int qoffset, final int qlength) {
return new KeyValue(row, roffset, rlength, family,
- foffset, flength, qualifier, qoffset, qlength,
- HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
+ foffset, flength, qualifier, qoffset, qlength,
+ HConstants.LATEST_TIMESTAMP, DEFAULT_SEQUENCE_NUMBER, Type.Maximum, null,
+ 0, 0);
+ }
+
+
+ /**
+ * Creates a KeyValue that is last on the specified row id. That is,
+ * every other possible KeyValue for the given row would compareTo()
+ * less than the result of this call.
+ * @param row row key
+ * @return Last possible KeyValue on passed row
+ */
+ public static KeyValue createLastOnRow(final byte[] row) {
+ return createLastOnRow(row, 0, row.length, null, 0, 0, null, 0, 0);
}
/**
@@ -1716,7 +1830,7 @@ public class KeyValue implements Writable, HeapSize {
final int qoffset, final int qlength) {
return new KeyValue(row, roffset, rlength, family,
foffset, flength, qualifier, qoffset, qlength,
- HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
+ HConstants.OLDEST_TIMESTAMP, Long.MIN_VALUE, Type.Minimum, null, 0, 0);
}
/**
@@ -1750,7 +1864,7 @@ public class KeyValue implements Writable, HeapSize {
System.arraycopy(b, o, newb, ROW_OFFSET, l);
Bytes.putInt(newb, 0, b.length);
Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
- return new KeyValue(newb);
+ return new KeyValue(newb, 0);
}
/**
@@ -1875,6 +1989,7 @@ public class KeyValue implements Writable, HeapSize {
*/
public static class KeyComparator implements RawComparator {
volatile boolean ignoreTimestamp = false;
+ volatile boolean ignoreSequenceNumber = false;
volatile boolean ignoreType = false;
public int compare(byte[] left, int loffset, int llength, byte[] right,
@@ -1889,12 +2004,20 @@ public class KeyValue implements Writable, HeapSize {
return compare;
}
+ // Get type now. Will need it later.
+ byte ltype = left[loffset + (llength - 1)];
+ byte rtype = right[roffset + (rlength - 1)];
+ int lversion = getVersion(ltype);
+ int rversion = getVersion(rtype);
+ int linfrastructureSize = getTimestampToEndOfKeyLength(lversion);
+ int rinfrastructureSize = getTimestampToEndOfKeyLength(rversion);
+
// Compare column family. Start compare past row and family length.
int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset;
int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset;
- int lcolumnlength = llength - TIMESTAMP_TYPE_SIZE -
+ int lcolumnlength = llength - linfrastructureSize -
(lcolumnoffset - loffset);
- int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE -
+ int rcolumnlength = rlength - rinfrastructureSize -
(rcolumnoffset - roffset);
// if row matches, and no column in the 'left' AND put type is 'minimum',
@@ -1905,13 +2028,12 @@ public class KeyValue implements Writable, HeapSize {
// then we say the left is bigger. This will let us seek to the last key in
// a row.
- byte ltype = left[loffset + (llength - 1)];
- byte rtype = right[roffset + (rlength - 1)];
-
- if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
+ if (lcolumnlength == 0 &&
+ stripVersionFromType(ltype) == Type.Minimum.getCode()) {
return 1; // left is bigger.
}
- if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
+ if (rcolumnlength == 0 &&
+ stripVersionFromType(rtype) == Type.Minimum.getCode()) {
return -1;
}
@@ -1925,19 +2047,35 @@ public class KeyValue implements Writable, HeapSize {
if (!this.ignoreTimestamp) {
// Get timestamps.
long ltimestamp = Bytes.toLong(left,
- loffset + (llength - TIMESTAMP_TYPE_SIZE));
+ loffset + (llength - linfrastructureSize));
long rtimestamp = Bytes.toLong(right,
- roffset + (rlength - TIMESTAMP_TYPE_SIZE));
+ roffset + (rlength - rinfrastructureSize));
compare = compareTimestamps(ltimestamp, rtimestamp);
if (compare != 0) {
return compare;
}
}
+ if (!this.ignoreSequenceNumber) {
+        // Get sequence numbers.
+ long lseqnum = lversion == 0? 0:
+ Bytes.toLong(left,
+ loffset + (llength - linfrastructureSize + Bytes.SIZEOF_LONG));
+ long rseqnum = rversion == 0? 0:
+ Bytes.toLong(right,
+ roffset + (rlength - rinfrastructureSize + Bytes.SIZEOF_LONG));
+ // Sort same as we do for timestamps where the higher sequence number
+ // sorts before the lower.
+ compare = compareTimestamps(lseqnum, rseqnum);
+ if (compare != 0) {
+ return compare;
+ }
+ }
+
if (!this.ignoreType) {
// Compare types. Let the delete types sort ahead of puts; i.e. types
// of higher numbers sort before those of lesser numbers
- return (0xff & rtype) - (0xff & ltype);
+ return stripVersionFromType(rtype) - stripVersionFromType(ltype);
}
return 0;
}
@@ -1961,7 +2099,7 @@ public class KeyValue implements Writable, HeapSize {
int compareTimestamps(final long ltimestamp, final long rtimestamp) {
// The below older timestamps sorting ahead of newer timestamps looks
// wrong but it is intentional. This way, newer timestamps are first
- // found when we iterate over a memstore and newer versions are the
+ // found when we iterate over a memstore and newer timestamps are the
// first we trip over when reading from a store file.
if (ltimestamp < rtimestamp) {
return 1;
@@ -1978,7 +2116,7 @@ public class KeyValue implements Writable, HeapSize {
ClassSize.align(ClassSize.ARRAY) + ClassSize.align(length) +
(3 * Bytes.SIZEOF_INT) +
ClassSize.align(ClassSize.ARRAY) +
- (2 * Bytes.SIZEOF_LONG));
+ (3 * Bytes.SIZEOF_LONG));
}
// this overload assumes that the length bytes have already been read,
@@ -2001,4 +2139,11 @@ public class KeyValue implements Writable, HeapSize {
out.writeInt(this.length);
out.write(this.bytes, this.offset, this.length);
}
-}
+
+ public static void main(String[] args) {
+ for (int i = 0; i < 1000; i++) {
+ new KeyValue(HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY,
+ HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index 80b00a7..08c67a8 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -1003,7 +1003,8 @@ public class HConnectionManager {
Thread.sleep(getPauseTime(tries));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IOException("Giving up trying to get region server: thread is interrupted.");
+ throw new IOException("Giving up after tries=" + tries +
+ ": thread interrupted.");
}
}
return null;
diff --git a/src/main/java/org/apache/hadoop/hbase/client/Put.java b/src/main/java/org/apache/hadoop/hbase/client/Put.java
index 2479b80..536da22 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -182,7 +182,7 @@ public class Put implements HeapSize, Writable, Row, Comparable {
*/
private KeyValue createPutKeyValue(byte[] family, byte[] qualifier, long ts,
byte[] value) {
- return new KeyValue(this.row, family, qualifier, ts, KeyValue.Type.Put,
+ return new KeyValue(this.row, family, qualifier, ts, KeyValue.Type.Put,
value);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/client/Result.java b/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 6bdc892..d936517 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -20,14 +20,6 @@
package org.apache.hadoop.hbase.client;
-import com.google.common.collect.Ordering;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.WritableWithSize;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -39,6 +31,13 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
/**
* Single row result of a {@link Get} or {@link Scan} query.
*
@@ -394,32 +393,6 @@ public class Result implements Writable, WritableWithSize {
return returnMap;
}
- private Map.Entry getKeyValue(byte[] family, byte[] qualifier) {
- if(this.familyMap == null) {
- getMap();
- }
- if(isEmpty()) {
- return null;
- }
- NavigableMap> qualifierMap =
- familyMap.get(family);
- if(qualifierMap == null) {
- return null;
- }
- NavigableMap versionMap =
- getVersionMap(qualifierMap, qualifier);
- if(versionMap == null) {
- return null;
- }
- return versionMap.firstEntry();
- }
-
- private NavigableMap getVersionMap(
- NavigableMap> qualifierMap, byte [] qualifier) {
- return qualifier != null?
- qualifierMap.get(qualifier): qualifierMap.get(new byte[0]);
- }
-
/**
* Returns the value of the first column in the Result.
* @return value of the first column
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index e28e06f..0c8eb36 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -240,6 +240,7 @@ public class ImportTsv {
parser.getFamily(i), 0, parser.getFamily(i).length,
parser.getQualifier(i), 0, parser.getQualifier(i).length,
ts,
+ KeyValue.DEFAULT_SEQUENCE_NUMBER,
KeyValue.Type.Put,
lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
put.add(kv);
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c760c17..92dba9d 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -139,6 +139,7 @@ public class HRegion implements HeapSize { // , Writable{
static final String MERGEDIR = "merges";
final AtomicBoolean closed = new AtomicBoolean(false);
+
/* Closing can take some time; use the closing flag if there is stuff we don't
* want to do while in closing state; e.g. like offer this region up to the
* master as a region to close if the carrying regionserver is overloaded.
@@ -146,10 +147,6 @@ public class HRegion implements HeapSize { // , Writable{
*/
final AtomicBoolean closing = new AtomicBoolean(false);
- //////////////////////////////////////////////////////////////////////////////
- // Members
- //////////////////////////////////////////////////////////////////////////////
-
private final Set lockedRows =
new TreeSet(Bytes.BYTES_COMPARATOR);
private final Map lockIds =
@@ -164,12 +161,6 @@ public class HRegion implements HeapSize { // , Writable{
private ClassToInstanceMap
protocolHandlers = MutableClassToInstanceMap.create();
- //These variable are just used for getting data out of the region, to test on
- //client side
- // private int numStores = 0;
- // private int [] storeSize = null;
- // private byte [] name = null;
-
final AtomicLong memstoreSize = new AtomicLong(0);
/**
@@ -231,8 +222,7 @@ public class HRegion implements HeapSize { // , Writable{
private final long blockingMemStoreSize;
final long threadWakeFrequency;
// Used to guard closes
- final ReentrantReadWriteLock lock =
- new ReentrantReadWriteLock();
+ final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Stop updates lock
private final ReentrantReadWriteLock updatesLock =
@@ -240,8 +230,8 @@ public class HRegion implements HeapSize { // , Writable{
private boolean splitRequest;
private byte[] splitPoint = null;
- private final ReadWriteConsistencyControl rwcc =
- new ReadWriteConsistencyControl();
+ private final TransactionManager rwcc =
+ new TransactionManager();
// Coprocessor host
private RegionCoprocessorHost coprocessorHost;
@@ -251,7 +241,10 @@ public class HRegion implements HeapSize { // , Writable{
*/
public final static String REGIONINFO_FILE = ".regioninfo";
+ private final Object closeLock = new Object();
+
/**
+ * Null-arg Constructor.
* Should only be used for testing purposes
*/
public HRegion(){
@@ -466,7 +459,7 @@ public class HRegion implements HeapSize { // , Writable{
}
}
- public ReadWriteConsistencyControl getRWCC() {
+ TransactionManager getRWCC() {
return rwcc;
}
@@ -487,8 +480,6 @@ public class HRegion implements HeapSize { // , Writable{
return close(false);
}
- private final Object closeLock = new Object();
-
/**
* Close down this HRegion. Flush the cache unless abort parameter is true,
* Shut down each HStore, don't service any more calls.
@@ -586,10 +577,6 @@ public class HRegion implements HeapSize { // , Writable{
this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
}
- //////////////////////////////////////////////////////////////////////////////
- // HRegion accessors
- //////////////////////////////////////////////////////////////////////////////
-
/** @return start key for region */
public byte [] getStartKey() {
return this.regionInfo.getStartKey();
@@ -670,13 +657,6 @@ public class HRegion implements HeapSize { // , Writable{
return ret;
}
- //////////////////////////////////////////////////////////////////////////////
- // HRegion maintenance.
- //
- // These methods are meant to be called periodically by the HRegionServer for
- // upkeep.
- //////////////////////////////////////////////////////////////////////////////
-
/** @return returns size of largest HStore. */
public long getLargestHStoreSize() {
long size = 0;
@@ -1075,9 +1055,6 @@ public class HRegion implements HeapSize { // , Writable{
return currentSequenceId;
}
- //////////////////////////////////////////////////////////////////////////////
- // get() methods for client use.
- //////////////////////////////////////////////////////////////////////////////
/**
* Return all the data for the row that matches row exactly,
* or the one that immediately preceeds it, at or immediately before
@@ -1311,6 +1288,8 @@ public class HRegion implements HeapSize { // , Writable{
addFamilyMapToWALEdit(familyMap, walEdit);
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now);
+ } else {
+ this.log.insertSequenceNumber(familyMap);
}
// Now make changes to the memstore.
@@ -1795,6 +1774,8 @@ public class HRegion implements HeapSize { // , Writable{
addFamilyMapToWALEdit(familyMap, walEdit);
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now);
+ } else {
+ this.log.insertSequenceNumber(familyMap);
}
long addedSize = applyFamilyMapToMemstore(familyMap);
@@ -1823,7 +1804,7 @@ public class HRegion implements HeapSize { // , Writable{
* new entries.
*/
private long applyFamilyMapToMemstore(Map> familyMap) {
- ReadWriteConsistencyControl.WriteEntry w = null;
+ TransactionManager.WriteEntry w = null;
long size = 0;
try {
w = rwcc.beginMemstoreInsert();
@@ -1834,12 +1815,11 @@ public class HRegion implements HeapSize { // , Writable{
Store store = getStore(family);
for (KeyValue kv: edits) {
- kv.setMemstoreTS(w.getWriteNumber());
size += store.add(kv);
}
}
} finally {
- rwcc.completeMemstoreInsert(w);
+ rwcc.finish(w);
}
return size;
}
@@ -2336,7 +2316,10 @@ public class HRegion implements HeapSize { // , Writable{
private int batch;
private int isScan;
private boolean filterClosed = false;
- private long readPt;
+ private final long readPoint = rwcc.getReadPoint();
+ /*
+ // Don't show kvs newer than this scanner's read point.
+ if (kv.getSequenceNumber() > this.readPoint) continue;*/
public HRegionInfo getRegionName() {
return regionInfo;
@@ -2354,8 +2337,6 @@ public class HRegion implements HeapSize { // , Writable{
// it is [startRow,endRow) and if startRow=endRow we get nothing.
this.isScan = scan.isGetScan() ? -1 : 0;
- this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
-
List scanners = new ArrayList();
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
@@ -2401,7 +2382,7 @@ public class HRegion implements HeapSize { // , Writable{
try {
// This could be a new thread from the last time we called next().
- ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
+        TransactionManager.setThreadReadPoint(this.readPoint);
results.clear();
@@ -3262,10 +3243,9 @@ public class HRegion implements HeapSize { // , Writable{
Get get = new Get(row);
get.addColumn(family, qualifier);
- // we don't want to invoke coprocessor in this case; ICV is wrapped
+ // We don't want to invoke coprocessor in this case; ICV is wrapped
// in HRegionServer, so we leave getLastIncrement alone
List results = getLastIncrement(get);
-
if (!results.isEmpty()) {
KeyValue kv = results.get(0);
byte [] buffer = kv.getBuffer();
@@ -3273,25 +3253,21 @@ public class HRegion implements HeapSize { // , Writable{
result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
}
- // build the KeyValue now:
- KeyValue newKv = new KeyValue(row, family,
- qualifier, EnvironmentEdgeManager.currentTimeMillis(),
- Bytes.toBytes(result));
+ // Build the KeyValue now:
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ KeyValue kv = new KeyValue(row, family, qualifier, now, Bytes.toBytes(result));
- // now log it:
+    // Now log it. this.log.append (or this.log.setSequence) writes the
+ // sequence number into the KeyValue edit.
if (writeToWAL) {
- long now = EnvironmentEdgeManager.currentTimeMillis();
WALEdit walEdit = new WALEdit();
- walEdit.add(newKv);
+ walEdit.add(kv);
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now);
+ } else {
+ this.log.insertSequenceNumber(kv);
}
-
- // Now request the ICV to the store, this will set the timestamp
- // appropriately depending on if there is a value in memcache or not.
- // returns the change in the size of the memstore from operation
- long size = store.updateColumnValue(row, family, qualifier, result);
-
+ long size = store.updateColumnValue(kv);
size = this.memstoreSize.addAndGet(size);
flush = isFlushSize(size);
} finally {
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 53ec17c..e91291d 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -23,7 +23,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.rmi.UnexpectedException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -348,73 +347,25 @@ public class MemStore implements HeapSize {
}
/**
- * Given the specs of a column, update it, first by inserting a new record,
- * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
- * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
- * store will ensure that the insert/delete each are atomic. A scanner/reader will either
- * get the new value, or the old value and all readers will eventually only see the new
- * value after the old was removed.
+ * Update a column value.
*
- * @param row
- * @param family
- * @param qualifier
- * @param newValue
- * @param now
- * @return Timestamp
+ * @param kv
+ * @return Difference in size made by addition of this edit to memstore.
*/
- public long updateColumnValue(byte[] row,
- byte[] family,
- byte[] qualifier,
- long newValue,
- long now) {
+ public long updateColumnValue(final KeyValue kv) {
this.lock.readLock().lock();
try {
- KeyValue firstKv = KeyValue.createFirstOnRow(
- row, family, qualifier);
- // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
- SortedSet snSs = snapshot.tailSet(firstKv);
- if (!snSs.isEmpty()) {
- KeyValue snKv = snSs.first();
- // is there a matching KV in the snapshot?
- if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
- if (snKv.getTimestamp() == now) {
- // poop,
- now += 1;
- }
- }
- }
-
- // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
- // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
- // so we cant add the new KV w/o knowing what's there already, but we also
- // want to take this chance to delete some kvs. So two loops (sad)
-
- SortedSet ss = kvset.tailSet(firstKv);
- Iterator it = ss.iterator();
- while ( it.hasNext() ) {
- KeyValue kv = it.next();
-
- // if this isnt the row we are interested in, then bail:
- if (!firstKv.matchingColumn(family,qualifier) || !firstKv.matchingRow(kv) ) {
- break; // rows dont match, bail.
- }
-
- // if the qualifier matches and it's a put, just RM it out of the kvset.
- if (firstKv.matchingQualifier(kv)) {
- // to be extra safe we only remove Puts that have a memstoreTS==0
- if (kv.getType() == KeyValue.Type.Put.getCode()) {
- now = Math.max(now, kv.getTimestamp());
- }
- }
- }
-
- // create or update (upsert) a new KeyValue with
- // 'now' and a 0 memstoreTS == immediately visible
- return upsert(Arrays.asList(new KeyValue [] {
- new KeyValue(row, family, qualifier, now,
- Bytes.toBytes(newValue))
- }));
+ // There was code in here where we were checking for a timestamp in
+ // advance of ours. If we found one, then we'd use the one in advance
+ // of ours instead of the ts that was in the kv. We were making a new
+ // kv to do this, a kv that would not be the same as whats out in the
+ // WAL. Seems no longer necessary to me now we have sequence number.
+ // I'm removing it from here and adding an assert instead into the upsert
+ // we call that will look for incidence of the kv timestamp being less
+ // than that of one already up in memstore.
+ // We were doing max(now, mostRecentTsInMemstore).
+ // St.Ack 20110113
+ return upsert(kv);
} finally {
this.lock.readLock().unlock();
}
@@ -427,10 +378,6 @@ public class MemStore implements HeapSize {
* value for that row/family/qualifier. If a KeyValue did already exist,
* it will then be removed.
*
- * Currently the memstoreTS is kept at 0 so as each insert happens, it will
- * be immediately visible. May want to change this so it is atomic across
- * all KeyValues.
- *
* This is called under row lock, so Get operations will still see updates
* atomically. Scans will only see each KeyValue update as atomic.
*
@@ -442,7 +389,6 @@ public class MemStore implements HeapSize {
try {
long size = 0;
for (KeyValue kv : kvs) {
- kv.setMemstoreTS(0);
size += upsert(kv);
}
return size;
@@ -453,7 +399,7 @@ public class MemStore implements HeapSize {
/**
* Inserts the specified KeyValue into MemStore and deletes any existing
- * versions of the same row/family/qualifier as the specified KeyValue.
+ * versions of the same row/family/qualifier at the specified KeyValue.
*
* First, the specified KeyValue is inserted into the Memstore.
*
@@ -467,38 +413,40 @@ public class MemStore implements HeapSize {
long addedSize = add(kv);
// Get the KeyValues for the row/family/qualifier regardless of timestamp.
- // For this case we want to clean up any other puts
+ // and remove any old values to help keep a cap on version bloat.
+ // TODO: Do this out-of-band in a background thread; add the cleanup to
+ // a queue for another thread to clean -- St.Ack 20110113.
KeyValue firstKv = KeyValue.createFirstOnRow(
kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
SortedSet ss = kvset.tailSet(firstKv);
Iterator it = ss.iterator();
+ KeyValue firstFound = null;
while ( it.hasNext() ) {
KeyValue cur = it.next();
-
+      if (firstFound == null) firstFound = cur;
if (kv == cur) {
+ if (firstFound != cur) {
+ // If we are not first on row, then somethings up!
+ throw new IllegalStateException("firstFound=" + firstFound +
+ ", cur=" + cur);
+ }
// ignore the one just put in
continue;
}
- // if this isn't the row we are interested in, then bail
- if (!kv.matchingRow(cur)) {
- break;
- }
-
- // if the qualifier matches and it's a put, remove it
- if (kv.matchingQualifier(cur)) {
- // to be extra safe we only remove Puts that have a memstoreTS==0
- if (kv.getType() == KeyValue.Type.Put.getCode() &&
- kv.getMemstoreTS() == 0) {
- // false means there was a change, so give us the size.
- addedSize -= heapSizeChange(kv, true);
- it.remove();
- }
- } else {
- // past the column, done
- break;
+ // if this isn't the row we are interested in, then bail
+ if (!kv.matchingRow(cur)) break;
+
+ // if the qualifier no longer matches, skip out
+ if (!kv.matchingQualifier(cur)) break;
+
+ // If a Put with matching r/cf/q, then remove.
+ if (kv.getType() == KeyValue.Type.Put.getCode()) {
+ // false means there was a change, so give us the size.
+ addedSize -= heapSizeChange(kv, true);
+ it.remove();
}
}
return addedSize;
@@ -602,21 +550,12 @@ public class MemStore implements HeapSize {
MemStoreScanner() {
super();
-
- //DebugPrint.println(" MS new@" + hashCode());
}
protected KeyValue getNext(Iterator it) {
KeyValue ret = null;
- long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
- //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
-
while (ret == null && it.hasNext()) {
- KeyValue v = it.next();
- if (v.getMemstoreTS() <= readPoint) {
- // keep it.
- ret = v;
- }
+ ret = it.next();
}
return ret;
}
@@ -638,14 +577,6 @@ public class MemStore implements HeapSize {
kvsetNextRow = getNext(kvsetIt);
snapshotNextRow = getNext(snapshotIt);
-
- //long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
- //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " +
- // kvset.size() + " threadread = " + readPoint);
- //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
- // snapshot.size() + " threadread = " + readPoint);
-
-
KeyValue lowest = getLowest();
// has data := (lowest != null)
@@ -686,9 +617,6 @@ public class MemStore implements HeapSize {
snapshotNextRow = getNext(snapshotIt);
}
- //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
- //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
- // getLowest() + " threadpoint=" + readpoint);
return theNext;
}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 48dd8e9..3dd3c40 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -93,6 +93,7 @@ public class ScanQueryMatcher {
this.columns = new ExplicitColumnTracker(columns,maxVersions);
}
}
+
public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet columns, long ttl,
KeyValue.KeyComparator rowComparator, int maxVersions) {
@@ -119,9 +120,7 @@ public class ScanQueryMatcher {
byte [] bytes = kv.getBuffer();
int offset = kv.getOffset();
- int initialOffset = offset;
- int keyLength = Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
offset += KeyValue.ROW_OFFSET;
short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
@@ -154,8 +153,7 @@ public class ScanQueryMatcher {
byte familyLength = bytes [offset];
offset += familyLength + 1;
- int qualLength = keyLength + KeyValue.ROW_OFFSET -
- (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;
+ int qualLength = kv.getQualifierLength(rowLength, familyLength);
long timestamp = kv.getTimestamp();
if (isExpired(timestamp)) {
@@ -163,16 +161,14 @@ public class ScanQueryMatcher {
return getNextRowOrNextColumn(bytes, offset, qualLength);
}
- byte type = kv.getType();
- if (isDelete(type)) {
+ if (kv.isDelete()) {
if (tr.withinOrAfterTimeRange(timestamp)) {
- this.deletes.add(bytes, offset, qualLength, timestamp, type);
+ this.deletes.add(bytes, offset, qualLength, timestamp, kv.getType());
// Can't early out now, because DelFam come before any other keys
}
if (retainDeletesInOutput) {
return MatchCode.INCLUDE;
- }
- else {
+ } else {
return MatchCode.SKIP;
}
}
@@ -273,11 +269,6 @@ public class ScanQueryMatcher {
stickyNextRow = false;
}
- // should be in KeyValue.
- protected boolean isDelete(byte type) {
- return (type != KeyValue.Type.Put.getCode());
- }
-
protected boolean isExpired(long timestamp) {
return (timestamp < oldestStamp);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index ba9733d..caf9354 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -1517,11 +1517,7 @@ public class Store implements HeapSize {
}
/**
- * Increments the value for the given row/family/qualifier.
- *
- * This function will always be seen as atomic by other readers
- * because it only puts a single KV to memstore. Thus no
- * read/write control necessary.
+ * Updates the value at given row/family/qualifier.
*
* @param row
* @param f
@@ -1530,20 +1526,11 @@ public class Store implements HeapSize {
* @return memstore size delta
* @throws IOException
*/
- public long updateColumnValue(byte [] row, byte [] f,
- byte [] qualifier, long newValue)
- throws IOException {
-
+ public long updateColumnValue(final KeyValue kv)
+ throws IOException {
this.lock.readLock().lock();
try {
- long now = EnvironmentEdgeManager.currentTimeMillis();
-
- return this.memstore.updateColumnValue(row,
- f,
- qualifier,
- newValue,
- now);
-
+ return this.memstore.updateColumnValue(kv);
} finally {
this.lock.readLock().unlock();
}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 039915e..761dcd8 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -25,11 +25,9 @@ import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.nio.ByteBuffer;
import java.text.NumberFormat;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedSet;
@@ -666,7 +664,6 @@ public class StoreFile {
private final BloomType bloomType;
private KVComparator kvComparator;
private KeyValue lastKv = null;
- private byte[] lastByteArray = null;
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
/* isTimeRangeTrackerSet keeps track if the timeRange has already been set
* When flushing a memstore, we set TimeRange and use this variable to
@@ -854,18 +851,6 @@ public class StoreFile {
return this.bloomFilter != null;
}
- public void append(final byte [] key, final byte [] value) throws IOException {
- if (this.bloomFilter != null) {
- // only add to the bloom filter on a new row
- if (this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) {
- this.bloomFilter.add(key);
- this.lastByteArray = key;
- }
- }
- writer.append(key, value);
- includeInTimeRangeTracker(key);
- }
-
public void close() throws IOException {
// make sure we wrote something to the bloom before adding it
if (this.bloomFilter != null && this.bloomFilter.getKeyCount() > 0) {
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index d149480..27c9a86 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -59,7 +59,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
* @throws IOException
*/
StoreScanner(Store store, Scan scan, final NavigableSet columns)
- throws IOException {
+ throws IOException {
this.store = store;
this.cacheBlocks = scan.getCacheBlocks();
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
index d3f1c65..a201a54 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
@@ -24,9 +24,7 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
/**
@@ -77,21 +75,6 @@ public class TimeRangeTracker implements Writable {
}
/**
- * Update the current TimestampRange to include the timestamp from Key.
- * If the Key is of type DeleteColumn or DeleteFamily, it includes the
- * entire time range from 0 to timestamp of the key.
- * @param key
- */
- public void includeTimestamp(final byte[] key) {
- includeTimestamp(Bytes.toLong(key,key.length-KeyValue.TIMESTAMP_TYPE_SIZE));
- int type = key[key.length - 1];
- if (type == Type.DeleteColumn.getCode() ||
- type == Type.DeleteFamily.getCode()) {
- includeTimestamp(0);
- }
- }
-
- /**
* If required, update the current TimestampRange to include timestamp
* @param timestamp the timestamp value to include
*/
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index f06d263..b56c21a 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
@@ -144,9 +143,6 @@ public class HLog implements Syncable {
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};
- // used to indirectly tell syncFs to force the sync
- private boolean forceSync = false;
-
public interface Reader {
void init(FileSystem fs, Path path, Configuration c) throws IOException;
void close() throws IOException;
@@ -435,7 +431,7 @@ public class HLog implements Syncable {
}
/**
- * @return log sequence number
+ * @return The current log sequence number
*/
public long getSequenceNumber() {
return logSeqNum.get();
@@ -926,6 +922,7 @@ public class HLog implements Syncable {
// actual name.
byte [] hriKey = info.getEncodedNameAsBytes();
this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
+ // Update edits adding in this sequence number
HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
doWrite(info, logKey, edits);
this.numEntries.incrementAndGet();
@@ -940,6 +937,36 @@ public class HLog implements Syncable {
}
/**
+ * Increment sequence number and add new number to kv.
+ * Use this method when you are skipping the WAL; {@link KeyValue}s
+ * still need to be decorated with a valid sequence number though edit is
+ * not in the WAL.
+ * @param kv KeyValue to decorate.
+ */
+ public void insertSequenceNumber(final KeyValue kv) {
+ synchronized (this.updateLock) {
+ kv.setSequenceNumber(obtainSeqNum());
+ }
+ }
+
+ /**
+ * Increment sequence number and add this new number to KeyValues passed in
+ * familyMap.
+ * Use this method when you are skipping the WAL; {@link KeyValue}s
+ * still need to be decorated with a valid sequence number though edit is
+ * not in the WAL.
+ * @param familyMap KeyValues to decorate by family.
+ */
+ public void insertSequenceNumber(final Map<byte [], List<KeyValue>> familyMap) {
+ synchronized (this.updateLock) {
+ long sequenceNumber = obtainSeqNum();
+ for (List<KeyValue> kvs: familyMap.values()) {
+ for (KeyValue kv: kvs) kv.setSequenceNumber(sequenceNumber);
+ }
+ }
+ }
+
+ /**
* This thread is responsible to call syncFs and buffer up the writers while
* it happens.
*/
@@ -947,8 +974,6 @@ public class HLog implements Syncable {
private final long optionalFlushInterval;
- private boolean syncerShuttingDown = false;
-
LogSyncer(long optionalFlushInterval) {
this.optionalFlushInterval = optionalFlushInterval;
}
@@ -969,7 +994,6 @@ public class HLog implements Syncable {
} catch (InterruptedException e) {
LOG.debug(getName() + " interrupted while waiting for sync requests");
} finally {
- syncerShuttingDown = true;
LOG.info(getName() + " exiting");
}
}
diff --git a/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
index aba2c3b..f043935 100644
--- a/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
+++ b/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
@@ -435,12 +435,18 @@ public class Bytes {
*/
public static byte[] toBytes(long val) {
byte [] b = new byte[8];
+ return toBytes(b, 0, val);
+ }
+
+ public static byte [] toBytes(final byte [] buffer, final int offset,
+ final long originalVal) {
+ long val = originalVal;
for (int i = 7; i > 0; i--) {
- b[i] = (byte) val;
+ buffer[offset + i] = (byte)val;
val >>>= 8;
}
- b[0] = (byte) val;
- return b;
+ buffer[offset + 0] = (byte) val;
+ return buffer;
}
/**
diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 883389f..1ffc2a6 100644
--- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.TransactionManager;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -1259,7 +1259,7 @@ public class HBaseTestingUtility {
*/
public static List getFromStoreFile(Store store,
Get get) throws IOException {
- ReadWriteConsistencyControl.resetThreadReadPoint();
+ TransactionManager.resetThreadReadPoint();
Scan scan = new Scan(get);
InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getFamily().getName()));
diff --git a/src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java b/src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java
index 36d768a..aae6246 100644
--- a/src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java
+++ b/src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java
@@ -22,33 +22,20 @@ package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.util.Bytes;
+/**
+ * Utility for creating {@link KeyValue}
+ */
public class KeyValueTestUtil {
- public static KeyValue create(
- String row,
- String family,
- String qualifier,
- long timestamp,
- String value)
- {
+ public static KeyValue create(String row, String family, String qualifier,
+ long timestamp, String value) {
return create(row, family, qualifier, timestamp, KeyValue.Type.Put, value);
}
- public static KeyValue create(
- String row,
- String family,
- String qualifier,
- long timestamp,
- KeyValue.Type type,
- String value)
- {
- return new KeyValue(
- Bytes.toBytes(row),
- Bytes.toBytes(family),
- Bytes.toBytes(qualifier),
- timestamp,
- type,
- Bytes.toBytes(value)
+ public static KeyValue create(String row, String family, String qualifier,
+ long timestamp, KeyValue.Type type, String value) {
+ return new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family),
+ Bytes.toBytes(qualifier), timestamp, type, Bytes.toBytes(value)
);
}
-}
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java b/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java
index 68fff55..27cc211 100644
--- a/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java
+++ b/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java
@@ -30,10 +30,105 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
public class TestKeyValue extends TestCase {
private final Log LOG = LogFactory.getLog(this.getClass().getName());
+ public void testHandlingOfKeyValueVersionZero() {
+ long ts = System.currentTimeMillis();
+ KeyValue kv = new KeyValue(HConstants.CATALOG_FAMILY,
+ HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts,
+ KeyValue.DEFAULT_SEQUENCE_NUMBER, HConstants.CATALOG_FAMILY);
+ KeyValue kv0 = convertToVersionZero(kv);
+ assertTrue(kv0.getVersion() == 0);
+ assertEquals(kv.getFamilyLength(), kv0.getFamilyLength());
+ assertEquals(kv.getFamilyOffset(), kv0.getFamilyOffset());
+ // Length is same but for absence of the sequence number.
+ assertEquals(kv.getKeyLength(), kv0.getKeyLength() + Bytes.SIZEOF_LONG);
+ assertEquals(kv.getKeyOffset(), kv0.getKeyOffset());
+ assertEquals(kv.getQualifierLength(), kv0.getQualifierLength());
+ assertEquals(kv.getQualifierOffset(), kv0.getQualifierOffset());
+ assertEquals(kv.getRowLength(), kv0.getRowLength());
+ assertEquals("kv.row=" + Bytes.toString(kv.getRow()) + ", kv0.row=" +
+ Bytes.toString(kv0.getRow()), Bytes.toString(kv.getRow()),
+ Bytes.toString(kv0.getRow()));
+ assertEquals(Bytes.toString(kv.getFamily()),
+ Bytes.toString(kv0.getFamily()));
+ assertEquals(Bytes.toString(kv.getQualifier()),
+ Bytes.toString(kv0.getQualifier()));
+ assertEquals(kv.getTimestamp(), kv0.getTimestamp());
+ assertEquals(kv.getType(), kv0.getType());
+ assertEquals(kv.getSequenceNumber(), kv0.getSequenceNumber());
+ // See if these do right thing.
+ assertEquals(kv.toString(), kv0.toString());
+ int compare = KeyValue.COMPARATOR.compare(kv, kv0);
+ assertEquals(0, compare);
+ // Make a kv that differs in sequence number only
+ KeyValue kvBiggerSeqNum = new KeyValue(HConstants.CATALOG_FAMILY,
+ HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts,
+ 1, HConstants.CATALOG_FAMILY);
+ compare = KeyValue.COMPARATOR.compare(kv, kvBiggerSeqNum);
+ assertTrue(compare > 0);
+ compare = KeyValue.COMPARATOR.compare(kvBiggerSeqNum, kv);
+ assertTrue(compare < 0);
+ compare = KeyValue.COMPARATOR.compare(kvBiggerSeqNum, kvBiggerSeqNum);
+ assertEquals(0, compare);
+ assertEquals(0, KeyValue.COMPARATOR.compare(kv, kv));
+ }
+
+ private KeyValue convertToVersionZero(final KeyValue kv) {
+ int offset = kv.getSequenceNumberOffset();
+ int length = kv.getBuffer().length;
+ byte [] bytes = new byte[length - Bytes.SIZEOF_LONG];
+ System.arraycopy(kv.getBuffer(), kv.getOffset(), bytes, 0, offset);
+ System.arraycopy(kv.getBuffer(), kv.getOffset() + offset + Bytes.SIZEOF_LONG,
+ bytes, offset, length - Bytes.SIZEOF_LONG - offset);
+ // Fix up keylength
+ int keylength = KeyValue.getKeyLength(bytes, 0);
+ keylength -= Bytes.SIZEOF_LONG;
+ Bytes.putInt(bytes, 0, keylength);
+ // Now set the Type to be of type zero.
+ offset = kv.getTypeByteOffset(KeyValue.getKeyLength(bytes, 0));
+ bytes[offset] = KeyValue.stripVersionFromType(bytes[offset]);
+ return new KeyValue(bytes, 0);
+ }
+
+ public void testSequenceNumberCompare() {
+ long ts = System.currentTimeMillis();
+ KeyValue kvlower = new KeyValue(HConstants.CATALOG_FAMILY,
+ HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts, ts,
+ HConstants.CATALOG_FAMILY);
+ KeyValue kvhigher = new KeyValue(HConstants.CATALOG_FAMILY,
+ HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts, ts + 1,
+ HConstants.CATALOG_FAMILY);
+ int compare = KeyValue.COMPARATOR.compare(kvlower, kvhigher);
+ // Compare > 0 because kvhigher must sort before kvlower.
+ Assert.assertTrue(compare > 0);
+ assertEquals(0, KeyValue.COMPARATOR.compare(kvlower, kvlower));
+ assertEquals(0, KeyValue.COMPARATOR.compare(kvhigher, kvhigher));
+ compare = KeyValue.COMPARATOR.compare(kvhigher, kvlower);
+ Assert.assertTrue(compare < 0);
+ }
+
+ public void testSettingSequenceNumber() {
+ long ts = System.currentTimeMillis();
+ KeyValue kv = new KeyValue(HConstants.CATALOG_FAMILY,
+ HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts,
+ HConstants.CATALOG_FAMILY);
+ assertEquals(kv.getSequenceNumber(), KeyValue.DEFAULT_SEQUENCE_NUMBER);
+ kv.setSequenceNumber(ts);
+ assertEquals(kv.getSequenceNumber(), ts);
+ }
+
+ public void testVersion() {
+ int expected = KeyValue.getVersion(KeyValue.VERSION_BITS);
+ KeyValue kv = new KeyValue(Bytes.toBytes("test"),
+ System.currentTimeMillis(), Type.Put);
+ int found = kv.getVersion();
+ assertEquals(expected, found);
+ }
+
public void testColumnCompare() throws Exception {
final byte [] a = Bytes.toBytes("aaa");
byte [] family1 = Bytes.toBytes("abc");
@@ -42,6 +137,7 @@ public class TestKeyValue extends TestCase {
byte [] qualifier2 = Bytes.toBytes("ef");
KeyValue aaa = new KeyValue(a, family1, qualifier1, 0L, Type.Put, a);
+ LOG.info("aaa=" + aaa.toString());
assertFalse(aaa.matchingColumn(family2, qualifier2));
assertTrue(aaa.matchingColumn(family1, qualifier1));
aaa = new KeyValue(a, family2, qualifier2, 0L, Type.Put, a);
diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestResult.java b/src/test/java/org/apache/hadoop/hbase/client/TestResult.java
index becabcf..00ee844 100644
--- a/src/test/java/org/apache/hadoop/hbase/client/TestResult.java
+++ b/src/test/java/org/apache/hadoop/hbase/client/TestResult.java
@@ -20,16 +20,15 @@
package org.apache.hadoop.hbase.client;
-import junit.framework.TestCase;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-
import static org.apache.hadoop.hbase.HBaseTestCase.assertByteEquals;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
public class TestResult extends TestCase {
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 48a7011..adda9f2 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1413,12 +1413,12 @@ public class TestHRegion extends HBaseTestCase {
scan.addFamily(fam2);
scan.addFamily(fam4);
is = (RegionScanner) region.getScanner(scan);
- ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
+ TransactionManager.resetThreadReadPoint(region.getRWCC());
assertEquals(1, ((RegionScanner)is).storeHeap.getHeap().size());
scan = new Scan();
is = (RegionScanner) region.getScanner(scan);
- ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
+ TransactionManager.resetThreadReadPoint(region.getRWCC());
assertEquals(families.length -1,
((RegionScanner)is).storeHeap.getHeap().size());
}
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
index 3779114..111c9ed 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
@@ -24,8 +24,6 @@ import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.NavigableSet;
-import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
@@ -35,7 +33,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
-import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@@ -53,12 +50,12 @@ public class TestMemStore extends TestCase {
private static final byte [] CONTENTS = Bytes.toBytes("contents");
private static final byte [] BASIC = Bytes.toBytes("basic");
private static final String CONTENTSTR = "contentstr";
- private ReadWriteConsistencyControl rwcc;
+ private TransactionManager rwcc;
@Override
public void setUp() throws Exception {
super.setUp();
- this.rwcc = new ReadWriteConsistencyControl();
+ this.rwcc = new TransactionManager();
this.memstore = new MemStore();
}
@@ -84,7 +81,7 @@ public class TestMemStore extends TestCase {
List memstorescanners = this.memstore.getScanners();
Scan scan = new Scan();
List result = new ArrayList();
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memstore.comparator, null, memstorescanners);
int count = 0;
@@ -104,7 +101,7 @@ public class TestMemStore extends TestCase {
scanner.close();
}
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@@ -196,7 +193,7 @@ public class TestMemStore extends TestCase {
private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
throws IOException {
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
List memstorescanners = this.memstore.getScanners();
assertEquals(1, memstorescanners.size());
final KeyValueScanner scanner = memstorescanners.get(0);
@@ -231,35 +228,35 @@ public class TestMemStore extends TestCase {
final byte[] q2 = Bytes.toBytes("q2");
final byte[] v = Bytes.toBytes("value");
- ReadWriteConsistencyControl.WriteEntry w =
+ TransactionManager.WriteEntry w =
rwcc.beginMemstoreInsert();
KeyValue kv1 = new KeyValue(row, f, q1, v);
- kv1.setMemstoreTS(w.getWriteNumber());
+ kv1.setSequenceNumber(w.getWriteNumber());
memstore.add(kv1);
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{});
- rwcc.completeMemstoreInsert(w);
+ rwcc.finish(w);
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv1});
w = rwcc.beginMemstoreInsert();
KeyValue kv2 = new KeyValue(row, f, q2, v);
- kv2.setMemstoreTS(w.getWriteNumber());
+ kv2.setSequenceNumber(w.getWriteNumber());
memstore.add(kv2);
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv1});
- rwcc.completeMemstoreInsert(w);
+ rwcc.finish(w);
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv1, kv2});
}
@@ -279,45 +276,45 @@ public class TestMemStore extends TestCase {
final byte[] v2 = Bytes.toBytes("value2");
// INSERT 1: Write both columns val1
- ReadWriteConsistencyControl.WriteEntry w =
+ TransactionManager.WriteEntry w =
rwcc.beginMemstoreInsert();
KeyValue kv11 = new KeyValue(row, f, q1, v1);
- kv11.setMemstoreTS(w.getWriteNumber());
+ kv11.setSequenceNumber(w.getWriteNumber());
memstore.add(kv11);
KeyValue kv12 = new KeyValue(row, f, q2, v1);
- kv12.setMemstoreTS(w.getWriteNumber());
+ kv12.setSequenceNumber(w.getWriteNumber());
memstore.add(kv12);
- rwcc.completeMemstoreInsert(w);
+ rwcc.finish(w);
// BEFORE STARTING INSERT 2, SEE FIRST KVS
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START INSERT 2: Write both columns val2
w = rwcc.beginMemstoreInsert();
KeyValue kv21 = new KeyValue(row, f, q1, v2);
- kv21.setMemstoreTS(w.getWriteNumber());
+ kv21.setSequenceNumber(w.getWriteNumber());
memstore.add(kv21);
KeyValue kv22 = new KeyValue(row, f, q2, v2);
- kv22.setMemstoreTS(w.getWriteNumber());
+ kv22.setSequenceNumber(w.getWriteNumber());
memstore.add(kv22);
// BEFORE COMPLETING INSERT 2, SEE FIRST KVS
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// COMPLETE INSERT 2
- rwcc.completeMemstoreInsert(w);
+ rwcc.finish(w);
// NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS.
// See HBASE-1485 for discussion about what we should do with
// the duplicate-TS inserts
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12});
}
@@ -334,20 +331,20 @@ public class TestMemStore extends TestCase {
final byte[] q2 = Bytes.toBytes("q2");
final byte[] v1 = Bytes.toBytes("value1");
// INSERT 1: Write both columns val1
- ReadWriteConsistencyControl.WriteEntry w =
+ TransactionManager.WriteEntry w =
rwcc.beginMemstoreInsert();
KeyValue kv11 = new KeyValue(row, f, q1, v1);
- kv11.setMemstoreTS(w.getWriteNumber());
+ kv11.setSequenceNumber(w.getWriteNumber());
memstore.add(kv11);
KeyValue kv12 = new KeyValue(row, f, q2, v1);
- kv12.setMemstoreTS(w.getWriteNumber());
+ kv12.setSequenceNumber(w.getWriteNumber());
memstore.add(kv12);
- rwcc.completeMemstoreInsert(w);
+ rwcc.finish(w);
// BEFORE STARTING INSERT 2, SEE FIRST KVS
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
@@ -355,19 +352,19 @@ public class TestMemStore extends TestCase {
w = rwcc.beginMemstoreInsert();
KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
KeyValue.Type.DeleteColumn);
- kvDel.setMemstoreTS(w.getWriteNumber());
+ kvDel.setSequenceNumber(w.getWriteNumber());
memstore.add(kvDel);
// BEFORE COMPLETING DELETE, SEE FIRST KVS
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// COMPLETE DELETE
- rwcc.completeMemstoreInsert(w);
+ rwcc.finish(w);
// NOW WE SHOULD SEE DELETE
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12});
}
@@ -381,7 +378,7 @@ public class TestMemStore extends TestCase {
final byte[] f = Bytes.toBytes("family");
final byte[] q1 = Bytes.toBytes("q1");
- final ReadWriteConsistencyControl rwcc;
+ final TransactionManager rwcc;
final MemStore memstore;
AtomicReference caughtException;
@@ -389,7 +386,7 @@ public class TestMemStore extends TestCase {
public ReadOwnWritesTester(int id,
MemStore memstore,
- ReadWriteConsistencyControl rwcc,
+ TransactionManager rwcc,
AtomicReference caughtException)
{
this.rwcc = rwcc;
@@ -408,19 +405,19 @@ public class TestMemStore extends TestCase {
private void internalRun() throws IOException {
for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
- ReadWriteConsistencyControl.WriteEntry w =
+ TransactionManager.WriteEntry w =
rwcc.beginMemstoreInsert();
// Insert the sequence value (i)
byte[] v = Bytes.toBytes(i);
KeyValue kv = new KeyValue(row, f, q1, i, v);
- kv.setMemstoreTS(w.getWriteNumber());
+ kv.setSequenceNumber(w.getWriteNumber());
memstore.add(kv);
- rwcc.completeMemstoreInsert(w);
+ rwcc.finish(w);
// Assert that we can read back
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
KeyValueScanner s = this.memstore.getScanners().get(0);
s.seek(kv);
@@ -898,7 +895,7 @@ public class TestMemStore extends TestCase {
}
public static void main(String [] args) throws IOException {
- ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+ TransactionManager rwcc = new TransactionManager();
MemStore ms = new MemStore();
long n1 = System.nanoTime();
@@ -908,7 +905,7 @@ public class TestMemStore extends TestCase {
System.out.println("foo");
- ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ TransactionManager.resetThreadReadPoint(rwcc);
for (int i = 0 ; i < 50 ; i++)
doScan(ms, i);
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
index 92075b0..f4827d1 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
@@ -28,10 +28,10 @@ import java.util.concurrent.atomic.AtomicLong;
public class TestReadWriteConsistencyControl extends TestCase {
static class Writer implements Runnable {
final AtomicBoolean finished;
- final ReadWriteConsistencyControl rwcc;
+ final TransactionManager rwcc;
final AtomicBoolean status;
- Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
+ Writer(AtomicBoolean finished, TransactionManager rwcc, AtomicBoolean status) {
this.finished = finished;
this.rwcc = rwcc;
this.status = status;
@@ -41,7 +41,7 @@ public class TestReadWriteConsistencyControl extends TestCase {
public void run() {
while (!finished.get()) {
- ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
+ TransactionManager.WriteEntry e = rwcc.beginMemstoreInsert();
// System.out.println("Begin write: " + e.getWriteNumber());
// 10 usec - 500usec (including 0)
int sleepTime = rnd.nextInt(500);
@@ -53,7 +53,7 @@ public class TestReadWriteConsistencyControl extends TestCase {
} catch (InterruptedException e1) {
}
try {
- rwcc.completeMemstoreInsert(e);
+ rwcc.finish(e);
} catch (RuntimeException ex) {
// got failure
System.out.println(ex.toString());
@@ -67,7 +67,7 @@ public class TestReadWriteConsistencyControl extends TestCase {
}
public void testParallelism() throws Exception {
- final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+ final TransactionManager rwcc = new TransactionManager();
final AtomicBoolean finished = new AtomicBoolean(false);
@@ -76,9 +76,9 @@ public class TestReadWriteConsistencyControl extends TestCase {
final AtomicLong failedAt = new AtomicLong();
Runnable reader = new Runnable() {
public void run() {
- long prev = rwcc.memstoreReadPoint();
+ long prev = rwcc.getReadPoint();
while (!finished.get()) {
- long newPrev = rwcc.memstoreReadPoint();
+ long newPrev = rwcc.getReadPoint();
if (newPrev < prev) {
// serious problem.
System.out.println("Reader got out of order, prev: " +
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index de6f097..3eacaf8 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
-import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import com.google.common.base.Joiner;
@@ -306,7 +305,8 @@ public class TestStore extends TestCase {
Bytes.toBytes(oldValue)));
// update during the snapshot.
- long ret = this.store.updateColumnValue(row, family, qf1, newValue);
+ long ret = this.store.updateColumnValue(new KeyValue(row, family, qf1,
+ Bytes.toBytes(newValue)));
// memstore should have grown by some amount.
assertTrue(ret > 0);
@@ -366,8 +366,8 @@ public class TestStore extends TestCase {
for ( int i = 0 ; i < 10000 ; ++i) {
newValue++;
- long ret = this.store.updateColumnValue(row, family, qf1, newValue);
- long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
+ long ret = this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue)));
+ long ret2 = this.store.updateColumnValue(new KeyValue(row2, family, qf1, Bytes.toBytes(newValue)));
if (ret != 0) System.out.println("ret: " + ret);
if (ret2 != 0) System.out.println("ret2: " + ret2);
@@ -406,7 +406,7 @@ public class TestStore extends TestCase {
this.store.snapshot();
// update during the snapshot, the exact same TS as the Put (lololol)
- long ret = this.store.updateColumnValue(row, family, qf1, newValue);
+ long ret = this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue)));
// memstore should have grown by some amount.
assertTrue(ret > 0);
@@ -418,11 +418,11 @@ public class TestStore extends TestCase {
// now increment again:
newValue += 1;
- this.store.updateColumnValue(row, family, qf1, newValue);
+ this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue)));
// at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again:
newValue += 1;
- this.store.updateColumnValue(row, family, qf1, newValue);
+ this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue)));
// the second TS should be TS=2 or higher., even though 'time=1' right now.
@@ -445,7 +445,7 @@ public class TestStore extends TestCase {
mee.setValue(2); // time goes up slightly
newValue += 1;
- this.store.updateColumnValue(row, family, qf1, newValue);
+ this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue)));
results = HBaseTestingUtility.getFromStoreFile(store, get);
assertEquals(2, results.size());
diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java b/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
index e70135f..44ee10a 100644
--- a/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
+++ b/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
@@ -109,6 +109,11 @@ public class TestBytes extends TestCase {
byte [] b = Bytes.toBytes(longs[i]);
assertEquals(longs[i], Bytes.toLong(b));
}
+ byte [] bytes = new byte[16];
+ final int offset = 8;
+ bytes = Bytes.toBytes(bytes, offset, Integer.MAX_VALUE);
+ long result = Bytes.toLong(bytes, offset);
+ assertEquals(Integer.MAX_VALUE, result);
}
public void testToFloat() throws Exception {