diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index a589cce..c396f5e 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -1830,6 +1830,11 @@ public class KeyValue implements Cell, HeapSize, Cloneable { right.getBuffer(), right.getOffset() + ROW_OFFSET, right.getKeyLength()); if (ret != 0) return ret; + // negative mvcc values store log sequence numbers of replayed KVs. See HBase-8701 + // when comparing negative mvcc, there is no need to negate the result of comparison + if (left.getMemstoreTS() < 0 && right.getMemstoreTS() < 0) { + return Longs.compare(left.getMemstoreTS(), right.getMemstoreTS()); + } // Negate this comparison so later edits show up first return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS()); } diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index f72c90e..e7c8c39 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -485,6 +485,10 @@ public final class WALProtos { // optional uint32 followingKvCount = 7; boolean hasFollowingKvCount(); int getFollowingKvCount(); + + // optional uint64 replayLogSequenceNumber = 8; + boolean hasReplayLogSequenceNumber(); + long getReplayLogSequenceNumber(); } public static final class WALKey extends com.google.protobuf.GeneratedMessage @@ -599,6 +603,16 @@ public final class WALProtos { return followingKvCount_; } + // optional uint64 replayLogSequenceNumber = 8; + public static final int REPLAYLOGSEQUENCENUMBER_FIELD_NUMBER = 8; + private long replayLogSequenceNumber_; + public boolean hasReplayLogSequenceNumber() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + public long getReplayLogSequenceNumber() { + return replayLogSequenceNumber_; + } + private void initFields() { encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; tableName_ = com.google.protobuf.ByteString.EMPTY; @@ -607,6 +621,7 @@ public final class WALProtos { clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance(); scopes_ = java.util.Collections.emptyList(); followingKvCount_ = 0; + replayLogSequenceNumber_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -669,6 +684,9 @@ public final class WALProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeUInt32(7, followingKvCount_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeUInt64(8, replayLogSequenceNumber_); + } getUnknownFields().writeTo(output); } @@ -706,6 +724,10 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeUInt32Size(7, followingKvCount_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(8, replayLogSequenceNumber_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -761,6 +783,11 @@ public final class WALProtos { result = result && (getFollowingKvCount() == other.getFollowingKvCount()); } + result = result && (hasReplayLogSequenceNumber() == other.hasReplayLogSequenceNumber()); + if (hasReplayLogSequenceNumber()) { + result = result && 
(getReplayLogSequenceNumber() + == other.getReplayLogSequenceNumber()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -798,6 +825,10 @@ public final class WALProtos { hash = (37 * hash) + FOLLOWINGKVCOUNT_FIELD_NUMBER; hash = (53 * hash) + getFollowingKvCount(); } + if (hasReplayLogSequenceNumber()) { + hash = (37 * hash) + REPLAYLOGSEQUENCENUMBER_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getReplayLogSequenceNumber()); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -938,6 +969,8 @@ public final class WALProtos { } followingKvCount_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + replayLogSequenceNumber_ = 0L; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -1013,6 +1046,10 @@ public final class WALProtos { to_bitField0_ |= 0x00000020; } result.followingKvCount_ = followingKvCount_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000040; + } + result.replayLogSequenceNumber_ = replayLogSequenceNumber_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1073,6 +1110,9 @@ public final class WALProtos { if (other.hasFollowingKvCount()) { setFollowingKvCount(other.getFollowingKvCount()); } + if (other.hasReplayLogSequenceNumber()) { + setReplayLogSequenceNumber(other.getReplayLogSequenceNumber()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1172,6 +1212,11 @@ public final class WALProtos { followingKvCount_ = input.readUInt32(); break; } + case 64: { + bitField0_ |= 0x00000080; + replayLogSequenceNumber_ = input.readUInt64(); + break; + } } } } @@ -1565,6 +1610,27 @@ public final class WALProtos { return this; } + // optional uint64 replayLogSequenceNumber = 8; + private long replayLogSequenceNumber_ ; + public boolean hasReplayLogSequenceNumber() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + public long getReplayLogSequenceNumber() { + return replayLogSequenceNumber_; + } + public Builder setReplayLogSequenceNumber(long value) { + bitField0_ |= 0x00000080; + replayLogSequenceNumber_ = value; + onChanged(); + return this; + } + public Builder clearReplayLogSequenceNumber() { + bitField0_ = (bitField0_ & ~0x00000080); + replayLogSequenceNumber_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:WALKey) } @@ -3257,21 +3323,22 @@ public final class WALProtos { static { java.lang.String[] descriptorData = { "\n\tWAL.proto\032\013hbase.proto\"#\n\tWALHeader\022\026\n" + - "\016hasCompression\030\001 \001(\010\"\266\001\n\006WALKey\022\031\n\021enco" + + "\016hasCompression\030\001 \001(\010\"\327\001\n\006WALKey\022\031\n\021enco" + "dedRegionName\030\001 \002(\014\022\021\n\ttableName\030\002 \002(\014\022\031" + "\n\021logSequenceNumber\030\003 \002(\004\022\021\n\twriteTime\030\004" + " \002(\004\022\030\n\tclusterId\030\005 \001(\0132\005.UUID\022\034\n\006scopes" + "\030\006 \003(\0132\014.FamilyScope\022\030\n\020followingKvCount" + - "\030\007 \001(\r\"<\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\035\n" + - "\tscopeType\030\002 \002(\0162\n.ScopeType\"\241\001\n\024Compact" + - "ionDescriptor\022\021\n\ttableName\030\001 \002(\014\022\031\n\021enco" + - "dedRegionName\030\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022", - "\027\n\017compactionInput\030\004 \003(\t\022\030\n\020compactionOu" + - "tput\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\t\"\014\n\nWALT" + - "railer*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE" + - 
"_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?" + - "\n*org.apache.hadoop.hbase.protobuf.gener" + - "atedB\tWALProtosH\001\210\001\000\240\001\001" + "\030\007 \001(\r\022\037\n\027replayLogSequenceNumber\030\010 \001(\004\"" + + "<\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\035\n\tscopeT" + + "ype\030\002 \002(\0162\n.ScopeType\"\241\001\n\024CompactionDesc" + + "riptor\022\021\n\ttableName\030\001 \002(\014\022\031\n\021encodedRegi", + "onName\030\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022\027\n\017comp" + + "actionInput\030\004 \003(\t\022\030\n\020compactionOutput\030\005 " + + "\003(\t\022\024\n\014storeHomeDir\030\006 \002(\t\"\014\n\nWALTrailer*" + + "F\n\tScopeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020" + + "\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.a" + + "pache.hadoop.hbase.protobuf.generatedB\tW" + + "ALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3291,7 +3358,7 @@ public final class WALProtos { internal_static_WALKey_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALKey_descriptor, - new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", }, + new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ReplayLogSequenceNumber", }, org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.class, org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder.class); internal_static_FamilyScope_descriptor = diff --git hbase-protocol/src/main/protobuf/WAL.proto hbase-protocol/src/main/protobuf/WAL.proto index 5635c60..59143a0 100644 --- hbase-protocol/src/main/protobuf/WAL.proto +++ hbase-protocol/src/main/protobuf/WAL.proto @@ -37,6 +37,9 @@ message WALKey { repeated FamilyScope scopes = 6; optional uint32 followingKvCount = 7; + // replayLogSequenceNumber is used in distributedLogReplay to store original log sequence number + // in receiving region server WAL + optional uint64 replayLogSequenceNumber = 8; /* optional CustomEntryType customEntryType = 8; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 618c024..1fdc5cf 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -84,11 +84,14 @@ public class HFileReaderV2 extends AbstractHFileReader { /** Maximum minor version supported by this HFile format */ // We went to version 2 when we moved to pb'ing fileinfo and the trailer on // the file. This version can read Writables version 1. - static final int MAX_MINOR_VERSION = 3; + static final int MAX_MINOR_VERSION = 4; /** Minor versions starting with this number have faked index key */ static final int MINOR_VERSION_WITH_FAKED_KEY = 3; + /** Minor versions starting with this number have memstoreTS and SeqId as a union */ + static final int MINOR_VERSION_WITH_MVCC_SEQ_ID_UNION = 4; + /** * Opens a HFile. You must load the index before you can use it by calling * {@link #loadFileInfo()}. 
@@ -151,6 +154,10 @@ public class HFileReaderV2 extends AbstractHFileReader { fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS); if (includesMemstoreTS) { decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0; + byte[] minMemstoreTS = fileInfo.get(HFileWriterV2.MIN_MEMSTORE_TS_KEY); + if (!decodeMemstoreTS && minMemstoreTS != null) { + decodeMemstoreTS = Bytes.toLong(minMemstoreTS) < 0; + } } // Read data block encoding algorithm name from file info. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index 755409a..d1f0d98 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -54,6 +54,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); + /** whether memstore (mvcc) timestamp field needs decoding in reader */ + public static final byte[] MIN_MEMSTORE_TS_KEY = Bytes.toBytes("MIN_MEMSTORE_TS_KEY"); + /** KeyValue version in FileInfo */ public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); @@ -90,6 +93,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { private final boolean includeMemstoreTS; private long maxMemstoreTS = 0; + private long minMemstoreTS = 0; static class WriterFactoryV2 extends HFile.WriterFactory { WriterFactoryV2(Configuration conf, CacheConfig cacheConf) { @@ -292,6 +296,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { append(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMemstoreTS()); + this.minMemstoreTS = Math.min(this.minMemstoreTS, kv.getMemstoreTS()); } /** @@ -415,6 +420,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { if (this.includeMemstoreTS) { appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); + appendFileInfo(MIN_MEMSTORE_TS_KEY, Bytes.toBytes(minMemstoreTS)); appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7f21e3a..cec4d4c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -201,6 +201,11 @@ public class HRegion implements HeapSize { // , Writable{ final AtomicBoolean closing = new AtomicBoolean(false); protected long completeSequenceId = -1L; + + public static final String SEQ_ID = "_seq.id"; + + // signifies that there was no sequence Id + private static final long NO_SEQ_ID = -1; /** * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for @@ -2165,12 +2170,25 @@ public class HRegion implements HeapSize { // , Writable{ // moved only when the sync is complete. 
// ---------------------------------- long addedSize = 0; + long replaySeqId = NO_SEQ_ID; for (int i = firstIndex; i < lastIndexExclusive; i++) { if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { continue; } - addedSize += applyFamilyMapToMemstore(familyMaps[i], w); + Mutation mutation = batchOp.operations[i].getFirst(); + replaySeqId = NO_SEQ_ID; + if (isInReplay) { + byte[] replaySeqIdBytes = mutation.getAttribute(SEQ_ID); + try { + if (replaySeqIdBytes != null) { + replaySeqId = Bytes.toLong(replaySeqIdBytes); + } + } catch (NumberFormatException nfe) { + LOG.warn("encountered bad seq Id: " + Bytes.toString(replaySeqIdBytes), nfe); + } + } + addedSize += applyFamilyMapToMemstore(familyMaps[i], w, replaySeqId); } // ------------------------------------ @@ -2209,8 +2227,8 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 5. Append the edit to WAL. Do not sync wal. // ------------------------- Mutation first = batchOp.operations[firstIndex].getFirst(); - txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), - walEdit, first.getClusterId(), now, this.htableDescriptor); + txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), walEdit, + first.getClusterId(), now, this.htableDescriptor, (isInReplay) ? replaySeqId : NO_SEQ_ID); // ------------------------------- // STEP 6. Release row locks, etc. @@ -2610,11 +2628,12 @@ public class HRegion implements HeapSize { // , Writable{ * @param familyMap Map of kvs per family * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction. * If null, then this method internally creates a mvcc transaction. + * @param seqId the sequence num that the mutations were carrying * @return the additional memory usage of the memstore caused by the * new entries. */ private long applyFamilyMapToMemstore(Map> familyMap, - MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) { + MultiVersionConsistencyControl.WriteEntry localizedWriteEntry, long seqId) { long size = 0; boolean freemvcc = false; @@ -2631,7 +2650,9 @@ public class HRegion implements HeapSize { // , Writable{ Store store = getStore(family); for (Cell cell: cells) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - kv.setMemstoreTS(localizedWriteEntry.getWriteNumber()); + // during replay, we store negative sequence id values into mvcc field. See HBase-8701 + kv.setMemstoreTS(seqId == NO_SEQ_ID ? localizedWriteEntry.getWriteNumber() : + -seqId); size += store.add(kv); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 3a290be..8db48d0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -28,6 +28,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import com.google.common.primitives.Longs; + /** * Implements a heap merge across any number of KeyValueScanners. *

@@ -101,7 +103,8 @@ public class KeyValueHeap extends NonLazyKeyValueScanner } else { KeyValueScanner topScanner = this.heap.peek(); if (topScanner == null || - this.comparator.compare(kvNext, topScanner.peek()) >= 0) { + this.comparator.compare(kvNext, this.current.getSequenceID(), + topScanner.peek(), topScanner.getSequenceID()) >= 0) { this.heap.add(this.current); this.current = pollRealKV(); } @@ -166,22 +169,36 @@ public class KeyValueHeap extends NonLazyKeyValueScanner public KVScannerComparator(KVComparator kvComparator) { this.kvComparator = kvComparator; } + public int compare(KeyValueScanner left, KeyValueScanner right) { - int comparison = compare(left.peek(), right.peek()); + return compare(left.peek(), left.getSequenceID(), right.peek(), right.getSequenceID()); + } + + public int compare(KeyValue leftKV, long leftSeqId, KeyValue rightKV, long rightSeqId) { + int comparison = this.kvComparator.getRawComparator().compare(leftKV.getBuffer(), + leftKV.getOffset() + KeyValue.ROW_OFFSET, leftKV.getKeyLength(), + rightKV.getBuffer(), rightKV.getOffset() + KeyValue.ROW_OFFSET, + rightKV.getKeyLength()); if (comparison != 0) { return comparison; + } + if (leftKV.getMemstoreTS() >= 0 && rightKV.getMemstoreTS() >= 0) { + int ret = -Longs.compare(leftKV.getMemstoreTS(), rightKV.getMemstoreTS()); + if (ret != 0) return ret; + } + // Since both the keys are exactly the same, we break the tie in favor + // of the key which came latest. + // For KeyValues coming from log replay, their sequence Id was encoded as MemstoreTS + long leftSequenceID = leftKV.getMemstoreTS() >= 0 ? leftSeqId : + -leftKV.getMemstoreTS(); + long rightSequenceID = rightKV.getMemstoreTS() >= 0 ? rightSeqId : + -rightKV.getMemstoreTS(); + if (leftSequenceID > rightSequenceID) { + return -1; + } else if (leftSequenceID < rightSequenceID) { + return 1; } else { - // Since both the keys are exactly the same, we break the tie in favor - // of the key which came latest. - long leftSequenceID = left.getSequenceID(); - long rightSequenceID = right.getSequenceID(); - if (leftSequenceID > rightSequenceID) { - return -1; - } else if (leftSequenceID < rightSequenceID) { - return 1; - } else { - return 0; - } + return 0; } } /** @@ -344,7 +361,8 @@ public class KeyValueHeap extends NonLazyKeyValueScanner // Compare the current scanner to the next scanner. We try to avoid // putting the current one back into the heap if possible. KeyValue nextKV = nextEarliestScanner.peek(); - if (nextKV == null || comparator.compare(curKV, nextKV) < 0) { + if (nextKV == null || comparator.compare(curKV, kvScanner.getSequenceID(), + nextKV, nextEarliestScanner.getSequenceID()) < 0) { // We already have the scanner with the earliest KV, so return it. return kvScanner; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 34fed09..38d01ef 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -199,7 +199,7 @@ public class StoreFileScanner implements KeyValueScanner { // older KV which was not reset to 0 (because it was // not old enough during flush). Make sure that we set it correctly now, // so that the comparision order does not change. 
- if (cur.getMemstoreTS() <= readPoint) { + if (cur.getMemstoreTS() > 0 && cur.getMemstoreTS() <= readPoint) { cur.setMemstoreTS(0); } return true; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index a87e352..fd00cdd 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -126,7 +126,7 @@ s */ // If we know that this KV is going to be included always, then let us // set its memstoreTS to 0. This will help us save space when writing to // disk. - if (kv.getMemstoreTS() <= smallestReadPoint) { + if (kv.getMemstoreTS() > 0 && kv.getMemstoreTS() <= smallestReadPoint) { // let us not change the original KV. It could be in the memstore // changing its memstoreTS could affect other threads/scanners. kv = kv.shallowCopy(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index fb0ac38..535347f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -398,7 +398,12 @@ public class StoreScanner extends NonLazyKeyValueScanner LOOP: while((kv = this.heap.peek()) != null) { ++kvsScanned; // Check that the heap gives us KVs in an increasing order. - assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : + // When MVCC carries negative values(sequence number of KVs. See HBase-8701), KeyValueHeap has + // resolved the ordering with the help of sequence numbers. Therefore the following assert may + // trigger because comparator.compare(prevKV, kv) may return a different order as there is no + // sequence numbers available in the compare function. + assert prevKV == null || comparator == null || prevKV.getMvccVersion() < 0 + || kv.getMvccVersion() < 0 || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store; prevKV = kv; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index ddbcb89..7fd0d1f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -115,6 +115,8 @@ public abstract class Compactor { public long maxSeqId = 0; /** Latest memstore read point found in any of the involved files */ public long maxMVCCReadpoint = 0; + /** Smallest MVCC values found in any of the involved files */ + public long minMVCCReadpoint = 0; } protected FileDetails getFileDetails( @@ -140,6 +142,10 @@ public abstract class Compactor { if (tmp != null) { fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp)); } + tmp = fileInfo.get(HFileWriterV2.MIN_MEMSTORE_TS_KEY); + if (tmp != null) { + fd.minMVCCReadpoint = Math.min(fd.minMVCCReadpoint, Bytes.toLong(tmp)); + } // If required, calculate the earliest put timestamp of all involved storefiles. // This is used to remove family delete marker during compaction. 
long earliestPutTs = 0; @@ -205,7 +211,7 @@ public abstract class Compactor { hasMore = scanner.next(kvs, compactionKVMax); // output to writer: for (KeyValue kv : kvs) { - if (kv.getMemstoreTS() <= smallestReadPoint) { + if (kv.getMemstoreTS() > 0 && kv.getMemstoreTS() <= smallestReadPoint) { kv.setMemstoreTS(0); } writer.append(kv); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index f7eefb0..2419e81 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -70,8 +70,10 @@ public class DefaultCompactor extends Compactor { } // Create the writer even if no kv(Empty store file is also ok), // because we need record the max seq id for the store file, see HBASE-6059 + // When fd.minMVCCReadpoint is negative, it means some files containing negative sequence + // number of kvs. Therefore, we still need to write mvcc values out. see HBASE-8701 writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, - fd.maxMVCCReadpoint >= smallestReadPoint); + (fd.maxMVCCReadpoint >= smallestReadPoint || fd.minMVCCReadpoint < 0)); boolean finished = performCompaction(scanner, writer, smallestReadPoint); if (!finished) { abortWriter(writer); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 31440a7..60ffe05 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -837,7 +837,7 @@ class FSHLog implements HLog, Syncable { @Override public void append(HRegionInfo info, byte [] tableName, WALEdit edits, final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException { - append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore); + append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore, 0); } /** @@ -863,11 +863,15 @@ class FSHLog implements HLog, Syncable { * @param clusterId The originating clusterId for this edit (for replication) * @param now * @param doSync shall we sync? + * @param isInMemstore + * @param replaySequenceNumber original sequence number of a replayed wal edit. 
When passed in as + * non-positive integers, replaySequenceNumber will be ignored * @return txid of this transaction * @throws IOException */ private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, - final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore) + final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, + long replaySequenceNumber) throws IOException { if (edits.isEmpty()) return this.unflushedEntries.get(); if (this.closed) { @@ -886,7 +890,7 @@ class FSHLog implements HLog, Syncable { byte [] encodedRegionName = info.getEncodedNameAsBytes(); if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId); - doWrite(info, logKey, edits, htd); + doWrite(info, logKey, edits, htd, replaySequenceNumber); this.numEntries.incrementAndGet(); txid = this.unflushedEntries.incrementAndGet(); if (htd.isDeferredLogFlush()) { @@ -908,7 +912,13 @@ class FSHLog implements HLog, Syncable { public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd) throws IOException { - return append(info, tableName, edits, clusterId, now, htd, false, true); + return append(info, tableName, edits, clusterId, now, htd, false, true, 0); + } + + @Override + public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits, UUID clusterId, + final long now, HTableDescriptor htd, final long replaySequenceNumber) throws IOException { + return append(info, tableName, edits, clusterId, now, htd, false, true, replaySequenceNumber); } /** @@ -1191,10 +1201,14 @@ class FSHLog implements HLog, Syncable { } } + protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit, HTableDescriptor htd) + throws IOException { + doWrite(info, logKey, logEdit, htd, 0); + } + // TODO: Remove info. Unused. - protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit, - HTableDescriptor htd) - throws IOException { + protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit, HTableDescriptor htd, + long replaySequenceNumber) throws IOException { if (!this.enabled) { return; } @@ -1210,6 +1224,9 @@ class FSHLog implements HLog, Syncable { if (logEdit.isReplay()) { // set replication scope null so that this won't be replicated logKey.setScopes(null); + if(replaySequenceNumber > 0) { + logKey.setReplayLogSequenceNumber(replaySequenceNumber); + } } // write to our buffer for the Hlog file. logSyncer.append(new FSHLog.Entry(logKey, logEdit)); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 97413b3..2fce425 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -299,8 +299,24 @@ public interface HLog { * @return txid of this transaction * @throws IOException */ - public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits, - UUID clusterId, final long now, HTableDescriptor htd) throws IOException; + public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits, UUID clusterId, + final long now, HTableDescriptor htd) throws IOException; + + /** + * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and + * log-sequence-id. 
The HLog is not flushed after this transaction is written to the log. + * @param info + * @param tableName + * @param edits + * @param clusterId The originating clusterId for this edit (for replication) + * @param now + * @param htd + * @param replaySequenceNumber sequence number of edits used in replay + * @return txid of this transaction + * @throws IOException + */ + public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits, UUID clusterId, + final long now, HTableDescriptor htd, final long replaySequenceNumber) throws IOException; public void hsync() throws IOException; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 0028abe..b3afbac 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -99,6 +99,8 @@ public class HLogKey implements WritableComparable { private byte [] encodedRegionName; private byte [] tablename; private long logSeqNum; + // used in distributedLogReplay to store original log sequence number of an edit + private long replayLogSeqNum; // Time at which this edit was written. private long writeTime; @@ -195,6 +197,14 @@ public class HLogKey implements WritableComparable { this.clusterId = clusterId; } + public long getReplaySequenceNumber() { + return this.replayLogSeqNum; + } + + public void setReplayLogSequenceNumber(long seqNum) { + this.replayLogSeqNum = seqNum; + } + @Override public String toString() { return Bytes.toString(tablename) + "/" + Bytes.toString(encodedRegionName) + "/" + @@ -383,6 +393,10 @@ public class HLogKey implements WritableComparable { .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue()))); } } + if (this.replayLogSeqNum > 0) { + // set replayLogSeqNum when there is one + builder.setReplayLogSequenceNumber(this.replayLogSeqNum); + } return builder; } @@ -413,5 +427,8 @@ public class HLogKey implements WritableComparable { } this.logSeqNum = walKey.getLogSequenceNumber(); this.writeTime = walKey.getWriteTime(); + if (walKey.hasReplayLogSequenceNumber()) { + this.replayLogSeqNum = walKey.getReplayLogSequenceNumber(); + } } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index bbac87f..3bbde3a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -1719,9 +1719,10 @@ public class HLogSplitter { Set nonExistentTables = null; Long cachedLastFlushedSequenceId = -1l; for (HLog.Entry entry : entries) { + HLogKey logKey = entry.getKey(); WALEdit edit = entry.getEdit(); - byte[] table = entry.getKey().getTablename(); - String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName()); + byte[] table = logKey.getTablename(); + String encodeRegionNameStr = Bytes.toString(logKey.getEncodedRegionName()); // skip edits of non-existent tables if (nonExistentTables != null && nonExistentTables.contains(table)) { this.skippedEdits.incrementAndGet(); @@ -1740,7 +1741,9 @@ public class HLogSplitter { String preKey = null; List kvs = edit.getKeyValues(); HConnection hconn = this.getConnectionByTableName(table); - + // use replaySequenceNumber when there is one + long sequenceNum = 
(logKey.getReplaySequenceNumber() > 0) ? logKey + .getReplaySequenceNumber() : logKey.getLogSeqNum(); for (KeyValue kv : kvs) { // filtering HLog meta entries // We don't handle HBASE-2231 because we may or may not replay a compaction event. @@ -1805,10 +1808,12 @@ public class HLogSplitter { if (kv.isDelete()) { del = new Delete(kv.getRow()); del.setClusterId(entry.getKey().getClusterId()); + del.setAttribute(HRegion.SEQ_ID, Bytes.toBytes(sequenceNum)); preRow = del; } else { put = new Put(kv.getRow()); put.setClusterId(entry.getKey().getClusterId()); + put.setAttribute(HRegion.SEQ_ID, Bytes.toBytes(sequenceNum)); preRow = put; } preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 53bb9d1..6ed2e7c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired; import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done; import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err; import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -65,12 +66,15 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -693,8 +697,194 @@ public class TestDistributedLogSplitting { } @Test(timeout = 300000) + public void testSameVersionUpdatesRecovery() throws Exception { + LOG.info("testSameVersionUpdatesRecovery"); + Configuration curConf = HBaseConfiguration.create(); + curConf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024); + curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS, curConf); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean isCarryingMeta = false; + hrs = 
rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs); + for (HRegionInfo region : regions) { + if (region.isMetaRegion()) { + isCarryingMeta = true; + break; + } + } + if (isCarryingMeta) { + continue; + } + break; + } + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaTable() + || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { + it.remove(); + } + } + if (regions.size() == 0) return; + HRegionInfo curRegionInfo = regions.get(0); + byte[] startRow = curRegionInfo.getStartKey(); + if (startRow == null || startRow.length == 0) { + startRow = new byte[] { 0, 0, 0, 0, 1 }; + } + byte[] row = Bytes.incrementBytes(startRow, 1); + // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key + row = Arrays.copyOfRange(row, 3, 8); + long value = 0; + byte[] tableName = Bytes.toBytes("table"); + byte[] family = Bytes.toBytes("family"); + byte[] qualifier = Bytes.toBytes("c1"); + long timeStamp = System.currentTimeMillis(); + HTableDescriptor htd = new HTableDescriptor(tableName); + for (int i = 0; i < NUM_LOG_LINES; i += 1) { + WALEdit e = new WALEdit(); + value++; + e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); + hrs.getWAL().append(curRegionInfo, tableName, e, System.currentTimeMillis(), htd); + } + hrs.getWAL().sync(); + hrs.getWAL().close(); + + // wait for abort completes + this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); + + // verify we got the last value + LOG.info("Verification Starts..."); + Get g = new Get(row); + Result r = ht.get(g); + long theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); + assertEquals(value, theStoredVal); + + // after flush + LOG.info("Verification after flush..."); + TEST_UTIL.getHBaseAdmin().flush(tableName); + r = ht.get(g); + theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); + assertEquals(value, theStoredVal); + ht.close(); + } + + @Test(timeout = 300000) + public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception { + LOG.info("testSameVersionUpdatesRecoveryWithWrites"); + Configuration curConf = HBaseConfiguration.create(); + curConf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024); + curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + curConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024); + curConf.setInt("hbase.hstore.compactionThreshold", 3); + startCluster(NUM_RS, curConf); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean isCarryingMeta = false; + hrs = rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs); + for (HRegionInfo region : regions) { + if (region.isMetaRegion()) { + isCarryingMeta = true; + break; + } + } + if (isCarryingMeta) { + continue; + } + break; + } + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaTable() + || 
region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { + it.remove(); + } + } + if (regions.size() == 0) return; + HRegionInfo curRegionInfo = regions.get(0); + byte[] startRow = curRegionInfo.getStartKey(); + if (startRow == null || startRow.length == 0) { + startRow = new byte[] { 0, 0, 0, 0, 1 }; + } + byte[] row = Bytes.incrementBytes(startRow, 1); + // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key + row = Arrays.copyOfRange(row, 3, 8); + long value = 0; + final byte[] tableName = Bytes.toBytes("table"); + byte[] family = Bytes.toBytes("family"); + byte[] qualifier = Bytes.toBytes("c1"); + long timeStamp = System.currentTimeMillis(); + HTableDescriptor htd = new HTableDescriptor(tableName); + for (int i = 0; i < NUM_LOG_LINES; i += 1) { + WALEdit e = new WALEdit(); + value++; + e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); + hrs.getWAL().append(curRegionInfo, tableName, e, System.currentTimeMillis(), htd); + } + hrs.getWAL().sync(); + hrs.getWAL().close(); + + // wait for abort completes + this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); + + // verify we got the last value + LOG.info("Verification Starts..."); + Get g = new Get(row); + Result r = ht.get(g); + long theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); + assertEquals(value, theStoredVal); + + // after flush & compaction + LOG.info("Verification after flush..."); + TEST_UTIL.getHBaseAdmin().flush(tableName); + TEST_UTIL.getHBaseAdmin().compact(tableName); + + // wait for compaction completes + TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE); + } + }); + + r = ht.get(g); + theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); + assertEquals(value, theStoredVal); + ht.close(); + } + + @Test(timeout = 300000) public void testLogReplayForDisablingTable() throws Exception { - LOG.info("testLogReplayWithNonMetaRSDown"); + LOG.info("testLogReplayForDisablingTable"); Configuration curConf = HBaseConfiguration.create(); curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); startCluster(NUM_RS, curConf); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index 579d249..a896332 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -186,7 +186,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool LOG.info("Rolling after " + appends + " edits"); rollWriter(); } - super.doWrite(info, logKey, logEdit, htd); + super.doWrite(info, logKey, logEdit, htd, 0); }; }; hlog.rollWriter();
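
Note (not part of the patch above): the central mechanism of this change is that, during distributed log replay, HRegion stores the replayed edit's original log sequence number negated in the KeyValue mvcc/memstoreTS field, and the comparators then order such KVs by that sequence number rather than by the MVCC write number. The following minimal Java sketch restates just that ordering rule under hypothetical class and method names; it is an illustration, not the patch's actual KeyValue.KVComparator or KeyValueHeap.KVScannerComparator code, and it assumes Guava is on the classpath (the patch itself adds the same Longs import).

import com.google.common.primitives.Longs;

/**
 * Illustrative sketch of the mvcc/memstoreTS ordering rules introduced by the
 * patch above. Names here are hypothetical; the real logic lives in
 * KeyValue.KVComparator, KeyValueHeap.KVScannerComparator and HRegion.
 */
public final class ReplayMvccOrdering {

  /** Sentinel meaning "no replay sequence id attached", as in HRegion.NO_SEQ_ID. */
  static final long NO_SEQ_ID = -1;

  /**
   * Mirrors the HRegion change: during distributed log replay the original log
   * sequence number is stored negated in the mvcc/memstoreTS field (HBASE-8701);
   * otherwise the normal MVCC write number is used.
   */
  static long memstoreTsFor(long mvccWriteNumber, long replaySeqId) {
    return replaySeqId == NO_SEQ_ID ? mvccWriteNumber : -replaySeqId;
  }

  /**
   * Mirrors the KeyValue comparator change for KVs whose keys already compare
   * equal: later edits normally sort first (negated comparison), but when both
   * values are negative they carry replayed log sequence numbers, so the
   * comparison is not negated.
   */
  static int compareMemstoreTs(long left, long right) {
    if (left < 0 && right < 0) {
      return Longs.compare(left, right);
    }
    return -Longs.compare(left, right);
  }

  public static void main(String[] args) {
    // Two replayed edits carrying original log sequence numbers 5 and 9:
    long a = memstoreTsFor(0, 5);   // stored as -5
    long b = memstoreTsFor(0, 9);   // stored as -9
    // -9 < -5, so the edit with the higher original sequence number (9) sorts first.
    System.out.println(compareMemstoreTs(b, a) < 0); // true

    // Two normal edits with MVCC write numbers 3 and 7: the later edit (7) sorts first.
    System.out.println(compareMemstoreTs(7, 3) < 0); // true
  }
}

Running main prints true twice: among replayed KVs, the one carrying the higher original sequence number sorts first (the non-negated compare, because the stored values are already negated), while normal KVs keep the existing "later edits show up first" order.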