Index: src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java (revision 1057786) +++ src/test/java/org/apache/hadoop/hbase/KeyValueTestUtil.java (working copy) @@ -22,33 +22,20 @@ import org.apache.hadoop.hbase.util.Bytes; +/** + * Utility for creating {@link KeyValue} + */ public class KeyValueTestUtil { - public static KeyValue create( - String row, - String family, - String qualifier, - long timestamp, - String value) - { + public static KeyValue create(String row, String family, String qualifier, + long timestamp, String value) { return create(row, family, qualifier, timestamp, KeyValue.Type.Put, value); } - public static KeyValue create( - String row, - String family, - String qualifier, - long timestamp, - KeyValue.Type type, - String value) - { - return new KeyValue( - Bytes.toBytes(row), - Bytes.toBytes(family), - Bytes.toBytes(qualifier), - timestamp, - type, - Bytes.toBytes(value) + public static KeyValue create(String row, String family, String qualifier, + long timestamp, KeyValue.Type type, String value) { + return new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), + Bytes.toBytes(qualifier), timestamp, type, Bytes.toBytes(value) ); } -} +} \ No newline at end of file Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (revision 1057786) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (working copy) @@ -76,9 +76,9 @@ final AtomicLong failedAt = new AtomicLong(); Runnable reader = new Runnable() { public void run() { - long prev = rwcc.memstoreReadPoint(); + long prev = rwcc.getReadPoint(); while (!finished.get()) { - long newPrev = rwcc.memstoreReadPoint(); + long 
newPrev = rwcc.getReadPoint(); if (newPrev < prev) { // serious problem. System.out.println("Reader got out of order, prev: " + Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 1057786) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy) @@ -24,8 +24,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.NavigableSet; -import java.util.TreeSet; import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -35,7 +33,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; @@ -235,7 +232,7 @@ rwcc.beginMemstoreInsert(); KeyValue kv1 = new KeyValue(row, f, q1, v); - kv1.setMemstoreTS(w.getWriteNumber()); + kv1.setSequenceNumber(w.getWriteNumber()); memstore.add(kv1); ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); @@ -250,7 +247,7 @@ w = rwcc.beginMemstoreInsert(); KeyValue kv2 = new KeyValue(row, f, q2, v); - kv2.setMemstoreTS(w.getWriteNumber()); + kv2.setSequenceNumber(w.getWriteNumber()); memstore.add(kv2); ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); @@ -283,11 +280,11 @@ rwcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); - kv11.setMemstoreTS(w.getWriteNumber()); + kv11.setSequenceNumber(w.getWriteNumber()); memstore.add(kv11); KeyValue kv12 = new KeyValue(row, f, q2, v1); - kv12.setMemstoreTS(w.getWriteNumber()); + kv12.setSequenceNumber(w.getWriteNumber()); memstore.add(kv12); rwcc.completeMemstoreInsert(w); @@ -299,11 +296,11 @@ // START INSERT 2: Write both columns val2 w = rwcc.beginMemstoreInsert(); KeyValue kv21 = new 
KeyValue(row, f, q1, v2); - kv21.setMemstoreTS(w.getWriteNumber()); + kv21.setSequenceNumber(w.getWriteNumber()); memstore.add(kv21); KeyValue kv22 = new KeyValue(row, f, q2, v2); - kv22.setMemstoreTS(w.getWriteNumber()); + kv22.setSequenceNumber(w.getWriteNumber()); memstore.add(kv22); // BEFORE COMPLETING INSERT 2, SEE FIRST KVS @@ -338,11 +335,11 @@ rwcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); - kv11.setMemstoreTS(w.getWriteNumber()); + kv11.setSequenceNumber(w.getWriteNumber()); memstore.add(kv11); KeyValue kv12 = new KeyValue(row, f, q2, v1); - kv12.setMemstoreTS(w.getWriteNumber()); + kv12.setSequenceNumber(w.getWriteNumber()); memstore.add(kv12); rwcc.completeMemstoreInsert(w); @@ -355,7 +352,7 @@ w = rwcc.beginMemstoreInsert(); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); - kvDel.setMemstoreTS(w.getWriteNumber()); + kvDel.setSequenceNumber(w.getWriteNumber()); memstore.add(kvDel); // BEFORE COMPLETING DELETE, SEE FIRST KVS @@ -415,7 +412,7 @@ byte[] v = Bytes.toBytes(i); KeyValue kv = new KeyValue(row, f, q1, i, v); - kv.setMemstoreTS(w.getWriteNumber()); + kv.setSequenceNumber(w.getWriteNumber()); memstore.add(kv); rwcc.completeMemstoreInsert(w); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (revision 1057786) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (working copy) @@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; -import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import com.google.common.base.Joiner; @@ -306,7 +305,8 @@ Bytes.toBytes(oldValue))); // update during the snapshot. 
- long ret = this.store.updateColumnValue(row, family, qf1, newValue); + long ret = this.store.updateColumnValue(new KeyValue(row, family, qf1, + Bytes.toBytes(newValue))); // memstore should have grown by some amount. assertTrue(ret > 0); @@ -366,8 +366,8 @@ for ( int i = 0 ; i < 10000 ; ++i) { newValue++; - long ret = this.store.updateColumnValue(row, family, qf1, newValue); - long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue); + long ret = this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue))); + long ret2 = this.store.updateColumnValue(new KeyValue(row2, family, qf1, Bytes.toBytes(newValue))); if (ret != 0) System.out.println("ret: " + ret); if (ret2 != 0) System.out.println("ret2: " + ret2); @@ -406,7 +406,7 @@ this.store.snapshot(); // update during the snapshot, the exact same TS as the Put (lololol) - long ret = this.store.updateColumnValue(row, family, qf1, newValue); + long ret = this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue))); // memstore should have grown by some amount. assertTrue(ret > 0); @@ -418,11 +418,11 @@ // now increment again: newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue))); // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again: newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue))); // the second TS should be TS=2 or higher., even though 'time=1' right now. 
@@ -445,7 +445,7 @@ mee.setValue(2); // time goes up slightly newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + this.store.updateColumnValue(new KeyValue(row, family, qf1, Bytes.toBytes(newValue))); results = HBaseTestingUtility.getFromStoreFile(store, get); assertEquals(2, results.size()); Index: src/test/java/org/apache/hadoop/hbase/TestKeyValue.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestKeyValue.java (revision 1057786) +++ src/test/java/org/apache/hadoop/hbase/TestKeyValue.java (working copy) @@ -30,10 +30,46 @@ import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; public class TestKeyValue extends TestCase { private final Log LOG = LogFactory.getLog(this.getClass().getName()); + public void testSequenceNumberCompare() { + long ts = System.currentTimeMillis(); + KeyValue kvlower = new KeyValue(HConstants.CATALOG_FAMILY, + HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts, ts, + HConstants.CATALOG_FAMILY); + KeyValue kvhigher = new KeyValue(HConstants.CATALOG_FAMILY, + HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts, ts + 1, + HConstants.CATALOG_FAMILY); + int compare = KeyValue.COMPARATOR.compare(kvlower, kvhigher); + // Compare > 0 because kvhigher must sort before kvlower. 
+ Assert.assertTrue(compare > 0); + assertEquals(0, KeyValue.COMPARATOR.compare(kvlower, kvlower)); + assertEquals(0, KeyValue.COMPARATOR.compare(kvhigher, kvhigher)); + compare = KeyValue.COMPARATOR.compare(kvhigher, kvlower); + Assert.assertTrue(compare < 0); + } + + public void testSettingSequenceNumber() { + long ts = System.currentTimeMillis(); + KeyValue kv = new KeyValue(HConstants.CATALOG_FAMILY, + HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY, ts, + HConstants.CATALOG_FAMILY); + assertEquals(kv.getSequenceNumber(), KeyValue.DEFAULT_SEQUENCE_NUMBER); + kv.setSequenceNumber(ts); + assertEquals(kv.getSequenceNumber(), ts); + } + + public void testVersion() { + int expected = KeyValue.getVersion(KeyValue.VERSION_BITS); + KeyValue kv = new KeyValue(Bytes.toBytes("test"), + System.currentTimeMillis(), Type.Put); + int found = kv.getVersion(); + assertEquals(expected, found); + } + public void testColumnCompare() throws Exception { final byte [] a = Bytes.toBytes("aaa"); byte [] family1 = Bytes.toBytes("abc"); @@ -42,6 +78,7 @@ byte [] qualifier2 = Bytes.toBytes("ef"); KeyValue aaa = new KeyValue(a, family1, qualifier1, 0L, Type.Put, a); + LOG.info("aaa=" + aaa.toString()); assertFalse(aaa.matchingColumn(family2, qualifier2)); assertTrue(aaa.matchingColumn(family1, qualifier1)); aaa = new KeyValue(a, family2, qualifier2, 0L, Type.Put, a); Index: src/test/java/org/apache/hadoop/hbase/util/TestBytes.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/util/TestBytes.java (revision 1057786) +++ src/test/java/org/apache/hadoop/hbase/util/TestBytes.java (working copy) @@ -109,6 +109,11 @@ byte [] b = Bytes.toBytes(longs[i]); assertEquals(longs[i], Bytes.toLong(b)); } + byte [] bytes = new byte[16]; + final int offset = 8; + bytes = Bytes.toBytes(bytes, offset, Integer.MAX_VALUE); + long result = Bytes.toLong(bytes, offset); + assertEquals(Integer.MAX_VALUE, result); } public void testToFloat() throws 
Exception { Index: src/test/java/org/apache/hadoop/hbase/client/TestResult.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestResult.java (revision 1057786) +++ src/test/java/org/apache/hadoop/hbase/client/TestResult.java (working copy) @@ -20,17 +20,16 @@ package org.apache.hadoop.hbase.client; -import junit.framework.TestCase; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.util.Bytes; - import static org.apache.hadoop.hbase.HBaseTestCase.assertByteEquals; import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; +import junit.framework.TestCase; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; + public class TestResult extends TestCase { static KeyValue[] genKVs(final byte[] row, final byte[] family, Index: src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (revision 1057786) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (working copy) @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import java.util.LinkedList; -import java.util.concurrent.atomic.AtomicLong; /** * Manages the read/write consistency within memstore. This provides @@ -29,8 +28,7 @@ * the new writes for readers to read (thus forming atomic transactions). */ public class ReadWriteConsistencyControl { - private volatile long memstoreRead = 0; - private volatile long memstoreWrite = 0; + private volatile long readPoint = 0; private final Object readWaiters = new Object(); @@ -38,47 +36,9 @@ private final LinkedList writeQueue = new LinkedList(); - private static final ThreadLocal perThreadReadPoint = - new ThreadLocal(); - - /** - * Get this thread's read point. 
Used primarily by the memstore scanner to - * know which values to skip (ie: have not been completed/committed to - * memstore). - */ - public static long getThreadReadPoint() { - return perThreadReadPoint.get(); - } - - /** - * Set the thread read point to the given value. The thread RWCC - * is used by the Memstore scanner so it knows which values to skip. - * Give it a value of 0 if you want everything. - */ - public static void setThreadReadPoint(long readPoint) { - perThreadReadPoint.set(readPoint); - } - - /** - * Set the thread RWCC read point to whatever the current read point is in - * this particular instance of RWCC. Returns the new thread read point value. - */ - public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) { - perThreadReadPoint.set(rwcc.memstoreReadPoint()); - return getThreadReadPoint(); - } - - /** - * Set the thread RWCC read point to 0 (include everything). - */ - public static void resetThreadReadPoint() { - perThreadReadPoint.set(0L); - } - - public WriteEntry beginMemstoreInsert() { + public WriteEntry beginMemstoreInsert(final long writePoint) { synchronized (writeQueue) { - long nextWriteNumber = ++memstoreWrite; - WriteEntry e = new WriteEntry(nextWriteNumber); + WriteEntry e = new WriteEntry(writePoint); writeQueue.add(e); return e; } @@ -115,7 +75,7 @@ if (nextReadValue > 0) { synchronized (readWaiters) { - memstoreRead = nextReadValue; + readPoint = nextReadValue; readWaiters.notifyAll(); } @@ -124,7 +84,7 @@ boolean interrupted = false; synchronized (readWaiters) { - while (memstoreRead < e.getWriteNumber()) { + while (readPoint < e.getWriteNumber()) { try { readWaiters.wait(0); } catch (InterruptedException ie) { @@ -137,8 +97,8 @@ if (interrupted) Thread.currentThread().interrupt(); } - public long memstoreReadPoint() { - return memstoreRead; + public long getReadPoint() { + return readPoint; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java 
=================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1057786) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -40,7 +40,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Matcher; @@ -144,9 +143,6 @@ private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas final static Object [] NO_ARGS = new Object []{}; - // used to indirectly tell syncFs to force the sync - private boolean forceSync = false; - public interface Reader { void init(FileSystem fs, Path path, Configuration c) throws IOException; void close() throws IOException; @@ -435,7 +431,7 @@ } /** - * @return log sequence number + * @return The current log sequence number */ public long getSequenceNumber() { return logSeqNum.get(); @@ -926,6 +922,7 @@ // actual name. byte [] hriKey = info.getEncodedNameAsBytes(); this.lastSeqWritten.putIfAbsent(hriKey, seqNum); + // Update edits adding in this sequence number HLogKey logKey = makeKey(hriKey, tableName, seqNum, now); doWrite(info, logKey, edits); this.numEntries.incrementAndGet(); @@ -940,6 +937,36 @@ } /** + * Increment sequence number and add new number to kv. + * Use this method when you are skipping the WAL; {@link KeyValues} + * still need to be decorated with a valid sequence number though edit is + * not in the WAL. + * @param kv KeyValue to decorate. + */ + public void insertSequenceNumber(final KeyValue kv) { + synchronized (this.updateLock) { + kv.setSequenceNumber(obtainSeqNum()); + } + } + + /** + * Increment sequence number and add this new number to KeyValues passed in + * familyMap. 
+ * Use this method when you are skipping the WAL; {@link KeyValues} + * still need to be decorated with a valid sequence number though edit is + * not in the WAL. + * @param familyMap KeyValues to decorate by family. + */ + public void insertSequenceNumber(final Map> familyMap) { + synchronized (this.updateLock) { + long sequenceNumber = obtainSeqNum(); + for (List kvs: familyMap.values()) { + for (KeyValue kv: kvs) kv.setSequenceNumber(sequenceNumber); + } + } + } + + /** * This thread is responsible to call syncFs and buffer up the writers while * it happens. */ @@ -947,8 +974,6 @@ private final long optionalFlushInterval; - private boolean syncerShuttingDown = false; - LogSyncer(long optionalFlushInterval) { this.optionalFlushInterval = optionalFlushInterval; } @@ -969,7 +994,6 @@ } catch (InterruptedException e) { LOG.debug(getName() + " interrupted while waiting for sync requests"); } finally { - syncerShuttingDown = true; LOG.info(getName() + " exiting"); } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1057786) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -23,7 +23,6 @@ import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.rmi.UnexpectedException; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -348,73 +347,25 @@ } /** - * Given the specs of a column, update it, first by inserting a new record, - * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS - * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying - * store will ensure that the insert/delete each are atomic. 
A scanner/reader will either - * get the new value, or the old value and all readers will eventually only see the new - * value after the old was removed. + * Update a column value. * - * @param row - * @param family - * @param qualifier - * @param newValue - * @param now - * @return Timestamp + * @param kv + * @return Difference in size made by addition of this edit to memstore. */ - public long updateColumnValue(byte[] row, - byte[] family, - byte[] qualifier, - long newValue, - long now) { + public long updateColumnValue(final KeyValue kv) { this.lock.readLock().lock(); try { - KeyValue firstKv = KeyValue.createFirstOnRow( - row, family, qualifier); - // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit. - SortedSet snSs = snapshot.tailSet(firstKv); - if (!snSs.isEmpty()) { - KeyValue snKv = snSs.first(); - // is there a matching KV in the snapshot? - if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) { - if (snKv.getTimestamp() == now) { - // poop, - now += 1; - } - } - } - - // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary. - // But the timestamp should also be max(now, mostRecentTsInMemstore) - - // so we cant add the new KV w/o knowing what's there already, but we also - // want to take this chance to delete some kvs. So two loops (sad) - - SortedSet ss = kvset.tailSet(firstKv); - Iterator it = ss.iterator(); - while ( it.hasNext() ) { - KeyValue kv = it.next(); - - // if this isnt the row we are interested in, then bail: - if (!firstKv.matchingColumn(family,qualifier) || !firstKv.matchingRow(kv) ) { - break; // rows dont match, bail. - } - - // if the qualifier matches and it's a put, just RM it out of the kvset. 
- if (firstKv.matchingQualifier(kv)) { - // to be extra safe we only remove Puts that have a memstoreTS==0 - if (kv.getType() == KeyValue.Type.Put.getCode()) { - now = Math.max(now, kv.getTimestamp()); - } - } - } - - // create or update (upsert) a new KeyValue with - // 'now' and a 0 memstoreTS == immediately visible - return upsert(Arrays.asList(new KeyValue [] { - new KeyValue(row, family, qualifier, now, - Bytes.toBytes(newValue)) - })); + // There was code in here where we were checking for a timestamp in + // advance of ours. If we found one, then we'd use the one in advance + // of ours instead of the ts that was in the kv. We were making a new + // kv to do this, a kv that would not be the same as whats out in the + // WAL. Seems no longer necessary to me now we have sequence number. + // I'm removing it from here and adding an assert instead into the upsert + // we call that will look for incidence of the kv timestamp being less + // than that of one already up in memstore. + // We were doing max(now, mostRecentTsInMemstore). + // St.Ack 20110113 + return upsert(kv); } finally { this.lock.readLock().unlock(); } @@ -427,10 +378,6 @@ * value for that row/family/qualifier. If a KeyValue did already exist, * it will then be removed. *

- * Currently the memstoreTS is kept at 0 so as each insert happens, it will - * be immediately visible. May want to change this so it is atomic across - * all KeyValues. - *

* This is called under row lock, so Get operations will still see updates * atomically. Scans will only see each KeyValue update as atomic. * @@ -442,7 +389,6 @@ try { long size = 0; for (KeyValue kv : kvs) { - kv.setMemstoreTS(0); size += upsert(kv); } return size; @@ -453,7 +399,7 @@ /** * Inserts the specified KeyValue into MemStore and deletes any existing - * versions of the same row/family/qualifier as the specified KeyValue. + * versions of the same row/family/qualifier at the specified KeyValue. *

* First, the specified KeyValue is inserted into the Memstore. *

@@ -467,38 +413,40 @@ long addedSize = add(kv); // Get the KeyValues for the row/family/qualifier regardless of timestamp. - // For this case we want to clean up any other puts + // and remove any old values to help keep a cap on version bloat. + // TODO: Do this out-of-band in a background thread; add the cleanup to + // a queue for another thread to clean -- St.Ack 20110113. KeyValue firstKv = KeyValue.createFirstOnRow( kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength()); SortedSet ss = kvset.tailSet(firstKv); Iterator it = ss.iterator(); + KeyValue firstFound = null; while ( it.hasNext() ) { KeyValue cur = it.next(); - + if (firstFound != null) firstFound = cur; if (kv == cur) { + if (firstFound != cur) { + // If we are not first on row, then somethings up! + throw new IllegalStateException("firstFound=" + firstFound + + ", cur=" + cur); + } // ignore the one just put in continue; } + // if this isn't the row we are interested in, then bail - if (!kv.matchingRow(cur)) { - break; - } + if (!kv.matchingRow(cur)) break; - // if the qualifier matches and it's a put, remove it - if (kv.matchingQualifier(cur)) { - - // to be extra safe we only remove Puts that have a memstoreTS==0 - if (kv.getType() == KeyValue.Type.Put.getCode() && - kv.getMemstoreTS() == 0) { - // false means there was a change, so give us the size. - addedSize -= heapSizeChange(kv, true); - it.remove(); - } - } else { - // past the column, done - break; + // if the qualifier no longer matches, skip out + if (!kv.matchingQualifier(cur)) break; + + // If a a Put with matching r/cf/q, then remove. + if (kv.getType() == KeyValue.Type.Put.getCode()) { + // false means there was a change, so give us the size. 
+ addedSize -= heapSizeChange(kv, true); + it.remove(); } } return addedSize; @@ -602,21 +550,12 @@ MemStoreScanner() { super(); - - //DebugPrint.println(" MS new@" + hashCode()); } protected KeyValue getNext(Iterator it) { KeyValue ret = null; - long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); - //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint); - while (ret == null && it.hasNext()) { - KeyValue v = it.next(); - if (v.getMemstoreTS() <= readPoint) { - // keep it. - ret = v; - } + ret = it.next(); } return ret; } @@ -638,14 +577,6 @@ kvsetNextRow = getNext(kvsetIt); snapshotNextRow = getNext(snapshotIt); - - //long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); - //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " + - // kvset.size() + " threadread = " + readPoint); - //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " + - // snapshot.size() + " threadread = " + readPoint); - - KeyValue lowest = getLowest(); // has data := (lowest != null) @@ -686,9 +617,6 @@ snapshotNextRow = getNext(snapshotIt); } - //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint(); - //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " + - // getLowest() + " threadpoint=" + readpoint); return theNext; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1057786) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -139,6 +139,7 @@ static final String MERGEDIR = "merges"; final AtomicBoolean closed = new AtomicBoolean(false); + /* Closing can take some time; use the closing flag if there is stuff we don't * want to do while in closing state; e.g. 
like offer this region up to the * master as a region to close if the carrying regionserver is overloaded. @@ -146,10 +147,6 @@ */ final AtomicBoolean closing = new AtomicBoolean(false); - ////////////////////////////////////////////////////////////////////////////// - // Members - ////////////////////////////////////////////////////////////////////////////// - private final Set lockedRows = new TreeSet(Bytes.BYTES_COMPARATOR); private final Map lockIds = @@ -164,12 +161,6 @@ private ClassToInstanceMap protocolHandlers = MutableClassToInstanceMap.create(); - //These variable are just used for getting data out of the region, to test on - //client side - // private int numStores = 0; - // private int [] storeSize = null; - // private byte [] name = null; - final AtomicLong memstoreSize = new AtomicLong(0); /** @@ -231,8 +222,7 @@ private final long blockingMemStoreSize; final long threadWakeFrequency; // Used to guard closes - final ReentrantReadWriteLock lock = - new ReentrantReadWriteLock(); + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // Stop updates lock private final ReentrantReadWriteLock updatesLock = @@ -251,7 +241,10 @@ */ public final static String REGIONINFO_FILE = ".regioninfo"; + private final Object closeLock = new Object(); + /** + * Null-arg Constructor. * Should only be used for testing purposes */ public HRegion(){ @@ -466,7 +459,7 @@ } } - public ReadWriteConsistencyControl getRWCC() { + ReadWriteConsistencyControl getRWCC() { return rwcc; } @@ -487,8 +480,6 @@ return close(false); } - private final Object closeLock = new Object(); - /** * Close down this HRegion. Flush the cache unless abort parameter is true, * Shut down each HStore, don't service any more calls. 
@@ -586,10 +577,6 @@ this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5); } - ////////////////////////////////////////////////////////////////////////////// - // HRegion accessors - ////////////////////////////////////////////////////////////////////////////// - /** @return start key for region */ public byte [] getStartKey() { return this.regionInfo.getStartKey(); @@ -670,13 +657,6 @@ return ret; } - ////////////////////////////////////////////////////////////////////////////// - // HRegion maintenance. - // - // These methods are meant to be called periodically by the HRegionServer for - // upkeep. - ////////////////////////////////////////////////////////////////////////////// - /** @return returns size of largest HStore. */ public long getLargestHStoreSize() { long size = 0; @@ -1075,9 +1055,6 @@ return currentSequenceId; } - ////////////////////////////////////////////////////////////////////////////// - // get() methods for client use. - ////////////////////////////////////////////////////////////////////////////// /** * Return all the data for the row that matches row exactly, * or the one that immediately preceeds it, at or immediately before @@ -1311,6 +1288,8 @@ addFamilyMapToWALEdit(familyMap, walEdit); this.log.append(regionInfo, regionInfo.getTableDesc().getName(), walEdit, now); + } else { + this.log.insertSequenceNumber(familyMap); } // Now make changes to the memstore. 
@@ -1787,6 +1766,8 @@ addFamilyMapToWALEdit(familyMap, walEdit); this.log.append(regionInfo, regionInfo.getTableDesc().getName(), walEdit, now); + } else { + this.log.insertSequenceNumber(familyMap); } long addedSize = applyFamilyMapToMemstore(familyMap); @@ -1826,7 +1807,6 @@ Store store = getStore(family); for (KeyValue kv: edits) { - kv.setMemstoreTS(w.getWriteNumber()); size += store.add(kv); } } @@ -2328,7 +2308,10 @@ private int batch; private int isScan; private boolean filterClosed = false; - private long readPt; + private final long readPoint = rwcc.getReadPoint(); + /* + // Don't show kvs newer than this scanner's read point. + if (kv.getSequenceNumber() > this.readPoint) continue;*/ public HRegionInfo getRegionName() { return regionInfo; @@ -2346,8 +2329,6 @@ // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); - List scanners = new ArrayList(); if (additionalScanners != null) { scanners.addAll(additionalScanners); @@ -3251,10 +3232,9 @@ Get get = new Get(row); get.addColumn(family, qualifier); - // we don't want to invoke coprocessor in this case; ICV is wrapped + // We don't want to invoke coprocessor in this case; ICV is wrapped // in HRegionServer, so we leave getLastIncrement alone List results = getLastIncrement(get); - if (!results.isEmpty()) { KeyValue kv = results.get(0); byte [] buffer = kv.getBuffer(); @@ -3262,25 +3242,21 @@ result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG); } - // build the KeyValue now: - KeyValue newKv = new KeyValue(row, family, - qualifier, EnvironmentEdgeManager.currentTimeMillis(), - Bytes.toBytes(result)); + // Build the KeyValue now: + long now = EnvironmentEdgeManager.currentTimeMillis(); + KeyValue kv = new KeyValue(row, family, qualifier, now, Bytes.toBytes(result)); - // now log it: + // Now log it. 
this.log.append (or this.log.setSequence) write the + // sequence number into the KeyValue edit. if (writeToWAL) { - long now = EnvironmentEdgeManager.currentTimeMillis(); WALEdit walEdit = new WALEdit(); - walEdit.add(newKv); + walEdit.add(kv); this.log.append(regionInfo, regionInfo.getTableDesc().getName(), walEdit, now); + } else { + this.log.insertSequenceNumber(kv); } - - // Now request the ICV to the store, this will set the timestamp - // appropriately depending on if there is a value in memcache or not. - // returns the change in the size of the memstore from operation - long size = store.updateColumnValue(row, family, qualifier, result); - + long size = store.updateColumnValue(kv); size = this.memstoreSize.addAndGet(size); flush = isFlushSize(size); } finally { Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1057786) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -1518,12 +1518,8 @@ } /** - * Increments the value for the given row/family/qualifier. + * Updates the value at given row/family/qualifier. * - * This function will always be seen as atomic by other readers - * because it only puts a single KV to memstore. Thus no - * read/write control necessary. 
- * * @param row * @param f * @param qualifier @@ -1531,20 +1527,11 @@ * @return memstore size delta * @throws IOException */ - public long updateColumnValue(byte [] row, byte [] f, - byte [] qualifier, long newValue) - throws IOException { - + public long updateColumnValue(final KeyValue kv) + throws IOException { this.lock.readLock().lock(); try { - long now = EnvironmentEdgeManager.currentTimeMillis(); - - return this.memstore.updateColumnValue(row, - f, - qualifier, - newValue, - now); - + return this.memstore.updateColumnValue(kv); } finally { this.lock.readLock().unlock(); } Index: src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (revision 1057786) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (working copy) @@ -240,6 +240,7 @@ parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, + KeyValue.DEFAULT_SEQUENCE_NUMBER, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i)); put.add(kv); Index: src/main/java/org/apache/hadoop/hbase/KeyValue.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/KeyValue.java (revision 1057786) +++ src/main/java/org/apache/hadoop/hbase/KeyValue.java (working copy) @@ -25,7 +25,6 @@ import java.nio.ByteBuffer; import java.util.Comparator; -import com.google.common.primitives.Longs; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.io.HeapSize; @@ -36,52 +35,61 @@ import org.apache.hadoop.io.Writable; /** - * An HBase Key/Value. This is the fundamental HBase Type. + * An HBase Key/Value. This is the fundamental HBase Type. 
It's persisted into + * StoreFiles/HFiles in the FileSystem and is what a + * {@link org.apache.hadoop.hbase.client.Result} carries from server to HBase client. * - *

If being used client-side, the primary methods to access individual fields + *

If being used client-side, the primary methods to access KeyValue particles * are {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()}, * {@link #getTimestamp()}, and {@link #getValue()}. These methods allocate new * byte arrays and return copies. Avoid their use server-side. * - *

Instances of this class are immutable. They do not implement Comparable - * but Comparators are provided. Comparators change with context, - * whether user table or a catalog table comparison. Its critical you use the - * appropriate comparator. There are Comparators for KeyValue instances and - * then for just the Key portion of a KeyValue used mostly by {@link HFile}. + *

Instances of this class are immutable (EXCEPT for the fact that the server + * will stamp the KeyValue with an edit sequence number on receipt before the + * KV is written to the WAL and added to the memstore). It does not implement + * Comparable but Comparators are provided. Context dictates which Comparator + * to use. If a user table, use one type. If a catalog table comparison, you'd + * use another. It's critical you use the appropriate comparator. There are + * also Comparators for KeyValue instances and then for just the Key portion of + * a KeyValue used mostly by {@link HFile}. * - *

KeyValue wraps a byte array and takes offsets and lengths into passed + *

KeyValue takes a byte array and offsets and lengths into passed * array at where to start interpreting the content as KeyValue. The KeyValue * format inside a byte array is: * <keylength> <valuelength> <key> <value> * Key is further decomposed as: - * <rowlength> <row> <columnfamilylength> <columnfamily> <columnqualifier> <timestamp> <keytype> + * <rowlength> <row> <columnfamilylength> <columnfamily> <columnqualifier> <timestamp> <sequencenumber> <keytype> * The rowlength maximum is Short.MAX_SIZE, * column family length maximum is * Byte.MAX_SIZE, and column qualifier + key length must - * be < Integer.MAX_SIZE. - * The column does not contain the family/qualifier delimiter, {@link #COLUMN_FAMILY_DELIMITER} + * be < Integer.MAX_SIZE. A KeyValue does not contain the + * family/qualifier delimiter, {@link #COLUMN_FAMILY_DELIMITER}. The + * sequencenumber is an internally maintained edit insertion order; + * its used breaking ties where all else in the key portion of two keys are + * the same; we'll favor the KeyValue that has a higher sequencenumber. */ public class KeyValue implements Writable, HeapSize { static final Log LOG = LogFactory.getLog(KeyValue.class); - // TODO: Group Key-only comparators and operations into a Key class, just - // for neatness sake, if can figure what to call it. /** * Colon character in UTF-8 */ public static final char COLUMN_FAMILY_DELIMITER = ':'; - public static final byte[] COLUMN_FAMILY_DELIM_ARRAY = - new byte[]{COLUMN_FAMILY_DELIMITER}; + /** + * {@link #COLUMN_FAMILY_DELIMITER} as a byte array. + */ + public static final byte [] COLUMN_FAMILY_DELIM_ARRAY = + new byte[] {COLUMN_FAMILY_DELIMITER}; /** - * Comparator for plain key/values; i.e. non-catalog table key/values. + * Comparator for plain {@link KeyValue}s.; i.e. non-catalog table KeyValues. */ public static KVComparator COMPARATOR = new KVComparator(); /** - * Comparator for plain key; i.e. non-catalog table key. Works on Key portion - * of KeyValue only. 
+ * Comparator for plain key portion of a {@link KeyValue}s; i.e. non-catalog + * table key. Works on Key portion of KeyValue only. */ public static KeyComparator KEY_COMPARATOR = new KeyComparator(); @@ -111,12 +119,8 @@ /** * Get the appropriate row comparator for the specified table. - * - * Hopefully we can get rid of this, I added this here because it's replacing - * something in HSK. We should move completely off of that. - * * @param tableName The table name. - * @return The comparator. + * @return The comparator to use on this table. */ public static KeyComparator getRowComparator(byte [] tableName) { if(Bytes.equals(HTableDescriptor.ROOT_TABLEDESC.getName(),tableName)) { @@ -128,30 +132,47 @@ return COMPARATOR.getRawComparator(); } - // Size of the timestamp and type byte on end of a key -- a long + a byte. - public static final int TIMESTAMP_TYPE_SIZE = + // Size of the timestamp, insertion seqid long and the type byte on the end + // of the key portion of a KeyValue -- two longs and a byte. + public static final int TIMESTAMP_SEQID_TYPE_SIZE = Bytes.SIZEOF_LONG /* timestamp */ + + Bytes.SIZEOF_LONG /* insertion seqid */ + Bytes.SIZEOF_BYTE /*keytype*/; - // Size of the length shorts and bytes in key. + // Size of the length shorts and bytes in a key portion of a KeyValue. public static final int KEY_INFRASTRUCTURE_SIZE = Bytes.SIZEOF_SHORT /*rowlength*/ + Bytes.SIZEOF_BYTE /*columnfamilylength*/ + - TIMESTAMP_TYPE_SIZE; + TIMESTAMP_SEQID_TYPE_SIZE; // How far into the key the row starts at. First thing to read is the short - // that says how long the row is. + // that says how long the key is, then the data length... then row bytes start. public static final int ROW_OFFSET = Bytes.SIZEOF_INT /*keylength*/ + Bytes.SIZEOF_INT /*valuelength*/; - // Size of the length ints in a KeyValue datastructure. + // Size of the length ints in a KeyValue datastructure; the data and key shorts. 
public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET; /** + * Mask used comparing KeyValue Types without regard to KeyValue version. + * KeyValue version is kept in the upper two bits of the Type byte. + */ + private static final byte VERSION_MASK = (byte)192; + private static final byte VERSION_MASK_INVERSE = ~VERSION_MASK; + + /** + * This KeyValues version in bits that can be OR'd into a Type. + * This define will change as KeyValue version evolves. This is what you + * change changing the KeyValue type. + */ + static final byte VERSION_BITS = (byte)64; + + /** * Key type. - * Has space for other key types to be added later. Cannot rely on - * enum ordinals . They change if item is removed or moved. Do our own codes. + * Has space for other key types to be added later. Does not rely on + * enum ordinals . They change if item is removed or moved. Do our own + * explicit codes. Keys are versioned using two most significant bytes in Type. */ public static enum Type { Minimum((byte)0), @@ -161,9 +182,17 @@ DeleteColumn((byte)12), DeleteFamily((byte)14), - // Maximum is used when searching; you look from maximum on down. - Maximum((byte)255); + // Maximum is used when searching; you look from maximum on down. Shouldn't + // be written out to files or into memstore. + Maximum((byte)63); + // Bit 64 and 128 are reserved used specifying KeyValue version. If top + // two bits zero, then version is 0. If 7th bit is set -- 64 -- then we're + // version 1. Version is ignored when KV is compared. We define Version + // below just to show that the bits are occupied. The version bit twiddling + // and compares are done elsewhere, outside of this enum. See VERSION_MASK + // and VERSION_BITS defines. + private final byte code; Type(final byte c) { @@ -181,12 +210,14 @@ * @return Type associated with passed code. 
*/ public static Type codeToType(final byte b) { + byte bWithVersionStripped = stripVersionFromType(b); for (Type t : Type.values()) { - if (t.getCode() == b) { + // When comparing, do not compare on version. + if (t.getCode() == bWithVersionStripped) { return t; } } - throw new RuntimeException("Unknown code " + b); + throw new RuntimeException("Unknown code " + stripVersionFromType(b)); } } @@ -196,49 +227,31 @@ * key can be equal or lower than this one in memstore or in store file. */ public static final KeyValue LOWESTKEY = - new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP); + new KeyValue(HConstants.EMPTY_BYTE_ARRAY, null, null, + HConstants.LATEST_TIMESTAMP, Long.MAX_VALUE, Type.Maximum, null); private byte [] bytes = null; private int offset = 0; private int length = 0; - // the row cached - private byte [] rowCache = null; + public static final long DEFAULT_SEQUENCE_NUMBER = 0; + // Cache of the sequence number portion of this KV. + private long sequenceNumberCache = -1; - /** Here be dragons **/ + // Cache for timestmp long + private long timestampCache = -1; - // used to achieve atomic operations in the memstore. - public long getMemstoreTS() { - return memstoreTS; - } + // Cached reference to the byte array of row content + private byte [] rowCache = null; - public void setMemstoreTS(long memstoreTS) { - this.memstoreTS = memstoreTS; - } - - // default value is 0, aka DNC - private long memstoreTS = 0; - - /** Dragon time over, return to normal business */ - - /** Writable Constructor -- DO NOT USE */ public KeyValue() {} /** - * Creates a KeyValue from the start of the specified byte array. - * Presumes bytes content is formatted as a KeyValue blob. - * @param bytes byte array - */ - public KeyValue(final byte [] bytes) { - this(bytes, 0); - } - - /** * Creates a KeyValue from the specified byte array and offset. * Presumes bytes content starting at offset is - * formatted as a KeyValue blob. + * formatted as a KeyValue. 
* @param bytes byte array * @param offset offset to start of KeyValue */ @@ -248,7 +261,10 @@ /** * Creates a KeyValue from the specified byte array, starting at offset, and - * for length length. + * for length length. Does NOT make a copy of the passed + * array. + * Presumes bytes content starting at offset is + * formatted as a KeyValue. * @param bytes byte array * @param offset offset to start of the KeyValue * @param length length of the KeyValue @@ -259,29 +275,30 @@ this.length = length; } - /** Constructors that build a new backing byte array from fields */ - /** - * Constructs KeyValue structure filled with null value. - * Sets type to {@link KeyValue.Type#Maximum} - * @param row - row key (arbitrary byte array) + * Constructs KeyValue filled with a null family, qualifier and + * data. Sets type to {@link KeyValue.Type#Maximum}. + * @param row Row key * @param timestamp + * @param sequenceNumber */ public KeyValue(final byte [] row, final long timestamp) { this(row, timestamp, Type.Maximum); } /** - * Constructs KeyValue structure filled with null value. - * @param row - row key (arbitrary byte array) + * Constructs KeyValue filled with a null family, qualifier and + * data. + * @param row Row key + * @param type Key Type to use. * @param timestamp */ public KeyValue(final byte [] row, final long timestamp, Type type) { - this(row, null, null, timestamp, type, null); + this(row, null, null, timestamp, DEFAULT_SEQUENCE_NUMBER, type, null); } /** - * Constructs KeyValue structure filled with null value. + * Constructs KeyValue filled with null data. 
* Sets type to {@link KeyValue.Type#Maximum} * @param row - row key (arbitrary byte array) * @param family family name @@ -289,22 +306,25 @@ */ public KeyValue(final byte [] row, final byte [] family, final byte [] qualifier) { - this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum); + this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, + DEFAULT_SEQUENCE_NUMBER, Type.Maximum); } /** - * Constructs KeyValue structure filled with null value. + * Constructs KeyValue. + * Sets type to {@link KeyValue.Type#Put} * @param row - row key (arbitrary byte array) * @param family family name * @param qualifier column qualifier */ public KeyValue(final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value) { - this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value); + this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, + DEFAULT_SEQUENCE_NUMBER, Type.Put, value); } /** - * Constructs KeyValue structure filled with specified values. + * Constructs KeyValue filled with null data. * @param row row key * @param family family name * @param qualifier column qualifier @@ -314,48 +334,99 @@ */ public KeyValue(final byte[] row, final byte[] family, final byte[] qualifier, final long timestamp, Type type) { - this(row, family, qualifier, timestamp, type, null); + // Used by tests + this(row, family, qualifier, timestamp, DEFAULT_SEQUENCE_NUMBER, type); } /** - * Constructs KeyValue structure filled with specified values. + * Constructs KeyValue filled with null data. 
* @param row row key * @param family family name * @param qualifier column qualifier * @param timestamp version timestamp + * @param sequenceNumber + * @param type key type + * @throws IllegalArgumentException + */ + public KeyValue(final byte[] row, final byte[] family, + final byte[] qualifier, final long timestamp, + final long sequenceNumber, Type type) { + this(row, family, qualifier, timestamp, sequenceNumber, type, null); + } + + /** + * Constructs KeyValue + * @param row row key + * @param family family name + * @param qualifier column qualifier + * @param timestamp version timestamp * @param value column value * @throws IllegalArgumentException */ public KeyValue(final byte[] row, final byte[] family, final byte[] qualifier, final long timestamp, final byte[] value) { - this(row, family, qualifier, timestamp, Type.Put, value); + // Used by tests. + this(row, family, qualifier, timestamp, DEFAULT_SEQUENCE_NUMBER, Type.Put, value); } /** - * Constructs KeyValue structure filled with specified values. + * Constructs KeyValue * @param row row key * @param family family name * @param qualifier column qualifier * @param timestamp version timestamp - * @param type key type + * @param sequenceNumber * @param value column value * @throws IllegalArgumentException */ public KeyValue(final byte[] row, final byte[] family, + final byte[] qualifier, final long timestamp, final long sequenceNumber, + final byte[] value) { + this(row, family, qualifier, timestamp, sequenceNumber, Type.Put, value); + } + + /** + * Constructs KeyValue + * @param row row key + * @param family family name + * @param qualifier column qualifier + * @param timestamp version timestamp + * @param type key type + * @param value column value/data. 
+ * @throws IllegalArgumentException + */ + public KeyValue(final byte[] row, final byte[] family, final byte[] qualifier, final long timestamp, Type type, final byte[] value) { + this(row, family, qualifier, timestamp, DEFAULT_SEQUENCE_NUMBER, type, value); + } + + /** + * Constructs KeyValue + * @param row row key + * @param family family name + * @param qualifier column qualifier + * @param timestamp version timestamp + * @param type key type + * @param value column value/data. + * @throws IllegalArgumentException + */ + public KeyValue(final byte[] row, final byte[] family, + final byte[] qualifier, final long timestamp, final long sequenceNumber, + Type type, final byte[] value) { this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length, - timestamp, type, value, 0, value==null ? 0 : value.length); + timestamp, sequenceNumber, type, value, 0, value==null ? 0 : value.length); } /** - * Constructs KeyValue structure filled with specified values. + * Constructs KeyValue * @param row row key * @param family family name * @param qualifier column qualifier * @param qoffset qualifier offset * @param qlength qualifier length * @param timestamp version timestamp + * @param sequenceNumber * @param type key type * @param value column value * @param voffset value offset @@ -363,16 +434,17 @@ * @throws IllegalArgumentException */ public KeyValue(byte [] row, byte [] family, - byte [] qualifier, int qoffset, int qlength, long timestamp, Type type, + byte [] qualifier, int qoffset, int qlength, long timestamp, + final long sequenceNumber, Type type, byte [] value, int voffset, int vlength) { this(row, 0, row==null ? 0 : row.length, family, 0, family==null ? 0 : family.length, - qualifier, qoffset, qlength, timestamp, type, + qualifier, qoffset, qlength, timestamp, sequenceNumber, type, value, voffset, vlength); } /** - * Constructs KeyValue structure filled with specified values. + * Constructs KeyValue. *

* Column is split into two fields, family and qualifier. * @param row row key @@ -385,6 +457,7 @@ * @param qoffset qualifier offset * @param qlength qualifier length * @param timestamp version timestamp + * @param sequenceNumber * @param type key type * @param value column value * @param voffset value offset @@ -394,11 +467,11 @@ public KeyValue(final byte [] row, final int roffset, final int rlength, final byte [] family, final int foffset, final int flength, final byte [] qualifier, final int qoffset, final int qlength, - final long timestamp, final Type type, + final long timestamp, final long sequenceNumber, final Type type, final byte [] value, final int voffset, final int vlength) { this.bytes = createByteArray(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength, - timestamp, type, value, voffset, vlength); + timestamp, sequenceNumber, type, value, voffset, vlength); this.length = bytes.length; this.offset = 0; } @@ -416,6 +489,7 @@ * @param qoffset qualifier offset * @param qlength qualifier length * @param timestamp version timestamp + * @param sequenceNumber * @param type key type * @param value column value * @param voffset value offset @@ -425,7 +499,7 @@ static byte [] createByteArray(final byte [] row, final int roffset, final int rlength, final byte [] family, final int foffset, int flength, final byte [] qualifier, final int qoffset, int qlength, - final long timestamp, final Type type, + final long timestamp, final long sequenceNumber, final Type type, final byte [] value, final int voffset, int vlength) { if (rlength > Short.MAX_VALUE) { throw new IllegalArgumentException("Row > " + Short.MAX_VALUE); @@ -472,8 +546,12 @@ if(qlength != 0) { pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength); } + // Timestamp pos = Bytes.putLong(bytes, pos, timestamp); - pos = Bytes.putByte(bytes, pos, type.getCode()); + // Sequencenumber + pos = Bytes.putLong(bytes, pos, sequenceNumber); + // Type with KV version added. 
+ pos = Bytes.putByte(bytes, pos, addVersionToType(type.getCode())); if (value != null && value.length > 0) { pos = Bytes.putBytes(bytes, pos, value, voffset, vlength); } @@ -481,45 +559,20 @@ } /** - * Write KeyValue format into a byte array. - *

- * Takes column in the form family:qualifier - * @param row - row key (arbitrary byte array) - * @param roffset - * @param rlength - * @param column - * @param coffset - * @param clength - * @param timestamp + * Decorate passed type with this KVs version. * @param type - * @param value - * @param voffset - * @param vlength - * @return The newly created byte array. + * @return Type with Version added. */ - static byte [] createByteArray(final byte [] row, final int roffset, - final int rlength, - final byte [] column, final int coffset, int clength, - final long timestamp, final Type type, - final byte [] value, final int voffset, int vlength) { - // If column is non-null, figure where the delimiter is at. - int delimiteroffset = 0; - if (column != null && column.length > 0) { - delimiteroffset = getFamilyDelimiterIndex(column, coffset, clength); - if (delimiteroffset > Byte.MAX_VALUE) { - throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE); - } - } else { - return createByteArray(row,roffset,rlength,null,0,0,null,0,0,timestamp, - type,value,voffset,vlength); - } - int flength = delimiteroffset-coffset; - int qlength = clength - flength - 1; - return createByteArray(row, roffset, rlength, column, coffset, - flength, column, delimiteroffset+1, qlength, timestamp, type, - value, voffset, vlength); + private static byte addVersionToType(final byte type) { + // First blank out any existing version before adding this KVs. + byte b = (byte)(((type & VERSION_MASK_INVERSE)) | VERSION_BITS); + return b; } + private static byte stripVersionFromType(final byte b) { + return (byte)(b & VERSION_MASK_INVERSE); + } + // Needed doing 'contains' on List. Only compares the key portion, not the // value. 
public boolean equals(Object other) { @@ -558,12 +611,7 @@ public KeyValue clone() { byte [] b = new byte[this.length]; System.arraycopy(this.bytes, this.offset, b, 0, this.length); - KeyValue ret = new KeyValue(b, 0, b.length); - // Important to clone the memstoreTS as well - otherwise memstore's - // update-in-place methods (eg increment) will end up creating - // new entries - ret.setMemstoreTS(memstoreTS); - return ret; + return new KeyValue(b, 0, b.length); } //--------------------------------------------------------------------------- @@ -601,20 +649,20 @@ String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength); int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength; int familylength = b[columnoffset - 1]; - int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE); + int columnlength = l - ((columnoffset - o) + TIMESTAMP_SEQID_TYPE_SIZE); String family = familylength == 0? "": Bytes.toStringBinary(b, columnoffset, familylength); String qualifier = columnlength == 0? "": Bytes.toStringBinary(b, columnoffset + familylength, columnlength - familylength); - long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE)); + long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_SEQID_TYPE_SIZE)); + long sequenceNumber = + Bytes.toLong(b, o + (l - TIMESTAMP_SEQID_TYPE_SIZE + Bytes.SIZEOF_LONG)); byte type = b[o + l - 1]; -// return row + "/" + family + -// (family != null && family.length() > 0? COLUMN_FAMILY_DELIMITER: "") + -// qualifier + "/" + timestamp + "/" + Type.codeToType(type); return row + "/" + family + (family != null && family.length() > 0? ":" :"") + - qualifier + "/" + timestamp + "/" + Type.codeToType(type); + qualifier + "/" + timestamp + "/" + sequenceNumber + "/" + + Type.codeToType(type); } //--------------------------------------------------------------------------- @@ -644,6 +692,22 @@ return length; } + /** + * @return This instance's version. 
+ */ + public int getVersion() { + return getVersion(getTypeByte(getKeyLength())); + } + + /** + * @param type + * @return Version that is in passed type + */ + static int getVersion(final byte type) { + // Not public. Version is an internal implementation detail. + return (type & VERSION_MASK) >>> 6; + } + //--------------------------------------------------------------------------- // // Length and Offset Calculators @@ -777,7 +841,7 @@ public int getTotalColumnLength() { int rlength = getRowLength(); int foffset = getFamilyOffset(rlength); - return getTotalColumnLength(rlength,foffset); + return getTotalColumnLength(rlength, foffset); } /** @@ -785,7 +849,7 @@ */ public int getTotalColumnLength(int rlength, int foffset) { int flength = getFamilyLength(foffset); - int qlength = getQualifierLength(rlength,flength); + int qlength = getQualifierLength(rlength, flength); return flength + qlength; } @@ -801,7 +865,7 @@ * @return Timestamp offset */ public int getTimestampOffset(final int keylength) { - return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE; + return getKeyOffset() + keylength - TIMESTAMP_SEQID_TYPE_SIZE; } /** @@ -826,6 +890,22 @@ return false; } + /** + * @return Edit Sequence Number offset + */ + public int getSequenceNumberOffset() { + return getSequenceNumberOffset(getKeyLength()); + } + + /** + * @param keylength Pass if you have it to save on a int creation. 
+ * @return Edit Sequence Number offset + */ + public int getSequenceNumberOffset(final int keylength) { + return getKeyOffset() + keylength - TIMESTAMP_SEQID_TYPE_SIZE + + Bytes.SIZEOF_LONG; + } + //--------------------------------------------------------------------------- // // Methods that return copies of fields @@ -880,10 +960,8 @@ } /** - * * @return Timestamp */ - private long timestampCache = -1; public long getTimestamp() { if (timestampCache == -1) { timestampCache = getTimestamp(getKeyLength()); @@ -901,6 +979,38 @@ } /** + * @return Edit sequence number + */ + public long getSequenceNumber() { + if (this.sequenceNumberCache == -1) { + this.sequenceNumberCache = getSequenceNumber(getKeyLength()); + } + return sequenceNumberCache; + } + + /** + * @param keylength Pass if you have it to save on an int creation. + * @return Edit sequence number + */ + long getSequenceNumber(final int keylength) { + int offset = getSequenceNumberOffset(keylength); + return Bytes.toLong(this.bytes, offset); + } + + /** + * Set edit sequence number of this KeyValue. Advanced users only. Set by + * the server on receipt of an edit before the edit is added to the WAL or up + * into the memstore. + * @param sequenceNumber Edit number to use. + */ + public void setSequenceNumber(final long sequenceNumber) { + long offset = getSequenceNumberOffset(); + Bytes.toBytes(this.bytes, (int)offset, sequenceNumber); + // Clear cache. + this.sequenceNumberCache = -1; + } + + /** * @return Type of this KeyValue. */ public byte getType() { @@ -909,9 +1019,14 @@ /** * @param keylength Pass if you have it to save on a int creation. - * @return Type of this KeyValue. + * @return Type of this KeyValue */ byte getType(final int keylength) { + // Strip version from Type before returning. 
+ return stripVersionFromType(getTypeByte(keylength)); + } + + byte getTypeByte(final int keylength) { return this.bytes[this.offset + keylength - 1 + ROW_OFFSET]; } @@ -995,20 +1110,22 @@ public static class SplitKeyValue { private byte [][] split; SplitKeyValue() { - this.split = new byte[6][]; + this.split = new byte[7][]; } public void setRow(byte [] value) { this.split[0] = value; } public void setFamily(byte [] value) { this.split[1] = value; } public void setQualifier(byte [] value) { this.split[2] = value; } public void setTimestamp(byte [] value) { this.split[3] = value; } - public void setType(byte [] value) { this.split[4] = value; } - public void setValue(byte [] value) { this.split[5] = value; } + public void setSequenceNumber(byte [] value) { this.split[4] = value; } + public void setType(byte [] value) { this.split[5] = value; } + public void setValue(byte [] value) { this.split[6] = value; } public byte [] getRow() { return this.split[0]; } public byte [] getFamily() { return this.split[1]; } public byte [] getQualifier() { return this.split[2]; } public byte [] getTimestamp() { return this.split[3]; } - public byte [] getType() { return this.split[4]; } - public byte [] getValue() { return this.split[5]; } + public byte [] getSequenceNumber() { return this.split[4]; } + public byte [] getType() { return this.split[5]; } + public byte [] getValue() { return this.split[6]; } } public SplitKeyValue split() { @@ -1037,14 +1154,22 @@ System.arraycopy(bytes, splitOffset, qualifier, 0, colLen); splitOffset += colLen; split.setQualifier(qualifier); + byte [] timestamp = new byte[Bytes.SIZEOF_LONG]; System.arraycopy(bytes, splitOffset, timestamp, 0, Bytes.SIZEOF_LONG); splitOffset += Bytes.SIZEOF_LONG; split.setTimestamp(timestamp); + + byte [] sequenceNumber = new byte[Bytes.SIZEOF_LONG]; + System.arraycopy(bytes, splitOffset, sequenceNumber, 0, Bytes.SIZEOF_LONG); + splitOffset += Bytes.SIZEOF_LONG; + split.setSequenceNumber(sequenceNumber); + byte [] 
type = new byte[1]; type[0] = bytes[splitOffset]; splitOffset += Bytes.SIZEOF_BYTE; split.setType(type); + byte [] value = new byte[valLen]; System.arraycopy(bytes, splitOffset, value, 0, valLen); split.setValue(value); @@ -1111,18 +1236,6 @@ } /** - * @param column Column minus its delimiter - * @return True if column matches. - */ - public boolean matchingColumnNoDelimiter(final byte [] column) { - int rl = getRowLength(); - int o = getFamilyOffset(rl); - int fl = getFamilyLength(o); - int l = fl + getQualifierLength(rl,fl); - return Bytes.compareTo(column, 0, column.length, this.bytes, o, l) == 0; - } - - /** * * @param family column family * @param qualifier column qualifier @@ -1378,13 +1491,10 @@ } public int compare(final KeyValue left, final KeyValue right) { - int ret = getRawComparator().compare(left.getBuffer(), - left.getOffset() + ROW_OFFSET, left.getKeyLength(), - right.getBuffer(), right.getOffset() + ROW_OFFSET, - right.getKeyLength()); - if (ret != 0) return ret; - // Negate this comparison so later edits show up first - return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS()); + return getRawComparator().compare(left.getBuffer(), + left.getOffset() + ROW_OFFSET, left.getKeyLength(), + right.getBuffer(), right.getOffset() + ROW_OFFSET, + right.getKeyLength()); } public int compareTimestamps(final KeyValue left, final KeyValue right) { @@ -1575,17 +1685,6 @@ } /** - * Creates a KeyValue that is last on the specified row id. That is, - * every other possible KeyValue for the given row would compareTo() - * less than the result of this call. - * @param row row key - * @return Last possible KeyValue on passed row - */ - public static KeyValue createLastOnRow(final byte[] row) { - return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum); - } - - /** * Create a KeyValue that is smaller than all other possible KeyValues * for the given row. That is any (valid) KeyValue on 'row' would sort * _after_ the result. 
@@ -1604,26 +1703,11 @@ * @param ts - timestamp * @return First possible key on passed row and timestamp. */ - public static KeyValue createFirstOnRow(final byte [] row, - final long ts) { - return new KeyValue(row, null, null, ts, Type.Maximum); + public static KeyValue createFirstOnRow(final byte [] row, final long ts) { + return new KeyValue(row, ts); } /** - * @param row - row key (arbitrary byte array) - * @param c column - {@link #parseColumn(byte[])} is called to split - * the column. - * @param ts - timestamp - * @return First possible key on passed row, column and timestamp - * @deprecated - */ - public static KeyValue createFirstOnRow(final byte [] row, final byte [] c, - final long ts) { - byte [][] split = parseColumn(c); - return new KeyValue(row, split[0], split[1], ts, Type.Maximum); - } - - /** * Create a KeyValue for the specified row, family and qualifier that would be * smaller than all other possible KeyValues that have the same row,family,qualifier. * Used for seeking. @@ -1634,22 +1718,10 @@ */ public static KeyValue createFirstOnRow(final byte [] row, final byte [] family, final byte [] qualifier) { - return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum); + return new KeyValue(row, family, qualifier); } /** - * @param row - row key (arbitrary byte array) - * @param f - family name - * @param q - column qualifier - * @param ts - timestamp - * @return First possible key on passed row, column and timestamp - */ - public static KeyValue createFirstOnRow(final byte [] row, final byte [] f, - final byte [] q, final long ts) { - return new KeyValue(row, f, q, ts, Type.Maximum); - } - - /** * Create a KeyValue for the specified row, family and qualifier that would be * smaller than all other possible KeyValues that have the same row, * family, qualifier. 
@@ -1670,11 +1742,24 @@ final int foffset, final int flength, final byte [] qualifier, final int qoffset, final int qlength) { return new KeyValue(row, roffset, rlength, family, - foffset, flength, qualifier, qoffset, qlength, - HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0); + foffset, flength, qualifier, qoffset, qlength, + HConstants.LATEST_TIMESTAMP, DEFAULT_SEQUENCE_NUMBER, Type.Maximum, null, + 0, 0); } + /** + * Creates a KeyValue that is last on the specified row id. That is, + * every other possible KeyValue for the given row would compareTo() + * less than the result of this call. + * @param row row key + * @return Last possible KeyValue on passed row + */ + public static KeyValue createLastOnRow(final byte[] row) { + return createLastOnRow(row, 0, row.length, null, 0, 0, null, 0, 0); + } + + /** * Create a KeyValue for the specified row, family and qualifier that would be * larger than or equal to all other possible KeyValues that have the same * row, family, qualifier. @@ -1696,7 +1781,7 @@ final int qoffset, final int qlength) { return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength, - HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0); + HConstants.OLDEST_TIMESTAMP, Long.MIN_VALUE, Type.Minimum, null, 0, 0); } /** @@ -1730,7 +1815,7 @@ System.arraycopy(b, o, newb, ROW_OFFSET, l); Bytes.putInt(newb, 0, b.length); Bytes.putInt(newb, Bytes.SIZEOF_INT, 0); - return new KeyValue(newb); + return new KeyValue(newb, 0); } /** @@ -1855,6 +1940,7 @@ */ public static class KeyComparator implements RawComparator { volatile boolean ignoreTimestamp = false; + volatile boolean ignoreSequenceNumber = false; volatile boolean ignoreType = false; public int compare(byte[] left, int loffset, int llength, byte[] right, @@ -1872,9 +1958,9 @@ // Compare column family. Start compare past row and family length. 
int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset; int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset; - int lcolumnlength = llength - TIMESTAMP_TYPE_SIZE - + int lcolumnlength = llength - TIMESTAMP_SEQID_TYPE_SIZE - (lcolumnoffset - loffset); - int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE - + int rcolumnlength = rlength - TIMESTAMP_SEQID_TYPE_SIZE - (rcolumnoffset - roffset); // if row matches, and no column in the 'left' AND put type is 'minimum', @@ -1888,10 +1974,12 @@ byte ltype = left[loffset + (llength - 1)]; byte rtype = right[roffset + (rlength - 1)]; - if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) { + if (lcolumnlength == 0 && + stripVersionFromType(ltype) == Type.Minimum.getCode()) { return 1; // left is bigger. } - if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) { + if (rcolumnlength == 0 && + stripVersionFromType(rtype) == Type.Minimum.getCode()) { return -1; } @@ -1905,15 +1993,29 @@ if (!this.ignoreTimestamp) { // Get timestamps. long ltimestamp = Bytes.toLong(left, - loffset + (llength - TIMESTAMP_TYPE_SIZE)); + loffset + (llength - TIMESTAMP_SEQID_TYPE_SIZE)); long rtimestamp = Bytes.toLong(right, - roffset + (rlength - TIMESTAMP_TYPE_SIZE)); + roffset + (rlength - TIMESTAMP_SEQID_TYPE_SIZE)); compare = compareTimestamps(ltimestamp, rtimestamp); if (compare != 0) { return compare; } } + if (!this.ignoreSequenceNumber) { + // Get sequence numbers. + long lseqnum = Bytes.toLong(left, + loffset + (llength - TIMESTAMP_SEQID_TYPE_SIZE + Bytes.SIZEOF_LONG)); + long rseqnum = Bytes.toLong(right, + roffset + (rlength - TIMESTAMP_SEQID_TYPE_SIZE + Bytes.SIZEOF_LONG)); + // Sort the same way as timestamps: the higher sequence number + // sorts before the lower. + compare = compareTimestamps(lseqnum, rseqnum); + if (compare != 0) { + return compare; + } + } + if (!this.ignoreType) { // Compare types. Let the delete types sort ahead of puts; i.e.
types // of higher numbers sort before those of lesser numbers @@ -1958,7 +2060,7 @@ ClassSize.align(ClassSize.ARRAY) + ClassSize.align(length) + (3 * Bytes.SIZEOF_INT) + ClassSize.align(ClassSize.ARRAY) + - (2 * Bytes.SIZEOF_LONG)); + (3 * Bytes.SIZEOF_LONG)); } // this overload assumes that the length bytes have already been read, @@ -1981,4 +2083,4 @@ out.writeInt(this.length); out.write(this.bytes, this.offset, this.length); } -} +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/util/Bytes.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/Bytes.java (revision 1057786) +++ src/main/java/org/apache/hadoop/hbase/util/Bytes.java (working copy) @@ -435,12 +435,18 @@ */ public static byte[] toBytes(long val) { byte [] b = new byte[8]; + return toBytes(b, 0, val); + } + + public static byte [] toBytes(final byte [] buffer, final int offset, + final long originalVal) { + long val = originalVal; for (int i = 7; i > 0; i--) { - b[i] = (byte) val; + buffer[offset + i] = (byte)val; val >>>= 8; } - b[0] = (byte) val; - return b; + buffer[offset + 0] = (byte) val; + return buffer; } /** Index: src/main/java/org/apache/hadoop/hbase/client/Put.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Put.java (revision 1057786) +++ src/main/java/org/apache/hadoop/hbase/client/Put.java (working copy) @@ -182,7 +182,7 @@ */ private KeyValue createPutKeyValue(byte[] family, byte[] qualifier, long ts, byte[] value) { - return new KeyValue(this.row, family, qualifier, ts, KeyValue.Type.Put, + return new KeyValue(this.row, family, qualifier, ts, KeyValue.Type.Put, value); }