diff --git a/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/src/main/java/org/apache/hadoop/hbase/KeyValue.java index dff1af6..f708889 100644 --- a/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Comparator; -import com.google.common.primitives.Longs; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.io.HeapSize; @@ -36,52 +35,63 @@ import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Writable; /** - * An HBase Key/Value. This is the fundamental HBase Type. + * An HBase Key/Value. This is the fundamental HBase Type. Its persisted into + * StoreFiles/HFiles in the FileSystem and is what a + * {@link org.hadoop.hbase.client.Result} carries from server to HBase client. * - *

If being used client-side, the primary methods to access individual fields + *

If being used client-side, the primary methods to access KeyValue particles * are {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()}, * {@link #getTimestamp()}, and {@link #getValue()}. These methods allocate new * byte arrays and return copies. Avoid their use server-side. * - *

Instances of this class are immutable. They do not implement Comparable - * but Comparators are provided. Comparators change with context, - * whether user table or a catalog table comparison. Its critical you use the - * appropriate comparator. There are Comparators for KeyValue instances and - * then for just the Key portion of a KeyValue used mostly by {@link HFile}. + *

Instances of this class are immutable (EXCEPT for the fact that the server + * will stamp the KeyValue with an edit sequence number on receipt before the + * KV is written to the WAL and added to the memstore). It does not implement + * Comparable but Comparators are provided. Context dictates which Comparator + * to use. If a user table, use one type. If a catalog table comparison, you'd + * use another. Its critical you use the appropriate comparator. There are + * also Comparators for KeyValue instances and then for just the Key portion of + * a KeyValue used mostly by {@link HFile}. * - *

KeyValue wraps a byte array and takes offsets and lengths into passed + *

KeyValue takes a byte array with offsets and lengths into passed * array at where to start interpreting the content as KeyValue. The KeyValue * format inside a byte array is: * <keylength> <valuelength> <key> <value> * Key is further decomposed as: - * <rowlength> <row> <columnfamilylength> <columnfamily> <columnqualifier> <timestamp> <keytype> + * <rowlength> <row> <columnfamilylength> <columnfamily> <columnqualifier> <timestamp> <sequencenumber> <keytype> * The rowlength maximum is Short.MAX_SIZE, * column family length maximum is * Byte.MAX_SIZE, and column qualifier + key length must - * be < Integer.MAX_SIZE. - * The column does not contain the family/qualifier delimiter, {@link #COLUMN_FAMILY_DELIMITER} + * be < Integer.MAX_SIZE. A KeyValue does not contain the + * family/qualifier delimiter, {@link #COLUMN_FAMILY_DELIMITER}. + * + *

The sequencenumber is an internally maintained edit insertion + * order; its used breaking ties where all else in the key portion of two keys + * are the same; we'll favor the KeyValue that has a higher sequencenumber. */ public class KeyValue implements Writable, HeapSize { static final Log LOG = LogFactory.getLog(KeyValue.class); - // TODO: Group Key-only comparators and operations into a Key class, just - // for neatness sake, if can figure what to call it. /** * Colon character in UTF-8 */ public static final char COLUMN_FAMILY_DELIMITER = ':'; - public static final byte[] COLUMN_FAMILY_DELIM_ARRAY = - new byte[]{COLUMN_FAMILY_DELIMITER}; + /** + * {@link #COLUMN_FAMILY_DELIMITER} as a byte array. + */ + public static final byte [] COLUMN_FAMILY_DELIM_ARRAY = + new byte[] {COLUMN_FAMILY_DELIMITER}; /** - * Comparator for plain key/values; i.e. non-catalog table key/values. + * Comparator for plain {@link KeyValue}s.; i.e. user-space, non-catalog + * table KeyValues. */ public static KVComparator COMPARATOR = new KVComparator(); /** - * Comparator for plain key; i.e. non-catalog table key. Works on Key portion - * of KeyValue only. + * Comparator for key portion of plain {@link KeyValue}s; i.e. user-space, + * non-catalog table key. Works on Key portion of KeyValue only. */ public static KeyComparator KEY_COMPARATOR = new KeyComparator(); @@ -93,7 +103,7 @@ public class KeyValue implements Writable, HeapSize { /** * A {@link KVComparator} for .META. catalog table - * {@link KeyValue} keys. + * {@link KeyValue} keys. Works on Key portion of KeyValue only. */ public static KeyComparator META_KEY_COMPARATOR = new MetaKeyComparator(); @@ -105,53 +115,66 @@ public class KeyValue implements Writable, HeapSize { /** * A {@link KVComparator} for -ROOT- catalog table - * {@link KeyValue} keys. + * {@link KeyValue} keys. Works on Key portion of KeyValue only. 
*/ public static KeyComparator ROOT_KEY_COMPARATOR = new RootKeyComparator(); /** - * Get the appropriate row comparator for the specified table. - * - * Hopefully we can get rid of this, I added this here because it's replacing - * something in HSK. We should move completely off of that. - * + * Get the appropriate KeyValue comparator for the specified table. * @param tableName The table name. - * @return The comparator. + * @return The KeyValue comparator to use on this table. */ public static KeyComparator getRowComparator(byte [] tableName) { - if(Bytes.equals(HTableDescriptor.ROOT_TABLEDESC.getName(),tableName)) { + if (Bytes.equals(HTableDescriptor.ROOT_TABLEDESC.getName(),tableName)) { return ROOT_COMPARATOR.getRawComparator(); } - if(Bytes.equals(HTableDescriptor.META_TABLEDESC.getName(), tableName)) { + if (Bytes.equals(HTableDescriptor.META_TABLEDESC.getName(), tableName)) { return META_COMPARATOR.getRawComparator(); } return COMPARATOR.getRawComparator(); } - // Size of the timestamp and type byte on end of a key -- a long + a byte. - public static final int TIMESTAMP_TYPE_SIZE = + static final int TIMESTAMP_TYPE_SIZE = Bytes.SIZEOF_LONG /* timestamp */ + Bytes.SIZEOF_BYTE /*keytype*/; - // Size of the length shorts and bytes in key. - public static final int KEY_INFRASTRUCTURE_SIZE = - Bytes.SIZEOF_SHORT /*rowlength*/ + - Bytes.SIZEOF_BYTE /*columnfamilylength*/ + + // Size of the timestamp, sequencenumber and the type byte on the end + // of the key portion of a KeyValue -- two longs and a byte. + static final int TIMESTAMP_SEQID_TYPE_SIZE = + Bytes.SIZEOF_LONG /* insertion sequence number */ + TIMESTAMP_TYPE_SIZE; // How far into the key the row starts at. First thing to read is the short - // that says how long the row is. + // that says how long the key is, then the data length... then row bytes start. 
public static final int ROW_OFFSET = Bytes.SIZEOF_INT /*keylength*/ + Bytes.SIZEOF_INT /*valuelength*/; - // Size of the length ints in a KeyValue datastructure. + // Size of the length ints in a KeyValue datastructure; the data and key shorts. public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET; /** - * Key type. - * Has space for other key types to be added later. Cannot rely on - * enum ordinals . They change if item is removed or moved. Do our own codes. + * Mask used comparing KeyValue Types without regard to KeyValue version. + * KeyValue version is kept in the upper two bits of the Type byte; i.e. only + * a maximum of 4 versions allowed -- versions 0-3. + */ + private static final byte VERSION_MASK = (byte)0xC0; + private static final byte VERSION_MASK_INVERSE = ~VERSION_MASK; + + /** + * This KeyValues version in bits that can be OR'd into a Type. + * This define will change as KeyValue version evolves. This is what you + * change changing the KeyValue version. We are currently version '1'; i.e. + * the bit at 0x40 is set. Previous to this there was version '0', none of + * the version bits were set. + */ + static final byte VERSION_BITS = (byte)0x40; + + /** + * KeyValue type. + * Has space for other key types to be added later. Does not rely on + * enum ordinals . They change if item is removed or moved. Do our own + * explicit codes. Keys are versioned using two most significant bits in Type. */ public static enum Type { Minimum((byte)0), @@ -162,7 +185,14 @@ public class KeyValue implements Writable, HeapSize { DeleteFamily((byte)14), // Maximum is used when searching; you look from maximum on down. - Maximum((byte)255); + Maximum((byte)63); + + // Bit 0x40 and 0x80 are reserved used specifying KeyValue version. If top + // two bits zero, then version is 0. If 7th bit is set -- 0x40 -- then we're + // version 1. Version is ignored when KV is compared. We define Version + // below just to show that the bits are occupied. 
The version bit twiddling + // and compares are done elsewhere, outside of this enum. See VERSION_MASK + // and VERSION_BITS defines. private final byte code; @@ -181,12 +211,12 @@ public class KeyValue implements Writable, HeapSize { * @return Type associated with passed code. */ public static Type codeToType(final byte b) { + byte bWithVersionStripped = stripVersionFromType(b); for (Type t : Type.values()) { - if (t.getCode() == b) { - return t; - } + // When comparing, do not compare on version. + if (t.getCode() == bWithVersionStripped) return t; } - throw new RuntimeException("Unknown code " + b); + throw new RuntimeException("Unknown code " + stripVersionFromType(b)); } } @@ -196,49 +226,41 @@ public class KeyValue implements Writable, HeapSize { * key can be equal or lower than this one in memstore or in store file. */ public static final KeyValue LOWESTKEY = - new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP); + new KeyValue(HConstants.EMPTY_BYTE_ARRAY, null, null, + HConstants.LATEST_TIMESTAMP, Long.MAX_VALUE, Type.Maximum, null); + // Bytes backing this KeyValue. private byte [] bytes = null; - private int offset = 0; - private int length = 0; - - // the row cached - private byte [] rowCache = null; + // Offset into bytes. + private int offset = 0; - /** Here be dragons **/ + // Length KeyValue occupies at offset in bytes. + private int length = 0; - // used to achieve atomic operations in the memstore. - public long getMemstoreTS() { - return memstoreTS; - } + // Default value for sequence number. + public static final long DEFAULT_SEQUENCE_NUMBER = 0; - public void setMemstoreTS(long memstoreTS) { - this.memstoreTS = memstoreTS; - } + // Cache of the sequence number portion of this KV. + // The cached items bulk up KV. 
+ private long sequenceNumberCache = -1; - // default value is 0, aka DNC - private long memstoreTS = 0; + // Cache for timestamp long + private long timestampCache = -1; - /** Dragon time over, return to normal business */ + // Cached reference to the byte array of row content + private byte [] rowCache = null; + // Cache of keyLength. + private int keyLengthCache = -1; /** Writable Constructor -- DO NOT USE */ public KeyValue() {} /** - * Creates a KeyValue from the start of the specified byte array. - * Presumes bytes content is formatted as a KeyValue blob. - * @param bytes byte array - */ - public KeyValue(final byte [] bytes) { - this(bytes, 0); - } - - /** * Creates a KeyValue from the specified byte array and offset. * Presumes bytes content starting at offset is - * formatted as a KeyValue blob. + * formatted as a KeyValue. * @param bytes byte array * @param offset offset to start of KeyValue */ @@ -248,7 +270,10 @@ public class KeyValue implements Writable, HeapSize { /** * Creates a KeyValue from the specified byte array, starting at offset, and - * for length length. + * for length length. Does NOT make a copy of the passed + * array. + * Presumes bytes content starting at offset is + * formatted as a KeyValue. * @param bytes byte array * @param offset offset to start of the KeyValue * @param length length of the KeyValue @@ -259,29 +284,30 @@ public class KeyValue implements Writable, HeapSize { this.length = length; } - /** Constructors that build a new backing byte array from fields */ - /** - * Constructs KeyValue structure filled with null value. - * Sets type to {@link KeyValue.Type#Maximum} - * @param row - row key (arbitrary byte array) + * Constructs KeyValue filled with a null family, qualifier and + * data. Sets type to {@link KeyValue.Type#Maximum}. 
+ * @param row Row key * @param timestamp + * @param sequenceNumber */ public KeyValue(final byte [] row, final long timestamp) { this(row, timestamp, Type.Maximum); } /** - * Constructs KeyValue structure filled with null value. - * @param row - row key (arbitrary byte array) + * Constructs KeyValue filled with a null family, qualifier and + * data. + * @param row Row key + * @param type Key Type to use. * @param timestamp */ public KeyValue(final byte [] row, final long timestamp, Type type) { - this(row, null, null, timestamp, type, null); + this(row, null, null, timestamp, DEFAULT_SEQUENCE_NUMBER, type, null); } /** - * Constructs KeyValue structure filled with null value. + * Constructs KeyValue filled with null data. * Sets type to {@link KeyValue.Type#Maximum} * @param row - row key (arbitrary byte array) * @param family family name @@ -289,73 +315,127 @@ public class KeyValue implements Writable, HeapSize { */ public KeyValue(final byte [] row, final byte [] family, final byte [] qualifier) { - this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum); + this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, + DEFAULT_SEQUENCE_NUMBER, Type.Maximum); } /** - * Constructs KeyValue structure filled with null value. + * Constructs KeyValue. + * Sets type to {@link KeyValue.Type#Put} * @param row - row key (arbitrary byte array) * @param family family name * @param qualifier column qualifier */ public KeyValue(final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value) { - this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value); + this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, + DEFAULT_SEQUENCE_NUMBER, Type.Put, value); } /** - * Constructs KeyValue structure filled with specified values. + * Constructs KeyValue filled with null data. 
* @param row row key * @param family family name * @param qualifier column qualifier - * @param timestamp version timestamp + * @param timestamp Timestamp * @param type key type * @throws IllegalArgumentException */ public KeyValue(final byte[] row, final byte[] family, final byte[] qualifier, final long timestamp, Type type) { - this(row, family, qualifier, timestamp, type, null); + // Used by tests + this(row, family, qualifier, timestamp, DEFAULT_SEQUENCE_NUMBER, type); + } + + /** + * Constructs KeyValue filled with null data. + * @param row row key + * @param family family name + * @param qualifier column qualifier + * @param timestamp Timestamp + * @param sequenceNumber + * @param type key type + * @throws IllegalArgumentException + */ + public KeyValue(final byte[] row, final byte[] family, + final byte[] qualifier, final long timestamp, + final long sequenceNumber, Type type) { + this(row, family, qualifier, timestamp, sequenceNumber, type, null); } /** - * Constructs KeyValue structure filled with specified values. + * Constructs KeyValue * @param row row key * @param family family name * @param qualifier column qualifier - * @param timestamp version timestamp + * @param timestamp Timestamp * @param value column value * @throws IllegalArgumentException */ public KeyValue(final byte[] row, final byte[] family, final byte[] qualifier, final long timestamp, final byte[] value) { - this(row, family, qualifier, timestamp, Type.Put, value); + // Used by tests. + this(row, family, qualifier, timestamp, DEFAULT_SEQUENCE_NUMBER, Type.Put, value); } /** - * Constructs KeyValue structure filled with specified values. 
+ * Constructs KeyValue * @param row row key * @param family family name * @param qualifier column qualifier - * @param timestamp version timestamp - * @param type key type + * @param timestamp Timestamp + * @param sequenceNumber * @param value column value * @throws IllegalArgumentException */ public KeyValue(final byte[] row, final byte[] family, + final byte[] qualifier, final long timestamp, final long sequenceNumber, + final byte[] value) { + this(row, family, qualifier, timestamp, sequenceNumber, Type.Put, value); + } + + /** + * Constructs KeyValue + * @param row row key + * @param family family name + * @param qualifier column qualifier + * @param timestamp Timestamp + * @param type key type + * @param value column value/data. + * @throws IllegalArgumentException + */ + public KeyValue(final byte[] row, final byte[] family, final byte[] qualifier, final long timestamp, Type type, final byte[] value) { + this(row, family, qualifier, timestamp, DEFAULT_SEQUENCE_NUMBER, type, value); + } + + /** + * Constructs KeyValue + * @param row row key + * @param family family name + * @param qualifier column qualifier + * @param timestamp Timestamp + * @param type key type + * @param value column value/data. + * @throws IllegalArgumentException + */ + public KeyValue(final byte[] row, final byte[] family, + final byte[] qualifier, final long timestamp, final long sequenceNumber, + Type type, final byte[] value) { this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length, - timestamp, type, value, 0, value==null ? 0 : value.length); + timestamp, sequenceNumber, type, value, 0, value==null ? 0 : value.length); } /** - * Constructs KeyValue structure filled with specified values. 
+ * Constructs KeyValue * @param row row key * @param family family name * @param qualifier column qualifier * @param qoffset qualifier offset * @param qlength qualifier length - * @param timestamp version timestamp + * @param timestamp Timestamp + * @param sequenceNumber * @param type key type * @param value column value * @param voffset value offset @@ -363,16 +443,17 @@ public class KeyValue implements Writable, HeapSize { * @throws IllegalArgumentException */ public KeyValue(byte [] row, byte [] family, - byte [] qualifier, int qoffset, int qlength, long timestamp, Type type, + byte [] qualifier, int qoffset, int qlength, long timestamp, + final long sequenceNumber, Type type, byte [] value, int voffset, int vlength) { this(row, 0, row==null ? 0 : row.length, family, 0, family==null ? 0 : family.length, - qualifier, qoffset, qlength, timestamp, type, + qualifier, qoffset, qlength, timestamp, sequenceNumber, type, value, voffset, vlength); } /** - * Constructs KeyValue structure filled with specified values. + * Constructs KeyValue. *

* Column is split into two fields, family and qualifier. * @param row row key @@ -384,7 +465,8 @@ public class KeyValue implements Writable, HeapSize { * @param qualifier column qualifier * @param qoffset qualifier offset * @param qlength qualifier length - * @param timestamp version timestamp + * @param timestamp Timestamp + * @param sequenceNumber * @param type key type * @param value column value * @param voffset value offset @@ -394,11 +476,11 @@ public class KeyValue implements Writable, HeapSize { public KeyValue(final byte [] row, final int roffset, final int rlength, final byte [] family, final int foffset, final int flength, final byte [] qualifier, final int qoffset, final int qlength, - final long timestamp, final Type type, + final long timestamp, final long sequenceNumber, final Type type, final byte [] value, final int voffset, final int vlength) { this.bytes = createByteArray(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength, - timestamp, type, value, voffset, vlength); + timestamp, sequenceNumber, type, value, voffset, vlength); this.length = bytes.length; this.offset = 0; } @@ -415,7 +497,8 @@ public class KeyValue implements Writable, HeapSize { * @param qualifier column qualifier * @param qoffset qualifier offset * @param qlength qualifier length - * @param timestamp version timestamp + * @param timestamp Timestamp + * @param sequenceNumber * @param type key type * @param value column value * @param voffset value offset @@ -425,7 +508,7 @@ public class KeyValue implements Writable, HeapSize { static byte [] createByteArray(final byte [] row, final int roffset, final int rlength, final byte [] family, final int foffset, int flength, final byte [] qualifier, final int qoffset, int qlength, - final long timestamp, final Type type, + final long timestamp, final long sequenceNumber, final Type type, final byte [] value, final int voffset, int vlength) { if (rlength > Short.MAX_VALUE) { throw new IllegalArgumentException("Row 
> " + Short.MAX_VALUE); @@ -443,8 +526,11 @@ public class KeyValue implements Writable, HeapSize { if (qlength > Integer.MAX_VALUE - rlength - flength) { throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE); } - // Key length - long longkeylength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength; + // Key length. KV infrastructure size is KV version particular; presume + // that if we are in this code that the version is that of the current state + // of KV. + long longkeylength = getKeyInfrastructureLength(getVersion(VERSION_BITS)) + + rlength + flength + qlength; if (longkeylength > Integer.MAX_VALUE) { throw new IllegalArgumentException("keylength " + longkeylength + " > " + Integer.MAX_VALUE); @@ -472,8 +558,12 @@ public class KeyValue implements Writable, HeapSize { if(qlength != 0) { pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength); } + // Timestamp pos = Bytes.putLong(bytes, pos, timestamp); - pos = Bytes.putByte(bytes, pos, type.getCode()); + // Sequencenumber + pos = Bytes.putLong(bytes, pos, sequenceNumber); + // Type with KV version included. + pos = Bytes.putByte(bytes, pos, addVersionToType(type.getCode())); if (value != null && value.length > 0) { pos = Bytes.putBytes(bytes, pos, value, voffset, vlength); } @@ -481,58 +571,42 @@ public class KeyValue implements Writable, HeapSize { } /** - * Write KeyValue format into a byte array. - *

- * Takes column in the form family:qualifier - * @param row - row key (arbitrary byte array) - * @param roffset - * @param rlength - * @param column - * @param coffset - * @param clength - * @param timestamp + * Decorate passed type with this KVs' version. * @param type - * @param value - * @param voffset - * @param vlength - * @return The newly created byte array. + * @return Type with Version added. */ - static byte [] createByteArray(final byte [] row, final int roffset, - final int rlength, - final byte [] column, final int coffset, int clength, - final long timestamp, final Type type, - final byte [] value, final int voffset, int vlength) { - // If column is non-null, figure where the delimiter is at. - int delimiteroffset = 0; - if (column != null && column.length > 0) { - delimiteroffset = getFamilyDelimiterIndex(column, coffset, clength); - if (delimiteroffset > Byte.MAX_VALUE) { - throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE); - } - } else { - return createByteArray(row,roffset,rlength,null,0,0,null,0,0,timestamp, - type,value,voffset,vlength); - } - int flength = delimiteroffset-coffset; - int qlength = clength - flength - 1; - return createByteArray(row, roffset, rlength, column, coffset, - flength, column, delimiteroffset+1, qlength, timestamp, type, - value, voffset, vlength); + private static byte addVersionToType(final byte type) { + // First blank out any existing version before adding this KVs. 
+ byte b = (byte)(((type & VERSION_MASK_INVERSE)) | VERSION_BITS); + return b; + } + + static byte stripVersionFromType(final byte b) { + return (byte)(b & VERSION_MASK_INVERSE); + } + + private static int getTimestampToEndOfKeyLength(final int version) { + if (version == 1) return TIMESTAMP_SEQID_TYPE_SIZE; + else if (version == 0) return TIMESTAMP_TYPE_SIZE; + else throw new IllegalArgumentException("Unexpected version=" + version); + } + + private static int getKeyInfrastructureLength(final int version) { + return getTimestampToEndOfKeyLength(version) + + Bytes.SIZEOF_SHORT /*rowlength*/ + + Bytes.SIZEOF_BYTE /*columnfamilylength*/; } // Needed doing 'contains' on List. Only compares the key portion, not the // value. public boolean equals(Object other) { - if (!(other instanceof KeyValue)) { - return false; - } + if (!(other instanceof KeyValue)) return false; KeyValue kv = (KeyValue)other; // Comparing bytes should be fine doing equals test. Shouldn't have to // worry about special .META. comparators doing straight equals. 
- boolean result = Bytes.BYTES_RAWCOMPARATOR.compare(getBuffer(), - getKeyOffset(), getKeyLength(), - kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()) == 0; - return result; + return Bytes.BYTES_RAWCOMPARATOR.compare(getBuffer(), + getKeyOffset(), getKeyLength(), kv.getBuffer(), kv.getKeyOffset(), + kv.getKeyLength()) == 0; } public int hashCode() { @@ -558,12 +632,7 @@ public class KeyValue implements Writable, HeapSize { public KeyValue clone() { byte [] b = new byte[this.length]; System.arraycopy(this.bytes, this.offset, b, 0, this.length); - KeyValue ret = new KeyValue(b, 0, b.length); - // Important to clone the memstoreTS as well - otherwise memstore's - // update-in-place methods (eg increment) will end up creating - // new entries - ret.setMemstoreTS(memstoreTS); - return ret; + return new KeyValue(b, 0, b.length); } /** @@ -577,13 +646,11 @@ public class KeyValue implements Writable, HeapSize { /** * Creates a shallow copy of this KeyValue, reusing the data byte buffer. - * http://en.wikipedia.org/wiki/Object_copy + * @see http://en.wikipedia.org/wiki/Object_copy * @return Shallow copy of this KeyValue */ public KeyValue shallowCopy() { - KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length); - shallowCopy.setMemstoreTS(this.memstoreTS); - return shallowCopy; + return new KeyValue(this.bytes, this.offset, this.length); } //--------------------------------------------------------------------------- @@ -617,24 +684,26 @@ public class KeyValue implements Writable, HeapSize { */ public static String keyToString(final byte [] b, final int o, final int l) { if (b == null) return ""; + byte type = b[o + l - 1]; + int version = getVersion(type); + int infrastructureSize = getTimestampToEndOfKeyLength(version); int rowlength = Bytes.toShort(b, o); String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength); int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength; int familylength = b[columnoffset - 1]; - int columnlength = l - 
((columnoffset - o) + TIMESTAMP_TYPE_SIZE); + int columnlength = l - ((columnoffset - o) + infrastructureSize); String family = familylength == 0? "": Bytes.toStringBinary(b, columnoffset, familylength); String qualifier = columnlength == 0? "": Bytes.toStringBinary(b, columnoffset + familylength, columnlength - familylength); - long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE)); - byte type = b[o + l - 1]; -// return row + "/" + family + -// (family != null && family.length() > 0? COLUMN_FAMILY_DELIMITER: "") + -// qualifier + "/" + timestamp + "/" + Type.codeToType(type); + long timestamp = Bytes.toLong(b, o + (l - infrastructureSize)); + long sequenceNumber = version == 0? 0: + Bytes.toLong(b, o + (l - infrastructureSize + Bytes.SIZEOF_LONG)); return row + "/" + family + (family != null && family.length() > 0? ":" :"") + - qualifier + "/" + timestamp + "/" + Type.codeToType(type); + qualifier + "/" + timestamp + "/" + sequenceNumber + "/" + + Type.codeToType(type); } //--------------------------------------------------------------------------- @@ -664,6 +733,22 @@ public class KeyValue implements Writable, HeapSize { return length; } + /** + * @return This instance's version. + */ + int getVersion() { + return getVersion(getTypeByte(getKeyLength())); + } + + /** + * @param type + * @return Version that is in passed type + */ + static int getVersion(final byte type) { + // Not public. Version is an internal implementation detail. 
+ return (type & VERSION_MASK) >>> 6; + } + //--------------------------------------------------------------------------- // // Length and Offset Calculators @@ -679,8 +764,8 @@ public class KeyValue implements Writable, HeapSize { */ private static int getLength(byte [] bytes, int offset) { return (2 * Bytes.SIZEOF_INT) + - Bytes.toInt(bytes, offset) + - Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT); + Bytes.toInt(bytes, offset) + + Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT); } /** @@ -691,19 +776,19 @@ public class KeyValue implements Writable, HeapSize { } public String getKeyString() { + // Used by tests. return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength()); } - /** - * @return Length of key portion. - */ - private int keyLength = 0; - public int getKeyLength() { - if (keyLength == 0) { - keyLength = Bytes.toInt(this.bytes, this.offset); + if (keyLengthCache == -1) { + keyLengthCache = getKeyLength(this.bytes, this.offset); } - return keyLength; + return keyLengthCache; + } + + static int getKeyLength(final byte [] bytes, final int offset) { + return Bytes.toInt(bytes, offset); } /** @@ -780,15 +865,15 @@ public class KeyValue implements Writable, HeapSize { * @return Qualifier length */ public int getQualifierLength() { - return getQualifierLength(getRowLength(),getFamilyLength()); + return getQualifierLength(getRowLength(), getFamilyLength()); } /** * @return Qualifier length */ - public int getQualifierLength(int rlength, int flength) { + public int getQualifierLength(int rowLength, int familyLength) { return getKeyLength() - - (KEY_INFRASTRUCTURE_SIZE + rlength + flength); + (getKeyInfrastructureLength(getVersion()) + rowLength + familyLength); } /** @@ -797,7 +882,7 @@ public class KeyValue implements Writable, HeapSize { public int getTotalColumnLength() { int rlength = getRowLength(); int foffset = getFamilyOffset(rlength); - return getTotalColumnLength(rlength,foffset); + return getTotalColumnLength(rlength, foffset); } /** @@ 
-805,7 +890,7 @@ public class KeyValue implements Writable, HeapSize { */ public int getTotalColumnLength(int rlength, int foffset) { int flength = getFamilyLength(foffset); - int qlength = getQualifierLength(rlength,flength); + int qlength = getQualifierLength(rlength, flength); return flength + qlength; } @@ -820,8 +905,9 @@ public class KeyValue implements Writable, HeapSize { * @param keylength Pass if you have it to save on a int creation. * @return Timestamp offset */ - public int getTimestampOffset(final int keylength) { - return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE; + private int getTimestampOffset(final int keylength) { + return getKeyOffset() + keylength - + getTimestampToEndOfKeyLength(getVersion()); } /** @@ -846,6 +932,22 @@ public class KeyValue implements Writable, HeapSize { return false; } + /** + * @return Edit Sequence Number offset + */ + int getSequenceNumberOffset() { + return getSequenceNumberOffset(getKeyLength()); + } + + /** + * @param keylength Pass if you have it to save on a int creation. 
+ * @return Edit Sequence Number offset + */ + private int getSequenceNumberOffset(final int keylength) { + return getKeyOffset() + keylength - + getTimestampToEndOfKeyLength(getVersion()) + Bytes.SIZEOF_LONG; + } + //--------------------------------------------------------------------------- // // Methods that return copies of fields @@ -900,10 +1002,8 @@ public class KeyValue implements Writable, HeapSize { } /** - * * @return Timestamp */ - private long timestampCache = -1; public long getTimestamp() { if (timestampCache == -1) { timestampCache = getTimestamp(getKeyLength()); @@ -921,6 +1021,41 @@ public class KeyValue implements Writable, HeapSize { } /** + * @return Edit sequence number + */ + public long getSequenceNumber() { + if (this.sequenceNumberCache == -1) { + this.sequenceNumberCache = getSequenceNumber(getKeyLength()); + } + return sequenceNumberCache; + } + + /** + * @param keylength Pass if you have it to save on an int creation. + * @return Edit sequence number + */ + long getSequenceNumber(final int keylength) { + int version = getVersion(); + if (version == 0) return 0; + int offset = getSequenceNumberOffset(keylength); + return Bytes.toLong(this.bytes, offset); + } + + /** + * Set edit sequence number of this KeyValue. Advanced users only. Set by + * the server on receipt of an edit before the edit is added to the WAL or up + * into the memstore. + * @param sequenceNumber Edit number to use. + */ + public void setSequenceNumber(final long sequenceNumber) { + assert getVersion() > 0; + long offset = getSequenceNumberOffset(); + Bytes.toBytes(this.bytes, (int)offset, sequenceNumber); + // Clear cache. + this.sequenceNumberCache = -1; + } + + /** * @return Type of this KeyValue. */ public byte getType() { @@ -929,10 +1064,19 @@ public class KeyValue implements Writable, HeapSize { /** * @param keylength Pass if you have it to save on a int creation. - * @return Type of this KeyValue. 
+ * @return Type of this KeyValue */ byte getType(final int keylength) { - return this.bytes[this.offset + keylength - 1 + ROW_OFFSET]; + // Strip version from Type before returning. + return stripVersionFromType(getTypeByte(keylength)); + } + + int getTypeByteOffset(final int keylength) { + return this.offset + keylength - 1 + ROW_OFFSET; + } + + byte getTypeByte(final int keylength) { + return this.bytes[getTypeByteOffset(keylength)]; } /** @@ -1015,20 +1159,22 @@ public class KeyValue implements Writable, HeapSize { public static class SplitKeyValue { private byte [][] split; SplitKeyValue() { - this.split = new byte[6][]; + this.split = new byte[7][]; } public void setRow(byte [] value) { this.split[0] = value; } public void setFamily(byte [] value) { this.split[1] = value; } public void setQualifier(byte [] value) { this.split[2] = value; } public void setTimestamp(byte [] value) { this.split[3] = value; } - public void setType(byte [] value) { this.split[4] = value; } - public void setValue(byte [] value) { this.split[5] = value; } + public void setSequenceNumber(byte [] value) { this.split[4] = value; } + public void setType(byte [] value) { this.split[5] = value; } + public void setValue(byte [] value) { this.split[6] = value; } public byte [] getRow() { return this.split[0]; } public byte [] getFamily() { return this.split[1]; } public byte [] getQualifier() { return this.split[2]; } public byte [] getTimestamp() { return this.split[3]; } - public byte [] getType() { return this.split[4]; } - public byte [] getValue() { return this.split[5]; } + public byte [] getSequenceNumber() { return this.split[4]; } + public byte [] getType() { return this.split[5]; } + public byte [] getValue() { return this.split[6]; } } public SplitKeyValue split() { @@ -1057,14 +1203,22 @@ public class KeyValue implements Writable, HeapSize { System.arraycopy(bytes, splitOffset, qualifier, 0, colLen); splitOffset += colLen; split.setQualifier(qualifier); + byte [] timestamp = new 
byte[Bytes.SIZEOF_LONG]; System.arraycopy(bytes, splitOffset, timestamp, 0, Bytes.SIZEOF_LONG); splitOffset += Bytes.SIZEOF_LONG; split.setTimestamp(timestamp); + + byte [] sequenceNumber = new byte[Bytes.SIZEOF_LONG]; + System.arraycopy(bytes, splitOffset, sequenceNumber, 0, Bytes.SIZEOF_LONG); + splitOffset += Bytes.SIZEOF_LONG; + split.setSequenceNumber(sequenceNumber); + byte [] type = new byte[1]; type[0] = bytes[splitOffset]; splitOffset += Bytes.SIZEOF_BYTE; split.setType(type); + byte [] value = new byte[valLen]; System.arraycopy(bytes, splitOffset, value, 0, valLen); split.setValue(value); @@ -1131,18 +1285,6 @@ public class KeyValue implements Writable, HeapSize { } /** - * @param column Column minus its delimiter - * @return True if column matches. - */ - public boolean matchingColumnNoDelimiter(final byte [] column) { - int rl = getRowLength(); - int o = getFamilyOffset(rl); - int fl = getFamilyLength(o); - int l = fl + getQualifierLength(rl,fl); - return Bytes.compareTo(column, 0, column.length, this.bytes, o, l) == 0; - } - - /** * * @param family column family * @param qualifier column qualifier @@ -1152,7 +1294,7 @@ public class KeyValue implements Writable, HeapSize { int rl = getRowLength(); int o = getFamilyOffset(rl); int fl = getFamilyLength(o); - int ql = getQualifierLength(rl,fl); + int ql = getQualifierLength(rl, fl); if (Bytes.compareTo(family, 0, family.length, this.bytes, o, family.length) != 0) { return false; @@ -1398,13 +1540,10 @@ public class KeyValue implements Writable, HeapSize { } public int compare(final KeyValue left, final KeyValue right) { - int ret = getRawComparator().compare(left.getBuffer(), - left.getOffset() + ROW_OFFSET, left.getKeyLength(), - right.getBuffer(), right.getOffset() + ROW_OFFSET, - right.getKeyLength()); - if (ret != 0) return ret; - // Negate this comparison so later edits show up first - return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS()); + return 
getRawComparator().compare(left.getBuffer(), + left.getOffset() + ROW_OFFSET, left.getKeyLength(), + right.getBuffer(), right.getOffset() + ROW_OFFSET, + right.getKeyLength()); } public int compareTimestamps(final KeyValue left, final KeyValue right) { @@ -1595,17 +1734,6 @@ public class KeyValue implements Writable, HeapSize { } /** - * Creates a KeyValue that is last on the specified row id. That is, - * every other possible KeyValue for the given row would compareTo() - * less than the result of this call. - * @param row row key - * @return Last possible KeyValue on passed row - */ - public static KeyValue createLastOnRow(final byte[] row) { - return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum); - } - - /** * Create a KeyValue that is smaller than all other possible KeyValues * for the given row. That is any (valid) KeyValue on 'row' would sort * _after_ the result. @@ -1624,23 +1752,8 @@ public class KeyValue implements Writable, HeapSize { * @param ts - timestamp * @return First possible key on passed row and timestamp. */ - public static KeyValue createFirstOnRow(final byte [] row, - final long ts) { - return new KeyValue(row, null, null, ts, Type.Maximum); - } - - /** - * @param row - row key (arbitrary byte array) - * @param c column - {@link #parseColumn(byte[])} is called to split - * the column. 
- * @param ts - timestamp - * @return First possible key on passed row, column and timestamp - * @deprecated - */ - public static KeyValue createFirstOnRow(final byte [] row, final byte [] c, - final long ts) { - byte [][] split = parseColumn(c); - return new KeyValue(row, split[0], split[1], ts, Type.Maximum); + public static KeyValue createFirstOnRow(final byte [] row, final long ts) { + return new KeyValue(row, ts); } /** @@ -1654,19 +1767,7 @@ public class KeyValue implements Writable, HeapSize { */ public static KeyValue createFirstOnRow(final byte [] row, final byte [] family, final byte [] qualifier) { - return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum); - } - - /** - * @param row - row key (arbitrary byte array) - * @param f - family name - * @param q - column qualifier - * @param ts - timestamp - * @return First possible key on passed row, column and timestamp - */ - public static KeyValue createFirstOnRow(final byte [] row, final byte [] f, - final byte [] q, final long ts) { - return new KeyValue(row, f, q, ts, Type.Maximum); + return new KeyValue(row, family, qualifier); } /** @@ -1690,8 +1791,21 @@ public class KeyValue implements Writable, HeapSize { final int foffset, final int flength, final byte [] qualifier, final int qoffset, final int qlength) { return new KeyValue(row, roffset, rlength, family, - foffset, flength, qualifier, qoffset, qlength, - HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0); + foffset, flength, qualifier, qoffset, qlength, + HConstants.LATEST_TIMESTAMP, DEFAULT_SEQUENCE_NUMBER, Type.Maximum, null, + 0, 0); + } + + + /** + * Creates a KeyValue that is last on the specified row id. That is, + * every other possible KeyValue for the given row would compareTo() + * less than the result of this call. 
+ * @param row row key + * @return Last possible KeyValue on passed row + */ + public static KeyValue createLastOnRow(final byte[] row) { + return createLastOnRow(row, 0, row.length, null, 0, 0, null, 0, 0); } /** @@ -1716,7 +1830,7 @@ public class KeyValue implements Writable, HeapSize { final int qoffset, final int qlength) { return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength, - HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0); + HConstants.OLDEST_TIMESTAMP, Long.MIN_VALUE, Type.Minimum, null, 0, 0); } /** @@ -1750,7 +1864,7 @@ public class KeyValue implements Writable, HeapSize { System.arraycopy(b, o, newb, ROW_OFFSET, l); Bytes.putInt(newb, 0, b.length); Bytes.putInt(newb, Bytes.SIZEOF_INT, 0); - return new KeyValue(newb); + return new KeyValue(newb, 0); } /** @@ -1875,6 +1989,7 @@ public class KeyValue implements Writable, HeapSize { */ public static class KeyComparator implements RawComparator { volatile boolean ignoreTimestamp = false; + volatile boolean ignoreSequenceNumber = false; volatile boolean ignoreType = false; public int compare(byte[] left, int loffset, int llength, byte[] right, @@ -1889,12 +2004,20 @@ public class KeyValue implements Writable, HeapSize { return compare; } + // Get type now. Will need it later. + byte ltype = left[loffset + (llength - 1)]; + byte rtype = right[roffset + (rlength - 1)]; + int lversion = getVersion(ltype); + int rversion = getVersion(rtype); + int linfrastructureSize = getTimestampToEndOfKeyLength(lversion); + int rinfrastructureSize = getTimestampToEndOfKeyLength(rversion); + // Compare column family. Start compare past row and family length. 
int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset; int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset; - int lcolumnlength = llength - TIMESTAMP_TYPE_SIZE - + int lcolumnlength = llength - linfrastructureSize - (lcolumnoffset - loffset); - int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE - + int rcolumnlength = rlength - rinfrastructureSize - (rcolumnoffset - roffset); // if row matches, and no column in the 'left' AND put type is 'minimum', @@ -1905,13 +2028,12 @@ public class KeyValue implements Writable, HeapSize { // then we say the left is bigger. This will let us seek to the last key in // a row. - byte ltype = left[loffset + (llength - 1)]; - byte rtype = right[roffset + (rlength - 1)]; - - if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) { + if (lcolumnlength == 0 && + stripVersionFromType(ltype) == Type.Minimum.getCode()) { return 1; // left is bigger. } - if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) { + if (rcolumnlength == 0 && + stripVersionFromType(rtype) == Type.Minimum.getCode()) { return -1; } @@ -1925,19 +2047,35 @@ public class KeyValue implements Writable, HeapSize { if (!this.ignoreTimestamp) { // Get timestamps. long ltimestamp = Bytes.toLong(left, - loffset + (llength - TIMESTAMP_TYPE_SIZE)); + loffset + (llength - linfrastructureSize)); long rtimestamp = Bytes.toLong(right, - roffset + (rlength - TIMESTAMP_TYPE_SIZE)); + roffset + (rlength - rinfrastructureSize)); compare = compareTimestamps(ltimestamp, rtimestamp); if (compare != 0) { return compare; } } + if (!this.ignoreSequenceNumber) { + // Get timestamps. + long lseqnum = lversion == 0? 0: + Bytes.toLong(left, + loffset + (llength - linfrastructureSize + Bytes.SIZEOF_LONG)); + long rseqnum = rversion == 0? 0: + Bytes.toLong(right, + roffset + (rlength - rinfrastructureSize + Bytes.SIZEOF_LONG)); + // Sort same as we do for timestamps where the higher sequence number + // sorts before the lower. 
+ compare = compareTimestamps(lseqnum, rseqnum); + if (compare != 0) { + return compare; + } + } + if (!this.ignoreType) { // Compare types. Let the delete types sort ahead of puts; i.e. types // of higher numbers sort before those of lesser numbers - return (0xff & rtype) - (0xff & ltype); + return stripVersionFromType(rtype) - stripVersionFromType(ltype); } return 0; } @@ -1961,7 +2099,7 @@ public class KeyValue implements Writable, HeapSize { int compareTimestamps(final long ltimestamp, final long rtimestamp) { // The below older timestamps sorting ahead of newer timestamps looks // wrong but it is intentional. This way, newer timestamps are first - // found when we iterate over a memstore and newer versions are the + // found when we iterate over a memstore and newer timestamps are the // first we trip over when reading from a store file. if (ltimestamp < rtimestamp) { return 1; @@ -1978,7 +2116,7 @@ public class KeyValue implements Writable, HeapSize { ClassSize.align(ClassSize.ARRAY) + ClassSize.align(length) + (3 * Bytes.SIZEOF_INT) + ClassSize.align(ClassSize.ARRAY) + - (2 * Bytes.SIZEOF_LONG)); + (3 * Bytes.SIZEOF_LONG)); } // this overload assumes that the length bytes have already been read, @@ -2001,4 +2139,11 @@ public class KeyValue implements Writable, HeapSize { out.writeInt(this.length); out.write(this.bytes, this.offset, this.length); } -} + + public static void main(String[] args) { + for (int i = 0; i < 1000; i++) { + new KeyValue(HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, + HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 80b00a7..08c67a8 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -1003,7 +1003,8 @@ public class 
HConnectionManager { Thread.sleep(getPauseTime(tries)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IOException("Giving up trying to get region server: thread is interrupted."); + throw new IOException("Giving up after tries=" + tries + + ": thread interrupted."); } } return null; diff --git a/src/main/java/org/apache/hadoop/hbase/client/Put.java b/src/main/java/org/apache/hadoop/hbase/client/Put.java index 2479b80..536da22 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/Put.java +++ b/src/main/java/org/apache/hadoop/hbase/client/Put.java @@ -182,7 +182,7 @@ public class Put implements HeapSize, Writable, Row, Comparable { */ private KeyValue createPutKeyValue(byte[] family, byte[] qualifier, long ts, byte[] value) { - return new KeyValue(this.row, family, qualifier, ts, KeyValue.Type.Put, + return new KeyValue(this.row, family, qualifier, ts, KeyValue.Type.Put, value); } diff --git a/src/main/java/org/apache/hadoop/hbase/client/Result.java b/src/main/java/org/apache/hadoop/hbase/client/Result.java index 6bdc892..d936517 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -20,14 +20,6 @@ package org.apache.hadoop.hbase.client; -import com.google.common.collect.Ordering; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.SplitKeyValue; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.WritableWithSize; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Writable; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -39,6 +31,13 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.SplitKeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import 
org.apache.hadoop.hbase.io.WritableWithSize; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + /** * Single row result of a {@link Get} or {@link Scan} query.

* @@ -394,32 +393,6 @@ public class Result implements Writable, WritableWithSize { return returnMap; } - private Map.Entry getKeyValue(byte[] family, byte[] qualifier) { - if(this.familyMap == null) { - getMap(); - } - if(isEmpty()) { - return null; - } - NavigableMap> qualifierMap = - familyMap.get(family); - if(qualifierMap == null) { - return null; - } - NavigableMap versionMap = - getVersionMap(qualifierMap, qualifier); - if(versionMap == null) { - return null; - } - return versionMap.firstEntry(); - } - - private NavigableMap getVersionMap( - NavigableMap> qualifierMap, byte [] qualifier) { - return qualifier != null? - qualifierMap.get(qualifier): qualifierMap.get(new byte[0]); - } - /** * Returns the value of the first column in the Result. * @return value of the first column diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index e28e06f..0c8eb36 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -240,6 +240,7 @@ public class ImportTsv { parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, + KeyValue.DEFAULT_SEQUENCE_NUMBER, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i)); put.add(kv); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c760c17..92dba9d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -139,6 +139,7 @@ public class HRegion implements HeapSize { // , Writable{ static final String MERGEDIR = "merges"; final AtomicBoolean closed = new AtomicBoolean(false); + /* Closing can take some time; use the closing flag if there is stuff we don't * want to do while in closing 
state; e.g. like offer this region up to the * master as a region to close if the carrying regionserver is overloaded. @@ -146,10 +147,6 @@ public class HRegion implements HeapSize { // , Writable{ */ final AtomicBoolean closing = new AtomicBoolean(false); - ////////////////////////////////////////////////////////////////////////////// - // Members - ////////////////////////////////////////////////////////////////////////////// - private final Set lockedRows = new TreeSet(Bytes.BYTES_COMPARATOR); private final Map lockIds = @@ -164,12 +161,6 @@ public class HRegion implements HeapSize { // , Writable{ private ClassToInstanceMap protocolHandlers = MutableClassToInstanceMap.create(); - //These variable are just used for getting data out of the region, to test on - //client side - // private int numStores = 0; - // private int [] storeSize = null; - // private byte [] name = null; - final AtomicLong memstoreSize = new AtomicLong(0); /** @@ -231,8 +222,7 @@ public class HRegion implements HeapSize { // , Writable{ private final long blockingMemStoreSize; final long threadWakeFrequency; // Used to guard closes - final ReentrantReadWriteLock lock = - new ReentrantReadWriteLock(); + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // Stop updates lock private final ReentrantReadWriteLock updatesLock = @@ -240,8 +230,8 @@ public class HRegion implements HeapSize { // , Writable{ private boolean splitRequest; private byte[] splitPoint = null; - private final ReadWriteConsistencyControl rwcc = - new ReadWriteConsistencyControl(); + private final TransactionManager rwcc = + new TransactionManager(); // Coprocessor host private RegionCoprocessorHost coprocessorHost; @@ -251,7 +241,10 @@ public class HRegion implements HeapSize { // , Writable{ */ public final static String REGIONINFO_FILE = ".regioninfo"; + private final Object closeLock = new Object(); + /** + * Null-arg Constructor. 
* Should only be used for testing purposes */ public HRegion(){ @@ -466,7 +459,7 @@ public class HRegion implements HeapSize { // , Writable{ } } - public ReadWriteConsistencyControl getRWCC() { + TransactionManager getRWCC() { return rwcc; } @@ -487,8 +480,6 @@ public class HRegion implements HeapSize { // , Writable{ return close(false); } - private final Object closeLock = new Object(); - /** * Close down this HRegion. Flush the cache unless abort parameter is true, * Shut down each HStore, don't service any more calls. @@ -586,10 +577,6 @@ public class HRegion implements HeapSize { // , Writable{ this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5); } - ////////////////////////////////////////////////////////////////////////////// - // HRegion accessors - ////////////////////////////////////////////////////////////////////////////// - /** @return start key for region */ public byte [] getStartKey() { return this.regionInfo.getStartKey(); @@ -670,13 +657,6 @@ public class HRegion implements HeapSize { // , Writable{ return ret; } - ////////////////////////////////////////////////////////////////////////////// - // HRegion maintenance. - // - // These methods are meant to be called periodically by the HRegionServer for - // upkeep. - ////////////////////////////////////////////////////////////////////////////// - /** @return returns size of largest HStore. */ public long getLargestHStoreSize() { long size = 0; @@ -1075,9 +1055,6 @@ public class HRegion implements HeapSize { // , Writable{ return currentSequenceId; } - ////////////////////////////////////////////////////////////////////////////// - // get() methods for client use. 
- ////////////////////////////////////////////////////////////////////////////// /** * Return all the data for the row that matches row exactly, * or the one that immediately preceeds it, at or immediately before @@ -1311,6 +1288,8 @@ public class HRegion implements HeapSize { // , Writable{ addFamilyMapToWALEdit(familyMap, walEdit); this.log.append(regionInfo, regionInfo.getTableDesc().getName(), walEdit, now); + } else { + this.log.insertSequenceNumber(familyMap); } // Now make changes to the memstore. @@ -1795,6 +1774,8 @@ public class HRegion implements HeapSize { // , Writable{ addFamilyMapToWALEdit(familyMap, walEdit); this.log.append(regionInfo, regionInfo.getTableDesc().getName(), walEdit, now); + } else { + this.log.insertSequenceNumber(familyMap); } long addedSize = applyFamilyMapToMemstore(familyMap); @@ -1823,7 +1804,7 @@ public class HRegion implements HeapSize { // , Writable{ * new entries. */ private long applyFamilyMapToMemstore(Map> familyMap) { - ReadWriteConsistencyControl.WriteEntry w = null; + TransactionManager.WriteEntry w = null; long size = 0; try { w = rwcc.beginMemstoreInsert(); @@ -1834,12 +1815,11 @@ public class HRegion implements HeapSize { // , Writable{ Store store = getStore(family); for (KeyValue kv: edits) { - kv.setMemstoreTS(w.getWriteNumber()); size += store.add(kv); } } } finally { - rwcc.completeMemstoreInsert(w); + rwcc.finish(w); } return size; } @@ -2336,7 +2316,10 @@ public class HRegion implements HeapSize { // , Writable{ private int batch; private int isScan; private boolean filterClosed = false; - private long readPt; + private final long readPoint = rwcc.getReadPoint(); + /* + // Don't show kvs newer than this scanner's read point. + if (kv.getSequenceNumber() > this.readPoint) continue;*/ public HRegionInfo getRegionName() { return regionInfo; @@ -2354,8 +2337,6 @@ public class HRegion implements HeapSize { // , Writable{ // it is [startRow,endRow) and if startRow=endRow we get nothing. 
this.isScan = scan.isGetScan() ? -1 : 0; - this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); - List scanners = new ArrayList(); if (additionalScanners != null) { scanners.addAll(additionalScanners); @@ -2401,7 +2382,7 @@ public class HRegion implements HeapSize { // , Writable{ try { // This could be a new thread from the last time we called next(). - ReadWriteConsistencyControl.setThreadReadPoint(this.readPt); + TransactionManager.setThreadReadPoint(this.readPt); results.clear(); @@ -3262,10 +3243,9 @@ public class HRegion implements HeapSize { // , Writable{ Get get = new Get(row); get.addColumn(family, qualifier); - // we don't want to invoke coprocessor in this case; ICV is wrapped + // We don't want to invoke coprocessor in this case; ICV is wrapped // in HRegionServer, so we leave getLastIncrement alone List results = getLastIncrement(get); - if (!results.isEmpty()) { KeyValue kv = results.get(0); byte [] buffer = kv.getBuffer(); @@ -3273,25 +3253,21 @@ public class HRegion implements HeapSize { // , Writable{ result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG); } - // build the KeyValue now: - KeyValue newKv = new KeyValue(row, family, - qualifier, EnvironmentEdgeManager.currentTimeMillis(), - Bytes.toBytes(result)); + // Build the KeyValue now: + long now = EnvironmentEdgeManager.currentTimeMillis(); + KeyValue kv = new KeyValue(row, family, qualifier, now, Bytes.toBytes(result)); - // now log it: + // Now log it. this.log.append (or this.log.setSequence) write the + // sequence number into the KeyValue edit. if (writeToWAL) { - long now = EnvironmentEdgeManager.currentTimeMillis(); WALEdit walEdit = new WALEdit(); - walEdit.add(newKv); + walEdit.add(kv); this.log.append(regionInfo, regionInfo.getTableDesc().getName(), walEdit, now); + } else { + this.log.insertSequenceNumber(kv); } - - // Now request the ICV to the store, this will set the timestamp - // appropriately depending on if there is a value in memcache or not. 
- // returns the change in the size of the memstore from operation - long size = store.updateColumnValue(row, family, qualifier, result); - + long size = store.updateColumnValue(kv); size = this.memstoreSize.addAndGet(size); flush = isFlushSize(size); } finally { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 53ec17c..e91291d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -23,7 +23,6 @@ package org.apache.hadoop.hbase.regionserver; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.rmi.UnexpectedException; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -348,73 +347,25 @@ public class MemStore implements HeapSize { } /** - * Given the specs of a column, update it, first by inserting a new record, - * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS - * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying - * store will ensure that the insert/delete each are atomic. A scanner/reader will either - * get the new value, or the old value and all readers will eventually only see the new - * value after the old was removed. + * Update a column value. * - * @param row - * @param family - * @param qualifier - * @param newValue - * @param now - * @return Timestamp + * @param kv + * @return Difference in size made by addition of this edit to memstore. */ - public long updateColumnValue(byte[] row, - byte[] family, - byte[] qualifier, - long newValue, - long now) { + public long updateColumnValue(final KeyValue kv) { this.lock.readLock().lock(); try { - KeyValue firstKv = KeyValue.createFirstOnRow( - row, family, qualifier); - // Is there a KeyValue in 'snapshot' with the same TS? 
If so, upgrade the timestamp a bit. - SortedSet snSs = snapshot.tailSet(firstKv); - if (!snSs.isEmpty()) { - KeyValue snKv = snSs.first(); - // is there a matching KV in the snapshot? - if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) { - if (snKv.getTimestamp() == now) { - // poop, - now += 1; - } - } - } - - // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary. - // But the timestamp should also be max(now, mostRecentTsInMemstore) - - // so we cant add the new KV w/o knowing what's there already, but we also - // want to take this chance to delete some kvs. So two loops (sad) - - SortedSet ss = kvset.tailSet(firstKv); - Iterator it = ss.iterator(); - while ( it.hasNext() ) { - KeyValue kv = it.next(); - - // if this isnt the row we are interested in, then bail: - if (!firstKv.matchingColumn(family,qualifier) || !firstKv.matchingRow(kv) ) { - break; // rows dont match, bail. - } - - // if the qualifier matches and it's a put, just RM it out of the kvset. - if (firstKv.matchingQualifier(kv)) { - // to be extra safe we only remove Puts that have a memstoreTS==0 - if (kv.getType() == KeyValue.Type.Put.getCode()) { - now = Math.max(now, kv.getTimestamp()); - } - } - } - - // create or update (upsert) a new KeyValue with - // 'now' and a 0 memstoreTS == immediately visible - return upsert(Arrays.asList(new KeyValue [] { - new KeyValue(row, family, qualifier, now, - Bytes.toBytes(newValue)) - })); + // There was code in here where we were checking for a timestamp in + // advance of ours. If we found one, then we'd use the one in advance + // of ours instead of the ts that was in the kv. We were making a new + // kv to do this, a kv that would not be the same as whats out in the + // WAL. Seems no longer necessary to me now we have sequence number. 
+ // I'm removing it from here and adding an assert instead into the upsert + // we call that will look for incidence of the kv timestamp being less + // than that of one already up in memstore. + // We were doing max(now, mostRecentTsInMemstore). + // St.Ack 20110113 + return upsert(kv); } finally { this.lock.readLock().unlock(); } @@ -427,10 +378,6 @@ public class MemStore implements HeapSize { * value for that row/family/qualifier. If a KeyValue did already exist, * it will then be removed. *

- * Currently the memstoreTS is kept at 0 so as each insert happens, it will - * be immediately visible. May want to change this so it is atomic across - * all KeyValues. - *

* This is called under row lock, so Get operations will still see updates * atomically. Scans will only see each KeyValue update as atomic. * @@ -442,7 +389,6 @@ public class MemStore implements HeapSize { try { long size = 0; for (KeyValue kv : kvs) { - kv.setMemstoreTS(0); size += upsert(kv); } return size; @@ -453,7 +399,7 @@ public class MemStore implements HeapSize { /** * Inserts the specified KeyValue into MemStore and deletes any existing - * versions of the same row/family/qualifier as the specified KeyValue. + * versions of the same row/family/qualifier at the specified KeyValue. *

* First, the specified KeyValue is inserted into the Memstore. *

@@ -467,38 +413,40 @@ long addedSize = add(kv); // Get the KeyValues for the row/family/qualifier regardless of timestamp. - // For this case we want to clean up any other puts + // and remove any old values to help keep a cap on version bloat. + // TODO: Do this out-of-band in a background thread; add the cleanup to + // a queue for another thread to clean -- St.Ack 20110113. KeyValue firstKv = KeyValue.createFirstOnRow( kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength()); SortedSet ss = kvset.tailSet(firstKv); Iterator it = ss.iterator(); + KeyValue firstFound = null; while ( it.hasNext() ) { KeyValue cur = it.next(); - + if (firstFound == null) firstFound = cur; if (kv == cur) { + if (firstFound != cur) { + // If we are not first on row, then something's up! + throw new IllegalStateException("firstFound=" + firstFound + + ", cur=" + cur); + } // ignore the one just put in continue; } - // if this isn't the row we are interested in, then bail - if (!kv.matchingRow(cur)) { - break; - } - - // if the qualifier matches and it's a put, remove it - if (kv.matchingQualifier(cur)) { - // to be extra safe we only remove Puts that have a memstoreTS==0 - if (kv.getType() == KeyValue.Type.Put.getCode() && - kv.getMemstoreTS() == 0) { - // false means there was a change, so give us the size. - addedSize -= heapSizeChange(kv, true); - it.remove(); - } - } else { - // past the column, done - break; + // if this isn't the row we are interested in, then bail + if (!kv.matchingRow(cur)) break; + + // if the qualifier no longer matches, skip out + if (!kv.matchingQualifier(cur)) break; + + // If a Put with matching r/cf/q, then remove. + if (kv.getType() == KeyValue.Type.Put.getCode()) { + // false means there was a change, so give us the size. 
+ addedSize -= heapSizeChange(kv, true); + it.remove(); } } return addedSize; @@ -602,21 +550,12 @@ public class MemStore implements HeapSize { MemStoreScanner() { super(); - - //DebugPrint.println(" MS new@" + hashCode()); } protected KeyValue getNext(Iterator it) { KeyValue ret = null; - long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); - //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint); - while (ret == null && it.hasNext()) { - KeyValue v = it.next(); - if (v.getMemstoreTS() <= readPoint) { - // keep it. - ret = v; - } + ret = it.next(); } return ret; } @@ -638,14 +577,6 @@ public class MemStore implements HeapSize { kvsetNextRow = getNext(kvsetIt); snapshotNextRow = getNext(snapshotIt); - - //long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); - //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " + - // kvset.size() + " threadread = " + readPoint); - //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " + - // snapshot.size() + " threadread = " + readPoint); - - KeyValue lowest = getLowest(); // has data := (lowest != null) @@ -686,9 +617,6 @@ public class MemStore implements HeapSize { snapshotNextRow = getNext(snapshotIt); } - //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint(); - //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " + - // getLowest() + " threadpoint=" + readpoint); return theNext; } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index 48dd8e9..3dd3c40 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -93,6 +93,7 @@ public class ScanQueryMatcher { this.columns = new ExplicitColumnTracker(columns,maxVersions); } } + public 
ScanQueryMatcher(Scan scan, byte [] family, NavigableSet columns, long ttl, KeyValue.KeyComparator rowComparator, int maxVersions) { @@ -119,9 +120,7 @@ public class ScanQueryMatcher { byte [] bytes = kv.getBuffer(); int offset = kv.getOffset(); - int initialOffset = offset; - int keyLength = Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT); offset += KeyValue.ROW_OFFSET; short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT); @@ -154,8 +153,7 @@ public class ScanQueryMatcher { byte familyLength = bytes [offset]; offset += familyLength + 1; - int qualLength = keyLength + KeyValue.ROW_OFFSET - - (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE; + int qualLength = kv.getQualifierLength(rowLength, familyLength); long timestamp = kv.getTimestamp(); if (isExpired(timestamp)) { @@ -163,16 +161,14 @@ public class ScanQueryMatcher { return getNextRowOrNextColumn(bytes, offset, qualLength); } - byte type = kv.getType(); - if (isDelete(type)) { + if (kv.isDelete()) { if (tr.withinOrAfterTimeRange(timestamp)) { - this.deletes.add(bytes, offset, qualLength, timestamp, type); + this.deletes.add(bytes, offset, qualLength, timestamp, kv.getType()); // Can't early out now, because DelFam come before any other keys } if (retainDeletesInOutput) { return MatchCode.INCLUDE; - } - else { + } else { return MatchCode.SKIP; } } @@ -273,11 +269,6 @@ public class ScanQueryMatcher { stickyNextRow = false; } - // should be in KeyValue. 
- protected boolean isDelete(byte type) { - return (type != KeyValue.Type.Put.getCode()); - } - protected boolean isExpired(long timestamp) { return (timestamp < oldestStamp); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index ba9733d..caf9354 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -1517,11 +1517,7 @@ public class Store implements HeapSize { } /** - * Increments the value for the given row/family/qualifier. - * - * This function will always be seen as atomic by other readers - * because it only puts a single KV to memstore. Thus no - * read/write control necessary. + * Updates the value at given row/family/qualifier. * * @param row * @param f @@ -1530,20 +1526,11 @@ public class Store implements HeapSize { * @return memstore size delta * @throws IOException */ - public long updateColumnValue(byte [] row, byte [] f, - byte [] qualifier, long newValue) - throws IOException { - + public long updateColumnValue(final KeyValue kv) + throws IOException { this.lock.readLock().lock(); try { - long now = EnvironmentEdgeManager.currentTimeMillis(); - - return this.memstore.updateColumnValue(row, - f, - qualifier, - newValue, - now); - + return this.memstore.updateColumnValue(kv); } finally { this.lock.readLock().unlock(); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 039915e..761dcd8 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -25,11 +25,9 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; import java.nio.ByteBuffer; import java.text.NumberFormat; -import java.util.Arrays; import java.util.Collection; import 
java.util.Collections; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.SortedSet; @@ -666,7 +664,6 @@ public class StoreFile { private final BloomType bloomType; private KVComparator kvComparator; private KeyValue lastKv = null; - private byte[] lastByteArray = null; TimeRangeTracker timeRangeTracker = new TimeRangeTracker(); /* isTimeRangeTrackerSet keeps track if the timeRange has already been set * When flushing a memstore, we set TimeRange and use this variable to @@ -854,18 +851,6 @@ public class StoreFile { return this.bloomFilter != null; } - public void append(final byte [] key, final byte [] value) throws IOException { - if (this.bloomFilter != null) { - // only add to the bloom filter on a new row - if (this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) { - this.bloomFilter.add(key); - this.lastByteArray = key; - } - } - writer.append(key, value); - includeInTimeRangeTracker(key); - } - public void close() throws IOException { // make sure we wrote something to the bloom before adding it if (this.bloomFilter != null && this.bloomFilter.getKeyCount() > 0) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index d149480..27c9a86 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -59,7 +59,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb * @throws IOException */ StoreScanner(Store store, Scan scan, final NavigableSet columns) - throws IOException { + throws IOException { this.store = store; this.cacheBlocks = scan.getCacheBlocks(); matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java 
b/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java index d3f1c65..a201a54 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java @@ -24,9 +24,7 @@ import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; /** @@ -77,21 +75,6 @@ public class TimeRangeTracker implements Writable { } /** - * Update the current TimestampRange to include the timestamp from Key. - * If the Key is of type DeleteColumn or DeleteFamily, it includes the - * entire time range from 0 to timestamp of the key. - * @param key - */ - public void includeTimestamp(final byte[] key) { - includeTimestamp(Bytes.toLong(key,key.length-KeyValue.TIMESTAMP_TYPE_SIZE)); - int type = key[key.length - 1]; - if (type == Type.DeleteColumn.getCode() || - type == Type.DeleteFamily.getCode()) { - includeTimestamp(0); - } - } - - /** * If required, update the current TimestampRange to include timestamp * @param timestamp the timestamp value to include */ diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index f06d263..b56c21a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Matcher; @@ -144,9 +143,6 @@ public class 
HLog implements Syncable { private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas final static Object [] NO_ARGS = new Object []{}; - // used to indirectly tell syncFs to force the sync - private boolean forceSync = false; - public interface Reader { void init(FileSystem fs, Path path, Configuration c) throws IOException; void close() throws IOException; @@ -435,7 +431,7 @@ public class HLog implements Syncable { } /** - * @return log sequence number + * @return The current log sequence number */ public long getSequenceNumber() { return logSeqNum.get(); @@ -926,6 +922,7 @@ public class HLog implements Syncable { // actual name. byte [] hriKey = info.getEncodedNameAsBytes(); this.lastSeqWritten.putIfAbsent(hriKey, seqNum); + // Update edits adding in this sequence number HLogKey logKey = makeKey(hriKey, tableName, seqNum, now); doWrite(info, logKey, edits); this.numEntries.incrementAndGet(); @@ -940,6 +937,36 @@ public class HLog implements Syncable { } /** + * Increment sequence number and add new number to kv. + * Use this method when you are skipping the WAL; {@link KeyValues} + * still need to be decorated with a valid sequence number though edit is + * not in the WAL. + * @param kv KeyValue to decorate. + */ + public void insertSequenceNumber(final KeyValue kv) { + synchronized (this.updateLock) { + kv.setSequenceNumber(obtainSeqNum()); + } + } + + /** + * Increment sequence number and add this new number to KeyValues passed in + * familyMap. + * Use this method when you are skipping the WAL; {@link KeyValues} + * still need to be decorated with a valid sequence number though edit is + * not in the WAL. + * @param familyMap KeyValues to decorate by family. 
+ */ + public void insertSequenceNumber(final Map> familyMap) { + synchronized (this.updateLock) { + long sequenceNumber = obtainSeqNum(); + for (List kvs: familyMap.values()) { + for (KeyValue kv: kvs) kv.setSequenceNumber(sequenceNumber); + } + } + } + + /** * This thread is responsible to call syncFs and buffer up the writers while * it happens. */ @@ -947,8 +974,6 @@ public class HLog implements Syncable { private final long optionalFlushInterval; - private boolean syncerShuttingDown = false; - LogSyncer(long optionalFlushInterval) { this.optionalFlushInterval = optionalFlushInterval; } @@ -969,7 +994,6 @@ public class HLog implements Syncable { } catch (InterruptedException e) { LOG.debug(getName() + " interrupted while waiting for sync requests"); } finally { - syncerShuttingDown = true; LOG.info(getName() + " exiting"); } } diff --git a/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/src/main/java/org/apache/hadoop/hbase/util/Bytes.java index aba2c3b..f043935 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ b/src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -435,12 +435,18 @@ public class Bytes { */ public static byte[] toBytes(long val) { byte [] b = new byte[8]; + return toBytes(b, 0, val); + } + + public static byte [] toBytes(final byte [] buffer, final int offset, + final long originalVal) { + long val = originalVal; for (int i = 7; i > 0; i--) { - b[i] = (byte) val; + buffer[offset + i] = (byte)val; val >>>= 8; } - b[0] = (byte) val; - return b; + buffer[offset + 0] = (byte) val; + return buffer; } /** diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 883389f..1ffc2a6 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.master.HMaster; import 
org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl; +import org.apache.hadoop.hbase.regionserver.TransactionManager; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -1259,7 +1259,7 @@ public class HBaseTestingUtility { */ public static List getFromStoreFile(Store store, Get get) throws IOException { - ReadWriteConsistencyControl.resetThreadReadPoint(); + TransactionManager.resetThreadReadPoint(); Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, scan.getFamilyMap().get(store.getFamily().getName())); diff --git a/src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java b/src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java index 36d768a..aae6246 100644 --- a/src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java +++ b/src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java @@ -22,33 +22,20 @@ package org.apache.hadoop.hbase; import org.apache.hadoop.hbase.util.Bytes; +/** + * Utility for creating {@link KeyValue} + */ public class KeyValueTestUtil { - public static KeyValue create( - String row, - String family, - String qualifier, - long timestamp, - String value) - { + public static KeyValue create(String row, String family, String qualifier, + long timestamp, String value) { return create(row, family, qualifier, timestamp, KeyValue.Type.Put, value); } - public static KeyValue create( - String row, - String family, - String qualifier, - long timestamp, - KeyValue.Type type, - String value) - { - return new KeyValue( - Bytes.toBytes(row), - Bytes.toBytes(family), - Bytes.toBytes(qualifier), - timestamp, - type, - Bytes.toBytes(value) + public static KeyValue create(String row, String family, String qualifier, + long 
timestamp, KeyValue.Type type, String value) { + return new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), + Bytes.toBytes(qualifier), timestamp, type, Bytes.toBytes(value) ); } -} +} \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java b/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java index 68fff55..27cc211 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java +++ b/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java @@ -30,10 +30,105 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; public class TestKeyValue extends TestCase { private final Log LOG = LogFactory.getLog(this.getClass().getName()); + public void testHandlingOfKeyValueVersionZero() { + long ts = System.currentTimeMillis(); + KeyValue kv = new KeyValue(HConstants.CATALOG_FAMILY, + HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts, + KeyValue.DEFAULT_SEQUENCE_NUMBER, HConstants.CATALOG_FAMILY); + KeyValue kv0 = convertToVersionZero(kv); + assertTrue(kv0.getVersion() == 0); + assertEquals(kv.getFamilyLength(), kv0.getFamilyLength()); + assertEquals(kv.getFamilyOffset(), kv0.getFamilyOffset()); + // Length is same but for absence of the sequence number. 
+ assertEquals(kv.getKeyLength(), kv0.getKeyLength() + Bytes.SIZEOF_LONG); + assertEquals(kv.getKeyOffset(), kv0.getKeyOffset()); + assertEquals(kv.getQualifierLength(), kv0.getQualifierLength()); + assertEquals(kv.getQualifierOffset(), kv0.getQualifierOffset()); + assertEquals(kv.getRowLength(), kv0.getRowLength()); + assertEquals("kv.row=" + Bytes.toString(kv.getRow()) + ", kv0.row=" + + Bytes.toString(kv0.getRow()), Bytes.toString(kv.getRow()), + Bytes.toString(kv0.getRow())); + assertEquals(Bytes.toString(kv.getFamily()), + Bytes.toString(kv0.getFamily())); + assertEquals(Bytes.toString(kv.getQualifier()), + Bytes.toString(kv0.getQualifier())); + assertEquals(kv.getTimestamp(), kv0.getTimestamp()); + assertEquals(kv.getType(), kv0.getType()); + assertEquals(kv.getSequenceNumber(), kv0.getSequenceNumber()); + // See if these do the right thing. + assertEquals(kv.toString(), kv0.toString()); + int compare = KeyValue.COMPARATOR.compare(kv, kv0); + assertEquals(0, compare); + // Make a kv that differs in sequence number only + KeyValue kvBiggerSeqNum = new KeyValue(HConstants.CATALOG_FAMILY, + HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts, + 1, HConstants.CATALOG_FAMILY); + compare = KeyValue.COMPARATOR.compare(kv, kvBiggerSeqNum); + assertTrue(compare > 0); + compare = KeyValue.COMPARATOR.compare(kvBiggerSeqNum, kv); + assertTrue(compare < 0); + compare = KeyValue.COMPARATOR.compare(kvBiggerSeqNum, kvBiggerSeqNum); + assertEquals(0, compare); + assertEquals(0, KeyValue.COMPARATOR.compare(kv, kv)); + } + + private KeyValue convertToVersionZero(final KeyValue kv) { + int offset = kv.getSequenceNumberOffset(); + int length = kv.getBuffer().length; + byte [] bytes = new byte[length - Bytes.SIZEOF_LONG]; + System.arraycopy(kv.getBuffer(), kv.getOffset(), bytes, 0, offset); + System.arraycopy(kv.getBuffer(), kv.getOffset() + offset + Bytes.SIZEOF_LONG, + bytes, offset, length - Bytes.SIZEOF_LONG - offset); + // Fix up keylength + int keylength = 
KeyValue.getKeyLength(bytes, 0); + keylength -= Bytes.SIZEOF_LONG; + Bytes.putInt(bytes, 0, keylength); + // Now set the Type to be of type zero. + offset = kv.getTypeByteOffset(KeyValue.getKeyLength(bytes, 0)); + bytes[offset] = KeyValue.stripVersionFromType(bytes[offset]); + return new KeyValue(bytes, 0); + } + + public void testSequenceNumberCompare() { + long ts = System.currentTimeMillis(); + KeyValue kvlower = new KeyValue(HConstants.CATALOG_FAMILY, + HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts, ts, + HConstants.CATALOG_FAMILY); + KeyValue kvhigher = new KeyValue(HConstants.CATALOG_FAMILY, + HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts, ts + 1, + HConstants.CATALOG_FAMILY); + int compare = KeyValue.COMPARATOR.compare(kvlower, kvhigher); + // Compare > 0 because kvhigher must sort before kvlower. + Assert.assertTrue(compare > 0); + assertEquals(0, KeyValue.COMPARATOR.compare(kvlower, kvlower)); + assertEquals(0, KeyValue.COMPARATOR.compare(kvhigher, kvhigher)); + compare = KeyValue.COMPARATOR.compare(kvhigher, kvlower); + Assert.assertTrue(compare < 0); + } + + public void testSettingSequenceNumber() { + long ts = System.currentTimeMillis(); + KeyValue kv = new KeyValue(HConstants.CATALOG_FAMILY, + HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts, + HConstants.CATALOG_FAMILY); + assertEquals(kv.getSequenceNumber(), KeyValue.DEFAULT_SEQUENCE_NUMBER); + kv.setSequenceNumber(ts); + assertEquals(kv.getSequenceNumber(), ts); + } + + public void testVersion() { + int expected = KeyValue.getVersion(KeyValue.VERSION_BITS); + KeyValue kv = new KeyValue(Bytes.toBytes("test"), + System.currentTimeMillis(), Type.Put); + int found = kv.getVersion(); + assertEquals(expected, found); + } + public void testColumnCompare() throws Exception { final byte [] a = Bytes.toBytes("aaa"); byte [] family1 = Bytes.toBytes("abc"); @@ -42,6 +137,7 @@ public class TestKeyValue extends TestCase { byte [] qualifier2 = Bytes.toBytes("ef"); KeyValue aaa = new 
KeyValue(a, family1, qualifier1, 0L, Type.Put, a); + LOG.info("aaa=" + aaa.toString()); assertFalse(aaa.matchingColumn(family2, qualifier2)); assertTrue(aaa.matchingColumn(family1, qualifier1)); aaa = new KeyValue(a, family2, qualifier2, 0L, Type.Put, a); diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestResult.java b/src/test/java/org/apache/hadoop/hbase/client/TestResult.java index becabcf..00ee844 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestResult.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestResult.java @@ -20,16 +20,15 @@ package org.apache.hadoop.hbase.client; -import junit.framework.TestCase; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.util.Bytes; - import static org.apache.hadoop.hbase.HBaseTestCase.assertByteEquals; import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; + +import junit.framework.TestCase; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; public class TestResult extends TestCase { diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 48a7011..adda9f2 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1413,12 +1413,12 @@ public class TestHRegion extends HBaseTestCase { scan.addFamily(fam2); scan.addFamily(fam4); is = (RegionScanner) region.getScanner(scan); - ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); + TransactionManager.resetThreadReadPoint(region.getRWCC()); assertEquals(1, ((RegionScanner)is).storeHeap.getHeap().size()); scan = new Scan(); is = (RegionScanner) region.getScanner(scan); - ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); + TransactionManager.resetThreadReadPoint(region.getRWCC()); assertEquals(families.length -1, 
((RegionScanner)is).storeHeap.getHeap().size()); } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 3779114..111c9ed 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -24,8 +24,6 @@ import java.rmi.UnexpectedException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.NavigableSet; -import java.util.TreeSet; import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -35,7 +33,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; @@ -53,12 +50,12 @@ public class TestMemStore extends TestCase { private static final byte [] CONTENTS = Bytes.toBytes("contents"); private static final byte [] BASIC = Bytes.toBytes("basic"); private static final String CONTENTSTR = "contentstr"; - private ReadWriteConsistencyControl rwcc; + private TransactionManager rwcc; @Override public void setUp() throws Exception { super.setUp(); - this.rwcc = new ReadWriteConsistencyControl(); + this.rwcc = new TransactionManager(); this.memstore = new MemStore(); } @@ -84,7 +81,7 @@ public class TestMemStore extends TestCase { List memstorescanners = this.memstore.getScanners(); Scan scan = new Scan(); List result = new ArrayList(); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, this.memstore.comparator, null, memstorescanners); int count = 0; @@ -104,7 +101,7 @@ public class TestMemStore extends TestCase { 
scanner.close(); } - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); memstorescanners = this.memstore.getScanners(); // Now assert can count same number even if a snapshot mid-scan. s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, @@ -196,7 +193,7 @@ public class TestMemStore extends TestCase { private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException { - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); List memstorescanners = this.memstore.getScanners(); assertEquals(1, memstorescanners.size()); final KeyValueScanner scanner = memstorescanners.get(0); @@ -231,35 +228,35 @@ public class TestMemStore extends TestCase { final byte[] q2 = Bytes.toBytes("q2"); final byte[] v = Bytes.toBytes("value"); - ReadWriteConsistencyControl.WriteEntry w = + TransactionManager.WriteEntry w = rwcc.beginMemstoreInsert(); KeyValue kv1 = new KeyValue(row, f, q1, v); - kv1.setMemstoreTS(w.getWriteNumber()); + kv1.setSequenceNumber(w.getWriteNumber()); memstore.add(kv1); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{}); - rwcc.completeMemstoreInsert(w); + rwcc.finish(w); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1}); w = rwcc.beginMemstoreInsert(); KeyValue kv2 = new KeyValue(row, f, q2, v); - kv2.setMemstoreTS(w.getWriteNumber()); + kv2.setSequenceNumber(w.getWriteNumber()); memstore.add(kv2); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1}); - rwcc.completeMemstoreInsert(w); + rwcc.finish(w); - 
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1, kv2}); } @@ -279,45 +276,45 @@ public class TestMemStore extends TestCase { final byte[] v2 = Bytes.toBytes("value2"); // INSERT 1: Write both columns val1 - ReadWriteConsistencyControl.WriteEntry w = + TransactionManager.WriteEntry w = rwcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); - kv11.setMemstoreTS(w.getWriteNumber()); + kv11.setSequenceNumber(w.getWriteNumber()); memstore.add(kv11); KeyValue kv12 = new KeyValue(row, f, q2, v1); - kv12.setMemstoreTS(w.getWriteNumber()); + kv12.setSequenceNumber(w.getWriteNumber()); memstore.add(kv12); - rwcc.completeMemstoreInsert(w); + rwcc.finish(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 w = rwcc.beginMemstoreInsert(); KeyValue kv21 = new KeyValue(row, f, q1, v2); - kv21.setMemstoreTS(w.getWriteNumber()); + kv21.setSequenceNumber(w.getWriteNumber()); memstore.add(kv21); KeyValue kv22 = new KeyValue(row, f, q2, v2); - kv22.setMemstoreTS(w.getWriteNumber()); + kv22.setSequenceNumber(w.getWriteNumber()); memstore.add(kv22); // BEFORE COMPLETING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE INSERT 2 - rwcc.completeMemstoreInsert(w); + rwcc.finish(w); // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. 
// See HBASE-1485 for discussion about what we should do with // the duplicate-TS inserts - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); } @@ -334,20 +331,20 @@ public class TestMemStore extends TestCase { final byte[] q2 = Bytes.toBytes("q2"); final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 - ReadWriteConsistencyControl.WriteEntry w = + TransactionManager.WriteEntry w = rwcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); - kv11.setMemstoreTS(w.getWriteNumber()); + kv11.setSequenceNumber(w.getWriteNumber()); memstore.add(kv11); KeyValue kv12 = new KeyValue(row, f, q2, v1); - kv12.setMemstoreTS(w.getWriteNumber()); + kv12.setSequenceNumber(w.getWriteNumber()); memstore.add(kv12); - rwcc.completeMemstoreInsert(w); + rwcc.finish(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); @@ -355,19 +352,19 @@ public class TestMemStore extends TestCase { w = rwcc.beginMemstoreInsert(); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); - kvDel.setMemstoreTS(w.getWriteNumber()); + kvDel.setSequenceNumber(w.getWriteNumber()); memstore.add(kvDel); // BEFORE COMPLETING DELETE, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE DELETE - rwcc.completeMemstoreInsert(w); + rwcc.finish(w); // NOW WE SHOULD SEE DELETE - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); s = 
this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12}); } @@ -381,7 +378,7 @@ public class TestMemStore extends TestCase { final byte[] f = Bytes.toBytes("family"); final byte[] q1 = Bytes.toBytes("q1"); - final ReadWriteConsistencyControl rwcc; + final TransactionManager rwcc; final MemStore memstore; AtomicReference caughtException; @@ -389,7 +386,7 @@ public class TestMemStore extends TestCase { public ReadOwnWritesTester(int id, MemStore memstore, - ReadWriteConsistencyControl rwcc, + TransactionManager rwcc, AtomicReference caughtException) { this.rwcc = rwcc; @@ -408,19 +405,19 @@ public class TestMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { - ReadWriteConsistencyControl.WriteEntry w = + TransactionManager.WriteEntry w = rwcc.beginMemstoreInsert(); // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); KeyValue kv = new KeyValue(row, f, q1, i, v); - kv.setMemstoreTS(w.getWriteNumber()); + kv.setSequenceNumber(w.getWriteNumber()); memstore.add(kv); - rwcc.completeMemstoreInsert(w); + rwcc.finish(w); // Assert that we can read back - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); KeyValueScanner s = this.memstore.getScanners().get(0); s.seek(kv); @@ -898,7 +895,7 @@ public class TestMemStore extends TestCase { } public static void main(String [] args) throws IOException { - ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + TransactionManager rwcc = new TransactionManager(); MemStore ms = new MemStore(); long n1 = System.nanoTime(); @@ -908,7 +905,7 @@ public class TestMemStore extends TestCase { System.out.println("foo"); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + TransactionManager.resetThreadReadPoint(rwcc); for (int i = 0 ; i < 50 ; i++) doScan(ms, i); diff --git 
a/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java index 92075b0..f4827d1 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java @@ -28,10 +28,10 @@ import java.util.concurrent.atomic.AtomicLong; public class TestReadWriteConsistencyControl extends TestCase { static class Writer implements Runnable { final AtomicBoolean finished; - final ReadWriteConsistencyControl rwcc; + final TransactionManager rwcc; final AtomicBoolean status; - Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) { + Writer(AtomicBoolean finished, TransactionManager rwcc, AtomicBoolean status) { this.finished = finished; this.rwcc = rwcc; this.status = status; @@ -41,7 +41,7 @@ public class TestReadWriteConsistencyControl extends TestCase { public void run() { while (!finished.get()) { - ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert(); + TransactionManager.WriteEntry e = rwcc.beginMemstoreInsert(); // System.out.println("Begin write: " + e.getWriteNumber()); // 10 usec - 500usec (including 0) int sleepTime = rnd.nextInt(500); @@ -53,7 +53,7 @@ public class TestReadWriteConsistencyControl extends TestCase { } catch (InterruptedException e1) { } try { - rwcc.completeMemstoreInsert(e); + rwcc.finish(e); } catch (RuntimeException ex) { // got failure System.out.println(ex.toString()); @@ -67,7 +67,7 @@ public class TestReadWriteConsistencyControl extends TestCase { } public void testParallelism() throws Exception { - final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + final TransactionManager rwcc = new TransactionManager(); final AtomicBoolean finished = new AtomicBoolean(false); @@ -76,9 +76,9 @@ public class TestReadWriteConsistencyControl extends 
TestCase { final AtomicLong failedAt = new AtomicLong(); Runnable reader = new Runnable() { public void run() { - long prev = rwcc.memstoreReadPoint(); + long prev = rwcc.getReadPoint(); while (!finished.get()) { - long newPrev = rwcc.memstoreReadPoint(); + long newPrev = rwcc.getReadPoint(); if (newPrev < prev) { // serious problem. System.out.println("Reader got out of order, prev: " + diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index de6f097..3eacaf8 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; -import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import com.google.common.base.Joiner; @@ -306,7 +305,8 @@ public class TestStore extends TestCase { Bytes.toBytes(oldValue))); // update during the snapshot. - long ret = this.store.updateColumnValue(row, family, qf1, newValue); + long ret = this.store.updateColumnValue(new KeyValue(row, family, qf1, + Bytes.toBytes(newValue))); // memstore should have grown by some amount. 
assertTrue(ret > 0); @@ -366,8 +366,8 @@ public class TestStore extends TestCase { for ( int i = 0 ; i < 10000 ; ++i) { newValue++; - long ret = this.store.updateColumnValue(row, family, qf1, newValue); - long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue); + long ret = this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue))); + long ret2 = this.store.updateColumnValue(new KeyValue(row2, family, qf1, Bytes.toBytes(newValue))); if (ret != 0) System.out.println("ret: " + ret); if (ret2 != 0) System.out.println("ret2: " + ret2); @@ -406,7 +406,7 @@ public class TestStore extends TestCase { this.store.snapshot(); // update during the snapshot, the exact same TS as the Put (lololol) - long ret = this.store.updateColumnValue(row, family, qf1, newValue); + long ret = this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue))); // memstore should have grown by some amount. assertTrue(ret > 0); @@ -418,11 +418,11 @@ public class TestStore extends TestCase { // now increment again: newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue))); // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again: newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue))); // the second TS should be TS=2 or higher., even though 'time=1' right now. 
@@ -445,7 +445,7 @@ public class TestStore extends TestCase { mee.setValue(2); // time goes up slightly newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue))); results = HBaseTestingUtility.getFromStoreFile(store, get); assertEquals(2, results.size()); diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java b/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java index e70135f..44ee10a 100644 --- a/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java +++ b/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java @@ -109,6 +109,11 @@ public class TestBytes extends TestCase { byte [] b = Bytes.toBytes(longs[i]); assertEquals(longs[i], Bytes.toLong(b)); } + byte [] bytes = new byte[16]; + final int offset = 8; + bytes = Bytes.toBytes(bytes, offset, Integer.MAX_VALUE); + long result = Bytes.toLong(bytes, offset); + assertEquals(Integer.MAX_VALUE, result); } public void testToFloat() throws Exception {