diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index c58d1e5..7277429 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2495,6 +2495,7 @@ public final class ProtobufUtil {
for (Path outputPath : outputPaths) {
builder.addCompactionOutput(outputPath.getName());
}
+ builder.setRegionName(HBaseZeroCopyByteString.wrap(info.getRegionName()));
return builder.build();
}
diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index efea2ba..c6d6a19 100644
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -3725,6 +3725,24 @@ public final class WALProtos {
*/
com.google.protobuf.ByteString
getStoreHomeDirBytes();
+
+ // optional bytes region_name = 7;
+ /**
+ * optional bytes region_name = 7;
+ *
+ *
+ * full region name
+ *
+ */
+ boolean hasRegionName();
+ /**
+ * optional bytes region_name = 7;
+ *
+ *
+ * full region name
+ *
+ */
+ com.google.protobuf.ByteString getRegionName();
}
/**
* Protobuf type {@code CompactionDescriptor}
@@ -3821,6 +3839,11 @@ public final class WALProtos {
storeHomeDir_ = input.readBytes();
break;
}
+ case 58: {
+ bitField0_ |= 0x00000010;
+ regionName_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4026,6 +4049,30 @@ public final class WALProtos {
}
}
+ // optional bytes region_name = 7;
+ public static final int REGION_NAME_FIELD_NUMBER = 7;
+ private com.google.protobuf.ByteString regionName_;
+ /**
+ * optional bytes region_name = 7;
+ *
+ *
+ * full region name
+ *
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * optional bytes region_name = 7;
+ *
+ *
+ * full region name
+ *
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+
private void initFields() {
tableName_ = com.google.protobuf.ByteString.EMPTY;
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
@@ -4033,6 +4080,7 @@ public final class WALProtos {
compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
storeHomeDir_ = "";
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -4080,6 +4128,9 @@ public final class WALProtos {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(6, getStoreHomeDirBytes());
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBytes(7, regionName_);
+ }
getUnknownFields().writeTo(output);
}
@@ -4123,6 +4174,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(6, getStoreHomeDirBytes());
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(7, regionName_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4170,6 +4225,11 @@ public final class WALProtos {
result = result && getStoreHomeDir()
.equals(other.getStoreHomeDir());
}
+ result = result && (hasRegionName() == other.hasRegionName());
+ if (hasRegionName()) {
+ result = result && getRegionName()
+ .equals(other.getRegionName());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -4207,6 +4267,10 @@ public final class WALProtos {
hash = (37 * hash) + STORE_HOME_DIR_FIELD_NUMBER;
hash = (53 * hash) + getStoreHomeDir().hashCode();
}
+ if (hasRegionName()) {
+ hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+ hash = (53 * hash) + getRegionName().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -4336,6 +4400,8 @@ public final class WALProtos {
bitField0_ = (bitField0_ & ~0x00000010);
storeHomeDir_ = "";
bitField0_ = (bitField0_ & ~0x00000020);
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -4392,6 +4458,10 @@ public final class WALProtos {
to_bitField0_ |= 0x00000008;
}
result.storeHomeDir_ = storeHomeDir_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.regionName_ = regionName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -4442,6 +4512,9 @@ public final class WALProtos {
storeHomeDir_ = other.storeHomeDir_;
onChanged();
}
+ if (other.hasRegionName()) {
+ setRegionName(other.getRegionName());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -4869,6 +4942,58 @@ public final class WALProtos {
return this;
}
+ // optional bytes region_name = 7;
+ private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * optional bytes region_name = 7;
+ *
+ *
+ * full region name
+ *
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * optional bytes region_name = 7;
+ *
+ *
+ * full region name
+ *
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+ /**
+ * optional bytes region_name = 7;
+ *
+ *
+ * full region name
+ *
+ */
+ public Builder setRegionName(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000040;
+ regionName_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional bytes region_name = 7;
+ *
+ *
+ * full region name
+ *
+ */
+ public Builder clearRegionName() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ regionName_ = getDefaultInstance().getRegionName();
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:CompactionDescriptor)
}
@@ -5275,15 +5400,16 @@ public final class WALProtos {
"ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
"\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
"\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
- "\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compac" +
+ "\nscope_type\030\002 \002(\0162\n.ScopeType\"\276\001\n\024Compac" +
"tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
"coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
" \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
"tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
- "\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICAT" +
- "ION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_G" +
- "LOBAL\020\001B?\n*org.apache.hadoop.hbase.proto" +
- "buf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
+ "\t\022\023\n\013region_name\030\007 \001(\014\"\014\n\nWALTrailer*F\n\t" +
+ "ScopeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034" +
+ "\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apac" +
+ "he.hadoop.hbase.protobuf.generatedB\tWALP",
+ "rotosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5313,7 +5439,7 @@ public final class WALProtos {
internal_static_CompactionDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CompactionDescriptor_descriptor,
- new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", });
+ new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", "RegionName", });
internal_static_WALTrailer_descriptor =
getDescriptor().getMessageTypes().get(4);
internal_static_WALTrailer_fieldAccessorTable = new
diff --git hbase-protocol/src/main/protobuf/WAL.proto hbase-protocol/src/main/protobuf/WAL.proto
index 0ae65ec..88e94f4 100644
--- hbase-protocol/src/main/protobuf/WAL.proto
+++ hbase-protocol/src/main/protobuf/WAL.proto
@@ -92,6 +92,7 @@ message CompactionDescriptor {
repeated string compaction_input = 4;
repeated string compaction_output = 5;
required string store_home_dir = 6;
+ optional bytes region_name = 7; // full region name
}
/**
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 93eada8..40fc6d1 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1625,7 +1625,7 @@ public class HRegion implements HeapSize { // , Writable{
*/
boolean shouldFlush() {
// This is a rough measure.
- if (this.lastFlushSeqId > 0
+ if (this.lastFlushSeqId > 0
&& (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
return true;
}
@@ -2195,7 +2195,7 @@ public class HRegion implements HeapSize { // , Writable{
public boolean isInReplay() {
return false;
}
-
+
@Override
public long getReplaySequenceId() {
return 0;
@@ -2234,7 +2234,7 @@ public class HRegion implements HeapSize { // , Writable{
public boolean isInReplay() {
return true;
}
-
+
@Override
public long getReplaySequenceId() {
return this.replaySeqId;
@@ -2270,9 +2270,32 @@ public class HRegion implements HeapSize { // , Writable{
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
- public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
- throws IOException {
- return batchMutate(new ReplayBatch(mutations, replaySeqId));
+ public OperationStatus[] batchReplay(List mutations
+ , long replaySeqId) throws IOException {
+ int i = 0;
+ HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
+ // check for any compaction marker to complete
+ for (MutationReplay mutation : mutations) {
+ NavigableMap> map = mutation.mutation.getFamilyCellMap();
+ List metaCells = map.get(WALEdit.METAFAMILY);
+ if (metaCells != null && !metaCells.isEmpty()) {
+ for (Cell metaCell : metaCells) {
+ CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
+ if (compactionDesc != null) {
+ completeCompactionMarker(compactionDesc);
+ }
+ }
+ } else {
+ mArray[i++] = mutation;
+ }
+ }
+ if (i < mArray.length) {
+ HLogSplitter.MutationReplay[] tmpArr = new HLogSplitter.MutationReplay[i];
+ System.arraycopy(mArray, 0, tmpArr, 0, i);
+ mArray = tmpArr;
+ }
+
+ return batchMutate(new ReplayBatch(mArray, replaySeqId));
}
/**
@@ -3295,7 +3318,7 @@ public class HRegion implements HeapSize { // , Writable{
firstSeqIdInLog = key.getLogSeqNum();
}
currentEditSeqId = key.getLogSeqNum();
- currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
+ currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
key.getOrigLogSeqNum() : currentEditSeqId;
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
@@ -6243,7 +6266,7 @@ public class HRegion implements HeapSize { // , Writable{
WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
return key;
}
-
+
/**
* Explictly sync wal
* @throws IOException
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b84f9a2..1ad8d2a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -640,25 +640,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private OperationStatus [] doReplayBatchOp(final HRegion region,
final List mutations, long replaySeqId) throws IOException {
- HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
- int i = 0;
for (HLogSplitter.MutationReplay m : mutations) {
if (m.type == MutationType.PUT) {
batchContainsPuts = true;
} else {
batchContainsDelete = true;
}
- mArray[i++] = m;
}
requestCount.add(mutations.size());
if (!region.getRegionInfo().isMetaTable()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
- return region.batchReplay(mArray, replaySeqId);
+ return region.batchReplay(mutations, replaySeqId);
} finally {
if (regionServer.metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTimeMillis();
@@ -1355,7 +1352,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
walEntries.add(walEntry);
}
if(edits!=null && !edits.isEmpty()) {
- long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
+ long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
// check if it's a partial success
@@ -1366,7 +1363,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
}
-
+
//sync wal at the end because ASYNC_WAL is used above
region.syncWal();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 873e863..de31f24 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -80,7 +80,6 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -92,6 +91,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRespo
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
@@ -241,7 +241,7 @@ public class HLogSplitter {
List splits = new ArrayList();
if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) {
- HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
+ HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
RecoveryMode.LOG_SPLITTING);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
@@ -811,6 +811,7 @@ public class HLogSplitter {
k.internEncodedRegionName(this.encodedRegionName);
}
+ @Override
public long heapSize() {
return heapInBuffer;
}
@@ -825,6 +826,7 @@ public class HLogSplitter {
outputSink = sink;
}
+ @Override
public void run() {
try {
doRun();
@@ -1060,6 +1062,7 @@ public class HLogSplitter {
TimeUnit.SECONDS, new ThreadFactory() {
private int count = 1;
+ @Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "split-log-closeStream-" + count++);
return t;
@@ -1070,6 +1073,7 @@ public class HLogSplitter {
for (final Map.Entry writersEntry : writers.entrySet()) {
LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
completionService.submit(new Callable() {
+ @Override
public Void call() throws Exception {
WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
LOG.debug("Closing " + wap.p);
@@ -1242,6 +1246,7 @@ public class HLogSplitter {
return (new WriterAndPath(regionedits, w));
}
+ @Override
void append(RegionEntryBuffer buffer) throws IOException {
List entries = buffer.entryBuffer;
if (entries.isEmpty()) {
@@ -1280,6 +1285,7 @@ public class HLogSplitter {
/**
* @return a map from encoded region ID to the number of edits written out for that region.
*/
+ @Override
Map getOutputCounts() {
TreeMap ret = new TreeMap(Bytes.BYTES_COMPARATOR);
synchronized (writers) {
@@ -1368,6 +1374,7 @@ public class HLogSplitter {
this.logRecoveredEditsOutputSink.setReporter(reporter);
}
+ @Override
void append(RegionEntryBuffer buffer) throws IOException {
List entries = buffer.entryBuffer;
if (entries.isEmpty()) {
@@ -1449,19 +1456,40 @@ public class HLogSplitter {
HConnection hconn = this.getConnectionByTableName(table);
for (KeyValue kv : kvs) {
- // filtering HLog meta entries
- // We don't handle HBASE-2231 because we may or may not replay a compaction event.
- // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
- // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
+ byte[] row = kv.getRow();
+ byte[] family = kv.getFamily();
+ boolean isCompactionEntry = false;
if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) {
- skippedKVs.add(kv);
- continue;
+ CompactionDescriptor compaction = WALEdit.getCompaction(kv);
+ if (compaction != null && compaction.hasRegionName()) {
+ try {
+ byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
+ .toByteArray());
+ row = regionName[1]; // startKey of the region
+ family = compaction.getFamilyName().toByteArray();
+ isCompactionEntry = true;
+ } catch (Exception ex) {
+ LOG.warn("Unexpected exception received, ignoring " + ex);
+ skippedKVs.add(kv);
+ continue;
+ }
+ } else {
+ skippedKVs.add(kv);
+ continue;
+ }
}
try {
loc =
- locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(),
+ locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
encodeRegionNameStr);
+ // skip replaying the compaction if the region is gone
+ if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
+ loc.getRegionInfo().getEncodedName())) {
+ LOG.info("Not replaying a compaction marker for an older region: "
+ + encodeRegionNameStr);
+ needSkip = true;
+ }
} catch (TableNotFoundException ex) {
// table has been deleted so skip edits of the table
LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
@@ -1490,7 +1518,7 @@ public class HLogSplitter {
regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
}
if (maxStoreSequenceIds != null) {
- Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily());
+ Long maxStoreSeqId = maxStoreSequenceIds.get(family);
if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
// skip current kv if column family doesn't exist anymore or already flushed
skippedKVs.add(kv);
@@ -1768,6 +1796,7 @@ public class HLogSplitter {
return result;
}
+ @Override
Map getOutputCounts() {
TreeMap ret = new TreeMap(Bytes.BYTES_COMPARATOR);
synchronized (writers) {
@@ -1922,7 +1951,7 @@ public class HLogSplitter {
return new ArrayList();
}
- long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
+ long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
int count = entry.getAssociatedCellCount();
List mutations = new ArrayList();
@@ -1979,7 +2008,7 @@ public class HLogSplitter {
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
}
key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
- .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
+ .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
walKey.getNonceGroup(), walKey.getNonce());
logEntry.setFirst(key);
logEntry.setSecond(val);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index 6809aad..a0707f7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -262,7 +262,7 @@ public class HLogUtil {
final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
TableName tn = TableName.valueOf(c.getTableName().toByteArray());
HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false, null);
+ log.appendNoSync(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
log.sync();
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 8ecb4b3..85e2d7c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -145,6 +146,7 @@ public class WALEdit implements Writable, HeapSize {
return result;
}
+ @Override
public void readFields(DataInput in) throws IOException {
kvs.clear();
if (scopes != null) {
@@ -180,6 +182,7 @@ public class WALEdit implements Writable, HeapSize {
}
}
+ @Override
public void write(DataOutput out) throws IOException {
LOG.warn("WALEdit is being serialized to writable - only expected in test code");
out.writeInt(VERSION_2);
@@ -222,6 +225,7 @@ public class WALEdit implements Writable, HeapSize {
return kvs.size();
}
+ @Override
public long heapSize() {
long ret = ClassSize.ARRAYLIST;
for (KeyValue kv : kvs) {
@@ -235,6 +239,7 @@ public class WALEdit implements Writable, HeapSize {
return ret;
}
+ @Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -249,25 +254,36 @@ public class WALEdit implements Writable, HeapSize {
sb.append(">]");
return sb.toString();
}
-
+
/**
* Create a compacion WALEdit
* @param c
* @return A WALEdit that has c serialized as its value
*/
- public static WALEdit createCompaction(final CompactionDescriptor c) {
+ public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
byte [] pbbytes = c.toByteArray();
- KeyValue kv = new KeyValue(METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes);
+ KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
+ System.currentTimeMillis(), pbbytes);
return new WALEdit().add(kv); //replication scope null so that this won't be replicated
}
+ private static byte[] getRowForRegion(HRegionInfo hri) {
+ byte[] startKey = hri.getStartKey();
+ if (startKey.length == 0) {
+ // empty row key is not allowed in mutations because it is both the start key and the end key
+ // we return the smallest byte[] that is bigger (in lex comparison) than byte[0].
+ return new byte[] {0};
+ }
+ return startKey;
+ }
+
/**
* Deserialized and returns a CompactionDescriptor is the KeyValue contains one.
* @param kv the key value
* @return deserialized CompactionDescriptor or null.
*/
- public static CompactionDescriptor getCompaction(KeyValue kv) throws IOException {
- if (CellUtil.matchingRow(kv, METAROW) && CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
+ public static CompactionDescriptor getCompaction(Cell kv) throws IOException {
+ if (CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
return CompactionDescriptor.parseFrom(kv.getValue());
}
return null;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 1c08fac..97e9b86 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -234,8 +233,6 @@ public class WALEditsReplaySink {
List kvs = edit.getKeyValues();
for (KeyValue kv : kvs) {
// filtering HLog meta entries
- if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) continue;
-
setLocation(conn.locateRegion(tableName, kv.getRow()));
skip = true;
break;
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index b5848a2..7dec203 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +34,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hdfs.DFSClient;
@@ -52,6 +56,8 @@ import org.apache.log4j.Level;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Lists;
+
/**
* Test for the case where a regionserver going down has enough cycles to do damage to regions
* that have actually been assigned elsehwere.
@@ -215,7 +221,8 @@ public class TestIOFencing {
*/
@Test
public void testFencingAroundCompaction() throws Exception {
- doTest(BlockCompactionsInPrepRegion.class);
+ doTest(BlockCompactionsInPrepRegion.class, false);
+ doTest(BlockCompactionsInPrepRegion.class, true);
}
/**
@@ -226,12 +233,13 @@ public class TestIOFencing {
*/
@Test
public void testFencingAroundCompactionAfterWALSync() throws Exception {
- doTest(BlockCompactionsInCompletionRegion.class);
+ doTest(BlockCompactionsInCompletionRegion.class, false);
+ doTest(BlockCompactionsInCompletionRegion.class, true);
}
- public void doTest(Class> regionClass) throws Exception {
+ public void doTest(Class> regionClass, boolean distributedLogReplay) throws Exception {
Configuration c = TEST_UTIL.getConfiguration();
- c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+ c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
// Insert our custom region
c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
c.setBoolean("dfs.support.append", true);
@@ -264,6 +272,16 @@ public class TestIOFencing {
// Load some rows
TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
+ // add a compaction from an older (non-existing) region to see whether we successfully skip
+ // those entries
+ HRegionInfo oldHri = new HRegionInfo(table.getName(),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
+ FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
+ new Path("store_dir"));
+ HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(),
+ oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
+
// Wait till flush has happened, otherwise there won't be multiple store files
long startWaitTime = System.currentTimeMillis();
while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
@@ -281,18 +299,24 @@ public class TestIOFencing {
compactingRegion.waitForCompactionToBlock();
LOG.info("Starting a new server");
RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
- HRegionServer newServer = newServerThread.getRegionServer();
+ final HRegionServer newServer = newServerThread.getRegionServer();
LOG.info("Killing region server ZK lease");
TEST_UTIL.expireRegionServerSession(0);
CompactionBlockerRegion newRegion = null;
startWaitTime = System.currentTimeMillis();
- while (newRegion == null) {
- LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
- Thread.sleep(1000);
- newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
- assertTrue("Timed out waiting for new server to open region",
- System.currentTimeMillis() - startWaitTime < 300000);
- }
+ LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
+
+ // wait for region to be assigned and to go out of log replay if applicable
+ Waiter.waitFor(c, 60000, new Waiter.Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ HRegion newRegion = newServer.getOnlineRegion(REGION_NAME);
+ return newRegion != null && !newRegion.isRecovering();
+ }
+ });
+
+ newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
+
LOG.info("Allowing compaction to proceed");
compactingRegion.allowCompactions();
while (compactingRegion.compactCount == 0) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8a588e7..15e530a 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -657,11 +657,13 @@ public class TestHRegion {
long time = System.nanoTime();
WALEdit edit = null;
if (i == maxSeqId) {
- edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder()
+ edit = WALEdit.createCompaction(region.getRegionInfo(),
+ CompactionDescriptor.newBuilder()
.setTableName(ByteString.copyFrom(tableName.getName()))
.setFamilyName(ByteString.copyFrom(regionName))
.setEncodedRegionName(ByteString.copyFrom(regionName))
.setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
+ .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
.build());
} else {
edit = new WALEdit();
@@ -753,7 +755,8 @@ public class TestHRegion {
long time = System.nanoTime();
writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time,
- HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(compactionDescriptor)));
+ HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
+ compactionDescriptor)));
writer.close();
// close the region now, and reopen again
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 5c0f710..f67ab1f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
@@ -491,10 +492,12 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testCompactionWALEdits() throws Exception {
WALProtos.CompactionDescriptor compactionDescriptor =
WALProtos.CompactionDescriptor.getDefaultInstance();
- WALEdit edit = WALEdit.createCompaction(compactionDescriptor);
+ HRegionInfo hri = new HRegionInfo(htable1.getName(),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
Replication.scopeWALEdits(htable1.getTableDescriptor(), new HLogKey(), edit);
}
-
+
/**
* Test for HBASE-8663
* Create two new Tables with colfamilies enabled for replication then run
|