diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 97cc404..38ca97a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -63,6 +63,8 @@ public class CompactionConfiguration { public static final String HBASE_HSTORE_OFFPEAK_START_HOUR = "hbase.offpeak.start.hour"; public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT = "hbase.hstore.min.locality.to.skip.major.compact"; + public static final String HBASE_HSTORE_COMPACTION_DELAY = "hbase.hstore.compaction.delay"; + public static final String HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT = "hbase.hfile.compaction.discharger.thread.count"; @@ -108,6 +110,7 @@ public class CompactionConfiguration { private final int incomingWindowMin; private final String compactionPolicyForTieredWindow; private final boolean singleOutputForMinorCompaction; + private final long compactionDelay; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -139,7 +142,8 @@ public class CompactionConfiguration { DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName()); singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, true); - + //In ms, but hbase.hstore.compaction.delay is in sec + compactionDelay = conf.getLong(HBASE_HSTORE_COMPACTION_DELAY, 0) * 1000; LOG.info(this); } @@ -284,4 +288,8 @@ public class CompactionConfiguration { public boolean useSingleOutputForMinorCompaction() { return singleOutputForMinorCompaction; } + + public long getCompactionDelay() { + return compactionDelay; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index 3386bfd..9cba6fb 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -106,6 +106,7 @@ public class RatioBasedCompactionPolicy extends SortedCompactionPolicy { throws IOException { if (!tryingMajor) { candidateSelection = filterBulk(candidateSelection); + candidateSelection = filterDelayed(candidateSelection); candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); candidateSelection = checkMinFilesCriteria(candidateSelection, comConf.getMinFilesToCompact()); @@ -113,6 +114,29 @@ public class RatioBasedCompactionPolicy extends SortedCompactionPolicy { return new CompactionRequest(candidateSelection); } + private ArrayList filterDelayed(ArrayList candidates) { + final long delay = comConf.getCompactionDelay(); + final long currentTime= System.currentTimeMillis(); + if(delay == 0){ + return candidates; + } + int pos = -1; + for(int i = 0; i < candidates.size(); i++){ + StoreFile sf = candidates.get(i); + // TODO: min, max + Long ts = sf.getReader().getMaxTimestamp(); + if(ts == null || currentTime - delay > ts){ + continue; + } else{ + pos = i; break; + } + } + if(pos >=0){ + candidates.subList(pos, candidates.size()).clear(); + } + return candidates; + } + /** * -- Default minor compaction selection algorithm: * choose CompactSelection from candidates -- diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDelayedCompaction.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDelayedCompaction.java new file mode 100644 index 0000000..aa186eb --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDelayedCompaction.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestDelayedCompaction { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + + private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); + + private final byte[] family = Bytes.toBytes("f"); + + private final byte[] qualifier = Bytes.toBytes("q"); + + private Store getStoreWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (Region region : hrs.getOnlineRegions(tableName)) { + return region.getStores().iterator().next(); + } + } + return null; + } + + private Store prepareData(boolean delay) throws IOException { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor colDesc = new HColumnDescriptor(family); + // 10 sec delay + if(delay){ + colDesc.setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_DELAY, "10"); + } + desc.addFamily(colDesc); + + admin.createTable(desc); + Table table = TEST_UTIL.getConnection().getTable(tableName); + Random rand = new Random(); + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[128 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); + } + admin.flush(tableName); + } + return getStoreWithName(tableName); + } + + @Test + public void testDelayedCompaction() throws Exception { + + TEST_UTIL.startMiniCluster(1); + try { + Store store = prepareData(true); + assertEquals(3, store.getStorefilesCount()); + TEST_UTIL.getHBaseAdmin().compact(tableName); + Thread.sleep(2000); + assertEquals(3, store.getStorefilesCount()); + Thread.sleep(8100); + TEST_UTIL.getHBaseAdmin().compact(tableName); + while (store.getStorefilesCount() > 1) { + Thread.sleep(1000); + } + assertTrue(true); + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + + @Test + public void testCompactionNoDelay() throws Exception { + + TEST_UTIL.startMiniCluster(1); + try { + Store store = prepareData(false); + assertEquals(3, store.getStorefilesCount()); + TEST_UTIL.getHBaseAdmin().compact(tableName); + Thread.sleep(2000); + while (store.getStorefilesCount() > 1) { + Thread.sleep(1000); + } + assertTrue(true); + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + +}