diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 43e91d2..038b148 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2584,6 +2584,7 @@ public final class ProtobufUtil {
FlushDescriptor.Builder desc = FlushDescriptor.newBuilder()
.setAction(action)
.setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
+ .setRegionName(ByteStringer.wrap(hri.getRegionName()))
.setFlushSequenceNumber(flushSeqId)
.setTableName(ByteStringer.wrap(hri.getTable().getName()));
@@ -2609,6 +2610,7 @@ public final class ProtobufUtil {
.setEventType(eventType)
.setTableName(ByteStringer.wrap(hri.getTable().getName()))
.setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
+ .setRegionName(ByteStringer.wrap(hri.getRegionName()))
.setLogSequenceNumber(seqId)
.setServer(toServerName(server));
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 8566a88..d388267 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.io.RawComparator;
import com.google.common.annotations.VisibleForTesting;
/**
- * An HBase Key/Value. This is the fundamental HBase Type.
+ * An HBase Key/Value. This is the fundamental HBase Type.
*
* HBase applications and users should use the Cell interface and avoid directly using KeyValue
* and member functions not defined in Cell.
@@ -297,6 +297,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return seqId;
}
+ @Override
public void setSequenceId(long seqId) {
this.seqId = seqId;
}
@@ -577,7 +578,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
qlength, timestamp, type, value, voffset, vlength, null);
}
-
+
/**
* Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
* data buffer.
@@ -742,9 +743,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
public KeyValue(Cell c) {
this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
- c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
- c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
- c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
+ c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
+ c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
+ c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
this.seqId = c.getSequenceId();
}
@@ -955,7 +956,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
final int rlength, final byte [] family, final int foffset, int flength,
final byte [] qualifier, final int qoffset, int qlength,
final long timestamp, final Type type,
- final byte [] value, final int voffset,
+ final byte [] value, final int voffset,
int vlength, byte[] tags, int tagsOffset, int tagsLength) {
checkParameters(row, rlength, family, flength, qlength, vlength);
@@ -1115,6 +1116,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
//
//---------------------------------------------------------------------------
+ @Override
public String toString() {
if (this.bytes == null || this.bytes.length == 0) {
return "empty";
@@ -1125,10 +1127,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
/**
* @param k Key portion of a KeyValue.
- * @return Key as a String, empty string if k is null.
+ * @return Key as a String, empty string if k is null.
*/
public static String keyToString(final byte [] k) {
- if (k == null) {
+ if (k == null) {
return "";
}
return keyToString(k, 0, k.length);
@@ -1464,6 +1466,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* save on allocations.
* @return Value in a new byte array.
*/
+ @Override
@Deprecated // use CellUtil.getValueArray()
public byte [] getValue() {
return CellUtil.cloneValue(this);
@@ -1477,6 +1480,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* lengths instead.
* @return Row in a new byte array.
*/
+ @Override
@Deprecated // use CellUtil.getRowArray()
public byte [] getRow() {
return CellUtil.cloneRow(this);
@@ -1534,6 +1538,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* lengths instead.
* @return Returns family. Makes a copy.
*/
+ @Override
@Deprecated // use CellUtil.getFamilyArray
public byte [] getFamily() {
return CellUtil.cloneFamily(this);
@@ -1548,6 +1553,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* Use {@link #getBuffer()} with appropriate offsets and lengths instead.
* @return Returns qualifier. Makes a copy.
*/
+ @Override
@Deprecated // use CellUtil.getQualifierArray
public byte [] getQualifier() {
return CellUtil.cloneQualifier(this);
@@ -1846,7 +1852,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return compareFlatKey(l,loff,llen, r,roff,rlen);
}
-
+
/**
* Compares the only the user specified portion of a Key. This is overridden by MetaComparator.
* @param left
@@ -2214,7 +2220,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* @param leftKey
* @param rightKey
* @return 0 if equal, <0 if left smaller, >0 if right smaller
- * @deprecated Since 0.99.2; Use {@link CellComparator#getMidpoint(Cell, Cell)} instead.
+ * @deprecated Since 0.99.2; Use {@link CellComparator#getMidpoint(KVComparator, Cell, Cell)}
+ * instead.
*/
@Deprecated
public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
@@ -2354,7 +2361,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
in.readFully(bytes);
return new KeyValue(bytes, 0, length);
}
-
+
/**
* Create a new KeyValue by copying existing cell and adding new tags
* @param c
@@ -2370,9 +2377,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
existingTags = newTags;
}
return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
- c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
- c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
- c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
+ c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
+ c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
+ c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
c.getValueLength(), existingTags);
}
@@ -2477,6 +2484,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
this.comparator = c;
}
+ @Override
public int compare(KeyValue left, KeyValue right) {
return comparator.compareRows(left, right);
}
@@ -2485,7 +2493,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
/**
* Avoids redundant comparisons for better performance.
- *
+ *
* TODO get rid of this wart
*/
public interface SamePrefixComparator<T> {
@@ -2508,6 +2516,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* TODO: With V3 consider removing this.
* @return legacy class name for FileFileTrailer#comparatorClassName
*/
+ @Override
public String getLegacyKeyComparatorName() {
return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
}
@@ -2515,6 +2524,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
/**
* @deprecated Since 0.99.2.
*/
+ @Override
@Deprecated
public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
int roffset, int rlength) {
@@ -2526,6 +2536,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return compareOnlyKeyPortion(left, right);
}
+ @Override
@VisibleForTesting
public int compareOnlyKeyPortion(Cell left, Cell right) {
int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(),
@@ -2552,6 +2563,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte());
}
+ @Override
public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
return firstKeyInBlock;
}
diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index c9fa854..35192cc 100644
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -5522,6 +5522,24 @@ public final class WALProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptorOrBuilder getStoreFlushesOrBuilder(
int index);
+
+ // optional bytes region_name = 6;
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ boolean hasRegionName();
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ com.google.protobuf.ByteString getRegionName();
}
/**
* Protobuf type {@code FlushDescriptor}
@@ -5613,6 +5631,11 @@ public final class WALProtos {
storeFlushes_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor.PARSER, extensionRegistry));
break;
}
+ case 50: {
+ bitField0_ |= 0x00000010;
+ regionName_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -6772,12 +6795,37 @@ public final class WALProtos {
return storeFlushes_.get(index);
}
+ // optional bytes region_name = 6;
+ public static final int REGION_NAME_FIELD_NUMBER = 6;
+ private com.google.protobuf.ByteString regionName_;
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+
private void initFields() {
action_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction.START_FLUSH;
tableName_ = com.google.protobuf.ByteString.EMPTY;
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
flushSequenceNumber_ = 0L;
storeFlushes_ = java.util.Collections.emptyList();
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -6824,6 +6872,9 @@ public final class WALProtos {
for (int i = 0; i < storeFlushes_.size(); i++) {
output.writeMessage(5, storeFlushes_.get(i));
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBytes(6, regionName_);
+ }
getUnknownFields().writeTo(output);
}
@@ -6853,6 +6904,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, storeFlushes_.get(i));
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(6, regionName_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -6898,6 +6953,11 @@ public final class WALProtos {
}
result = result && getStoreFlushesList()
.equals(other.getStoreFlushesList());
+ result = result && (hasRegionName() == other.hasRegionName());
+ if (hasRegionName()) {
+ result = result && getRegionName()
+ .equals(other.getRegionName());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -6931,6 +6991,10 @@ public final class WALProtos {
hash = (37 * hash) + STORE_FLUSHES_FIELD_NUMBER;
hash = (53 * hash) + getStoreFlushesList().hashCode();
}
+ if (hasRegionName()) {
+ hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+ hash = (53 * hash) + getRegionName().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -7060,6 +7124,8 @@ public final class WALProtos {
} else {
storeFlushesBuilder_.clear();
}
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -7113,6 +7179,10 @@ public final class WALProtos {
} else {
result.storeFlushes_ = storeFlushesBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.regionName_ = regionName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -7167,6 +7237,9 @@ public final class WALProtos {
}
}
}
+ if (other.hasRegionName()) {
+ setRegionName(other.getRegionName());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -7593,6 +7666,58 @@ public final class WALProtos {
return storeFlushesBuilder_;
}
+ // optional bytes region_name = 6;
+ private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public Builder setRegionName(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000020;
+ regionName_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public Builder clearRegionName() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ regionName_ = getDefaultInstance().getRegionName();
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:FlushDescriptor)
}
@@ -9772,6 +9897,24 @@ public final class WALProtos {
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerOrBuilder();
+
+ // optional bytes region_name = 7;
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ boolean hasRegionName();
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ com.google.protobuf.ByteString getRegionName();
}
/**
* Protobuf type {@code RegionEventDescriptor}
@@ -9876,6 +10019,11 @@ public final class WALProtos {
bitField0_ |= 0x00000010;
break;
}
+ case 58: {
+ bitField0_ |= 0x00000020;
+ regionName_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -10135,6 +10283,30 @@ public final class WALProtos {
return server_;
}
+ // optional bytes region_name = 7;
+ public static final int REGION_NAME_FIELD_NUMBER = 7;
+ private com.google.protobuf.ByteString regionName_;
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+
private void initFields() {
eventType_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType.REGION_OPEN;
tableName_ = com.google.protobuf.ByteString.EMPTY;
@@ -10142,6 +10314,7 @@ public final class WALProtos {
logSequenceNumber_ = 0L;
stores_ = java.util.Collections.emptyList();
server_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -10197,6 +10370,9 @@ public final class WALProtos {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeMessage(6, server_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeBytes(7, regionName_);
+ }
getUnknownFields().writeTo(output);
}
@@ -10230,6 +10406,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(6, server_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(7, regionName_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -10280,6 +10460,11 @@ public final class WALProtos {
result = result && getServer()
.equals(other.getServer());
}
+ result = result && (hasRegionName() == other.hasRegionName());
+ if (hasRegionName()) {
+ result = result && getRegionName()
+ .equals(other.getRegionName());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -10317,6 +10502,10 @@ public final class WALProtos {
hash = (37 * hash) + SERVER_FIELD_NUMBER;
hash = (53 * hash) + getServer().hashCode();
}
+ if (hasRegionName()) {
+ hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+ hash = (53 * hash) + getRegionName().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -10453,6 +10642,8 @@ public final class WALProtos {
serverBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000020);
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -10514,6 +10705,10 @@ public final class WALProtos {
} else {
result.server_ = serverBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.regionName_ = regionName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -10571,6 +10766,9 @@ public final class WALProtos {
if (other.hasServer()) {
mergeServer(other.getServer());
}
+ if (other.hasRegionName()) {
+ setRegionName(other.getRegionName());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -11156,6 +11354,58 @@ public final class WALProtos {
return serverBuilder_;
}
+ // optional bytes region_name = 7;
+ private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public Builder setRegionName(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000040;
+ regionName_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public Builder clearRegionName() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ regionName_ = getDefaultInstance().getRegionName();
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:RegionEventDescriptor)
}
@@ -11598,32 +11848,33 @@ public final class WALProtos {
"n_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020com" +
"paction_input\030\004 \003(\t\022\031\n\021compaction_output" +
"\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region" +
- "_name\030\007 \001(\014\"\353\002\n\017FlushDescriptor\022,\n\006actio" +
+ "_name\030\007 \001(\014\"\200\003\n\017FlushDescriptor\022,\n\006actio" +
"n\030\001 \002(\0162\034.FlushDescriptor.FlushAction\022\022\n",
"\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_name" +
"\030\003 \002(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n" +
"\rstore_flushes\030\005 \003(\0132%.FlushDescriptor.S" +
- "toreFlushDescriptor\032Y\n\024StoreFlushDescrip" +
- "tor\022\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_d" +
- "ir\030\002 \002(\t\022\024\n\014flush_output\030\003 \003(\t\"A\n\013FlushA" +
- "ction\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FLUSH\020\001" +
- "\022\017\n\013ABORT_FLUSH\020\002\"R\n\017StoreDescriptor\022\023\n\013" +
- "family_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(" +
- "\t\022\022\n\nstore_file\030\003 \003(\t\"\215\001\n\022BulkLoadDescri",
- "ptor\022\036\n\ntable_name\030\001 \002(\0132\n.TableName\022\033\n\023" +
- "encoded_region_name\030\002 \002(\014\022 \n\006stores\030\003 \003(" +
- "\0132\020.StoreDescriptor\022\030\n\020bulkload_seq_num\030" +
- "\004 \002(\003\"\212\002\n\025RegionEventDescriptor\0224\n\nevent" +
- "_type\030\001 \002(\0162 .RegionEventDescriptor.Even" +
- "tType\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_reg" +
- "ion_name\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 " +
- "\001(\004\022 \n\006stores\030\005 \003(\0132\020.StoreDescriptor\022\033\n" +
- "\006server\030\006 \001(\0132\013.ServerName\".\n\tEventType\022" +
- "\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWA",
- "LTrailer*F\n\tScopeType\022\033\n\027REPLICATION_SCO" +
- "PE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001" +
- "B?\n*org.apache.hadoop.hbase.protobuf.gen" +
- "eratedB\tWALProtosH\001\210\001\000\240\001\001"
+ "toreFlushDescriptor\022\023\n\013region_name\030\006 \001(\014" +
+ "\032Y\n\024StoreFlushDescriptor\022\023\n\013family_name\030" +
+ "\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t\022\024\n\014flush_o" +
+ "utput\030\003 \003(\t\"A\n\013FlushAction\022\017\n\013START_FLUS" +
+ "H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"R" +
+ "\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026" +
+ "\n\016store_home_dir\030\002 \002(\t\022\022\n\nstore_file\030\003 \003",
+ "(\t\"\215\001\n\022BulkLoadDescriptor\022\036\n\ntable_name\030" +
+ "\001 \002(\0132\n.TableName\022\033\n\023encoded_region_name" +
+ "\030\002 \002(\014\022 \n\006stores\030\003 \003(\0132\020.StoreDescriptor" +
+ "\022\030\n\020bulkload_seq_num\030\004 \002(\003\"\237\002\n\025RegionEve" +
+ "ntDescriptor\0224\n\nevent_type\030\001 \002(\0162 .Regio" +
+ "nEventDescriptor.EventType\022\022\n\ntable_name" +
+ "\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022\033\n\023l" +
+ "og_sequence_number\030\004 \001(\004\022 \n\006stores\030\005 \003(\013" +
+ "2\020.StoreDescriptor\022\033\n\006server\030\006 \001(\0132\013.Ser" +
+ "verName\022\023\n\013region_name\030\007 \001(\014\".\n\tEventTyp",
+ "e\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\n" +
+ "WALTrailer*F\n\tScopeType\022\033\n\027REPLICATION_S" +
+ "COPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL" +
+ "\020\001B?\n*org.apache.hadoop.hbase.protobuf.g" +
+ "eneratedB\tWALProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -11659,7 +11910,7 @@ public final class WALProtos {
internal_static_FlushDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_FlushDescriptor_descriptor,
- new java.lang.String[] { "Action", "TableName", "EncodedRegionName", "FlushSequenceNumber", "StoreFlushes", });
+ new java.lang.String[] { "Action", "TableName", "EncodedRegionName", "FlushSequenceNumber", "StoreFlushes", "RegionName", });
internal_static_FlushDescriptor_StoreFlushDescriptor_descriptor =
internal_static_FlushDescriptor_descriptor.getNestedTypes().get(0);
internal_static_FlushDescriptor_StoreFlushDescriptor_fieldAccessorTable = new
@@ -11683,7 +11934,7 @@ public final class WALProtos {
internal_static_RegionEventDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RegionEventDescriptor_descriptor,
- new java.lang.String[] { "EventType", "TableName", "EncodedRegionName", "LogSequenceNumber", "Stores", "Server", });
+ new java.lang.String[] { "EventType", "TableName", "EncodedRegionName", "LogSequenceNumber", "Stores", "Server", "RegionName", });
internal_static_WALTrailer_descriptor =
getDescriptor().getMessageTypes().get(8);
internal_static_WALTrailer_fieldAccessorTable = new
diff --git hbase-protocol/src/main/protobuf/WAL.proto hbase-protocol/src/main/protobuf/WAL.proto
index 169a9b2..3fd6025 100644
--- hbase-protocol/src/main/protobuf/WAL.proto
+++ hbase-protocol/src/main/protobuf/WAL.proto
@@ -122,6 +122,7 @@ message FlushDescriptor {
required bytes encoded_region_name = 3;
optional uint64 flush_sequence_number = 4;
repeated StoreFlushDescriptor store_flushes = 5;
+ optional bytes region_name = 6; // full region name
}
message StoreDescriptor {
@@ -155,6 +156,7 @@ message RegionEventDescriptor {
optional uint64 log_sequence_number = 4;
repeated StoreDescriptor stores = 5;
optional ServerName server = 6; // Server who opened the region
+ optional bytes region_name = 7; // full region name
}
/**
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 48b78c2..081d7a5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -208,6 +208,11 @@ public class DefaultMemStore implements MemStore {
return this.snapshotSize > 0 ? this.snapshotSize : keySize();
}
+ @Override
+ public long getSnapshotSize() {
+ return this.snapshotSize;
+ }
+
/**
* Write an update
* @param cell
@@ -462,6 +467,7 @@ public class DefaultMemStore implements MemStore {
* @param now
* @return Timestamp
*/
+ @Override
public long updateColumnValue(byte[] row,
byte[] family,
byte[] qualifier,
@@ -524,7 +530,7 @@ public class DefaultMemStore implements MemStore {
* atomically. Scans will only see each KeyValue update as atomic.
*
* @param cells
- * @param readpoint readpoint below which we can safely remove duplicate KVs
+ * @param readpoint readpoint below which we can safely remove duplicate KVs
* @return change in memstore size
*/
@Override
@@ -1031,7 +1037,7 @@ public class DefaultMemStore implements MemStore {
public long size() {
return heapSize();
}
-
+
/**
* Code to help figure if our approximation of object heap sizes is close
* enough. See hbase-900. Fills memstores then waits so user can heap
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 53e732a..aa65ddd 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.RandomAccess;
@@ -61,7 +62,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -100,6 +99,7 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
@@ -133,12 +133,16 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -176,6 +180,7 @@ import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
+import com.google.protobuf.TextFormat;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@@ -255,6 +260,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
private final AtomicLong sequenceId = new AtomicLong(-1L);
/**
+ * The sequence id of the last replayed open region event from the primary region. This is used
+ * to skip entries with a smaller sequence id, since replayed edits may arrive out of order
+ * from replication.
+ */
+ protected volatile long lastReplayedOpenRegionSeqId = -1L;
+
+ /**
* Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
* startRegionOperation to possibly invoke different checks before any region operations. Not all
* operations have to be defined here. It's only needed when a special check is need in
@@ -262,7 +274,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
public enum Operation {
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
- REPLAY_BATCH_MUTATE, COMPACT_REGION
+ REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT
}
//////////////////////////////////////////////////////////////////////////////
@@ -367,6 +379,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// The following map is populated when opening the region
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ /** Saved state from replaying prepare flush cache */
+ private PrepareFlushResult prepareFlushResult = null;
+
/**
* Config setting for whether to allow writes when a region is in recovering or not.
*/
@@ -516,6 +531,54 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public boolean isCompactionNeeded() {
return result == Result.FLUSHED_COMPACTION_NEEDED;
}
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("flush result:").append(result).append(", ")
+ .append("failureReason:").append(failureReason).append(",")
+ .append("flush seq id").append(flushSequenceId).toString();
+ }
+ }
+
+ /** A result object from prepare flush cache stage */
+ @VisibleForTesting
+ static class PrepareFlushResult {
+ final FlushResult result; // indicating a failure result from prepare
+ final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
+ final TreeMap<byte[], List<Path>> committedFiles;
+ final long startTime;
+ final long flushOpSeqId;
+ final long flushedSeqId;
+ final long totalFlushableSize;
+
+ /** Constructs an early exit case */
+ PrepareFlushResult(FlushResult result, long flushSeqId) {
+ this(result, null, null, Math.max(0, flushSeqId), 0, 0, 0);
+ }
+
+ /** Constructs a successful prepare flush result */
+ PrepareFlushResult(
+ TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
+ TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
+ long flushedSeqId, long totalFlushableSize) {
+ this(null, storeFlushCtxs, committedFiles, startTime,
+ flushSeqId, flushedSeqId, totalFlushableSize);
+ }
+
+ private PrepareFlushResult(
+ FlushResult result,
+ TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
+ TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
+ long flushedSeqId, long totalFlushableSize) {
+ this.result = result;
+ this.storeFlushCtxs = storeFlushCtxs;
+ this.committedFiles = committedFiles;
+ this.startTime = startTime;
+ this.flushOpSeqId = flushSeqId;
+ this.flushedSeqId = flushedSeqId;
+ this.totalFlushableSize = totalFlushableSize;
+ }
}
final WriteState writestate = new WriteState();
@@ -771,6 +834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Initialize all the HStores
status.setStatus("Initializing all the Stores");
long maxSeqId = initializeRegionStores(reporter, status);
+ this.lastReplayedOpenRegionSeqId = maxSeqId;
this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
this.writestate.flushRequested = false;
@@ -1229,9 +1293,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
status.setStatus("Disabling compacts and flushes for region");
+ boolean canFlush = true;
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
// region.
+ canFlush = !writestate.readOnly;
writestate.writesEnabled = false;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
waitForFlushesAndCompactions();
@@ -1239,7 +1305,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// If we were not just flushing, is it worth doing a preflush...one
// that will clear out of the bulk of the memstore before we put up
// the close flag?
- if (!abort && worthPreFlushing()) {
+ if (!abort && worthPreFlushing() && canFlush) {
status.setStatus("Pre-flushing region before close");
LOG.info("Running close preflush of " + this.getRegionNameAsString());
try {
@@ -1262,7 +1328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
LOG.debug("Updates disabled for region " + this);
// Don't flush the cache if we are aborting
- if (!abort) {
+ if (!abort && canFlush) {
int flushCount = 0;
while (this.getMemstoreSize().get() > 0) {
try {
@@ -1300,7 +1366,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// close each store in parallel
for (final Store store : stores.values()) {
- assert abort || store.getFlushableSize() == 0;
+ assert abort || store.getFlushableSize() == 0 || writestate.readOnly;
completionService
.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
@Override
@@ -1336,7 +1402,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
this.closed.set(true);
- if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
+ if (!canFlush) {
+ addAndGetGlobalMemstoreSize(-memstoreSize.get());
+ } else if (memstoreSize.get() != 0) {
+ LOG.error("Memstore size is " + memstoreSize.get());
+ }
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-close hooks");
this.coprocessorHost.postClose(abort);
@@ -1362,6 +1432,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
public void waitForFlushesAndCompactions() {
synchronized (writestate) {
+ if (this.writestate.readOnly) {
+ // we should not wait for replayed flushes if we are read only (for example in case the
+ // region is a secondary replica).
+ return;
+ }
boolean interrupted = false;
try {
while (writestate.compacting > 0 || writestate.flushing) {
@@ -1592,6 +1667,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
+ * This is a helper function that compacts the given store.
+ * It is used by utilities and testing.
+ *
+ * @throws IOException e
+ */
+ @VisibleForTesting
+ void compactStore(byte[] family, CompactionThroughputController throughputController)
+ throws IOException {
+ Store s = getStore(family);
+ CompactionContext compaction = s.requestCompaction();
+ if (compaction != null) {
+ compact(compaction, s, throughputController);
+ }
+ }
+
+ /*
* Called by compaction thread and after region is opened to compact the
* HStores if necessary.
*
@@ -1738,6 +1829,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
status.setStatus("Running coprocessor pre-flush hooks");
coprocessorHost.preFlush();
}
+ // TODO: this should be managed within memstore with the snapshot, updated only after flush
+ // successful
if (numMutationsWithoutWAL.get() > 0) {
numMutationsWithoutWAL.set(0);
dataInMemoryWithoutWAL.set(0);
@@ -1903,6 +1996,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
+ PrepareFlushResult result
+ = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false);
+ if (result.result == null) {
+ return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
+ } else {
+ return result.result; // early exit due to failure from prepare stage
+ }
+ }
+
+ protected PrepareFlushResult internalPrepareFlushCache(
+ final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
+ MonitoredTask status, boolean isReplay)
+ throws IOException {
+
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@@ -1930,10 +2037,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
w.setWriteNumber(flushSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
w = null;
- return flushResult;
+ return new PrepareFlushResult(flushResult, myseqid);
} else {
- return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
- "Nothing to flush");
+ return new PrepareFlushResult(
+ new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"),
+ myseqid);
}
}
} finally {
@@ -1977,7 +2085,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
flushedFamilyNames.add(store.getFamily().getName());
}
- List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
+ TreeMap<byte[], StoreFlushContext> storeFlushCtxs
+ = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
Bytes.BYTES_COMPARATOR);
// The sequence id of this flush operation which is used to log FlushMarker and pass to
@@ -1998,7 +2107,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
String msg = "Flush will not be started for ["
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
status.setStatus(msg);
- return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
+ return new PrepareFlushResult(new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg),
+ myseqid);
}
flushOpSeqId = getNextSequenceId(wal);
long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
@@ -2013,12 +2123,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
for (Store s : storesToFlush) {
totalFlushableSizeOfFlushableStores += s.getFlushableSize();
- storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
+ storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
}
// write the snapshot start to WAL
- if (wal != null) {
+ if (wal != null && !writestate.readOnly) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
// no sync. Sync is below where we do not hold the updates lock
@@ -2027,7 +2137,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
// Prepare flush (take a snapshot)
- for (StoreFlushContext flush : storeFlushCtxs) {
+ for (StoreFlushContext flush : storeFlushCtxs.values()) {
flush.prepare();
}
} catch (IOException ex) {
@@ -2075,15 +2185,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
mvcc.waitForPreviousTransactionsComplete(w);
// set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
w = null;
- s = "Flushing stores of " + this;
- status.setStatus(s);
- if (LOG.isTraceEnabled()) LOG.trace(s);
} finally {
if (w != null) {
// in case of failure just mark current w as complete
mvcc.advanceMemstore(w);
}
}
+ return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId,
+ flushedSeqId, totalFlushableSizeOfFlushableStores);
+ }
+
+ protected FlushResult internalFlushCacheAndCommit(
+ final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
+ final Collection<Store> storesToFlush)
+ throws IOException {
+
+ // prepare flush context is carried via PrepareFlushResult
+ TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
+ TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
+ long startTime = prepareResult.startTime;
+ long flushOpSeqId = prepareResult.flushOpSeqId;
+ long flushedSeqId = prepareResult.flushedSeqId;
+ long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
+
+ String s = "Flushing stores of " + this;
+ status.setStatus(s);
+ if (LOG.isTraceEnabled()) LOG.trace(s);
// Any failure from here on out will be catastrophic requiring server
// restart so wal content can be replayed and put back into the memstore.
@@ -2096,7 +2223,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// just-made new flush store file. The new flushed file is still in the
// tmp directory.
- for (StoreFlushContext flush : storeFlushCtxs) {
+ for (StoreFlushContext flush : storeFlushCtxs.values()) {
flush.flushCache(status);
}
@@ -2104,7 +2231,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// all the store scanners to reset/reseek).
Iterator<Store> it = storesToFlush.iterator();
// stores.values() and storeFlushCtxs have same order
- for (StoreFlushContext flush : storeFlushCtxs) {
+ for (StoreFlushContext flush : storeFlushCtxs.values()) {
boolean needsCompaction = flush.commit(status);
if (needsCompaction) {
compactionRequested = true;
@@ -2593,6 +2720,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
throws IOException {
+ if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
+ && replaySeqId < lastReplayedOpenRegionSeqId) {
+ // if it is a secondary replica we should ignore these entries silently
+ // since they are coming out of order
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(getRegionInfo().getEncodedName() + " : "
+ + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId
+ + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);
+ for (MutationReplay mut : mutations) {
+ LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);
+ }
+ }
+
+ OperationStatus[] statuses = new OperationStatus[mutations.length];
+ for (int i = 0; i < statuses.length; i++) {
+ statuses[i] = OperationStatus.SUCCESS;
+ }
+ return statuses;
+ }
return batchMutate(new ReplayBatch(mutations, replaySeqId));
}
@@ -2897,7 +3043,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
// txid should always increase, so having the one from the last call is ok.
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(),
currentNonceGroup, currentNonce);
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
@@ -2923,14 +3069,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// STEP 5. Append the final edit to WAL. Do not sync wal.
// -------------------------
Mutation mutation = batchOp.getMutation(firstIndex);
+ if (isInReplay) {
+ // use wal key from the original
+ walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+ mutation.getClusterIds(), currentNonceGroup, currentNonce);
+ long replaySeqId = batchOp.getReplaySequenceId();
+ walKey.setOrigLogSeqNum(replaySeqId);
+
+ // ensure that the sequence id of the region is at least as big as orig log seq id
+ while (true) {
+ long seqId = getSequenceId().get();
+ if (seqId >= replaySeqId) break;
+ if (getSequenceId().compareAndSet(seqId, replaySeqId)) break;
+ }
+ }
if (walEdit.size() > 0) {
+ if (!isInReplay) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce);
- if(isInReplay) {
- walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
}
+
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells);
}
@@ -3803,7 +3964,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
CompactionDescriptor compaction = WALEdit.getCompaction(cell);
if (compaction != null) {
//replay the compaction
- completeCompactionMarker(compaction);
+ replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
}
skippedEdits++;
continue;
@@ -3886,15 +4047,506 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* that was not finished. We could find one recovering a WAL after a regionserver crash.
* See HBASE-2331.
*/
- void completeCompactionMarker(CompactionDescriptor compaction)
+ void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
+ boolean removeFiles, long replaySeqId)
+ throws IOException {
+ checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
+ "Compaction marker from WAL ", compaction);
+
+ if (replaySeqId < lastReplayedOpenRegionSeqId) {
+ LOG.warn("Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+ + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ + " of " + lastReplayedOpenRegionSeqId);
+ return;
+ }
+
+ startRegionOperation(Operation.REPLAY_EVENT);
+ try {
+ Store store = this.getStore(compaction.getFamilyName().toByteArray());
+ if (store == null) {
+ LOG.warn("Found Compaction WAL edit for deleted family:" +
+ Bytes.toString(compaction.getFamilyName().toByteArray()));
+ return;
+ }
+ store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
+ } finally {
+ closeRegionOperation(Operation.REPLAY_EVENT);
+ }
+ }
+
+ void replayWALFlushMarker(FlushDescriptor flush) throws IOException {
+ checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
+ "Flush marker from WAL ", flush);
+
+ if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+ return; // if primary nothing to do
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Replaying flush marker " + TextFormat.shortDebugString(flush));
+ }
+
+ startRegionOperation(Operation.REPLAY_EVENT); // use region close lock to guard against close
+ try {
+ FlushAction action = flush.getAction();
+ switch (action) {
+ case START_FLUSH:
+ replayWALFlushStartMarker(flush);
+ break;
+ case COMMIT_FLUSH:
+ replayWALFlushCommitMarker(flush);
+ break;
+ case ABORT_FLUSH:
+ replayWALFlushAbortMarker(flush);
+ break;
+ default:
+ LOG.warn("Received a flush event with unknown action, ignoring. "
+ + TextFormat.shortDebugString(flush));
+ break;
+ }
+ } finally {
+ closeRegionOperation(Operation.REPLAY_EVENT);
+ }
+ }
+
+ /** Replay the flush marker from primary region by creating a corresponding snapshot of
+ * the store memstores, only if the memstores do not have a higher seqId from an earlier wal
+ * edit (because the events may be coming out of order).
+ */
+ @VisibleForTesting
+ PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
+ long flushSeqId = flush.getFlushSequenceNumber();
+
+ HashSet<Store> storesToFlush = new HashSet<Store>();
+ for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
+ byte[] family = storeFlush.getFamilyName().toByteArray();
+ Store store = getStore(family);
+ if (store == null) {
+ LOG.info("Received a flush start marker from primary, but the family is not found. Ignoring"
+ + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
+ continue;
+ }
+ storesToFlush.add(store);
+ }
+
+ MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);
+
+ // we will use writestate as a coarse-grain lock for all the replay events
+ // (flush, compaction, region open etc)
+ synchronized (writestate) {
+ try {
+ if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
+ LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+ + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ + " of " + lastReplayedOpenRegionSeqId);
+ return null;
+ }
+ if (numMutationsWithoutWAL.get() > 0) {
+ numMutationsWithoutWAL.set(0);
+ dataInMemoryWithoutWAL.set(0);
+ }
+
+ if (!writestate.flushing) {
+ // we do not have an active snapshot and a corresponding this.prepareFlushResult. This means
+ // we can just snapshot our memstores and continue as normal.
+
+ // invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal
+ PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
+ flushSeqId, storesToFlush, status, true);
+ if (prepareResult.result == null) {
+ // save the PrepareFlushResult so that we can use it later from commit flush
+ this.writestate.flushing = true;
+ this.prepareFlushResult = prepareResult;
+ status.markComplete("Flush prepare successful");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getRegionInfo().getEncodedName() + " : "
+ + " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
+ }
+ } else {
+ status.abort("Flush prepare failed with " + prepareResult.result);
+ // nothing much to do. prepare flush failed because of some reason.
+ }
+ return prepareResult;
+ } else {
+ // we already have an active snapshot.
+ if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
+ // They define the same flush. Log and continue.
+ LOG.warn("Received a flush prepare marker with the same seqId: " +
+ + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ + prepareFlushResult.flushOpSeqId + ". Ignoring");
+ // ignore
+ } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
+ // We received a flush with a smaller seqNum than what we have prepared. We can only
+ // ignore this prepare flush request.
+ LOG.warn("Received a flush prepare marker with a smaller seqId: " +
+ + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ + prepareFlushResult.flushOpSeqId + ". Ignoring");
+ // ignore
+ } else {
+ // We received a flush with a larger seqNum than what we have prepared
+ LOG.warn("Received a flush prepare marker with a larger seqId: " +
+ + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ + prepareFlushResult.flushOpSeqId + ". Ignoring");
+ // We do not have multiple active snapshots in the memstore or a way to merge current
+ // memstore snapshot with the contents and resnapshot for now. We cannot take
+ // another snapshot and drop the previous one because that will cause temporary
+ // data loss in the secondary. So we ignore this for now, deferring the resolution
+ // to happen when we see the corresponding flush commit marker. If we have a memstore
+ // snapshot with x, and later received another prepare snapshot with y (where x < y),
+ // when we see flush commit for y, we will drop snapshot for x, and can also drop all
+ // the memstore edits if everything in memstore is < y. This is the usual case for
+ // RS crash + recovery where we might see consecutive prepare flush wal markers.
+ // Otherwise, this will cause more memory to be used in secondary replica until a
+ // further prepare + commit flush is seen and replayed.
+ }
+ }
+ } finally {
+ status.cleanup();
+ writestate.notifyAll();
+ }
+ }
+ return null;
+ }
+
+ @VisibleForTesting
+ void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
+ MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);
+
+ // check whether we have the memstore snapshot with the corresponding seqId. Replay to
+ // secondary region replicas is in order, except for when the region moves or when the
+ // region server crashes. In those cases, we may receive replay requests out of order from
+ // the original seqIds.
+ synchronized (writestate) {
+ try {
+ if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
+ LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+ + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ + " of " + lastReplayedOpenRegionSeqId);
+ return;
+ }
+
+ if (writestate.flushing) {
+ PrepareFlushResult prepareFlushResult = this.prepareFlushResult;
+ if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getRegionInfo().getEncodedName() + " : "
+ + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
+ + " and a previous prepared snapshot was found");
+ }
+ // This is the regular case where we received commit flush after prepare flush
+ // corresponding to the same seqId.
+ replayFlushInStores(flush, prepareFlushResult, true);
+
+ // Set down the memstore size by amount of flush.
+ this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
+
+ this.prepareFlushResult = null;
+ writestate.flushing = false;
+ } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
+ // This should not happen normally. However, let's be safe and guard against these cases
+ // we received a flush commit with a smaller seqId than what we have prepared
+ // we will pick the flush file up from this commit (if we have not seen it), but we
+ // will not drop the memstore
+ LOG.warn("Received a flush commit marker with smaller seqId: "
+ + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
+ + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
+ +" prepared memstore snapshot");
+ replayFlushInStores(flush, prepareFlushResult, false);
+
+ // snapshot is not dropped, so memstore sizes should not be decremented
+ // we still have the prepared snapshot, flushing should still be true
+ } else {
+ // This should not happen normally. However, let's be safe and guard against these cases
+ // we received a flush commit with a larger seqId than what we have prepared
+ // we will pick the flush file for this. We will also obtain the updates lock and
+ // look for contents of the memstore to see whether we have edits after this seqId.
+ // If not, we will drop all the memstore edits and the snapshot as well.
+ LOG.warn("Received a flush commit marker with larger seqId: "
+ + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
+ prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
+ +" memstore snapshot");
+
+ replayFlushInStores(flush, prepareFlushResult, true);
+
+ // Set down the memstore size by amount of flush.
+ this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
+
+ // Inspect the memstore contents to see whether the memstore contains only edits
+ // with seqId smaller than the flush seqId. If so, we can discard those edits.
+ dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
+
+ this.prepareFlushResult = null;
+ writestate.flushing = false;
+ }
+ } else {
+ LOG.warn(getRegionInfo().getEncodedName() + " : "
+ + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
+ + ", but no previous prepared snapshot was found");
+ // There is no corresponding prepare snapshot from before.
+ // We will pick up the new flushed file
+ replayFlushInStores(flush, null, false);
+
+ // Inspect the memstore contents to see whether the memstore contains only edits
+ // with seqId smaller than the flush seqId. If so, we can discard those edits.
+ dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
+ }
+
+ status.markComplete("Flush commit successful");
+
+ // Update the last flushed sequence id for region.
+ this.maxFlushedSeqId = flush.getFlushSequenceNumber();
+
+ // advance the mvcc read point so that the new flushed file is visible.
+ // there may be some in-flight transactions, but they won't be made visible since they are
+ // either greater than flush seq number or they were already dropped via flush.
+ // TODO: If we are using FlushAllStoresPolicy, then this can make edits visible from other
+ // stores while they are still in flight because the flush commit marker will not contain
+ // flushes from ALL stores.
+ getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());
+
+ // C. Finally notify anyone waiting on memstore to clear:
+ // e.g. checkResources().
+ synchronized (this) {
+ notifyAll(); // FindBugs NN_NAKED_NOTIFY
+ }
+ } finally {
+ status.cleanup();
+ writestate.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Replays the given flush descriptor by opening the flush files in stores and dropping the
+ * memstore snapshots if requested.
+ * @param flush the flush descriptor received from the primary
+ * @param prepareFlushResult the result of the earlier prepare flush, or null if there was none
+ * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
+ * @throws IOException
+ */
+ private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
+ boolean dropMemstoreSnapshot)
throws IOException {
- Store store = this.getStore(compaction.getFamilyName().toByteArray());
- if (store == null) {
- LOG.warn("Found Compaction WAL edit for deleted family:" +
- Bytes.toString(compaction.getFamilyName().toByteArray()));
+ for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
+ byte[] family = storeFlush.getFamilyName().toByteArray();
+ Store store = getStore(family);
+ if (store == null) {
+ LOG.warn("Received a flush commit marker from primary, but the family is not found. " +
+ "Ignoring StoreFlushDescriptor:" + storeFlush);
+ continue;
+ }
+ List<String> flushFiles = storeFlush.getFlushOutputList();
+ StoreFlushContext ctx = null;
+ long startTime = EnvironmentEdgeManager.currentTime();
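+ // Reuse the store's flush context (and start time) from the earlier prepare marker when
+ // one exists; otherwise create a fresh context for this flush's seqId.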
+ if (prepareFlushResult == null) {
+ ctx = store.createFlushContext(flush.getFlushSequenceNumber());
+ } else {
+ ctx = prepareFlushResult.storeFlushCtxs.get(family);
+ startTime = prepareFlushResult.startTime;
+ }
+
+ if (ctx == null) {
+ LOG.warn("Unexpected: flush commit marker received from store "
+ + Bytes.toString(family) + " but no associated flush context. Ignoring");
+ continue;
+ }
+ ctx.replayFlush(flushFiles, dropMemstoreSnapshot); // replay the flush
+
+ // Record latest flush time
+ this.lastStoreFlushTimeMap.put(store, startTime);
+ }
+ }
+
+ /**
+ * Drops the memstore contents after replaying a flush descriptor or region open event,
+ * if the memstore edits all have seqNums smaller than the given seq id
+ * @param seqId the sequence id up to which memstore contents can be dropped
+ * @param store the store whose memstore should be dropped, or null for all stores
+ * @throws IOException
+ */
+ private void dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
+ this.updatesLock.writeLock().lock();
+ try {
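+ // wait for in-flight write transactions to finish so that nothing is still being applied
+ // to the memstores while we decide whether their contents can be dropped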
+ mvcc.waitForPreviousTransactionsComplete();
+ long currentSeqId = getSequenceId().get();
+ if (seqId >= currentSeqId) {
+ // then we can drop the memstore contents since everything is below this seqId
+ LOG.info("Dropping memstore contents as well since replayed flush seqId: "
+ + seqId + " is greater than current seqId:" + currentSeqId);
+
+ // Prepare flush (take a snapshot) and then abort (drop the snapshot)
+ if (store == null) {
+ for (Store s : stores.values()) {
+ dropStoreMemstoreContentsForSeqId(s, currentSeqId);
+ }
+ } else {
+ dropStoreMemstoreContentsForSeqId(store, currentSeqId);
+ }
+ } else {
+ LOG.info("Not dropping memstore contents since replayed flush seqId: "
+ + seqId + " is smaller than current seqId:" + currentSeqId);
+ }
+ } finally {
+ this.updatesLock.writeLock().unlock();
+ }
+ }
+
+ private void dropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
+ this.addAndGetGlobalMemstoreSize(-s.getFlushableSize());
+ StoreFlushContext ctx = s.createFlushContext(currentSeqId);
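+ // prepare() moves the active memstore into a snapshot and abort() then discards that
+ // snapshot, dropping the edits without writing any files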
+ ctx.prepare();
+ ctx.abort();
+ }
+
+ private void replayWALFlushAbortMarker(FlushDescriptor flush) {
+ // nothing to do for now. A flush abort will cause a RS abort which means that the region
+ // will be opened somewhere else later. We will see the region open event soon, and replaying
+ // that will drop the snapshot
+ }
+
+ @VisibleForTesting
+ PrepareFlushResult getPrepareFlushResult() {
+ return prepareFlushResult;
+ }
+
+ void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
+ checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
+ "RegionEvent marker from WAL ", regionEvent);
+
+ startRegionOperation(Operation.REPLAY_EVENT);
+ try {
+ if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+ return; // if primary nothing to do
+ }
+
+ if (regionEvent.getEventType() == EventType.REGION_CLOSE) {
+ // nothing to do on REGION_CLOSE for now.
+ return;
+ }
+ if (regionEvent.getEventType() != EventType.REGION_OPEN) {
+ LOG.warn("Unknown region event received, ignoring :"
+ + TextFormat.shortDebugString(regionEvent));
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
+ }
+
+ // we will use writestate as a coarse-grain lock for all the replay events
+ synchronized (writestate) {
+ // Replication can deliver events out of order when primary region moves or the region
+ // server crashes, since there is no coordination between replication of different wal files
+ // belonging to different region servers. We have to safeguard against this case by using
+ // region open event's seqid. Since this is the first event that the region puts (after
+ // possibly flushing recovered.edits), after seeing this event, we can ignore every edit
+ // smaller than this seqId
+ if (this.lastReplayedOpenRegionSeqId < regionEvent.getLogSequenceNumber()) {
+ this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
+ } else {
+ LOG.warn("Skipping replaying region event: " + TextFormat.shortDebugString(regionEvent)
+ + " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId"
+ + " of " + lastReplayedOpenRegionSeqId);
+ return;
+ }
+
+ // The region open event lists all the files that the region has at the time of the opening.
+ // Just pick up all the files, drop any prepared flushes and empty the memstores.
+ for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
+ // stores of primary may be different now
+ byte[] family = storeDescriptor.getFamilyName().toByteArray();
+ Store store = getStore(family);
+ if (store == null) {
+ LOG.warn("Received a region open marker from primary, but the family is not found. "
+ + "Ignoring. StoreDescriptor:" + storeDescriptor);
+ continue;
+ }
+
+ long storeSeqId = store.getMaxSequenceId();
+ List<String> storeFiles = storeDescriptor.getStoreFileList();
+ store.refreshStoreFiles(storeFiles); // replace the files with the new ones
+ if (store.getMaxSequenceId() != storeSeqId) {
+ // Record latest flush time if we picked up new files
+ lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
+ }
+
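+ // if a flush was prepared earlier and its seqId is covered by this region open event,
+ // the store's prepared snapshot is now obsolete and can be aborted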
+ if (writestate.flushing) {
+ // only drop memstore snapshots if they are smaller than last flush for the store
+ if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
+ StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs.get(family);
+ if (ctx != null) {
+ long snapshotSize = store.getFlushableSize();
+ ctx.abort();
+ this.addAndGetGlobalMemstoreSize(-snapshotSize);
+ this.prepareFlushResult.storeFlushCtxs.remove(family);
+ }
+ }
+ }
+
+ // Drop the memstore contents if they are now smaller than the latest seen flushed file
+ dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store);
+ if (storeSeqId > this.maxFlushedSeqId) {
+ this.maxFlushedSeqId = storeSeqId;
+ }
+ }
+
+ // if all stores ended up dropping their snapshots, we can safely drop the
+ // prepareFlushResult
+ if (writestate.flushing) {
+ boolean canDrop = true;
+ for (Entry<byte[], StoreFlushContext> entry
+ : prepareFlushResult.storeFlushCtxs.entrySet()) {
+ Store store = getStore(entry.getKey());
+ if (store == null) {
+ continue;
+ }
+ if (store.getSnapshotSize() > 0) {
+ canDrop = false;
+ }
+ }
+
+ // this means that all the stores in the region have finished flushing, but the WAL marker
+ // may not have been written or we did not receive it yet.
+ if (canDrop) {
+ writestate.flushing = false;
+ this.prepareFlushResult = null;
+ }
+ }
+
+
+ // advance the mvcc read point so that the new flushed file is visible.
+ // there may be some in-flight transactions, but they won't be made visible since they are
+ // either greater than flush seq number or they were already dropped via flush.
+ getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);
+
+ // C. Finally notify anyone waiting on memstore to clear:
+ // e.g. checkResources().
+ synchronized (this) {
+ notifyAll(); // FindBugs NN_NAKED_NOTIFY
+ }
+ }
+ } finally {
+ closeRegionOperation(Operation.REPLAY_EVENT);
+ }
+ }
+
+ /** Checks whether the given regionName is either equal to our region's name, or is
+ * the name of the primary region for the range served by this secondary replica.
+ */
+ private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
+ throws WrongRegionException {
+ if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
return;
}
- store.completeCompactionMarker(compaction);
+
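+ // WAL markers written by the primary carry the primary's encoded region name, which is
+ // what getRegionInfoForFS() resolves to for a secondary replica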
+ if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
+ Bytes.equals(encodedRegionName,
+ this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
+ return;
+ }
+
+ throw new WrongRegionException(exceptionMsg + payload
+ + " targeted for region " + Bytes.toStringBinary(encodedRegionName)
+ + " does not match this region: " + this.getRegionInfo());
}
/**
@@ -4127,8 +4779,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param familyPaths List of Pair<byte[] column family, String hfilePath>
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
* file about to be bulk loaded
- * @param assignSeqId Force a flush, get it's sequenceId to preserve the guarantee that
- * all the edits lower than the highest sequential ID from all the
+ * @param assignSeqId Force a flush, get its sequenceId to preserve the guarantee that
+ * all the edits lower than the highest sequential ID from all the
* HFiles are flushed on disk.
* @return true if successful, false if failed recoverably
* @throws IOException if failed unrecoverably.
@@ -4217,7 +4869,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
}
store.bulkLoadHFile(finalPath, seqId);
-
+
if(storeFiles.containsKey(familyName)) {
storeFiles.get(familyName).add(new Path(finalPath));
} else {
@@ -4265,7 +4917,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
}
-
+
closeBulkRegionOperation();
}
}
@@ -4989,7 +5641,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
checkClassLoading();
this.openSeqNum = initialize(reporter);
this.setSequenceId(openSeqNum);
- if (wal != null && getRegionServerServices() != null) {
+ if (wal != null && getRegionServerServices() != null && !writestate.readOnly) {
writeRegionOpenMarker(wal, openSeqNum);
}
return this;
@@ -5660,7 +6312,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
if (cell.getTagsLength() > 0) {
- Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+ Iterator i = CellUtil.tagsIterator(cell.getTagsArray(),
cell.getTagsOffset(), cell.getTagsLength());
while (i.hasNext()) {
newTags.add(i.next());
@@ -6080,8 +6732,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
- (11 * Bytes.SIZEOF_LONG) +
+ 45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ (12 * Bytes.SIZEOF_LONG) +
4 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing:
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 0751634..014ec2c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -119,6 +119,10 @@ public class HRegionFileSystem {
return this.regionInfo;
}
+ public HRegionInfo getRegionInfoForFS() {
+ return this.regionInfoForFs;
+ }
+
/** @return {@link Path} to the region's root directory. */
public Path getTableDir() {
return this.tableDir;
@@ -205,7 +209,7 @@ public class HRegionFileSystem {
continue;
}
StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
- regionInfoForFs, familyName, status);
+ regionInfoForFs, familyName, status.getPath());
storeFiles.add(info);
}
@@ -234,8 +238,8 @@ public class HRegionFileSystem {
StoreFileInfo getStoreFileInfo(final String familyName, final String fileName)
throws IOException {
Path familyDir = getStoreDir(familyName);
- FileStatus status = fs.getFileStatus(new Path(familyDir, fileName));
- return new StoreFileInfo(this.conf, this.fs, status);
+ return ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
+ regionInfoForFs, familyName, new Path(familyDir, fileName));
}
/**
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 252e5e1..df9e482 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -399,6 +399,11 @@ public class HStore implements Store {
}
@Override
+ public long getSnapshotSize() {
+ return this.memstore.getSnapshotSize();
+ }
+
+ @Override
public long getCompactionCheckMultiplier() {
return this.compactionCheckMultiplier;
}
@@ -448,7 +453,8 @@ public class HStore implements Store {
/**
* @return The maximum sequence id in all store files. Used for log replay.
*/
- long getMaxSequenceId() {
+ @Override
+ public long getMaxSequenceId() {
return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
}
@@ -576,11 +582,31 @@ public class HStore implements Store {
*/
@Override
public void refreshStoreFiles() throws IOException {
+ Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
+ refreshStoreFilesInternal(newFiles);
+ }
+
+ @Override
+ public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
+ List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
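+ // the primary ships bare file names; resolve each into a StoreFileInfo (an HFileLink for
+ // secondary replicas) before swapping in the new store file set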
+ for (String file : newFiles) {
+ storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
+ }
+ refreshStoreFilesInternal(storeFiles);
+ }
+
+ /**
+ * Checks the underlying store files, and opens the files that have not
+ * been opened, and removes the store file readers for store files no longer
+ * available. Mainly used by secondary region replicas to keep up to date with
+ * the primary region files.
+ * @throws IOException
+ */
+ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
StoreFileManager sfm = storeEngine.getStoreFileManager();
Collection<StoreFile> currentFiles = sfm.getStorefiles();
if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);
- Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
HashMap<StoreFileInfo, StoreFile> currentFilesSet = new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
@@ -1011,7 +1037,9 @@ public class HStore implements Store {
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
- this.memstore.clearSnapshot(snapshotId);
+ if (snapshotId > 0) {
+ this.memstore.clearSnapshot(snapshotId);
+ }
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
@@ -1311,10 +1339,12 @@ public class HStore implements Store {
* @param compaction
*/
@Override
- public void completeCompactionMarker(CompactionDescriptor compaction)
+ public void replayCompactionMarker(CompactionDescriptor compaction,
+ boolean pickCompactionFiles, boolean removeFiles)
throws IOException {
LOG.debug("Completing compaction from the WAL marker");
List<String> compactionInputs = compaction.getCompactionInputList();
+ List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
// The Compaction Marker is written after the compaction is completed,
// and the files moved into the region/family folder.
@@ -1331,22 +1361,40 @@ public class HStore implements Store {
// being in the store's folder) or they may be missing due to a compaction.
String familyName = this.getColumnFamilyName();
- List<Path> inputPaths = new ArrayList<Path>(compactionInputs.size());
+ List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
for (String compactionInput : compactionInputs) {
Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
- inputPaths.add(inputPath);
+ inputFiles.add(inputPath.getName());
}
//some of the input files might already be deleted
List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
for (StoreFile sf : this.getStorefiles()) {
- if (inputPaths.contains(sf.getQualifiedPath())) {
+ if (inputFiles.contains(sf.getPath().getName())) {
inputStoreFiles.add(sf);
}
}
- this.replaceStoreFiles(inputStoreFiles, Collections.emptyList());
- this.completeCompaction(inputStoreFiles);
+ // check whether we need to pick up the new files
+ List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
+
+ if (pickCompactionFiles) {
+ for (StoreFile sf : this.getStorefiles()) {
+ compactionOutputs.remove(sf.getPath().getName());
+ }
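+ // whatever is left in compactionOutputs are output files this store has not opened yet;
+ // open them and add them as new store files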
+ for (String compactionOutput : compactionOutputs) {
+ StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
+ StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+ outputStoreFiles.add(storeFile);
+ }
+ }
+
+ if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
+ LOG.info("Replaying compaction marker, replacing input files: " +
+ inputStoreFiles + " with output files : " + outputStoreFiles);
+ this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
+ this.completeCompaction(inputStoreFiles, removeFiles);
+ }
}
/**
@@ -2175,6 +2223,47 @@ public class HStore implements Store {
public List<Path> getCommittedFiles() {
return committedFiles;
}
+
+ /**
+ * Similar to commit, but called in secondary region replicas for replaying the
+ * flush cache from primary region. Adds the new files to the store, and drops the
+ * snapshot depending on dropMemstoreSnapshot argument.
+ * @param fileNames names of the flushed files
+ * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
+ * @throws IOException
+ */
+ @Override
+ public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
+ throws IOException {
+ List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
+ for (String file : fileNames) {
+ // open the file as a store file (hfile link, etc)
+ StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
+ StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+ storeFiles.add(storeFile);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
+ " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
+ ", sequenceid=" + storeFile.getReader().getSequenceID() +
+ ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
+ }
+ }
+
+ long snapshotId = dropMemstoreSnapshot ? snapshot.getId() : -1; // -1 means do not drop
+ HStore.this.updateStorefiles(storeFiles, snapshotId);
+ }
+
+ /**
+ * Abort the snapshot preparation. Drops the snapshot if any.
+ * @throws IOException
+ */
+ @Override
+ public void abort() throws IOException {
+ if (snapshot == null) {
+ return;
+ }
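+ // clearing the snapshot while adding no new files simply discards the snapshotted edits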
+ HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
+ }
}
@Override
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 7fa81fc..364b9c9 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -60,6 +60,12 @@ public interface MemStore extends HeapSize {
long getFlushableSize();
/**
+ * Return the size of the snapshot(s) if any
+ * @return size of the memstore snapshot
+ */
+ long getSnapshotSize();
+
+ /**
* Write an update
* @param cell
* @return approximate size of the passed KV and the newly added KV which maybe different from the
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 3653cfb..3944ae8 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -144,6 +144,8 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
@@ -712,8 +714,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (metaCells != null && !metaCells.isEmpty()) {
for (Cell metaCell : metaCells) {
CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
+ boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
if (compactionDesc != null) {
- region.completeCompactionMarker(compactionDesc);
+ // replay the compaction. Remove the files from stores only if we are the primary
+ // region replica (thus own the files)
+ region.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
+ replaySeqId);
+ continue;
+ }
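+ // Flush and region open markers are only replayed on secondary replicas; the primary
+ // region generates these events itself and must not replay them.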
+ FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
+ if (flushDesc != null && !isDefaultReplica) {
+ region.replayWALFlushMarker(flushDesc);
+ continue;
+ }
+ RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
+ if (regionEvent != null && !isDefaultReplica) {
+ region.replayWALRegionEventMarker(regionEvent);
+ continue;
}
}
it.remove();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 0c420b5..6a422a9 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -213,9 +213,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* Call to complete a compaction. Its for the case where we find in the WAL a compaction
* that was not finished. We could find one recovering a WAL after a regionserver crash.
* See HBASE-2331.
- * @param compaction
+ * @param compaction the descriptor for compaction
+ * @param pickCompactionFiles whether or not to pick up the new compaction output files and
+ * add them to the store
+ * @param removeFiles whether to remove/archive files from filesystem
*/
- void completeCompactionMarker(CompactionDescriptor compaction)
+ void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
+ boolean removeFiles)
throws IOException;
// Split oriented methods
@@ -265,9 +269,20 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/
long getFlushableSize();
+ /**
+ * Returns the memstore snapshot size
+ * @return size of the memstore snapshot
+ */
+ long getSnapshotSize();
+
HColumnDescriptor getFamily();
/**
+ * @return The maximum sequence id in all store files.
+ */
+ long getMaxSequenceId();
+
+ /**
* @return The maximum memstoreTS in all store files.
*/
long getMaxMemstoreTS();
@@ -416,4 +431,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* linear formula.
*/
double getCompactionPressure();
+
+ /**
+ * Replaces the store files that the store has with the given files. Mainly used by
+ * secondary region replicas to keep up to date with
+ * the primary region files.
+ * @throws IOException
+ */
+ void refreshStoreFiles(Collection<String> newFiles) throws IOException;
+
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
index 0c2fe6f..34ba1fa 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
@@ -65,6 +65,22 @@ interface StoreFlushContext {
boolean commit(MonitoredTask status) throws IOException;
/**
+ * Similar to commit, but called in secondary region replicas for replaying the
+ * flush cache from primary region. Adds the new files to the store, and drops the
+ * snapshot depending on dropMemstoreSnapshot argument.
+ * @param fileNames names of the flushed files
+ * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
+ * @throws IOException
+ */
+ void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) throws IOException;
+
+ /**
+ * Abort the snapshot preparation. Drops the snapshot if any.
+ * @throws IOException
+ */
+ void abort() throws IOException;
+
+ /**
* Returns the newly committed files from the flush. Called only if commit returns true
* @return a list of Paths for new files
*/
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
new file mode 100644
index 0000000..4506b19
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * An HLogKey specific to WalEdits coming from replay.
+ */
+@InterfaceAudience.Private
+public class ReplayHLogKey extends HLogKey {
+
+ public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
+ final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+ super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+ }
+
+ public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
+ long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+ super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ }
+
+ /**
+ * Returns the original sequence id rather than the newly assigned one
+ * @return the original sequence id
+ * @throws IOException
+ */
+ @Override
+ public long getSequenceId() throws IOException {
+ return this.getOrigLogSeqNum();
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index c3d4e5a..fc19603 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -287,7 +287,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
@Override
public List<Path> finishWritingAndClose() throws IOException {
- finishWriting();
+ finishWriting(true);
return null;
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index cf87219..7bcee0b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
@@ -96,23 +96,24 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
* @throws IOException
*/
public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs,
- HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, FileStatus status)
+ HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, Path path)
throws IOException {
// if this is a primary region, just return the StoreFileInfo constructed from path
if (regionInfo.equals(regionInfoForFs)) {
- return new StoreFileInfo(conf, fs, status);
- }
-
- if (StoreFileInfo.isReference(status.getPath())) {
- Reference reference = Reference.read(fs, status.getPath());
- return new StoreFileInfo(conf, fs, status, reference);
+ return new StoreFileInfo(conf, fs, path);
}
// else create a store file link. The link file does not exists on filesystem though.
HFileLink link = HFileLink.build(conf, regionInfoForFs.getTable(),
- regionInfoForFs.getEncodedName(), familyName, status.getPath().getName());
- return new StoreFileInfo(conf, fs, status, link);
+ regionInfoForFs.getEncodedName(), familyName, path.getName());
+
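+ // even reference files (from splits) are read through the link's FileStatus, since the
+ // underlying file lives under the primary region's directory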
+ if (StoreFileInfo.isReference(path)) {
+ Reference reference = Reference.read(fs, path);
+ return new StoreFileInfo(conf, fs, link.getFileStatus(fs), reference);
+ }
+
+ return new StoreFileInfo(conf, fs, link.getFileStatus(fs), link);
}
/**
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 4d8cc2d..53f46b4 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -1186,12 +1186,18 @@ public class WALSplitter {
* @return true when there is no error
* @throws IOException
*/
- protected boolean finishWriting() throws IOException {
+ protected boolean finishWriting(boolean interrupt) throws IOException {
LOG.debug("Waiting for split writer threads to finish");
boolean progress_failed = false;
for (WriterThread t : writerThreads) {
t.finish();
}
+ if (interrupt) {
+ for (WriterThread t : writerThreads) {
+ t.interrupt(); // interrupt the writer threads. We are stopping now.
+ }
+ }
+
for (WriterThread t : writerThreads) {
if (!progress_failed && reporter != null && !reporter.progress()) {
progress_failed = true;
@@ -1260,7 +1266,7 @@ public class WALSplitter {
boolean isSuccessful = false;
List<Path> result = null;
try {
- isSuccessful = finishWriting();
+ isSuccessful = finishWriting(false);
} finally {
result = close();
List<IOException> thrown = closeLogWriters(null);
@@ -1960,7 +1966,7 @@ public class WALSplitter {
@Override
public List<Path> finishWritingAndClose() throws IOException {
try {
- if (!finishWriting()) {
+ if (!finishWriting(false)) {
return null;
}
if (hasEditsInDisablingOrDisabledTables) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index ea06346..2930f72 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -61,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -4655,7 +4656,7 @@ public class TestHRegion {
// create a primary region, load some data and flush
// create a secondary region, and do a get against that
Path rootDir = new Path(dir + "testRegionReplicaSecondary");
- FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
+ FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
byte[][] families = new byte[][] {
Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -4755,6 +4756,14 @@ public class TestHRegion {
}
}
+ static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
+ Configuration confForWAL = new Configuration(conf);
+ confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
+ return new WALFactory(confForWAL,
+ Collections.<WALActionsListener>singletonList(new MetricsWAL()),
+ "hregion-" + RandomStringUtils.randomNumeric(8));
+ }
+
@Test
public void testCompactionFromPrimary() throws IOException {
Path rootDir = new Path(dir + "testRegionReplicaSecondary");
@@ -4815,9 +4824,14 @@ public class TestHRegion {
private void putData(HRegion region,
int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
+ putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
+ }
+
+ static void putData(HRegion region, Durability durability,
+ int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
for (int i = startRow; i < startRow + numRows; i++) {
Put put = new Put(Bytes.toBytes("" + i));
- put.setDurability(Durability.SKIP_WAL);
+ put.setDurability(durability);
for (byte[] family : families) {
put.add(family, qf, null);
}
@@ -4825,7 +4839,7 @@ public class TestHRegion {
}
}
- private void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
+ static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
throws IOException {
for (int i = startRow; i < startRow + numRows; i++) {
byte[] row = Bytes.toBytes("" + i);
@@ -4844,7 +4858,7 @@ public class TestHRegion {
}
}
- private void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
+ static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
// Now I have k, get values out and assert they are as expected.
Get get = new Get(k).addFamily(family).setMaxVersions();
Cell[] results = r.get(get).rawCells();
@@ -4991,7 +5005,7 @@ public class TestHRegion {
return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
}
- private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
+ public static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
throws IOException {
Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
@@ -5013,7 +5027,7 @@ public class TestHRegion {
* @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
*/
- private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
+ public static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
WAL wal, byte[]... families) throws IOException {
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
@@ -6028,7 +6042,7 @@ public class TestHRegion {
}
}
- private static HRegion initHRegion(byte[] tableName, String callingMethod,
+ static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
families);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
new file mode 100644
index 0000000..09e9d5e
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -0,0 +1,1162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
+import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+/**
+ * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
+ * region replicas
+ */
+@Category(MediumTests.class)
+public class TestHRegionReplayEvents {
+
+ static final Log LOG = LogFactory.getLog(TestHRegion.class);
+ @Rule public TestName name = new TestName();
+
+ private static HBaseTestingUtility TEST_UTIL;
+
+ public static Configuration CONF ;
+ private String dir;
+ private static FileSystem FILESYSTEM;
+
+ private byte[][] families = new byte[][] {
+ Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};
+
+ // Test names
+ protected byte[] tableName;
+ protected String method;
+ protected final byte[] row = Bytes.toBytes("rowA");
+ protected final byte[] row2 = Bytes.toBytes("rowB");
+ protected byte[] cq = Bytes.toBytes("cq");
+
+ // per test fields
+ private Path rootDir;
+ private HTableDescriptor htd;
+ private long time;
+ private RegionServerServices rss;
+ private HRegionInfo primaryHri, secondaryHri;
+ private HRegion primaryRegion, secondaryRegion;
+ private WALFactory wals;
+ private WAL walPrimary, walSecondary;
+ private WAL.Reader reader;
+
+ @Before
+ public void setup() throws IOException {
+ TEST_UTIL = HBaseTestingUtility.createLocalHTU();
+ FILESYSTEM = TEST_UTIL.getTestFileSystem();
+ CONF = TEST_UTIL.getConfiguration();
+ dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
+ method = name.getMethodName();
+ tableName = Bytes.toBytes(name.getMethodName());
+ rootDir = new Path(dir + method);
+ TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
+ method = name.getMethodName();
+
+ htd = new HTableDescriptor(TableName.valueOf(method));
+ for (byte[] family : families) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+
+ time = System.currentTimeMillis();
+
+ primaryHri = new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+ false, time, 0);
+ secondaryHri = new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+ false, time, 1);
+
+ wals = TestHRegion.createWALFactory(CONF, rootDir);
+ walPrimary = wals.getWAL(primaryHri.getEncodedNameAsBytes());
+ walSecondary = wals.getWAL(secondaryHri.getEncodedNameAsBytes());
+
+ rss = mock(RegionServerServices.class);
+ when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
+ when(rss.getConfiguration()).thenReturn(CONF);
+ when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
+
+ primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
+ primaryRegion.close();
+
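+ // open the primary with its WAL; the secondary replica is opened without a WAL since the
+ // test replays edits and events from the primary's WAL manually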
+ primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
+ secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
+
+ reader = null;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (reader != null) {
+ reader.close();
+ }
+
+ if (primaryRegion != null) {
+ HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
+ }
+ if (secondaryRegion != null) {
+ HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
+ }
+
+ EnvironmentEdgeManagerTestHelper.reset();
+ LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
+ TEST_UTIL.cleanupTestDir();
+ }
+
+ String getName() {
+ return name.getMethodName();
+ }
+
+ // Some of the test cases are as follows:
+ // 1. replay flush start marker again
+ // 2. replay flush with smaller seqId than what is there in memstore snapshot
+ // 3. replay flush with larger seqId than what is there in memstore snapshot
+ // 4. replay flush commit without flush prepare. non droppable memstore
+ // 5. replay flush commit without flush prepare. droppable memstore
+ // 6. replay open region event
+ // 7. replay open region event after flush start
+ // 8. replay flush from an earlier seqId (test ignoring seqIds)
+ // 9. start flush does not prevent region from closing.
+
+ @Test
+ public void testRegionReplicaSecondaryCannotFlush() throws IOException {
+ // load some data and flush ensure that the secondary replica will not execute the flush
+
+ // load some data to secondary by replaying
+ putDataByReplay(secondaryRegion, 0, 1000, cq, families);
+
+ verifyData(secondaryRegion, 0, 1000, cq, families);
+
+ // flush region
+ FlushResult flush = secondaryRegion.flushcache();
+ assertEquals(flush.result, FlushResult.Result.CANNOT_FLUSH);
+
+ verifyData(secondaryRegion, 0, 1000, cq, families);
+
+ // close the region, and inspect that it has not flushed
+ Map<byte[], List<StoreFile>> files = secondaryRegion.close(false);
+ // assert that there are no files (since the secondary cannot flush)
+ for (List<StoreFile> f : files.values()) {
+ assertTrue(f.isEmpty());
+ }
+ }
+
+ /**
+ * Tests a case where we replay only a flush start marker, then the region is closed. This region
+ * should not block indefinitely
+ */
+ @Test (timeout = 60000)
+ public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
+ // load some data to primary and flush
+ int start = 0;
+ LOG.info("-- Writing some data to primary from " + start + " to " + (start+100));
+ putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
+ LOG.info("-- Flushing primary, creating 3 files for 3 stores");
+ primaryRegion.flushcache();
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+
+ LOG.info("-- Replaying edits and flush events in secondary");
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ if (flushDesc != null) {
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ LOG.info("-- Replaying flush start in secondary");
+ PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
+ } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+ LOG.info("-- NOT Replaying flush commit in secondary");
+ }
+ } else {
+ replayEdit(secondaryRegion, entry);
+ }
+ }
+
+ assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreSize() > 0);
+ // now close the region; this should not hang because of the un-committed flush
+ secondaryRegion.close();
+
+ // verify that the memstore size is back to what it was
+ assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreSize());
+ }
+
+ static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
+ if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
+ return 0; // handled elsewhere
+ }
+ Put put = new Put(entry.getEdit().getCells().get(0).getRow());
+ for (Cell cell : entry.getEdit().getCells()) put.add(cell);
+ put.setDurability(Durability.SKIP_WAL);
+ MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
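+ // replay via batchReplay with the seqId recorded in the WAL entry so the secondary
+ // applies the edit under the primary's sequence number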
+ region.batchReplay(new MutationReplay[] {mutation},
+ entry.getKey().getLogSeqNum());
+ return Integer.parseInt(Bytes.toString(put.getRow()));
+ }
+
+ WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
+ return wals.createReader(TEST_UTIL.getTestFileSystem(),
+ DefaultWALProvider.getCurrentFileName(walPrimary),
+ TEST_UTIL.getConfiguration());
+ }
+
+ @Test
+ public void testReplayFlushesAndCompactions() throws IOException {
+ // initiate a secondary region with some data.
+
+ // load some data to primary and flush. 3 flushes and some more unflushed data
+ putDataWithFlushes(primaryRegion, 100, 300, 100);
+
+ // compaction from primary
+ LOG.info("-- Compacting primary, only 1 store");
+ primaryRegion.compactStore(Bytes.toBytes("cf1"),
+ NoLimitCompactionThroughputController.INSTANCE);
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+
+ LOG.info("-- Replaying edits and flush events in secondary");
+ int lastReplayed = 0;
+ int expectedStoreFileCount = 0;
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ CompactionDescriptor compactionDesc
+ = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
+ if (flushDesc != null) {
+ // first verify that everything is replayed and visible before flush event replay
+ verifyData(secondaryRegion, 0, lastReplayed, cq, families);
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long storeMemstoreSize = store.getMemStoreSize();
+ long regionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ long storeFlushableSize = store.getFlushableSize();
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ LOG.info("-- Replaying flush start in secondary");
+ PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
+ assertNull(result.result);
+ assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());
+
+ // assert that the store memstore is smaller now
+ long newStoreMemstoreSize = store.getMemStoreSize();
+ LOG.info("Memstore size reduced by:"
+ + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
+ assertTrue(storeMemstoreSize > newStoreMemstoreSize);
+
+ } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+ LOG.info("-- Replaying flush commit in secondary");
+ secondaryRegion.replayWALFlushCommitMarker(flushDesc);
+
+ // assert that the flush files are picked
+ expectedStoreFileCount++;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ long newFlushableSize = store.getFlushableSize();
+ assertTrue(storeFlushableSize > newFlushableSize);
+
+ // assert that the region memstore is smaller now
+ long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ assertTrue(regionMemstoreSize > newRegionMemstoreSize);
+ }
+ // after replay verify that everything is still visible
+ verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
+ } else if (compactionDesc != null) {
+ secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);
+
+ // assert that the compaction is applied
+ for (Store store : secondaryRegion.getStores().values()) {
+ if (store.getColumnFamilyName().equals("cf1")) {
+ assertEquals(1, store.getStorefilesCount());
+ } else {
+ assertEquals(expectedStoreFileCount, store.getStorefilesCount());
+ }
+ }
+ } else {
+ lastReplayed = replayEdit(secondaryRegion, entry);
+ }
+ }
+
+ assertEquals(400-1, lastReplayed);
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, 400, cq, families);
+
+ LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
+ verifyData(primaryRegion, 0, lastReplayed, cq, families);
+ for (Store store : primaryRegion.getStores().values()) {
+ if (store.getColumnFamilyName().equals("cf1")) {
+ assertEquals(1, store.getStorefilesCount());
+ } else {
+ assertEquals(expectedStoreFileCount, store.getStorefilesCount());
+ }
+ }
+ }
+
+ /**
+ * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
+ * equal to, greater or less than the previous flush start marker.
+ */
+ @Test
+ public void testReplayFlushStartMarkers() throws IOException {
+ // load some data to primary and flush. 1 flush and some more unflushed data
+ putDataWithFlushes(primaryRegion, 100, 100, 100);
+ int numRows = 200;
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+
+ LOG.info("-- Replaying edits and flush events in secondary");
+
+ FlushDescriptor startFlushDesc = null;
+
+ int lastReplayed = 0;
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ if (flushDesc != null) {
+ // first verify that everything is replayed and visible before flush event replay
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long storeMemstoreSize = store.getMemStoreSize();
+ long regionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ long storeFlushableSize = store.getFlushableSize();
+
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ startFlushDesc = flushDesc;
+ LOG.info("-- Replaying flush start in secondary");
+ PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
+ assertNull(result.result);
+ assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
+ assertTrue(regionMemstoreSize > 0);
+ assertTrue(storeFlushableSize > 0);
+
+ // assert that the store memstore is smaller now
+ long newStoreMemstoreSize = store.getMemStoreSize();
+ LOG.info("Memstore size reduced by:"
+ + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
+ assertTrue(storeMemstoreSize > newStoreMemstoreSize);
+ verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
+
+ }
+ // after replay verify that everything is still visible
+ verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
+ } else {
+ lastReplayed = replayEdit(secondaryRegion, entry);
+ }
+ }
+
+ // at this point, there should be some data (rows 0-100) in memstore snapshot
+ // and some more data in memstores (rows 100-200)
+
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ // Test case 1: replay the same flush start marker again
+ LOG.info("-- Replaying same flush start in secondary again");
+ PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
+ assertNull(result); // this should return null. Ignoring the flush start marker
+ // assert that we still have prepared flush with the previous setup.
+ assertNotNull(secondaryRegion.getPrepareFlushResult());
+ assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
+ startFlushDesc.getFlushSequenceNumber());
+ assertTrue(secondaryRegion.getMemstoreSize().get() > 0); // memstore is not empty
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ // Test case 2: replay a flush start marker with a smaller seqId
+ FlushDescriptor startFlushDescSmallerSeqId
+ = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
+ LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
+ result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
+ assertNull(result); // this should return null. Ignoring the flush start marker
+ // assert that we still have prepared flush with the previous setup.
+ assertNotNull(secondaryRegion.getPrepareFlushResult());
+ assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
+ startFlushDesc.getFlushSequenceNumber());
+ assertTrue(secondaryRegion.getMemstoreSize().get() > 0); // memstore is not empty
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ // Test case 3: replay a flush start marker with a larger seqId
+ FlushDescriptor startFlushDescLargerSeqId
+ = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
+ LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
+ result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
+ assertNull(result); // this should return null. Ignoring the flush start marker
+ // assert that we still have prepared flush with the previous setup.
+ assertNotNull(secondaryRegion.getPrepareFlushResult());
+ assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
+ startFlushDesc.getFlushSequenceNumber());
+ assertTrue(secondaryRegion.getMemstoreSize().get() > 0); // memstore is not empty
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from primary.");
+ verifyData(primaryRegion, 0, numRows, cq, families);
+ }
+
+ /**
+ * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
+ * less than the previous flush start marker.
+ */
+ @Test
+ public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
+ // load some data to primary and flush. 2 flushes and some more unflushed data
+ putDataWithFlushes(primaryRegion, 100, 200, 100);
+ int numRows = 300;
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+
+ LOG.info("-- Replaying edits and flush events in secondary");
+ FlushDescriptor startFlushDesc = null;
+ FlushDescriptor commitFlushDesc = null;
+
+ int lastReplayed = 0;
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ if (flushDesc != null) {
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ // don't replay the first flush start marker, hold on to it, replay the second one
+ if (startFlushDesc == null) {
+ startFlushDesc = flushDesc;
+ } else {
+ LOG.info("-- Replaying flush start in secondary");
+ startFlushDesc = flushDesc;
+ PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
+ assertNull(result.result);
+ }
+ } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+ // do not replay any flush commit yet
+ if (commitFlushDesc == null) {
+ commitFlushDesc = flushDesc; // hold on to the first flush commit marker
+ }
+ }
+ // after replay verify that everything is still visible
+ verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
+ } else {
+ lastReplayed = replayEdit(secondaryRegion, entry);
+ }
+ }
+
+ // at this point, there should be some data (rows 0-200) in memstore snapshot
+ // and some more data in memstores (rows 200-300)
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ // no store files in the region
+ int expectedStoreFileCount = 0;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ long regionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+
+ // Test case 1: replay a flush commit marker smaller than the one we have prepared
+ LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
+ + startFlushDesc);
+ assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());
+
+ LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
+ secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
+
+ // assert that the flush files are picked
+ expectedStoreFileCount++;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long newFlushableSize = store.getFlushableSize();
+ assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
+
+ // assert that the region memstore is same as before
+ long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ assertEquals(regionMemstoreSize, newRegionMemstoreSize);
+
+ assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped
+
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from primary.");
+ verifyData(primaryRegion, 0, numRows, cq, families);
+ }
+
+ /**
+ * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
+ * larger than the previous flush start marker.
+ */
+ @Test
+ public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
+ // load some data to primary and flush. 1 flush and some more unflushed data
+ putDataWithFlushes(primaryRegion, 100, 100, 100);
+ int numRows = 200;
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+
+ LOG.info("-- Replaying edits and flush events in secondary");
+ FlushDescriptor startFlushDesc = null;
+ FlushDescriptor commitFlushDesc = null;
+
+ int lastReplayed = 0;
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ if (flushDesc != null) {
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ if (startFlushDesc == null) {
+ LOG.info("-- Replaying flush start in secondary");
+ startFlushDesc = flushDesc;
+ PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
+ assertNull(result.result);
+ }
+ } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+ // do not replay any flush commit yet
+ // hold on to the flush commit marker but simulate a larger
+ // flush commit seqId
+ commitFlushDesc =
+ FlushDescriptor.newBuilder(flushDesc)
+ .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
+ .build();
+ }
+ // after replay verify that everything is still visible
+ verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
+ } else {
+ lastReplayed = replayEdit(secondaryRegion, entry);
+ }
+ }
+
+ // at this point, there should be some data (rows 0-100) in memstore snapshot
+ // and some more data in memstores (rows 100-200)
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ // no store files in the region
+ int expectedStoreFileCount = 0;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ long regionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+
+ // Test case 1: replay a flush commit marker larger than the one we have prepared
+ LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
+ + startFlushDesc);
+ assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());
+
+ LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
+ secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
+
+ // assert that the flush files are picked
+ expectedStoreFileCount++;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long newFlushableSize = store.getFlushableSize();
+ assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
+
+ // assert that the region memstore is smaller than before, but not empty
+ long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ assertTrue(newRegionMemstoreSize > 0);
+ assertTrue(regionMemstoreSize > newRegionMemstoreSize);
+
+ assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped
+
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from primary.");
+ verifyData(primaryRegion, 0, numRows, cq, families);
+ }
+
+ /**
+ * Tests the case where we receive a flush commit before receiving any flush prepare markers.
+ * The memstore edits should be dropped after the flush commit replay since they should already
+ * be in the flushed files.
+ */
+ @Test
+ public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
+ throws IOException {
+ testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
+ }
+
+ /**
+ * Tests the case where we receive a flush commit before receiving any flush prepare markers.
+ * The memstore edits should not be dropped after the flush commit replay since not every edit
+ * will be in the flushed files (based on seqId).
+ */
+ @Test
+ public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
+ throws IOException {
+ testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
+ }
+
+ /**
+ * Tests the case where we receive a flush commit before receiving any flush prepare markers
+ */
+ public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
+ throws IOException {
+ // load some data to primary and flush. 1 flush and, depending on whether the memstore is
+ // droppable, some more unflushed data written after the flush
+ putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
+ int numRows = droppableMemstore ? 100 : 200;
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+
+ LOG.info("-- Replaying edits and flush events in secondary");
+ FlushDescriptor commitFlushDesc = null;
+
+ int lastReplayed = 0;
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ if (flushDesc != null) {
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ // do not replay flush start marker
+ } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+ commitFlushDesc = flushDesc; // hold on to the flush commit marker
+ }
+ // after replay verify that everything is still visible
+ verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
+ } else {
+ lastReplayed = replayEdit(secondaryRegion, entry);
+ }
+ }
+
+ // at this point, there should be some data (rows 0-100) in the memstore without a snapshot,
+ // plus some more data (rows 100-200) in the non-droppable case
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ // no store files in the region
+ int expectedStoreFileCount = 0;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ long regionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+
+ // Test case 1: replay a flush commit marker without start flush marker
+ assertNull(secondaryRegion.getPrepareFlushResult());
+ assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);
+
+ // ensure all files are visible in secondary
+ for (Store store : secondaryRegion.getStores().values()) {
+ assertTrue(store.getMaxSequenceId() <= secondaryRegion.getSequenceId().get());
+ }
+
+ LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
+ secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
+
+ // assert that the flush files are picked
+ expectedStoreFileCount++;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long newFlushableSize = store.getFlushableSize();
+ if (droppableMemstore) {
+ assertTrue(newFlushableSize == 0); // assert that the memstore is dropped
+ } else {
+ assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
+ }
+
+ // assert that the region memstore is empty if droppable, otherwise the same as before
+ long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ if (droppableMemstore) {
+ assertTrue(0 == newRegionMemstoreSize);
+ } else {
+ assertTrue(regionMemstoreSize == newRegionMemstoreSize);
+ }
+
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from primary.");
+ verifyData(primaryRegion, 0, numRows, cq, families);
+ }
+
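+ /**
+ * Returns a copy of the given flush descriptor with its flush sequence number replaced by the
+ * supplied value.
+ */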
+ private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
+ return FlushDescriptor.newBuilder(flush)
+ .setFlushSequenceNumber(flushSeqId)
+ .build();
+ }
+
+ /**
+ * Tests replaying region open markers from the primary region. Checks whether the store files
+ * are picked up.
+ */
+ @Test
+ public void testReplayRegionOpenEvent() throws IOException {
+ putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
+ int numRows = 100;
+
+ // close the region and open again.
+ primaryRegion.close();
+ primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+ List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
+
+ LOG.info("-- Replaying edits and region events in secondary");
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ RegionEventDescriptor regionEventDesc
+ = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+
+ if (flushDesc != null) {
+ // don't replay flush events
+ } else if (regionEventDesc != null) {
+ regionEvents.add(regionEventDesc);
+ } else {
+ // don't replay edits
+ }
+ }
+
+ // we should have 1 open, 1 close and 1 open event
+ assertEquals(3, regionEvents.size());
+
+ // replay the first region open event.
+ secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));
+
+ // replay the close event as well
+ secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));
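+ // replaying these earlier events should not add any store files to the secondary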
+
+ // no store files in the region
+ int expectedStoreFileCount = 0;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ long regionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ assertTrue(regionMemstoreSize == 0);
+
+ // now replay the region open event that should contain new file locations
+ LOG.info("Testing replaying region open event " + regionEvents.get(2));
+ secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
+
+ // assert that the flush files are picked
+ expectedStoreFileCount++;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long newFlushableSize = store.getFlushableSize();
+ assertTrue(newFlushableSize == 0);
+
+ // assert that the region memstore is empty
+ long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ assertTrue(newRegionMemstoreSize == 0);
+
+ assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any
+
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from primary.");
+ verifyData(primaryRegion, 0, numRows, cq, families);
+ }
+
+ /**
+ * Tests the case where we replay a region open event after a flush start but before receiving
+ * flush commit
+ */
+ @Test
+ public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
+ putDataWithFlushes(primaryRegion, 100, 100, 100);
+ int numRows = 200;
+
+ // close the region and open again.
+ primaryRegion.close();
+ primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+ List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
+
+ LOG.info("-- Replaying edits and region events in secondary");
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ RegionEventDescriptor regionEventDesc
+ = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+
+ if (flushDesc != null) {
+ // only replay flush start
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ secondaryRegion.replayWALFlushStartMarker(flushDesc);
+ }
+ } else if (regionEventDesc != null) {
+ regionEvents.add(regionEventDesc);
+ } else {
+ replayEdit(secondaryRegion, entry);
+ }
+ }
+
+ // at this point, there should be some data (rows 0-100) in the memstore snapshot
+ // and some more data in memstores (rows 100-200)
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ // we should have 1 open, 1 close and 1 open event
+ assertEquals(3, regionEvents.size());
+
+ // no store files in the region
+ int expectedStoreFileCount = 0;
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+
+ // now replay the region open event that should contain new file locations
+ LOG.info("Testing replaying region open event " + regionEvents.get(2));
+ secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
+
+ // assert that the flush files are picked
+ expectedStoreFileCount = 2; // two flushes happened
+ for (Store s : secondaryRegion.getStores().values()) {
+ assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+ }
+ Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+ long newSnapshotSize = store.getSnapshotSize();
+ assertTrue(newSnapshotSize == 0);
+
+ // assert that the region memstore is empty
+ long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
+ assertTrue(newRegionMemstoreSize == 0);
+
+ assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any
+
+ LOG.info("-- Verifying edits from secondary");
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+
+ LOG.info("-- Verifying edits from primary.");
+ verifyData(primaryRegion, 0, numRows, cq, families);
+ }
+
+ /**
+ * Tests that edits coming in for replay which have a smaller seqId than the seqId of the last
+ * replayed region open event are skipped.
+ */
+ @Test
+ public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
+ putDataWithFlushes(primaryRegion, 100, 100, 0);
+ int numRows = 100;
+
+ // close the region and open again.
+ primaryRegion.close();
+ primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
+
+ // now replay the edits and the flush marker
+ reader = createWALReaderForPrimary();
+ List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
+ List<WAL.Entry> edits = Lists.newArrayList();
+
+ LOG.info("-- Replaying edits and region events in secondary");
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ RegionEventDescriptor regionEventDesc
+ = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+
+ if (flushDesc != null) {
+ // don't replay flushes
+ } else if (regionEventDesc != null) {
+ regionEvents.add(regionEventDesc);
+ } else {
+ edits.add(entry);
+ }
+ }
+
+ // replay the region open event of the first open, but with the seqId of the second open;
+ // this way none of the flush files will be picked up.
+ secondaryRegion.replayWALRegionEventMarker(
+ RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
+ regionEvents.get(2).getLogSequenceNumber()).build());
+
+
+ // replay the edits from before the region close. If replay does not skip these,
+ // the following verification will NOT fail and the test below will catch it.
+ for (WAL.Entry entry: edits) {
+ replayEdit(secondaryRegion, entry);
+ }
+
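+ // since the replayed edits carry seqIds smaller than the replayed region open event, they
+ // should have been skipped, so the data must not be visible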
+ boolean expectedFail = false;
+ try {
+ verifyData(secondaryRegion, 0, numRows, cq, families);
+ } catch (AssertionError e) {
+ expectedFail = true; // expected
+ }
+ if (!expectedFail) {
+ fail("Should have failed this verification");
+ }
+ }
+
+ @Test
+ public void testReplayFlushSeqIds() throws IOException {
+ // load some data to primary and flush
+ int start = 0;
+ LOG.info("-- Writing some data to primary from " + start + " to " + (start+100));
+ putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
+ LOG.info("-- Flushing primary, creating 3 files for 3 stores");
+ primaryRegion.flushcache();
+
+ // now replay the flush marker
+ reader = createWALReaderForPrimary();
+
+ long flushSeqId = -1;
+ LOG.info("-- Replaying flush events in secondary");
+ while (true) {
+ WAL.Entry entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ FlushDescriptor flushDesc
+ = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+ if (flushDesc != null) {
+ if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+ LOG.info("-- Replaying flush start in secondary");
+ secondaryRegion.replayWALFlushStartMarker(flushDesc);
+ flushSeqId = flushDesc.getFlushSequenceNumber();
+ } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+ LOG.info("-- Replaying flush commit in secondary");
+ secondaryRegion.replayWALFlushCommitMarker(flushDesc);
+ assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
+ }
+ }
+ // else do not replay
+ }
+
+ // TODO: what to do with this?
+ // assert that the newly picked up flush file is visible
+ long readPoint = secondaryRegion.getMVCC().memstoreReadPoint();
+ assertEquals(flushSeqId, readPoint);
+
+ // after replay verify that everything is still visible
+ verifyData(secondaryRegion, 0, 100, cq, families);
+ }
+
+ @Test
+ public void testSeqIdsFromReplay() throws IOException {
+ // test the case where seqIds coming from replayed WALEdits are persisted with their
+ // original seqIds and are made visible through the mvcc read point upon replay
+ String method = name.getMethodName();
+ byte[] tableName = Bytes.toBytes(method);
+ byte[] family = Bytes.toBytes("family");
+
+ HRegion region = initHRegion(tableName, method, family);
+ try {
+ // replay an entry that is bigger than current read point
+ long readPoint = region.getMVCC().memstoreReadPoint();
+ long origSeqId = readPoint + 100;
+
+ Put put = new Put(row).add(family, row, row);
+ put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
+ replay(region, put, origSeqId);
+
+ // read point should have advanced to this seqId
+ assertGet(region, family, row);
+
+ // region seqId should have advanced at least to this seqId
+ assertEquals(origSeqId, region.getSequenceId().get());
+
+ // replay an entry that is smaller than current read point
+ // caution: adding an entry below current read point might cause partial dirty reads. Normal
+ // replay does not allow reads while replay is going on.
+ put = new Put(row2).add(family, row2, row2);
+ put.setDurability(Durability.SKIP_WAL);
+ replay(region, put, origSeqId - 50);
+
+ assertGet(region, family, row2);
+ } finally {
+ region.close();
+ }
+ }
+
+ /**
+ * Tests that a region opened in secondary mode would not write region open / close
+ * events to its WAL.
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
+ secondaryRegion.close();
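+ // wrap the secondary WAL in a spy so that appends to it can be verified below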
+ walSecondary = spy(walSecondary);
+
+ // test for region open and close
+ secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
+ verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any());
+
+ // test for replay prepare flush
+ putDataByReplay(secondaryRegion, 0, 10, cq, families);
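+ // neither the replayed edits nor the prepare flush marker should be appended to the secondary WAL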
+ secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder().
+ setFlushSequenceNumber(10)
+ .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
+ .setAction(FlushAction.START_FLUSH)
+ .setEncodedRegionName(
+ ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
+ .setRegionName(ByteString.copyFrom(primaryRegion.getRegionName()))
+ .build());
+
+ verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any());
+
+ secondaryRegion.close();
+ verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any());
+ }
+
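+ /**
+ * Replays a single Put into the region through the batchReplay path with the given replay
+ * sequence id, skipping the WAL.
+ */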
+ private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
+ put.setDurability(Durability.SKIP_WAL);
+ MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
+ region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
+ }
+
+ /** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does
+ * a flush every flushInterval records, then puts numRowsAfterFlush more rows without
+ * executing a flush afterwards.
+ * @throws IOException */
+ private void putDataWithFlushes(HRegion region, int flushInterval,
+ int numRows, int numRowsAfterFlush) throws IOException {
+ int start = 0;
+ for (; start < numRows; start += flushInterval) {
+ LOG.info("-- Writing some data to primary from " + start + " to " + (start+flushInterval));
+ putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
+ LOG.info("-- Flushing primary, creating 3 files for 3 stores");
+ region.flushcache();
+ }
+ LOG.info("-- Writing some more data to primary, not flushing");
+ putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
+ }
+
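+ /**
+ * Puts numRows rows starting at startRow into the region through the replay path, assigning
+ * each edit an increasing replay sequence id.
+ */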
+ private void putDataByReplay(HRegion region,
+ int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
+ for (int i = startRow; i < startRow + numRows; i++) {
+ Put put = new Put(Bytes.toBytes("" + i));
+ put.setDurability(Durability.SKIP_WAL);
+ for (byte[] family : families) {
+ put.add(family, qf, EnvironmentEdgeManager.currentTime(), null);
+ }
+ replay(region, put, i+1);
+ }
+ }
+
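+ /** Creates a local test region for the given table name and families. */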
+ private static HRegion initHRegion(byte[] tableName,
+ String callingMethod, byte[]... families) throws IOException {
+ return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+ callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
+ }
+
+ private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
+ String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
+ WAL wal, byte[]... families) throws IOException {
+ return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
+ isReadOnly, durability, wal, families);
+ }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index e3f51ea..bda95db 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -327,8 +327,7 @@ public class TestPerColumnFamilyFlush {
return null;
}
- @Test (timeout=180000)
- public void testLogReplay() throws Exception {
+ public void doTestLogReplay() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
// Carefully chosen limits so that the memstore just flushes when we're done
@@ -418,7 +417,14 @@ public class TestPerColumnFamilyFlush {
@Test (timeout=180000)
public void testLogReplayWithDistributedReplay() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
- testLogReplay();
+ doTestLogReplay();
+ }
+
+ // Test log replay with distributed log splitting (distributed log replay disabled).
+ @Test (timeout=180000)
+ public void testLogReplayWithDistributedLogSplit() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+ doTestLogReplay();
}
/**