diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 11d71cf..270b708 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -37,11 +37,14 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -114,13 +117,17 @@ public class StoreFile {
   /** Key for the number of mob cells in metadata*/
   public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");
+
+  /** Maximum cell TTL; -1 if not defined */
+  public static final byte[] MAX_CELL_TTL = Bytes.toBytes("MAX_CELL_TTL");

   private final StoreFileInfo fileInfo;
   private final FileSystem fs;

   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
-
+  // HStore configuration
+  private final Configuration conf;
   // Keys for metadata stored in backing HFile.
   // Set when we obtain a Reader.
   private long sequenceid = -1;
@@ -213,7 +220,7 @@ public class StoreFile {
     this.fs = fs;
     this.fileInfo = fileInfo;
     this.cacheConf = cacheConf;
-
+    this.conf = conf;
     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
       this.cfBloomType = cfBloomType;
     } else {
@@ -232,9 +239,18 @@ public class StoreFile {
     this.fileInfo = other.fileInfo;
     this.cacheConf = other.cacheConf;
     this.cfBloomType = other.cfBloomType;
+    this.conf = other.conf;
   }

   /**
+   * Returns the configuration of this store file
+   * @return the configuration
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
    * @return the StoreFile object associated to this StoreFile.
    *         null if the StoreFile is not a reference.
    */
@@ -252,6 +268,7 @@ public class StoreFile {
   /**
    * @return Returns the qualified path of this StoreFile
    */
+  @SuppressWarnings("deprecation")
   public Path getQualifiedPath() {
     return this.fileInfo.getPath().makeQualified(fs);
   }
@@ -578,8 +595,7 @@ public class StoreFile {
     private Path filePath;
     private InetSocketAddress[] favoredNodes;
     private HFileContext fileContext;
-    private boolean shouldDropCacheBehind = false;
-
+
     public WriterBuilder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
       this.conf = conf;
@@ -647,7 +663,6 @@ public class StoreFile {
     }

     public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
-      this.shouldDropCacheBehind = shouldDropCacheBehind;
       return this;
     }
     /**
@@ -703,13 +718,28 @@ public class StoreFile {
         null : getReader().timeRangeTracker.getMinimumTimestamp();
   }
+
+  public Long getMaximumTimestamp() {
+    return (getReader().timeRangeTracker == null) ? null
+        : getReader().timeRangeTracker.getMaximumTimestamp();
+  }

   /**
+   * Returns the maximum cell TTL, or -1 if not defined
+   * @return the maximum cell TTL, or -1 if not defined
+   */
+  public long getMaxCellTimeToLive() {
+    byte[] val = getMetadataValue(MAX_CELL_TTL);
+    return val != null ? Bytes.toLong(val) : -1;
+  }
+
+  /**
    * Gets the approximate mid-point of this file that is optimal for use in splitting it.
    * @param comparator Comparator used to compare KVs.
    * @return The split point row, or null if splitting is not possible, or reader is null.
    */
-  @SuppressWarnings("deprecation")
   byte[] getFileSplitPoint(CellComparator comparator) throws IOException {
     if (this.reader == null) {
       LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
@@ -747,6 +777,7 @@ public class StoreFile {
     private int lastBloomKeyOffset, lastBloomKeyLen;
     private Cell lastCell = null;
     private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    private long maxCellTtl = -1;
     private Cell lastDeleteFamilyCell = null;
     private long deleteFamilyCnt = 0;
@@ -820,6 +851,8 @@ public class StoreFile {
       if (LOG.isTraceEnabled()) LOG.trace("Delete Family Bloom filter type for " + path + ": "
           + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
       }
+      // Read the CF TTL; we use it when tracking the maximum cell TTL
+      maxCellTtl = conf.getInt(HColumnDescriptor.TTL, HColumnDescriptor.DEFAULT_TTL);
     }

     /**
@@ -835,6 +868,7 @@ public class StoreFile {
       writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
       appendTrackedTimestampsToMetadata();
+      appendMaxCellTTL();
     }

     /**
@@ -851,8 +885,38 @@ public class StoreFile {
       writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
       writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
       appendTrackedTimestampsToMetadata();
+      appendMaxCellTTL();
+    }
+
+    /**
+     * Adds the maximum cell TTL detected while writing this file
+     * @throws IOException
+     */
+    private void appendMaxCellTTL() throws IOException {
+      // We append it only if it is not the default of -1
+      if (maxCellTtl > -1) {
+        appendFileInfo(MAX_CELL_TTL, Bytes.toBytes(maxCellTtl));
+      }
     }

+    private void trackMaxCellTTL(final Cell cell) {
+      int len = cell.getTagsLength();
+      if (len <= 0) {
+        // No tags
+        return;
+      }
+      byte[] arr = cell.getTagsArray();
+      int off = cell.getTagsOffset();
+      // TODO: this could be optimized
+      // TTL is last in the list of tags
+      Tag tag = Tag.getTag(arr, off, len, TagType.TTL_TAG_TYPE);
+      if (tag != null) {
+        long ttl = Bytes.toLong(tag.getValue());
+        if (ttl > maxCellTtl) {
+          maxCellTtl = ttl;
+        }
+      }
+    }
     /**
      * Add TimestampRange and earliest put timestamp to Metadata
      */
@@ -1001,6 +1065,7 @@ public class StoreFile {
       appendDeleteFamilyBloomFilter(cell);
       writer.append(cell);
       trackTimestamps(cell);
+      trackMaxCellTTL(cell);
     }

     public Path getPath() {
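Reviewer note (not part of the patch): a minimal sketch of how a per-cell TTL ends up in the new MAX_CELL_TTL file info. Put#setTTL and Table#put are the standard HBase 1.x client API; the connection handling and the table/family/row names are invented for illustration. Each cell below carries a TTL tag, Writer#trackMaxCellTTL keeps the largest value it sees, and after a flush StoreFile#getMaxCellTimeToLive() on the new file should report the larger of the two TTLs, as TestStoreFileMaxCellTTL further down exercises.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class MaxCellTtlSketch {
  // Writes two cells with different per-cell TTLs (milliseconds).
  static void writeWithCellTtl(Connection conn) throws Exception {
    try (Table table = conn.getTable(TableName.valueOf("metrics"))) {
      Put shortLived = new Put(Bytes.toBytes("row1"));
      shortLived.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v1"));
      shortLived.setTTL(10000);   // 10 s cell TTL, carried as a TTL_TAG_TYPE tag
      table.put(shortLived);

      Put longLived = new Put(Bytes.toBytes("row2"));
      longLived.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v2"));
      longLived.setTTL(20000);    // 20 s cell TTL; becomes the file's MAX_CELL_TTL
      table.put(longLived);
    }
  }
}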
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index f26f4fe..2dd613c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -26,6 +26,7 @@
 import java.util.List;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -59,24 +60,31 @@ public class DefaultCompactor extends Compactor {
     long smallestReadPoint = getSmallestReadPoint();

     List<StoreFileScanner> scanners;
+    List<Path> newFiles = new ArrayList<Path>();
     Collection<StoreFile> readersToClose;
+    Collection<StoreFile> toCompact = filterExpiredStores(request.getFiles());
+
+    if (toCompact.size() == 0) {
+      // Nothing to compact - no new files, old files have expired
+      return newFiles;
+    }
+
     if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
       // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
       // HFileFiles, and their readers
-      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
-      for (StoreFile f : request.getFiles()) {
+      readersToClose = new ArrayList<StoreFile>(toCompact.size());
+      for (StoreFile f : toCompact) {
         readersToClose.add(new StoreFile(f));
       }
       scanners = createFileScanners(readersToClose, smallestReadPoint,
-          store.throttleCompaction(request.getSize()));
+          store.throttleCompaction(getSize(toCompact)));
     } else {
       readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
-          store.throttleCompaction(request.getSize()));
+      scanners = createFileScanners(toCompact, smallestReadPoint,
+          store.throttleCompaction(getSize(toCompact)));
     }

     StoreFile.Writer writer = null;
-    List<Path> newFiles = new ArrayList<Path>();
     boolean cleanSeqId = false;
     IOException e = null;
     try {
@@ -150,6 +158,42 @@ public class DefaultCompactor extends Compactor {
     return newFiles;
   }

+  private long getSize(Collection<StoreFile> toCompact) {
+    long size = 0;
+    for (StoreFile sf : toCompact) {
+      size += sf.getReader().length();
+    }
+    return size;
+  }
+
+  private Collection<StoreFile> filterExpiredStores(Collection<StoreFile> files) {
+    ArrayList<StoreFile> filtered = new ArrayList<StoreFile>();
+    long currentTime = System.currentTimeMillis();
+    for (StoreFile sf : files) {
+      // Check MIN_VERSIONS
+      int minVersions = getMinVersions(sf);
+      Long maxTs = sf.getMaximumTimestamp();
+      long maxCellTtl = sf.getMaxCellTimeToLive();
+
+      if (minVersions > 0 || maxTs == null
+          || maxCellTtl == HColumnDescriptor.DEFAULT_TTL
+          || (currentTime - maxCellTtl < maxTs)) {
+        filtered.add(sf);
+      } else {
+        LOG.info("Purge expired store: maxTs=" + maxTs
+            + " maxCellTtl=" + maxCellTtl + " current=" + currentTime
+            + " diff=" + (currentTime - maxCellTtl - maxTs));
+      }
+    }
+    return filtered;
+  }
+
+  private int getMinVersions(StoreFile sf) {
+    Configuration conf = sf.getConf();
+    return conf.getInt(HColumnDescriptor.MIN_VERSIONS, HColumnDescriptor.DEFAULT_MIN_VERSIONS);
+  }
+
   /**
    * Creates a writer for a new file in a temporary directory.
    * @param fd The file details.
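Reviewer note (not part of the patch): a tiny worked example of the keep/drop test in filterExpiredStores above, with invented numbers. A file is dropped from the compaction input only when even its newest cell, given the largest TTL recorded for the file, has already expired (and MIN_VERSIONS is 0).

public class ExpiredFileCheckExample {
  public static void main(String[] args) {
    long maxTs = 1700000000000L;     // newest cell timestamp in the file (ms), invented
    long maxCellTtl = 60000L;        // largest TTL recorded for the file (ms), invented
    long now = 1700000100000L;       // "current" time (ms), invented
    // Same comparison as the patch: keep the file while now - maxCellTtl < maxTs.
    boolean keep = (now - maxCellTtl) < maxTs;
    // Here now - maxCellTtl = 1700000040000 >= maxTs, so every cell has expired
    // and the file would be purged rather than rewritten.
    System.out.println(keep ? "keep for compaction" : "purge: all cells expired");
  }
}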
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
new file mode 100644
index 0000000..b65e04b
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+/**
+ * FIFO compaction policy selects only files in which all cells have expired.
+ * The column family MUST have a non-default TTL. A typical use case is storing
+ * raw data that will be post-processed later and discarded completely after a
+ * fairly short period of time: raw time-series versus time-based roll-up
+ * aggregates and compacted time-series. We collect raw time-series and store
+ * them in a CF with the FIFO compaction policy; periodically a task builds the
+ * roll-up aggregates and compacts the time-series, after which the original
+ * raw data can be discarded.
+ */
+public class FIFOCompactionPolicy extends RatioBasedCompactionPolicy {
+
+  private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);
+
+  public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
+    super(conf, storeConfigInfo);
+  }
+
+  @Override
+  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
+      List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
+      boolean forceMajor) throws IOException {
+
+    if (forceMajor) {
+      LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignoring the flag.");
+    }
+    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
+    Collection<StoreFile> eligible = getCurrentEligibleFiles(candidateSelection, filesCompacting);
+    Collection<StoreFile> toCompact = filterNonExpiredStores(eligible);
+    LOG.info("[FIFOCompactionPolicy] Selected: " + toCompact.size() + ", asked: "
+        + candidateFiles.size());
+
+    CompactionRequest result = new CompactionRequest(toCompact);
+    return result;
+  }
+
+  @Override
+  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
+    // No major compaction support
+    return false;
+  }
+
+  @Override
+  public boolean needsCompaction(Collection<StoreFile> storeFiles,
+      List<StoreFile> filesCompacting) {
+    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(storeFiles);
+    Collection<StoreFile> eligible = getCurrentEligibleFiles(candidateSelection, filesCompacting);
+    Collection<StoreFile> toCompact = filterNonExpiredStores(eligible);
+    LOG.info("Need compaction: " + toCompact.size());
+    return toCompact.size() > 0;
+  }
+
+  private Collection<StoreFile> filterNonExpiredStores(Collection<StoreFile> files) {
+    ArrayList<StoreFile> filtered = new ArrayList<StoreFile>();
+    long currentTime = System.currentTimeMillis();
+    for (StoreFile sf : files) {
+      // Check MIN_VERSIONS
+      int minVersions = getMinVersions(sf);
+      Long maxTs = sf.getMaximumTimestamp();
+      long maxCellTtl = sf.getMaxCellTimeToLive();
+
+      if (minVersions > 0 || maxTs == null
+          || maxCellTtl == HColumnDescriptor.DEFAULT_TTL
+          || (currentTime - maxCellTtl < maxTs)) {
+        continue;
+      } else {
+        filtered.add(sf);
+      }
+    }
+    return filtered;
+  }
+
+  private int getMinVersions(StoreFile sf) {
+    Configuration conf = sf.getConf();
+    return conf.getInt(HColumnDescriptor.MIN_VERSIONS, HColumnDescriptor.DEFAULT_MIN_VERSIONS);
+  }
+}
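Reviewer note (not part of the patch): a minimal sketch of how a table could opt in to the policy above, mirroring TestFIFOCompactionPolicy#prepareData further down. The table name, family name and the one-day TTL are invented; DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY is the same per-table override the test uses.

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
import org.apache.hadoop.hbase.util.Bytes;

public class FifoCompactionSetupSketch {
  // Creates a table whose stores use the FIFO compaction policy. The column family
  // must have a non-default TTL, since the policy only ever selects files in which
  // every cell has expired.
  static void createRawTimeSeriesTable(Admin admin) throws Exception {
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("raw_timeseries"));
    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
        FIFOCompactionPolicy.class.getName());
    HColumnDescriptor colDesc = new HColumnDescriptor(Bytes.toBytes("d"));
    colDesc.setTimeToLive(24 * 3600);  // one day, in seconds
    desc.addFamily(colDesc);
    admin.createTable(desc);
  }
}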
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
index 5aeff5c..2709f9d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
@@ -53,7 +53,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     super(conf, storeConfigInfo);
   }

-  private ArrayList<StoreFile> getCurrentEligibleFiles(
+  protected ArrayList<StoreFile> getCurrentEligibleFiles(
       ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
     // candidates = all storefiles not already in compaction queue
     if (!filesCompacting.isEmpty()) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileMaxCellTTL.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileMaxCellTTL.java
new file mode 100644
index 0000000..6c0f567
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileMaxCellTTL.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Tests that the maximum cell TTL is tracked and stored in store file metadata
+ */
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestStoreFileMaxCellTTL {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static MiniHBaseCluster cluster;
+  @Rule
+  public final TestName TEST_NAME = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+
+    cluster = TEST_UTIL.startMiniCluster(1, 1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testLoadFlushWithMaxTTL() throws Exception {
+    Table table = null;
+    final int cfTtl = 5000;
+    final long ttl1 = 10000;
+    final long ttl2 = 20000;
+
+    try {
+      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+      byte[] fam = Bytes.toBytes("info");
+      // column qualifier
+      byte[] qual = Bytes.toBytes("qual");
+
+      byte[] row1 = Bytes.toBytes("rowb");
+      byte[] row2 = Bytes.toBytes("rowc");
+
+      HTableDescriptor desc = new HTableDescriptor(tableName);
+      HColumnDescriptor colDesc = new HColumnDescriptor(fam);
+      colDesc.setTimeToLive(cfTtl);
+      desc.addFamily(colDesc);
+      Admin admin = TEST_UTIL.getHBaseAdmin();
+      admin.createTable(desc);
+      byte[] value = Bytes.toBytes("value");
+      table = TEST_UTIL.getConnection().getTable(tableName);
+      Put put = new Put(row1);
+      put.addColumn(fam, qual, value);
+      put.setTTL(ttl1);
+      table.put(put);
+
+      put = new Put(row2);
+      put.addColumn(fam, qual, value);
+      put.setTTL(ttl2);
+      table.put(put);
+
+      admin.flush(tableName);
+
+      List<HRegion> regions = cluster.getRegions(tableName);
+      assertTrue(regions.size() == 1);
+      HRegion region = regions.get(0);
+      List<Store> stores = region.getStores();
+      assertTrue(stores.size() == 1);
+      Store store = stores.get(0);
+      Collection<StoreFile> files = store.getStorefiles();
+      assertTrue(files.size() == 1);
+      StoreFile file = files.iterator().next();
+      long time = file.getMaxCellTimeToLive();
+      assertTrue(time == Math.max(ttl1, ttl2));
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  @Test
+  public void testLoadFlushNoMaxTTLWithColTTL() throws Exception {
+    Table table = null;
+    final int cfTtl = 5000;
+
+    try {
+      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+      byte[] fam = Bytes.toBytes("info");
+      // column qualifier
+      byte[] qual = Bytes.toBytes("qual");
+
+      byte[] row1 = Bytes.toBytes("rowb");
+      byte[] row2 = Bytes.toBytes("rowc");
+
+      HTableDescriptor desc = new HTableDescriptor(tableName);
+      HColumnDescriptor colDesc = new HColumnDescriptor(fam);
+      colDesc.setTimeToLive(cfTtl);
+      desc.addFamily(colDesc);
+      Admin admin = TEST_UTIL.getHBaseAdmin();
+      admin.createTable(desc);
+      byte[] value = Bytes.toBytes("value");
+      table = TEST_UTIL.getConnection().getTable(tableName);
+      Put put = new Put(row1);
+      put.addColumn(fam, qual, value);
+      table.put(put);
+
+      put = new Put(row2);
+      put.addColumn(fam, qual, value);
+      table.put(put);
+
+      admin.flush(tableName);
+
+      List<HRegion> regions = cluster.getRegions(tableName);
+      assertTrue(regions.size() == 1);
+      HRegion region = regions.get(0);
+      List<Store> stores = region.getStores();
+      assertTrue(stores.size() == 1);
+      Store store = stores.get(0);
+      Collection<StoreFile> files = store.getStorefiles();
+      assertTrue(files.size() == 1);
+      StoreFile file = files.iterator().next();
+      long time = file.getMaxCellTimeToLive();
+      assertTrue(time == cfTtl);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  @Test
+  public void testLoadFlushNoMaxTTLNoColTTL() throws Exception {
+    Table table = null;
+
+    try {
+      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+      byte[] fam = Bytes.toBytes("info");
+      // column qualifier
+      byte[] qual = Bytes.toBytes("qual");
+
+      byte[] row1 = Bytes.toBytes("rowb");
+      byte[] row2 = Bytes.toBytes("rowc");
+
+      HTableDescriptor desc = new HTableDescriptor(tableName);
+      HColumnDescriptor colDesc = new HColumnDescriptor(fam);
+      desc.addFamily(colDesc);
+      Admin admin = TEST_UTIL.getHBaseAdmin();
+      admin.createTable(desc);
+      byte[] value = Bytes.toBytes("value");
+      table = TEST_UTIL.getConnection().getTable(tableName);
+      Put put = new Put(row1);
+      put.addColumn(fam, qual, value);
+      table.put(put);
+
+      put = new Put(row2);
+      put.addColumn(fam, qual, value);
+      table.put(put);
+
+      admin.flush(tableName);
+
+      List<HRegion> regions = cluster.getRegions(tableName);
+      assertTrue(regions.size() == 1);
+      HRegion region = regions.get(0);
+      List<Store> stores = region.getStores();
+      assertTrue(stores.size() == 1);
+      Store store = stores.get(0);
+      Collection<StoreFile> files = store.getStorefiles();
+      assertTrue(files.size() == 1);
+      StoreFile file = files.iterator().next();
+      long time = file.getMaxCellTimeToLive();
+      assertTrue(time == HColumnDescriptor.DEFAULT_TTL);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
new file mode 100644
index 0000000..233bfc6
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestFIFOCompactionPolicy {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
+
+  private final byte[] family = Bytes.toBytes("f");
+
+  private final byte[] qualifier = Bytes.toBytes("q");
+
+  private Store getStoreWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+      HRegionServer hrs = rsts.get(i).getRegionServer();
+      for (Region region : hrs.getOnlineRegions(tableName)) {
+        return region.getStores().iterator().next();
+      }
+    }
+    return null;
+  }
+
+  private Store prepareData() throws IOException {
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      FIFOCompactionPolicy.class.getName());
+    HColumnDescriptor colDesc = new HColumnDescriptor(family);
+    colDesc.setTimeToLive(1000);
+    desc.addFamily(colDesc);
+
+    admin.createTable(desc);
+    Table table = TEST_UTIL.getConnection().getTable(tableName);
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        byte[] value = new byte[128 * 1024];
+        rand.nextBytes(value);
+        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
+      }
+      admin.flush(tableName);
+      if (i < 9) {
+        try {
+          Thread.sleep(1001);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    return getStoreWithName(tableName);
+  }
+
+  @Test
+  public void testPurgeExpiredFiles() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+      while (store.getStorefilesCount() > 1) {
+        Thread.sleep(20);
+      }
+      Thread.sleep(1001);
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+      while (store.getStorefilesCount() > 0) {
+        Thread.sleep(20);
+      }
+      assertTrue(true);
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPurgeExpiredStores.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPurgeExpiredStores.java
new file mode 100644
index 0000000..2f84c4e
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPurgeExpiredStores.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestPurgeExpiredStores {
+
+  private static final Log LOG = LogFactory.getLog(TestPurgeExpiredStores.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
+
+  private final byte[] family = Bytes.toBytes("f");
+
+  private final byte[] qualifier = Bytes.toBytes("q");
+
+  private Store getStoreWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+      HRegionServer hrs = rsts.get(i).getRegionServer();
+      for (Region region : hrs.getOnlineRegions(tableName)) {
+        return region.getStores().iterator().next();
+      }
+    }
+    return null;
+  }
+
+  private Store prepareData() throws IOException {
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor colDesc = new HColumnDescriptor(family);
+    colDesc.setTimeToLive(3000);
+    desc.addFamily(colDesc);
+
+    admin.createTable(desc);
+    Table table = TEST_UTIL.getConnection().getTable(tableName);
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        byte[] value = new byte[128 * 1024];
+        rand.nextBytes(value);
+        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
+      }
+      admin.flush(tableName);
+    }
+    return getStoreWithName(tableName);
+  }
+
+  @Test
+  public void testPurgeExpiredFiles() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      Thread.sleep(3010);
+      assertEquals(10, store.getStorefilesCount());
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+      while (store.getStorefilesCount() > 0) {
+        Thread.sleep(20);
+      }
+      assertTrue(true);
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+}