diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index f160835..e308b61 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -124,10 +124,16 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionStateListener;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Addressing;
@@ -1562,7 +1568,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     } catch (IOException e) {
       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
     }
-
+    // Verify compaction policy
+    try {
+      checkCompactionPolicy(conf, htd);
+    } catch (IOException e) {
+      warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
+    }
     // check that we have at least 1 CF
     if (htd.getColumnFamilies().length == 0) {
       String message = "Table should have at least one column family.";
@@ -1618,6 +1629,87 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     }
   }
 
+  private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
+      throws IOException {
+    // FIFO compaction has some requirements.
+    // Note that FIFO compaction ignores periodic major compactions.
+    // Get compaction policy
+    String className =
+        htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
+    if (className == null) {
+      className =
+          conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+            ExploringCompactionPolicy.class.getName());
+    }
+
+    long majorCompactionPeriod = Long.MAX_VALUE;
+    String sv = htd.getConfigurationValue(HConstants.MAJOR_COMPACTION_PERIOD);
+    if (sv != null) {
+      majorCompactionPeriod = Long.parseLong(sv);
+    } else {
+      majorCompactionPeriod =
+          conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, majorCompactionPeriod);
+    }
+    String splitPolicyClassName = htd.getRegionSplitPolicyClassName();
+    if (splitPolicyClassName == null) {
+      splitPolicyClassName = conf.get(HConstants.HBASE_REGION_SPLIT_POLICY_KEY);
+    }
+
+    int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
+    sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
+    if (sv != null) {
+      blockingFileCount = Integer.parseInt(sv);
+    } else {
+      blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
+    }
+
+    for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+      String compactionPolicy =
+          hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
+      if (compactionPolicy == null) {
+        compactionPolicy = className;
+      }
+      if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
+        continue;
+      }
+      // FIFOCompaction
+      String message = null;
+      // 1. Check split policy
+      String spn = hcd.getConfigurationValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY);
+      if (spn != null) {
+        splitPolicyClassName = spn;
+      }
+      if (splitPolicyClassName.equals(IncreasingToUpperBoundRegionSplitPolicy.class.getName())) {
+        message = "Default split policy for FIFO compaction is not supported";
+        throw new IOException(message);
+      }
+
+      // 2. Check TTL
+      if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
+        message = "Default TTL is not supported for FIFO compaction";
+        throw new IOException(message);
+      }
+
+      // 3. Check min versions
+      if (hcd.getMinVersions() > 0) {
+        message = "MIN_VERSION > 0 is not supported for FIFO compaction";
+        throw new IOException(message);
+      }
+
+      // 4. Check blocking file count
+      String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
+      if (sbfc != null) {
+        blockingFileCount = Integer.parseInt(sbfc);
+      }
+      if (blockingFileCount < 1000) {
+        message =
+            "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
+                + " is below recommended minimum of 1000";
+        throw new IOException(message);
+      }
+    }
+  }
+
   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
       String message, Exception cause) throws IOException {
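
For reference, a table descriptor that satisfies all four checks in checkCompactionPolicy above might be built as in the following sketch. The table name, family name, TTL value, and the 'admin' handle are hypothetical illustrations, not part of the patch; the configuration keys are the same constants the check reads.

  // Sketch only: a descriptor that would pass the FIFO sanity checks.
  HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("raw_metrics"));
  desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
    FIFOCompactionPolicy.class.getName());
  desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
    DisabledRegionSplitPolicy.class.getName());          // rule 1: non-default split policy
  desc.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "1000"); // rule 4: at least 1000
  HColumnDescriptor hcd = new HColumnDescriptor("f");
  hcd.setTimeToLive(60 * 60 * 24);                       // rule 2: explicit TTL (1 day)
  hcd.setMinVersions(0);                                 // rule 3: MIN_VERSIONS must be 0
  desc.addFamily(hcd);
  admin.createTable(desc);                               // 'admin' is an assumed Admin handle
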
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
new file mode 100644
index 0000000..d611fa3
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * FIFO compaction policy selects only files which have all cells expired.
+ * The column family MUST have a non-default TTL. A typical use case is storing
+ * raw data which will be post-processed later and discarded completely after
+ * a short period of time: we collect raw time-series into a CF with FIFO
+ * compaction, periodically run a task which creates roll-up aggregates and
+ * compacted time-series, and discard the original raw data after that.
+ */
+@InterfaceAudience.Private
+public class FIFOCompactionPolicy extends RatioBasedCompactionPolicy {
+
+  private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);
+
+  public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
+    super(conf, storeConfigInfo);
+  }
+
+  @Override
+  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
+      List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
+      boolean forceMajor) throws IOException {
+    if (forceMajor) {
+      LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignoring the flag.");
+    }
+    // Only fully expired files are ever selected; there may be nothing to compact.
+    Collection<StoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
+    CompactionRequest result = new CompactionRequest(toCompact);
+    return result;
+  }
+
+  @Override
+  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
+    // No major compaction support
+    return false;
+  }
+
+  @Override
+  public boolean needsCompaction(Collection<StoreFile> storeFiles,
+      List<StoreFile> filesCompacting) {
+    return hasExpiredStores(storeFiles);
+  }
+
+  private boolean hasExpiredStores(Collection<StoreFile> files) {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    for (StoreFile sf : files) {
+      // Check MIN_VERSIONS is in HStore removeUnneededFiles
+      Long maxTs = sf.getReader().getMaxTimestamp();
+      long maxTtl = storeConfigInfo.getStoreFileTtl();
+      if (maxTs == null || maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
+        continue;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  private Collection<StoreFile> getExpiredStores(Collection<StoreFile> files,
+      Collection<StoreFile> filesCompacting) {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();
+    for (StoreFile sf : files) {
+      // Check MIN_VERSIONS is in HStore removeUnneededFiles
+      Long maxTs = sf.getReader().getMaxTimestamp();
+      long maxTtl = storeConfigInfo.getStoreFileTtl();
+      if (maxTs == null || maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
+        continue;
+      } else if (filesCompacting == null || !filesCompacting.contains(sf)) {
+        expiredStores.add(sf);
+      }
+    }
+    return expiredStores;
+  }
+}
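
The selection logic above treats a store file as compactable only when every cell in it has expired: the file's newest timestamp must fall outside the TTL window. A minimal standalone restatement of that predicate, using the same null and Long.MAX_VALUE conventions as getExpiredStores and hasExpiredStores:

  // Restatement of the expiry test used by the policy above.
  // maxTs may be null when a file carries no timestamp metadata; such a file
  // is never considered expired. A TTL of Long.MAX_VALUE means "no TTL".
  static boolean isExpired(Long maxTs, long maxTtl, long now) {
    if (maxTs == null || maxTtl == Long.MAX_VALUE) {
      return false;
    }
    return now - maxTtl >= maxTs; // even the newest cell is older than the TTL window
  }
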
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
new file mode 100644
index 0000000..8ed8823
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestFIFOCompactionPolicy {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
+
+  private final byte[] family = Bytes.toBytes("f");
+
+  private final byte[] qualifier = Bytes.toBytes("q");
+
+  private Store getStoreWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (int i = 0; i < rsts.size(); i++) {
+      HRegionServer hrs = rsts.get(i).getRegionServer();
+      for (Region region : hrs.getOnlineRegions(tableName)) {
+        return region.getStores().iterator().next();
+      }
+    }
+    return null;
+  }
+
+  private Store prepareData() throws IOException {
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      FIFOCompactionPolicy.class.getName());
+    desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      DisabledRegionSplitPolicy.class.getName());
+    HColumnDescriptor colDesc = new HColumnDescriptor(family);
+    colDesc.setTimeToLive(1); // 1 sec
+    desc.addFamily(colDesc);
+
+    admin.createTable(desc);
+    Table table = TEST_UTIL.getConnection().getTable(tableName);
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        byte[] value = new byte[128 * 1024];
+        rand.nextBytes(value);
+        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
+      }
+      admin.flush(tableName);
+      if (i < 9) {
+        try {
+          // Let the freshly flushed file age past the 1 sec TTL.
+          Thread.sleep(1001);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    return getStoreWithName(tableName);
+  }
+
+  @Test
+  public void testPurgeExpiredFiles() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+      // FIFO compaction discards the expired files; wait until only the most
+      // recent (not yet expired) store file remains.
+      while (store.getStorefilesCount() > 1) {
+        Thread.sleep(1000);
+      }
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+}
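
The polling loop in testPurgeExpiredFiles never gives up, so a regression would hang the test rather than fail it. A bounded alternative, sketched below with the Waiter utilities from the HBase test support code (assumes an added import of org.apache.hadoop.hbase.Waiter and that 'store' is declared final so the anonymous predicate can capture it):

  // Sketch: bounded replacement for the unbounded while-loop above; fails
  // after 30 seconds instead of hanging forever.
  TEST_UTIL.waitFor(30000, new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      return store.getStorefilesCount() == 1;
    }
  });
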