diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index a589cce..be4bda1 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -1830,6 +1830,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
         right.getBuffer(), right.getOffset() + ROW_OFFSET, right.getKeyLength());
       if (ret != 0) return ret;
+      // If both KeyValues carry a sequence id, there is no need to negate the comparison result
+      if (left.getMemstoreTS() < 0 && right.getMemstoreTS() < 0) {
+        return Longs.compare(left.getMemstoreTS(), right.getMemstoreTS());
+      }
       // Negate this comparison so later edits show up first
       return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS());
     }
diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index f72c90e..df0182f 100644
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -485,6 +485,10 @@ public final class WALProtos {
     // optional uint32 followingKvCount = 7;
     boolean hasFollowingKvCount();
     int getFollowingKvCount();
+
+    // optional uint64 origLogSequenceNumber = 8;
+    boolean hasOrigLogSequenceNumber();
+    long getOrigLogSequenceNumber();
   }
   public static final class WALKey extends
       com.google.protobuf.GeneratedMessage
@@ -599,6 +603,16 @@ public final class WALProtos {
       return followingKvCount_;
     }

+    // optional uint64 origLogSequenceNumber = 8;
+    public static final int ORIGLOGSEQUENCENUMBER_FIELD_NUMBER = 8;
+    private long origLogSequenceNumber_;
+    public boolean hasOrigLogSequenceNumber() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    public long getOrigLogSequenceNumber() {
+      return origLogSequenceNumber_;
+    }
+
     private void initFields() {
       encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
       tableName_ = com.google.protobuf.ByteString.EMPTY;
@@ -607,6 +621,7 @@ public final class WALProtos {
       clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance();
       scopes_ = java.util.Collections.emptyList();
       followingKvCount_ = 0;
+      origLogSequenceNumber_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -669,6 +684,9 @@ public final class WALProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeUInt32(7, followingKvCount_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeUInt64(8, origLogSequenceNumber_);
+      }
       getUnknownFields().writeTo(output);
     }

@@ -706,6 +724,10 @@ public final class WALProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeUInt32Size(7, followingKvCount_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(8, origLogSequenceNumber_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -761,6 +783,11 @@ public final class WALProtos {
         result = result && (getFollowingKvCount()
             == other.getFollowingKvCount());
       }
+      result = result && (hasOrigLogSequenceNumber() == other.hasOrigLogSequenceNumber());
+      if (hasOrigLogSequenceNumber()) {
+        result = result && (getOrigLogSequenceNumber()
+            == other.getOrigLogSequenceNumber());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -798,6 +825,10 @@ public final class WALProtos {
         hash = (37 * hash) + FOLLOWINGKVCOUNT_FIELD_NUMBER;
         hash = (53 * hash) + getFollowingKvCount();
       }
+      if (hasOrigLogSequenceNumber()) {
+        hash = (37 * hash) + ORIGLOGSEQUENCENUMBER_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getOrigLogSequenceNumber());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -938,6 +969,8 @@ public final class WALProtos {
         }
         followingKvCount_ = 0;
         bitField0_ = (bitField0_ & ~0x00000040);
+        origLogSequenceNumber_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }

@@ -1013,6 +1046,10 @@ public final class WALProtos {
           to_bitField0_ |= 0x00000020;
         }
         result.followingKvCount_ = followingKvCount_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.origLogSequenceNumber_ = origLogSequenceNumber_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1073,6 +1110,9 @@ public final class WALProtos {
         if (other.hasFollowingKvCount()) {
           setFollowingKvCount(other.getFollowingKvCount());
         }
+        if (other.hasOrigLogSequenceNumber()) {
+          setOrigLogSequenceNumber(other.getOrigLogSequenceNumber());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1172,6 +1212,11 @@ public final class WALProtos {
               followingKvCount_ = input.readUInt32();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000080;
+              origLogSequenceNumber_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -1565,6 +1610,27 @@ public final class WALProtos {
         return this;
       }

+      // optional uint64 origLogSequenceNumber = 8;
+      private long origLogSequenceNumber_ ;
+      public boolean hasOrigLogSequenceNumber() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      public long getOrigLogSequenceNumber() {
+        return origLogSequenceNumber_;
+      }
+      public Builder setOrigLogSequenceNumber(long value) {
+        bitField0_ |= 0x00000080;
+        origLogSequenceNumber_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearOrigLogSequenceNumber() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        origLogSequenceNumber_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:WALKey)
     }

@@ -3257,21 +3323,22 @@ public final class WALProtos {
   static {
     java.lang.String[] descriptorData = {
       "\n\tWAL.proto\032\013hbase.proto\"#\n\tWALHeader\022\026\n" +
-      "\016hasCompression\030\001 \001(\010\"\266\001\n\006WALKey\022\031\n\021enco" +
+      "\016hasCompression\030\001 \001(\010\"\325\001\n\006WALKey\022\031\n\021enco" +
       "dedRegionName\030\001 \002(\014\022\021\n\ttableName\030\002 \002(\014\022\031" +
       "\n\021logSequenceNumber\030\003 \002(\004\022\021\n\twriteTime\030\004" +
       " \002(\004\022\030\n\tclusterId\030\005 \001(\0132\005.UUID\022\034\n\006scopes" +
       "\030\006 \003(\0132\014.FamilyScope\022\030\n\020followingKvCount" +
-      "\030\007 \001(\r\"<\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\035\n" +
-      "\tscopeType\030\002 \002(\0162\n.ScopeType\"\241\001\n\024Compact" +
-      "ionDescriptor\022\021\n\ttableName\030\001 \002(\014\022\031\n\021enco" +
-      "dedRegionName\030\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022",
-      "\027\n\017compactionInput\030\004 \003(\t\022\030\n\020compactionOu" +
-      "tput\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\t\"\014\n\nWALT" +
-      "railer*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE" +
-      "_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?" +
-      "\n*org.apache.hadoop.hbase.protobuf.gener" +
-      "atedB\tWALProtosH\001\210\001\000\240\001\001"
+      "\030\007 \001(\r\022\035\n\025origLogSequenceNumber\030\010 \001(\004\"<\n" +
+      "\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\035\n\tscopeTyp" +
+      "e\030\002 \002(\0162\n.ScopeType\"\241\001\n\024CompactionDescri" +
+      "ptor\022\021\n\ttableName\030\001 \002(\014\022\031\n\021encodedRegion",
+      "Name\030\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022\027\n\017compac" +
+      "tionInput\030\004 \003(\t\022\030\n\020compactionOutput\030\005 \003(" +
+      "\t\022\024\n\014storeHomeDir\030\006 \002(\t\"\014\n\nWALTrailer*F\n" +
+      "\tScopeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022" +
+      "\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apa" +
+      "che.hadoop.hbase.protobuf.generatedB\tWAL" +
+      "ProtosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3291,7 +3358,7 @@ public final class WALProtos {
           internal_static_WALKey_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_WALKey_descriptor,
-              new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", },
+              new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "OrigLogSequenceNumber", },
               org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.class,
               org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder.class);
           internal_static_FamilyScope_descriptor =
diff --git hbase-protocol/src/main/protobuf/WAL.proto hbase-protocol/src/main/protobuf/WAL.proto
index 5635c60..70a6074 100644
--- hbase-protocol/src/main/protobuf/WAL.proto
+++ hbase-protocol/src/main/protobuf/WAL.proto
@@ -37,6 +37,9 @@ message WALKey {
   repeated FamilyScope scopes = 6;
   optional uint32 followingKvCount = 7;

+  // origLogSequenceNumber is used in distributedLogReplay to carry the original log sequence
+  // number of an edit over to the receiving region server
+  optional uint64 origLogSequenceNumber = 8;
/*
  optional CustomEntryType customEntryType = 8;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index bac6f14..e81fcda 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -192,6 +192,15 @@ public class HRegion implements HeapSize { // , Writable{
   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
     "hbase.hregion.scan.loadColumnFamiliesOnDemand";

+  // In distributedLogReplay mode, we haven't read all the WALs yet when a region opens, so we
+  // don't know the exact last sequence number used by the previously failed RS. Hence we
+  // introduce SEQNUM_SAFETY_BUMPER to add a large enough number so that the new sequence
+  // numbers of the just-opened region won't overlap with the old ones.
+  // Using ~200 million:
+  // 1) it'd take 300+ years to overflow a long, even if the same region recovered every second
+  // 2) it'd take 2+ days to consume, even if the RS received a change every millisecond without
+  //    a single flush
+  static final long SEQNUM_SAFETY_BUMPER = 200 * 1024 * 1024; // ~200 million
+
   final AtomicBoolean closed = new AtomicBoolean(false);
   /* Closing can take some time; use the closing flag if there is stuff we don't
    * want to do while in closing state; e.g. like offer this region up to the
@@ -201,6 +210,11 @@ public class HRegion implements HeapSize { // , Writable{
   final AtomicBoolean closing = new AtomicBoolean(false);

   protected long completeSequenceId = -1L;
+
+  public static final String SEQ_ID = "_seq.id";
+
+  // signifies that there was no sequence id attached to a mutation
+  private static final long NO_SEQ_ID = -1;

   /**
    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
@@ -617,6 +631,18 @@ public class HRegion implements HeapSize { // , Writable{
       coprocessorHost.postOpen();
     }

+    // check if we need to set the current region into recovering state
+    if (this.rsServices != null) {
+      this.setRecovering(false);
+      Map<String, HRegion> recoveringRegions = this.rsServices.getRecoveringRegions();
+      if (recoveringRegions != null && !recoveringRegions.isEmpty()
+          && recoveringRegions.containsKey(this.getRegionInfo().getEncodedName())) {
+        this.setRecovering(true);
+        recoveringRegions.put(this.getRegionInfo().getEncodedName(), this);
+        nextSeqid += SEQNUM_SAFETY_BUMPER;
+      }
+    }
+
     status.markComplete("Region opened successfully");
     return nextSeqid;
   }
@@ -2156,7 +2182,19 @@ public class HRegion implements HeapSize { // , Writable{
           != OperationStatusCode.NOT_RUN) {
         continue;
       }
-      addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
+      Mutation mutation = batchOp.operations[i].getFirst();
+      long seqId = NO_SEQ_ID;
+      if (isInReplay) {
+        byte[] seqIdBytes = mutation.getAttribute(SEQ_ID);
+        try {
+          if (seqIdBytes != null) {
+            seqId = Bytes.toLong(seqIdBytes);
+          }
+        } catch (IllegalArgumentException iae) {
+          // Bytes.toLong throws IllegalArgumentException on malformed input
+          LOG.warn("encountered bad seq Id: " + Bytes.toString(seqIdBytes), iae);
+        }
+      }
+      addedSize += applyFamilyMapToMemstore(familyMaps[i], w, seqId);
     }

     // ------------------------------------
@@ -2598,11 +2636,12 @@ public class HRegion implements HeapSize { // , Writable{
    * @param familyMap Map of kvs per family
    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
    *        If null, then this method internally creates a mvcc transaction.
+   * @param seqId the sequence number that the mutations were carrying
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    */
   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
+    MultiVersionConsistencyControl.WriteEntry localizedWriteEntry, long seqId) {
     long size = 0;
     boolean freemvcc = false;

@@ -2619,7 +2658,8 @@ public class HRegion implements HeapSize { // , Writable{
         Store store = getStore(family);
         for (Cell cell: cells) {
           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-          kv.setMemstoreTS(localizedWriteEntry.getWriteNumber());
+          kv.setMemstoreTS(seqId == NO_SEQ_ID ? localizedWriteEntry.getWriteNumber() :
+              -seqId);
           size += store.add(kv);
         }
       }
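The HRegion changes establish the encoding convention the rest of the patch relies on: a live write stamps its KeyValues with the non-negative MVCC write number, while a replayed edit stamps them with its original WAL sequence number, negated. A hedged sketch of that convention (the helper and class names are illustrative only; the patch inlines this logic):

```java
// Illustrative helpers only; not part of the patch.
final class ReplaySeqIdCodec {
  static final long NO_SEQ_ID = -1;

  // Live writes use the non-negative MVCC write number; replayed edits store their
  // original WAL sequence number negated, so scanners can tell the two apart.
  static long encodeMemstoreTS(long mvccWriteNumber, long replaySeqId) {
    return replaySeqId == NO_SEQ_ID ? mvccWriteNumber : -replaySeqId;
  }

  // A negative memstoreTS means "replayed edit"; negate it to recover the seq id.
  // For live KVs the sequence id comes from the scanner instead.
  static long decodeSequenceId(long memstoreTS, long scannerSequenceId) {
    return memstoreTS >= 0 ? scannerSequenceId : -memstoreTS;
  }
}
```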
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 9d2aceb..31bb46b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;

+import com.google.common.primitives.Longs;
+
 /**
  * Implements a heap merge across any number of KeyValueScanners.
  * <p>
@@ -101,7 +103,8 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
     } else {
       KeyValueScanner topScanner = this.heap.peek();
       if (topScanner == null ||
-          this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
+          this.comparator.compare(kvNext, this.current.getSequenceID(),
+              topScanner.peek(), topScanner.getSequenceID()) >= 0) {
         this.heap.add(this.current);
         this.current = pollRealKV();
       }
@@ -167,21 +170,33 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
       this.kvComparator = kvComparator;
     }
     public int compare(KeyValueScanner left, KeyValueScanner right) {
-      int comparison = compare(left.peek(), right.peek());
+      return compare(left.peek(), left.getSequenceID(), right.peek(), right.getSequenceID());
+    }
+    public int compare(KeyValue leftKV, long leftSeqId, KeyValue rightKV, long rightSeqId) {
+      int comparison = this.kvComparator.getRawComparator().compare(leftKV.getBuffer(),
+          leftKV.getOffset() + KeyValue.ROW_OFFSET, leftKV.getKeyLength(),
+          rightKV.getBuffer(), rightKV.getOffset() + KeyValue.ROW_OFFSET,
+          rightKV.getKeyLength());
       if (comparison != 0) {
         return comparison;
+      }
+      if (leftKV.getMemstoreTS() >= 0 && rightKV.getMemstoreTS() >= 0) {
+        int ret = -Longs.compare(leftKV.getMemstoreTS(), rightKV.getMemstoreTS());
+        if (ret != 0) return ret;
+      }
+      // Since both keys are exactly the same, we break the tie in favor of the key that came
+      // latest. For KeyValues coming from log replay, the original sequence id is encoded as a
+      // negative memstoreTS.
+      long leftSequenceID = leftKV.getMemstoreTS() >= 0 ? leftSeqId :
+          -leftKV.getMemstoreTS();
+      long rightSequenceID = rightKV.getMemstoreTS() >= 0 ? rightSeqId :
+          -rightKV.getMemstoreTS();
+      if (leftSequenceID > rightSequenceID) {
+        return -1;
+      } else if (leftSequenceID < rightSequenceID) {
+        return 1;
       } else {
-        // Since both the keys are exactly the same, we break the tie in favor
-        // of the key which came latest.
-        long leftSequenceID = left.getSequenceID();
-        long rightSequenceID = right.getSequenceID();
-        if (leftSequenceID > rightSequenceID) {
-          return -1;
-        } else if (leftSequenceID < rightSequenceID) {
-          return 1;
-        } else {
-          return 0;
-        }
+        return 0;
       }
     }
     /**
@@ -343,7 +358,8 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
         // Compare the current scanner to the next scanner. We try to avoid
         // putting the current one back into the heap if possible.
         KeyValue nextKV = nextEarliestScanner.peek();
-        if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
+        if (nextKV == null || comparator.compare(curKV, kvScanner.getSequenceID(),
+            nextKV, nextEarliestScanner.getSequenceID()) < 0) {
          // We already have the scanner with the earliest KV, so return it.
           return kvScanner;
         }
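The comparator now has three layers: raw key comparison, an MVCC tie-break when both KVs are live, and a sequence-id tie-break that treats a negative memstoreTS as an encoded replay sequence id. A standalone mirror of the tie-break for equal keys, with a few worked cases (class and method names are illustrative; the logic mirrors compare(KeyValue, long, KeyValue, long) above):

```java
import com.google.common.primitives.Longs;

// Illustrative mirror of KVScannerComparator's tie-break for KVs whose keys compare equal.
// kvTs arguments stand in for KeyValue.getMemstoreTS(); scannerSeqId arguments stand in for
// KeyValueScanner.getSequenceID().
final class TieBreak {
  static int compareEqualKeys(long leftTs, long leftScannerSeqId,
      long rightTs, long rightScannerSeqId) {
    // Both live: the newer MVCC write number sorts first.
    if (leftTs >= 0 && rightTs >= 0) {
      int ret = -Longs.compare(leftTs, rightTs);
      if (ret != 0) return ret;
    }
    // Otherwise fall back to sequence ids; replayed KVs carry theirs negated.
    long leftSeq = leftTs >= 0 ? leftScannerSeqId : -leftTs;
    long rightSeq = rightTs >= 0 ? rightScannerSeqId : -rightTs;
    return -Longs.compare(leftSeq, rightSeq);
  }

  public static void main(String[] args) {
    assert compareEqualKeys(5, 1, 3, 1) < 0;   // live vs live: MVCC 5 beats 3
    assert compareEqualKeys(-9, 0, -4, 0) < 0; // replay vs replay: seq 9 beats seq 4
    assert compareEqualKeys(-9, 0, 5, 2) < 0;  // replay seq 9 beats scanner seq 2
  }
}
```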
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 0dcbdad..1250de4 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -205,7 +205,7 @@ public abstract class Compactor {
       hasMore = scanner.next(kvs, compactionKVMax);
       // output to writer:
       for (KeyValue kv : kvs) {
-        if (kv.getMemstoreTS() <= smallestReadPoint) {
+        if (kv.getMemstoreTS() > 0 && kv.getMemstoreTS() <= smallestReadPoint) {
           kv.setMemstoreTS(0);
         }
         writer.append(kv);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
index 0b47a74..193ab36 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
@@ -139,15 +139,6 @@ public class OpenRegionHandler extends EventHandler {
       return;
     }

-    // check if we need set current region in recovering state
-    region.setRecovering(false);
-    Map<String, HRegion> recoveringRegions = this.rsServices.getRecoveringRegions();
-    if (recoveringRegions != null && !recoveringRegions.isEmpty()
-        && recoveringRegions.containsKey(region.getRegionInfo().getEncodedName())) {
-      region.setRecovering(true);
-      recoveringRegions.put(region.getRegionInfo().getEncodedName(), region);
-    }
-
     boolean failed = true;
     if (tickleOpening("post_region_open")) {
       if (updateMeta(region)) {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 31440a7..a2a9b1e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1210,6 +1210,7 @@ class FSHLog implements HLog, Syncable {
       if (logEdit.isReplay()) {
         // set replication scope null so that this won't be replicated
         logKey.setScopes(null);
+        logKey.setOrigLogSequenceNumber(logEdit.getOriginalSequenceNumber());
       }
       // write to our buffer for the Hlog file.
       logSyncer.append(new FSHLog.Entry(logKey, logEdit));
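The one-line Compactor change is the subtle one: a replayed KeyValue's memstoreTS is -origSeqId, i.e. negative, hence always below smallestReadPoint, so the old condition would have zeroed out the encoded sequence id while the region was still recovering. Condensed as an illustrative helper (not part of the patch):

```java
// Condensed from the Compactor hunk above: only genuine (positive) MVCC write
// numbers may be zeroed once they fall below the smallest read point. A negative
// memstoreTS is an encoded replay sequence id and must be preserved.
static long maybeZeroMemstoreTS(long memstoreTS, long smallestReadPoint) {
  return (memstoreTS > 0 && memstoreTS <= smallestReadPoint) ? 0 : memstoreTS;
}
```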
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 0028abe..3ea7bc2 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -99,6 +99,8 @@ public class HLogKey implements WritableComparable<HLogKey> {
   private byte [] encodedRegionName;
   private byte [] tablename;
   private long logSeqNum;
+  // used in distributedLogReplay to store the original log sequence number of an edit
+  private long origLogSeqNum;
   // Time at which this edit was written.
   private long writeTime;

@@ -114,19 +116,17 @@ public class HLogKey implements WritableComparable<HLogKey> {
   /**
-   * Create the log key for writing to somewhere.
-   * We maintain the tablename mainly for debugging purposes.
-   * A regionName is always a sub-table object.
-   *
+   * Create the log key for writing to somewhere. We maintain the tablename mainly for debugging
+   * purposes. A regionName is always a sub-table object.
    * @param encodedRegionName Encoded name of the region as returned by
-   *        HRegionInfo#getEncodedNameAsBytes().
-   * @param tablename   - name of table
-   * @param logSeqNum   - log sequence number
+   *          HRegionInfo#getEncodedNameAsBytes().
+   * @param tablename - name of table
+   * @param logSeqNum - log sequence number
    * @param now Time at which this edit was written.
    * @param clusterId of the cluster (used in Replication)
    */
-  public HLogKey(final byte [] encodedRegionName, final byte [] tablename,
-      long logSeqNum, final long now, UUID clusterId) {
+  public HLogKey(final byte[] encodedRegionName, final byte[] tablename, long logSeqNum,
+      final long now, UUID clusterId) {
     this.logSeqNum = logSeqNum;
     this.writeTime = now;
     this.clusterId = clusterId;
@@ -195,6 +195,14 @@ public class HLogKey implements WritableComparable<HLogKey> {
     this.clusterId = clusterId;
   }

+  public long getOrigSequenceNumber() {
+    return this.origLogSeqNum;
+  }
+
+  public void setOrigLogSequenceNumber(long seqNum) {
+    this.origLogSeqNum = seqNum;
+  }
+
   @Override
   public String toString() {
     return Bytes.toString(tablename) + "/" + Bytes.toString(encodedRegionName) + "/" +
@@ -383,6 +391,9 @@ public class HLogKey implements WritableComparable<HLogKey> {
           .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
       }
     }
+    if (this.origLogSeqNum > 0) {
+      builder.setOrigLogSequenceNumber(this.origLogSeqNum);
+    }
     return builder;
   }

@@ -413,5 +424,8 @@ public class HLogKey implements WritableComparable<HLogKey> {
     }
     this.logSeqNum = walKey.getLogSequenceNumber();
     this.writeTime = walKey.getWriteTime();
+    if (walKey.hasOrigLogSequenceNumber()) {
+      this.origLogSeqNum = walKey.getOrigLogSequenceNumber();
+    }
   }
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index bbac87f..4f138b6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -1719,9 +1719,10 @@ public class HLogSplitter {
       Set<byte[]> nonExistentTables = null;
       Long cachedLastFlushedSequenceId = -1l;
       for (HLog.Entry entry : entries) {
+        HLogKey logKey = entry.getKey();
         WALEdit edit = entry.getEdit();
-        byte[] table = entry.getKey().getTablename();
-        String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
+        byte[] table = logKey.getTablename();
+        String encodeRegionNameStr = Bytes.toString(logKey.getEncodedRegionName());
         // skip edits of non-existent tables
         if (nonExistentTables != null && nonExistentTables.contains(table)) {
           this.skippedEdits.incrementAndGet();
@@ -1740,7 +1741,8 @@ public class HLogSplitter {
       String preKey = null;
       List<KeyValue> kvs = edit.getKeyValues();
       HConnection hconn = this.getConnectionByTableName(table);
-
+      long sequenceNum = (logKey.getOrigSequenceNumber() > 0) ? logKey.getOrigSequenceNumber()
+          : logKey.getLogSeqNum();
       for (KeyValue kv : kvs) {
         // filtering HLog meta entries
         // We don't handle HBASE-2231 because we may or may not replay a compaction event.
@@ -1805,10 +1807,12 @@ public class HLogSplitter {
           if (kv.isDelete()) {
             del = new Delete(kv.getRow());
             del.setClusterId(entry.getKey().getClusterId());
+            del.setAttribute(HRegion.SEQ_ID, Bytes.toBytes(sequenceNum));
             preRow = del;
           } else {
             put = new Put(kv.getRow());
             put.setClusterId(entry.getKey().getClusterId());
+            put.setAttribute(HRegion.SEQ_ID, Bytes.toBytes(sequenceNum));
             preRow = put;
           }
           preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table);
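Between HLogSplitter and HRegion the sequence number travels as an ordinary Mutation attribute keyed by HRegion.SEQ_ID. A minimal sketch of both ends of that contract (row, qualifier, and value are arbitrary):

```java
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;

public class SeqIdAttributeDemo {
  public static void main(String[] args) {
    long sequenceNum = 12345L;

    // Sender side (HLogSplitter): tag the replayed mutation with its sequence number.
    Put put = new Put(Bytes.toBytes("row"));
    put.setAttribute(HRegion.SEQ_ID, Bytes.toBytes(sequenceNum));

    // Receiver side (HRegion.batchMutate with isInReplay set): read it back.
    byte[] seqIdBytes = put.getAttribute(HRegion.SEQ_ID);
    long seqId = seqIdBytes != null ? Bytes.toLong(seqIdBytes) : -1L;
    assert seqId == sequenceNum;
  }
}
```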
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index d8b163f..499612e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -84,6 +84,7 @@ public class WALEdit implements Writable, HeapSize {
   static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
   private final int VERSION_2 = -1;
   private final boolean isReplay;
+  private long origLogSequenceNumber = 0;

   private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();

@@ -117,6 +118,14 @@ public class WALEdit implements Writable, HeapSize {
     return this.isReplay;
   }

+  public void setOriginalSequenceNumber(long seqNum) {
+    this.origLogSequenceNumber = seqNum;
+  }
+
+  public long getOriginalSequenceNumber() {
+    return this.origLogSequenceNumber;
+  }
+
   public void setCompressionContext(final CompressionContext compressionContext) {
     this.compressionContext = compressionContext;
   }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 1815293..f93ec91 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -65,8 +65,10 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -698,8 +700,90 @@ public class TestDistributedLogSplitting {
   }

   @Test(timeout = 300000)
+  public void testSameVersionUpdatesRecovery() throws Exception {
+    LOG.info("testSameVersionUpdatesRecovery");
+    Configuration curConf = HBaseConfiguration.create();
+    curConf.setLong("hbase.regionserver.hlog.blocksize", 10 * 1024);
+    curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS, curConf);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+    for (int i = 0; i < NUM_RS; i++) {
+      boolean isCarryingMeta = false;
+      hrs = rsts.get(i).getRegionServer();
+      regions = ProtobufUtil.getOnlineRegions(hrs);
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+          break;
+        }
+      }
+      if (isCarryingMeta) {
+        continue;
+      }
+      break;
+    }
+
+    LOG.info("#regions = " + regions.size());
+    Iterator<HRegionInfo> it = regions.iterator();
+    while (it.hasNext()) {
+      HRegionInfo region = it.next();
+      if (region.isMetaTable()
+          || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+        it.remove();
+      }
+    }
+    if (regions.size() == 0) return;
+    HRegionInfo curRegionInfo = regions.get(0);
+    byte[] startRow = curRegionInfo.getStartKey();
+    if (startRow == null || startRow.length == 0) {
+      startRow = new byte[] { 0, 0, 0, 0, 1 };
+    }
+    byte[] row = Bytes.incrementBytes(startRow, 1);
+    // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
+    row = Arrays.copyOfRange(row, 3, 8);
+    long value = 0;
+    byte[] tableName = Bytes.toBytes("table");
+    byte[] family = Bytes.toBytes("family");
+    byte[] qualifier = Bytes.toBytes("c1");
+    long timeStamp = System.currentTimeMillis();
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (int i = 0; i < NUM_LOG_LINES; i += 1) {
+      WALEdit e = new WALEdit();
+      value++;
+      e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
+      hrs.getWAL().append(curRegionInfo, tableName, e, System.currentTimeMillis(), htd);
+    }
+    hrs.getWAL().sync();
+    hrs.getWAL().close();
+
+    // wait for the abort to complete
+    this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
+
+    // verify we got the last value
+    LOG.info("Verification Starts...");
+    Get g = new Get(row);
+    Result r = ht.get(g);
+    LOG.info("Result fetched.");
+    long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+    assertEquals(value, theStoredVal);
+
+    ht.close();
+  }
+
+  @Test(timeout = 300000)
   public void testLogReplayForDisablingTable() throws Exception {
-    LOG.info("testLogReplayWithNonMetaRSDown");
+    LOG.info("testLogReplayForDisablingTable");
     Configuration curConf = HBaseConfiguration.create();
     curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
     startCluster(NUM_RS, curConf);