diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 11d71cf..6651774 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.SortedSet; import java.util.UUID; @@ -37,11 +38,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; @@ -82,6 +86,7 @@ import com.google.common.collect.Ordering; public class StoreFile { private static final Log LOG = LogFactory.getLog(StoreFile.class.getName()); + static final String HBASE_HFILE_PLUGINS_KEY = "hbase.hfile.plugins"; // Keys for fileinfo values in HFile /** Max Sequence ID in FileInfo */ @@ -114,13 +119,17 @@ public class StoreFile { /** Key for the number of mob cells in metadata*/ public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT"); + + /** Maximum Cell's TTL. -1, if not defined */ + public static final byte[] MAX_CELL_TTL = Bytes.toBytes("MAX_CELL_TTL"); private final StoreFileInfo fileInfo; private final FileSystem fs; // Block cache configuration and reference. private final CacheConfig cacheConf; - + // HStore config + private final Configuration conf; // Keys for metadata stored in backing HFile. // Set when we obtain a Reader. private long sequenceid = -1; @@ -171,7 +180,7 @@ public class StoreFile { * if this metadata is set as true, the reset is skipped. */ public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID"); - + /** * Constructor, loads a reader and it's indices, etc. May allocate a * substantial amount of ram depending on the underlying files (10-20MB?). @@ -213,6 +222,7 @@ public class StoreFile { this.fs = fs; this.fileInfo = fileInfo; this.cacheConf = cacheConf; + this.conf = conf; if (BloomFilterFactory.isGeneralBloomEnabled(conf)) { this.cfBloomType = cfBloomType; @@ -232,8 +242,18 @@ public class StoreFile { this.fileInfo = other.fileInfo; this.cacheConf = other.cacheConf; this.cfBloomType = other.cfBloomType; - } + this.conf = other.conf; + } + + /** + * Gets config + * @return config + */ + public Configuration getConf() { + return conf; + } + /** * @return the StoreFile object associated to this StoreFile. * null if the StoreFile is not a reference. 
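Note: the hunk above introduces the new "hbase.hfile.plugins" configuration key (HBASE_HFILE_PLUGINS_KEY) and the MAX_CELL_TTL file-info key. A minimal sketch of how a deployment might register custom plugins through that key follows; the plugin class names are illustrative and not part of this patch.

// Sketch (illustrative names): custom StoreFile plugins are registered as a
// comma-separated list under the new "hbase.hfile.plugins" key. They are
// instantiated reflectively by StoreFilePluginFactory (added later in this patch),
// so every plugin class needs a public no-argument constructor.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class RegisterPluginsExample {
  public static Configuration pluginConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hfile.plugins",
        "org.example.MyStoreFilePlugin,org.example.AnotherStoreFilePlugin");
    return conf;
  }
}

As the TestStoreMetaDataPlugins test later in this patch shows, the same key can also be set per column family via HColumnDescriptor.setConfiguration.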
@@ -252,6 +272,7 @@ public class StoreFile { /** * @return Returns the qualified path of this StoreFile */ + @SuppressWarnings("deprecation") public Path getQualifiedPath() { return this.fileInfo.getPath().makeQualified(fs); } @@ -578,8 +599,8 @@ public class StoreFile { private Path filePath; private InetSocketAddress[] favoredNodes; private HFileContext fileContext; - private boolean shouldDropCacheBehind = false; - + private boolean shouldDropCacheBehind; + public WriterBuilder(Configuration conf, CacheConfig cacheConf, FileSystem fs) { this.conf = conf; @@ -697,19 +718,31 @@ public class StoreFile { } return new Path(dir, UUID.randomUUID().toString().replaceAll("-", "")); } + + public Long getMaximumTimestamp() { + return (getReader().timeRangeTracker == null) ? null : getReader().timeRangeTracker + .getMaximumTimestamp(); + } public Long getMinimumTimestamp() { - return (getReader().timeRangeTracker == null) ? - null : - getReader().timeRangeTracker.getMinimumTimestamp(); + return (getReader().timeRangeTracker == null) ? null : getReader().timeRangeTracker + .getMinimumTimestamp(); } - + /** + * Returns maximum cells TTL or -1 if not defined + * @return maximum cells TTL or -1 if not defined + */ + public long getMaxCellTimeToLive() + { + byte[] val = getMetadataValue(MAX_CELL_TTL); + return val != null? Bytes.toLong(val) : HColumnDescriptor.DEFAULT_TTL; + } + /** * Gets the approximate mid-point of this file that is optimal for use in splitting it. * @param comparator Comparator used to compare KVs. * @return The split point row, or null if splitting is not possible, or reader is null. */ - @SuppressWarnings("deprecation") byte[] getFileSplitPoint(CellComparator comparator) throws IOException { if (this.reader == null) { LOG.warn("Storefile " + this + " Reader is null; cannot get split point"); @@ -764,6 +797,8 @@ public class StoreFile { protected HFile.Writer writer; private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null; + + private List plugins; /** * Creates an HFile.Writer that also write helpful meta data. @@ -820,8 +855,22 @@ public class StoreFile { if (LOG.isTraceEnabled()) LOG.trace("Delete Family Bloom filter type for " + path + ": " + deleteFamilyBloomFilterWriter.getClass().getSimpleName()); } + this.plugins = StoreFilePluginFactory.getPlugins(conf); + verifyMaxCellTTLPlugin(conf); } + private void verifyMaxCellTTLPlugin(Configuration conf) { + for(Plugin p: plugins){ + if(p instanceof StoreFile.MaxCellTTLPlugin){ + return; + } + } + StoreFile.MaxCellTTLPlugin p = new StoreFile.MaxCellTTLPlugin(); + p.config(conf); + plugins.add(p); + } + + /** * Writes meta data. * Call before {@link #close()} since its written as meta data to this file. 
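The Writer changes above load the configured plugins and always ensure a MaxCellTTLPlugin is present (verifyMaxCellTTLPlugin). Each plugin supplies a MetaWriter whose add(Cell) is invoked for every appended cell and whose appendMetadata(HFile.Writer) is invoked once when the file metadata is written. A hedged sketch of a custom plugin following that contract; the class name and file-info key are illustrative only.

// Sketch of the plug-in contract wired into StoreFile.Writer above: add(Cell) is
// called per appended cell, appendMetadata(HFile.Writer) once before close.
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;

public class CellCountPlugin extends StoreFile.Plugin {
  private StoreFile.MetaWriter metaWriter;

  @Override
  public StoreFile.MetaWriter getMetaWriter() {
    if (metaWriter == null) {
      metaWriter = new StoreFile.MetaWriter() {
        private long cells;
        @Override
        public void add(Cell cell) {
          cells++; // track whatever per-cell statistic the application needs
        }
        @Override
        public void appendMetadata(HFile.Writer writer) throws IOException {
          writer.appendFileInfo(Bytes.toBytes("CELL_COUNT"), Bytes.toBytes(cells));
        }
      };
    }
    return metaWriter;
  }
}

The built-in MaxCellTTLMetaWriter defined later in this patch follows the same pattern to record MAX_CELL_TTL.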
@@ -835,6 +884,21 @@ public class StoreFile { writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); appendTrackedTimestampsToMetadata(); + notifyPluginsOnAppendMetadata(writer); + } + + private void notifyPluginsOnAppendMetadata( + org.apache.hadoop.hbase.io.hfile.HFile.Writer w) + throws IOException + { + if(plugins.size() > 0){ + int size = plugins.size(); + for(int i=0; i < size; i++){ + Plugin plugin = plugins.get(i); + StoreFile.MetaWriter metaWriter = plugin.getMetaWriter(); + metaWriter.appendMetadata(w); + } + } } /** @@ -851,6 +915,7 @@ public class StoreFile { writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); appendTrackedTimestampsToMetadata(); + //TODO append plug in meta - it does not work for MOBs for some reason } /** @@ -1001,6 +1066,19 @@ public class StoreFile { appendDeleteFamilyBloomFilter(cell); writer.append(cell); trackTimestamps(cell); + notifyPluginsOnAppend(cell); + } + + private void notifyPluginsOnAppend(Cell cell) + { + if(plugins.size() > 0){ + int size = plugins.size(); + for(int i=0; i < size; i++){ + Plugin plugin = plugins.get(i); + StoreFile.MetaWriter metaWriter = plugin.getMetaWriter(); + metaWriter.add(cell); + } + } } public Path getPath() { @@ -1697,4 +1775,103 @@ public class StoreFile { } } } + + /** + * StoreFile plug-in supports custom code in Writer/Reader(in a future) + * path. The main goal is to provide HBase applications with a hook allowing + * to store/retrieve custom meta info associated with a store file. + * + */ + public static abstract class Plugin { + protected Configuration conf; + public Plugin(){ + + } + public void config(Configuration conf){ + this.conf = conf; + } + + public abstract StoreFile.MetaWriter getMetaWriter(); + } + /** + * Meta writer plug-in code here. 
+   */
+  public static abstract class MetaWriter {
+
+    protected Configuration conf;
+
+    public MetaWriter() {
+    }
+    /**
+     * Configures the meta writer.
+     * @param conf configuration
+     */
+    public void config(Configuration conf) {
+      this.conf = conf;
+    }
+    /**
+     * Called by StoreFile.Writer for every cell appended via add(Cell cell).
+     * @param cell the appended cell
+     */
+    public abstract void add(Cell cell);
+
+    /**
+     * Appends the collected metadata to the store file.
+     * @param writer the HFile writer
+     */
+    public abstract void appendMetadata(HFile.Writer writer)
+      throws IOException;
+
+  }
+  /**
+   * Maximum Cell TTL tracker plug-in.
+   */
+  public static class MaxCellTTLPlugin extends StoreFile.Plugin {
+    private MetaWriter writer;
+    @Override
+    public MetaWriter getMetaWriter() {
+      if (writer == null) {
+        writer = new MaxCellTTLMetaWriter(conf);
+      }
+      return writer;
+    }
+  }
+
+  public static class MaxCellTTLMetaWriter extends StoreFile.MetaWriter
+  {
+
+    private long maxCellTtl;
+    public MaxCellTTLMetaWriter(Configuration conf) {
+      maxCellTtl = conf.getInt(HColumnDescriptor.TTL, HColumnDescriptor.DEFAULT_TTL);
+    }
+
+    @Override
+    public void add(Cell cell) {
+      int len = cell.getTagsLength();
+      if (len <= 0) {
+        // No tags
+        return;
+      }
+      byte[] arr = cell.getTagsArray();
+      int off = cell.getTagsOffset();
+      // TODO: this could be optimized
+      // TTL is last in the list of tags
+      Tag tag = Tag.getTag(arr, off, len, TagType.TTL_TAG_TYPE);
+      if (tag != null) {
+        long ttl = Bytes.toLong(tag.getValue());
+        if (ttl > maxCellTtl) {
+          maxCellTtl = ttl;
+        }
+      }
+    }
+
+    @Override
+    public void appendMetadata(HFile.Writer writer) throws IOException {
+      // We append only if it is not the default (-1)
+      if (maxCellTtl > -1) {
+        writer.appendFileInfo(StoreFile.MAX_CELL_TTL, Bytes.toBytes(maxCellTtl));
+      }
+    }
+
+  }
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePluginFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePluginFactory.java
new file mode 100644
index 0000000..665c985
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePluginFactory.java
@@ -0,0 +1,54 @@
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/ +package org.apache.hadoop.hbase.regionserver; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +public final class StoreFilePluginFactory { + private static final Log LOG = LogFactory.getLog(StoreFilePluginFactory.class); + + private StoreFilePluginFactory(){ + //Utility classes should not have a public or default constructor + } + + static List getPlugins(Configuration conf){ + ArrayList plugins = new ArrayList(); + String classesStr = conf.get(StoreFile.HBASE_HFILE_PLUGINS_KEY); + if(classesStr != null){ + String[] classNameList = classesStr.split(","); + for(String className : classNameList){ + className = className.trim(); + try { + Class cls = Class.forName(className); + StoreFile.Plugin plugin = (StoreFile.Plugin) cls.newInstance(); + plugin.config(conf); + plugins.add(plugin); + } catch (Exception e) { + LOG.error("Could not instantiate plugin: "+className, e); + } + } + } + return plugins; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index f26f4fe..6228a1d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.util.Bytes; /** * Compact passed set of files. 
Create an instance and then call @@ -59,24 +61,31 @@ public class DefaultCompactor extends Compactor { long smallestReadPoint = getSmallestReadPoint(); List scanners; + List newFiles = new ArrayList(); Collection readersToClose; + Collection toCompact = filterExpiredStores(request.getFiles()); + + if(toCompact.size() == 0) { + // Nothing to compact - no new files, old files have expired + return newFiles; + } + if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, // HFileFiles, and their readers - readersToClose = new ArrayList(request.getFiles().size()); - for (StoreFile f : request.getFiles()) { + readersToClose = new ArrayList(toCompact.size()); + for (StoreFile f : toCompact) { readersToClose.add(new StoreFile(f)); } scanners = createFileScanners(readersToClose, smallestReadPoint, - store.throttleCompaction(request.getSize())); + store.throttleCompaction(getSize(toCompact))); } else { readersToClose = Collections.emptyList(); - scanners = createFileScanners(request.getFiles(), smallestReadPoint, - store.throttleCompaction(request.getSize())); + scanners = createFileScanners(toCompact, smallestReadPoint, + store.throttleCompaction(getSize(toCompact))); } StoreFile.Writer writer = null; - List newFiles = new ArrayList(); boolean cleanSeqId = false; IOException e = null; try { @@ -150,6 +159,42 @@ public class DefaultCompactor extends Compactor { return newFiles; } + private long getSize(Collection toCompact) { + long size = 0; + for(StoreFile sf: toCompact){ + size += sf.getReader().length(); + } + return size; + } + + private Collection filterExpiredStores(Collection files) { + ArrayList filtered = new ArrayList(); + long currentTime = System.currentTimeMillis(); + for(StoreFile sf: files){ + // Check MIN_VERSIONS + int minVersions = getMinVersions(sf); + Long maxTs = sf.getMaximumTimestamp(); + long maxCellTtl = sf.getMaxCellTimeToLive(); + + if(minVersions > 0 || maxTs == null + || maxCellTtl == HColumnDescriptor.DEFAULT_TTL + || (currentTime - maxCellTtl < maxTs)){ + filtered.add(sf); + continue; + } else{ + LOG.info("Purge expired store: maxTs="+maxTs + + " maxCellTtl="+ maxCellTtl+" current="+ currentTime+ + " diff="+(currentTime - maxCellTtl - maxTs)); + } + } + return filtered; + } + + private int getMinVersions(StoreFile sf) { + Configuration conf = sf.getConf(); + return conf.getInt(HColumnDescriptor.MIN_VERSIONS, HColumnDescriptor.DEFAULT_MIN_VERSIONS); + } + /** * Creates a writer for a new file in a temporary directory. * @param fd The file details. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java new file mode 100644 index 0000000..adf426b --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java @@ -0,0 +1,115 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * FIFO compaction policy selects only those store files in which every cell has
+ * already expired, so whole files can be dropped without rewriting any data.
+ * The column family MUST have a non-default (finite) TTL. A typical use case is
+ * storing raw data that is post-processed shortly after ingest and can then be
+ * discarded: for example, raw time-series are written to a column family with
+ * FIFO compaction, a periodic job builds roll-up aggregates and compacted
+ * time-series from them, and the original raw data is simply allowed to expire.
+ */
+public class FIFOCompactionPolicy extends RatioBasedCompactionPolicy {
+
+  private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);
+
+  public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
+    super(conf, storeConfigInfo);
+  }
+
+  @Override
+  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
+      List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
+      boolean forceMajor) throws IOException {
+
+    if (forceMajor) {
+      LOG.warn("Major compaction is not supported for FIFO compaction policy.
Ignore the flag."); + } + ArrayList candidateSelection = new ArrayList(candidateFiles); + Collection eligible = getCurrentEligibleFiles(candidateSelection, filesCompacting); + Collection toCompact = filterNonExpiredStores(eligible); + + CompactionRequest result = new CompactionRequest(toCompact); + return result; + } + + @Override + public boolean isMajorCompaction(Collection filesToCompact) throws IOException { + // No major compaction support + return false; + } + + @Override + public boolean needsCompaction(Collection storeFiles, + List filesCompacting) { + ArrayList candidateSelection = new ArrayList(storeFiles); + Collection eligible = getCurrentEligibleFiles(candidateSelection, filesCompacting); + Collection toCompact = filterNonExpiredStores(eligible); + return toCompact.size() > 0; + } + + private Collection filterNonExpiredStores(Collection files) { + ArrayList filtered = new ArrayList(); + long currentTime = System.currentTimeMillis(); + for(StoreFile sf: files){ + // Check MIN_VERSIONS + int minVersions = getMinVersions(sf); + Long maxTs = sf.getMaximumTimestamp(); + long maxCellTtl = sf.getMaxCellTimeToLive(); + + if(minVersions > 0 || maxTs == null + || maxCellTtl == HColumnDescriptor.DEFAULT_TTL + || (currentTime - maxCellTtl < maxTs)){ + continue; + } else{ + filtered.add(sf); + } + } + return filtered; + } + + private int getMinVersions(StoreFile sf) { + Configuration conf = sf.getConf(); + return conf.getInt(HColumnDescriptor.MIN_VERSIONS, HColumnDescriptor.DEFAULT_MIN_VERSIONS); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index 5aeff5c..2709f9d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -53,7 +53,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { super(conf, storeConfigInfo); } - private ArrayList getCurrentEligibleFiles( + protected ArrayList getCurrentEligibleFiles( ArrayList candidateFiles, final List filesCompacting) { // candidates = all storefiles not already in compaction queue if (!filesCompacting.isEmpty()) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java index fbd81c9..6687e95 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java @@ -88,6 +88,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mortbay.log.Log; @Category(LargeTests.class) public class TestMobCompactor { @@ -569,10 +570,15 @@ public class TestMobCompactor { countFiles(tableName, false, family2)); int largeFilesCount = countLargeFiles(5000, family1); + System.out.println("largeFilesCount="+largeFilesCount+" regionNum="+regionNum); // do the mob compaction + Log.info("\n\n\n\n\n\n Mob compaction starts\n\n\n\n"); + countFiles(tableName, true, family1); admin.compact(tableName, hcd1.getName(), Admin.CompactType.MOB); waitUntilMobCompactionFinished(tableName); + Log.info("\n\n\n\n\n\n Mob compaction finished\n\n\n\n"); + 
assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum), countMobRows(hTable)); assertEquals("After compaction: mob cells count", regionNum @@ -794,6 +800,7 @@ public class TestMobCompactor { if (fs.exists(mobDirPath)) { FileStatus[] files = fs.listStatus(mobDirPath); for (FileStatus file : files) { + System.out.println(isMobFile+":"+file.getPath()); if (isMobFile == true) { if (!StoreFileInfo.isDelFile(file.getPath())) { count++; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileMaxCellTTL.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileMaxCellTTL.java new file mode 100644 index 0000000..6c0f567 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileMaxCellTTL.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertTrue; + +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Class that test tags + */ +@Category({RegionServerTests.class, SmallTests.class}) +public class TestStoreFileMaxCellTTL { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static MiniHBaseCluster cluster; + @Rule + public final TestName TEST_NAME = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt("hfile.format.version", 3); + + cluster = TEST_UTIL.startMiniCluster(1, 1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + + @Test + public void testLoadFlushWithMaxTTL() throws Exception { + Table table = null; + final int cfTtl = 5000; + final long ttl1 = 10000; + final long ttl2 = 20000; + + try { + TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); + byte[] fam = 
Bytes.toBytes("info"); + // column names + byte[] qual = Bytes.toBytes("qual"); + + byte[] row1 = Bytes.toBytes("rowb"); + + byte[] row2 = Bytes.toBytes("rowc"); + + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor colDesc = new HColumnDescriptor(fam); + colDesc.setTimeToLive(cfTtl); + desc.addFamily(colDesc); + Admin admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(desc); + byte[] value = Bytes.toBytes("value"); + table = TEST_UTIL.getConnection().getTable(tableName); + Put put = new Put(row1); + put.addColumn(fam, qual, value); + put.setTTL(ttl1); + table.put(put); + + put = new Put(row2); + put.addColumn(fam, qual, value); + put.setTTL(ttl2); + table.put(put); + + admin.flush(tableName); + + List regions = cluster.getRegions(tableName); + assertTrue(regions.size() == 1); + HRegion region = regions.get(0); + List stores = region.getStores(); + assertTrue(stores.size() == 1); + Store store = stores.get(0); + Collection files = store.getStorefiles(); + assertTrue(files.size() == 1); + StoreFile file = files.iterator().next(); + long time = file.getMaxCellTimeToLive(); + assertTrue(time == Math.max(ttl1, ttl2)); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testLoadFlushNoMaxTTLWithColTTL() throws Exception { + Table table = null; + final int cfTtl = 5000; + + + try { + TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); + byte[] fam = Bytes.toBytes("info"); + // column names + byte[] qual = Bytes.toBytes("qual"); + + byte[] row1 = Bytes.toBytes("rowb"); + + byte[] row2 = Bytes.toBytes("rowc"); + + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor colDesc = new HColumnDescriptor(fam); + colDesc.setTimeToLive(cfTtl); + desc.addFamily(colDesc); + Admin admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(desc); + byte[] value = Bytes.toBytes("value"); + table = TEST_UTIL.getConnection().getTable(tableName); + Put put = new Put(row1); + put.addColumn(fam, qual, value); + table.put(put); + + put = new Put(row2); + put.addColumn(fam, qual, value); + table.put(put); + + admin.flush(tableName); + + List regions = cluster.getRegions(tableName); + assertTrue(regions.size() == 1); + HRegion region = regions.get(0); + List stores = region.getStores(); + assertTrue(stores.size() == 1); + Store store = stores.get(0); + Collection files = store.getStorefiles(); + assertTrue(files.size() == 1); + StoreFile file = files.iterator().next(); + long time = file.getMaxCellTimeToLive(); + assertTrue(time == cfTtl); + } finally { + if (table != null) { + table.close(); + } + } + } + @Test + public void testLoadFlushNoMaxTTLNoColTTL() throws Exception { + Table table = null; + + try { + TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); + byte[] fam = Bytes.toBytes("info"); + // column names + byte[] qual = Bytes.toBytes("qual"); + + byte[] row1 = Bytes.toBytes("rowb"); + + byte[] row2 = Bytes.toBytes("rowc"); + + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor colDesc = new HColumnDescriptor(fam); + desc.addFamily(colDesc); + Admin admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(desc); + byte[] value = Bytes.toBytes("value"); + table = TEST_UTIL.getConnection().getTable(tableName); + Put put = new Put(row1); + put.addColumn(fam, qual, value); + table.put(put); + + put = new Put(row2); + put.addColumn(fam, qual, value); + table.put(put); + + admin.flush(tableName); + + List regions = cluster.getRegions(tableName); + assertTrue(regions.size() 
== 1); + HRegion region = regions.get(0); + List stores = region.getStores(); + assertTrue(stores.size() == 1); + Store store = stores.get(0); + Collection files = store.getStorefiles(); + assertTrue(files.size() == 1); + StoreFile file = files.iterator().next(); + long time = file.getMaxCellTimeToLive(); + assertTrue( time == HColumnDescriptor.DEFAULT_TTL); + + } finally { + if (table != null) { + table.close(); + } + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreMetaDataPlugins.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreMetaDataPlugins.java new file mode 100644 index 0000000..b6d6051 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreMetaDataPlugins.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.hfile.HFile.Writer; +import org.apache.hadoop.hbase.regionserver.StoreFile.MetaWriter; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Class that test StoreFile meta plug in + */ +@Category({RegionServerTests.class, SmallTests.class}) +public class TestStoreMetaDataPlugins { + + public static final byte[] KEY = Bytes.toBytes("KEY"); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static MiniHBaseCluster cluster; + @Rule + public final TestName TEST_NAME = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + cluster = TEST_UTIL.startMiniCluster(1, 1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + + @Test + public void testLoadFlushWithMaxTTL() throws Exception { + Table table = null; + try { + TableName tableName = 
TableName.valueOf(TEST_NAME.getMethodName()); + byte[] fam = Bytes.toBytes("info"); + // column names + byte[] qual = Bytes.toBytes("qual"); + byte[] row1 = Bytes.toBytes("rowb"); + byte[] row2 = Bytes.toBytes("rowc"); + + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor colDesc = new HColumnDescriptor(fam); + colDesc.setConfiguration(StoreFile.HBASE_HFILE_PLUGINS_KEY, + TestPlugin.class.getName()); + desc.addFamily(colDesc); + Admin admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(desc); + byte[] value = Bytes.toBytes("value"); + table = TEST_UTIL.getConnection().getTable(tableName); + Put put = new Put(row1); + put.addColumn(fam, qual, value); + table.put(put); + + put = new Put(row2); + put.addColumn(fam, qual, value); + table.put(put); + + admin.flush(tableName); + + List regions = cluster.getRegions(tableName); + assertTrue(regions.size() == 1); + HRegion region = regions.get(0); + List stores = region.getStores(); + assertTrue(stores.size() == 1); + Store store = stores.get(0); + Collection files = store.getStorefiles(); + assertTrue(files.size() == 1); + StoreFile file = files.iterator().next(); + int numCells = Bytes.toInt(file.getMetadataValue(KEY)); + assertEquals(numCells, 2); + } finally { + if (table != null) { + table.close(); + } + } + } + +} + +class TestPlugin extends StoreFile.Plugin +{ + MetaWriter metaWriter; + @Override + public MetaWriter getMetaWriter() { + if(metaWriter == null){ + metaWriter = new TestMetaWriter(); + } + return metaWriter; + } + +} + +class TestMetaWriter extends StoreFile.MetaWriter +{ + + int cellCount = 0; + public TestMetaWriter(){} + @Override + public void add(Cell cell) { + cellCount++; + } + + @Override + public void appendMetadata(Writer writer) + throws IOException + { + writer.appendFileInfo(TestStoreMetaDataPlugins.KEY, Bytes.toBytes(cellCount)); + } + +} \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java new file mode 100644 index 0000000..233bfc6 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestFIFOCompactionPolicy { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + + private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); + + private final byte[] family = Bytes.toBytes("f"); + + private final byte[] qualifier = Bytes.toBytes("q"); + + private Store getStoreWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (Region region : hrs.getOnlineRegions(tableName)) { + return region.getStores().iterator().next(); + } + } + return null; + } + + private Store prepareData() throws IOException { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, + FIFOCompactionPolicy.class.getName()); + HColumnDescriptor colDesc = new HColumnDescriptor(family); + colDesc.setTimeToLive(1000); + desc.addFamily(colDesc); + + admin.createTable(desc); + Table table = TEST_UTIL.getConnection().getTable(tableName); + Random rand = new Random(); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[128 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); + } + admin.flush(tableName); + if(i < 9){ + try { + Thread.sleep(1001); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + return getStoreWithName(tableName); + } + + @Test + public void testPurgeExpiredFiles() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + + TEST_UTIL.startMiniCluster(1); + try { + Store store = prepareData(); + assertEquals(10, store.getStorefilesCount()); + TEST_UTIL.getHBaseAdmin().majorCompact(tableName); + while (store.getStorefilesCount() > 1) { + Thread.sleep(20); + 
} + Thread.sleep(1001); + TEST_UTIL.getHBaseAdmin().majorCompact(tableName); + while (store.getStorefilesCount() > 0) { + Thread.sleep(20); + } + assertTrue(true); + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPurgeExpiredStores.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPurgeExpiredStores.java new file mode 100644 index 0000000..2f84c4e --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPurgeExpiredStores.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestPurgeExpiredStores { + + private static final Log LOG = LogFactory.getLog(TestPurgeExpiredStores.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + + private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); + + private final byte[] family = Bytes.toBytes("f"); + + private final byte[] qualifier = Bytes.toBytes("q"); + + private Store getStoreWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (Region region : hrs.getOnlineRegions(tableName)) { + return 
region.getStores().iterator().next(); + } + } + return null; + } + + private Store prepareData() throws IOException { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor colDesc = new HColumnDescriptor(family); + colDesc.setTimeToLive(3000); + desc.addFamily(colDesc); + + admin.createTable(desc); + Table table = TEST_UTIL.getConnection().getTable(tableName); + Random rand = new Random(); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[128 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); + } + admin.flush(tableName); + } + return getStoreWithName(tableName); + } + + @Test + public void testPurgeExpiredFiles() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + + TEST_UTIL.startMiniCluster(1); + try { + Store store = prepareData(); + Thread.sleep(3010); + assertEquals(10, store.getStorefilesCount()); + TEST_UTIL.getHBaseAdmin().majorCompact(tableName); + while (store.getStorefilesCount() > 0) { + Thread.sleep(20); + } + assertTrue(true); + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + +}
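For completeness, a short sketch of how a table would opt into the new policy in practice, mirroring the setup in TestFIFOCompactionPolicy above; the table and family names are illustrative.

// Sketch mirroring TestFIFOCompactionPolicy: enable FIFO compaction for a table by
// overriding the store engine's compaction policy class on the table descriptor and
// giving the family a finite TTL (FIFO compaction requires a non-default TTL).
import java.io.IOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;

public class FifoTableSetup {
  public static void createRawEventsTable(Admin admin) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("raw_events"));
    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
        FIFOCompactionPolicy.class.getName());
    HColumnDescriptor colDesc = new HColumnDescriptor("f");
    colDesc.setTimeToLive(24 * 3600); // seconds
    desc.addFamily(colDesc);
    admin.createTable(desc);
  }
}

Note that the policy never rewrites data: as implemented in filterNonExpiredStores and DefaultCompactor.filterExpiredStores above, a store file is only dropped once every cell in it has expired according to MAX_CELL_TTL and the file's maximum timestamp.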