diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index c28ebfd..606a585 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -2509,6 +2509,7 @@ public final class ProtobufUtil { for (Path outputPath : outputPaths) { builder.addCompactionOutput(outputPath.getName()); } + builder.setRegionName(HBaseZeroCopyByteString.wrap(info.getRegionName())); return builder.build(); } diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index ade035b..e3ed1ec 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -340,6 +340,12 @@ public final class CellUtil { buf, 0, buf.length); } + public static boolean matchingColumn(final Cell left, final byte[] fam, final byte[] qual) { + if (!matchingFamily(left, fam)) + return false; + return matchingQualifier(left, qual); + } + public static boolean matchingColumn(final Cell left, final Cell right) { if (!matchingFamily(left, right)) return false; diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index 19afcb2..57a9aa9 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -3635,6 +3635,24 @@ public final class WALProtos { */ com.google.protobuf.ByteString getStoreHomeDirBytes(); + + // optional bytes region_name = 7; + /** + * optional bytes region_name = 7; + * + *
+     * full region name
+     * 
+ */ + boolean hasRegionName(); + /** + * optional bytes region_name = 7; + * + *
+     * full region name
+     * 
+ */ + com.google.protobuf.ByteString getRegionName(); } /** * Protobuf type {@code CompactionDescriptor} @@ -3731,6 +3749,11 @@ public final class WALProtos { storeHomeDir_ = input.readBytes(); break; } + case 58: { + bitField0_ |= 0x00000010; + regionName_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -3936,6 +3959,30 @@ public final class WALProtos { } } + // optional bytes region_name = 7; + public static final int REGION_NAME_FIELD_NUMBER = 7; + private com.google.protobuf.ByteString regionName_; + /** + * optional bytes region_name = 7; + * + *
+     * full region name
+     * 
+ */ + public boolean hasRegionName() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bytes region_name = 7; + * + *
+     * full region name
+     * 
+ */ + public com.google.protobuf.ByteString getRegionName() { + return regionName_; + } + private void initFields() { tableName_ = com.google.protobuf.ByteString.EMPTY; encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; @@ -3943,6 +3990,7 @@ public final class WALProtos { compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY; compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY; storeHomeDir_ = ""; + regionName_ = com.google.protobuf.ByteString.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -3990,6 +4038,9 @@ public final class WALProtos { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBytes(6, getStoreHomeDirBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(7, regionName_); + } getUnknownFields().writeTo(output); } @@ -4033,6 +4084,10 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeBytesSize(6, getStoreHomeDirBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(7, regionName_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4080,6 +4135,11 @@ public final class WALProtos { result = result && getStoreHomeDir() .equals(other.getStoreHomeDir()); } + result = result && (hasRegionName() == other.hasRegionName()); + if (hasRegionName()) { + result = result && getRegionName() + .equals(other.getRegionName()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -4117,6 +4177,10 @@ public final class WALProtos { hash = (37 * hash) + STORE_HOME_DIR_FIELD_NUMBER; hash = (53 * hash) + getStoreHomeDir().hashCode(); } + if (hasRegionName()) { + hash = (37 * hash) + REGION_NAME_FIELD_NUMBER; + hash = (53 * hash) + getRegionName().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -4246,6 +4310,8 @@ public final class WALProtos { bitField0_ = (bitField0_ & ~0x00000010); storeHomeDir_ = ""; bitField0_ = (bitField0_ & ~0x00000020); + regionName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000040); return this; } @@ -4302,6 +4368,10 @@ public final class WALProtos { to_bitField0_ |= 0x00000008; } result.storeHomeDir_ = storeHomeDir_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000010; + } + result.regionName_ = regionName_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -4352,6 +4422,9 @@ public final class WALProtos { storeHomeDir_ = other.storeHomeDir_; onChanged(); } + if (other.hasRegionName()) { + setRegionName(other.getRegionName()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -4779,6 +4852,58 @@ public final class WALProtos { return this; } + // optional bytes region_name = 7; + private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes region_name = 7; + * + *
+       * full region name
+       * 
+ */ + public boolean hasRegionName() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional bytes region_name = 7; + * + *
+       * full region name
+       * 
+ */ + public com.google.protobuf.ByteString getRegionName() { + return regionName_; + } + /** + * optional bytes region_name = 7; + * + *
+       * full region name
+       * 
+ */ + public Builder setRegionName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000040; + regionName_ = value; + onChanged(); + return this; + } + /** + * optional bytes region_name = 7; + * + *
+       * full region name
+       * 
+ */ + public Builder clearRegionName() { + bitField0_ = (bitField0_ & ~0x00000040); + regionName_ = getDefaultInstance().getRegionName(); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:CompactionDescriptor) } @@ -5185,15 +5310,15 @@ public final class WALProtos { "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" + "\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil", "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" + - "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" + + "\276\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" + " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" + "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" + "\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" + - "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" + - "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" + - "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" + - "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" + - "\001\001" + "e_dir\030\006 \002(\t\022\023\n\013region_name\030\007 \001(\014\"\014\n\nWALT" + + "railer*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE" + + "_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?" + + "\n*org.apache.hadoop.hbase.protobuf.gener" + + "atedB\tWALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5223,7 +5348,7 @@ public final class WALProtos { internal_static_CompactionDescriptor_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_CompactionDescriptor_descriptor, - new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", }); + new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", "RegionName", }); internal_static_WALTrailer_descriptor = getDescriptor().getMessageTypes().get(4); internal_static_WALTrailer_fieldAccessorTable = new diff --git hbase-protocol/src/main/protobuf/WAL.proto hbase-protocol/src/main/protobuf/WAL.proto index 8c7b84b..ae3636e 100644 --- hbase-protocol/src/main/protobuf/WAL.proto +++ hbase-protocol/src/main/protobuf/WAL.proto @@ -91,6 +91,7 @@ message CompactionDescriptor { repeated string compaction_input = 4; repeated string compaction_output = 5; required string store_home_dir = 6; + optional bytes region_name = 7; // full region name } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 300b0b4..c054211 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -38,6 +38,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.SortedMap; @@ -191,6 +192,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; @@ -4341,25 +4343,35 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa */ protected OperationStatus [] doReplayBatchOp(final HRegion region, final List mutations) throws IOException { - HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()]; long before = EnvironmentEdgeManager.currentTimeMillis(); boolean batchContainsPuts = false, batchContainsDelete = false; try { - int i = 0; - for (HLogSplitter.MutationReplay m : mutations) { + for (Iterator it = mutations.iterator(); it.hasNext();) { + HLogSplitter.MutationReplay m = it.next(); if (m.type == MutationType.PUT) { batchContainsPuts = true; } else { batchContainsDelete = true; } - mArray[i++] = m; + NavigableMap> map = m.mutation.getFamilyCellMap(); + List metaCells = map.get(WALEdit.METAFAMILY); + if (metaCells != null && !metaCells.isEmpty()) { + for (Cell metaCell : metaCells) { + CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); + if (compactionDesc != null) { + region.completeCompactionMarker(compactionDesc); + } + } + it.remove(); + } } requestCount.add(mutations.size()); if (!region.getRegionInfo().isMetaTable()) { cacheFlusher.reclaimMemStoreMemory(); } - return region.batchReplay(mArray); + return region.batchReplay(mutations.toArray( + new HLogSplitter.MutationReplay[mutations.size()])); } finally { long after = EnvironmentEdgeManager.currentTimeMillis(); if (batchContainsPuts) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 545c253..59204b3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRespo import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; @@ -1440,19 +1441,41 @@ public class HLogSplitter { HConnection hconn = this.getConnectionByTableName(table); for (KeyValue kv : kvs) { - // filtering HLog meta entries - // We don't handle HBASE-2231 because we may or may not replay a compaction event. - // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143& - // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143 + byte[] row = kv.getRow(); + byte[] family = kv.getFamily(); + boolean isCompactionEntry = false; + if (kv.matchingFamily(WALEdit.METAFAMILY)) { - skippedKVs.add(kv); - continue; + CompactionDescriptor compaction = WALEdit.getCompaction(kv); + if (compaction != null && compaction.hasRegionName()) { + try { + byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName() + .toByteArray()); + row = regionName[1]; // startKey of the region + family = compaction.getFamilyName().toByteArray(); + isCompactionEntry = true; + } catch (Exception ex) { + LOG.warn("Unexpected exception received, ignoring " + ex); + skippedKVs.add(kv); + continue; + } + } else { + skippedKVs.add(kv); + continue; + } } try { loc = - locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(), + locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row, encodeRegionNameStr); + // skip replaying the compaction if the region is gone + if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase( + loc.getRegionInfo().getEncodedName())) { + LOG.info("Not replaying a compaction marker for an older region: " + + encodeRegionNameStr); + needSkip = true; + } } catch (TableNotFoundException ex) { // table has been deleted so skip edits of the table LOG.info("Table " + table + " doesn't exist. Skip log replay for region " @@ -1481,7 +1504,7 @@ public class HLogSplitter { regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName()); } if (maxStoreSequenceIds != null) { - Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily()); + Long maxStoreSeqId = maxStoreSequenceIds.get(family); if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) { // skip current kv if column family doesn't exist anymore or already flushed skippedKVs.add(kv); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index 684f78c..ff3f3e6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -263,7 +263,7 @@ public class HLogUtil { */ public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info, final CompactionDescriptor c, AtomicLong sequenceId) throws IOException { - WALEdit e = WALEdit.createCompaction(c); + WALEdit e = WALEdit.createCompaction(info, c); long now = EnvironmentEdgeManager.currentTimeMillis(); TableName tn = TableName.valueOf(c.getTableName().toByteArray()); long txid = log.appendNoSync(info, tn, e, new ArrayList(), now, htd, sequenceId, diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index d4b36a6..fd223a4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -32,6 +32,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.util.Bytes; @@ -144,6 +146,7 @@ public class WALEdit implements Writable, HeapSize { return result; } + @Override public void readFields(DataInput in) throws IOException { kvs.clear(); if (scopes != null) { @@ -179,6 +182,7 @@ public class WALEdit implements Writable, HeapSize { } } + @Override public void write(DataOutput out) throws IOException { LOG.warn("WALEdit is being serialized to writable - only expected in test code"); out.writeInt(VERSION_2); @@ -221,6 +225,7 @@ public class WALEdit implements Writable, HeapSize { return kvs.size(); } + @Override public long heapSize() { long ret = ClassSize.ARRAYLIST; for (KeyValue kv : kvs) { @@ -234,6 +239,7 @@ public class WALEdit implements Writable, HeapSize { return ret; } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -254,19 +260,29 @@ public class WALEdit implements Writable, HeapSize { * @param c * @return A WALEdit that has c serialized as its value */ - public static WALEdit createCompaction(final CompactionDescriptor c) { + public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) { byte [] pbbytes = c.toByteArray(); - KeyValue kv = new KeyValue(METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes); + KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes); return new WALEdit().add(kv); //replication scope null so that this won't be replicated } + private static byte[] getRowForRegion(HRegionInfo hri) { + byte[] startKey = hri.getStartKey(); + if (startKey.length == 0) { + // empty row key is not allowed in mutations because it is both the start key and the end key + // we return the smallest byte[] that is bigger (in lex comparison) than byte[0]. + return new byte[] {0}; + } + return startKey; + } + /** * Deserialized and returns a CompactionDescriptor is the KeyValue contains one. * @param kv the key value * @return deserialized CompactionDescriptor or null. */ - public static CompactionDescriptor getCompaction(KeyValue kv) throws IOException { - if (kv.matchingRow(METAROW) && kv.matchingColumn(METAFAMILY, COMPACTION)) { + public static CompactionDescriptor getCompaction(Cell kv) throws IOException { + if (CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) { return CompactionDescriptor.parseFrom(kv.getValue()); } return null; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index 3845d6c..966e1ee 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -233,8 +233,6 @@ public class WALEditsReplaySink { List kvs = edit.getKeyValues(); for (KeyValue kv : kvs) { // filtering HLog meta entries - if (kv.matchingFamily(WALEdit.METAFAMILY)) continue; - setLocation(conn.locateRegion(tableName, kv.getRow())); skip = true; break; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index cc84600..26d08c0 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,6 +34,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -42,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hdfs.DFSClient; @@ -53,6 +57,8 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.common.collect.Lists; + /** * Test for the case where a regionserver going down has enough cycles to do damage to regions * that have actually been assigned elsehwere. @@ -206,7 +212,8 @@ public class TestIOFencing { @Ignore("See HBASE-10298") @Test public void testFencingAroundCompaction() throws Exception { - doTest(BlockCompactionsInPrepRegion.class); + doTest(BlockCompactionsInPrepRegion.class, false); + doTest(BlockCompactionsInPrepRegion.class, true); } /** @@ -218,12 +225,13 @@ public class TestIOFencing { @Ignore("See HBASE-10298") @Test public void testFencingAroundCompactionAfterWALSync() throws Exception { - doTest(BlockCompactionsInCompletionRegion.class); + doTest(BlockCompactionsInCompletionRegion.class, false); + doTest(BlockCompactionsInCompletionRegion.class, true); } - public void doTest(Class regionClass) throws Exception { + public void doTest(Class regionClass, boolean distributedLogReplay) throws Exception { Configuration c = TEST_UTIL.getConfiguration(); - c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); + c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay); // Insert our custom region c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class); c.setBoolean("dfs.support.append", true); @@ -256,6 +264,16 @@ public class TestIOFencing { // Load some rows TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT); + // add a compaction from an older (non-existing) region to see whether we successfully skip + // those entries + HRegionInfo oldHri = new HRegionInfo(table.getName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri, + FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")), + new Path("store_dir")); + HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(), + oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100)); + // Wait till flush has happened, otherwise there won't be multiple store files long startWaitTime = System.currentTimeMillis(); while (compactingRegion.getLastFlushTime() <= lastFlushTime || @@ -273,18 +291,24 @@ public class TestIOFencing { compactingRegion.waitForCompactionToBlock(); LOG.info("Starting a new server"); RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer(); - HRegionServer newServer = newServerThread.getRegionServer(); + final HRegionServer newServer = newServerThread.getRegionServer(); LOG.info("Killing region server ZK lease"); TEST_UTIL.expireRegionServerSession(0); CompactionBlockerRegion newRegion = null; startWaitTime = System.currentTimeMillis(); - while (newRegion == null) { - LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME)); - Thread.sleep(1000); - newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME); - assertTrue("Timed out waiting for new server to open region", - System.currentTimeMillis() - startWaitTime < 300000); - } + LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME)); + + // wait for region to be assigned and to go out of log replay if applicable + Waiter.waitFor(c, 60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + HRegion newRegion = newServer.getOnlineRegion(REGION_NAME); + return newRegion != null && !newRegion.isRecovering(); + } + }); + + newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME); + LOG.info("Allowing compaction to proceed"); compactingRegion.allowCompactions(); while (compactingRegion.compactCount == 0) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 52713ed..c88f996 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -605,11 +605,13 @@ public class TestHRegion { long time = System.nanoTime(); WALEdit edit = null; if (i == maxSeqId) { - edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder() + edit = WALEdit.createCompaction(region.getRegionInfo(), + CompactionDescriptor.newBuilder() .setTableName(ByteString.copyFrom(tableName.getName())) .setFamilyName(ByteString.copyFrom(regionName)) .setEncodedRegionName(ByteString.copyFrom(regionName)) .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString()))) + .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName())) .build()); } else { edit = new WALEdit(); @@ -701,7 +703,8 @@ public class TestHRegion { long time = System.nanoTime(); writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time, - HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(compactionDescriptor))); + HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(), + compactionDescriptor))); writer.close(); // close the region now, and reopen again diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 5c0f710..f67ab1f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.TableName; @@ -491,10 +492,12 @@ public class TestReplicationSmallTests extends TestReplicationBase { public void testCompactionWALEdits() throws Exception { WALProtos.CompactionDescriptor compactionDescriptor = WALProtos.CompactionDescriptor.getDefaultInstance(); - WALEdit edit = WALEdit.createCompaction(compactionDescriptor); + HRegionInfo hri = new HRegionInfo(htable1.getName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); Replication.scopeWALEdits(htable1.getTableDescriptor(), new HLogKey(), edit); } - + /** * Test for HBASE-8663 * Create two new Tables with colfamilies enabled for replication then run