Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1493351) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import com.google.common.primitives.Longs; + /** * Implements a heap merge across any number of KeyValueScanners. *
@@ -101,7 +103,8 @@
} else {
KeyValueScanner topScanner = this.heap.peek();
if (topScanner == null ||
- this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
+ this.comparator.compare(kvNext, this.current.getSequenceID(),
+ topScanner.peek(), topScanner.getSequenceID()) >= 0) { // pass each scanner's sequence id so equal keys are tie-broken the same way as inside the heap
this.heap.add(this.current);
this.current = pollRealKV();
}
@@ -167,21 +170,33 @@
this.kvComparator = kvComparator;
}
public int compare(KeyValueScanner left, KeyValueScanner right) {
- int comparison = compare(left.peek(), right.peek());
+ return compare(left.peek(), left.getSequenceID(), right.peek(), right.getSequenceID()); // delegate, using each scanner's sequence id as the final tie-breaker
+ }
+ public int compare(KeyValue leftKV, long leftSeqId, KeyValue rightKV, long rightSeqId) { // order: key bytes, then larger memstoreTS first, then higher sequence id first
+ int comparison = this.kvComparator.getRawComparator().compare(leftKV.getBuffer(),
+ leftKV.getOffset() + KeyValue.ROW_OFFSET, leftKV.getKeyLength(),
+ rightKV.getBuffer(), rightKV.getOffset() + KeyValue.ROW_OFFSET,
+ rightKV.getKeyLength()); // compares only the key portion (starting at ROW_OFFSET, getKeyLength() bytes)
if (comparison != 0) {
return comparison;
+ }
+ if (leftKV.getMemstoreTS() >= 0 && rightKV.getMemstoreTS() >= 0) { // a negative memstoreTS holds a negated replay sequence id, so only compare when both are non-negative
+ int ret = -Longs.compare(leftKV.getMemstoreTS(), rightKV.getMemstoreTS()); // negated so the larger (newer) memstoreTS sorts first
+ if (ret != 0) return ret;
+ }
+ // Keys (and memstoreTS, when comparable) are equal, so break the tie in favor
+ // of the newer edit, i.e. the one with the higher sequence id. A negative
+ // memstoreTS marks a log-replay KeyValue whose sequence id was stored negated there.
+ long leftSequenceID = leftKV.getMemstoreTS() >= 0 ? leftSeqId :
+ -leftKV.getMemstoreTS();
+ long rightSequenceID = rightKV.getMemstoreTS() >= 0 ? rightSeqId :
+ -rightKV.getMemstoreTS();
+ if (leftSequenceID > rightSequenceID) {
+ return -1;
+ } else if (leftSequenceID < rightSequenceID) {
+ return 1;
} else {
- // Since both the keys are exactly the same, we break the tie in favor
- // of the key which came latest.
- long leftSequenceID = left.getSequenceID();
- long rightSequenceID = right.getSequenceID();
- if (leftSequenceID > rightSequenceID) {
- return -1;
- } else if (leftSequenceID < rightSequenceID) {
- return 1;
- } else {
- return 0;
- }
+ return 0;
}
}
/**
@@ -343,7 +358,8 @@
// Compare the current scanner to the next scanner. We try to avoid
// putting the current one back into the heap if possible.
KeyValue nextKV = nextEarliestScanner.peek();
- if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
+ if (nextKV == null || comparator.compare(curKV, kvScanner.getSequenceID(),
+ nextKV, nextEarliestScanner.getSequenceID()) < 0) { // include sequence ids so equal keys are tie-broken consistently with the heap ordering
// We already have the scanner with the earliest KV, so return it.
return kvScanner;
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (revision 1493351)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (working copy)
@@ -1805,10 +1805,12 @@
if (kv.isDelete()) {
del = new Delete(kv.getRow());
del.setClusterId(entry.getKey().getClusterId());
+ del.setAttribute(HRegion.SEQ_ID, Bytes.toBytes(entry.getKey().getLogSeqNum())); // tag with the WAL seq num; NOTE(review): assuming getLogSeqNum() is a long, this is the binary Bytes.toBytes(long) form, so readers must decode with Bytes.toLong, not string parsing
preRow = del;
} else {
put = new Put(kv.getRow());
put.setClusterId(entry.getKey().getClusterId());
+ put.setAttribute(HRegion.SEQ_ID, Bytes.toBytes(entry.getKey().getLogSeqNum())); // same binary encoding as the Delete branch above
preRow = put;
}
preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1493351)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -201,6 +201,11 @@
final AtomicBoolean closing = new AtomicBoolean(false);
protected long completeSequenceId = -1L;
+
+ public static final String SEQ_ID = "_seq.id"; // Mutation attribute key; HLogSplitter sets it to the WAL entry's sequence number
+
+ // Sentinel meaning the mutation carried no usable SEQ_ID attribute
+ private static final long NO_SEQ_ID = -1;
/**
* Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
@@ -2155,7 +2160,19 @@
!= OperationStatusCode.NOT_RUN) {
continue;
}
- addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
+ Mutation mutation = batchOp.operations[i].getFirst();
+ // HLogSplitter writes this attribute with Bytes.toBytes(<log seq num>), i.e. as a
+ // fixed-width binary long, so it must be decoded with Bytes.toLong, not parsed as text.
+ byte[] seqIdBytes = mutation.getAttribute(SEQ_ID);
+ long seqId = NO_SEQ_ID;
+ if (seqIdBytes != null) {
+   if (seqIdBytes.length == Bytes.SIZEOF_LONG) {
+     seqId = Bytes.toLong(seqIdBytes);
+   } else {
+     LOG.warn("encountered bad seq Id: " + Bytes.toStringBinary(seqIdBytes));
+   }
+ }
+ addedSize += applyFamilyMapToMemstore(familyMaps[i], w, seqId);
}
// ------------------------------------
@@ -2595,11 +2610,12 @@
* @param familyMap Map of kvs per family
* @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
* If null, then this method internally creates a mvcc transaction.
+ * @param seqId the sequence num that the mutations were carrying
* @return the additional memory usage of the memstore caused by the
* new entries.
*/
private long applyFamilyMapToMemstore(Map