diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 2b9d101..5991a8e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -25,12 +25,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,11 +38,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -61,7 +62,6 @@ import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -86,6 +86,7 @@ import com.google.common.collect.Ordering;
 public class StoreFile {
   private static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
 
+  static final String HBASE_HFILE_PLUGINS_KEY = "hbase.hfile.plugins";
   // Keys for fileinfo values in HFile
 
   /** Max Sequence ID in FileInfo */
@@ -200,6 +201,10 @@ public class StoreFile {
   public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
 
   /**
+   * Configuration
+   */
+  private final Configuration conf;
+  /**
    * Constructor, loads a reader and it's indices, etc. May allocate a
    * substantial amount of ram depending on the underlying files (10-20MB?).
    *
@@ -240,6 +245,7 @@ public class StoreFile {
     this.fs = fs;
     this.fileInfo = fileInfo;
     this.cacheConf = cacheConf;
+    this.conf = conf;
 
     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
       this.cfBloomType = cfBloomType;
@@ -259,6 +265,8 @@
     this.fileInfo = other.fileInfo;
     this.cacheConf = other.cacheConf;
     this.cfBloomType = other.cfBloomType;
+    this.conf = other.conf;
+
   }
 
   /**
@@ -766,7 +774,6 @@
    * @param comparator Comparator used to compare KVs.
    * @return The split point row, or null if splitting is not possible, or reader is null.
    */
-  @SuppressWarnings("deprecation")
   byte[] getFileSplitPoint(CellComparator comparator) throws IOException {
     if (this.reader == null) {
       LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
@@ -822,6 +829,8 @@
     protected HFile.Writer writer;
     private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
 
+    private List<Plugin> plugins;
+
     /**
      * Creates an HFile.Writer that also write helpful meta data.
     * @param fs file system to write to
@@ -877,6 +886,7 @@
        if (LOG.isTraceEnabled()) LOG.trace("Delete Family Bloom filter type for " + path + ": "
          + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
      }
+      this.plugins = StoreFilePluginFactory.getPlugins(conf);
    }
 
    /**
@@ -892,6 +902,21 @@
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      appendTrackedTimestampsToMetadata();
+      notifyPluginsOnAppendMetadata(writer);
+    }
+
+    private void notifyPluginsOnAppendMetadata(
+        org.apache.hadoop.hbase.io.hfile.HFile.Writer w)
+        throws IOException
+    {
+      if (plugins.size() > 0) {
+        int size = plugins.size();
+        for (int i = 0; i < size; i++) {
+          Plugin plugin = plugins.get(i);
+          StoreFile.MetaWriter metaWriter = plugin.getMetaWriter();
+          metaWriter.appendMetadata(w);
+        }
+      }
    }
 
    /**
@@ -908,6 +933,7 @@
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
      appendTrackedTimestampsToMetadata();
+      notifyPluginsOnAppendMetadata(writer);
    }
 
    /**
@@ -1058,6 +1084,23 @@
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
+      notifyPluginsOnAppend(cell);
+    }
+
+    /**
+     * Notifies the writer's plug-ins that a Cell has been appended.
+     * @param cell the cell that was appended
+     */
+    private void notifyPluginsOnAppend(Cell cell)
+    {
+      if (plugins.size() > 0) {
+        int size = plugins.size();
+        for (int i = 0; i < size; i++) {
+          Plugin plugin = plugins.get(i);
+          StoreFile.MetaWriter metaWriter = plugin.getMetaWriter();
+          metaWriter.add(cell);
+        }
+      }
    }
 
    public Path getPath() {
@@ -1794,4 +1837,53 @@
      }
    }
  }
+
+  /**
+   * A StoreFile plug-in supports custom code on the Writer (and, in the future,
+   * Reader) path. The main goal is to provide HBase applications with a hook
+   * for storing and retrieving custom meta information associated with a store file.
+   *
+   */
+  @InterfaceAudience.Public
+  public static abstract class Plugin {
+    protected Configuration conf;
+
+    public Plugin(Configuration conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Gets the meta writer plug-in implementation.
+     * Subclasses must override this method.
+     * @return the meta writer plug-in
+     */
+    public abstract StoreFile.MetaWriter getMetaWriter();
+  }
+  /**
+   * Base class for meta writer plug-in implementations.
+   */
+
+  @InterfaceAudience.Public
+  public static abstract class MetaWriter {
+
+    protected Configuration conf;
+
+    public MetaWriter(Configuration conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Called by StoreFile.Writer for every appended Cell.
+     * @param cell the cell that was appended
+     */
+    public abstract void add(Cell cell);
+
+    /**
+     * Appends custom metadata to the file info of the store file being written.
+     * @param writer the HFile writer the metadata is appended to
+     */
+    public abstract void appendMetadata(HFile.Writer writer)
+      throws IOException;
+
+  }
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePluginFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePluginFactory.java
new file mode 100644
index 0000000..24b854f
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePluginFactory.java
@@ -0,0 +1,55 @@
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+@InterfaceAudience.Private
+public final class StoreFilePluginFactory {
+  private static final Log LOG = LogFactory.getLog(StoreFilePluginFactory.class);
+
+  private StoreFilePluginFactory() {
+    // Utility classes should not have a public or default constructor
+  }
+
+  static List<StoreFile.Plugin> getPlugins(Configuration conf) {
+    ArrayList<StoreFile.Plugin> plugins = new ArrayList<StoreFile.Plugin>();
+    String[] classNameList = conf.getStrings(StoreFile.HBASE_HFILE_PLUGINS_KEY, new String[] {});
+    for (String className : classNameList) {
+      className = className.trim();
+      try {
+        Class<?>[] ctorArgTypes = new Class<?>[] { Configuration.class };
+        Object[] ctorArgs = new Object[] { conf };
+        StoreFile.Plugin plugin =
+          ReflectionUtils.instantiateWithCustomCtor(className, ctorArgTypes, ctorArgs);
+        plugins.add(plugin);
+      } catch (Exception e) {
+        LOG.error("Could not instantiate plugin: " + className, e);
+      }
+    }
+    return plugins;
+  }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
index 9922aff..2f79b0c 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
@@ -26,6 +26,7 @@ import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -78,6 +79,8 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.SortedList;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -153,10 +156,6 @@ public class TestMobCompactor {
   @Test(timeout = 300000)
   public void testMinorCompaction() throws Exception {
     resetConf();
-    int mergeSize = 5000;
-    // change the mob compaction merge size
-    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
-
     // create a table with namespace
     NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("ns").build();
     String tableNameAsString = "ns:testMinorCompaction";
@@ -174,7 +173,10 @@ public class TestMobCompactor {
     assertEquals("Before deleting: mob file count", regionNum * count,
       countFiles(tableName, true, family1));
 
-    int largeFilesCount = countLargeFiles(mergeSize, tableName, family1);
+    Pair<Integer, Long> largerFiles = countLargeFiles(tableName, family1);
+    int largeFilesCount = largerFiles.getFirst();
+    // change the mob compaction merge size
+    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, largerFiles.getSecond());
     createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1));
 
     assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
@@ -200,7 +202,8 @@ public class TestMobCompactor {
       * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
     // After the compaction, the files smaller than the mob compaction merge size
     // is merge to one file
-    assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
+    assertEquals("After compaction: family1 mob file count",
+      (largeFilesCount == regionNum * count) ? largeFilesCount : largeFilesCount + regionNum,
       countFiles(tableName, true, family1));
     assertEquals("After compaction: family2 mob file count", regionNum * count,
       countFiles(tableName, true, family2));
@@ -591,7 +594,7 @@ public class TestMobCompactor {
    * @param familyName the family name
    * @return the number of files large than the size
    */
-  private int countLargeFiles(int size, TableName tableName, String familyName) throws IOException {
+/* private int countLargeFiles(int size, TableName tableName, String familyName) throws IOException {
     Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
     int count = 0;
     if (fs.exists(mobDirPath)) {
@@ -605,6 +608,37 @@ public class TestMobCompactor {
     }
     return count;
   }
+*/
+  private Pair<Integer, Long> countLargeFiles(TableName tableName, String familyName)
+      throws IOException {
+    Path mobDirPath = MobUtils.getMobFamilyPath(MobUtils.getMobRegionPath(conf, tableName),
+      familyName);
+    SortedList<Long> fileLength = new SortedList<Long>(new Comparator<Long>() {
+      @Override
+      public int compare(Long o1, Long o2) {
+        return o1.compareTo(o2);
+      }
+    });
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = fs.listStatus(mobDirPath);
+      for (FileStatus file : files) {
+        // ignore the del files in the mob path
+        if (!StoreFileInfo.isDelFile(file.getPath())) {
+          fileLength.add(file.getLen());
+        }
+      }
+    }
+    int index = fileLength.size() / 2;
+    if (index > 0) {
+      while (index < fileLength.size()
+          && fileLength.get(index - 1).longValue() == fileLength.get(index).longValue()) {
+        index++;
+      }
+    }
+    return index == fileLength.size()
+        ? new Pair<Integer, Long>(fileLength.size(), fileLength.get(0))
+        : new Pair<Integer, Long>(fileLength.size() - index, fileLength.get(index));
+  }
 
   /**
    * loads some data to the table.
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreMetaDataPlugins.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreMetaDataPlugins.java
new file mode 100644
index 0000000..4e127fb
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreMetaDataPlugins.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreFile.MetaWriter;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Tests the StoreFile metadata plug-in hook.
+ */
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestStoreMetaDataPlugins {
+
+  public static final byte[] KEY = Bytes.toBytes("KEY");
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static MiniHBaseCluster cluster;
+  @Rule
+  public final TestName TEST_NAME = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    cluster = TEST_UTIL.startMiniCluster(1, 1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testStoreFileMetadataPlugin() throws Exception {
+    Table table = null;
+    try {
+      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+      byte[] fam = Bytes.toBytes("info");
+      // column names
+      byte[] qual = Bytes.toBytes("qual");
+      byte[] row1 = Bytes.toBytes("rowb");
+      byte[] row2 = Bytes.toBytes("rowc");
+
+      HTableDescriptor desc = new HTableDescriptor(tableName);
+      HColumnDescriptor colDesc = new HColumnDescriptor(fam);
+      colDesc.setConfiguration(StoreFile.HBASE_HFILE_PLUGINS_KEY, TestPlugin.class.getName());
+      desc.addFamily(colDesc);
+      Admin admin = TEST_UTIL.getHBaseAdmin();
+      admin.createTable(desc);
+      byte[] value = Bytes.toBytes("value");
+      table = TEST_UTIL.getConnection().getTable(tableName);
+      Put put = new Put(row1);
+      put.addColumn(fam, qual, value);
+      table.put(put);
+
+      put = new Put(row2);
+      put.addColumn(fam, qual, value);
+      table.put(put);
+
+      admin.flush(tableName);
+
+      List<HRegion> regions = cluster.getRegions(tableName);
+      assertTrue(regions.size() == 1);
+      HRegion region = regions.get(0);
+      List<Store> stores = region.getStores();
+      assertTrue(stores.size() == 1);
+      Store store = stores.get(0);
+      Collection<StoreFile> files = store.getStorefiles();
+      assertTrue(files.size() == 1);
+      StoreFile file = files.iterator().next();
+      int numCells = Bytes.toInt(file.getMetadataValue(KEY));
+      assertEquals(2, numCells);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  public static class TestPlugin extends StoreFile.Plugin {
+    MetaWriter metaWriter;
+
+    public TestPlugin(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public MetaWriter getMetaWriter() {
+      if (metaWriter == null) {
+        metaWriter = new TestMetaWriter(conf);
+      }
+      return metaWriter;
+    }
+
+  }
+
+  public static class TestMetaWriter extends StoreFile.MetaWriter {
+
+    int cellCount = 0;
+
+    public TestMetaWriter(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public void add(Cell cell) {
+      cellCount++;
+    }
+
+    @Override
+    public void appendMetadata(Writer writer) throws IOException {
+      writer.appendFileInfo(TestStoreMetaDataPlugins.KEY, Bytes.toBytes(cellCount));
+    }
+
+  }
+}
\ No newline at end of file
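
Usage sketch (not part of the patch). The class below is hypothetical; it only assumes the Plugin/MetaWriter API and the "hbase.hfile.plugins" key added above. A plug-in is registered by listing its fully qualified class name under that key, either globally in hbase-site.xml or per column family via HColumnDescriptor.setConfiguration(), as TestStoreMetaDataPlugins does.

// Hypothetical example plug-in; not included in the patch.
package org.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Records the number of cells written to each store file under a custom
 * file-info key. Register with:
 *   conf.set("hbase.hfile.plugins", CellCountPlugin.class.getName());
 */
public class CellCountPlugin extends StoreFile.Plugin {

  private StoreFile.MetaWriter metaWriter;

  public CellCountPlugin(Configuration conf) {
    super(conf);
  }

  @Override
  public StoreFile.MetaWriter getMetaWriter() {
    // StoreFile.Writer calls getMetaWriter() for every appended cell,
    // so the same MetaWriter instance must be cached and returned.
    if (metaWriter == null) {
      metaWriter = new StoreFile.MetaWriter(conf) {
        private long cellCount = 0;

        @Override
        public void add(Cell cell) {
          cellCount++; // invoked once per cell written to the store file
        }

        @Override
        public void appendMetadata(HFile.Writer writer) throws IOException {
          // persisted into the HFile file-info block when metadata is appended
          writer.appendFileInfo(Bytes.toBytes("CELL_COUNT"), Bytes.toBytes(cellCount));
        }
      };
    }
    return metaWriter;
  }
}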