diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
index 756faa9..488c572 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
@@ -193,12 +193,31 @@
     int bqIndex = -1;
     List<HStoreFile> bqSelection = null;
+    List<HStoreFile> bqTtlExpired = null;
+    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
+    if (cfTtl != Long.MAX_VALUE) {
+      // Convert the TTL into the oldest acceptable timestamp; files older than this are expired.
+      cfTtl = EnvironmentEdgeManager.currentTime() - cfTtl;
+    }
     int stripeCount = stripes.size();
     long bqTotalSize = -1;
     for (int i = 0; i < stripeCount; ++i) {
+      List<HStoreFile> ttlExpired = new ArrayList<>();
+      List<HStoreFile> candidates;
+      if (cfTtl != Long.MAX_VALUE) {
+        candidates = new ArrayList<>();
+        for (HStoreFile storeFile : stripes.get(i)) {
+          if (storeFile.getReader().getMaxTimestamp() < cfTtl) { // TTL expired
+            ttlExpired.add(storeFile);
+          } else {
+            candidates.add(storeFile);
+          }
+        }
+      } else {
+        candidates = stripes.get(i);
+      }
       // If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
       // So, pass includeL0 as 2nd parameter to indicate that.
-      List<HStoreFile> selection = selectSimpleCompaction(stripes.get(i),
+      List<HStoreFile> selection = selectSimpleCompaction(candidates,
         !canDropDeletesWithoutL0 && includeL0, isOffpeak);
       if (selection.isEmpty()) continue;
       long size = 0;
@@ -208,6 +227,7 @@
       if (bqSelection == null || selection.size() > bqSelection.size()
         || (selection.size() == bqSelection.size() && size < bqTotalSize)) {
         bqSelection = selection;
+        bqTtlExpired = ttlExpired;
         bqIndex = i;
         bqTotalSize = size;
       }
@@ -220,7 +240,8 @@
     // See if we can, and need to, split this stripe.
     int targetCount = 1;
     long targetKvs = Long.MAX_VALUE;
-    boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
+    boolean hasAllFiles = filesToCompact.size() ==
+        stripes.get(bqIndex).size() - bqTtlExpired.size();
     String splitString = "";
     if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
       if (includeL0) {
@@ -240,6 +261,7 @@
       + filesToCompact.size() + " files of total size " + bqTotalSize + splitString);
     // See if we can drop deletes.
+    filesToCompact.addAll(bqTtlExpired);
     StripeCompactionRequest req;
     if (includeL0) {
       assert hasAllFiles;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index aee3dc6..0c658f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -164,6 +164,7 @@
     conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
     conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits
     StoreConfigInformation sci = mock(StoreConfigInformation.class);
+    when(sci.getStoreFileTtl()).thenReturn(defaultTtl);
     StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
     StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
       @Override
@@ -205,6 +206,12 @@
     si = createStripesWithSizes(0, 0, new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L },
       new Long[] { 3L, 2L, 2L });
     verifySingleStripeCompaction(policy, si, 1, null);
+
+    // should include ttl expired file
+    si = createStripesWithSizes(0, 0, new Long[] { -100L, 1L, 1L, 1L }, new Long[] { 3L, 2L, 2L });
+    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
+    assertEquals(4, scr.getRequest().getFiles().size());
+
     // Verify max count is respected.
     si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
     List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5);
@@ -762,6 +769,11 @@
       anyBoolean())).thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty());
+    if (size < 0) {
+      // A negative size marks the mocked file as TTL expired (very old max timestamp).
+      when(sf.getReader().getMaxTimestamp()).thenReturn(-1L);
+    } else {
+      when(sf.getReader().getMaxTimestamp()).thenReturn(Long.MAX_VALUE);
+    }
     return sf;
   }