diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index c58d1e5..7277429 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2495,6 +2495,7 @@ public final class ProtobufUtil {
     for (Path outputPath : outputPaths) {
       builder.addCompactionOutput(outputPath.getName());
     }
+    builder.setRegionName(HBaseZeroCopyByteString.wrap(info.getRegionName()));
     return builder.build();
   }
 
diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index efea2ba..c6d6a19 100644
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -3725,6 +3725,24 @@ public final class WALProtos {
      */
     com.google.protobuf.ByteString
         getStoreHomeDirBytes();
+
+    // optional bytes region_name = 7;
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    boolean hasRegionName();
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    com.google.protobuf.ByteString getRegionName();
   }
   /**
    * Protobuf type {@code CompactionDescriptor}
    */
@@ -3821,6 +3839,11 @@ public final class WALProtos {
             storeHomeDir_ = input.readBytes();
             break;
           }
+          case 58: {
+            bitField0_ |= 0x00000010;
+            regionName_ = input.readBytes();
+            break;
+          }
         }
       }
     } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4026,6 +4049,30 @@ public final class WALProtos {
       }
     }
 
+    // optional bytes region_name = 7;
+    public static final int REGION_NAME_FIELD_NUMBER = 7;
+    private com.google.protobuf.ByteString regionName_;
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    public boolean hasRegionName() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    public com.google.protobuf.ByteString getRegionName() {
+      return regionName_;
+    }
+
     private void initFields() {
       tableName_ = com.google.protobuf.ByteString.EMPTY;
       encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
@@ -4033,6 +4080,7 @@ public final class WALProtos {
       compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
       compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
       storeHomeDir_ = "";
+      regionName_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4080,6 +4128,9 @@ public final class WALProtos {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeBytes(6, getStoreHomeDirBytes());
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(7, regionName_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4123,6 +4174,10 @@ public final class WALProtos {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(6, getStoreHomeDirBytes());
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(7, regionName_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4170,6 +4225,11 @@ public final class WALProtos {
        result = result && getStoreHomeDir()
            .equals(other.getStoreHomeDir());
      }
+      result = result && (hasRegionName() == other.hasRegionName());
+      if (hasRegionName()) {
+        result = result && getRegionName()
+            .equals(other.getRegionName());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4207,6 +4267,10 @@ public final class WALProtos {
        hash = (37 * hash) + STORE_HOME_DIR_FIELD_NUMBER;
        hash = (53 * hash) + getStoreHomeDir().hashCode();
      }
+      if (hasRegionName()) {
+        hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+        hash = (53 * hash) + getRegionName().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4336,6 +4400,8 @@ public final class WALProtos {
         bitField0_ = (bitField0_ & ~0x00000010);
         storeHomeDir_ = "";
         bitField0_ = (bitField0_ & ~0x00000020);
+        regionName_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -4392,6 +4458,10 @@ public final class WALProtos {
           to_bitField0_ |= 0x00000008;
         }
         result.storeHomeDir_ = storeHomeDir_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.regionName_ = regionName_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4442,6 +4512,9 @@ public final class WALProtos {
           storeHomeDir_ = other.storeHomeDir_;
           onChanged();
         }
+        if (other.hasRegionName()) {
+          setRegionName(other.getRegionName());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -4869,6 +4942,58 @@ public final class WALProtos {
         return this;
       }
 
+      // optional bytes region_name = 7;
+      private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public boolean hasRegionName() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public com.google.protobuf.ByteString getRegionName() {
+        return regionName_;
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public Builder setRegionName(com.google.protobuf.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000040;
+        regionName_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public Builder clearRegionName() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        regionName_ = getDefaultInstance().getRegionName();
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:CompactionDescriptor)
     }
 
@@ -5275,15 +5400,16 @@ public final class WALProtos {
       "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
       "\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
       "\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
-      "\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compac" +
+      "\nscope_type\030\002 \002(\0162\n.ScopeType\"\276\001\n\024Compac" +
       "tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
       "coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
       " \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
       "tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
-      "\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICAT" +
-      "ION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_G" +
-      "LOBAL\020\001B?\n*org.apache.hadoop.hbase.proto" +
-      "buf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
+      "\t\022\023\n\013region_name\030\007 \001(\014\"\014\n\nWALTrailer*F\n\t" +
+      "ScopeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034" +
+      "\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apac" +
+      "he.hadoop.hbase.protobuf.generatedB\tWALP",
+      "rotosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5313,7 +5439,7 @@ public final class WALProtos {
     internal_static_CompactionDescriptor_fieldAccessorTable = new
       com.google.protobuf.GeneratedMessage.FieldAccessorTable(
         internal_static_CompactionDescriptor_descriptor,
-        new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", });
+        new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", "RegionName", });
     internal_static_WALTrailer_descriptor =
       getDescriptor().getMessageTypes().get(4);
     internal_static_WALTrailer_fieldAccessorTable = new
diff --git hbase-protocol/src/main/protobuf/WAL.proto hbase-protocol/src/main/protobuf/WAL.proto
index 0ae65ec..88e94f4 100644
--- hbase-protocol/src/main/protobuf/WAL.proto
+++ hbase-protocol/src/main/protobuf/WAL.proto
@@ -92,6 +92,7 @@ message CompactionDescriptor {
   repeated string compaction_input = 4;
   repeated string compaction_output = 5;
   required string store_home_dir = 6;
+  optional bytes region_name = 7; // full region name
 }
 
 /**
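For context: region_name is added as an optional field, so CompactionDescriptor messages written by servers without this patch still parse, and readers are expected to guard on hasRegionName() (the HLogSplitter hunk below does exactly that). A minimal sketch of building a marker that carries the new field, using the generated builder above; every value here is a placeholder, not something taken from the patch:

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;

    public class CompactionMarkerSketch {
      // Fills the four required fields plus the new optional region_name (field 7).
      public static CompactionDescriptor build(byte[] fullRegionName) {
        return CompactionDescriptor.newBuilder()
            .setTableName(ByteString.copyFromUtf8("t1"))
            .setEncodedRegionName(ByteString.copyFromUtf8("e1"))
            .setFamilyName(ByteString.copyFromUtf8("f"))
            .setStoreHomeDir("store_dir")
            .setRegionName(ByteString.copyFrom(fullRegionName)) // new in this patch
            .build();
      }
    }

In the patch itself the field is populated in exactly one place, ProtobufUtil.toCompactionDescriptor() (first hunk), from HRegionInfo.getRegionName().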
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 93eada8..7c940ad 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1625,7 +1625,7 @@ public class HRegion implements HeapSize { // , Writable{
    */
   boolean shouldFlush() {
     // This is a rough measure.
-    if (this.lastFlushSeqId > 0 
+    if (this.lastFlushSeqId > 0
           && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
       return true;
     }
@@ -2195,7 +2195,7 @@ public class HRegion implements HeapSize { // , Writable{
     public boolean isInReplay() {
       return false;
     }
-    
+
     @Override
     public long getReplaySequenceId() {
       return 0;
@@ -2234,7 +2234,7 @@ public class HRegion implements HeapSize { // , Writable{
     public boolean isInReplay() {
       return true;
     }
-    
+
     @Override
     public long getReplaySequenceId() {
       return this.replaySeqId;
@@ -3295,7 +3295,7 @@ public class HRegion implements HeapSize { // , Writable{
         firstSeqIdInLog = key.getLogSeqNum();
       }
       currentEditSeqId = key.getLogSeqNum();
-      currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ? 
+      currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
         key.getOrigLogSeqNum() : currentEditSeqId;
       boolean flush = false;
       for (KeyValue kv: val.getKeyValues()) {
@@ -6243,7 +6243,7 @@ public class HRegion implements HeapSize { // , Writable{
         WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
     return key;
   }
-  
+
   /**
    * Explicitly sync wal
    * @throws IOException
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b84f9a2..26233b5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -26,8 +26,10 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -140,6 +142,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
@@ -640,25 +643,37 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private OperationStatus [] doReplayBatchOp(final HRegion region,
       final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
-    HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
-
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
-      int i = 0;
-      for (HLogSplitter.MutationReplay m : mutations) {
+      for (Iterator<HLogSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
+        HLogSplitter.MutationReplay m = it.next();
+
         if (m.type == MutationType.PUT) {
           batchContainsPuts = true;
         } else {
           batchContainsDelete = true;
         }
-        mArray[i++] = m;
+
+        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
+        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
+        if (metaCells != null && !metaCells.isEmpty()) {
+          for (Cell metaCell : metaCells) {
+            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
+            if (compactionDesc != null) {
+              region.completeCompactionMarker(compactionDesc);
+            }
+          }
+          it.remove();
+        }
       }
       requestCount.add(mutations.size());
       if (!region.getRegionInfo().isMetaTable()) {
         regionServer.cacheFlusher.reclaimMemStoreMemory();
       }
-      return region.batchReplay(mArray, replaySeqId);
+      return region.batchReplay(mutations.toArray(
+        new HLogSplitter.MutationReplay[mutations.size()]), replaySeqId);
     } finally {
       if (regionServer.metricsRegionServer != null) {
         long after = EnvironmentEdgeManager.currentTimeMillis();
@@ -1355,7 +1370,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           walEntries.add(walEntry);
         }
         if(edits!=null && !edits.isEmpty()) {
-          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
+          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
             entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
           OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
           // check if it's a partial success
@@ -1366,7 +1381,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
         }
       }
-      
+
       //sync wal at the end because ASYNC_WAL is used above
       region.syncWal();
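For context: doReplayBatchOp() now strains compaction markers out of the replayed batch and routes them to HRegion.completeCompactionMarker(), so batchReplay() only ever sees real data mutations. The recognition rule is small; a self-contained sketch of it (the helper class and method name are ours, not the patch's):

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

    final class ReplayFilterSketch {
      // METAFAMILY marks WAL-internal edits; WALEdit.getCompaction() returns null
      // for any cell that does not carry a CompactionDescriptor payload.
      static boolean isCompactionMarker(Cell cell) throws IOException {
        return CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)
            && WALEdit.getCompaction(cell) != null;
      }
    }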
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 873e863..de31f24 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -80,7 +80,6 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -92,6 +91,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
@@ -241,7 +241,7 @@ public class HLogSplitter {
     List<Path> splits = new ArrayList<Path>();
     if (logfiles != null && logfiles.length > 0) {
       for (FileStatus logfile: logfiles) {
-        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null, 
+        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
             RecoveryMode.LOG_SPLITTING);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
@@ -811,6 +811,7 @@ public class HLogSplitter {
       k.internEncodedRegionName(this.encodedRegionName);
     }
 
+    @Override
     public long heapSize() {
       return heapInBuffer;
     }
@@ -825,6 +826,7 @@ public class HLogSplitter {
       outputSink = sink;
     }
 
+    @Override
     public void run() {
       try {
         doRun();
@@ -1060,6 +1062,7 @@ public class HLogSplitter {
         TimeUnit.SECONDS, new ThreadFactory() {
           private int count = 1;
 
+          @Override
           public Thread newThread(Runnable r) {
             Thread t = new Thread(r, "split-log-closeStream-" + count++);
             return t;
@@ -1070,6 +1073,7 @@ public class HLogSplitter {
       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
         LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
         completionService.submit(new Callable<Void>() {
+          @Override
           public Void call() throws Exception {
             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
             LOG.debug("Closing " + wap.p);
@@ -1242,6 +1246,7 @@ public class HLogSplitter {
       return (new WriterAndPath(regionedits, w));
     }
 
+    @Override
     void append(RegionEntryBuffer buffer) throws IOException {
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
@@ -1280,6 +1285,7 @@ public class HLogSplitter {
     /**
      * @return a map from encoded region ID to the number of edits written out for that region.
      */
+    @Override
     Map<byte[], Long> getOutputCounts() {
       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
       synchronized (writers) {
@@ -1368,6 +1374,7 @@ public class HLogSplitter {
       this.logRecoveredEditsOutputSink.setReporter(reporter);
     }
 
+    @Override
     void append(RegionEntryBuffer buffer) throws IOException {
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
@@ -1449,19 +1456,40 @@ public class HLogSplitter {
         HConnection hconn = this.getConnectionByTableName(table);
 
         for (KeyValue kv : kvs) {
-          // filtering HLog meta entries
-          // We don't handle HBASE-2231 because we may or may not replay a compaction event.
-          // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
-          // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
+          byte[] row = kv.getRow();
+          byte[] family = kv.getFamily();
+          boolean isCompactionEntry = false;
           if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) {
-            skippedKVs.add(kv);
-            continue;
+            CompactionDescriptor compaction = WALEdit.getCompaction(kv);
+            if (compaction != null && compaction.hasRegionName()) {
+              try {
+                byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
+                  .toByteArray());
+                row = regionName[1]; // startKey of the region
+                family = compaction.getFamilyName().toByteArray();
+                isCompactionEntry = true;
+              } catch (Exception ex) {
+                LOG.warn("Unexpected exception received, ignoring " + ex);
+                skippedKVs.add(kv);
+                continue;
+              }
+            } else {
+              skippedKVs.add(kv);
+              continue;
+            }
           }
 
           try {
             loc =
-              locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(),
+              locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
                 encodeRegionNameStr);
+            // skip replaying the compaction if the region is gone
+            if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
+              loc.getRegionInfo().getEncodedName())) {
+              LOG.info("Not replaying a compaction marker for an older region: "
+                + encodeRegionNameStr);
+              needSkip = true;
+            }
           } catch (TableNotFoundException ex) {
             // table has been deleted so skip edits of the table
             LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
@@ -1490,7 +1518,7 @@ public class HLogSplitter {
               regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
           }
           if (maxStoreSequenceIds != null) {
-            Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily());
+            Long maxStoreSeqId = maxStoreSequenceIds.get(family);
             if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
               // skip current kv if column family doesn't exist anymore or already flushed
               skippedKVs.add(kv);
@@ -1768,6 +1796,7 @@ public class HLogSplitter {
       return result;
     }
 
+    @Override
     Map<byte[], Long> getOutputCounts() {
       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
       synchronized (writers) {
@@ -1922,7 +1951,7 @@ public class HLogSplitter {
       return new ArrayList<MutationReplay>();
     }
 
-    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
+    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
       entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
     int count = entry.getAssociatedCellCount();
     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
@@ -1979,7 +2008,7 @@ public class HLogSplitter {
       clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
     }
     key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
-        .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds, 
+        .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
         walKey.getNonceGroup(), walKey.getNonce());
     logEntry.setFirst(key);
     logEntry.setSecond(val);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index 6809aad..a0707f7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -262,7 +262,7 @@ public class HLogUtil {
       final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
     TableName tn = TableName.valueOf(c.getTableName().toByteArray());
     HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false, null);
+    log.appendNoSync(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
     log.sync();
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
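For context: a full region name has the shape "<table>,<startKey>,<regionId>...", so HRegionInfo.parseRegionName(...)[1] recovers the start key, which is enough to route the marker to whichever region currently owns that key range. The marker is then replayed only when the owning region is the same instance that wrote it; if the region was since split, merged, or otherwise recreated, the marker is skipped. A sketch of that staleness test, reduced to its essentials (class and method name are ours, not the patch's):

    import org.apache.hadoop.hbase.HRegionLocation;

    final class CompactionMarkerRoutingSketch {
      // Replay the marker only if the region now holding the start key has the
      // same encoded name as the region that originally logged the compaction.
      static boolean shouldReplay(HRegionLocation currentOwner, String writerEncodedName) {
        return writerEncodedName.equalsIgnoreCase(
            currentOwner.getRegionInfo().getEncodedName());
      }
    }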
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 8ecb4b3..85e2d7c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -145,6 +146,7 @@ public class WALEdit implements Writable, HeapSize {
     return result;
   }
 
+  @Override
   public void readFields(DataInput in) throws IOException {
     kvs.clear();
     if (scopes != null) {
@@ -180,6 +182,7 @@ public class WALEdit implements Writable, HeapSize {
     }
   }
 
+  @Override
   public void write(DataOutput out) throws IOException {
     LOG.warn("WALEdit is being serialized to writable - only expected in test code");
     out.writeInt(VERSION_2);
@@ -222,6 +225,7 @@ public class WALEdit implements Writable, HeapSize {
     return kvs.size();
   }
 
+  @Override
   public long heapSize() {
     long ret = ClassSize.ARRAYLIST;
     for (KeyValue kv : kvs) {
@@ -235,6 +239,7 @@ public class WALEdit implements Writable, HeapSize {
     return ret;
   }
 
+  @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
 
@@ -249,25 +254,36 @@ public class WALEdit implements Writable, HeapSize {
     sb.append(">]");
     return sb.toString();
   }
-  
+
   /**
    * Create a compaction WALEdit
    * @param c
    * @return A WALEdit that has <code>c</code> serialized as its value
    */
-  public static WALEdit createCompaction(final CompactionDescriptor c) {
+  public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
     byte [] pbbytes = c.toByteArray();
-    KeyValue kv = new KeyValue(METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes);
+    KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
+      System.currentTimeMillis(), pbbytes);
     return new WALEdit().add(kv); //replication scope null so that this won't be replicated
   }
 
+  private static byte[] getRowForRegion(HRegionInfo hri) {
+    byte[] startKey = hri.getStartKey();
+    if (startKey.length == 0) {
+      // empty row key is not allowed in mutations because it is both the start key and the end key
+      // we return the smallest byte[] that is bigger (in lex comparison) than byte[0].
+      return new byte[] {0};
+    }
+    return startKey;
+  }
+
   /**
    * Deserializes and returns a CompactionDescriptor if the KeyValue contains one.
    * @param kv the key value
    * @return deserialized CompactionDescriptor or null.
    */
-  public static CompactionDescriptor getCompaction(KeyValue kv) throws IOException {
-    if (CellUtil.matchingRow(kv, METAROW) && CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
+  public static CompactionDescriptor getCompaction(Cell kv) throws IOException {
+    if (CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
       return CompactionDescriptor.parseFrom(kv.getValue());
     }
     return null;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 1c08fac..97e9b86 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -234,8 +233,6 @@ public class WALEditsReplaySink {
       List<KeyValue> kvs = edit.getKeyValues();
       for (KeyValue kv : kvs) {
         // filtering HLog meta entries
-        if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) continue;
-
         setLocation(conn.locateRegion(tableName, kv.getRow()));
         skip = true;
         break;
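For context: compaction markers used to be keyed by the magic METAROW; keying them by the region's start key instead lets them sort and route like ordinary edits during replay, which is what the HLogSplitter change above relies on. getRowForRegion() substitutes new byte[] {0} when the start key is empty because an empty row is not a legal mutation key. A worked check of the lexicographic claim in that comment (Bytes is HBase's comparator utility; run with -ea for the asserts):

    import org.apache.hadoop.hbase.util.Bytes;

    final class RowForRegionSketch {
      public static void main(String[] args) {
        byte[] empty = new byte[0];
        byte[] row = new byte[] {0};                          // smallest non-empty row key
        assert Bytes.compareTo(empty, row) < 0;               // "" < {0}
        assert Bytes.compareTo(row, Bytes.toBytes("a")) < 0;  // {0} precedes every other non-empty key
      }
    }

So a row of {0} still falls inside any region whose range begins at the empty key, which is exactly the case the substitution covers.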
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index b5848a2..7dec203 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,6 +34,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -52,6 +56,8 @@ import org.apache.log4j.Level;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Lists;
+
 /**
  * Test for the case where a regionserver going down has enough cycles to do damage to regions
  * that have actually been assigned elsewhere.
@@ -215,7 +221,8 @@ public class TestIOFencing {
    */
   @Test
   public void testFencingAroundCompaction() throws Exception {
-    doTest(BlockCompactionsInPrepRegion.class);
+    doTest(BlockCompactionsInPrepRegion.class, false);
+    doTest(BlockCompactionsInPrepRegion.class, true);
   }
 
   /**
@@ -226,12 +233,13 @@ public class TestIOFencing {
    */
   @Test
   public void testFencingAroundCompactionAfterWALSync() throws Exception {
-    doTest(BlockCompactionsInCompletionRegion.class);
+    doTest(BlockCompactionsInCompletionRegion.class, false);
+    doTest(BlockCompactionsInCompletionRegion.class, true);
   }
 
-  public void doTest(Class<?> regionClass) throws Exception {
+  public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
     Configuration c = TEST_UTIL.getConfiguration();
-    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
     // Insert our custom region
     c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
     c.setBoolean("dfs.support.append", true);
@@ -264,6 +272,16 @@ public class TestIOFencing {
       // Load some rows
       TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
 
+      // add a compaction from an older (non-existing) region to see whether we successfully skip
+      // those entries
+      HRegionInfo oldHri = new HRegionInfo(table.getName(),
+        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
+        FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
+        new Path("store_dir"));
+      HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(),
+        oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
+
       // Wait till flush has happened, otherwise there won't be multiple store files
       long startWaitTime = System.currentTimeMillis();
       while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
@@ -281,18 +299,24 @@ public class TestIOFencing {
       compactingRegion.waitForCompactionToBlock();
       LOG.info("Starting a new server");
       RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
-      HRegionServer newServer = newServerThread.getRegionServer();
+      final HRegionServer newServer = newServerThread.getRegionServer();
       LOG.info("Killing region server ZK lease");
       TEST_UTIL.expireRegionServerSession(0);
       CompactionBlockerRegion newRegion = null;
       startWaitTime = System.currentTimeMillis();
-      while (newRegion == null) {
-        LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
-        Thread.sleep(1000);
-        newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
-        assertTrue("Timed out waiting for new server to open region",
-          System.currentTimeMillis() - startWaitTime < 300000);
-      }
+      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
+
+      // wait for region to be assigned and to go out of log replay if applicable
+      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          HRegion newRegion = newServer.getOnlineRegion(REGION_NAME);
+          return newRegion != null && !newRegion.isRecovering();
+        }
+      });
+
+      newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
+
       LOG.info("Allowing compaction to proceed");
       compactingRegion.allowCompactions();
       while (compactingRegion.compactCount == 0) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8a588e7..15e530a 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -657,11 +657,13 @@ public class TestHRegion {
         long time = System.nanoTime();
         WALEdit edit = null;
         if (i == maxSeqId) {
-          edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder()
+          edit = WALEdit.createCompaction(region.getRegionInfo(),
+            CompactionDescriptor.newBuilder()
             .setTableName(ByteString.copyFrom(tableName.getName()))
             .setFamilyName(ByteString.copyFrom(regionName))
             .setEncodedRegionName(ByteString.copyFrom(regionName))
             .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
+            .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
             .build());
         } else {
           edit = new WALEdit();
@@ -753,7 +755,8 @@ public class TestHRegion {
     long time = System.nanoTime();
 
     writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time,
-        HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(compactionDescriptor)));
+        HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
+        compactionDescriptor)));
     writer.close();
 
     // close the region now, and reopen again
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 5c0f710..f67ab1f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.TableName;
@@ -491,10 +492,12 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   public void testCompactionWALEdits() throws Exception {
     WALProtos.CompactionDescriptor compactionDescriptor =
       WALProtos.CompactionDescriptor.getDefaultInstance();
-    WALEdit edit = WALEdit.createCompaction(compactionDescriptor);
+    HRegionInfo hri = new HRegionInfo(htable1.getName(),
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
     Replication.scopeWALEdits(htable1.getTableDescriptor(), new HLogKey(), edit);
   }
 
   /**
    * Test for HBASE-8663
    * Create two new Tables with colfamilies enabled for replication then run