diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 630ca7d..a2118b0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -61,6 +61,8 @@ public class CompactionConfiguration { public static final String HBASE_HSTORE_OFFPEAK_START_HOUR = "hbase.offpeak.start.hour"; public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT = "hbase.hstore.min.locality.to.skip.major.compact"; + public static final String HBASE_HSTORE_COMPACTION_DELAY = "hbase.hstore.compaction.delay"; + Configuration conf; StoreConfigInformation storeConfigInfo; @@ -76,6 +78,7 @@ public class CompactionConfiguration { private final long majorCompactionPeriod; private final float majorCompactionJitter; private final float minLocalityToForceCompact; + private final long compactionDelay; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -96,6 +99,8 @@ public class CompactionConfiguration { // Make it 0.5 so jitter has us fall evenly either side of when the compaction should run majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F); minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f); + //In ms, but hbase.hstore.compaction.delay is in sec + compactionDelay = conf.getLong(HBASE_HSTORE_COMPACTION_DELAY, 0) * 1000; LOG.info(this); } @@ -189,4 +194,8 @@ public class CompactionConfiguration { public float getMinLocalityToForceCompact() { return minLocalityToForceCompact; } + + public long getCompactionDelay() { + return compactionDelay; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index 5aeff5c..95cc5e7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -110,6 +110,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { if (!isTryingMajor && !isAfterSplit) { // We're are not compacting all files, let's see what files are applicable candidateSelection = filterBulk(candidateSelection); + candidateSelection = filterDelayed(candidateSelection); candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); candidateSelection = checkMinFilesCriteria(candidateSelection); } @@ -122,6 +123,29 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { return result; } + private ArrayList filterDelayed(ArrayList candidates) { + final long delay = comConf.getCompactionDelay(); + final long currentTime= System.currentTimeMillis(); + if(delay == 0){ + return candidates; + } + int pos = -1; + for(int i = 0; i < candidates.size(); i++){ + StoreFile sf = candidates.get(i); + // TODO: min, max + Long ts = sf.getReader().getMaxTimestamp(); + if(ts == null || currentTime - delay > ts){ + continue; + } else{ + pos = i; break; + } + } + if(pos >=0){ + candidates.subList(pos, candidates.size()-1).clear(); + } + return candidates; + } + /** * @param candidates pre-filtrate * @return filtered subset diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDelayedCompaction.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDelayedCompaction.java new file mode 100644 index 0000000..aa186eb --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDelayedCompaction.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestDelayedCompaction { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + + private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); + + private final byte[] family = Bytes.toBytes("f"); + + private final byte[] qualifier = Bytes.toBytes("q"); + + private Store getStoreWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (Region region : hrs.getOnlineRegions(tableName)) { + return region.getStores().iterator().next(); + } + } + return null; + } + + private Store prepareData(boolean delay) throws IOException { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor colDesc = new HColumnDescriptor(family); + // 10 sec delay + if(delay){ + colDesc.setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_DELAY, "10"); + } + desc.addFamily(colDesc); + + admin.createTable(desc); + Table table = TEST_UTIL.getConnection().getTable(tableName); + Random rand = new Random(); + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[128 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); + } + admin.flush(tableName); + } + return getStoreWithName(tableName); + } + + @Test + public void testDelayedCompaction() throws Exception { + + TEST_UTIL.startMiniCluster(1); + try { + Store store = prepareData(true); + assertEquals(3, store.getStorefilesCount()); + TEST_UTIL.getHBaseAdmin().compact(tableName); + Thread.sleep(2000); + assertEquals(3, store.getStorefilesCount()); + Thread.sleep(8100); + TEST_UTIL.getHBaseAdmin().compact(tableName); + while (store.getStorefilesCount() > 1) { + Thread.sleep(1000); + } + assertTrue(true); + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + + @Test + public void testCompactionNoDelay() throws Exception { + + TEST_UTIL.startMiniCluster(1); + try { + Store store = prepareData(false); + assertEquals(3, store.getStorefilesCount()); + TEST_UTIL.getHBaseAdmin().compact(tableName); + Thread.sleep(2000); + while (store.getStorefilesCount() > 1) { + Thread.sleep(1000); + } + assertTrue(true); + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + +}