Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1493351)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy)
@@ -28,6 +28,8 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 
+import com.google.common.primitives.Longs;
+
 /**
  * Implements a heap merge across any number of KeyValueScanners.
  *
@@ -101,7 +103,8 @@
       } else {
         KeyValueScanner topScanner = this.heap.peek();
         if (topScanner == null ||
-            this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
+            this.comparator.compare(kvNext, this.current.getSequenceID(),
+                topScanner.peek(), topScanner.getSequenceID()) >= 0) {
           this.heap.add(this.current);
           this.current = pollRealKV();
         }
@@ -167,21 +170,33 @@
       this.kvComparator = kvComparator;
     }
     public int compare(KeyValueScanner left, KeyValueScanner right) {
-      int comparison = compare(left.peek(), right.peek());
+      return compare(left.peek(), left.getSequenceID(), right.peek(), right.getSequenceID());
+    }
+    public int compare(KeyValue leftKV, long leftSeqId, KeyValue rightKV, long rightSeqId) {
+      int comparison = this.kvComparator.getRawComparator().compare(leftKV.getBuffer(),
+          leftKV.getOffset() + KeyValue.ROW_OFFSET, leftKV.getKeyLength(),
+          rightKV.getBuffer(), rightKV.getOffset() + KeyValue.ROW_OFFSET,
+          rightKV.getKeyLength());
       if (comparison != 0) {
         return comparison;
+      }
+      if (leftKV.getMemstoreTS() >= 0 && rightKV.getMemstoreTS() >= 0) {
+        int ret = -Longs.compare(leftKV.getMemstoreTS(), rightKV.getMemstoreTS());
+        if (ret != 0) return ret;
+      }
+      // Since both the keys are exactly the same, we break the tie in favor
+      // of the key which came latest.
+      // For KeyValues coming from log replay, their sequence Id was encoded as MemstoreTS
+      long leftSequenceID = leftKV.getMemstoreTS() >= 0 ? leftSeqId :
+        -leftKV.getMemstoreTS();
+      long rightSequenceID = rightKV.getMemstoreTS() >= 0 ? rightSeqId :
+        -rightKV.getMemstoreTS();
+      if (leftSequenceID > rightSequenceID) {
+        return -1;
+      } else if (leftSequenceID < rightSequenceID) {
+        return 1;
       } else {
-        // Since both the keys are exactly the same, we break the tie in favor
-        // of the key which came latest.
-        long leftSequenceID = left.getSequenceID();
-        long rightSequenceID = right.getSequenceID();
-        if (leftSequenceID > rightSequenceID) {
-          return -1;
-        } else if (leftSequenceID < rightSequenceID) {
-          return 1;
-        } else {
-          return 0;
-        }
+        return 0;
       }
     }
     /**
@@ -343,7 +358,8 @@
       // Compare the current scanner to the next scanner. We try to avoid
       // putting the current one back into the heap if possible.
       KeyValue nextKV = nextEarliestScanner.peek();
-      if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
+      if (nextKV == null || comparator.compare(curKV, kvScanner.getSequenceID(),
+          nextKV, nextEarliestScanner.getSequenceID()) < 0) {
         // We already have the scanner with the earliest KV, so return it.
         return kvScanner;
       }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (revision 1493351)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (working copy)
@@ -1805,10 +1805,12 @@
           if (kv.isDelete()) {
             del = new Delete(kv.getRow());
             del.setClusterId(entry.getKey().getClusterId());
+            del.setAttribute(HRegion.SEQ_ID, Bytes.toBytes(entry.getKey().getLogSeqNum()));
             preRow = del;
           } else {
             put = new Put(kv.getRow());
             put.setClusterId(entry.getKey().getClusterId());
+            put.setAttribute(HRegion.SEQ_ID, Bytes.toBytes(entry.getKey().getLogSeqNum()));
             preRow = put;
           }
           preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1493351)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -201,6 +201,11 @@
   final AtomicBoolean closing = new AtomicBoolean(false);
 
   protected long completeSequenceId = -1L;
+
+  public static final String SEQ_ID = "_seq.id";
+
+  // signifies that there was no sequence Id
+  private static final long NO_SEQ_ID = -1;
 
   /**
    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
@@ -2155,7 +2160,18 @@
             != OperationStatusCode.NOT_RUN) {
           continue;
         }
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
+        Mutation mutation = batchOp.operations[i].getFirst();
+        byte[] seqIdBytes = mutation.getAttribute(SEQ_ID);
+        long seqId = NO_SEQ_ID;
+        // HLogSplitter stores the id with Bytes.toBytes(long), so decode it as a binary long.
+        try {
+          if (seqIdBytes != null) {
+            seqId = Bytes.toLong(seqIdBytes);
+          }
+        } catch (IllegalArgumentException iae) {
+          LOG.warn("encountered bad seq Id: " + Bytes.toStringBinary(seqIdBytes), iae);
+        }
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], w, seqId);
       }
 
       // ------------------------------------
@@ -2595,11 +2611,12 @@
    * @param familyMap Map of kvs per family
    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
    *          If null, then this method internally creates a mvcc transaction.
+   * @param seqId the sequence num that the mutations were carrying
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    */
   private long applyFamilyMapToMemstore(Map> familyMap,
-    MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
+    MultiVersionConsistencyControl.WriteEntry localizedWriteEntry, long seqId) {
     long size = 0;
     boolean freemvcc = false;
 
@@ -2616,7 +2633,8 @@
         Store store = getStore(family);
         for (Cell cell: cells) {
           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-          kv.setMemstoreTS(localizedWriteEntry.getWriteNumber());
+          kv.setMemstoreTS(seqId == NO_SEQ_ID ? localizedWriteEntry.getWriteNumber() :
+            -seqId);
           size += store.add(kv);
         }
       }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (revision 1493351)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (working copy)
@@ -205,7 +205,7 @@
         hasMore = scanner.next(kvs, compactionKVMax);
         // output to writer:
         for (KeyValue kv : kvs) {
-          if (kv.getMemstoreTS() <= smallestReadPoint) {
+          if (kv.getMemstoreTS() > 0 && kv.getMemstoreTS() <= smallestReadPoint) {
             kv.setMemstoreTS(0);
           }
           writer.append(kv);
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (revision 1493351)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (working copy)
@@ -1830,6 +1830,10 @@
           right.getBuffer(), right.getOffset() + ROW_OFFSET,
           right.getKeyLength());
       if (ret != 0) return ret;
+      // If both KeyValues carry seq Id, there is no need to negate the result of comparison
+      if (left.getMemstoreTS() < 0 && right.getMemstoreTS() < 0) {
+        return Longs.compare(left.getMemstoreTS(), right.getMemstoreTS());
+      }
       // Negate this comparison so later edits show up first
       return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS());
     }