diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 43e91d2..038b148 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -2584,6 +2584,7 @@ public final class ProtobufUtil { FlushDescriptor.Builder desc = FlushDescriptor.newBuilder() .setAction(action) .setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes())) + .setRegionName(ByteStringer.wrap(hri.getRegionName())) .setFlushSequenceNumber(flushSeqId) .setTableName(ByteStringer.wrap(hri.getTable().getName())); @@ -2609,6 +2610,7 @@ public final class ProtobufUtil { .setEventType(eventType) .setTableName(ByteStringer.wrap(hri.getTable().getName())) .setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes())) + .setRegionName(ByteStringer.wrap(hri.getRegionName())) .setLogSequenceNumber(seqId) .setServer(toServerName(server)); diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 8566a88..d388267 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -47,7 +47,7 @@ import org.apache.hadoop.io.RawComparator; import com.google.common.annotations.VisibleForTesting; /** - * An HBase Key/Value. This is the fundamental HBase Type. + * An HBase Key/Value. This is the fundamental HBase Type. *

* HBase applications and users should use the Cell interface and avoid directly using KeyValue * and member functions not defined in Cell. @@ -297,6 +297,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, return seqId; } + @Override public void setSequenceId(long seqId) { this.seqId = seqId; } @@ -577,7 +578,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength, timestamp, type, value, voffset, vlength, null); } - + /** * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the * data buffer. @@ -742,9 +743,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, public KeyValue(Cell c) { this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(), - c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), - c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), - c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), + c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), + c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), + c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength()); this.seqId = c.getSequenceId(); } @@ -955,7 +956,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, final int rlength, final byte [] family, final int foffset, int flength, final byte [] qualifier, final int qoffset, int qlength, final long timestamp, final Type type, - final byte [] value, final int voffset, + final byte [] value, final int voffset, int vlength, byte[] tags, int tagsOffset, int tagsLength) { checkParameters(row, rlength, family, flength, qlength, vlength); @@ -1115,6 +1116,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, // //--------------------------------------------------------------------------- + @Override public String toString() { if (this.bytes == null || this.bytes.length == 0) { return "empty"; @@ -1125,10 +1127,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, /** * @param k Key portion of a KeyValue. - * @return Key as a String, empty string if k is null. + * @return Key as a String, empty string if k is null. */ public static String keyToString(final byte [] k) { - if (k == null) { + if (k == null) { return ""; } return keyToString(k, 0, k.length); @@ -1464,6 +1466,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * save on allocations. * @return Value in a new byte array. */ + @Override @Deprecated // use CellUtil.getValueArray() public byte [] getValue() { return CellUtil.cloneValue(this); @@ -1477,6 +1480,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * lengths instead. * @return Row in a new byte array. */ + @Override @Deprecated // use CellUtil.getRowArray() public byte [] getRow() { return CellUtil.cloneRow(this); @@ -1534,6 +1538,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * lengths instead. * @return Returns family. Makes a copy. 
*/ + @Override @Deprecated // use CellUtil.getFamilyArray public byte [] getFamily() { return CellUtil.cloneFamily(this); @@ -1548,6 +1553,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * Use {@link #getBuffer()} with appropriate offsets and lengths instead. * @return Returns qualifier. Makes a copy. */ + @Override @Deprecated // use CellUtil.getQualifierArray public byte [] getQualifier() { return CellUtil.cloneQualifier(this); @@ -1846,7 +1852,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, return compareFlatKey(l,loff,llen, r,roff,rlen); } - + /** * Compares the only the user specified portion of a Key. This is overridden by MetaComparator. * @param left @@ -2214,7 +2220,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * @param leftKey * @param rightKey * @return 0 if equal, <0 if left smaller, >0 if right smaller - * @deprecated Since 0.99.2; Use {@link CellComparator#getMidpoint(Cell, Cell)} instead. + * @deprecated Since 0.99.2; Use {@link CellComparator#getMidpoint(KVComparator, Cell, Cell)} + * instead. */ @Deprecated public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) { @@ -2354,7 +2361,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, in.readFully(bytes); return new KeyValue(bytes, 0, length); } - + /** * Create a new KeyValue by copying existing cell and adding new tags * @param c @@ -2370,9 +2377,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, existingTags = newTags; } return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(), - c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), - c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), - c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), + c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), + c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), + c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), c.getValueLength(), existingTags); } @@ -2477,6 +2484,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, this.comparator = c; } + @Override public int compare(KeyValue left, KeyValue right) { return comparator.compareRows(left, right); } @@ -2485,7 +2493,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, /** * Avoids redundant comparisons for better performance. - * + * * TODO get rid of this wart */ public interface SamePrefixComparator { @@ -2508,6 +2516,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * TODO: With V3 consider removing this. * @return legacy class name for FileFileTrailer#comparatorClassName */ + @Override public String getLegacyKeyComparatorName() { return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator"; } @@ -2515,6 +2524,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, /** * @deprecated Since 0.99.2. 
*/ + @Override @Deprecated public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength) { @@ -2526,6 +2536,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, return compareOnlyKeyPortion(left, right); } + @Override @VisibleForTesting public int compareOnlyKeyPortion(Cell left, Cell right) { int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(), @@ -2552,6 +2563,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte()); } + @Override public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) { return firstKeyInBlock; } diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index c9fa854..35192cc 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -5522,6 +5522,24 @@ public final class WALProtos { */ org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptorOrBuilder getStoreFlushesOrBuilder( int index); + + // optional bytes region_name = 6; + /** + * optional bytes region_name = 6; + * + *

+     * full region name
+     * 
+ */ + boolean hasRegionName(); + /** + * optional bytes region_name = 6; + * + *
+     * full region name
+     * 
+ */ + com.google.protobuf.ByteString getRegionName(); } /** * Protobuf type {@code FlushDescriptor} @@ -5613,6 +5631,11 @@ public final class WALProtos { storeFlushes_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor.PARSER, extensionRegistry)); break; } + case 50: { + bitField0_ |= 0x00000010; + regionName_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -6772,12 +6795,37 @@ public final class WALProtos { return storeFlushes_.get(index); } + // optional bytes region_name = 6; + public static final int REGION_NAME_FIELD_NUMBER = 6; + private com.google.protobuf.ByteString regionName_; + /** + * optional bytes region_name = 6; + * + *
+     * full region name
+     * 
+ */ + public boolean hasRegionName() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bytes region_name = 6; + * + *
+     * full region name
+     * 
+ */ + public com.google.protobuf.ByteString getRegionName() { + return regionName_; + } + private void initFields() { action_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction.START_FLUSH; tableName_ = com.google.protobuf.ByteString.EMPTY; encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; flushSequenceNumber_ = 0L; storeFlushes_ = java.util.Collections.emptyList(); + regionName_ = com.google.protobuf.ByteString.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -6824,6 +6872,9 @@ public final class WALProtos { for (int i = 0; i < storeFlushes_.size(); i++) { output.writeMessage(5, storeFlushes_.get(i)); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(6, regionName_); + } getUnknownFields().writeTo(output); } @@ -6853,6 +6904,10 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(5, storeFlushes_.get(i)); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(6, regionName_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -6898,6 +6953,11 @@ public final class WALProtos { } result = result && getStoreFlushesList() .equals(other.getStoreFlushesList()); + result = result && (hasRegionName() == other.hasRegionName()); + if (hasRegionName()) { + result = result && getRegionName() + .equals(other.getRegionName()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -6931,6 +6991,10 @@ public final class WALProtos { hash = (37 * hash) + STORE_FLUSHES_FIELD_NUMBER; hash = (53 * hash) + getStoreFlushesList().hashCode(); } + if (hasRegionName()) { + hash = (37 * hash) + REGION_NAME_FIELD_NUMBER; + hash = (53 * hash) + getRegionName().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -7060,6 +7124,8 @@ public final class WALProtos { } else { storeFlushesBuilder_.clear(); } + regionName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -7113,6 +7179,10 @@ public final class WALProtos { } else { result.storeFlushes_ = storeFlushesBuilder_.build(); } + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000010; + } + result.regionName_ = regionName_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -7167,6 +7237,9 @@ public final class WALProtos { } } } + if (other.hasRegionName()) { + setRegionName(other.getRegionName()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -7593,6 +7666,58 @@ public final class WALProtos { return storeFlushesBuilder_; } + // optional bytes region_name = 6; + private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes region_name = 6; + * + *
+       * full region name
+       * 
+ */ + public boolean hasRegionName() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional bytes region_name = 6; + * + *
+       * full region name
+       * 
+ */ + public com.google.protobuf.ByteString getRegionName() { + return regionName_; + } + /** + * optional bytes region_name = 6; + * + *
+       * full region name
+       * 
+ */ + public Builder setRegionName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000020; + regionName_ = value; + onChanged(); + return this; + } + /** + * optional bytes region_name = 6; + * + *
+       * full region name
+       * 
+ */ + public Builder clearRegionName() { + bitField0_ = (bitField0_ & ~0x00000020); + regionName_ = getDefaultInstance().getRegionName(); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:FlushDescriptor) } @@ -9772,6 +9897,24 @@ public final class WALProtos { * */ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerOrBuilder(); + + // optional bytes region_name = 7; + /** + * optional bytes region_name = 7; + * + *
+     * full region name
+     * 
+ */ + boolean hasRegionName(); + /** + * optional bytes region_name = 7; + * + *
+     * full region name
+     * 
+ */ + com.google.protobuf.ByteString getRegionName(); } /** * Protobuf type {@code RegionEventDescriptor} @@ -9876,6 +10019,11 @@ public final class WALProtos { bitField0_ |= 0x00000010; break; } + case 58: { + bitField0_ |= 0x00000020; + regionName_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -10135,6 +10283,30 @@ public final class WALProtos { return server_; } + // optional bytes region_name = 7; + public static final int REGION_NAME_FIELD_NUMBER = 7; + private com.google.protobuf.ByteString regionName_; + /** + * optional bytes region_name = 7; + * + *
+     * full region name
+     * 
+ */ + public boolean hasRegionName() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional bytes region_name = 7; + * + *
+     * full region name
+     * 
+ */ + public com.google.protobuf.ByteString getRegionName() { + return regionName_; + } + private void initFields() { eventType_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType.REGION_OPEN; tableName_ = com.google.protobuf.ByteString.EMPTY; @@ -10142,6 +10314,7 @@ public final class WALProtos { logSequenceNumber_ = 0L; stores_ = java.util.Collections.emptyList(); server_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + regionName_ = com.google.protobuf.ByteString.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -10197,6 +10370,9 @@ public final class WALProtos { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeMessage(6, server_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeBytes(7, regionName_); + } getUnknownFields().writeTo(output); } @@ -10230,6 +10406,10 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(6, server_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(7, regionName_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -10280,6 +10460,11 @@ public final class WALProtos { result = result && getServer() .equals(other.getServer()); } + result = result && (hasRegionName() == other.hasRegionName()); + if (hasRegionName()) { + result = result && getRegionName() + .equals(other.getRegionName()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -10317,6 +10502,10 @@ public final class WALProtos { hash = (37 * hash) + SERVER_FIELD_NUMBER; hash = (53 * hash) + getServer().hashCode(); } + if (hasRegionName()) { + hash = (37 * hash) + REGION_NAME_FIELD_NUMBER; + hash = (53 * hash) + getRegionName().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -10453,6 +10642,8 @@ public final class WALProtos { serverBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000020); + regionName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000040); return this; } @@ -10514,6 +10705,10 @@ public final class WALProtos { } else { result.server_ = serverBuilder_.build(); } + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000020; + } + result.regionName_ = regionName_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -10571,6 +10766,9 @@ public final class WALProtos { if (other.hasServer()) { mergeServer(other.getServer()); } + if (other.hasRegionName()) { + setRegionName(other.getRegionName()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -11156,6 +11354,58 @@ public final class WALProtos { return serverBuilder_; } + // optional bytes region_name = 7; + private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes region_name = 7; + * + *
+       * full region name
+       * 
+ */ + public boolean hasRegionName() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional bytes region_name = 7; + * + *
+       * full region name
+       * 
+ */ + public com.google.protobuf.ByteString getRegionName() { + return regionName_; + } + /** + * optional bytes region_name = 7; + * + *
+       * full region name
+       * 
+ */ + public Builder setRegionName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000040; + regionName_ = value; + onChanged(); + return this; + } + /** + * optional bytes region_name = 7; + * + *
+       * full region name
+       * 
+ */ + public Builder clearRegionName() { + bitField0_ = (bitField0_ & ~0x00000040); + regionName_ = getDefaultInstance().getRegionName(); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:RegionEventDescriptor) } @@ -11598,32 +11848,33 @@ public final class WALProtos { "n_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020com" + "paction_input\030\004 \003(\t\022\031\n\021compaction_output" + "\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region" + - "_name\030\007 \001(\014\"\353\002\n\017FlushDescriptor\022,\n\006actio" + + "_name\030\007 \001(\014\"\200\003\n\017FlushDescriptor\022,\n\006actio" + "n\030\001 \002(\0162\034.FlushDescriptor.FlushAction\022\022\n", "\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_name" + "\030\003 \002(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n" + "\rstore_flushes\030\005 \003(\0132%.FlushDescriptor.S" + - "toreFlushDescriptor\032Y\n\024StoreFlushDescrip" + - "tor\022\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_d" + - "ir\030\002 \002(\t\022\024\n\014flush_output\030\003 \003(\t\"A\n\013FlushA" + - "ction\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FLUSH\020\001" + - "\022\017\n\013ABORT_FLUSH\020\002\"R\n\017StoreDescriptor\022\023\n\013" + - "family_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(" + - "\t\022\022\n\nstore_file\030\003 \003(\t\"\215\001\n\022BulkLoadDescri", - "ptor\022\036\n\ntable_name\030\001 \002(\0132\n.TableName\022\033\n\023" + - "encoded_region_name\030\002 \002(\014\022 \n\006stores\030\003 \003(" + - "\0132\020.StoreDescriptor\022\030\n\020bulkload_seq_num\030" + - "\004 \002(\003\"\212\002\n\025RegionEventDescriptor\0224\n\nevent" + - "_type\030\001 \002(\0162 .RegionEventDescriptor.Even" + - "tType\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_reg" + - "ion_name\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 " + - "\001(\004\022 \n\006stores\030\005 \003(\0132\020.StoreDescriptor\022\033\n" + - "\006server\030\006 \001(\0132\013.ServerName\".\n\tEventType\022" + - "\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWA", - "LTrailer*F\n\tScopeType\022\033\n\027REPLICATION_SCO" + - "PE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001" + - "B?\n*org.apache.hadoop.hbase.protobuf.gen" + - "eratedB\tWALProtosH\001\210\001\000\240\001\001" + "toreFlushDescriptor\022\023\n\013region_name\030\006 \001(\014" + + "\032Y\n\024StoreFlushDescriptor\022\023\n\013family_name\030" + + "\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t\022\024\n\014flush_o" + + "utput\030\003 \003(\t\"A\n\013FlushAction\022\017\n\013START_FLUS" + + "H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"R" + + "\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026" + + "\n\016store_home_dir\030\002 \002(\t\022\022\n\nstore_file\030\003 \003", + "(\t\"\215\001\n\022BulkLoadDescriptor\022\036\n\ntable_name\030" + + "\001 \002(\0132\n.TableName\022\033\n\023encoded_region_name" + + "\030\002 \002(\014\022 \n\006stores\030\003 \003(\0132\020.StoreDescriptor" + + "\022\030\n\020bulkload_seq_num\030\004 \002(\003\"\237\002\n\025RegionEve" + + "ntDescriptor\0224\n\nevent_type\030\001 \002(\0162 .Regio" + + "nEventDescriptor.EventType\022\022\n\ntable_name" + + "\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022\033\n\023l" + + "og_sequence_number\030\004 \001(\004\022 
\n\006stores\030\005 \003(\013" + + "2\020.StoreDescriptor\022\033\n\006server\030\006 \001(\0132\013.Ser" + + "verName\022\023\n\013region_name\030\007 \001(\014\".\n\tEventTyp", + "e\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\n" + + "WALTrailer*F\n\tScopeType\022\033\n\027REPLICATION_S" + + "COPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL" + + "\020\001B?\n*org.apache.hadoop.hbase.protobuf.g" + + "eneratedB\tWALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -11659,7 +11910,7 @@ public final class WALProtos { internal_static_FlushDescriptor_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_FlushDescriptor_descriptor, - new java.lang.String[] { "Action", "TableName", "EncodedRegionName", "FlushSequenceNumber", "StoreFlushes", }); + new java.lang.String[] { "Action", "TableName", "EncodedRegionName", "FlushSequenceNumber", "StoreFlushes", "RegionName", }); internal_static_FlushDescriptor_StoreFlushDescriptor_descriptor = internal_static_FlushDescriptor_descriptor.getNestedTypes().get(0); internal_static_FlushDescriptor_StoreFlushDescriptor_fieldAccessorTable = new @@ -11683,7 +11934,7 @@ public final class WALProtos { internal_static_RegionEventDescriptor_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RegionEventDescriptor_descriptor, - new java.lang.String[] { "EventType", "TableName", "EncodedRegionName", "LogSequenceNumber", "Stores", "Server", }); + new java.lang.String[] { "EventType", "TableName", "EncodedRegionName", "LogSequenceNumber", "Stores", "Server", "RegionName", }); internal_static_WALTrailer_descriptor = getDescriptor().getMessageTypes().get(8); internal_static_WALTrailer_fieldAccessorTable = new diff --git hbase-protocol/src/main/protobuf/WAL.proto hbase-protocol/src/main/protobuf/WAL.proto index 169a9b2..3fd6025 100644 --- hbase-protocol/src/main/protobuf/WAL.proto +++ hbase-protocol/src/main/protobuf/WAL.proto @@ -122,6 +122,7 @@ message FlushDescriptor { required bytes encoded_region_name = 3; optional uint64 flush_sequence_number = 4; repeated StoreFlushDescriptor store_flushes = 5; + optional bytes region_name = 6; // full region name } message StoreDescriptor { @@ -155,6 +156,7 @@ message RegionEventDescriptor { optional uint64 log_sequence_number = 4; repeated StoreDescriptor stores = 5; optional ServerName server = 6; // Server who opened the region + optional bytes region_name = 7; // full region name } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 48b78c2..081d7a5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -208,6 +208,11 @@ public class DefaultMemStore implements MemStore { return this.snapshotSize > 0 ? 
this.snapshotSize : keySize(); } + @Override + public long getSnapshotSize() { + return this.snapshotSize; + } + /** * Write an update * @param cell @@ -462,6 +467,7 @@ public class DefaultMemStore implements MemStore { * @param now * @return Timestamp */ + @Override public long updateColumnValue(byte[] row, byte[] family, byte[] qualifier, @@ -524,7 +530,7 @@ public class DefaultMemStore implements MemStore { * atomically. Scans will only see each KeyValue update as atomic. * * @param cells - * @param readpoint readpoint below which we can safely remove duplicate KVs + * @param readpoint readpoint below which we can safely remove duplicate KVs * @return change in memstore size */ @Override @@ -1031,7 +1037,7 @@ public class DefaultMemStore implements MemStore { public long size() { return heapSize(); } - + /** * Code to help figure if our approximation of object heap sizes is close * enough. See hbase-900. Fills memstores then waits so user can heap diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 53e732a..aa65ddd 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -34,6 +34,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NavigableMap; import java.util.NavigableSet; import java.util.RandomAccess; @@ -61,7 +62,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompoundConfiguration; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -100,6 +99,7 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; @@ -133,12 +133,16 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; @@ -176,6 +180,7 @@ import com.google.protobuf.Message; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; +import com.google.protobuf.TextFormat; /** * HRegion stores data for a certain region of a table. It stores all columns @@ -255,6 +260,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // private final AtomicLong sequenceId = new AtomicLong(-1L); /** + * The sequence id of the last replayed open region event from the primary region. This is used + * to skip entries before this due to the possibility of replay edits coming out of order from + * replication. + */ + protected volatile long lastReplayedOpenRegionSeqId = -1L; + + /** * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for * startRegionOperation to possibly invoke different checks before any region operations. Not all * operations have to be defined here. It's only needed when a special check is need in @@ -262,7 +274,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // */ public enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, - REPLAY_BATCH_MUTATE, COMPACT_REGION + REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT } ////////////////////////////////////////////////////////////////////////////// @@ -367,6 +379,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // The following map is populated when opening the region Map maxSeqIdInStores = new TreeMap(Bytes.BYTES_COMPARATOR); + /** Saved state from replaying prepare flush cache */ + private PrepareFlushResult prepareFlushResult = null; + /** * Config setting for whether to allow writes when a region is in recovering or not. 
*/ @@ -516,6 +531,54 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // public boolean isCompactionNeeded() { return result == Result.FLUSHED_COMPACTION_NEEDED; } + + @Override + public String toString() { + return new StringBuilder() + .append("flush result:").append(result).append(", ") + .append("failureReason:").append(failureReason).append(",") + .append("flush seq id").append(flushSequenceId).toString(); + } + } + + /** A result object from prepare flush cache stage */ + @VisibleForTesting + static class PrepareFlushResult { + final FlushResult result; // indicating a failure result from prepare + final TreeMap storeFlushCtxs; + final TreeMap> committedFiles; + final long startTime; + final long flushOpSeqId; + final long flushedSeqId; + final long totalFlushableSize; + + /** Constructs an early exit case */ + PrepareFlushResult(FlushResult result, long flushSeqId) { + this(result, null, null, Math.max(0, flushSeqId), 0, 0, 0); + } + + /** Constructs a successful prepare flush result */ + PrepareFlushResult( + TreeMap storeFlushCtxs, + TreeMap> committedFiles, long startTime, long flushSeqId, + long flushedSeqId, long totalFlushableSize) { + this(null, storeFlushCtxs, committedFiles, startTime, + flushSeqId, flushedSeqId, totalFlushableSize); + } + + private PrepareFlushResult( + FlushResult result, + TreeMap storeFlushCtxs, + TreeMap> committedFiles, long startTime, long flushSeqId, + long flushedSeqId, long totalFlushableSize) { + this.result = result; + this.storeFlushCtxs = storeFlushCtxs; + this.committedFiles = committedFiles; + this.startTime = startTime; + this.flushOpSeqId = flushSeqId; + this.flushedSeqId = flushedSeqId; + this.totalFlushableSize = totalFlushableSize; + } } final WriteState writestate = new WriteState(); @@ -771,6 +834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // Initialize all the HStores status.setStatus("Initializing all the Stores"); long maxSeqId = initializeRegionStores(reporter, status); + this.lastReplayedOpenRegionSeqId = maxSeqId; this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this)); this.writestate.flushRequested = false; @@ -1229,9 +1293,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } status.setStatus("Disabling compacts and flushes for region"); + boolean canFlush = true; synchronized (writestate) { // Disable compacting and flushing by background threads for this // region. + canFlush = !writestate.readOnly; writestate.writesEnabled = false; LOG.debug("Closing " + this + ": disabling compactions & flushes"); waitForFlushesAndCompactions(); @@ -1239,7 +1305,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // If we were not just flushing, is it worth doing a preflush...one // that will clear out of the bulk of the memstore before we put up // the close flag? 
- if (!abort && worthPreFlushing()) { + if (!abort && worthPreFlushing() && canFlush) { status.setStatus("Pre-flushing region before close"); LOG.info("Running close preflush of " + this.getRegionNameAsString()); try { @@ -1262,7 +1328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } LOG.debug("Updates disabled for region " + this); // Don't flush the cache if we are aborting - if (!abort) { + if (!abort && canFlush) { int flushCount = 0; while (this.getMemstoreSize().get() > 0) { try { @@ -1300,7 +1366,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // close each store in parallel for (final Store store : stores.values()) { - assert abort || store.getFlushableSize() == 0; + assert abort || store.getFlushableSize() == 0 || writestate.readOnly; completionService .submit(new Callable>>() { @Override @@ -1336,7 +1402,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } this.closed.set(true); - if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get()); + if (!canFlush) { + addAndGetGlobalMemstoreSize(-memstoreSize.get()); + } else if (memstoreSize.get() != 0) { + LOG.error("Memstore size is " + memstoreSize.get()); + } if (coprocessorHost != null) { status.setStatus("Running coprocessor post-close hooks"); this.coprocessorHost.postClose(abort); @@ -1362,6 +1432,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // */ public void waitForFlushesAndCompactions() { synchronized (writestate) { + if (this.writestate.readOnly) { + // we should not wait for replayed flushes if we are read only (for example in case the + // region is a secondary replica). + return; + } boolean interrupted = false; try { while (writestate.compacting > 0 || writestate.flushing) { @@ -1592,6 +1667,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } /** + * This is a helper function that compacts the given store. + * It is used by utilities and testing. + * + * @throws IOException e + */ + @VisibleForTesting + void compactStore(byte[] family, CompactionThroughputController throughputController) + throws IOException { + Store s = getStore(family); + CompactionContext compaction = s.requestCompaction(); + if (compaction != null) { + compact(compaction, s, throughputController); + } + } + + /* * Called by compaction thread and after region is opened to compact the * HStores if necessary. 
* @@ -1738,6 +1829,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // status.setStatus("Running coprocessor pre-flush hooks"); coprocessorHost.preFlush(); } + // TODO: this should be managed within memstore with the snapshot, updated only after flush + // successful if (numMutationsWithoutWAL.get() > 0) { numMutationsWithoutWAL.set(0); dataInMemoryWithoutWAL.set(0); @@ -1903,6 +1996,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // */ protected FlushResult internalFlushcache(final WAL wal, final long myseqid, final Collection storesToFlush, MonitoredTask status) throws IOException { + PrepareFlushResult result + = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false); + if (result.result == null) { + return internalFlushCacheAndCommit(wal, status, result, storesToFlush); + } else { + return result.result; // early exit due to failure from prepare stage + } + } + + protected PrepareFlushResult internalPrepareFlushCache( + final WAL wal, final long myseqid, final Collection storesToFlush, + MonitoredTask status, boolean isReplay) + throws IOException { + if (this.rsServices != null && this.rsServices.isAborted()) { // Don't flush when server aborting, it's unsafe throw new IOException("Aborting flush because server is aborted..."); @@ -1930,10 +2037,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // w.setWriteNumber(flushSeqId); mvcc.waitForPreviousTransactionsComplete(w); w = null; - return flushResult; + return new PrepareFlushResult(flushResult, myseqid); } else { - return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, - "Nothing to flush"); + return new PrepareFlushResult( + new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"), + myseqid); } } } finally { @@ -1977,7 +2085,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // flushedFamilyNames.add(store.getFamily().getName()); } - List storeFlushCtxs = new ArrayList(stores.size()); + TreeMap storeFlushCtxs + = new TreeMap(Bytes.BYTES_COMPARATOR); TreeMap> committedFiles = new TreeMap>( Bytes.BYTES_COMPARATOR); // The sequence id of this flush operation which is used to log FlushMarker and pass to @@ -1998,7 +2107,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; status.setStatus(msg); - return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); + return new PrepareFlushResult(new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg), + myseqid); } flushOpSeqId = getNextSequenceId(wal); long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName); @@ -2013,12 +2123,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // for (Store s : storesToFlush) { totalFlushableSizeOfFlushableStores += s.getFlushableSize(); - storeFlushCtxs.add(s.createFlushContext(flushOpSeqId)); + storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId)); committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL } // write the snapshot start to WAL - if (wal != null) { + if (wal != null && !writestate.readOnly) { FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); // no sync. 
Sync is below where we do not hold the updates lock @@ -2027,7 +2137,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } // Prepare flush (take a snapshot) - for (StoreFlushContext flush : storeFlushCtxs) { + for (StoreFlushContext flush : storeFlushCtxs.values()) { flush.prepare(); } } catch (IOException ex) { @@ -2075,15 +2185,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // mvcc.waitForPreviousTransactionsComplete(w); // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block w = null; - s = "Flushing stores of " + this; - status.setStatus(s); - if (LOG.isTraceEnabled()) LOG.trace(s); } finally { if (w != null) { // in case of failure just mark current w as complete mvcc.advanceMemstore(w); } } + return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId, + flushedSeqId, totalFlushableSizeOfFlushableStores); + } + + protected FlushResult internalFlushCacheAndCommit( + final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult, + final Collection storesToFlush) + throws IOException { + + // prepare flush context is carried via PrepareFlushResult + TreeMap storeFlushCtxs = prepareResult.storeFlushCtxs; + TreeMap> committedFiles = prepareResult.committedFiles; + long startTime = prepareResult.startTime; + long flushOpSeqId = prepareResult.flushOpSeqId; + long flushedSeqId = prepareResult.flushedSeqId; + long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize; + + String s = "Flushing stores of " + this; + status.setStatus(s); + if (LOG.isTraceEnabled()) LOG.trace(s); // Any failure from here on out will be catastrophic requiring server // restart so wal content can be replayed and put back into the memstore. @@ -2096,7 +2223,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // just-made new flush store file. The new flushed file is still in the // tmp directory. - for (StoreFlushContext flush : storeFlushCtxs) { + for (StoreFlushContext flush : storeFlushCtxs.values()) { flush.flushCache(status); } @@ -2104,7 +2231,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // all the store scanners to reset/reseek). 
Iterator it = storesToFlush.iterator(); // stores.values() and storeFlushCtxs have same order - for (StoreFlushContext flush : storeFlushCtxs) { + for (StoreFlushContext flush : storeFlushCtxs.values()) { boolean needsCompaction = flush.commit(status); if (needsCompaction) { compactionRequested = true; @@ -2593,6 +2720,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // */ public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException { + if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo()) + && replaySeqId < lastReplayedOpenRegionSeqId) { + // if it is a secondary replica we should ignore these entries silently + // since they are coming out of order + if (LOG.isTraceEnabled()) { + LOG.trace(getRegionInfo().getEncodedName() + " : " + + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId + + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId); + for (MutationReplay mut : mutations) { + LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation); + } + } + + OperationStatus[] statuses = new OperationStatus[mutations.length]; + for (int i = 0; i < statuses.length; i++) { + statuses[i] = OperationStatus.SUCCESS; + } + return statuses; + } return batchMutate(new ReplayBatch(mutations, replaySeqId)); } @@ -2897,7 +3043,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } // txid should always increase, so having the one from the last call is ok. // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup, currentNonce); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, @@ -2923,14 +3069,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // STEP 5. Append the final edit to WAL. Do not sync wal. // ------------------------- Mutation mutation = batchOp.getMutation(firstIndex); + if (isInReplay) { + // use wal key from the original + walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + mutation.getClusterIds(), currentNonceGroup, currentNonce); + long replaySeqId = batchOp.getReplaySequenceId(); + walKey.setOrigLogSeqNum(replaySeqId); + + // ensure that the sequence id of the region is at least as big as orig log seq id + while (true) { + long seqId = getSequenceId().get(); + if (seqId >= replaySeqId) break; + if (getSequenceId().compareAndSet(seqId, replaySeqId)) break; + } + } if (walEdit.size() > 0) { + if (!isInReplay) { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce); - if(isInReplay) { - walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); } + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true, memstoreCells); } @@ -3803,7 +3964,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // CompactionDescriptor compaction = WALEdit.getCompaction(cell); if (compaction != null) { //replay the compaction - completeCompactionMarker(compaction); + replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE); } skippedEdits++; continue; @@ -3886,15 +4047,506 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * that was not finished. We could find one recovering a WAL after a regionserver crash. * See HBASE-2331. */ - void completeCompactionMarker(CompactionDescriptor compaction) + void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles, + boolean removeFiles, long replaySeqId) + throws IOException { + checkTargetRegion(compaction.getEncodedRegionName().toByteArray(), + "Compaction marker from WAL ", compaction); + + if (replaySeqId < lastReplayedOpenRegionSeqId) { + LOG.warn("Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction) + + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId " + + " of " + lastReplayedOpenRegionSeqId); + return; + } + + startRegionOperation(Operation.REPLAY_EVENT); + try { + Store store = this.getStore(compaction.getFamilyName().toByteArray()); + if (store == null) { + LOG.warn("Found Compaction WAL edit for deleted family:" + + Bytes.toString(compaction.getFamilyName().toByteArray())); + return; + } + store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles); + } finally { + closeRegionOperation(Operation.REPLAY_EVENT); + } + } + + void replayWALFlushMarker(FlushDescriptor flush) throws IOException { + checkTargetRegion(flush.getEncodedRegionName().toByteArray(), + "Flush marker from WAL ", flush); + + if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { + return; // if primary nothing to do + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Replaying flush marker " + TextFormat.shortDebugString(flush)); + } + + startRegionOperation(Operation.REPLAY_EVENT); // use region close lock to guard against close + try { + FlushAction action = flush.getAction(); + switch (action) { + case START_FLUSH: + replayWALFlushStartMarker(flush); + break; + case COMMIT_FLUSH: + replayWALFlushCommitMarker(flush); + break; + case ABORT_FLUSH: + replayWALFlushAbortMarker(flush); + break; + default: + LOG.warn("Received a flush event with unknown action, ignoring. " + + TextFormat.shortDebugString(flush)); + break; + } + } finally { + closeRegionOperation(Operation.REPLAY_EVENT); + } + } + + /** Replay the flush marker from primary region by creating a corresponding snapshot of + * the store memstores, only if the memstores do not have a higher seqId from an earlier wal + * edit (because the events may be coming out of order). 
+ */ + @VisibleForTesting + PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException { + long flushSeqId = flush.getFlushSequenceNumber(); + + HashSet storesToFlush = new HashSet(); + for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) { + byte[] family = storeFlush.getFamilyName().toByteArray(); + Store store = getStore(family); + if (store == null) { + LOG.info("Received a flush start marker from primary, but the family is not found. Ignoring" + + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush)); + continue; + } + storesToFlush.add(store); + } + + MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this); + + // we will use writestate as a coarse-grain lock for all the replay events + // (flush, compaction, region open etc) + synchronized (writestate) { + try { + if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) { + LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush) + + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId " + + " of " + lastReplayedOpenRegionSeqId); + return null; + } + if (numMutationsWithoutWAL.get() > 0) { + numMutationsWithoutWAL.set(0); + dataInMemoryWithoutWAL.set(0); + } + + if (!writestate.flushing) { + // we do not have an active snapshot and corresponding this.prepareResult. This means + // we can just snapshot our memstores and continue as normal. + + // invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal + PrepareFlushResult prepareResult = internalPrepareFlushCache(null, + flushSeqId, storesToFlush, status, true); + if (prepareResult.result == null) { + // save the PrepareFlushResult so that we can use it later from commit flush + this.writestate.flushing = true; + this.prepareFlushResult = prepareResult; + status.markComplete("Flush prepare successful"); + if (LOG.isDebugEnabled()) { + LOG.debug(getRegionInfo().getEncodedName() + " : " + + " Prepared flush with seqId:" + flush.getFlushSequenceNumber()); + } + } else { + status.abort("Flush prepare failed with " + prepareResult.result); + // nothing much to do. prepare flush failed because of some reason. + } + return prepareResult; + } else { + // we already have an active snapshot. + if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) { + // They define the same flush. Log and continue. + LOG.warn("Received a flush prepare marker with the same seqId: " + + + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: " + + prepareFlushResult.flushOpSeqId + ". Ignoring"); + // ignore + } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) { + // We received a flush with a smaller seqNum than what we have prepared. We can only + // ignore this prepare flush request. + LOG.warn("Received a flush prepare marker with a smaller seqId: " + + + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: " + + prepareFlushResult.flushOpSeqId + ". Ignoring"); + // ignore + } else { + // We received a flush with a larger seqNum than what we have prepared + LOG.warn("Received a flush prepare marker with a larger seqId: " + + + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: " + + prepareFlushResult.flushOpSeqId + ". Ignoring"); + // We do not have multiple active snapshots in the memstore or a way to merge current + // memstore snapshot with the contents and resnapshot for now. 
We cannot take + // another snapshot and drop the previous one because that will cause temporary + // data loss in the secondary. So we ignore this for now, deferring the resolution + // to happen when we see the corresponding flush commit marker. If we have a memstore + // snapshot with x, and later received another prepare snapshot with y (where x < y), + // when we see flush commit for y, we will drop snapshot for x, and can also drop all + // the memstore edits if everything in memstore is < y. This is the usual case for + // RS crash + recovery where we might see consequtive prepare flush wal markers. + // Otherwise, this will cause more memory to be used in secondary replica until a + // further prapare + commit flush is seen and replayed. + } + } + } finally { + status.cleanup(); + writestate.notifyAll(); + } + } + return null; + } + + @VisibleForTesting + void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException { + MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this); + + // check whether we have the memstore snapshot with the corresponding seqId. Replay to + // secondary region replicas are in order, except for when the region moves or then the + // region server crashes. In those cases, we may receive replay requests out of order from + // the original seqIds. + synchronized (writestate) { + try { + if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) { + LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush) + + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId " + + " of " + lastReplayedOpenRegionSeqId); + return; + } + + if (writestate.flushing) { + PrepareFlushResult prepareFlushResult = this.prepareFlushResult; + if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) { + if (LOG.isDebugEnabled()) { + LOG.debug(getRegionInfo().getEncodedName() + " : " + + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber() + + " and a previous prepared snapshot was found"); + } + // This is the regular case where we received commit flush after prepare flush + // corresponding to the same seqId. + replayFlushInStores(flush, prepareFlushResult, true); + + // Set down the memstore size by amount of flush. + this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize); + + this.prepareFlushResult = null; + writestate.flushing = false; + } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) { + // This should not happen normally. However, lets be safe and guard against these cases + // we received a flush commit with a smaller seqId than what we have prepared + // we will pick the flush file up from this commit (if we have not seen it), but we + // will not drop the memstore + LOG.warn("Received a flush commit marker with smaller seqId: " + + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " + + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping" + +" prepared memstore snapshot"); + replayFlushInStores(flush, prepareFlushResult, false); + + // snapshot is not dropped, so memstore sizes should not be decremented + // we still have the prepared snapshot, flushing should still be true + } else { + // This should not happen normally. However, lets be safe and guard against these cases + // we received a flush commit with a larger seqId than what we have prepared + // we will pick the flush file for this. 
We will also obtain the updates lock and + // look for contents of the memstore to see whether we have edits after this seqId. + // If not, we will drop all the memstore edits and the snapshot as well. + LOG.warn("Received a flush commit marker with larger seqId: " + + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " + + prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared" + +" memstore snapshot"); + + replayFlushInStores(flush, prepareFlushResult, true); + + // Set down the memstore size by amount of flush. + this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize); + + // Inspect the memstore contents to see whether the memstore contains only edits + // with seqId smaller than the flush seqId. If so, we can discard those edits. + dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null); + + this.prepareFlushResult = null; + writestate.flushing = false; + } + } else { + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber() + + ", but no previous prepared snapshot was found"); + // There is no corresponding prepare snapshot from before. + // We will pick up the new flushed file + replayFlushInStores(flush, null, false); + + // Inspect the memstore contents to see whether the memstore contains only edits + // with seqId smaller than the flush seqId. If so, we can discard those edits. + dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null); + } + + status.markComplete("Flush commit successful"); + + // Update the last flushed sequence id for region. + this.maxFlushedSeqId = flush.getFlushSequenceNumber(); + + // advance the mvcc read point so that the new flushed file is visible. + // there may be some in-flight transactions, but they won't be made visible since they are + // either greater than flush seq number or they were already dropped via flush. + // TODO: If we are using FlushAllStoresPolicy, then this can make edits visible from other + // stores while they are still in flight because the flush commit marker will not contain + // flushes from ALL stores. + getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber()); + + // C. Finally notify anyone waiting on memstore to clear: + // e.g. checkResources(). + synchronized (this) { + notifyAll(); // FindBugs NN_NAKED_NOTIFY + } + } finally { + status.cleanup(); + writestate.notifyAll(); + } + } + } + + /** + * Replays the given flush descriptor by opening the flush files in stores and dropping the + * memstore snapshots if requested. + * @param flush + * @param prepareFlushResult + * @param dropMemstoreSnapshot + * @throws IOException + */ + private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult, + boolean dropMemstoreSnapshot) throws IOException { - Store store = this.getStore(compaction.getFamilyName().toByteArray()); - if (store == null) { - LOG.warn("Found Compaction WAL edit for deleted family:" + - Bytes.toString(compaction.getFamilyName().toByteArray())); + for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) { + byte[] family = storeFlush.getFamilyName().toByteArray(); + Store store = getStore(family); + if (store == null) { + LOG.warn("Received a flush commit marker from primary, but the family is not found." 
+ + "Ignoring StoreFlushDescriptor:" + storeFlush); + continue; + } + List flushFiles = storeFlush.getFlushOutputList(); + StoreFlushContext ctx = null; + long startTime = EnvironmentEdgeManager.currentTime(); + if (prepareFlushResult == null) { + ctx = store.createFlushContext(flush.getFlushSequenceNumber()); + } else { + ctx = prepareFlushResult.storeFlushCtxs.get(family); + startTime = prepareFlushResult.startTime; + } + + if (ctx == null) { + LOG.warn("Unexpected: flush commit marker received from store " + + Bytes.toString(family) + " but no associated flush context. Ignoring"); + continue; + } + ctx.replayFlush(flushFiles, dropMemstoreSnapshot); // replay the flush + + // Record latest flush time + this.lastStoreFlushTimeMap.put(store, startTime); + } + } + + /** + * Drops the memstore contents after replaying a flush descriptor or region open event replay + * if the memstore edits have seqNums smaller than the given seq id + * @param flush the flush descriptor + * @throws IOException + */ + private void dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException { + this.updatesLock.writeLock().lock(); + try { + mvcc.waitForPreviousTransactionsComplete(); + long currentSeqId = getSequenceId().get(); + if (seqId >= currentSeqId) { + // then we can drop the memstore contents since everything is below this seqId + LOG.info("Dropping memstore contents as well since replayed flush seqId: " + + seqId + " is greater than current seqId:" + currentSeqId); + + // Prepare flush (take a snapshot) and then abort (drop the snapshot) + if (store == null ) { + for (Store s : stores.values()) { + dropStoreMemstoreContentsForSeqId(s, currentSeqId); + } + } else { + dropStoreMemstoreContentsForSeqId(store, currentSeqId); + } + } else { + LOG.info("Not dropping memstore contents since replayed flush seqId: " + + seqId + " is smaller than current seqId:" + currentSeqId); + } + } finally { + this.updatesLock.writeLock().unlock(); + } + } + + private void dropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException { + this.addAndGetGlobalMemstoreSize(-s.getFlushableSize()); + StoreFlushContext ctx = s.createFlushContext(currentSeqId); + ctx.prepare(); + ctx.abort(); + } + + private void replayWALFlushAbortMarker(FlushDescriptor flush) { + // nothing to do for now. A flush abort will cause a RS abort which means that the region + // will be opened somewhere else later. We will see the region open event soon, and replaying + // that will drop the snapshot + } + + @VisibleForTesting + PrepareFlushResult getPrepareFlushResult() { + return prepareFlushResult; + } + + void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException { + checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(), + "RegionEvent marker from WAL ", regionEvent); + + startRegionOperation(Operation.REPLAY_EVENT); + try { + if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { + return; // if primary nothing to do + } + + if (regionEvent.getEventType() == EventType.REGION_CLOSE) { + // nothing to do on REGION_CLOSE for now. 
+ return; + } + if (regionEvent.getEventType() != EventType.REGION_OPEN) { + LOG.warn("Unknown region event received, ignoring :" + + TextFormat.shortDebugString(regionEvent)); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Replaying region open event marker " + TextFormat.shortDebugString(regionEvent)); + } + + // we will use writestate as a coarse-grained lock for all the replay events + synchronized (writestate) { + // Replication can deliver events out of order when the primary region moves or the region + // server crashes, since there is no coordination between replication of different wal files + // belonging to different region servers. We have to safeguard against this case by using + // the region open event's seqid. Since this is the first event that the region puts (after + // possibly flushing recovered.edits), after seeing this event, we can ignore every edit + // smaller than this seqId + if (this.lastReplayedOpenRegionSeqId < regionEvent.getLogSequenceNumber()) { + this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber(); + } else { + LOG.warn("Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent) + + " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId " + + " of " + lastReplayedOpenRegionSeqId); + return; + } + + // a region open event lists all the files that the region has at the time of the opening. Just pick + // all the files and drop prepared flushes and empty memstores + for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) { + // stores of primary may be different now + byte[] family = storeDescriptor.getFamilyName().toByteArray(); + Store store = getStore(family); + if (store == null) { + LOG.warn("Received a region open marker from primary, but the family is not found. " + + "Ignoring. StoreDescriptor:" + storeDescriptor); + continue; + } + + long storeSeqId = store.getMaxSequenceId(); + List storeFiles = storeDescriptor.getStoreFileList(); + store.refreshStoreFiles(storeFiles); // replace the files with the new ones + if (store.getMaxSequenceId() != storeSeqId) { + // Record latest flush time if we picked up new files + lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime()); + } + + if (writestate.flushing) { + // only drop memstore snapshots if they are smaller than the last flush for the store + if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) { + StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs.get(family); + if (ctx != null) { + long snapshotSize = store.getFlushableSize(); + ctx.abort(); + this.addAndGetGlobalMemstoreSize(-snapshotSize); + this.prepareFlushResult.storeFlushCtxs.remove(family); + } + } + } + + // Drop the memstore contents if they are now smaller than the latest seen flushed file + dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store); + if (storeSeqId > this.maxFlushedSeqId) { + this.maxFlushedSeqId = storeSeqId; + } + } + + // if all stores ended up dropping their snapshots, we can safely drop the + // prepareFlushResult + if (writestate.flushing) { + boolean canDrop = true; + for (Entry entry + : prepareFlushResult.storeFlushCtxs.entrySet()) { + Store store = getStore(entry.getKey()); + if (store == null) { + continue; + } + if (store.getSnapshotSize() > 0) { + canDrop = false; + } + } + + // this means that all the stores in the region have finished flushing, but the WAL marker + // may not have been written or we did not receive it yet.
if (canDrop) { + writestate.flushing = false; + this.prepareFlushResult = null; + } + } + + + // advance the mvcc read point so that the new flushed file is visible. + // there may be some in-flight transactions, but they won't be made visible since they are + // either greater than flush seq number or they were already dropped via flush. + getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId); + + // C. Finally notify anyone waiting on memstore to clear: + // e.g. checkResources(). + synchronized (this) { + notifyAll(); // FindBugs NN_NAKED_NOTIFY + } + } + } finally { + closeRegionOperation(Operation.REPLAY_EVENT); + } + } + + /** Checks whether the given regionName is either equal to our region's name, or is the name of + * the primary region for our corresponding range if this region is a secondary replica. + */ + private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload) + throws WrongRegionException { + if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) { return; } - store.completeCompactionMarker(compaction); + + if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) && + Bytes.equals(encodedRegionName, + this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) { + return; + } + + throw new WrongRegionException(exceptionMsg + payload + + " targeted for region " + Bytes.toStringBinary(encodedRegionName) + + " does not match this region: " + this.getRegionInfo()); } /** @@ -4127,8 +4779,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @param familyPaths List of Pair * @param bulkLoadListener Internal hooks enabling massaging/preparation of a * file about to be bulk loaded - * @param assignSeqId Force a flush, get it's sequenceId to preserve the guarantee that - * all the edits lower than the highest sequential ID from all the + * @param assignSeqId Force a flush, get its sequenceId to preserve the guarantee that + * all the edits lower than the highest sequential ID from all the * HFiles are flushed on disk. * @return true if successful, false if failed recoverably * @throws IOException if failed unrecoverably.
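Editorial aside: to make the commit-marker handling above easier to follow, here is a small, self-contained sketch of the decision table that replayWALFlushCommitMarker implements on a secondary replica. The class, enum, and method names below are invented for illustration; this is not HBase code, only a model of the four cases (no prepared snapshot, commit seqId equal to, smaller than, or larger than the prepared snapshot's seqId). In every case the flushed files are picked up; what differs is whether the prepared snapshot and the memstore edits covered by the flush are dropped.

import java.util.EnumSet;

class FlushCommitReplaySketch {
  enum Action { PICK_UP_FLUSH_FILES, DROP_PREPARED_SNAPSHOT, DROP_MEMSTORE_UP_TO_SEQID, CLEAR_FLUSHING_STATE }

  // Decide what a secondary replica should do for a COMMIT_FLUSH marker, given the seqId of the
  // prepared snapshot (null if no START_FLUSH was replayed before this commit).
  static EnumSet<Action> onCommitFlush(long commitSeqId, Long preparedSeqId) {
    if (preparedSeqId == null) {
      // No prepared snapshot: pick up the flushed files and drop only memstore edits the flush covers.
      return EnumSet.of(Action.PICK_UP_FLUSH_FILES, Action.DROP_MEMSTORE_UP_TO_SEQID);
    }
    if (commitSeqId == preparedSeqId) {
      // Regular case: the commit matches the prepared snapshot, so the snapshot can be dropped.
      return EnumSet.of(Action.PICK_UP_FLUSH_FILES, Action.DROP_PREPARED_SNAPSHOT,
          Action.CLEAR_FLUSHING_STATE);
    }
    if (commitSeqId < preparedSeqId) {
      // Commit is older than the snapshot: take the files, keep the snapshot, stay in "flushing".
      return EnumSet.of(Action.PICK_UP_FLUSH_FILES);
    }
    // Commit is newer than the snapshot: take the files, drop the stale snapshot, and drop any
    // memstore edits already covered by the flush.
    return EnumSet.of(Action.PICK_UP_FLUSH_FILES, Action.DROP_PREPARED_SNAPSHOT,
        Action.DROP_MEMSTORE_UP_TO_SEQID, Action.CLEAR_FLUSHING_STATE);
  }

  public static void main(String[] args) {
    System.out.println(onCommitFlush(100L, 100L)); // regular case
    System.out.println(onCommitFlush(80L, 100L));  // commit older than prepared snapshot
    System.out.println(onCommitFlush(150L, 100L)); // commit newer than prepared snapshot
    System.out.println(onCommitFlush(100L, null)); // no prepared snapshot seen
  }
}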
@@ -4217,7 +4869,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // finalPath = bulkLoadListener.prepareBulkLoad(familyName, path); } store.bulkLoadHFile(finalPath, seqId); - + if(storeFiles.containsKey(familyName)) { storeFiles.get(familyName).add(new Path(finalPath)); } else { @@ -4265,7 +4917,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } } - + closeBulkRegionOperation(); } } @@ -4989,7 +5641,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // checkClassLoading(); this.openSeqNum = initialize(reporter); this.setSequenceId(openSeqNum); - if (wal != null && getRegionServerServices() != null) { + if (wal != null && getRegionServerServices() != null && !writestate.readOnly) { writeRegionOpenMarker(wal, openSeqNum); } return this; @@ -5660,7 +6312,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } if (cell.getTagsLength() > 0) { - Iterator i = CellUtil.tagsIterator(cell.getTagsArray(), + Iterator i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); while (i.hasNext()) { newTags.add(i.next()); @@ -6080,8 +6732,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + - (11 * Bytes.SIZEOF_LONG) + + 45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + (12 * Bytes.SIZEOF_LONG) + 4 * Bytes.SIZEOF_BOOLEAN); // woefully out of date - currently missing: diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 0751634..014ec2c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -119,6 +119,10 @@ public class HRegionFileSystem { return this.regionInfo; } + public HRegionInfo getRegionInfoForFS() { + return this.regionInfoForFs; + } + /** @return {@link Path} to the region's root directory. 
*/ public Path getTableDir() { return this.tableDir; @@ -205,7 +209,7 @@ public class HRegionFileSystem { continue; } StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo, - regionInfoForFs, familyName, status); + regionInfoForFs, familyName, status.getPath()); storeFiles.add(info); } @@ -234,8 +238,8 @@ public class HRegionFileSystem { StoreFileInfo getStoreFileInfo(final String familyName, final String fileName) throws IOException { Path familyDir = getStoreDir(familyName); - FileStatus status = fs.getFileStatus(new Path(familyDir, fileName)); - return new StoreFileInfo(this.conf, this.fs, status); + return ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo, + regionInfoForFs, familyName, new Path(familyDir, fileName)); } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 252e5e1..df9e482 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -399,6 +399,11 @@ public class HStore implements Store { } @Override + public long getSnapshotSize() { + return this.memstore.getSnapshotSize(); + } + + @Override public long getCompactionCheckMultiplier() { return this.compactionCheckMultiplier; } @@ -448,7 +453,8 @@ public class HStore implements Store { /** * @return The maximum sequence id in all store files. Used for log replay. */ - long getMaxSequenceId() { + @Override + public long getMaxSequenceId() { return StoreFile.getMaxSequenceIdInList(this.getStorefiles()); } @@ -576,11 +582,31 @@ public class HStore implements Store { */ @Override public void refreshStoreFiles() throws IOException { + Collection newFiles = fs.getStoreFiles(getColumnFamilyName()); + refreshStoreFilesInternal(newFiles); + } + + @Override + public void refreshStoreFiles(Collection newFiles) throws IOException { + List storeFiles = new ArrayList(newFiles.size()); + for (String file : newFiles) { + storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file)); + } + refreshStoreFilesInternal(storeFiles); + } + + /** + * Checks the underlying store files, and opens the files that have not + * been opened, and removes the store file readers for store files no longer + * available. Mainly used by secondary region replicas to keep up to date with + * the primary region files. + * @throws IOException + */ + private void refreshStoreFilesInternal(Collection newFiles) throws IOException { StoreFileManager sfm = storeEngine.getStoreFileManager(); Collection currentFiles = sfm.getStorefiles(); if (currentFiles == null) currentFiles = new ArrayList(0); - Collection newFiles = fs.getStoreFiles(getColumnFamilyName()); if (newFiles == null) newFiles = new ArrayList(0); HashMap currentFilesSet = new HashMap(currentFiles.size()); @@ -1011,7 +1037,9 @@ public class HStore implements Store { this.lock.writeLock().lock(); try { this.storeEngine.getStoreFileManager().insertNewFiles(sfs); - this.memstore.clearSnapshot(snapshotId); + if (snapshotId > 0) { + this.memstore.clearSnapshot(snapshotId); + } } finally { // We need the lock, as long as we are updating the storeFiles // or changing the memstore. 
Let us release it before calling @@ -1311,10 +1339,12 @@ public class HStore implements Store { * @param compaction */ @Override - public void completeCompactionMarker(CompactionDescriptor compaction) + public void replayCompactionMarker(CompactionDescriptor compaction, + boolean pickCompactionFiles, boolean removeFiles) throws IOException { LOG.debug("Completing compaction from the WAL marker"); List compactionInputs = compaction.getCompactionInputList(); + List compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList()); // The Compaction Marker is written after the compaction is completed, // and the files moved into the region/family folder. @@ -1331,22 +1361,40 @@ public class HStore implements Store { // being in the store's folder) or they may be missing due to a compaction. String familyName = this.getColumnFamilyName(); - List inputPaths = new ArrayList(compactionInputs.size()); + List inputFiles = new ArrayList(compactionInputs.size()); for (String compactionInput : compactionInputs) { Path inputPath = fs.getStoreFilePath(familyName, compactionInput); - inputPaths.add(inputPath); + inputFiles.add(inputPath.getName()); } //some of the input files might already be deleted List inputStoreFiles = new ArrayList(compactionInputs.size()); for (StoreFile sf : this.getStorefiles()) { - if (inputPaths.contains(sf.getQualifiedPath())) { + if (inputFiles.contains(sf.getPath().getName())) { inputStoreFiles.add(sf); } } - this.replaceStoreFiles(inputStoreFiles, Collections.emptyList()); - this.completeCompaction(inputStoreFiles); + // check whether we need to pick up the new files + List outputStoreFiles = new ArrayList(compactionOutputs.size()); + + if (pickCompactionFiles) { + for (StoreFile sf : this.getStorefiles()) { + compactionOutputs.remove(sf.getPath().getName()); + } + for (String compactionOutput : compactionOutputs) { + StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput); + StoreFile storeFile = createStoreFileAndReader(storeFileInfo); + outputStoreFiles.add(storeFile); + } + } + + if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) { + LOG.info("Replaying compaction marker, replacing input files: " + + inputStoreFiles + " with output files : " + outputStoreFiles); + this.replaceStoreFiles(inputStoreFiles, outputStoreFiles); + this.completeCompaction(inputStoreFiles, removeFiles); + } } /** @@ -2175,6 +2223,47 @@ public class HStore implements Store { public List getCommittedFiles() { return committedFiles; } + + /** + * Similar to commit, but called in secondary region replicas for replaying the + * flush cache from primary region. Adds the new files to the store, and drops the + * snapshot depending on dropMemstoreSnapshot argument. 
+ * @param fileNames names of the flushed files + * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot + * @throws IOException + */ + @Override + public void replayFlush(List fileNames, boolean dropMemstoreSnapshot) + throws IOException { + List storeFiles = new ArrayList(fileNames.size()); + for (String file : fileNames) { + // open the file as a store file (hfile link, etc) + StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file); + StoreFile storeFile = createStoreFileAndReader(storeFileInfo); + storeFiles.add(storeFile); + if (LOG.isInfoEnabled()) { + LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() + + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() + + ", sequenceid=" + + storeFile.getReader().getSequenceID() + + ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length())); + } + } + + long snapshotId = dropMemstoreSnapshot ? snapshot.getId() : -1; // -1 means do not drop + HStore.this.updateStorefiles(storeFiles, snapshotId); + } + + /** + * Abort the snapshot preparation. Drops the snapshot if any. + * @throws IOException + */ + @Override + public void abort() throws IOException { + if (snapshot == null) { + return; + } + HStore.this.updateStorefiles(new ArrayList(0), snapshot.getId()); + } } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 7fa81fc..364b9c9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -60,6 +60,12 @@ public interface MemStore extends HeapSize { long getFlushableSize(); /** + * Return the size of the snapshot(s) if any + * @return size of the memstore snapshot + */ + long getSnapshotSize(); + + /** * Write an update * @param cell * @return approximate size of the passed KV and the newly added KV which maybe different from the diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 3653cfb..3944ae8 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -144,6 +144,8 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.quotas.OperationQuota; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.HRegion.Operation; @@ -712,8 +714,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (metaCells != null && !metaCells.isEmpty()) { for (Cell metaCell : metaCells) { CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); + boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); if (compactionDesc != null) { - region.completeCompactionMarker(compactionDesc); + // replay the 
compaction. Remove the files from stores only if we are the primary + // region replica (thus own the files) + region.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, + replaySeqId); + continue; + } + FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); + if (flushDesc != null && !isDefaultReplica) { + region.replayWALFlushMarker(flushDesc); + continue; + } + RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); + if (regionEvent != null && !isDefaultReplica) { + region.replayWALRegionEventMarker(regionEvent); + continue; } } it.remove(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 0c420b5..6a422a9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -213,9 +213,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * Call to complete a compaction. Its for the case where we find in the WAL a compaction * that was not finished. We could find one recovering a WAL after a regionserver crash. * See HBASE-2331. - * @param compaction + * @param compaction the descriptor for compaction + * @param pickCompactionFiles whether or not pick up the new compaction output files and + * add it to the store + * @param removeFiles whether to remove/archive files from filesystem */ - void completeCompactionMarker(CompactionDescriptor compaction) + void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles, + boolean removeFiles) throws IOException; // Split oriented methods @@ -265,9 +269,20 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf */ long getFlushableSize(); + /** + * Returns the memstore snapshot size + * @return size of the memstore snapshot + */ + long getSnapshotSize(); + HColumnDescriptor getFamily(); /** + * @return The maximum sequence id in all store files. + */ + long getMaxSequenceId(); + + /** * @return The maximum memstoreTS in all store files. */ long getMaxMemstoreTS(); @@ -416,4 +431,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * linear formula. */ double getCompactionPressure(); + + /** + * Replaces the store files that the store has with the given files. Mainly used by + * secondary region replicas to keep up to date with + * the primary region files. + * @throws IOException + */ + void refreshStoreFiles(Collection newFiles) throws IOException; + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java index 0c2fe6f..34ba1fa 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java @@ -65,6 +65,22 @@ interface StoreFlushContext { boolean commit(MonitoredTask status) throws IOException; /** + * Similar to commit, but called in secondary region replicas for replaying the + * flush cache from primary region. Adds the new files to the store, and drops the + * snapshot depending on dropMemstoreSnapshot argument. 
+ * @param fileNames names of the flushed files + * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot + * @throws IOException + */ + void replayFlush(List fileNames, boolean dropMemstoreSnapshot) throws IOException; + + /** + * Abort the snapshot preparation. Drops the snapshot if any. + * @throws IOException + */ + void abort() throws IOException; + + /** * Returns the newly committed files from the flush. Called only if commit returns true * @return a list of Paths for new files */ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java new file mode 100644 index 0000000..4506b19 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.TableName; + +/** + * An HLogKey specific to WalEdits coming from replay. 
+ */ +@InterfaceAudience.Private +public class ReplayHLogKey extends HLogKey { + + public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename, + final long now, List clusterIds, long nonceGroup, long nonce) { + super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce); + } + + public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename, + long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce) { + super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce); + } + + /** + * Returns the original sequence id + * @return long the new assigned sequence number + * @throws InterruptedException + */ + @Override + public long getSequenceId() throws IOException { + return this.getOrigLogSeqNum(); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index c3d4e5a..fc19603 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -287,7 +287,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { @Override public List finishWritingAndClose() throws IOException { - finishWriting(); + finishWriting(true); return null; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index cf87219..7bcee0b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; @@ -96,23 +96,24 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { * @throws IOException */ public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs, - HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, FileStatus status) + HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, Path path) throws IOException { // if this is a primary region, just return the StoreFileInfo constructed from path if (regionInfo.equals(regionInfoForFs)) { - return new StoreFileInfo(conf, fs, status); - } - - if (StoreFileInfo.isReference(status.getPath())) { - Reference reference = Reference.read(fs, status.getPath()); - return new StoreFileInfo(conf, fs, status, reference); + return new StoreFileInfo(conf, fs, path); } // else create a store file link. The link file does not exists on filesystem though. 
HFileLink link = HFileLink.build(conf, regionInfoForFs.getTable(), - regionInfoForFs.getEncodedName(), familyName, status.getPath().getName()); - return new StoreFileInfo(conf, fs, status, link); + regionInfoForFs.getEncodedName(), familyName, path.getName()); + + if (StoreFileInfo.isReference(path)) { + Reference reference = Reference.read(fs, path); + return new StoreFileInfo(conf, fs, link.getFileStatus(fs), reference); + } + + return new StoreFileInfo(conf, fs, link.getFileStatus(fs), link); } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 4d8cc2d..53f46b4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -1186,12 +1186,18 @@ public class WALSplitter { * @return true when there is no error * @throws IOException */ - protected boolean finishWriting() throws IOException { + protected boolean finishWriting(boolean interrupt) throws IOException { LOG.debug("Waiting for split writer threads to finish"); boolean progress_failed = false; for (WriterThread t : writerThreads) { t.finish(); } + if (interrupt) { + for (WriterThread t : writerThreads) { + t.interrupt(); // interrupt the writer threads. We are stopping now. + } + } + for (WriterThread t : writerThreads) { if (!progress_failed && reporter != null && !reporter.progress()) { progress_failed = true; @@ -1260,7 +1266,7 @@ public class WALSplitter { boolean isSuccessful = false; List result = null; try { - isSuccessful = finishWriting(); + isSuccessful = finishWriting(false); } finally { result = close(); List thrown = closeLogWriters(null); @@ -1960,7 +1966,7 @@ public class WALSplitter { @Override public List finishWritingAndClose() throws IOException { try { - if (!finishWriting()) { + if (!finishWriting(false)) { return null; } if (hasEditsInDisablingOrDisabledTables) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index ea06346..2930f72 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -61,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -4655,7 +4656,7 @@ public class TestHRegion { // create a primary region, load some data and flush // create a secondary region, and do a get against that Path rootDir = new Path(dir + "testRegionReplicaSecondary"); - FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); + FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); byte[][] families = new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") @@ -4755,6 +4756,14 @@ public class TestHRegion { } } + static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException { + Configuration confForWAL = new Configuration(conf); + confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); + return new WALFactory(confForWAL, + Collections.singletonList(new MetricsWAL()), + "hregion-" + RandomStringUtils.randomNumeric(8)); + } + @Test 
public void testCompactionFromPrimary() throws IOException { Path rootDir = new Path(dir + "testRegionReplicaSecondary"); @@ -4815,9 +4824,14 @@ public class TestHRegion { private void putData(HRegion region, int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { + putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families); + } + + static void putData(HRegion region, Durability durability, + int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { for (int i = startRow; i < startRow + numRows; i++) { Put put = new Put(Bytes.toBytes("" + i)); - put.setDurability(Durability.SKIP_WAL); + put.setDurability(durability); for (byte[] family : families) { put.add(family, qf, null); } @@ -4825,7 +4839,7 @@ public class TestHRegion { } } - private void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families) + static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { for (int i = startRow; i < startRow + numRows; i++) { byte[] row = Bytes.toBytes("" + i); @@ -4844,7 +4858,7 @@ public class TestHRegion { } } - private void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException { + static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException { // Now I have k, get values out and assert they are as expected. Get get = new Get(k).addFamily(family).setMaxVersions(); Cell[] results = r.get(get).rawCells(); @@ -4991,7 +5005,7 @@ public class TestHRegion { return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families); } - private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, + public static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families) throws IOException { Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log"); @@ -5013,7 +5027,7 @@ public class TestHRegion { * @return A region on which you must call * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. */ - private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, + public static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException { return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf, @@ -6028,7 +6042,7 @@ public class TestHRegion { } } - private static HRegion initHRegion(byte[] tableName, String callingMethod, + static HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families) throws IOException { return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java new file mode 100644 index 0000000..09e9d5e --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -0,0 +1,1162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.apache.hadoop.hbase.regionserver.TestHRegion.*; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; +import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; +import org.apache.hadoop.util.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; + +/** + * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary + * region replicas + */ 
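Editorial aside before the test class itself: these tests all follow the same pattern, reading the primary's WAL back entry by entry and dispatching each entry to the secondary either as a flush, compaction, or region-event marker or as a plain data edit. The sketch below is a self-contained model of that loop with invented interface names; it is not the test code or an HBase API. The real tests below do the equivalent with WAL.Reader, WALEdit.getFlushDescriptor, WALEdit.getCompaction, and the replayWAL* methods on HRegion.

// A minimal model of the replay-dispatch loop exercised by the tests in this class.
// All types and method names here are hypothetical stand-ins, not HBase classes.
class ReplayLoopSketch {
  interface Entry {
    boolean isFlushMarker();
    boolean isCompactionMarker();
    boolean isRegionEventMarker();
  }

  interface SecondaryRegion {
    void replayFlushMarker(Entry e);       // START_FLUSH / COMMIT_FLUSH handling
    void replayCompactionMarker(Entry e);  // pick up compaction outputs, keep inputs on disk
    void replayRegionEventMarker(Entry e); // e.g. region open: refresh store files
    void replayDataEdit(Entry e);          // ordinary cells, applied without writing a new WAL entry
  }

  static void replayAll(Iterable<Entry> primaryWal, SecondaryRegion secondary) {
    for (Entry e : primaryWal) {
      if (e.isFlushMarker()) {
        secondary.replayFlushMarker(e);
      } else if (e.isCompactionMarker()) {
        secondary.replayCompactionMarker(e);
      } else if (e.isRegionEventMarker()) {
        secondary.replayRegionEventMarker(e);
      } else {
        secondary.replayDataEdit(e);
      }
    }
  }
}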
+@Category(MediumTests.class) +public class TestHRegionReplayEvents { + + static final Log LOG = LogFactory.getLog(TestHRegion.class); + @Rule public TestName name = new TestName(); + + private static HBaseTestingUtility TEST_UTIL; + + public static Configuration CONF ; + private String dir; + private static FileSystem FILESYSTEM; + + private byte[][] families = new byte[][] { + Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")}; + + // Test names + protected byte[] tableName; + protected String method; + protected final byte[] row = Bytes.toBytes("rowA"); + protected final byte[] row2 = Bytes.toBytes("rowB"); + protected byte[] cq = Bytes.toBytes("cq"); + + // per test fields + private Path rootDir; + private HTableDescriptor htd; + private long time; + private RegionServerServices rss; + private HRegionInfo primaryHri, secondaryHri; + private HRegion primaryRegion, secondaryRegion; + private WALFactory wals; + private WAL walPrimary, walSecondary; + private WAL.Reader reader; + + @Before + public void setup() throws IOException { + TEST_UTIL = HBaseTestingUtility.createLocalHTU(); + FILESYSTEM = TEST_UTIL.getTestFileSystem(); + CONF = TEST_UTIL.getConfiguration(); + dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString(); + method = name.getMethodName(); + tableName = Bytes.toBytes(name.getMethodName()); + rootDir = new Path(dir + method); + TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); + method = name.getMethodName(); + + htd = new HTableDescriptor(TableName.valueOf(method)); + for (byte[] family : families) { + htd.addFamily(new HColumnDescriptor(family)); + } + + time = System.currentTimeMillis(); + + primaryHri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + false, time, 0); + secondaryHri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + false, time, 1); + + wals = TestHRegion.createWALFactory(CONF, rootDir); + walPrimary = wals.getWAL(primaryHri.getEncodedNameAsBytes()); + walSecondary = wals.getWAL(secondaryHri.getEncodedNameAsBytes()); + + rss = mock(RegionServerServices.class); + when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); + when(rss.getConfiguration()).thenReturn(CONF); + when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting()); + + primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); + primaryRegion.close(); + + primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); + secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null); + + reader = null; + } + + @After + public void tearDown() throws Exception { + if (reader != null) { + reader.close(); + } + + if (primaryRegion != null) { + HBaseTestingUtility.closeRegionAndWAL(primaryRegion); + } + if (secondaryRegion != null) { + HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); + } + + EnvironmentEdgeManagerTestHelper.reset(); + LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); + TEST_UTIL.cleanupTestDir(); + } + + String getName() { + return name.getMethodName(); + } + + // Some of the test cases are as follows: + // 1. replay flush start marker again + // 2. replay flush with smaller seqId than what is there in memstore snapshot + // 3. replay flush with larger seqId than what is there in memstore snapshot + // 4. replay flush commit without flush prepare. non droppable memstore + // 5. 
replay flush commit without flush prepare. droppable memstore + // 6. replay open region event + // 7. replay open region event after flush start + // 8. replay flush form an earlier seqId (test ignoring seqIds) + // 9. start flush does not prevent region from closing. + + @Test + public void testRegionReplicaSecondaryCannotFlush() throws IOException { + // load some data and flush ensure that the secondary replica will not execute the flush + + // load some data to secondary by replaying + putDataByReplay(secondaryRegion, 0, 1000, cq, families); + + verifyData(secondaryRegion, 0, 1000, cq, families); + + // flush region + FlushResult flush = secondaryRegion.flushcache(); + assertEquals(flush.result, FlushResult.Result.CANNOT_FLUSH); + + verifyData(secondaryRegion, 0, 1000, cq, families); + + // close the region, and inspect that it has not flushed + Map> files = secondaryRegion.close(false); + // assert that there are no files (due to flush) + for (List f : files.values()) { + assertTrue(f.isEmpty()); + } + } + + /** + * Tests a case where we replay only a flush start marker, then the region is closed. This region + * should not block indefinitely + */ + @Test (timeout = 60000) + public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException { + // load some data to primary and flush + int start = 0; + LOG.info("-- Writing some data to primary from " + start + " to " + (start+100)); + putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); + LOG.info("-- Flushing primary, creating 3 files for 3 stores"); + primaryRegion.flushcache(); + + // now replay the edits and the flush marker + reader = createWALReaderForPrimary(); + + LOG.info("-- Replaying edits and flush events in secondary"); + while (true) { + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + FlushDescriptor flushDesc + = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); + if (flushDesc != null) { + if (flushDesc.getAction() == FlushAction.START_FLUSH) { + LOG.info("-- Replaying flush start in secondary"); + PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); + } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { + LOG.info("-- NOT Replaying flush commit in secondary"); + } + } else { + replayEdit(secondaryRegion, entry); + } + } + + assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreSize() > 0); + // now close the region which should not cause hold because of un-committed flush + secondaryRegion.close(); + + // verify that the memstore size is back to what it was + assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreSize()); + } + + static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { + if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { + return 0; // handled elsewhere + } + Put put = new Put(entry.getEdit().getCells().get(0).getRow()); + for (Cell cell : entry.getEdit().getCells()) put.add(cell); + put.setDurability(Durability.SKIP_WAL); + MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); + region.batchReplay(new MutationReplay[] {mutation}, + entry.getKey().getLogSeqNum()); + return Integer.parseInt(Bytes.toString(put.getRow())); + } + + WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException { + return wals.createReader(TEST_UTIL.getTestFileSystem(), + DefaultWALProvider.getCurrentFileName(walPrimary), + TEST_UTIL.getConfiguration()); + } + + @Test + public void testReplayFlushesAndCompactions() throws 
IOException { + // initiate a secondary region with some data. + + // load some data to primary and flush. 3 flushes and some more unflushed data + putDataWithFlushes(primaryRegion, 100, 300, 100); + + // compaction from primary + LOG.info("-- Compacting primary, only 1 store"); + primaryRegion.compactStore(Bytes.toBytes("cf1"), + NoLimitCompactionThroughputController.INSTANCE); + + // now replay the edits and the flush marker + reader = createWALReaderForPrimary(); + + LOG.info("-- Replaying edits and flush events in secondary"); + int lastReplayed = 0; + int expectedStoreFileCount = 0; + while (true) { + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + FlushDescriptor flushDesc + = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); + CompactionDescriptor compactionDesc + = WALEdit.getCompaction(entry.getEdit().getCells().get(0)); + if (flushDesc != null) { + // first verify that everything is replayed and visible before flush event replay + verifyData(secondaryRegion, 0, lastReplayed, cq, families); + Store store = secondaryRegion.getStore(Bytes.toBytes("cf1")); + long storeMemstoreSize = store.getMemStoreSize(); + long regionMemstoreSize = secondaryRegion.getMemstoreSize().get(); + long storeFlushableSize = store.getFlushableSize(); + if (flushDesc.getAction() == FlushAction.START_FLUSH) { + LOG.info("-- Replaying flush start in secondary"); + PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); + assertNull(result.result); + assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber()); + + // assert that the store memstore is smaller now + long newStoreMemstoreSize = store.getMemStoreSize(); + LOG.info("Memstore size reduced by:" + + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); + assertTrue(storeMemstoreSize > newStoreMemstoreSize); + + } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { + LOG.info("-- Replaying flush commit in secondary"); + secondaryRegion.replayWALFlushCommitMarker(flushDesc); + + // assert that the flush files are picked + expectedStoreFileCount++; + for (Store s : secondaryRegion.getStores().values()) { + assertEquals(expectedStoreFileCount, s.getStorefilesCount()); + } + long newFlushableSize = store.getFlushableSize(); + assertTrue(storeFlushableSize > newFlushableSize); + + // assert that the region memstore is smaller now + long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get(); + assertTrue(regionMemstoreSize > newRegionMemstoreSize); + } + // after replay verify that everything is still visible + verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); + } else if (compactionDesc != null) { + secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE); + + // assert that the compaction is applied + for (Store store : secondaryRegion.getStores().values()) { + if (store.getColumnFamilyName().equals("cf1")) { + assertEquals(1, store.getStorefilesCount()); + } else { + assertEquals(expectedStoreFileCount, store.getStorefilesCount()); + } + } + } else { + lastReplayed = replayEdit(secondaryRegion, entry);; + } + } + + assertEquals(400-1, lastReplayed); + LOG.info("-- Verifying edits from secondary"); + verifyData(secondaryRegion, 0, 400, cq, families); + + LOG.info("-- Verifying edits from primary. 
Ensuring that files are not deleted"); + verifyData(primaryRegion, 0, lastReplayed, cq, families); + for (Store store : primaryRegion.getStores().values()) { + if (store.getColumnFamilyName().equals("cf1")) { + assertEquals(1, store.getStorefilesCount()); + } else { + assertEquals(expectedStoreFileCount, store.getStorefilesCount()); + } + } + } + + /** + * Tests cases where we prepare a flush with some seqId and we receive other flush start markers + * equal to, greater or less than the previous flush start marker. + */ + @Test + public void testReplayFlushStartMarkers() throws IOException { + // load some data to primary and flush. 1 flush and some more unflushed data + putDataWithFlushes(primaryRegion, 100, 100, 100); + int numRows = 200; + + // now replay the edits and the flush marker + reader = createWALReaderForPrimary(); + + LOG.info("-- Replaying edits and flush events in secondary"); + + FlushDescriptor startFlushDesc = null; + + int lastReplayed = 0; + while (true) { + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + FlushDescriptor flushDesc + = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); + if (flushDesc != null) { + // first verify that everything is replayed and visible before flush event replay + Store store = secondaryRegion.getStore(Bytes.toBytes("cf1")); + long storeMemstoreSize = store.getMemStoreSize(); + long regionMemstoreSize = secondaryRegion.getMemstoreSize().get(); + long storeFlushableSize = store.getFlushableSize(); + + if (flushDesc.getAction() == FlushAction.START_FLUSH) { + startFlushDesc = flushDesc; + LOG.info("-- Replaying flush start in secondary"); + PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); + assertNull(result.result); + assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); + assertTrue(regionMemstoreSize > 0); + assertTrue(storeFlushableSize > 0); + + // assert that the store memstore is smaller now + long newStoreMemstoreSize = store.getMemStoreSize(); + LOG.info("Memstore size reduced by:" + + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); + assertTrue(storeMemstoreSize > newStoreMemstoreSize); + verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); + + } + // after replay verify that everything is still visible + verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); + } else { + lastReplayed = replayEdit(secondaryRegion, entry); + } + } + + // at this point, there should be some data (rows 0-100) in memstore snapshot + // and some more data in memstores (rows 100-200) + + verifyData(secondaryRegion, 0, numRows, cq, families); + + // Test case 1: replay the same flush start marker again + LOG.info("-- Replaying same flush start in secondary again"); + PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); + assertNull(result); // this should return null. Ignoring the flush start marker + // assert that we still have prepared flush with the previous setup. 
+ assertNotNull(secondaryRegion.getPrepareFlushResult()); + assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, + startFlushDesc.getFlushSequenceNumber()); + assertTrue(secondaryRegion.getMemstoreSize().get() > 0); // memstore is not empty + verifyData(secondaryRegion, 0, numRows, cq, families); + + // Test case 2: replay a flush start marker with a smaller seqId + FlushDescriptor startFlushDescSmallerSeqId + = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50); + LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId); + result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId); + assertNull(result); // this should return null. Ignoring the flush start marker + // assert that we still have prepared flush with the previous setup. + assertNotNull(secondaryRegion.getPrepareFlushResult()); + assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, + startFlushDesc.getFlushSequenceNumber()); + assertTrue(secondaryRegion.getMemstoreSize().get() > 0); // memstore is not empty + verifyData(secondaryRegion, 0, numRows, cq, families); + + // Test case 3: replay a flush start marker with a larger seqId + FlushDescriptor startFlushDescLargerSeqId + = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50); + LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId); + result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId); + assertNull(result); // this should return null. Ignoring the flush start marker + // assert that we still have prepared flush with the previous setup. + assertNotNull(secondaryRegion.getPrepareFlushResult()); + assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, + startFlushDesc.getFlushSequenceNumber()); + assertTrue(secondaryRegion.getMemstoreSize().get() > 0); // memstore is not empty + verifyData(secondaryRegion, 0, numRows, cq, families); + + LOG.info("-- Verifying edits from secondary"); + verifyData(secondaryRegion, 0, numRows, cq, families); + + LOG.info("-- Verifying edits from primary."); + verifyData(primaryRegion, 0, numRows, cq, families); + } + + /** + * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker + * less than the previous flush start marker. + */ + @Test + public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException { + // load some data to primary and flush. 
2 flushes and some more unflushed data + putDataWithFlushes(primaryRegion, 100, 200, 100); + int numRows = 300; + + // now replay the edits and the flush marker + reader = createWALReaderForPrimary(); + + LOG.info("-- Replaying edits and flush events in secondary"); + FlushDescriptor startFlushDesc = null; + FlushDescriptor commitFlushDesc = null; + + int lastReplayed = 0; + while (true) { + System.out.println(lastReplayed); + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + FlushDescriptor flushDesc + = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); + if (flushDesc != null) { + if (flushDesc.getAction() == FlushAction.START_FLUSH) { + // don't replay the first flush start marker, hold on to it, replay the second one + if (startFlushDesc == null) { + startFlushDesc = flushDesc; + } else { + LOG.info("-- Replaying flush start in secondary"); + startFlushDesc = flushDesc; + PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); + assertNull(result.result); + } + } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { + // do not replay any flush commit yet + if (commitFlushDesc == null) { + commitFlushDesc = flushDesc; // hold on to the first flush commit marker + } + } + // after replay verify that everything is still visible + verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); + } else { + lastReplayed = replayEdit(secondaryRegion, entry); + } + } + + // at this point, there should be some data (rows 0-200) in memstore snapshot + // and some more data in memstores (rows 200-300) + verifyData(secondaryRegion, 0, numRows, cq, families); + + // no store files in the region + int expectedStoreFileCount = 0; + for (Store s : secondaryRegion.getStores().values()) { + assertEquals(expectedStoreFileCount, s.getStorefilesCount()); + } + long regionMemstoreSize = secondaryRegion.getMemstoreSize().get(); + + // Test case 1: replay the a flush commit marker smaller than what we have prepared + LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" + + startFlushDesc); + assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber()); + + LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); + secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); + + // assert that the flush files are picked + expectedStoreFileCount++; + for (Store s : secondaryRegion.getStores().values()) { + assertEquals(expectedStoreFileCount, s.getStorefilesCount()); + } + Store store = secondaryRegion.getStore(Bytes.toBytes("cf1")); + long newFlushableSize = store.getFlushableSize(); + assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped + + // assert that the region memstore is same as before + long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get(); + assertEquals(regionMemstoreSize, newRegionMemstoreSize); + + assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped + + LOG.info("-- Verifying edits from secondary"); + verifyData(secondaryRegion, 0, numRows, cq, families); + + LOG.info("-- Verifying edits from primary."); + verifyData(primaryRegion, 0, numRows, cq, families); + } + + /** + * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker + * larger than the previous flush start marker. + */ + @Test + public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException { + // load some data to primary and flush. 
1 flush and some more unflushed data
+ putDataWithFlushes(primaryRegion, 100, 100, 100);
+ int numRows = 200;
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+
+ LOG.info("-- Replaying edits and flush events in secondary");
+ FlushDescriptor startFlushDesc = null;
+ FlushDescriptor commitFlushDesc = null;
+
+ int lastReplayed = 0;
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ if (flushDesc != null) {
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ if (startFlushDesc == null) {
+ LOG.info("-- Replaying flush start in secondary");
+ startFlushDesc = flushDesc;
+ PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
+ assertNull(result.result);
+ }
+ } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+ // do not replay any flush commit yet
+ // hold on to the flush commit marker but simulate a larger
+ // flush commit seqId
+ commitFlushDesc =
+ FlushDescriptor.newBuilder(flushDesc)
+ .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
+ .build();
+ }
+ // after replay verify that everything is still visible
+ verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
+ } else {
+ lastReplayed = replayEdit(secondaryRegion, entry);
+ }
+ }
+
+ // at this point, there should be some data (rows 0-100) in memstore snapshot
+ // and some more data in memstores (rows 100-200)
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ // no store files in the region
+ int expectedStoreFileCount = 0;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ long regionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+
+ // Test case 1: replay a flush commit marker larger than what we have prepared
+ LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
+ + startFlushDesc);
+ assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());
+
+ LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
+ secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
+
+ // assert that the flush files are picked up
+ expectedStoreFileCount++;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long newFlushableSize = store.getFlushableSize();
+ assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
+
+ // assert that the region memstore is smaller than before, but not empty
+ long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ assertTrue(newRegionMemstoreSize > 0);
+ assertTrue(regionMemstoreSize > newRegionMemstoreSize);
+
+ assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped
+
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from primary.");
+ verifyData(primaryRegion, 0, numRows, cq, families);
+ }
+
+ /**
+ * Tests the case where we receive a flush commit before receiving any flush prepare markers. 
+ * The memstore edits should be dropped after the flush commit replay since they should be in
+ * flushed files
+ */
+ @Test
+ public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
+ throws IOException {
+ testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
+ }
+
+ /**
+ * Tests the case where we receive a flush commit before receiving any flush prepare markers.
+ * The memstore edits should not be dropped after the flush commit replay since not every edit
+ * will be in flushed files (based on seqId)
+ */
+ @Test
+ public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
+ throws IOException {
+ testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
+ }
+
+ /**
+ * Tests the case where we receive a flush commit before receiving any flush prepare markers
+ */
+ public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
+ throws IOException {
+ // load some data to primary and flush. 1 flush and some more unflushed data.
+ // write more data after the flush depending on whether the memstore is droppable
+ putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
+ int numRows = droppableMemstore ? 100 : 200;
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+
+ LOG.info("-- Replaying edits and flush events in secondary");
+ FlushDescriptor commitFlushDesc = null;
+
+ int lastReplayed = 0;
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ if (flushDesc != null) {
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ // do not replay flush start marker
+ } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+ commitFlushDesc = flushDesc; // hold on to the flush commit marker
+ }
+ // after replay verify that everything is still visible
+ verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
+ } else {
+ lastReplayed = replayEdit(secondaryRegion, entry);
+ }
+ }
+
+ // at this point, there should be some data (rows 0-100) in the memstore without a snapshot,
+ // plus some more unflushed data (rows 100-200) when the memstore is not droppable
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ // no store files in the region
+ int expectedStoreFileCount = 0;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ long regionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+
+ // Test case 1: replay a flush commit marker without a flush start marker
+ assertNull(secondaryRegion.getPrepareFlushResult());
+ assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);
+
+ // ensure all files are visible in secondary
+ for (Store store : secondaryRegion.getStores().values()) {
+ assertTrue(store.getMaxSequenceId() <= secondaryRegion.getSequenceId().get());
+ }
+
+ LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
+ secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
+
+ // assert that the flush files are picked up
+ expectedStoreFileCount++;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long newFlushableSize = store.getFlushableSize();
+ if (droppableMemstore) {
+ assertTrue(newFlushableSize == 0); // assert that the memstore is dropped
+ } else {
+ assertTrue(newFlushableSize > 0); // 
assert that the memstore is not dropped
+ }
+
+ // assert that the region memstore is empty if we could drop the snapshot, unchanged otherwise
+ long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ if (droppableMemstore) {
+ assertTrue(0 == newRegionMemstoreSize);
+ } else {
+ assertTrue(regionMemstoreSize == newRegionMemstoreSize);
+ }
+
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from primary.");
+ verifyData(primaryRegion, 0, numRows, cq, families);
+ }
+
+ private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
+ return FlushDescriptor.newBuilder(flush)
+ .setFlushSequenceNumber(flushSeqId)
+ .build();
+ }
+
+ /**
+ * Tests replaying region open markers from the primary region. Checks whether the files are picked up.
+ */
+ @Test
+ public void testReplayRegionOpenEvent() throws IOException {
+ putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
+ int numRows = 100;
+
+ // close the region and open again.
+ primaryRegion.close();
+ primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+ List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
+
+ LOG.info("-- Replaying edits and region events in secondary");
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ RegionEventDescriptor regionEventDesc
+ = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+
+ if (flushDesc != null) {
+ // don't replay flush events
+ } else if (regionEventDesc != null) {
+ regionEvents.add(regionEventDesc);
+ } else {
+ // don't replay edits
+ }
+ }
+
+ // we should have 1 open, 1 close and 1 open event
+ assertEquals(3, regionEvents.size());
+
+ // replay the first region open event. 
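+ // (regionEvents order: index 0 is the initial region open, index 1 the close,
+ // index 2 the re-open that should list the newly flushed store file)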
+ secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));
+
+ // replay the close event as well
+ secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));
+
+ // no store files in the region
+ int expectedStoreFileCount = 0;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ long regionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ assertTrue(regionMemstoreSize == 0);
+
+ // now replay the region open event that should contain new file locations
+ LOG.info("Testing replaying region open event " + regionEvents.get(2));
+ secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
+
+ // assert that the flush files are picked up
+ expectedStoreFileCount++;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long newFlushableSize = store.getFlushableSize();
+ assertTrue(newFlushableSize == 0);
+
+ // assert that the region memstore is empty
+ long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ assertTrue(newRegionMemstoreSize == 0);
+
+ assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any
+
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from primary.");
+ verifyData(primaryRegion, 0, numRows, cq, families);
+ }
+
+ /**
+ * Tests the case where we replay a region open event after a flush start but before receiving
+ * the flush commit
+ */
+ @Test
+ public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
+ putDataWithFlushes(primaryRegion, 100, 100, 100);
+ int numRows = 200;
+
+ // close the region and open again. 
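+ // closing the primary flushes its remaining memstore data and appends region close
+ // and open event markers to its WAL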
+ primaryRegion.close();
+ primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+ List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
+
+ LOG.info("-- Replaying edits and region events in secondary");
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ RegionEventDescriptor regionEventDesc
+ = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+
+ if (flushDesc != null) {
+ // only replay flush start
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ secondaryRegion.replayWALFlushStartMarker(flushDesc);
+ }
+ } else if (regionEventDesc != null) {
+ regionEvents.add(regionEventDesc);
+ } else {
+ replayEdit(secondaryRegion, entry);
+ }
+ }
+
+ // at this point, there should be some data (rows 0-100) in the memstore snapshot
+ // and some more data in memstores (rows 100-200)
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ // we should have 1 open, 1 close and 1 open event
+ assertEquals(3, regionEvents.size());
+
+ // no store files in the region
+ int expectedStoreFileCount = 0;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+
+ // now replay the region open event that should contain new file locations
+ LOG.info("Testing replaying region open event " + regionEvents.get(2));
+ secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
+
+ // assert that the flush files are picked up
+ expectedStoreFileCount = 2; // two flushes happened
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long newSnapshotSize = store.getSnapshotSize();
+ assertTrue(newSnapshotSize == 0);
+
+ // assert that the region memstore is empty
+ long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ assertTrue(newRegionMemstoreSize == 0);
+
+ assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any
+
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from primary.");
+ verifyData(primaryRegion, 0, numRows, cq, families);
+ }
+
+ /**
+ * Tests that edits coming in for replay are skipped if they have a smaller seqId than the
+ * seqId of the last replayed region open event.
+ */
+ @Test
+ public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
+ putDataWithFlushes(primaryRegion, 100, 100, 0);
+ int numRows = 100;
+
+ // close the region and open again. 
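+ // the close and re-open append region event markers to the WAL; the re-open event
+ // carries a seqId larger than all of the earlier edits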
+ primaryRegion.close();
+ primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+ List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
+ List<WAL.Entry> edits = Lists.newArrayList();
+
+ LOG.info("-- Replaying edits and region events in secondary");
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ RegionEventDescriptor regionEventDesc
+ = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+
+ if (flushDesc != null) {
+ // don't replay flushes
+ } else if (regionEventDesc != null) {
+ regionEvents.add(regionEventDesc);
+ } else {
+ edits.add(entry);
+ }
+ }
+
+ // replay the region open event of the first open, but with the seqId of the second open
+ // this way none of the flush files will be picked up.
+ secondaryRegion.replayWALRegionEventMarker(
+ RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
+ regionEvents.get(2).getLogSequenceNumber()).build());
+
+ // replay edits from before the region close. If replay does not
+ // skip these, the following verification will NOT fail.
+ for (WAL.Entry entry: edits) {
+ replayEdit(secondaryRegion, entry);
+ }
+
+ boolean expectedFail = false;
+ try {
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+ } catch (AssertionError e) {
+ expectedFail = true; // expected
+ }
+ if (!expectedFail) {
+ fail("Should have failed this verification");
+ }
+ }
+
+ @Test
+ public void testReplayFlushSeqIds() throws IOException {
+ // load some data to primary and flush
+ int start = 0;
+ LOG.info("-- Writing some data to primary from " + start + " to " + (start+100));
+ putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
+ LOG.info("-- Flushing primary, creating 3 files for 3 stores");
+ primaryRegion.flushcache();
+
+ // now replay the flush marker
+ reader = createWALReaderForPrimary();
+
+ long flushSeqId = -1;
+ LOG.info("-- Replaying flush events in secondary");
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ if (flushDesc != null) {
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ LOG.info("-- Replaying flush start in secondary");
+ secondaryRegion.replayWALFlushStartMarker(flushDesc);
+ flushSeqId = flushDesc.getFlushSequenceNumber();
+ } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+ LOG.info("-- Replaying flush commit in secondary");
+ secondaryRegion.replayWALFlushCommitMarker(flushDesc);
+ assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
+ }
+ }
+ // else do not replay
+ }
+
+ // TODO: what to do with this? 
+ // assert that the newly picked up flush file is visible
+ long readPoint = secondaryRegion.getMVCC().memstoreReadPoint();
+ assertEquals(flushSeqId, readPoint);
+
+ // after replay verify that everything is still visible
+ verifyData(secondaryRegion, 0, 100, cq, families);
+ }
+
+ @Test
+ public void testSeqIdsFromReplay() throws IOException {
+ // test the case where seqIds coming from replayed WALEdits are persisted with their
+ // original values and made visible through the mvcc read point upon replay
+ String method = name.getMethodName();
+ byte[] tableName = Bytes.toBytes(method);
+ byte[] family = Bytes.toBytes("family");
+
+ HRegion region = initHRegion(tableName, method, family);
+ try {
+ // replay an entry that is bigger than current read point
+ long readPoint = region.getMVCC().memstoreReadPoint();
+ long origSeqId = readPoint + 100;
+
+ Put put = new Put(row).add(family, row, row);
+ put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
+ replay(region, put, origSeqId);
+
+ // read point should have advanced to this seqId
+ assertGet(region, family, row);
+
+ // region seqId should have advanced to this seqId
+ assertEquals(origSeqId, region.getSequenceId().get());
+
+ // replay an entry that is smaller than current read point
+ // caution: adding an entry below current read point might cause partial dirty reads. Normal
+ // replay does not allow reads while replay is going on.
+ put = new Put(row2).add(family, row2, row2);
+ put.setDurability(Durability.SKIP_WAL);
+ replay(region, put, origSeqId - 50);
+
+ assertGet(region, family, row2);
+ } finally {
+ region.close();
+ }
+ }
+
+ /**
+ * Tests that a region opened in secondary mode does not write region open / close
+ * events to its WAL.
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
+ secondaryRegion.close();
+ walSecondary = spy(walSecondary);
+
+ // test for region open and close
+ secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
+ verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List<Cell>) any());
+
+ // test for replay prepare flush
+ putDataByReplay(secondaryRegion, 0, 10, cq, families);
+ secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder().
+ setFlushSequenceNumber(10)
+ .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
+ .setAction(FlushAction.START_FLUSH)
+ .setEncodedRegionName(
+ ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
+ .setRegionName(ByteString.copyFrom(primaryRegion.getRegionName()))
+ .build());
+
+ verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List<Cell>) any());
+
+ secondaryRegion.close();
+ verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List<Cell>) any());
+ }
+
+ private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
+ put.setDurability(Durability.SKIP_WAL);
+ MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
+ region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
+ }
+
+ /** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. 
Does
+ * a flush every flushInterval number of records, then puts numRowsAfterFlush more rows
+ * without flushing them afterwards.
+ * @throws IOException */
+ private void putDataWithFlushes(HRegion region, int flushInterval,
+ int numRows, int numRowsAfterFlush) throws IOException {
+ int start = 0;
+ for (; start < numRows; start += flushInterval) {
+ LOG.info("-- Writing some data to primary from " + start + " to " + (start+flushInterval));
+ putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
+ LOG.info("-- Flushing primary, creating 3 files for 3 stores");
+ region.flushcache();
+ }
+ LOG.info("-- Writing some more data to primary, not flushing");
+ putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
+ }
+
+ private void putDataByReplay(HRegion region,
+ int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
+ for (int i = startRow; i < startRow + numRows; i++) {
+ Put put = new Put(Bytes.toBytes("" + i));
+ put.setDurability(Durability.SKIP_WAL);
+ for (byte[] family : families) {
+ put.add(family, qf, EnvironmentEdgeManager.currentTime(), null);
+ }
+ replay(region, put, i+1);
+ }
+ }
+
+ private static HRegion initHRegion(byte[] tableName,
+ String callingMethod, byte[]... families) throws IOException {
+ return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+ callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
+ }
+
+ private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
+ String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
+ WAL wal, byte[]... families) throws IOException {
+ return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
+ isReadOnly, durability, wal, families);
+ }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index e3f51ea..bda95db 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -327,8 +327,7 @@ public class TestPerColumnFamilyFlush {
return null;
}
- @Test (timeout=180000)
- public void testLogReplay() throws Exception {
+ public void doTestLogReplay() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
// Carefully chosen limits so that the memstore just flushes when we're done
@@ -418,7 +417,14 @@ public class TestPerColumnFamilyFlush {
@Test (timeout=180000)
public void testLogReplayWithDistributedReplay() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
- testLogReplay();
+ doTestLogReplay();
+ }
+
+ // Test log replay with distributed log splitting (distributed log replay off).
+ @Test (timeout=180000)
+ public void testLogReplayWithDistributedLogSplit() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+ doTestLogReplay();
+ }
 
 /**