diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9549a13..47a1905 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4130,11 +4130,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             continue;
           }
         }
+        boolean checkRowWithinBoundary = false;
         // Check this edit is for this region.
         if (!Bytes.equals(key.getEncodedRegionName(),
             this.getRegionInfo().getEncodedNameAsBytes())) {
-          skippedEdits++;
-          continue;
+          checkRowWithinBoundary = true;
         }
 
         boolean flush = false;
@@ -4142,11 +4142,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // Check this edit is for me. Also, guard against writing the special
           // METACOLUMN info such as HBASE::CACHEFLUSH entries
           if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
-            //this is a special edit, we should handle it
             CompactionDescriptor compaction = WALEdit.getCompaction(cell);
             if (compaction != null) {
+              // replay the compaction;
+              // if the region names don't match, skip replaying the compaction marker
+              replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE,
+                !checkRowWithinBoundary);
-              //replay the compaction
-              replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
             }
             skippedEdits++;
             continue;
@@ -4162,6 +4163,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             skippedEdits++;
             continue;
           }
+          if (checkRowWithinBoundary && !rowIsInRange(this.getRegionInfo(),
+              cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) {
+            LOG.warn("Row of " + cell + " is not within region boundary");
+            skippedEdits++;
+            continue;
+          }
           // Now, figure if we should skip this edit.
           if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
               .getName())) {
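Note (not part of the patch): a minimal, self-contained sketch of the rule the three hunks above add to the recovered-edits replay path. The class and method names here are invented for illustration; only the comparison inside rowIsInRange mirrors the helper the patch adds further down.

  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.util.Bytes;

  // Illustrative sketch only: an edit whose encoded region name differs from
  // the local region's is no longer skipped outright; it is applied only when
  // its row still falls inside the local region's key range.
  public final class ReplayBoundaryRule {
    private ReplayBoundaryRule() {
    }

    public static boolean shouldReplay(HRegionInfo localRegion,
        byte[] editEncodedRegionName, Cell cell) {
      boolean sameRegion = Bytes.equals(editEncodedRegionName,
          localRegion.getEncodedNameAsBytes());
      return sameRegion || rowIsInRange(localRegion,
          cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
    }

    // Same comparison as the rowIsInRange(info, row, offset, length) overload
    // the patch adds to HRegion.
    static boolean rowIsInRange(HRegionInfo info, byte[] row, int offset, short length) {
      return (info.getStartKey().length == 0
          || Bytes.compareTo(info.getStartKey(), 0, info.getStartKey().length,
              row, offset, length) <= 0)
          && (info.getEndKey().length == 0
              || Bytes.compareTo(info.getEndKey(), 0, info.getEndKey().length,
                  row, offset, length) > 0);
    }
  }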
@@ -4230,10 +4237,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * See HBASE-2331.
    */
   void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
-      boolean removeFiles, long replaySeqId)
+      boolean removeFiles, long replaySeqId, boolean throwExIfRegionNamesMismatch)
       throws IOException {
-    checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
-      "Compaction marker from WAL ", compaction);
+    if (!checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
+        "Compaction marker from WAL ", compaction, throwExIfRegionNamesMismatch)) {
+      return;
+    }
 
     synchronized (writestate) {
       if (replaySeqId < lastReplayedOpenRegionSeqId) {
@@ -4284,7 +4293,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
     checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
-      "Flush marker from WAL ", flush);
+      "Flush marker from WAL ", flush, true);
 
     if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
       return; // if primary nothing to do
@@ -4678,7 +4687,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
     checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
-      "RegionEvent marker from WAL ", regionEvent);
+      "RegionEvent marker from WAL ", regionEvent, true);
 
     startRegionOperation(Operation.REPLAY_EVENT);
     try {
@@ -4794,7 +4803,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
     checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
-      "BulkLoad marker from WAL ", bulkLoadEvent);
+      "BulkLoad marker from WAL ", bulkLoadEvent, true);
 
     if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
       return; // if primary nothing to do
@@ -5005,18 +5014,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /** Checks whether the given regionName is either equal to our region, or that
    * the regionName is the primary region to our corresponding range for the secondary replica.
    */
-  private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
-      throws WrongRegionException {
+  private boolean checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload,
+      boolean throwExIfRegionNamesMismatch) throws WrongRegionException {
     if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
-      return;
+      return true;
     }
 
     if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
         Bytes.equals(encodedRegionName, this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
-      return;
+      return true;
     }
 
+    if (!throwExIfRegionNamesMismatch) {
+      return false;
+    }
     throw new WrongRegionException(exceptionMsg + payload
         + " targetted for region " + Bytes.toStringBinary(encodedRegionName)
         + " does not match this region: " + this.getRegionInfo());
@@ -6590,6 +6602,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         (Bytes.compareTo(info.getEndKey(), row) > 0));
   }
 
+  public static boolean rowIsInRange(HRegionInfo info, final byte [] row, final int offset,
+      final short length) {
+    return ((info.getStartKey().length == 0) ||
+        (Bytes.compareTo(info.getStartKey(), 0, info.getStartKey().length,
+          row, offset, length) <= 0)) &&
+        ((info.getEndKey().length == 0) ||
+          (Bytes.compareTo(info.getEndKey(), 0, info.getEndKey().length, row, offset, length) > 0));
+  }
+
   /**
    * Merge two HRegions. The regions must be adjacent and must not overlap.
    *
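Note (illustrative, not part of the patch): a quick check that the new offset-based overload agrees with the existing whole-array rowIsInRange when the row sits inside a larger buffer. The class name is hypothetical.

  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.regionserver.HRegion;
  import org.apache.hadoop.hbase.util.Bytes;

  public class RowIsInRangeExample {
    public static void main(String[] args) {
      // Region covering [bbb, ddd)
      HRegionInfo info = new HRegionInfo(TableName.valueOf("t1"),
          Bytes.toBytes("bbb"), Bytes.toBytes("ddd"));
      byte[] buffer = Bytes.toBytes("__ccc__"); // row "ccc" at offset 2, length 3
      // true: "ccc" lies inside [bbb, ddd), same answer as the whole-array version
      System.out.println(HRegion.rowIsInRange(info, buffer, 2, (short) 3));
      System.out.println(HRegion.rowIsInRange(info, Bytes.toBytes("ccc")));
      // false: "eee" sorts at or beyond the exclusive end key
      System.out.println(HRegion.rowIsInRange(info, Bytes.toBytes("eee")));
    }
  }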
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 75705e6..271d0e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -899,7 +899,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           // replay the compaction. Remove the files from stores only if we are the primary
           // region replica (thus own the files)
           hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
-            replaySeqId);
+            replaySeqId, true);
           continue;
         }
         FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 9dbeed7..db35a68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -2554,6 +2554,18 @@ public class HBaseFsck extends Configured implements Closeable {
         continue;
       }
 
+      if (src.getName().equals(HRegionFileSystem.REGION_SPLITS_DIR)) {
+        continue;
+      }
+
+      if (src.getName().equals(HRegionFileSystem.REGION_MERGES_DIR)) {
+        continue;
+      }
+
+      if (src.getName().equals(".tmp")) {
+        continue;
+      }
+
       LOG.info("[" + thread + "] Moving files from " + src + " into containing region " + dst);
       // FileSystem.rename is inconsistent with directories -- if the
       // dst (foo/a) exists and is a dir, and the src (foo/b) is a dir,
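Note (sketch, not the patch): the three new checks could be collapsed into one set lookup, assuming the split/merge directory constants are visible from the caller's package, as the HBaseFsck hunk above already assumes. ".tmp" stays a literal here, as in the patch, presumably because the matching temp-dir constant on HRegionFileSystem is not public.

  import java.util.Arrays;
  import java.util.HashSet;
  import java.util.Set;

  import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;

  final class SidelineSkipDirs {
    // Region-internal working directories that sidelining should never move.
    private static final Set<String> SKIP = new HashSet<String>(Arrays.asList(
        HRegionFileSystem.REGION_SPLITS_DIR, // ".splits"
        HRegionFileSystem.REGION_MERGES_DIR, // ".merges"
        ".tmp"));

    static boolean shouldSkip(String dirName) {
      return SKIP.contains(dirName);
    }
  }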
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index c59d6f7..42e324b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -368,7 +368,7 @@ public class TestHRegionReplayEvents {
         // after replay verify that everything is still visible
         verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
       } else if (compactionDesc != null) {
-        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);
+        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE, true);
 
         // assert that the compaction is applied
         for (Store store : secondaryRegion.getStores()) {
@@ -398,6 +398,128 @@ public class TestHRegionReplayEvents {
     }
   }
 
+  // When the region name in a compaction marker does not match this region's, skip replaying it
+  @Test
+  public void testReplayFlushesAndSkipCompactions() throws IOException {
+    byte[] secondaryEncodedNameAsBytes = secondaryRegion.getRegionInfo().getEncodedNameAsBytes();
+    byte[] fakeEncodedNameAsBytes = new byte[secondaryEncodedNameAsBytes.length];
+    LOG.info("secondaryRegion = " + Bytes.toStringBinary(secondaryEncodedNameAsBytes));
+    for (int i = 0; i < secondaryEncodedNameAsBytes.length; i++) {
+      // Shift each byte to fabricate a different encoded name
+      fakeEncodedNameAsBytes[i] = (byte) (secondaryEncodedNameAsBytes[i] + 1);
+    }
+
+    LOG.debug("fakeEncodedNameAsBytes = " + Bytes.toStringBinary(fakeEncodedNameAsBytes));
+
+    // initiate a secondary region with some data.
+
+    // load some data to primary and flush. 3 flushes and some more unflushed data
+    putDataWithFlushes(primaryRegion, 100, 300, 100);
+
+    // compaction from primary
+    LOG.info("-- Compacting primary, only 1 store");
+    primaryRegion.compactStore(Bytes.toBytes("cf1"),
+      NoLimitCompactionThroughputController.INSTANCE);
+
+    // now replay the edits and the flush marker
+    reader = createWALReaderForPrimary();
+
+    LOG.info("-- Replaying edits and flush events in secondary");
+    int lastReplayed = 0;
+    int expectedStoreFileCount = 0;
+    while (true) {
+      WAL.Entry entry = reader.next();
+      if (entry == null) {
+        break;
+      }
+      FlushDescriptor flushDesc
+        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+      CompactionDescriptor compactionDesc
+        = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
+      if (flushDesc != null) {
+        // first verify that everything is replayed and visible before flush event replay
+        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
+        Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
+        long storeMemstoreSize = store.getMemStoreSize();
+        long regionMemstoreSize = secondaryRegion.getMemstoreSize();
+        long storeFlushableSize = store.getFlushableSize();
+        long storeSize = store.getSize();
+        long storeSizeUncompressed = store.getStoreSizeUncompressed();
+        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+          LOG.info("-- Replaying flush start in secondary");
+          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
+          assertNull(result.result);
+          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());
+
+          // assert that the store memstore is smaller now
+          long newStoreMemstoreSize = store.getMemStoreSize();
+          LOG.info("Memstore size reduced by:" +
+            StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
+          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
+        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+          LOG.info("-- Replaying flush commit in secondary");
+          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
+
+          // assert that the flush files are picked
+          expectedStoreFileCount++;
+          for (Store s : secondaryRegion.getStores()) {
+            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
+          }
+          long newFlushableSize = store.getFlushableSize();
+          assertTrue(storeFlushableSize > newFlushableSize);
+
+          // assert that the region memstore is smaller now
+          long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
+          assertTrue(regionMemstoreSize > newRegionMemstoreSize);
+
+          // assert that the store sizes are bigger
+          assertTrue(store.getSize() > storeSize);
+          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
+          assertEquals(store.getSize(), store.getStorefilesSize());
+        }
+        // after replay verify that everything is still visible
+        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
+      } else if (compactionDesc != null) {
+        CompactionDescriptor compactionDesc2 = CompactionDescriptor.newBuilder()
+          .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
+          .setEncodedRegionName(ByteString.copyFrom(fakeEncodedNameAsBytes))
+          .setFamilyName(ByteString.copyFromUtf8("cf1"))
+          .addAllCompactionInput(compactionDesc.getCompactionInputList())
+          .addAllCompactionOutput(compactionDesc.getCompactionOutputList())
+          .setStoreHomeDir(compactionDesc.getStoreHomeDir())
+          .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
+          .build();
+        secondaryRegion.replayWALCompactionMarker(compactionDesc2, true, false, Long.MAX_VALUE,
+          false);
+
+        // assert that the compaction is skipped
+        for (Store store : secondaryRegion.getStores()) {
+          if (store.getColumnFamilyName().equals("cf1")) {
+            assertEquals(3, store.getStorefilesCount());
+          } else {
+            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
+          }
+        }
+      } else {
+        lastReplayed = replayEdit(secondaryRegion, entry);
+      }
+    }
+
+    assertEquals(400-1, lastReplayed);
+    LOG.info("-- Verifying edits from secondary");
+    verifyData(secondaryRegion, 0, 400, cq, families);
+
+    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
+    verifyData(primaryRegion, 0, lastReplayed, cq, families);
+    for (Store store : primaryRegion.getStores()) {
+      if (store.getColumnFamilyName().equals("cf1")) {
+        assertEquals(1, store.getStorefilesCount());
+      } else {
+        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
+      }
+    }
+  }
+
   /**
    * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
    * equal to, greater or less than the previous flush start marker.
@@ -1546,7 +1668,7 @@ public class TestHRegionReplayEvents {
         .setStoreHomeDir("/store_home_dir")
         .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
         .build()
-      , true, true, Long.MAX_VALUE);
+      , true, true, Long.MAX_VALUE, true);
   }
 
   @Test
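Note (illustrative fragment, not from the patch): the call-site contract after this change, where region stands for an HRegion and desc for a CompactionDescriptor already in hand.

  // Passing true keeps the pre-patch behavior: a mismatched encoded region
  // name raises WrongRegionException. Passing false skips the marker instead,
  // which is what the recovered-edits replay path now does for foreign regions.
  region.replayWALCompactionMarker(desc, false, true, Long.MAX_VALUE, true);
  region.replayWALCompactionMarker(desc, false, true, Long.MAX_VALUE, false);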