diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
new file mode 100644
index 0000000..c612ebc
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
@@ -0,0 +1,155 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * FIFO compaction policy selects only files in which all cells have expired.
+ * The column family MUST have a non-default TTL. A typical use case is storing
+ * raw data that is post-processed later and discarded completely after a fairly
+ * short period of time, e.g. raw time-series vs. time-based roll-up aggregates:
+ * we collect raw time-series into a CF with FIFO compaction policy, a periodic
+ * task builds the roll-up aggregates and compacted time-series, and after that
+ * the original raw data can be discarded.
+ */
+@InterfaceAudience.Private
+public class FIFOCompactionPolicy extends RatioBasedCompactionPolicy {
+
+  private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);
+
+  public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
+    super(conf, storeConfigInfo);
+    verifyConfig(conf);
+  }
+
+  private void verifyConfig(Configuration conf) {
+    StringBuffer warn = new StringBuffer();
+    // Major compaction disabled (recommended)
+    if (conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, Long.MAX_VALUE) > 0) {
+      warn.append(":periodic major compactions must be disabled:");
+    }
+    // Splits must be disabled (required) - throw exception if default
+    String splitPolicyClassName = conf.get(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      IncreasingToUpperBoundRegionSplitPolicy.class.getName());
+    if (splitPolicyClassName.equals(IncreasingToUpperBoundRegionSplitPolicy.class.getName())) {
+      throw new RuntimeException("Default split policy for FIFO compaction"
+          + " is not supported, aborting.");
+    } else if (!splitPolicyClassName.equals(DisabledRegionSplitPolicy.class.getName())) {
+      warn.append(":region splits must be disabled:");
+    }
+    // Maximum blocking # of files must be high enough (1000+?)
+    if (storeConfigInfo.getBlockingFileCount() < 1000) {
+      warn.append(":blocking # of store files is too low-"
+          + storeConfigInfo.getBlockingFileCount() + ":");
+    }
+    // MIN_VERSIONS = 0 (required) - throw exception if not 0
+    if (conf.getInt(HColumnDescriptor.MIN_VERSIONS, 0) != 0) {
+      throw new RuntimeException("MIN_VERSIONS > 0 for FIFO compaction is not supported,"
+          + " aborting.");
+    }
+    // TTL is not default - throw exception if default
+    if (storeConfigInfo.getStoreFileTtl() == Long.MAX_VALUE) {
+      throw new RuntimeException("Default TTL for FIFO compaction is not supported, aborting.");
+    }
+    // Emit the accumulated (non-fatal) warnings
+    if (warn.length() > 0) {
+      LOG.warn(warn.toString());
+    }
+  }
+
+  @Override
+  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
+      List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
+      boolean forceMajor) throws IOException {
+
+    if (forceMajor) {
+      LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignore the flag.");
+    }
+    // Nothing to compact
+    Collection<StoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
+    CompactionRequest result = new CompactionRequest(toCompact);
+    return result;
+  }
+
+  @Override
+  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
+    // No major compaction support
+    return false;
+  }
+
+  @Override
+  public boolean needsCompaction(Collection<StoreFile> storeFiles,
+      List<StoreFile> filesCompacting) {
+    return hasExpiredStores(storeFiles);
+  }
+
+  private boolean hasExpiredStores(Collection<StoreFile> files) {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    for (StoreFile sf : files) {
+      // MIN_VERSIONS is handled in HStore#removeUnneededFiles
+      Long maxTs = sf.getReader().getMaxTimestamp();
+      long maxTtl = storeConfigInfo.getStoreFileTtl();
+      if (maxTs == null
+          || maxTtl == Long.MAX_VALUE
+          || (currentTime - maxTtl < maxTs)) {
+        continue;
+      } else {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Collection<StoreFile> getExpiredStores(Collection<StoreFile> files,
+      Collection<StoreFile> filesCompacting) {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();
+    for (StoreFile sf : files) {
+      // MIN_VERSIONS is handled in HStore#removeUnneededFiles
+      Long maxTs = sf.getReader().getMaxTimestamp();
+      long maxTtl = storeConfigInfo.getStoreFileTtl();
+      if (maxTs == null
+          || maxTtl == Long.MAX_VALUE
+          || (currentTime - maxTtl < maxTs)) {
+        continue;
+      } else if (filesCompacting == null || !filesCompacting.contains(sf)) {
+        expiredStores.add(sf);
+      }
+    }
+    return expiredStores;
+  }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
new file mode 100644
index 0000000..8ed8823
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestFIFOCompactionPolicy {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
+
+  private final byte[] family = Bytes.toBytes("f");
+
+  private final byte[] qualifier = Bytes.toBytes("q");
+
+  private Store getStoreWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (int i = 0; i < rsts.size(); i++) {
+      HRegionServer hrs = rsts.get(i).getRegionServer();
+      for (Region region : hrs.getOnlineRegions(tableName)) {
+        return region.getStores().iterator().next();
+      }
+    }
+    return null;
+  }
+
+  private Store prepareData() throws IOException {
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      FIFOCompactionPolicy.class.getName());
+    desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      DisabledRegionSplitPolicy.class.getName());
+    HColumnDescriptor colDesc = new HColumnDescriptor(family);
+    colDesc.setTimeToLive(1); // 1 sec
+    desc.addFamily(colDesc);
+
+    admin.createTable(desc);
+    Table table = TEST_UTIL.getConnection().getTable(tableName);
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        byte[] value = new byte[128 * 1024];
+        rand.nextBytes(value);
+        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
+      }
+      admin.flush(tableName);
+      if (i < 9) {
+        try {
+          Thread.sleep(1001); // let the 1-second TTL expire between flushes
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    return getStoreWithName(tableName);
+  }
+
+  @Test
+  public void testPurgeExpiredFiles() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+      while (store.getStorefilesCount() > 1) {
+        Thread.sleep(1000);
+        System.out.println(store.getStorefilesCount());
+        Collection<StoreFile> files = store.getStorefiles();
+        for (StoreFile sf : files) {
+          System.out.println(sf.getReader().length());
+        }
+      }
+      assertTrue(store.getStorefilesCount() <= 1);
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+}
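
Note (not part of the patch): a minimal sketch of how a table could opt into this policy, mirroring the settings exercised by prepareData() above. It assumes an HBaseAdmin named "admin" is in scope, and the table/family names and TTL value are illustrative only; hbase.hstore.blockingStoreFiles should also be raised per verifyConfig()'s warning.

// Illustrative sketch only -- mirrors the test setup in this patch; names are hypothetical.
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("raw_timeseries"));
// Select the FIFO compaction policy for all stores of this table.
desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
    FIFOCompactionPolicy.class.getName());
// Region splits must be disabled, otherwise verifyConfig() aborts with a RuntimeException.
desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
    DisabledRegionSplitPolicy.class.getName());
HColumnDescriptor colDesc = new HColumnDescriptor(Bytes.toBytes("raw"));
// A non-default TTL is required; files whose newest cell is older than this become eligible.
colDesc.setTimeToLive(24 * 60 * 60); // 1 day, chosen arbitrarily for this example
desc.addFamily(colDesc);
admin.createTable(desc);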