diff --git src/main/java/org/apache/hadoop/hbase/KeyValue.java src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 243d76f..bc8ae4c 100644
--- src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -505,8 +505,12 @@ public class KeyValue implements Writable, HeapSize {
   }
 
   /**
-   * Write KeyValue format into a byte array.
+   * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
+   * data buffer.
+   * <p>
+   * Column is split into two fields, family and qualifier.
    *
+   * @param buffer the bytes buffer to use
    * @param row row key
    * @param roffset row offset
    * @param rlength row length
@@ -521,13 +525,42 @@ public class KeyValue implements Writable, HeapSize {
    * @param value column value
    * @param voffset value offset
    * @param vlength value length
-   * @return The newly created byte array.
+   * @throws IllegalArgumentException if a length is out of range or the buffer is too small
    */
-  static byte [] createByteArray(final byte [] row, final int roffset,
-      final int rlength, final byte [] family, final int foffset, int flength,
-      final byte [] qualifier, final int qoffset, int qlength,
-      final long timestamp, final Type type,
-      final byte [] value, final int voffset, int vlength) {
+  public KeyValue(byte [] buffer,
+      final byte [] row, final int roffset, final int rlength,
+      final byte [] family, final int foffset, final int flength,
+      final byte [] qualifier, final int qoffset, final int qlength,
+      final long timestamp, final Type type,
+      final byte [] value, final int voffset, final int vlength) {
+
+    this.bytes = buffer;
+    this.length = writeByteArray(buffer, row, roffset, rlength,
+        family, foffset, flength, qualifier, qoffset, qlength,
+        timestamp, type, value, voffset, vlength);
+    this.offset = 0;
+  }
+
+  /**
+   * Checks the parameters passed to a constructor.
+   *
+   * @param row row key
+   * @param rlength row length
+   * @param family family name
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qlength qualifier length
+   * @param value column value
+   * @param vlength value length
+   *
+   * @throws IllegalArgumentException an illegal value was passed
+   */
+  private static void checkParameters(final byte [] row, final int rlength,
+      final byte [] family, int flength,
+      final byte [] qualifier, int qlength,
+      final byte [] value, int vlength)
+      throws IllegalArgumentException {
+
     if (rlength > Short.MAX_VALUE) {
       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
     }
@@ -545,24 +578,113 @@ public class KeyValue implements Writable, HeapSize {
       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
     }
     // Key length
-    long longkeylength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
-    if (longkeylength > Integer.MAX_VALUE) {
-      throw new IllegalArgumentException("keylength " + longkeylength + " > " +
-          Integer.MAX_VALUE);
+    long longKeyLength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
+    if (longKeyLength > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("keylength " + longKeyLength + " > " + Integer.MAX_VALUE);
     }
-    int keylength = (int)longkeylength;
     // Value length
     vlength = value == null? 0 : vlength;
     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
-      throw new IllegalArgumentException("Valuer > " +
-          HConstants.MAXIMUM_VALUE_LENGTH);
+      throw new IllegalArgumentException("Value > " + HConstants.MAXIMUM_VALUE_LENGTH);
     }
+  }
+
+  /**
+   * Write KeyValue format into the provided byte array.
+   *
+   * @param buffer the bytes buffer to use
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param type key type
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   *
+   * @return The number of useful bytes in the buffer.
+   * @throws IllegalArgumentException if a length is out of range or the buffer is too small
+   */
+  static int writeByteArray(byte [] buffer,
+      final byte [] row, final int roffset, final int rlength,
+      final byte [] family, final int foffset, int flength,
+      final byte [] qualifier, final int qoffset, int qlength,
+      final long timestamp, final Type type,
+      final byte [] value, final int voffset, int vlength) {
+
+    checkParameters(row, rlength, family, flength, qualifier, qlength, value, vlength);
+    // checkParameters() only normalizes its own copy of vlength; a null value has no bytes.
+    vlength = value == null ? 0 : vlength;
+
+    int keyLength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
+    int keyValueLength = KEYVALUE_INFRASTRUCTURE_SIZE + keyLength + vlength;
+    if (keyValueLength > buffer.length) {
+      throw new IllegalArgumentException("Buffer size < " + keyValueLength);
+    }
+
+    // Write key, value and key row length.
+    int pos = 0;
+    pos = Bytes.putInt(buffer, pos, keyLength);
+    pos = Bytes.putInt(buffer, pos, vlength);
+    pos = Bytes.putShort(buffer, pos, (short)(rlength & 0x0000ffff));
+    pos = Bytes.putBytes(buffer, pos, row, roffset, rlength);
+    pos = Bytes.putByte(buffer, pos, (byte) (flength & 0x0000ff));
+    if (flength != 0) {
+      pos = Bytes.putBytes(buffer, pos, family, foffset, flength);
+    }
+    if (qlength != 0) {
+      pos = Bytes.putBytes(buffer, pos, qualifier, qoffset, qlength);
+    }
+    pos = Bytes.putLong(buffer, pos, timestamp);
+    pos = Bytes.putByte(buffer, pos, type.getCode());
+    if (value != null && value.length > 0) {
+      pos = Bytes.putBytes(buffer, pos, value, voffset, vlength);
+    }
+
+    return keyValueLength;
+  }
+
+  /**
+   * Write KeyValue format into a byte array.
+   *
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param type key type
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @return The newly created byte array.
+   */
+  static byte [] createByteArray(final byte [] row, final int roffset,
+      final int rlength, final byte [] family, final int foffset, int flength,
+      final byte [] qualifier, final int qoffset, int qlength,
+      final long timestamp, final Type type,
+      final byte [] value, final int voffset, int vlength) {
+
+    checkParameters(row, rlength, family, flength, qualifier, qlength, value, vlength);
+    // checkParameters() only normalizes its own copy of vlength; a null value has no bytes.
+    vlength = value == null ? 0 : vlength;
 
     // Allocate right-sized byte array.
-    byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength];
+    int keyLength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
+    byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keyLength + vlength];
     // Write key, value and key row length.
     int pos = 0;
-    pos = Bytes.putInt(bytes, pos, keylength);
+    pos = Bytes.putInt(bytes, pos, keyLength);
     pos = Bytes.putInt(bytes, pos, vlength);
     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
@@ -1007,6 +1129,18 @@ public class KeyValue implements Writable, HeapSize {
   }
 
   /**
+   * Loads this object's value into the provided ByteBuffer.
+   * <p>
+   * Does not clear or flip the buffer.
+   *
+   * @param dst the buffer where to write the value
+   */
+  public void loadValue(ByteBuffer dst) {
+
+    dst.put(getBuffer(), getValueOffset(), getValueLength());
+  }
+
+  /**
    * Primarily for use client-side. Returns the row of this KeyValue in a new
    * byte array.<p>
    *
@@ -1295,6 +1429,33 @@ public class KeyValue implements Writable, HeapSize {
   }
 
   /**
+   * Checks if column matches.
+   *
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @return True if column matches
+   */
+  public boolean matchingColumn(final byte [] family, final int foffset, final int flength,
+      final byte [] qualifier, final int qoffset, final int qlength) {
+    int rl = getRowLength();
+    int o = getFamilyOffset(rl);
+    int fl = getFamilyLength(o);
+    int ql = getQualifierLength(rl,fl);
+
+    if (!Bytes.equals(family, foffset, flength, this.bytes, o, fl)) {
+      return false;
+    }
+    if (qualifier == null || qlength == 0) {
+      return (ql == 0);
+    }
+    return Bytes.equals(qualifier, qoffset, qlength, this.bytes, o + fl, ql);
+  }
+
+  /**
    * @param left
    * @param loffset
    * @param llength
@@ -1819,6 +1980,75 @@ public class KeyValue implements Writable, HeapSize {
         HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
   }
 
+
+  /**
+   * Create a KeyValue for the specified row, family and qualifier that would be
+   * smaller than all other possible KeyValues that have the same row,
+   * family, qualifier.
+   * Used for seeking.
+   *
+   * @param buffer the buffer to use for the new KeyValue object
+   * @param row row key
+   * @param family family name
+   * @param qualifier column qualifier, may be null
+   *
+   * @return First possible key on passed Row, Family, Qualifier.
+   *
+   * @throws IllegalArgumentException The resulting KeyValue object would be larger
+   *         than the provided buffer or than Integer.MAX_VALUE
+   */
+  public static KeyValue createFirstOnRow(byte [] buffer, final byte [] row,
+      final byte [] family, final byte [] qualifier)
+      throws IllegalArgumentException {
+
+    return createFirstOnRow(buffer, row, family, 0, family.length,
+        qualifier, 0, qualifier == null ? 0 : qualifier.length);
+  }
+
+  /**
+   * Create a KeyValue for the specified row, family and qualifier that would be
+   * smaller than all other possible KeyValues that have the same row,
+   * family, qualifier.
+   * Used for seeking.
+   *
+   * @param buffer the buffer to use for the new KeyValue object
+   * @param row row key
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   *
+   * @return First possible key on passed Row, Family, Qualifier.
+   *
+   * @throws IllegalArgumentException The resulting KeyValue object would be larger
+   *         than the provided buffer or than Integer.MAX_VALUE
+   */
+  public static KeyValue createFirstOnRow(byte [] buffer, final byte [] row,
+      final byte [] family, final int foffset, final int flength,
+      final byte [] qualifier, final int qoffset, final int qlength)
+      throws IllegalArgumentException {
+
+    long lLength = KeyValue.KEY_INFRASTRUCTURE_SIZE +
+        row.length + flength + qlength +
+        KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+
+    if (lLength > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("KeyValue length " + lLength + " > " + Integer.MAX_VALUE);
+    }
+    int iLength = (int) lLength;
+    if (buffer.length < iLength) {
+      throw new IllegalArgumentException("Buffer size " + buffer.length + " < " + iLength);
+    }
+    return new KeyValue(buffer,
+        row, 0, row.length,
+        family, foffset, flength,
+        qualifier, qoffset, qlength,
+        HConstants.LATEST_TIMESTAMP, KeyValue.Type.Maximum,
+        null, 0, 0);
+  }
+
   /**
    * Create a KeyValue for the specified row, family and qualifier that would be
    * larger than or equal to all other possible KeyValues that have the same
diff --git src/main/java/org/apache/hadoop/hbase/client/Result.java src/main/java/org/apache/hadoop/hbase/client/Result.java
index df0b3ef..bc1efd7 100644
--- src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -80,6 +81,12 @@ public class Result implements Writable, WritableWithSize {
   private transient byte [] row = null;
   private ImmutableBytesWritable bytes = null;
 
+  // Per-thread scratch buffer used by binarySearch() to build search keys without allocating.
+  // It must not be one static array: Results may be read concurrently from many threads.
+  private static final ThreadLocal<byte []> localBuffer = new ThreadLocal<byte []>();
+  // Scratch buffers are sized in multiples of this many bytes.
+  private static final int PAD_WIDTH = 128;
+
   /**
    * Constructor used for Writable.
    */
@@ -228,6 +235,56 @@ public class Result implements Writable, WritableWithSize {
   }
 
   /**
+   * Searches for the latest value for the specified column.
+   *
+   * @param kvs the array to search, sorted by {@link KeyValue#COMPARATOR}; must not be empty
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   *
+   * @return the index of the first KeyValue sorting at or after the first possible key of the
+   *         column, or -1 if there is none; callers must still check that the column matches
+   */
+  protected int binarySearch(final KeyValue [] kvs,
+      final byte [] family, final int foffset, final int flength,
+      final byte [] qualifier, final int qoffset, final int qlength) {
+
+    // Build the search key in this thread's scratch buffer, growing it when it is too small.
+    long keyValueSize = (long) KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
+        + KeyValue.KEY_INFRASTRUCTURE_SIZE + kvs[0].getRowLength() + flength + qlength;
+    if (keyValueSize > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("KeyValue length " + keyValueSize + " > "
+          + Integer.MAX_VALUE);
+    }
+    byte [] buffer = localBuffer.get();
+    if (buffer == null || buffer.length < keyValueSize) {
+      // Round up to a multiple of PAD_WIDTH so slightly longer keys can reuse the buffer.
+      long padded = (keyValueSize + PAD_WIDTH - 1) / PAD_WIDTH * PAD_WIDTH;
+      buffer = new byte[(int) Math.min(padded, Integer.MAX_VALUE)];
+      localBuffer.set(buffer);
+    }
+
+    KeyValue searchTerm = KeyValue.createFirstOnRow(buffer, kvs[0].getRow(),
+        family, foffset, flength,
+        qualifier, qoffset, qlength);
+
+    // pos === ( -(insertion point) - 1)
+    int pos = Arrays.binarySearch(kvs, searchTerm, KeyValue.COMPARATOR);
+    // never will exact match
+    if (pos < 0) {
+      pos = (pos+1) * -1;
+      // pos is now insertion point
+    }
+    if (pos == kvs.length) {
+      return -1; // doesn't exist
+    }
+    return pos;
+  }
+
+  /**
    * The KeyValue for the most recent for a given column. If the column does
    * not exist in the result set - if it wasn't selected in the query (Get/Scan)
    * or just does not exist in the row the return value is null.
@@ -253,6 +310,38 @@ public class Result implements Writable, WritableWithSize {
   }
 
   /**
+   * The most recent KeyValue for a given column. If the column does not exist in the result set -
+   * if it wasn't selected in the query (Get/Scan) or just does not exist in the row the return
+   * value is null.
+   *
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   *
+   * @return KeyValue for the column or null
+   */
+  public KeyValue getColumnLatest(byte [] family, int foffset, int flength,
+      byte [] qualifier, int qoffset, int qlength) {
+
+    KeyValue [] kvs = raw(); // side effect possibly.
+    if (kvs == null || kvs.length == 0) {
+      return null;
+    }
+    int pos = binarySearch(kvs, family, foffset, flength, qualifier, qoffset, qlength);
+    if (pos == -1) {
+      return null;
+    }
+    KeyValue kv = kvs[pos];
+    if (kv.matchingColumn(family, foffset, flength, qualifier, qoffset, qlength)) {
+      return kv;
+    }
+    return null;
+  }
+
+  /**
    * Get the latest version of the specified column.
    * @param family family name
    * @param qualifier column qualifier
@@ -267,6 +356,121 @@ public class Result implements Writable, WritableWithSize {
   }
 
   /**
+   * Loads the latest version of the specified column into the provided ByteBuffer.
+   * <p>
+   * Does not clear or flip the buffer.
+   *
+   * @param family family name
+   * @param qualifier column qualifier
+   * @param dst the buffer where to write the value
+   *
+   * @return true if a value was found, false otherwise
+   * @throws java.nio.BufferOverflowException if there is insufficient space remaining in dst
+   */
+  public boolean loadValue(byte [] family, byte [] qualifier, ByteBuffer dst) {
+
+    return loadValue(family, 0, family.length,
+        qualifier, 0, qualifier == null ? 0 : qualifier.length, dst);
+  }
+
+  /**
+   * Loads the latest version of the specified column into the provided ByteBuffer.
+   * <p>
+   * Does not clear or flip the buffer.
+   *
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param dst the buffer where to write the value
+   *
+   * @return true if a value was found, false otherwise
+   * @throws java.nio.BufferOverflowException if there is insufficient space remaining in dst
+   */
+  public boolean loadValue(byte [] family, int foffset, int flength,
+      byte [] qualifier, int qoffset, int qlength,
+      ByteBuffer dst) {
+
+    KeyValue kv = getColumnLatest(family, foffset, flength, qualifier, qoffset, qlength);
+
+    if (kv == null) {
+      return false;
+    }
+    kv.loadValue(dst);
+    return true;
+  }
+
+  /**
+   * Checks if the specified column contains a non-empty value.
+   *
+   * @param family family name
+   * @param qualifier column qualifier
+   *
+   * @return whether or not a latest value exists and is not empty
+   */
+  public boolean containsNonEmptyColumn(byte [] family, byte [] qualifier) {
+
+    return containsNonEmptyColumn(family, 0, family.length,
+        qualifier, 0, qualifier == null ? 0 : qualifier.length);
+  }
+
+  /**
+   * Checks if the specified column contains a non-empty value.
+   *
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   *
+   * @return whether or not a latest value exists and is not empty
+   */
+  public boolean containsNonEmptyColumn(byte [] family, int foffset, int flength,
+      byte [] qualifier, int qoffset, int qlength) {
+
+    KeyValue kv = getColumnLatest(family, foffset, flength, qualifier, qoffset, qlength);
+
+    return (kv != null) && (kv.getValueLength() > 0);
+  }
+
+  /**
+   * Checks if the specified column contains an empty value.
+   *
+   * @param family family name
+   * @param qualifier column qualifier
+   *
+   * @return whether or not a latest value exists and is empty
+   */
+  public boolean containsEmptyColumn(byte [] family, byte [] qualifier) {
+
+    return containsEmptyColumn(family, 0, family.length,
+        qualifier, 0, qualifier == null ? 0 : qualifier.length);
+  }
+
+  /**
+   * Checks if the specified column contains an empty value.
+   *
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   *
+   * @return whether or not a latest value exists and is empty
+   */
+  public boolean containsEmptyColumn(byte [] family, int foffset, int flength,
+      byte [] qualifier, int qoffset, int qlength) {
+
+    KeyValue kv = getColumnLatest(family, foffset, flength, qualifier, qoffset, qlength);
+
+    return (kv != null) && (kv.getValueLength() == 0);
+  }
+
+  /**
    * Checks for existence of the specified column.
    * @param family family name
    * @param qualifier column qualifier
@@ -278,6 +482,24 @@ public class Result implements Writable, WritableWithSize {
   }
 
   /**
+   * Checks for existence of the specified column.
+   *
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   *
+   * @return true if at least one value exists in the result, false if not
+   */
+  public boolean containsColumn(byte [] family, int foffset, int flength,
+      byte [] qualifier, int qoffset, int qlength) {
+
+    return getColumnLatest(family, foffset, flength, qualifier, qoffset, qlength) != null;
+  }
+
+  /**
    * Map of families to all versions of its qualifiers and values.
    * <p>
    * Returns a three level Map of the form:
diff --git src/test/java/org/apache/hadoop/hbase/client/TestResult.java src/test/java/org/apache/hadoop/hbase/client/TestResult.java
index f9e29c2..c104240 100644
--- src/test/java/org/apache/hadoop/hbase/client/TestResult.java
+++ src/test/java/org/apache/hadoop/hbase/client/TestResult.java
@@ -28,6 +28,7 @@ import org.junit.experimental.categories.Category;
 
 import static org.apache.hadoop.hbase.HBaseTestCase.assertByteEquals;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +62,8 @@ public class TestResult extends TestCase {
     Arrays.sort(kvs, KeyValue.COMPARATOR);
 
     Result r = new Result(kvs);
+
+    ByteBuffer loadValueBuffer = ByteBuffer.allocate(1024);
     for (int i = 0; i < 100; ++i) {
       final byte[] qf = Bytes.toBytes(i);
 
@@ -72,6 +75,11 @@ public class TestResult extends TestCase {
       assertEquals(ks.get(0), r.getColumnLatest(family, qf));
       assertByteEquals(Bytes.add(value, Bytes.toBytes(i)), r.getValue(family, qf));
       assertTrue(r.containsColumn(family, qf));
+
+      loadValueBuffer.clear();
+      assertTrue(r.loadValue(family, qf, loadValueBuffer));
+      loadValueBuffer.flip();
+      assertEquals(ByteBuffer.wrap(Bytes.add(value, Bytes.toBytes(i))), loadValueBuffer);
     }
   }
   public void testMultiVersion() throws Exception {
@@ -84,6 +92,8 @@ public class TestResult extends TestCase {
 
     Arrays.sort(kvs, KeyValue.COMPARATOR);
 
+    ByteBuffer loadValueBuffer = ByteBuffer.allocate(1024);
+
     Result r = new Result(kvs);
     for (int i = 0; i < 100; ++i) {
       final byte[] qf = Bytes.toBytes(i);
@@ -96,7 +106,77 @@ public class TestResult extends TestCase {
       assertEquals(ks.get(0), r.getColumnLatest(family, qf));
       assertByteEquals(Bytes.add(value, Bytes.toBytes(i)), r.getValue(family, qf));
       assertTrue(r.containsColumn(family, qf));
+
+      loadValueBuffer.clear();
+      assertTrue(r.loadValue(family, qf, loadValueBuffer));
+      loadValueBuffer.flip();
+      assertEquals(ByteBuffer.wrap(Bytes.add(value, Bytes.toBytes(i))), loadValueBuffer);
+    }
+  }
+
+  /**
+   * Rough comparison of loadValue() and getValue(). Kept small so that it can run as part of the
+   * regular unit test suite; raise m locally for meaningful timings.
+   */
+  public void testPerformance() throws Exception {
+
+    final int n = 5;
+    final int m = 100000;
+
+    StringBuilder valueSB = new StringBuilder();
+    for (int i = 0; i < 100; i++) {
+      valueSB.append((byte)(Math.random() * 10));
+    }
+
+    StringBuilder rowSB = new StringBuilder();
+    for (int i = 0; i < 50; i++) {
+      rowSB.append((byte)(Math.random() * 10));
+    }
+
+    KeyValue [] kvs = genKVs(Bytes.toBytes(rowSB.toString()), family,
+        Bytes.toBytes(valueSB.toString()), 1, n);
+    Arrays.sort(kvs, KeyValue.COMPARATOR);
+    ByteBuffer loadValueBuffer = ByteBuffer.allocate(1024);
+    Result r = new Result(kvs);
+
+    // Same qualifiers the other tests look up: Bytes.toBytes(i).
+    byte[][] qfs = new byte[n][];
+    for (int i = 0; i < n; ++i) {
+      qfs[i] = Bytes.toBytes(i);
     }
+
+    // Warm up, and make sure both code paths actually find the columns being timed.
+    for (int i = 0; i < n; ++i) {
+      assertNotNull(r.getValue(family, qfs[i]));
+      loadValueBuffer.clear();
+      assertTrue(r.loadValue(family, qfs[i], loadValueBuffer));
+    }
+
+    System.gc();
+    long start = System.currentTimeMillis();
+
+    for (int k = 0; k < m; k++) {
+      for (int i = 0; i < n; ++i) {
+        loadValueBuffer.clear();
+        r.loadValue(family, qfs[i], loadValueBuffer);
+        loadValueBuffer.flip();
+      }
+    }
+
+    long stop = System.currentTimeMillis();
+    System.out.println("loadValue(): " + (stop - start));
+
+    System.gc();
+    start = System.currentTimeMillis();
+
+    for (int k = 0; k < m; k++) {
+      for (int i = 0; i < n; i++) {
+        r.getValue(family, qfs[i]);
+      }
+    }
+
+    stop = System.currentTimeMillis();
+    System.out.println("getValue(): " + (stop - start));
   }
 
   /**