diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java index ad40734..7df55b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java @@ -205,12 +205,12 @@ (KeyBeforeConcatenatedLists.Iterator)candidateFiles; assert original != null; ArrayList> components = original.getComponents(); - for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) { + // Entries are ordered as such: L0, then stripes in reverse order. We never remove + // level 0; we remove the stripe, and all subsequent ones, as soon as we find the + // first one that cannot possibly have better candidates. + for (int firstIrrelevant = 1; firstIrrelevant < components.size(); firstIrrelevant++) { HStoreFile sf = components.get(firstIrrelevant).get(0); byte[] endKey = endOf(sf); - // Entries are ordered as such: L0, then stripes in reverse order. We never remove - // level 0; we remove the stripe, and all subsequent ones, as soon as we find the - // first one that cannot possibly have better candidates. if (!isInvalid(endKey) && !isOpen(endKey) && (nonOpenRowCompare(targetKey, endKey) >= 0)) { original.removeComponents(firstIrrelevant); @@ -734,10 +734,7 @@ this.results = results; // Do logical processing. if (!isFlush) removeCompactedFiles(); - TreeMap newStripes = processResults(); - if (newStripes != null) { - processNewCandidateStripes(newStripes); - } + processResults(); // Create new state and update parent. State state = createNewState(false); StripeStoreFileManager.this.state = state; @@ -831,38 +828,57 @@ * or to the list of new candidate stripes. * @return New candidate stripes. */ - private TreeMap processResults() throws IOException { - TreeMap newStripes = null; + private void processResults() throws IOException { + if (this.stripeFiles.isEmpty() && isFlush && !config.isUsingL0Flush()) { + processNewCandidateStripes(); + return; + } for (HStoreFile sf : this.results) { byte[] startRow = startOf(sf), endRow = endOf(sf); - if (isInvalid(endRow) || isInvalid(startRow)) { - if (!isFlush) { - LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath()); - } + if (this.stripeFiles.isEmpty() || isInvalid(endRow) || isInvalid(startRow)) { + LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath()); insertFileIntoStripe(getLevel0Copy(), sf); this.l0Results.add(sf); - continue; - } - if (!this.stripeFiles.isEmpty()) { + } else { int stripeIndex = findStripeIndexByEndRow(endRow); if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) { // Simple/common case - add file to an existing stripe. insertFileIntoStripe(getStripeCopy(stripeIndex), sf); - continue; + } else { + insertFileIntoStripe(getLevel0Copy(), sf); + this.l0Results.add(sf); } } - - // Make a new candidate stripe. - if (newStripes == null) { - newStripes = new TreeMap<>(MAP_COMPARATOR); - } - HStoreFile oldSf = newStripes.put(endRow, sf); - if (oldSf != null) { - throw new IOException("Compactor has produced multiple files for the stripe ending in [" - + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath()); - } } - return newStripes; + } + + private void processNewCandidateStripes() throws IOException { + this.stripeEndRows = new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows)); + byte[] previousEndRow = null; + int insertAt = 0; + for (HStoreFile sf : this.results) { + byte[] startRow = startOf(sf), endRow = endOf(sf); + if (isInvalid(endRow) || isInvalid(startRow)) { // just for test case + LOG.warn("The newly flushed file doesn't have stripes set: " + sf.getPath()); + insertFileIntoStripe(getLevel0Copy(), sf); + this.l0Results.add(sf); + continue; + } + if (previousEndRow != null) { + if (!rowEquals(previousEndRow, startRow)) { + throw new IOException("The new stripes produced by flush are not contiguous!"); + } + } + // Add the new stripe. + ArrayList tmp = new ArrayList<>(); + tmp.add(sf); + stripeFiles.add(insertAt, tmp); + previousEndRow = endOf(sf); + if (!isOpen(previousEndRow)) { + stripeEndRows.add(insertAt, previousEndRow); + } + ++insertAt; + } } /** @@ -886,105 +902,6 @@ if (!source.remove(oldFile)) { throw new IOException("An allegedly compacted file [" + oldFile + "] was not found"); } - } - } - - /** - * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with - * new candidate stripes/removes old stripes; produces new set of stripe end rows. - * @param newStripes New stripes - files by end row. - */ - private void processNewCandidateStripes( - TreeMap newStripes) throws IOException { - // Validate that the removed and added aggregate ranges still make for a full key space. - boolean hasStripes = !this.stripeFiles.isEmpty(); - this.stripeEndRows = new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows)); - int removeFrom = 0; - byte[] firstStartRow = startOf(newStripes.firstEntry().getValue()); - byte[] lastEndRow = newStripes.lastKey(); - if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) { - throw new IOException("Newly created stripes do not cover the entire key space."); - } - - boolean canAddNewStripes = true; - Collection filesForL0 = null; - if (hasStripes) { - // Determine which stripes will need to be removed because they conflict with new stripes. - // The new boundaries should match old stripe boundaries, so we should get exact matches. - if (isOpen(firstStartRow)) { - removeFrom = 0; - } else { - removeFrom = findStripeIndexByEndRow(firstStartRow); - if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range."); - ++removeFrom; - } - int removeTo = findStripeIndexByEndRow(lastEndRow); - if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range."); - // See if there are files in the stripes we are trying to replace. - ArrayList conflictingFiles = new ArrayList<>(); - for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) { - conflictingFiles.addAll(this.stripeFiles.get(removeIndex)); - } - if (!conflictingFiles.isEmpty()) { - // This can be caused by two things - concurrent flush into stripes, or a bug. - // Unfortunately, we cannot tell them apart without looking at timing or something - // like that. We will assume we are dealing with a flush and dump it into L0. - if (isFlush) { - long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values()); - LOG.warn("Stripes were created by a flush, but results of size " + newSize - + " cannot be added because the stripes have changed"); - canAddNewStripes = false; - filesForL0 = newStripes.values(); - } else { - long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles); - LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) " - + " of size " + oldSize + " are moved to L0 due to concurrent stripe change"); - filesForL0 = conflictingFiles; - } - if (filesForL0 != null) { - for (HStoreFile sf : filesForL0) { - insertFileIntoStripe(getLevel0Copy(), sf); - } - l0Results.addAll(filesForL0); - } - } - - if (canAddNewStripes) { - // Remove old empty stripes. - int originalCount = this.stripeFiles.size(); - for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) { - if (removeIndex != originalCount - 1) { - this.stripeEndRows.remove(removeIndex); - } - this.stripeFiles.remove(removeIndex); - } - } - } - - if (!canAddNewStripes) return; // Files were already put into L0. - - // Now, insert new stripes. The total ranges match, so we can insert where we removed. - byte[] previousEndRow = null; - int insertAt = removeFrom; - for (Map.Entry newStripe : newStripes.entrySet()) { - if (previousEndRow != null) { - // Validate that the ranges are contiguous. - assert !isOpen(previousEndRow); - byte[] startRow = startOf(newStripe.getValue()); - if (!rowEquals(previousEndRow, startRow)) { - throw new IOException("The new stripes produced by " - + (isFlush ? "flush" : "compaction") + " are not contiguous"); - } - } - // Add the new stripe. - ArrayList tmp = new ArrayList<>(); - tmp.add(newStripe.getValue()); - stripeFiles.add(insertAt, tmp); - previousEndRow = newStripe.getKey(); - if (!isOpen(previousEndRow)) { - stripeEndRows.add(insertAt, previousEndRow); - } - ++insertAt; } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java index 2185b83..29a47c3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; @@ -31,6 +30,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -69,8 +69,6 @@ private static final KeyValue KV_A = new KeyValue(KEY_A, 0L); private static final KeyValue KV_B = new KeyValue(KEY_B, 0L); - private static final KeyValue KV_C = new KeyValue(KEY_C, 0L); - private static final KeyValue KV_D = new KeyValue(KEY_D, 0L); @Before public void setUp() throws Exception { @@ -145,24 +143,10 @@ manager.removeCompactedFiles(al(l0File)); // If we want a key <= KEY_A, we should get everything except stripe1. ArrayList sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_A)); - assertEquals(2, sfsDump.size()); + assertEquals(3, sfsDump.size()); assertTrue(sfsDump.contains(stripe0a)); - assertFalse(sfsDump.contains(stripe1)); - // If we want a key <= KEY_B, we should get everything since lower bound is inclusive. - sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_B)); - assertEquals(3, sfsDump.size()); assertTrue(sfsDump.contains(stripe1)); - // For KEY_D, we should also get everything. - sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_D)); - assertEquals(3, sfsDump.size()); - // Suppose in the first file we found candidate with KEY_C. - // Then, stripe0 no longer matters and should be removed, but stripe1 should stay. - sfs = manager.getCandidateFilesForRowKeyBefore(KV_D); - sfs.next(); // Skip L0 file. - sfs.remove(); - sfs = manager.updateCandidateFilesForRowKeyBefore(sfs, KV_D, KV_C); - assertEquals(stripe1, sfs.next()); - assertFalse(sfs.hasNext()); + assertTrue(sfsDump.contains(l0File2)); // Add one more, later, file to stripe0, remove the last annoying L0 file. // This file should be returned in preference to older L0 file; also, after we get // a candidate from the first file, the old one should not be removed. @@ -252,7 +236,7 @@ conf.setFloat(StripeStoreConfig.MAX_REGION_SPLIT_IMBALANCE_KEY, splitRatioToVerify); } StripeStoreFileManager manager = createManager(al(), conf); - manager.addCompactionResults(al(), sfs); + manager.loadFiles(sfs); int result = Bytes.toInt(manager.getSplitPoint().get()); // Either end key and thus positive index, or "middle" of the file and thus negative index. assertEquals(splitPointAfter * (shouldSplitStripe ? -1 : 1), result); @@ -282,7 +266,16 @@ MockHStoreFile sfC = createFile(KEY_B, KEY_C); MockHStoreFile sfD = createFile(KEY_C, KEY_D); MockHStoreFile sfE = createFile(KEY_D, OPEN_KEY); - manager.addCompactionResults(al(), al(sfA, sfB, sfC, sfD, sfE)); + + manager = createManager(); + List storeFiles = new ArrayList(); + storeFiles.add(sf0); + storeFiles.add(sfA); + storeFiles.add(sfB); + storeFiles.add(sfC); + storeFiles.add(sfD); + storeFiles.add(sfE); + manager.loadFiles(storeFiles); verifyGetAndScanScenario(manager, null, null, sf0, sfA, sfB, sfC, sfD, sfE); verifyGetAndScanScenario(manager, keyAfter(KEY_A), null, sf0, sfB, sfC, sfD, sfE); @@ -354,17 +347,17 @@ StripeStoreFileManager manager = createManager(al(createFile(OPEN_KEY, KEY_B), sf)); assertEquals(0, manager.getLevel0Files().size()); // Here, [B, C] is logically [B, inf), so we should be able to compact it to that only. - verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C))); manager.addCompactionResults(al(sf), al(createFile(KEY_B, OPEN_KEY))); manager.removeCompactedFiles(al(sf)); + assertEquals(0, manager.getLevel0Files().size()); // Do the same for other variants. manager = createManager(al(sf, createFile(KEY_C, OPEN_KEY))); - verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C))); manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, KEY_C))); manager.removeCompactedFiles(al(sf)); + assertEquals(0, manager.getLevel0Files().size()); manager = createManager(al(sf)); - verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C))); manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, OPEN_KEY))); + assertEquals(0, manager.getLevel0Files().size()); } @Test @@ -373,15 +366,6 @@ // First, add some L0 files and "compact" one with new stripe creation. HStoreFile sf_L0_0a = createFile(), sf_L0_0b = createFile(); manager.insertNewFiles(al(sf_L0_0a, sf_L0_0b)); - - // Try compacting with invalid new branches (gaps, overlaps) - no effect. - verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B))); - verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B), - createFile(KEY_C, OPEN_KEY))); - verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B), - createFile(KEY_B, OPEN_KEY), createFile(KEY_A, KEY_D))); - verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B), - createFile(KEY_A, KEY_B), createFile(KEY_B, OPEN_KEY))); HStoreFile sf_i2B_0 = createFile(OPEN_KEY, KEY_B); HStoreFile sf_B2C_0 = createFile(KEY_B, KEY_C); @@ -478,8 +462,8 @@ sfm.insertNewFiles(al(sf_i2c_2)); sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i)); sfm.removeCompactedFiles(al(sf_i2c, sf_c2i)); - assertEquals(1, sfm.getLevel0Files().size()); - verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2); + assertEquals(2, sfm.getLevel0Files().size()); + verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i); } @Test @@ -526,30 +510,18 @@ Configuration conf = TEST_UTIL.getConfiguration(); conf.setInt("hbase.hstore.blockingStoreFiles", limit); StripeStoreFileManager sfm = createManager(al(), conf); + List storeFiles = new ArrayList(); for (int i = 0; i < l0Files; ++i) { - sfm.insertNewFiles(al(createFile())); + storeFiles.add(createFile()); } for (int i = 0; i < filesInStripe; ++i) { - ArrayList stripe = new ArrayList<>(); for (int j = 0; j < stripes; ++j) { - stripe.add(createFile( + storeFiles.add(createFile( (j == 0) ? OPEN_KEY : keys[j - 1], (j == stripes - 1) ? OPEN_KEY : keys[j])); } - sfm.addCompactionResults(al(), stripe); } + sfm.loadFiles(storeFiles); assertEquals(expectedPriority, sfm.getStoreCompactionPriority()); - } - - private void verifyInvalidCompactionScenario(StripeStoreFileManager manager, - ArrayList filesToCompact, ArrayList filesToInsert) throws Exception { - Collection allFiles = manager.getStorefiles(); - try { - manager.addCompactionResults(filesToCompact, filesToInsert); - fail("Should have thrown"); - } catch (IOException ex) { - // Ignore it. - } - verifyAllFiles(manager, allFiles); // must have the same files. } private void verifyGetOrScanScenario(StripeStoreFileManager manager, byte[] start, byte[] end,