diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 840085d..dc89854 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -120,6 +120,7 @@ public class HStore implements Store { public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY = "hbase.server.compactchecker.interval.multiplier"; public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles"; + public static final String TIME_TO_PURGE_DELETES_KEY = "hbase.hstore.time.to.purge.deletes"; public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000; public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7; @@ -224,7 +225,7 @@ public class HStore implements Store { this.comparator = region.getCellCompartor(); // used by ScanQueryMatcher long timeToPurgeDeletes = - Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); + Math.max(conf.getLong(TIME_TO_PURGE_DELETES_KEY, 0), 0); LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes + "ms in store " + this); // Get TTL diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index da1b084..dfcd00b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -120,6 +120,8 @@ public class StoreFile { // Block cache configuration and reference. private final CacheConfig cacheConf; + // Keep Store config for future use during compactions + private final Configuration conf; // Keys for metadata stored in backing HFile. // Set when we obtain a Reader. 
@@ -213,7 +215,7 @@ public class StoreFile { this.fs = fs; this.fileInfo = fileInfo; this.cacheConf = cacheConf; - + this.conf = conf; if (BloomFilterFactory.isGeneralBloomEnabled(conf)) { this.cfBloomType = cfBloomType; } else { @@ -232,9 +234,19 @@ public class StoreFile { this.fileInfo = other.fileInfo; this.cacheConf = other.cacheConf; this.cfBloomType = other.cfBloomType; + this.conf = other.getConfiguration(); } /** + * @return Store's configuration : merge of global - table - cf - cf metadata + */ + + public Configuration getConfiguration() + { + return conf; + } + + /** * @return the StoreFile object associated to this StoreFile. * null if the StoreFile is not a reference. */ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index 5aeff5c..aa4ca08 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -28,12 +28,16 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.Bytes; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -300,15 +304,26 
@@ public class RatioBasedCompactionPolicy extends CompactionPolicy { : now - minTimestamp.longValue(); if (sf.isMajorCompaction() && (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { - float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex( - RSRpcServices.getHostname(comConf.conf, false) - ); - if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { + float blockLocalityIndex = + sf.getHDFSBlockDistribution(). + getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false)); + + boolean keepDeletedCells = getKeepDeletedCellsForStore(sf.getConfiguration()); + long timeToPurgeDeletes = getTimeToPurgeDeletesForStore(sf.getConfiguration()); + boolean purgeDeletes = keepDeletedCells && (now - timeToPurgeDeletes > lowTimestamp); + // If CF KEEP_DELETED_CELL = true and TTL expired we do major compact regardless + // of blockLocalityIndex to remove expired deleted cells + if (blockLocalityIndex < comConf.getMinLocalityToForceCompact() || purgeDeletes) { if (LOG.isDebugEnabled()) { - LOG.debug("Major compaction triggered on only store " + this + + if( !purgeDeletes){ + LOG.debug("Major compaction triggered on only store " + this + "; to make hdfs blocks local, current blockLocalityIndex is " + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + - ")"); + "); keep deleted cells="+keepDeletedCells+"; purgeDeletes="+purgeDeletes); + } else{ + LOG.debug("Major compaction triggered on only store " + this + + "; to purge deleted cells."); + } } result = true; } else { @@ -317,7 +332,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { " because one (major) compacted file only, oldestTime " + oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + - ")"); + "); keep deleted cells="+keepDeletedCells +"; purgeDeletes="+purgeDeletes); } } } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { @@ -337,6 +352,20 @@ 
public class RatioBasedCompactionPolicy extends CompactionPolicy { return result; } + private boolean getKeepDeletedCellsForStore(Configuration conf) { + String value = conf.get(HColumnDescriptor.KEEP_DELETED_CELLS); + if (value != null) { + // toUpperCase for backwards compatibility + return KeepDeletedCells.valueOf(value.toUpperCase()) == KeepDeletedCells.TRUE; + } + return false; + } + + private long getTimeToPurgeDeletesForStore(Configuration conf) + { + return conf.getLong(HStore.TIME_TO_PURGE_DELETES_KEY, 0); + } + /** * Used calculation jitter */ diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestMajorCompactionKeepDeletedCells.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestMajorCompactionKeepDeletedCells.java new file mode 100644 index 0000000..528f77e --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestMajorCompactionKeepDeletedCells.java @@ -0,0 +1,111 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Matchers.anyString; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestMajorCompactionKeepDeletedCells { + private static HBaseTestingUtility testUtil; + + @BeforeClass + public static void setUpClass() { + testUtil = new HBaseTestingUtility(); + } + + @Test + public void testMajorCompactionKeepDeletedCells() throws IOException, InterruptedException { + Configuration conf = testUtil.getConfiguration(); + conf.setBoolean(HColumnDescriptor.KEEP_DELETED_CELLS, true); + conf.setLong(HStore.TIME_TO_PURGE_DELETES_KEY, 1000); + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000); + + HStore configInfo = mock(HStore.class); + when(configInfo.getBlockingFileCount()).thenReturn((long) 10); + when(configInfo.getStoreFileTtl()).thenReturn((long) HConstants.FOREVER); + + StoreFile mockSf = mock(StoreFile.class); + + RatioBasedCompactionPolicy policy = new RatioBasedCompactionPolicy(conf, configInfo); + + when(mockSf.getPath()).thenReturn(new Path("hdfs://localhost/path")); + 
when(mockSf.getModificationTimeStamp()).thenReturn(System.currentTimeMillis() - 100); + when(mockSf.isMajorCompaction()).thenReturn(true); + when(mockSf.getConfiguration()).thenReturn(conf); + HDFSBlocksDistribution blockDist = mock(HDFSBlocksDistribution.class); + when(blockDist.getBlockLocalityIndex(anyString())).thenReturn(1.0f); + when(mockSf.getHDFSBlockDistribution()).thenReturn(blockDist); + + List<StoreFile> files = new ArrayList<StoreFile>(); + files.add(mockSf); + Thread.sleep(2000); + assertTrue(policy.isMajorCompaction(files)); + + } + + @Test + public void testMajorCompactionNoKeepDeletedCells() throws IOException, InterruptedException { + Configuration conf = testUtil.getConfiguration(); + conf.setBoolean(HColumnDescriptor.KEEP_DELETED_CELLS, false); + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000); + + HStore configInfo = mock(HStore.class); + when(configInfo.getBlockingFileCount()).thenReturn((long) 10); + when(configInfo.getStoreFileTtl()).thenReturn((long) HConstants.FOREVER); + + StoreFile mockSf = mock(StoreFile.class); + + RatioBasedCompactionPolicy policy = new RatioBasedCompactionPolicy(conf, configInfo); + + when(mockSf.getPath()).thenReturn(new Path("hdfs://localhost/path")); + when(mockSf.getModificationTimeStamp()).thenReturn(System.currentTimeMillis() - 100); + when(mockSf.isMajorCompaction()).thenReturn(true); + when(mockSf.getConfiguration()).thenReturn(conf); + HDFSBlocksDistribution blockDist = mock(HDFSBlocksDistribution.class); + when(blockDist.getBlockLocalityIndex(anyString())).thenReturn(1.0f); + when(mockSf.getHDFSBlockDistribution()).thenReturn(blockDist); + + List<StoreFile> files = new ArrayList<StoreFile>(); + files.add(mockSf); + Thread.sleep(2000); + assertTrue(policy.isMajorCompaction(files) == false); + + } +}