diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index c28ebfd..606a585 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2509,6 +2509,7 @@ public final class ProtobufUtil {
for (Path outputPath : outputPaths) {
builder.addCompactionOutput(outputPath.getName());
}
+ builder.setRegionName(HBaseZeroCopyByteString.wrap(info.getRegionName()));
return builder.build();
}
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index ade035b..e3ed1ec 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -340,6 +340,12 @@ public final class CellUtil {
buf, 0, buf.length);
}
+ public static boolean matchingColumn(final Cell left, final byte[] fam, final byte[] qual) {
+ if (!matchingFamily(left, fam))
+ return false;
+ return matchingQualifier(left, qual);
+ }
+
public static boolean matchingColumn(final Cell left, final Cell right) {
if (!matchingFamily(left, right))
return false;
diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index 19afcb2..57a9aa9 100644
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -3635,6 +3635,24 @@ public final class WALProtos {
*/
com.google.protobuf.ByteString
getStoreHomeDirBytes();
+
+ // optional bytes region_name = 7;
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ boolean hasRegionName();
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ com.google.protobuf.ByteString getRegionName();
}
/**
* Protobuf type {@code CompactionDescriptor}
@@ -3731,6 +3749,11 @@ public final class WALProtos {
storeHomeDir_ = input.readBytes();
break;
}
+ case 58: {
+ bitField0_ |= 0x00000010;
+ regionName_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3936,6 +3959,30 @@ public final class WALProtos {
}
}
+ // optional bytes region_name = 7;
+ public static final int REGION_NAME_FIELD_NUMBER = 7;
+ private com.google.protobuf.ByteString regionName_;
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+
private void initFields() {
tableName_ = com.google.protobuf.ByteString.EMPTY;
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
@@ -3943,6 +3990,7 @@ public final class WALProtos {
compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
storeHomeDir_ = "";
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -3990,6 +4038,9 @@ public final class WALProtos {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(6, getStoreHomeDirBytes());
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBytes(7, regionName_);
+ }
getUnknownFields().writeTo(output);
}
@@ -4033,6 +4084,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(6, getStoreHomeDirBytes());
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(7, regionName_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4080,6 +4135,11 @@ public final class WALProtos {
result = result && getStoreHomeDir()
.equals(other.getStoreHomeDir());
}
+ result = result && (hasRegionName() == other.hasRegionName());
+ if (hasRegionName()) {
+ result = result && getRegionName()
+ .equals(other.getRegionName());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -4117,6 +4177,10 @@ public final class WALProtos {
hash = (37 * hash) + STORE_HOME_DIR_FIELD_NUMBER;
hash = (53 * hash) + getStoreHomeDir().hashCode();
}
+ if (hasRegionName()) {
+ hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+ hash = (53 * hash) + getRegionName().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -4246,6 +4310,8 @@ public final class WALProtos {
bitField0_ = (bitField0_ & ~0x00000010);
storeHomeDir_ = "";
bitField0_ = (bitField0_ & ~0x00000020);
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -4302,6 +4368,10 @@ public final class WALProtos {
to_bitField0_ |= 0x00000008;
}
result.storeHomeDir_ = storeHomeDir_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.regionName_ = regionName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -4352,6 +4422,9 @@ public final class WALProtos {
storeHomeDir_ = other.storeHomeDir_;
onChanged();
}
+ if (other.hasRegionName()) {
+ setRegionName(other.getRegionName());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -4779,6 +4852,58 @@ public final class WALProtos {
return this;
}
+ // optional bytes region_name = 7;
+ private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public Builder setRegionName(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000040;
+ regionName_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public Builder clearRegionName() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ regionName_ = getDefaultInstance().getRegionName();
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:CompactionDescriptor)
}
@@ -5185,15 +5310,15 @@ public final class WALProtos {
"ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
"\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil",
"y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" +
- "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
+ "\276\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
" \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" +
"ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" +
"\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" +
- "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" +
- "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" +
- "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" +
- "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" +
- "\001\001"
+ "e_dir\030\006 \002(\t\022\023\n\013region_name\030\007 \001(\014\"\014\n\nWALT" +
+ "railer*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE" +
+ "_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?" +
+ "\n*org.apache.hadoop.hbase.protobuf.gener" +
+ "atedB\tWALProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5223,7 +5348,7 @@ public final class WALProtos {
internal_static_CompactionDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CompactionDescriptor_descriptor,
- new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", });
+ new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", "RegionName", });
internal_static_WALTrailer_descriptor =
getDescriptor().getMessageTypes().get(4);
internal_static_WALTrailer_fieldAccessorTable = new
diff --git hbase-protocol/src/main/protobuf/WAL.proto hbase-protocol/src/main/protobuf/WAL.proto
index 8c7b84b..ae3636e 100644
--- hbase-protocol/src/main/protobuf/WAL.proto
+++ hbase-protocol/src/main/protobuf/WAL.proto
@@ -91,6 +91,7 @@ message CompactionDescriptor {
repeated string compaction_input = 4;
repeated string compaction_output = 5;
required string store_home_dir = 6;
+ optional bytes region_name = 7; // full region name
}
/**
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 300b0b4..c054211 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
@@ -191,6 +192,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
@@ -4341,25 +4343,35 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
*/
protected OperationStatus [] doReplayBatchOp(final HRegion region,
final List<HLogSplitter.MutationReplay> mutations) throws IOException {
- HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
- int i = 0;
- for (HLogSplitter.MutationReplay m : mutations) {
+ for (Iterator<HLogSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
+ HLogSplitter.MutationReplay m = it.next();
if (m.type == MutationType.PUT) {
batchContainsPuts = true;
} else {
batchContainsDelete = true;
}
- mArray[i++] = m;
+ NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
+ List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
+ if (metaCells != null && !metaCells.isEmpty()) {
+ for (Cell metaCell : metaCells) {
+ CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
+ if (compactionDesc != null) {
+ region.completeCompactionMarker(compactionDesc);
+ }
+ }
+ it.remove();
+ }
}
requestCount.add(mutations.size());
if (!region.getRegionInfo().isMetaTable()) {
cacheFlusher.reclaimMemStoreMemory();
}
- return region.batchReplay(mArray);
+ return region.batchReplay(mutations.toArray(
+ new HLogSplitter.MutationReplay[mutations.size()]));
} finally {
long after = EnvironmentEdgeManager.currentTimeMillis();
if (batchContainsPuts) {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 545c253..59204b3 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRespo
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
@@ -1440,19 +1441,41 @@ public class HLogSplitter {
HConnection hconn = this.getConnectionByTableName(table);
for (KeyValue kv : kvs) {
- // filtering HLog meta entries
- // We don't handle HBASE-2231 because we may or may not replay a compaction event.
- // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
- // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
+ byte[] row = kv.getRow();
+ byte[] family = kv.getFamily();
+ boolean isCompactionEntry = false;
+
if (kv.matchingFamily(WALEdit.METAFAMILY)) {
- skippedKVs.add(kv);
- continue;
+ CompactionDescriptor compaction = WALEdit.getCompaction(kv);
+ if (compaction != null && compaction.hasRegionName()) {
+ try {
+ byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
+ .toByteArray());
+ row = regionName[1]; // startKey of the region
+ family = compaction.getFamilyName().toByteArray();
+ isCompactionEntry = true;
+ } catch (Exception ex) {
+ LOG.warn("Unexpected exception received, ignoring " + ex);
+ skippedKVs.add(kv);
+ continue;
+ }
+ } else {
+ skippedKVs.add(kv);
+ continue;
+ }
}
try {
loc =
- locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(),
+ locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
encodeRegionNameStr);
+ // skip replaying the compaction if the region is gone
+ if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
+ loc.getRegionInfo().getEncodedName())) {
+ LOG.info("Not replaying a compaction marker for an older region: "
+ + encodeRegionNameStr);
+ needSkip = true;
+ }
} catch (TableNotFoundException ex) {
// table has been deleted so skip edits of the table
LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
@@ -1481,7 +1504,7 @@ public class HLogSplitter {
regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
}
if (maxStoreSequenceIds != null) {
- Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily());
+ Long maxStoreSeqId = maxStoreSequenceIds.get(family);
if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
// skip current kv if column family doesn't exist anymore or already flushed
skippedKVs.add(kv);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index 684f78c..ff3f3e6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -263,7 +263,7 @@ public class HLogUtil {
*/
public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
- WALEdit e = WALEdit.createCompaction(c);
+ WALEdit e = WALEdit.createCompaction(info, c);
long now = EnvironmentEdgeManager.currentTimeMillis();
TableName tn = TableName.valueOf(c.getTableName().toByteArray());
long txid = log.appendNoSync(info, tn, e, new ArrayList<UUID>(), now, htd, sequenceId,
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index d4b36a6..fd223a4 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
@@ -144,6 +146,7 @@ public class WALEdit implements Writable, HeapSize {
return result;
}
+ @Override
public void readFields(DataInput in) throws IOException {
kvs.clear();
if (scopes != null) {
@@ -179,6 +182,7 @@ public class WALEdit implements Writable, HeapSize {
}
}
+ @Override
public void write(DataOutput out) throws IOException {
LOG.warn("WALEdit is being serialized to writable - only expected in test code");
out.writeInt(VERSION_2);
@@ -221,6 +225,7 @@ public class WALEdit implements Writable, HeapSize {
return kvs.size();
}
+ @Override
public long heapSize() {
long ret = ClassSize.ARRAYLIST;
for (KeyValue kv : kvs) {
@@ -234,6 +239,7 @@ public class WALEdit implements Writable, HeapSize {
return ret;
}
+ @Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -254,19 +260,29 @@ public class WALEdit implements Writable, HeapSize {
* @param c
* @return A WALEdit that has c serialized as its value
*/
- public static WALEdit createCompaction(final CompactionDescriptor c) {
+ public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
byte [] pbbytes = c.toByteArray();
- KeyValue kv = new KeyValue(METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes);
+ KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes);
return new WALEdit().add(kv); //replication scope null so that this won't be replicated
}
+ private static byte[] getRowForRegion(HRegionInfo hri) {
+ byte[] startKey = hri.getStartKey();
+ if (startKey.length == 0) {
+ // empty row key is not allowed in mutations because it is both the start key and the end key
+ // we return the smallest byte[] that is bigger (in lex comparison) than byte[0].
+ return new byte[] {0};
+ }
+ return startKey;
+ }
+
/**
* Deserialized and returns a CompactionDescriptor is the KeyValue contains one.
* @param kv the key value
* @return deserialized CompactionDescriptor or null.
*/
- public static CompactionDescriptor getCompaction(KeyValue kv) throws IOException {
- if (kv.matchingRow(METAROW) && kv.matchingColumn(METAFAMILY, COMPACTION)) {
+ public static CompactionDescriptor getCompaction(Cell kv) throws IOException {
+ if (CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
return CompactionDescriptor.parseFrom(kv.getValue());
}
return null;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 3845d6c..966e1ee 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -233,8 +233,6 @@ public class WALEditsReplaySink {
List<KeyValue> kvs = edit.getKeyValues();
for (KeyValue kv : kvs) {
// filtering HLog meta entries
- if (kv.matchingFamily(WALEdit.METAFAMILY)) continue;
-
setLocation(conn.locateRegion(tableName, kv.getRow()));
skip = true;
break;
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index cc84600..26d08c0 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +34,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hdfs.DFSClient;
@@ -53,6 +57,8 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Lists;
+
/**
* Test for the case where a regionserver going down has enough cycles to do damage to regions
* that have actually been assigned elsehwere.
@@ -206,7 +212,8 @@ public class TestIOFencing {
@Ignore("See HBASE-10298")
@Test
public void testFencingAroundCompaction() throws Exception {
- doTest(BlockCompactionsInPrepRegion.class);
+ doTest(BlockCompactionsInPrepRegion.class, false);
+ doTest(BlockCompactionsInPrepRegion.class, true);
}
/**
@@ -218,12 +225,13 @@ public class TestIOFencing {
@Ignore("See HBASE-10298")
@Test
public void testFencingAroundCompactionAfterWALSync() throws Exception {
- doTest(BlockCompactionsInCompletionRegion.class);
+ doTest(BlockCompactionsInCompletionRegion.class, false);
+ doTest(BlockCompactionsInCompletionRegion.class, true);
}
- public void doTest(Class<?> regionClass) throws Exception {
+ public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
Configuration c = TEST_UTIL.getConfiguration();
- c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+ c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
// Insert our custom region
c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
c.setBoolean("dfs.support.append", true);
@@ -256,6 +264,16 @@ public class TestIOFencing {
// Load some rows
TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
+ // add a compaction from an older (non-existing) region to see whether we successfully skip
+ // those entries
+ HRegionInfo oldHri = new HRegionInfo(table.getName(),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
+ FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
+ new Path("store_dir"));
+ HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(),
+ oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
+
// Wait till flush has happened, otherwise there won't be multiple store files
long startWaitTime = System.currentTimeMillis();
while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
@@ -273,18 +291,24 @@ public class TestIOFencing {
compactingRegion.waitForCompactionToBlock();
LOG.info("Starting a new server");
RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
- HRegionServer newServer = newServerThread.getRegionServer();
+ final HRegionServer newServer = newServerThread.getRegionServer();
LOG.info("Killing region server ZK lease");
TEST_UTIL.expireRegionServerSession(0);
CompactionBlockerRegion newRegion = null;
startWaitTime = System.currentTimeMillis();
- while (newRegion == null) {
- LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
- Thread.sleep(1000);
- newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
- assertTrue("Timed out waiting for new server to open region",
- System.currentTimeMillis() - startWaitTime < 300000);
- }
+ LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
+
+ // wait for region to be assigned and to go out of log replay if applicable
+ Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ HRegion newRegion = newServer.getOnlineRegion(REGION_NAME);
+ return newRegion != null && !newRegion.isRecovering();
+ }
+ });
+
+ newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
+
LOG.info("Allowing compaction to proceed");
compactingRegion.allowCompactions();
while (compactingRegion.compactCount == 0) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 52713ed..c88f996 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -605,11 +605,13 @@ public class TestHRegion {
long time = System.nanoTime();
WALEdit edit = null;
if (i == maxSeqId) {
- edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder()
+ edit = WALEdit.createCompaction(region.getRegionInfo(),
+ CompactionDescriptor.newBuilder()
.setTableName(ByteString.copyFrom(tableName.getName()))
.setFamilyName(ByteString.copyFrom(regionName))
.setEncodedRegionName(ByteString.copyFrom(regionName))
.setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
+ .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
.build());
} else {
edit = new WALEdit();
@@ -701,7 +703,8 @@ public class TestHRegion {
long time = System.nanoTime();
writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time,
- HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(compactionDescriptor)));
+ HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
+ compactionDescriptor)));
writer.close();
// close the region now, and reopen again
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 5c0f710..f67ab1f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
@@ -491,10 +492,12 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testCompactionWALEdits() throws Exception {
WALProtos.CompactionDescriptor compactionDescriptor =
WALProtos.CompactionDescriptor.getDefaultInstance();
- WALEdit edit = WALEdit.createCompaction(compactionDescriptor);
+ HRegionInfo hri = new HRegionInfo(htable1.getName(),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
Replication.scopeWALEdits(htable1.getTableDescriptor(), new HLogKey(), edit);
}
-
+
/**
* Test for HBASE-8663
* Create two new Tables with colfamilies enabled for replication then run
|