diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index c58d1e5..7277429 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2495,6 +2495,7 @@ public final class ProtobufUtil {
     for (Path outputPath : outputPaths) {
       builder.addCompactionOutput(outputPath.getName());
     }
+    builder.setRegionName(HBaseZeroCopyByteString.wrap(info.getRegionName()));
     return builder.build();
   }
 
diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index efea2ba..c6d6a19 100644
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -3725,6 +3725,24 @@ public final class WALProtos {
      */
     com.google.protobuf.ByteString
         getStoreHomeDirBytes();
+
+    // optional bytes region_name = 7;
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    boolean hasRegionName();
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    com.google.protobuf.ByteString getRegionName();
   }
   /**
    * Protobuf type {@code CompactionDescriptor}
    */
@@ -3821,6 +3839,11 @@ public final class WALProtos {
             storeHomeDir_ = input.readBytes();
             break;
           }
+          case 58: {
+            bitField0_ |= 0x00000010;
+            regionName_ = input.readBytes();
+            break;
+          }
         }
       }
     } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4026,6 +4049,30 @@ public final class WALProtos {
       }
     }
 
+    // optional bytes region_name = 7;
+    public static final int REGION_NAME_FIELD_NUMBER = 7;
+    private com.google.protobuf.ByteString regionName_;
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    public boolean hasRegionName() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    public com.google.protobuf.ByteString getRegionName() {
+      return regionName_;
+    }
+
     private void initFields() {
       tableName_ = com.google.protobuf.ByteString.EMPTY;
       encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
@@ -4033,6 +4080,7 @@ public final class WALProtos {
       compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
       compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
       storeHomeDir_ = "";
+      regionName_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4080,6 +4128,9 @@ public final class WALProtos {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeBytes(6, getStoreHomeDirBytes());
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(7, regionName_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4123,6 +4174,10 @@ public final class WALProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(6, getStoreHomeDirBytes());
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(7, regionName_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4170,6 +4225,11 @@ public final class WALProtos {
         result = result && getStoreHomeDir()
             .equals(other.getStoreHomeDir());
       }
+      result = result && (hasRegionName() == other.hasRegionName());
+      if (hasRegionName()) {
+        result = result && getRegionName()
+            .equals(other.getRegionName());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4207,6 +4267,10 @@ public final class WALProtos {
         hash = (37 * hash) + STORE_HOME_DIR_FIELD_NUMBER;
         hash = (53 * hash) + getStoreHomeDir().hashCode();
       }
+      if (hasRegionName()) {
+        hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+        hash = (53 * hash) + getRegionName().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4336,6 +4400,8 @@ public final class WALProtos {
         bitField0_ = (bitField0_ & ~0x00000010);
         storeHomeDir_ = "";
         bitField0_ = (bitField0_ & ~0x00000020);
+        regionName_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -4392,6 +4458,10 @@ public final class WALProtos {
           to_bitField0_ |= 0x00000008;
         }
         result.storeHomeDir_ = storeHomeDir_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.regionName_ = regionName_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4442,6 +4512,9 @@ public final class WALProtos {
           storeHomeDir_ = other.storeHomeDir_;
           onChanged();
         }
+        if (other.hasRegionName()) {
+          setRegionName(other.getRegionName());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -4869,6 +4942,58 @@ public final class WALProtos {
         return this;
       }
 
+      // optional bytes region_name = 7;
+      private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public boolean hasRegionName() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public com.google.protobuf.ByteString getRegionName() {
+        return regionName_;
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public Builder setRegionName(com.google.protobuf.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000040;
+        regionName_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public Builder clearRegionName() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        regionName_ = getDefaultInstance().getRegionName();
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:CompactionDescriptor)
     }
 
@@ -5275,15 +5400,16 @@ public final class WALProtos {
       "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
       "\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
       "\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
-      "\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compac" +
+      "\nscope_type\030\002 \002(\0162\n.ScopeType\"\276\001\n\024Compac" +
       "tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
       "coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
       " \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
       "tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
-      "\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICAT" +
-      "ION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_G" +
-      "LOBAL\020\001B?\n*org.apache.hadoop.hbase.proto" +
-      "buf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
+      "\t\022\023\n\013region_name\030\007 \001(\014\"\014\n\nWALTrailer*F\n\t" +
+      "ScopeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034" +
+      "\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apac" +
+      "he.hadoop.hbase.protobuf.generatedB\tWALP",
+      "rotosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5313,7 +5439,7 @@ public final class WALProtos {
     internal_static_CompactionDescriptor_fieldAccessorTable = new
       com.google.protobuf.GeneratedMessage.FieldAccessorTable(
         internal_static_CompactionDescriptor_descriptor,
-        new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", });
+        new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", "RegionName", });
     internal_static_WALTrailer_descriptor =
       getDescriptor().getMessageTypes().get(4);
     internal_static_WALTrailer_fieldAccessorTable = new
diff --git hbase-protocol/src/main/protobuf/WAL.proto hbase-protocol/src/main/protobuf/WAL.proto
index 0ae65ec..88e94f4 100644
--- hbase-protocol/src/main/protobuf/WAL.proto
+++ hbase-protocol/src/main/protobuf/WAL.proto
@@ -92,6 +92,7 @@ message CompactionDescriptor {
   repeated string compaction_input = 4;
   repeated string compaction_output = 5;
   required string store_home_dir = 6;
+  optional bytes region_name = 7; // full region name
 }
 
 /**
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 93eada8..f80e276 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1625,7 +1625,7 @@ public class HRegion implements HeapSize { // , Writable{
    */
   boolean shouldFlush() {
     // This is a rough measure.
-    if (this.lastFlushSeqId > 0 
+    if (this.lastFlushSeqId > 0
           && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
       return true;
     }
@@ -2195,7 +2195,7 @@ public class HRegion implements HeapSize { // , Writable{
     public boolean isInReplay() {
       return false;
     }
-    
+
     @Override
     public long getReplaySequenceId() {
       return 0;
@@ -2234,7 +2234,7 @@ public class HRegion implements HeapSize { // , Writable{
     public boolean isInReplay() {
       return true;
     }
-    
+
     @Override
     public long getReplaySequenceId() {
       return this.replaySeqId;
@@ -2270,9 +2270,32 @@ public class HRegion implements HeapSize { // , Writable{
    * OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
+  public OperationStatus[] batchReplay(List<HLogSplitter.MutationReplay> mutations, long replaySeqId)
       throws IOException {
-    return batchMutate(new ReplayBatch(mutations, replaySeqId));
+    int i = 0;
+    HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
+    // check for any compaction marker to complete
+    for (MutationReplay mutation : mutations) {
+      NavigableMap<byte[], List<Cell>> map = mutation.mutation.getFamilyCellMap();
+      List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
+      if (metaCells != null && !metaCells.isEmpty()) {
+        for (Cell metaCell : metaCells) {
+          CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
+          if (compactionDesc != null) {
+            completeCompactionMarker(compactionDesc);
+          }
+        }
+      } else {
+        mArray[i++] = mutation;
+      }
+    }
+    if (i < mArray.length) {
+      HLogSplitter.MutationReplay[] tmpArr = new HLogSplitter.MutationReplay[i];
+      System.arraycopy(mArray, 0, tmpArr, 0, i);
+      mArray = tmpArr;
+    }
+
+    return batchMutate(new ReplayBatch(mArray, replaySeqId));
   }
 
   /**
@@ -3295,7 +3318,7 @@ public class HRegion implements HeapSize { // , Writable{
           firstSeqIdInLog = key.getLogSeqNum();
         }
         currentEditSeqId = key.getLogSeqNum();
-        currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ? 
+        currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
           key.getOrigLogSeqNum() : currentEditSeqId;
         boolean flush = false;
         for (KeyValue kv: val.getKeyValues()) {
@@ -6243,7 +6266,7 @@ public class HRegion implements HeapSize { // , Writable{
         WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
     return key;
   }
-  
+
   /**
    * Explictly sync wal
    * @throws IOException
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b84f9a2..1ad8d2a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -640,25 +640,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private OperationStatus [] doReplayBatchOp(final HRegion region,
       final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
-    HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
-      int i = 0;
       for (HLogSplitter.MutationReplay m : mutations) {
         if (m.type == MutationType.PUT) {
           batchContainsPuts = true;
         } else {
           batchContainsDelete = true;
         }
-        mArray[i++] = m;
       }
       requestCount.add(mutations.size());
       if (!region.getRegionInfo().isMetaTable()) {
         regionServer.cacheFlusher.reclaimMemStoreMemory();
       }
-      return region.batchReplay(mArray, replaySeqId);
+      return region.batchReplay(mutations, replaySeqId);
     } finally {
       if (regionServer.metricsRegionServer != null) {
         long after = EnvironmentEdgeManager.currentTimeMillis();
@@ -1355,7 +1352,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           walEntries.add(walEntry);
         }
         if(edits!=null && !edits.isEmpty()) {
-          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
+          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
            entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
           OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
           // check if it's a partial success
@@ -1366,7 +1363,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
         }
       }
-      
+
       //sync wal at the end because ASYNC_WAL is used above
       region.syncWal();
 
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 873e863..de31f24 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -80,7 +80,6 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -92,6 +91,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRespo
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
@@ -241,7 +241,7 @@ public class HLogSplitter {
     List<Path> splits = new ArrayList<Path>();
     if (logfiles != null && logfiles.length > 0) {
       for (FileStatus logfile: logfiles) {
-        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null, 
+        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
           RecoveryMode.LOG_SPLITTING);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
@@ -811,6 +811,7 @@ public class HLogSplitter {
       k.internEncodedRegionName(this.encodedRegionName);
     }
 
+    @Override
     public long heapSize() {
       return heapInBuffer;
     }
@@ -825,6 +826,7 @@ public class HLogSplitter {
       outputSink = sink;
     }
 
+    @Override
     public void run() {
       try {
         doRun();
       } catch (Throwable t) {
@@ -1060,6 +1062,7 @@ public class HLogSplitter {
         TimeUnit.SECONDS, new ThreadFactory() {
           private int count = 1;
 
+          @Override
           public Thread newThread(Runnable r) {
             Thread t = new Thread(r, "split-log-closeStream-" + count++);
             return t;
@@ -1070,6 +1073,7 @@ public class HLogSplitter {
     for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
       LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
       completionService.submit(new Callable<Void>() {
+        @Override
         public Void call() throws Exception {
           WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
           LOG.debug("Closing " + wap.p);
@@ -1242,6 +1246,7 @@ public class HLogSplitter {
       return (new WriterAndPath(regionedits, w));
     }
 
+    @Override
     void append(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
@@ -1280,6 +1285,7 @@ public class HLogSplitter {
     /**
      * @return a map from encoded region ID to the number of edits written out for that region.
      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
     int count = entry.getAssociatedCellCount();
     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
@@ -1979,7 +2008,7 @@ public class HLogSplitter {
       clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
     }
     key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
-        .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds, 
+        .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
         walKey.getNonceGroup(), walKey.getNonce());
     logEntry.setFirst(key);
     logEntry.setSecond(val);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 8ecb4b3..ace1ace 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -145,6 +145,7 @@ public class WALEdit implements Writable, HeapSize {
     return result;
   }
 
+  @Override
   public void readFields(DataInput in) throws IOException {
     kvs.clear();
     if (scopes != null) {
@@ -180,6 +181,7 @@ public class WALEdit implements Writable, HeapSize {
     }
   }
 
+  @Override
   public void write(DataOutput out) throws IOException {
     LOG.warn("WALEdit is being serialized to writable - only expected in test code");
     out.writeInt(VERSION_2);
@@ -222,6 +224,7 @@ public class WALEdit implements Writable, HeapSize {
     return kvs.size();
   }
 
+  @Override
   public long heapSize() {
     long ret = ClassSize.ARRAYLIST;
     for (KeyValue kv : kvs) {
@@ -235,6 +238,7 @@ public class WALEdit implements Writable, HeapSize {
     return ret;
   }
 
+  @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
 
@@ -249,7 +253,7 @@ public class WALEdit implements Writable, HeapSize {
     sb.append(">]");
     return sb.toString();
   }
-  
+
   /**
    * Create a compacion WALEdit
    * @param c
@@ -266,7 +270,7 @@ public class WALEdit implements Writable, HeapSize {
    * @param kv the key value
    * @return deserialized CompactionDescriptor or null.
@@ -215,7 +221,8 @@ public class TestIOFencing {
    */
   @Test
   public void testFencingAroundCompaction() throws Exception {
-    doTest(BlockCompactionsInPrepRegion.class);
+    doTest(BlockCompactionsInPrepRegion.class, false);
+    doTest(BlockCompactionsInPrepRegion.class, true);
   }
 
   /**
@@ -226,12 +233,13 @@ public class TestIOFencing {
    */
   @Test
   public void testFencingAroundCompactionAfterWALSync() throws Exception {
-    doTest(BlockCompactionsInCompletionRegion.class);
+    doTest(BlockCompactionsInCompletionRegion.class, false);
+    doTest(BlockCompactionsInCompletionRegion.class, true);
   }
 
-  public void doTest(Class<?> regionClass) throws Exception {
+  public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
     Configuration c = TEST_UTIL.getConfiguration();
-    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
     // Insert our custom region
     c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
     c.setBoolean("dfs.support.append", true);
@@ -264,6 +272,16 @@ public class TestIOFencing {
       // Load some rows
       TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
 
+      // add a compaction from an older (non-existing) region to see whether we successfully skip
+      // those entries
+      HRegionInfo oldHri = new HRegionInfo(table.getName(),
+        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
+        FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
+        new Path("store_dir"));
+      HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(),
+        oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
+
       // Wait till flush has happened, otherwise there won't be multiple store files
       long startWaitTime = System.currentTimeMillis();
       while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
@@ -281,18 +299,24 @@ public class TestIOFencing {
       compactingRegion.waitForCompactionToBlock();
       LOG.info("Starting a new server");
       RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
-      HRegionServer newServer = newServerThread.getRegionServer();
+      final HRegionServer newServer = newServerThread.getRegionServer();
       LOG.info("Killing region server ZK lease");
       TEST_UTIL.expireRegionServerSession(0);
       CompactionBlockerRegion newRegion = null;
       startWaitTime = System.currentTimeMillis();
-      while (newRegion == null) {
-        LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
-        Thread.sleep(1000);
-        newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
-        assertTrue("Timed out waiting for new server to open region",
-          System.currentTimeMillis() - startWaitTime < 300000);
-      }
+      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
+
+      // wait for region to be assigned and to go out of log replay if applicable
+      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          HRegion newRegion = newServer.getOnlineRegion(REGION_NAME);
+          return newRegion != null && !newRegion.isRecovering();
+        }
+      });
+
+      newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
+
       LOG.info("Allowing compaction to proceed");
       compactingRegion.allowCompactions();
       while (compactingRegion.compactCount == 0) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8a588e7..1c36e17 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -662,6 +662,7 @@ public class TestHRegion {
             .setFamilyName(ByteString.copyFrom(regionName))
             .setEncodedRegionName(ByteString.copyFrom(regionName))
             .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
+            .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
             .build());
       } else {
         edit = new WALEdit();