diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 872bbc5..647defd 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1483,4 +1483,38 @@ possible configurations would overwhelm and obscure the important. The default value is 0.5f. </description> </property> + <property> + <name>hbase.mob.sweep.tool.compaction.ratio</name> + <value>0.5f</value> + <description> + If too many cells are deleted in a mob file, it is regarded + as an invalid file and needs to be merged. + If existingCellsSize/mobFileSize is less than this ratio, the file is regarded + as invalid. The default value is 0.5f. + </description> + </property> + <property> + <name>hbase.mob.sweep.tool.compaction.mergeable.size</name> + <value>134217728</value> + <description> + If the size of a mob file is less than this value, it is regarded as a small + file and needs to be merged. The default value is 128MB. + </description> + </property> + <property> + <name>hbase.mob.sweep.tool.compaction.memstore.flush.size</name> + <value>134217728</value> + <description> + The flush size for the memstore used by the sweep job. Each sweep reducer owns such a memstore. + The default value is 128MB. + </description> + </property> + <property> + <name>hbase.master.mob.ttl.cleaner.period</name> + <value>86400000</value> + <description> + The period at which the ExpiredMobFileCleanerChore runs, in milliseconds. + The default value is one day. + </description> + </property> diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java new file mode 100644 index 0000000..98fe236 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master; + +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.util.Threads; + +/** + * A chore that runs the cleaner regularly to remove the expired + * mob files.
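+ * <p>A minimal sketch (not part of this patch) of the expiry rule the chore relies on,
+ * assuming a family TTL set in seconds via HColumnDescriptor#setTimeToLive:
+ * <pre>
+ * long ttl = hcd.getTimeToLive();                // seconds; Integer.MAX_VALUE means no TTL
+ * Date expireDate = new Date(now - ttl * 1000);  // see MobUtils.cleanExpiredMobFiles below
+ * // mob files whose date part is older than expireDate are archived
+ * </pre>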
+ */ +@InterfaceAudience.Private +public class ExpiredMobFileCleanerChore extends Chore { + + private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleanerChore.class); + private final HMaster master; + private ExpiredMobFileCleaner cleaner; + + public ExpiredMobFileCleanerChore(HMaster master) { + super(master.getServerName() + "-ExpiredMobFileCleanerChore", master.getConfiguration().getInt( + MobConstants.MOB_CLEANER_PERIOD, MobConstants.DEFAULT_MOB_CLEANER_PERIOD), master); + this.master = master; + cleaner = new ExpiredMobFileCleaner(); + } + + @Override + protected void chore() { + try { + TableDescriptors htds = master.getTableDescriptors(); + Map<String, HTableDescriptor> map = htds.getAll(); + for (HTableDescriptor htd : map.values()) { + for (HColumnDescriptor hcd : htd.getColumnFamilies()) { + if (MobUtils.isMobFamily(hcd) && hcd.getMinVersions() == 0) { + cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd); + } + } + } + } catch (Exception e) { + LOG.error("Failed to clean the expired mob files", e); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 714b5a8..4ff3592 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -208,6 +208,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { CatalogJanitor catalogJanitorChore; private LogCleaner logCleaner; private HFileCleaner hfileCleaner; + private ExpiredMobFileCleanerChore expiredMobFileCleanerChore; MasterCoprocessorHost cpHost; @@ -610,6 +611,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server { // master initialization. See HBASE-5916. this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer(); + this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this); + Threads.setDaemonThreadRunning(expiredMobFileCleanerChore.getThread()); + if (this.cpHost != null) { // don't let cp initialization errors kill the master try { @@ -856,6 +860,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } private void stopChores() { + if (this.expiredMobFileCleanerChore != null) { + this.expiredMobFileCleanerChore.interrupt(); + } if (this.balancerChore != null) { this.balancerChore.interrupt(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java index 5f13502..fd35a15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java @@ -152,7 +152,7 @@ public class DefaultMobCompactor extends DefaultCompactor { // to the store file. writer.append(kv); } else if (MobUtils.isMobReferenceCell(kv)) { - if (MobUtils.isValidMobRefCellValue(kv)) { + if (MobUtils.hasValidMobRefCellValue(kv)) { int size = MobUtils.getMobValueLength(kv); if (size > mobSizeThreshold) { // If the value size is larger than the threshold, it's regarded as a mob.
Since diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java new file mode 100644 index 0000000..d3c11ad --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java @@ -0,0 +1,120 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.google.protobuf.ServiceException; + +/** + * The cleaner to delete the expired MOB files. + */ +@InterfaceAudience.Private +public class ExpiredMobFileCleaner extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleaner.class); + /** + * Cleans the MOB files when they're expired and their min versions are 0. + * If the latest timestamp of Cells in a MOB file is older than the TTL in the column family, + * it's regarded as expired. This cleaner deletes them. + * At a time T0, the cells in a mob file M0 are expired. If a user starts a scan before T0, those + * mob cells are visible, this scan still runs after T0. At that time T1, this mob file M0 + * is expired, meanwhile a cleaner starts, the M0 is archived and can be read in the archive + * directory. + * @param tableName The current table name. + * @param family The current family. + * @throws ServiceException + * @throws IOException + */ + public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) + throws ServiceException, IOException { + Configuration conf = getConf(); + TableName tn = TableName.valueOf(tableName); + FileSystem fs = FileSystem.get(conf); + LOG.info("Cleaning the expired MOB files of " + family.getNameAsString() + " in " + tableName); + // disable the block cache. 
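+ // Each mob file is read at most once by this standalone tool, so a block cache
+ // would only cost heap without any hit benefit; size it to zero before building
+ // the CacheConfig.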
+ Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + CacheConfig cacheConfig = new CacheConfig(copyOfConf); + MobUtils.cleanExpiredMobFiles(fs, conf, tn, family, cacheConfig, + EnvironmentEdgeManager.currentTime()); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + ToolRunner.run(conf, new ExpiredMobFileCleaner(), args); + } + + private void printUsage() { + System.err.println("Usage:\n" + "--------------------------\n" + + ExpiredMobFileCleaner.class.getName() + " tableName familyName"); + System.err.println(" tableName The table name"); + System.err.println(" familyName The column family name"); + } + + public int run(String[] args) throws Exception { + if (args.length != 2) { + printUsage(); + return 1; + } + String tableName = args[0]; + String familyName = args[1]; + TableName tn = TableName.valueOf(tableName); + HBaseAdmin.checkHBaseAvailable(getConf()); + HBaseAdmin admin = new HBaseAdmin(getConf()); + try { + HTableDescriptor htd = admin.getTableDescriptor(tn); + HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName)); + if (family == null || !MobUtils.isMobFamily(family)) { + throw new IOException("Column family " + familyName + " is not a MOB column family"); + } + if (family.getMinVersions() > 0) { + throw new IOException( + "The minVersions of the column family is not 0, could not be handled by this cleaner"); + } + cleanExpiredMobFiles(tableName, family); + return 0; + } finally { + try { + admin.close(); + } catch (IOException e) { + LOG.error("Fail to close the HBaseAdmin.", e); + } + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java index 9978afd..4e3e7c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java @@ -38,6 +38,7 @@ public class MobConstants { public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw"; public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks"; + public static final String MOB_SCAN_REF_ONLY = "hbase.mob.scan.ref.only"; public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size"; public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000; @@ -46,6 +47,26 @@ public class MobConstants { public static final String MOB_REGION_NAME = ".mob"; public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME); + public static final String MOB_CLEANER_PERIOD = "hbase.master.mob.ttl.cleaner.period"; + public static final int DEFAULT_MOB_CLEANER_PERIOD = 24 * 60 * 60 * 1000; // one day + + public static final String MOB_SWEEP_TOOL_COMPACTION_START_DATE = + "hbase.mob.sweep.tool.compaction.start.date"; + public static final String MOB_SWEEP_TOOL_COMPACTION_RATIO = + "hbase.mob.sweep.tool.compaction.ratio"; + public static final String MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE = + "hbase.mob.sweep.tool.compaction.mergeable.size"; + + public static final float DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO = 0.5f; + public static final long DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE = 128 * 1024 * 1024; + + public static final String MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME = "mobcompaction"; + + public static final String MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE = + "hbase.mob.sweep.tool.compaction.memstore.flush.size"; + public 
static final long DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE = + 1024 * 1024 * 128; // 128M + public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period"; public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio"; public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE, @@ -55,6 +76,7 @@ public class MobConstants { public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l; public final static String TEMP_DIR_NAME = ".tmp"; + public final static String EMPTY_STRING = ""; private MobConstants() { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index e52d336..e49d3ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -18,13 +18,22 @@ */ package org.apache.hadoop.hbase.mob; +import java.io.FileNotFoundException; +import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.List; +import java.util.UUID; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -34,7 +43,16 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -44,6 +62,9 @@ import org.apache.hadoop.hbase.util.FSUtils; @InterfaceAudience.Private public class MobUtils { + private static final Log LOG = LogFactory.getLog(MobUtils.class); + private static final String COMPACTION_WORKING_DIR_NAME = "working"; + private static final ThreadLocal LOCAL_FORMAT = new ThreadLocal() { @Override @@ -143,6 +164,22 @@ public class MobUtils { } /** + * Indicates whether it's a reference only scan. + * The information is set in the attribute "hbase.mob.scan.ref.only" of scan. + * If it's a ref only scan, only the cells with ref tag are returned. + * @param scan The current scan. + * @return True if it's a ref only scan. + */ + public static boolean isRefOnlyScan(Scan scan) { + byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY); + try { + return refOnly != null && Bytes.toBoolean(refOnly); + } catch (IllegalArgumentException e) { + return false; + } + } + + /** * Indicates whether the scan contains the information of caching blocks. * The information is set in the attribute "hbase.mob.cache.blocks" of scan. * @param scan The current scan. 
@@ -172,6 +209,91 @@ public class MobUtils { } /** + * Cleans the expired mob files. + * Cleans the files whose creation date is older than (current - columnFamily.ttl), and + * the minVersions of that column family is 0. + * @param fs The current file system. + * @param conf The current configuration. + * @param tableName The current table name. + * @param columnDescriptor The descriptor of the current column family. + * @param cacheConfig The cacheConfig that disables the block cache. + * @param current The current time. + * @throws IOException + */ + public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName, + HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current) + throws IOException { + long timeToLive = columnDescriptor.getTimeToLive(); + if (Integer.MAX_VALUE == timeToLive) { + // no need to clean, because the TTL is not set. + return; + } + + Date expireDate = new Date(current - timeToLive * 1000); + expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate()); + LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!"); + + FileStatus[] stats = null; + Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName); + Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString()); + try { + stats = fs.listStatus(path); + } catch (FileNotFoundException e) { + LOG.warn("Fail to find the mob file " + path, e); + } + if (null == stats) { + // no file found + return; + } + List filesToClean = new ArrayList(); + int deletedFileCount = 0; + for (FileStatus file : stats) { + String fileName = file.getPath().getName(); + try { + MobFileName mobFileName = null; + if (!HFileLink.isHFileLink(file.getPath())) { + mobFileName = MobFileName.create(fileName); + } else { + HFileLink hfileLink = new HFileLink(conf, file.getPath()); + mobFileName = MobFileName.create(hfileLink.getOriginPath().getName()); + } + Date fileDate = parseDate(mobFileName.getDate()); + if (LOG.isDebugEnabled()) { + LOG.debug("Checking file " + fileName); + } + if (fileDate.getTime() < expireDate.getTime()) { + if (LOG.isDebugEnabled()) { + LOG.debug(fileName + " is an expired file"); + } + filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE)); + } + } catch (Exception e) { + LOG.error("Cannot parse the fileName " + fileName, e); + } + } + if (!filesToClean.isEmpty()) { + try { + removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(), + filesToClean); + deletedFileCount = filesToClean.size(); + } catch (IOException e) { + LOG.error("Fail to delete the mob files " + filesToClean, e); + } + } + LOG.info(deletedFileCount + " expired mob files are deleted"); + } + + /** + * Gets the znode name of column family. + * @param tableName The current table name. + * @param familyName The name of the current column family. + * @return The znode name of column family. + */ + public static String getColumnFamilyZNodeName(String tableName, String familyName) { + return tableName + ":" + familyName; + } + + /** * Gets the root dir of the mob files. * It's {HBASE_DIR}/mobdir. * @param conf The current configuration. @@ -183,6 +305,19 @@ public class MobUtils { } /** + * Gets the qualified root dir of the mob files. + * @param conf The current configuration. + * @return The qualified root dir. 
+ * @throws IOException + */ + public static Path getQualifiedMobRootDir(Configuration conf) throws IOException { + Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR)); + Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME); + FileSystem fs = mobRootDir.getFileSystem(conf); + return mobRootDir.makeQualified(fs); + } + + /** * Gets the region dir of the mob files. * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}. * @param conf The current configuration. @@ -190,7 +325,7 @@ public class MobUtils { * @return The region dir of the mob files. */ public static Path getMobRegionPath(Configuration conf, TableName tableName) { - Path tablePath = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName); + Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName); HRegionInfo regionInfo = getMobRegionInfo(tableName); return new Path(tablePath, regionInfo.getEncodedName()); } @@ -232,36 +367,160 @@ public class MobUtils { } /** + * Gets whether the current HRegionInfo is a mob one. + * @param regionInfo The current HRegionInfo. + * @return If true, the current HRegionInfo is a mob one. + */ + public static boolean isMobRegionInfo(HRegionInfo regionInfo) { + return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName() + .equals(regionInfo.getEncodedName()); + } + + /** + * Gets the working directory of the mob compaction. + * @param root The root directory of the mob compaction. + * @param jobName The current job name. + * @return The directory of the mob compaction for the current job. + */ + public static Path getCompactionWorkingPath(Path root, String jobName) { + Path parent = new Path(root, jobName); + return new Path(parent, COMPACTION_WORKING_DIR_NAME); + } + + /** + * Archives the mob files. + * @param conf The current configuration. + * @param fs The current file system. + * @param tableName The table name. + * @param tableDir The table directory. + * @param family The name of the column family. + * @param storeFiles The files to be deleted. + * @throws IOException + */ + public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName, + Path tableDir, byte[] family, Collection storeFiles) throws IOException { + HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family, + storeFiles); + } + + /** * Creates a mob reference KeyValue. * The value of the mob reference KeyValue is mobCellValueSize + mobFileName. - * @param kv The original KeyValue. + * @param cell The original Cell. * @param fileName The mob file name where the mob reference KeyValue is written. * @param tableNameTag The tag of the current table name. It's very important in * cloning the snapshot. * @return The mob reference KeyValue. */ - public static KeyValue createMobRefKeyValue(KeyValue kv, byte[] fileName, Tag tableNameTag) { + public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) { // Append the tags to the KeyValue. // The key is same, the value is the filename of the mob file - List existingTags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength()); - existingTags.add(MobConstants.MOB_REF_TAG); + List tags = new ArrayList(); + // Add the ref tag as the 1st one. + tags.add(MobConstants.MOB_REF_TAG); + // Add the existing tags. + tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength())); // Add the tag of the source table name, this table is where this mob file is flushed // from. 
// It's very useful in cloning the snapshot. When reading from the cloning table, we need to // find the original mob files by this table name. For details please see cloning // snapshot for mob files. - existingTags.add(tableNameTag); - int valueLength = kv.getValueLength(); + tags.add(tableNameTag); + int valueLength = cell.getValueLength(); byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName); - KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), - kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), - kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put, - refValue, 0, refValue.length, existingTags); - reference.setSequenceId(kv.getSequenceId()); + KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), + cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags); + reference.setSequenceId(cell.getSequenceId()); return reference; } /** + * Creates a directory of mob files for flushing. + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param date The date string, its format is yyyymmmdd. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The hex string of the start key. + * @param cacheConfig The current cache config. + * @return The writer for the mob file. + * @throws IOException + */ + public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs, + HColumnDescriptor family, String date, Path basePath, long maxKeyCount, + Compression.Algorithm compression, String startKey, CacheConfig cacheConfig) + throws IOException { + MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString() + .replaceAll("-", "")); + HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) + .withIncludesMvcc(false).withIncludesTags(true) + .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE) + .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM) + .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true) + .withDataBlockEncoding(family.getDataBlockEncoding()).build(); + + StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs) + .withFilePath(new Path(basePath, mobFileName.getFileName())) + .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE) + .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); + return w; + } + + /** + * Commits the mob file. + * @param @param conf The current configuration. + * @param fs The current file system. + * @param path The path where the mob file is saved. + * @param targetPath The directory path where the source file is renamed to. + * @param cacheConfig The current cache config. 
+ * @throws IOException + */ + public static void commitFile(Configuration conf, FileSystem fs, final Path sourceFile, + Path targetPath, CacheConfig cacheConfig) throws IOException { + if (sourceFile == null) { + return; + } + Path dstPath = new Path(targetPath, sourceFile.getName()); + validateMobFile(conf, fs, sourceFile, cacheConfig); + String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath; + LOG.info(msg); + Path parent = dstPath.getParent(); + if (!fs.exists(parent)) { + fs.mkdirs(parent); + } + if (!fs.rename(sourceFile, dstPath)) { + throw new IOException("Failed rename of " + sourceFile + " to " + dstPath); + } + } + + /** + * Validates a mob file by opening and closing it. + * @param conf The current configuration. + * @param fs The current file system. + * @param path The path where the mob file is saved. + * @param cacheConfig The current cache config. + */ + private static void validateMobFile(Configuration conf, FileSystem fs, Path path, + CacheConfig cacheConfig) throws IOException { + StoreFile storeFile = null; + try { + storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE); + storeFile.createReader(); + } catch (IOException e) { + LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e); + throw e; + } finally { + if (storeFile != null) { + storeFile.closeReader(false); + } + } + } + + /** * Indicates whether the current mob ref cell has a valid value. * A mob ref cell has a mob reference tag. * The value of a mob ref cell consists of two parts, real mob value length and mob file name. @@ -270,7 +529,7 @@ public class MobUtils { * @param cell The mob ref cell. * @return True if the cell has a valid value. */ - public static boolean isValidMobRefCellValue(Cell cell) { + public static boolean hasValidMobRefCellValue(Cell cell) { return cell.getValueLength() > Bytes.SIZEOF_INT; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java new file mode 100644 index 0000000..a9557d7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java @@ -0,0 +1,270 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * The zookeeper used for MOB. 
+ * This zookeeper is used to synchronize the HBase major compaction and sweep tool. + * The structure of the mob nodes in zookeeper is: + * |--baseNode + * |--MOB + * |--tableName:columnFamilyName-lock // locks for the mob column family + * |--tableName:columnFamilyName-sweeper // when a sweep tool runs, such a node is added + * |--tableName:columnFamilyName-majorCompaction + * |--UUID // when a major compaction occurs, such a node is added. + * In order to synchronize the operations between the sweep tool and HBase major compaction, these + * actions need to acquire the tableName:columnFamilyName-lock before the sweep tool and major + * compaction run. + * In the sweep tool: + * 1. If it acquires the lock successfully, it checks whether the sweeper node exists; if it + * exists, the current run is aborted. If not, it checks whether there are major compaction + * nodes; if there are, the current run is aborted; if not, it adds a sweeper node to the + * zookeeper. + * 2. If it cannot obtain the lock, the current run is aborted. + * In the HBase compaction: + * 1. If it's a minor compaction, the compaction continues. + * 2. If it's a major compaction, it acquires a lock in zookeeper. + * A. If it obtains the lock, it checks whether a sweeper node exists; if so, it converts itself + * to a minor compaction and continues; if not, it adds a major compaction node to the zookeeper. + * B. If it cannot obtain the lock, it converts itself to a minor compaction and continues the + * compaction. + */ +@InterfaceAudience.Private +public class MobZookeeper { + // TODO Will remove this class before the mob is merged back to master. + private static final Log LOG = LogFactory.getLog(MobZookeeper.class); + + private ZooKeeperWatcher zkw; + private String mobZnode; + private static final String LOCK_EPHEMERAL = "-lock"; + private static final String SWEEPER_EPHEMERAL = "-sweeper"; + private static final String MAJOR_COMPACTION_EPHEMERAL = "-majorCompaction"; + + private MobZookeeper(Configuration conf, String identifier) throws IOException, + KeeperException { + this.zkw = new ZooKeeperWatcher(conf, identifier, new DummyMobAbortable()); + mobZnode = ZKUtil.joinZNode(zkw.baseZNode, "MOB"); + if (ZKUtil.checkExists(zkw, mobZnode) == -1) { + ZKUtil.createWithParents(zkw, mobZnode); + } + } + + /** + * Creates a new instance of MobZookeeper. + * @param conf The current configuration. + * @param identifier string that is passed to RecoverableZookeeper to be used as + * identifier for this instance. + * @return A new instance of MobZookeeper. + * @throws IOException + * @throws KeeperException + */ + public static MobZookeeper newInstance(Configuration conf, String identifier) throws IOException, + KeeperException { + return new MobZookeeper(conf, identifier); + } + + /** + * Acquires a lock on the current column family. + * Every thread that accesses the column family must first acquire this lock, which is + * implemented by creating an ephemeral node in the zookeeper. + * @param tableName The current table name. + * @param familyName The current column family name. + * @return True if the lock is obtained successfully. Otherwise false is returned. + */ + public boolean lockColumnFamily(String tableName, String familyName) { + String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); + boolean locked = false; + try { + locked = ZKUtil.createEphemeralNodeAndWatch(zkw, + ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL), null); + if (LOG.isDebugEnabled()) { + LOG.debug(locked ?
"Locked the column family " + znodeName + : "Can not lock the column family " + znodeName); + } + } catch (KeeperException e) { + LOG.error("Fail to lock the column family " + znodeName, e); + } + return locked; + } + + /** + * Release the lock on the current column family. + * @param tableName The current table name. + * @param familyName The current column family name. + */ + public void unlockColumnFamily(String tableName, String familyName) { + String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); + if (LOG.isDebugEnabled()) { + LOG.debug("Unlocking the column family " + znodeName); + } + try { + ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL)); + } catch (KeeperException e) { + LOG.warn("Fail to unlock the column family " + znodeName, e); + } + } + + /** + * Adds a node to zookeeper which indicates that a sweep tool is running. + * @param tableName The current table name. + * @param familyName The current columnFamilyName name. + * @param data the data of the ephemeral node. + * @return True if the node is created successfully. Otherwise false is returned. + */ + public boolean addSweeperZNode(String tableName, String familyName, byte[] data) { + boolean add = false; + String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); + try { + add = ZKUtil.createEphemeralNodeAndWatch(zkw, + ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL), data); + if (LOG.isDebugEnabled()) { + LOG.debug(add ? "Added a znode for sweeper " + znodeName + : "Cannot add a znode for sweeper " + znodeName); + } + } catch (KeeperException e) { + LOG.error("Fail to add a znode for sweeper " + znodeName, e); + } + return add; + } + + /** + * Gets the path of the sweeper znode in zookeeper. + * @param tableName The current table name. + * @param familyName The current columnFamilyName name. + * @return The path of the sweeper znode in zookeper. + */ + public String getSweeperZNodePath(String tableName, String familyName) { + String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); + return ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL); + } + + /** + * Deletes the node from zookeeper which indicates that a sweep tool is finished. + * @param tableName The current table name. + * @param familyName The current column family name. + */ + public void deleteSweeperZNode(String tableName, String familyName) { + String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); + try { + ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)); + } catch (KeeperException e) { + LOG.error("Fail to delete a znode for sweeper " + znodeName, e); + } + } + + /** + * Checks whether the znode exists in the Zookeeper. + * If the node exists, it means a sweep tool is running. + * Otherwise, the sweep tool is not. + * @param tableName The current table name. + * @param familyName The current column family name. + * @return True if this node doesn't exist. Otherwise false is returned. + * @throws KeeperException + */ + public boolean isSweeperZNodeExist(String tableName, String familyName) throws KeeperException { + String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); + return ZKUtil.checkExists(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)) >= 0; + } + + /** + * Checks whether there're major compactions nodes in the zookeeper. + * If there're such nodes, it means there're major compactions in progress now. + * Otherwise there're not. 
+ * @param tableName The current table name. + * @param familyName The current column family name. + * @return True if there're major compactions in progress. Otherwise false is returned. + * @throws KeeperException + */ + public boolean hasMajorCompactionChildren(String tableName, String familyName) + throws KeeperException { + String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); + String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL); + List children = ZKUtil.listChildrenNoWatch(zkw, mcPath); + return children != null && !children.isEmpty(); + } + + /** + * Creates a node of a major compaction to the Zookeeper. + * Before a HBase major compaction, such a node is created to the Zookeeper. It tells others that + * there're major compaction in progress, the sweep tool could not be run at this time. + * @param tableName The current table name. + * @param familyName The current column family name. + * @param compactionName The current compaction name. + * @return True if the node is created successfully. Otherwise false is returned. + * @throws KeeperException + */ + public boolean addMajorCompactionZNode(String tableName, String familyName, + String compactionName) throws KeeperException { + String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); + String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL); + ZKUtil.createNodeIfNotExistsAndWatch(zkw, mcPath, null); + String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName); + return ZKUtil.createEphemeralNodeAndWatch(zkw, eachMcPath, null); + } + + /** + * Deletes a major compaction node from the Zookeeper. + * @param tableName The current table name. + * @param familyName The current column family name. + * @param compactionName The current compaction name. + * @throws KeeperException + */ + public void deleteMajorCompactionZNode(String tableName, String familyName, + String compactionName) throws KeeperException { + String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); + String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL); + String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName); + ZKUtil.deleteNode(zkw, eachMcPath); + } + + /** + * Closes the MobZookeeper. + */ + public void close() { + this.zkw.close(); + } + + /** + * An dummy abortable. It's used for the MobZookeeper. + */ + public static class DummyMobAbortable implements Abortable { + + private boolean abort = false; + + public void abort(String why, Throwable e) { + abort = true; + } + + public boolean isAborted() { + return abort; + } + + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java new file mode 100644 index 0000000..b0d4c9d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java @@ -0,0 +1,184 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter; +import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer.SweepPartitionId; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.MemStore; +import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Reducer.Context; + +/** + * The wrapper of a DefaultMemStore. + * This wrapper is used in the sweep reducer to buffer and sort the cells written from + * the invalid and small mob files. + * It's flushed when it's full, the mob data are written to the mob files, and their file names + * are written back to store files of HBase. + * This memStore is used to sort the cells in mob files. + * In a reducer of sweep tool, the mob files are grouped by the same prefix (start key and date), + * in each group, the reducer iterates the files and read the cells to a new and bigger mob file. + * The cells in the same mob file are ordered, but cells across mob files are not. + * So we need this MemStoreWrapper to sort those cells come from different mob files before + * flushing them to the disk, when the memStore is big enough it's flushed as a new mob file. 
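+ * <p>A hedged usage sketch (the reducer context, partitionId and the kvs iterable are
+ * assumed; the constructor and methods are the ones defined below):
+ * <pre>
+ * MemStoreWrapper memstore = new MemStoreWrapper(context, fs, table, hcd,
+ *   new DefaultMemStore(), cacheConfig);
+ * memstore.setPartitionId(partitionId);
+ * for (KeyValue kv : kvs) {
+ *   memstore.addToMemstore(kv); // flushes a new mob file once heapSize >= flushSize
+ * }
+ * memstore.flushMemStore();     // final flush of whatever remains
+ * </pre>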
+ */ +@InterfaceAudience.Private +public class MemStoreWrapper { + + private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class); + + private MemStore memstore; + private long flushSize; + private SweepPartitionId partitionId; + private Context context; + private Configuration conf; + private HTable table; + private HColumnDescriptor hcd; + private Path mobFamilyDir; + private FileSystem fs; + private CacheConfig cacheConfig; + + public MemStoreWrapper(Context context, FileSystem fs, HTable table, HColumnDescriptor hcd, + MemStore memstore, CacheConfig cacheConfig) throws IOException { + this.memstore = memstore; + this.context = context; + this.fs = fs; + this.table = table; + this.hcd = hcd; + this.conf = context.getConfiguration(); + this.cacheConfig = cacheConfig; + flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE, + MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE); + mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString()); + } + + public void setPartitionId(SweepPartitionId partitionId) { + this.partitionId = partitionId; + } + + /** + * Flushes the memstore if the size is large enough. + * @throws IOException + */ + private void flushMemStoreIfNecessary() throws IOException { + if (memstore.heapSize() >= flushSize) { + flushMemStore(); + } + } + + /** + * Flushes the memstore anyway. + * @throws IOException + */ + public void flushMemStore() throws IOException { + MemStoreSnapshot snapshot = memstore.snapshot(); + internalFlushCache(snapshot); + memstore.clearSnapshot(snapshot.getId()); + } + + /** + * Flushes the snapshot of the memstore. + * Flushes the mob data to the mob files, and flushes the name of these mob files to HBase. + * @param snapshot The snapshot of the memstore. + * @throws IOException + */ + private void internalFlushCache(final MemStoreSnapshot snapshot) + throws IOException { + if (snapshot.getSize() == 0) { + return; + } + // generate the files into a temp directory. + String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY); + StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd, + partitionId.getDate(), new Path(tempPathString), snapshot.getCellsCount(), + hcd.getCompactionCompression(), partitionId.getStartKey(), cacheConfig); + + String relativePath = mobFileWriter.getPath().getName(); + LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString()); + + byte[] referenceValue = Bytes.toBytes(relativePath); + int keyValueCount = 0; + KeyValueScanner scanner = snapshot.getScanner(); + Cell cell = null; + while (null != (cell = scanner.next())) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + mobFileWriter.append(kv); + keyValueCount++; + } + scanner.close(); + // Write out the log sequence number that corresponds to this output + // hfile. The hfile is current up to and including logCacheFlushId. + mobFileWriter.appendMetadata(Long.MAX_VALUE, false); + mobFileWriter.close(); + + MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig); + context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1); + // write reference/fileName back to the store files of HBase. 
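+ // A second pass over the same snapshot: each cell is turned into a reference cell
+ // (value = original value length + new mob file name, tagged with the mob ref tag
+ // and the source table name) and put back to the HBase table.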
+ scanner = snapshot.getScanner(); + scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); + cell = null; + Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, this.table.getTableName()); + while (null != (cell = scanner.next())) { + KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag); + Put put = + new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength()); + put.add(reference); + table.put(put); + context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1); + } + if (keyValueCount > 0) { + table.flushCommits(); + } + scanner.close(); + } + + /** + * Adds a KeyValue into the memstore. + * @param kv The KeyValue to be added. + * @throws IOException + */ + public void addToMemstore(KeyValue kv) throws IOException { + memstore.add(kv); + // flush the memstore if it's full. + flushMemStoreIfNecessary(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java new file mode 100644 index 0000000..bdec887 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java @@ -0,0 +1,41 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.mapreduce; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Partitioner; + +/** + * The partitioner for the sweep job. + * The key is a mob file name. We bucket by date. + */ +@InterfaceAudience.Private +public class MobFilePathHashPartitioner extends Partitioner { + + @Override + public int getPartition(Text fileName, KeyValue kv, int numPartitions) { + MobFileName mobFileName = MobFileName.create(fileName.toString()); + String date = mobFileName.getDate(); + int hash = date.hashCode(); + return (hash & Integer.MAX_VALUE) % numPartitions; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java new file mode 100644 index 0000000..1d048bb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java @@ -0,0 +1,550 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.mapreduce; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.MobZookeeper; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.serializer.JavaSerialization; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.zookeeper.KeeperException; + +/** + * The sweep job. + * Run map reduce to merge the smaller mob files into bigger ones and cleans the unused ones. + */ +@InterfaceAudience.Private +public class SweepJob { + + private final FileSystem fs; + private final Configuration conf; + private static final Log LOG = LogFactory.getLog(SweepJob.class); + static final String SWEEP_JOB_ID = "mob.compaction.id"; + static final String SWEEPER_NODE = "mob.compaction.sweep.node"; + static final String WORKING_DIR_KEY = "mob.compaction.dir"; + static final String WORKING_ALLNAMES_FILE_KEY = "mob.compaction.all.file"; + static final String WORKING_VISITED_DIR_KEY = "mob.compaction.visited.dir"; + static final String WORKING_ALLNAMES_DIR = "all"; + static final String WORKING_VISITED_DIR = "visited"; + public static final String WORKING_FILES_DIR_KEY = "mob.compaction.files.dir"; + //the MOB_COMPACTION_DELAY is ONE_DAY by default. 
Its value is only changed when testing. + public static final String MOB_COMPACTION_DELAY = "hbase.mob.compaction.delay"; + protected static long ONE_DAY = 24 * 60 * 60 * 1000; + private long compactionStartTime = EnvironmentEdgeManager.currentTime(); + public final static String CREDENTIALS_LOCATION = "credentials_location"; + private CacheConfig cacheConfig; + static final int SCAN_CACHING = 10000; + + public SweepJob(Configuration conf, FileSystem fs) { + this.conf = conf; + this.fs = fs; + // disable the block cache. + Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + cacheConfig = new CacheConfig(copyOfConf); + } + + /** + * Runs MapReduce to do the sweeping on the mob files. + * There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob + * references from 'normal' regions' rows. + * Concurrent runs of the sweep tool on the same column family are mutually exclusive; + * an HBase major compaction and a run of the sweep tool on the same column family are + * mutually exclusive as well. + * The synchronization is done through Zookeeper. + * So at the beginning of a run, we need to make sure this sweep tool is the only one currently + * running on this column family, and that no major compaction on this column family is in + * progress. + * @param tn The current table name. + * @param family The descriptor of the current column family. + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + * @throws KeeperException + */ + public void sweep(TableName tn, HColumnDescriptor family) throws IOException, + ClassNotFoundException, InterruptedException, KeeperException { + Configuration conf = new Configuration(this.conf); + // check whether the current user is the same as the owner of the hbase root dir + String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName(); + FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR))); + if (hbaseRootFileStat.length > 0) { + String owner = hbaseRootFileStat[0].getOwner(); + if (!owner.equals(currentUserName)) { + String errorMsg = "The current user[" + currentUserName + + "] doesn't have hbase root credentials." + + " Please make sure the user is the root of the target HBase"; + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + } else { + LOG.error("The target HBase doesn't exist"); + throw new IOException("The target HBase doesn't exist"); + } + String familyName = family.getNameAsString(); + String id = "SweepJob" + UUID.randomUUID().toString().replace("-", ""); + MobZookeeper zk = MobZookeeper.newInstance(conf, id); + try { + // Try to obtain the lock. Use this lock to synchronize all the queries, creations and + // deletions in the Zookeeper. + if (!zk.lockColumnFamily(tn.getNameAsString(), familyName)) { + LOG.warn("Cannot lock the store " + familyName + + ". The major compaction in HBase may be in-progress. Please re-run the job."); + return; + } + try { + // Checks whether an HBase major compaction is in progress now. + boolean hasChildren = zk.hasMajorCompactionChildren(tn.getNameAsString(), familyName); + if (hasChildren) { + LOG.warn("The major compaction in HBase may be in-progress." + + " Please re-run the job."); + return; + } else { + // Checks whether a sweep tool is in progress.
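+ // The sweeper znode is the mutual-exclusion marker: at most one sweep job may run
+ // per column family, and major compactions convert themselves to minor ones while
+ // the marker exists (see the MobZookeeper javadoc above).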
+ boolean hasSweeper = zk.isSweeperZNodeExist(tn.getNameAsString(), familyName); + if (hasSweeper) { + LOG.warn("Another sweep job is running"); + return; + } else { + // add the sweeper node, mark that there's one sweep tool in progress. + // All the HBase major compaction and sweep tool in this column family could not + // run until this sweep tool is finished. + zk.addSweeperZNode(tn.getNameAsString(), familyName, Bytes.toBytes(id)); + } + } + } finally { + zk.unlockColumnFamily(tn.getNameAsString(), familyName); + } + Job job = null; + try { + Scan scan = new Scan(); + scan.addFamily(family.getName()); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE)); + scan.setCaching(SCAN_CACHING); + scan.setCacheBlocks(false); + scan.setMaxVersions(family.getMaxVersions()); + conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, + JavaSerialization.class.getName() + "," + WritableSerialization.class.getName()); + conf.set(SWEEP_JOB_ID, id); + conf.set(SWEEPER_NODE, zk.getSweeperZNodePath(tn.getNameAsString(), familyName)); + job = prepareJob(tn, familyName, scan, conf); + job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName); + // Record the compaction start time. + // In the sweep tool, only the mob file whose modification time is older than + // (startTime - delay) could be handled by this tool. + // The delay is one day. It could be configured as well, but this is only used + // in the test. + job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, + compactionStartTime); + + job.setPartitionerClass(MobFilePathHashPartitioner.class); + submit(job, tn, familyName); + if (job.waitForCompletion(true)) { + // Archive the unused mob files. + removeUnusedFiles(job, tn, family); + } + } finally { + cleanup(job, tn, familyName); + zk.deleteSweeperZNode(tn.getNameAsString(), familyName); + } + } finally { + zk.close(); + } + } + + /** + * Prepares a map reduce job. + * @param tn The current table name. + * @param familyName The current family name. + * @param scan The current scan. + * @param conf The current configuration. + * @return A map reduce job. + * @throws IOException + */ + private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf) + throws IOException { + Job job = Job.getInstance(conf); + job.setJarByClass(SweepMapper.class); + TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan, + SweepMapper.class, Text.class, Writable.class, job); + + job.setInputFormatClass(TableInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(KeyValue.class); + job.setReducerClass(SweepReducer.class); + job.setOutputFormatClass(NullOutputFormat.class); + String jobName = getCustomJobName(this.getClass().getSimpleName(), tn.getNameAsString(), + familyName); + job.setJobName(jobName); + if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) { + String fileLoc = conf.get(CREDENTIALS_LOCATION); + Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf); + job.getCredentials().addAll(cred); + } + return job; + } + + /** + * Gets a customized job name. + * It's className-mapperClassName-reducerClassName-tableName-familyName. + * @param className The current class name. + * @param tableName The current table name. + * @param familyName The current family name. + * @return The customized job name. 
+   */
+  private static String getCustomJobName(String className, String tableName, String familyName) {
+    StringBuilder name = new StringBuilder();
+    name.append(className);
+    name.append('-').append(SweepMapper.class.getSimpleName());
+    name.append('-').append(SweepReducer.class.getSimpleName());
+    name.append('-').append(tableName);
+    name.append('-').append(familyName);
+    return name.toString();
+  }
+
+  /**
+   * Submits a job.
+   * @param job The current job.
+   * @param tn The current table name.
+   * @param familyName The current family name.
+   * @throws IOException
+   */
+  private void submit(Job job, TableName tn, String familyName) throws IOException {
+    // delete the temp directory of the mob files in case of a failure in the previous
+    // execution.
+    Path tempDir =
+        new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME);
+    Path mobCompactionTempDir =
+        new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME);
+    Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName());
+    job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString());
+    // delete the working directory in case it was not deleted by the previous run.
+    fs.delete(workingPath, true);
+    // create the working directory.
+    fs.mkdirs(workingPath);
+    // create a sequence file which contains the names of all the existing files.
+    Path workingPathOfFiles = new Path(workingPath, "files");
+    Path workingPathOfNames = new Path(workingPath, "names");
+    job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString());
+    Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR);
+    job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+    Path visitedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
+    job.getConfiguration().set(WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
+    // create a file that includes the names of all the existing mob files whose creation
+    // time is older than (now - oneDay)
+    fs.create(allFileNamesPath, true);
+    // create a directory where the files that contain the names of visited mob files are saved.
+    fs.mkdirs(visitedFileNamesPath);
+    Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
+    // Find all the files whose creation time is older than one day.
+    // Write those file names to a file.
+    // Each reducer has a writer that writes the visited file names to a file which is saved
+    // in WORKING_VISITED_DIR.
+    // After the job is finished, compare those files, then find out the unused mob files and
+    // archive them.
+    FileStatus[] files = fs.listStatus(mobStorePath);
+    Set<String> fileNames = new TreeSet<String>();
+    long mobCompactionDelay = job.getConfiguration().getLong(MOB_COMPACTION_DELAY, ONE_DAY);
+    for (FileStatus fileStatus : files) {
+      if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
+        if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
+          // only record the potentially unused files older than one day.
+          fileNames.add(fileStatus.getPath().getName());
+        }
+      }
+    }
+    // write the names to a sequence file
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, job.getConfiguration(),
+        allFileNamesPath, String.class, String.class);
+    try {
+      for (String fileName : fileNames) {
+        writer.append(fileName, MobConstants.EMPTY_STRING);
+      }
+    } finally {
+      IOUtils.closeStream(writer);
+    }
+  }
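As an aside, the all-names file written here can be read back with the same String/String SequenceFile API. A minimal sketch follows (the path is hypothetical); it only works because sweep() registers JavaSerialization under io.serializations, since plain String keys are not handled by the default WritableSerialization alone:

    Configuration conf = new Configuration();
    conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
        JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
    FileSystem fs = FileSystem.get(conf);
    // hypothetical location; the real one comes from WORKING_ALLNAMES_FILE_KEY
    Path allFileNamesPath = new Path("/hbase/mobcompaction/SweepJob/working/names/all");
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
    try {
      String name;
      // next((String) null) lets the reader allocate and return each key; the names
      // come back in sorted order because they were written from a TreeSet.
      while ((name = (String) reader.next((String) null)) != null) {
        System.out.println("Mob file candidate: " + name);
      }
    } finally {
      IOUtils.closeStream(reader);
    }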
+
+  /**
+   * Gets the unused mob files.
+   * Compares the file that contains the names of all the existing mob files against the
+   * files that contain the names of visited mob files, and finds out the unused ones.
+   * @param conf The current configuration.
+   * @return The unused mob files.
+   * @throws IOException
+   */
+  List<String> getUnusedFiles(Configuration conf) throws IOException {
+    // find out the unused files and archive them
+    Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
+    SequenceFile.Reader allNamesReader = null;
+    MergeSortReader visitedNamesReader = null;
+    List<String> toBeArchived = new ArrayList<String>();
+    try {
+      allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
+      visitedNamesReader = new MergeSortReader(fs, conf,
+          new Path(conf.get(WORKING_VISITED_DIR_KEY)));
+      String nextAll = (String) allNamesReader.next((String) null);
+      String nextVisited = visitedNamesReader.next();
+      do {
+        if (nextAll != null) {
+          if (nextVisited != null) {
+            int compare = nextAll.compareTo(nextVisited);
+            if (compare < 0) {
+              toBeArchived.add(nextAll);
+              nextAll = (String) allNamesReader.next((String) null);
+            } else if (compare > 0) {
+              nextVisited = visitedNamesReader.next();
+            } else {
+              nextAll = (String) allNamesReader.next((String) null);
+              nextVisited = visitedNamesReader.next();
+            }
+          } else {
+            toBeArchived.add(nextAll);
+            nextAll = (String) allNamesReader.next((String) null);
+          }
+        } else {
+          break;
+        }
+      } while (nextAll != null || nextVisited != null);
+    } finally {
+      if (allNamesReader != null) {
+        allNamesReader.close();
+      }
+      if (visitedNamesReader != null) {
+        visitedNamesReader.close();
+      }
+    }
+    return toBeArchived;
+  }
+
+  /**
+   * Archives the unused mob files.
+   * @param job The current job.
+   * @param tn The current table name.
+   * @param hcd The descriptor of the current column family.
+   * @throws IOException
+   */
+  private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException {
+    // find out the unused files and archive them
+    List<StoreFile> storeFiles = new ArrayList<StoreFile>();
+    List<String> toBeArchived = getUnusedFiles(job.getConfiguration());
+    // archive them
+    Path mobStorePath = MobUtils
+        .getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
+    for (String archiveFileName : toBeArchived) {
+      Path path = new Path(mobStorePath, archiveFileName);
+      storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig, BloomType.NONE));
+    }
+    if (!storeFiles.isEmpty()) {
+      try {
+        MobUtils.removeMobFiles(job.getConfiguration(), fs, tn,
+            FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
+        LOG.info(storeFiles.size() + " unused MOB files are removed");
+      } catch (Exception e) {
+        LOG.error("Failed to archive the store files " + storeFiles, e);
+      }
+    }
+  }
+
+  /**
+   * Deletes the working directory.
+   * @param job The current job.
+   * @param tn The current table name.
+   * @param familyName The current family name.
+   * @throws IOException
+   */
+  private void cleanup(Job job, TableName tn, String familyName) throws IOException {
+    if (job != null) {
+      // delete the working directory
+      Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
+      try {
+        fs.delete(workingPath, true);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete the working directory after sweeping store " + familyName
+            + " in the table " + tn.getNameAsString(), e);
+      }
+    }
+  }
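getUnusedFiles() is, at heart, a set difference over two sorted streams of names: anything in the all-names file that the reducers never visited is unused. The same two-pointer walk in isolation, as a self-contained sketch (class and method names are illustrative only):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class SortedDifference {
      // Returns the names present in "all" but absent from "visited".
      // Both iterators must produce names in ascending order, as the
      // sequence files written by the sweep job do.
      static List<String> difference(Iterator<String> all, Iterator<String> visited) {
        List<String> unused = new ArrayList<String>();
        String a = all.hasNext() ? all.next() : null;
        String v = visited.hasNext() ? visited.next() : null;
        while (a != null) {
          if (v == null || a.compareTo(v) < 0) {
            unused.add(a); // never visited, safe to archive
            a = all.hasNext() ? all.next() : null;
          } else if (a.compareTo(v) > 0) {
            v = visited.hasNext() ? visited.next() : null;
          } else {
            a = all.hasNext() ? all.next() : null;
            v = visited.hasNext() ? visited.next() : null;
          }
        }
        return unused;
      }
    }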
+
+  /**
+   * A result with an index.
+   */
+  private class IndexedResult implements Comparable<IndexedResult> {
+    private int index;
+    private String value;
+
+    public IndexedResult(int index, String value) {
+      this.index = index;
+      this.value = value;
+    }
+
+    public int getIndex() {
+      return this.index;
+    }
+
+    public String getValue() {
+      return this.value;
+    }
+
+    @Override
+    public int compareTo(IndexedResult o) {
+      if (this.value == null) {
+        return o.value == null ? 0 : -1;
+      } else if (o.value == null) {
+        return 1;
+      } else {
+        return this.value.compareTo(o.value);
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof IndexedResult)) {
+        return false;
+      }
+      return compareTo((IndexedResult) obj) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return value.hashCode();
+    }
+  }
+
+  /**
+   * A merge-sort reader.
+   * It merges the readers of different sequence files into a single one from which
+   * the results are read in sorted order.
+   */
+  private class MergeSortReader {
+
+    private List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>();
+    private PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>();
+
+    public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
+      if (fs.exists(path)) {
+        FileStatus[] files = fs.listStatus(path);
+        int index = 0;
+        for (FileStatus file : files) {
+          if (file.isFile()) {
+            SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
+            String key = (String) reader.next((String) null);
+            if (key != null) {
+              results.add(new IndexedResult(index, key));
+              readers.add(reader);
+              index++;
+            }
+          }
+        }
+      }
+    }
+
+    public String next() throws IOException {
+      IndexedResult result = results.poll();
+      if (result != null) {
+        SequenceFile.Reader reader = readers.get(result.getIndex());
+        String key = (String) reader.next((String) null);
+        if (key != null) {
+          results.add(new IndexedResult(result.getIndex(), key));
+        }
+        return result.getValue();
+      }
+      return null;
+    }
+
+    public void close() {
+      for (SequenceFile.Reader reader : readers) {
+        IOUtils.closeStream(reader);
+      }
+    }
+  }
+
+  /**
+   * The counters used in the sweep job.
+   */
+  public enum SweepCounter {
+
+    /**
+     * How many files are read.
+     */
+    INPUT_FILE_COUNT,
+
+    /**
+     * How many files need to be merged or cleaned.
+     */
+    FILE_TO_BE_MERGE_OR_CLEAN,
+
+    /**
+     * How many files are left after merging.
+     */
+    FILE_AFTER_MERGE_OR_CLEAN,
+
+    /**
+     * How many records are updated.
+     */
+    RECORDS_UPDATED,
+  }
+}
\ No newline at end of file
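For reference, MergeSortReader implements a classic k-way merge: the priority queue holds one head element per underlying reader, and every poll is followed by a refill from the reader it came from. A self-contained sketch of the same idea over in-memory lists (all names here are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMerge {
      public static void main(String[] args) {
        List<List<String>> sorted = Arrays.asList(
            Arrays.asList("1", "2", "3"),
            Arrays.asList("1", "4", "5"),
            Arrays.asList("2", "3", "6"));
        List<Iterator<String>> its = new ArrayList<Iterator<String>>();
        // the queue holds one (value, source-index) pair per source
        PriorityQueue<String[]> heads = new PriorityQueue<String[]>(sorted.size(),
            new Comparator<String[]>() {
              public int compare(String[] x, String[] y) {
                return x[0].compareTo(y[0]);
              }
            });
        for (int i = 0; i < sorted.size(); i++) {
          its.add(sorted.get(i).iterator());
          if (its.get(i).hasNext()) {
            heads.add(new String[] { its.get(i).next(), String.valueOf(i) });
          }
        }
        while (!heads.isEmpty()) {
          String[] head = heads.poll();
          System.out.println(head[0]); // globally sorted output, duplicates included
          Iterator<String> src = its.get(Integer.parseInt(head[1]));
          if (src.hasNext()) {
            heads.add(new String[] { src.next(), head[1] });
          }
        }
      }
    }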
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
new file mode 100644
index 0000000..b789332
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracker on the sweep tool node in ZooKeeper.
+ * The sweep tool node is an ephemeral one: when the process dies, this node is deleted,
+ * but at that time the MR job might still be running. If another sweep job were started
+ * then, two MR jobs for the same column family would run at the same time.
+ * This tracker watches the ephemeral node; if it's gone, or it was not created by the
+ * sweep job that owns the current MR job, the current process is aborted.
+ */
+@InterfaceAudience.Private
+public class SweepJobNodeTracker extends ZooKeeperListener {
+
+  private String node;
+  private String sweepJobId;
+
+  public SweepJobNodeTracker(ZooKeeperWatcher watcher, String node, String sweepJobId) {
+    super(watcher);
+    this.node = node;
+    this.sweepJobId = sweepJobId;
+  }
+
+  /**
+   * Registers the watcher on the sweep job node.
+   * If there's no such sweep job node, or it was not created by the sweep job that
+   * owns the current MR job, the current process is aborted.
+   */
+  public void start() throws KeeperException {
+    watcher.registerListener(this);
+    if (ZKUtil.watchAndCheckExists(watcher, node)) {
+      byte[] data = ZKUtil.getDataAndWatch(watcher, node);
+      if (data != null) {
+        if (!sweepJobId.equals(Bytes.toString(data))) {
+          System.exit(1);
+        }
+      }
+    } else {
+      System.exit(1);
+    }
+  }
+
+  @Override
+  public void nodeDeleted(String path) {
+    // If the ephemeral node is deleted, abort the current process.
+    if (node.equals(path)) {
+      System.exit(1);
+    }
+  }
+}
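Both SweepMapper and SweepReducer guard their tasks with this tracker from setup()/run(). A sketch of that usage, assuming conf, jobId and sweeperNodePath are read from the job configuration (the SWEEP_JOB_ID and SWEEPER_NODE keys) as those classes do:

    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, jobId, new DummyMobAbortable());
    try {
      // Aborts this process unless the ephemeral znode exists and carries our job id.
      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNodePath, jobId);
      tracker.start();
    } catch (KeeperException e) {
      throw new IOException(e);
    }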
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
new file mode 100644
index 0000000..f508b93
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The mapper of a sweep job.
+ * Takes the rows from the table with their results and maps them to
+ * <fileName, mobValue> pairs, where the mobValue is the actual reference cell in HBase.
+ */
+@InterfaceAudience.Private
+public class SweepMapper extends TableMapper<Text, KeyValue> {
+
+  private ZooKeeperWatcher zkw = null;
+
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
+        new DummyMobAbortable());
+    try {
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, id);
+      tracker.start();
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+    if (zkw != null) {
+      zkw.close();
+    }
+  }
+
+  @Override
+  public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
+      InterruptedException {
+    if (columns == null) {
+      return;
+    }
+    KeyValue[] kvList = columns.raw();
+    if (kvList == null || kvList.length == 0) {
+      return;
+    }
+    for (KeyValue kv : kvList) {
+      if (MobUtils.hasValidMobRefCellValue(kv)) {
+        String fileName = MobUtils.getMobFileName(kv);
+        context.write(new Text(fileName), kv);
+      }
+    }
+  }
+}
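The KeyValues the mapper emits are reference cells: their value holds the length of the real mob value followed by the mob file name (the layout that MobUtils.getMobValueLength and MobUtils.getMobFileName decode). A hedged sketch of decoding such a value by hand, assuming kv is one of these reference cells:

    byte[] refValue = kv.getValue(); // the reference, not the real mob data
    // first 4 bytes: length of the real value stored in the mob file
    int mobValueLength = Bytes.toInt(refValue, 0, Bytes.SIZEOF_INT);
    // remaining bytes: the name of the mob file that holds the real value
    String mobFileName = Bytes.toString(refValue, Bytes.SIZEOF_INT,
        refValue.length - Bytes.SIZEOF_INT);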
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
new file mode 100644
index 0000000..04fe359
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
@@ -0,0 +1,506 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.InvalidFamilyOperationException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The reducer of a sweep job.
+ * This reducer merges the small mob files into bigger ones, and writes the names of the
+ * visited mob files to a sequence file, which is used by the sweep job to find and delete
+ * the unused mob files.
+ * The key of the input is a file name; the value is a collection of KeyValues where each
+ * KeyValue is the actual reference cell in HBase (the format of its value is
+ * valueLength + fileName).
+ * In this reducer, we can find out how many cells of a mob file still exist in HBase.
+ * If existingCellsSize/mobFileSize < compactionRatio, this mob file needs to be merged.
+ */
+@InterfaceAudience.Private
+public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
+
+  private static final Log LOG = LogFactory.getLog(SweepReducer.class);
+
+  private SequenceFile.Writer writer = null;
+  private MemStoreWrapper memstore;
+  private Configuration conf;
+  private FileSystem fs;
+
+  private Path familyDir;
+  private CacheConfig cacheConfig;
+  private long compactionBegin;
+  private HTable table;
+  private HColumnDescriptor family;
+  private long mobCompactionDelay;
+  private Path mobTableDir;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    this.conf = context.getConfiguration();
+    this.fs = FileSystem.get(conf);
+    // the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing.
+    mobCompactionDelay = conf.getLong(SweepJob.MOB_COMPACTION_DELAY, SweepJob.ONE_DAY);
+    String tableName = conf.get(TableInputFormat.INPUT_TABLE);
+    String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
+    TableName tn = TableName.valueOf(tableName);
+    this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
+    HBaseAdmin admin = new HBaseAdmin(this.conf);
+    try {
+      family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
+      if (family == null) {
+        // this column family might have been removed, abort the sweeping.
+        throw new InvalidFamilyOperationException("Column family '" + familyName
+            + "' does not exist. It might be removed.");
+      }
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close the HBaseAdmin", e);
+      }
+    }
+    // almost disable the block cache.
+    Configuration copyOfConf = new Configuration(conf);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.00001f);
+    this.cacheConfig = new CacheConfig(copyOfConf);
+
+    table = new HTable(this.conf, Bytes.toBytes(tableName));
+    table.setAutoFlush(false, false);
+
+    table.setWriteBufferSize(1 * 1024 * 1024); // 1MB
+    memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig);
+
+    // The start time of the sweep tool.
+    // Only the mob files whose creation time is older than startTime-oneDay will be handled by
+    // the reducer since it brings inconsistency to handle the latest mob files.
+    this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
+    mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
+  }
+
+  private SweepPartition createPartition(SweepPartitionId id, Context context) throws IOException {
+    return new SweepPartition(id, context);
+  }
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
+        new DummyMobAbortable());
+    try {
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, jobId);
+      tracker.start();
+      setup(context);
+      // create a sequence file that contains all the visited file names in this reducer.
+      String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
+      Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
+          .replace("-", MobConstants.EMPTY_STRING));
+      if (!fs.exists(nameFilePath)) {
+        fs.create(nameFilePath, true);
+      }
+      writer = SequenceFile.createWriter(fs, context.getConfiguration(), nameFilePath,
+          String.class, String.class);
+      SweepPartitionId id;
+      SweepPartition partition = null;
+      // the mob files which have the same start key and date are in the same partition.
+      while (context.nextKey()) {
+        Text key = context.getCurrentKey();
+        String keyString = key.toString();
+        id = SweepPartitionId.create(keyString);
+        if (null == partition || !id.equals(partition.getId())) {
+          // It's the first mob file in the current partition.
+          if (null != partition) {
+            // this mob file is in a different partition from the previous one,
+            // directly close the previous partition.
+            partition.close();
+          }
+          // create a new one
+          partition = createPartition(id, context);
+        }
+        if (partition != null) {
+          // run the partition
+          partition.execute(key, context.getValues());
+        }
+      }
+      if (null != partition) {
+        partition.close();
+      }
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } finally {
+      cleanup(context);
+      zkw.close();
+      if (writer != null) {
+        IOUtils.closeStream(writer);
+      }
+      if (table != null) {
+        try {
+          table.close();
+        } catch (IOException e) {
+          LOG.warn(e);
+        }
+      }
+    }
+  }
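run() leans on the MapReduce sort order: keys arrive sorted, so all files of one partition (same start key and date) are consecutive, and a partition can be closed as soon as the id changes. The bare control flow, sketched over plain strings (the 5-character prefix is a stand-in for SweepPartitionId.create):

    String currentId = null;
    java.util.List<String> group = new java.util.ArrayList<String>();
    for (String key : new String[] { "k1-d1-a", "k1-d1-b", "k2-d1-c" }) {
      String id = key.substring(0, 5); // stand-in for the partition id of this key
      if (!id.equals(currentId)) {
        if (currentId != null) {
          System.out.println(currentId + " -> " + group); // close the finished partition
        }
        currentId = id;
        group = new java.util.ArrayList<String>();
      }
      group.add(key);
    }
    if (currentId != null) {
      System.out.println(currentId + " -> " + group);
    }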
+
+  /**
+   * The mob files which have the same start key and date are in the same partition.
+   * The files in the same partition are merged together into bigger ones.
+   */
+  public class SweepPartition {
+
+    private final SweepPartitionId id;
+    private final Context context;
+    private boolean memstoreUpdated = false;
+    private boolean mergeSmall = false;
+    private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
+    private final List<Path> toBeDeleted = new ArrayList<Path>();
+
+    public SweepPartition(SweepPartitionId id, Context context) throws IOException {
+      this.id = id;
+      this.context = context;
+      memstore.setPartitionId(id);
+      init();
+    }
+
+    public SweepPartitionId getId() {
+      return this.id;
+    }
+
+    /**
+     * Prepares the map of mob files.
+     *
+     * @throws IOException
+     */
+    private void init() throws IOException {
+      FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
+      if (null == fileStats) {
+        return;
+      }
+
+      int smallFileCount = 0;
+      float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
+          MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
+      long compactionMergeableSize = conf.getLong(
+          MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
+          MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
+      // list the files. Just merge the hfiles, don't merge the hfile links.
+      // prepare the map of mob files. The key is the file name, the value is the file status.
+      for (FileStatus fileStat : fileStats) {
+        MobFileStatus mobFileStatus = null;
+        if (!HFileLink.isHFileLink(fileStat.getPath())) {
+          mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
+          if (mobFileStatus.needMerge()) {
+            smallFileCount++;
+          }
+          // key is the file name (not the hfile name), value is the hfile status.
+          fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
+        }
+      }
+      if (smallFileCount >= 2) {
+        // merge the files only when there is more than one small file in the same partition.
+        this.mergeSmall = true;
+      }
+    }
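With the default compaction ratio of 0.5f and mergeable size of 128MB, the two predicates derived from these settings behave as in this small worked example (the sizes are invented for illustration):

    long fileSize = 100L * 1024 * 1024;          // a 100MB mob file
    long existingCellsSize = 30L * 1024 * 1024;  // only 30MB still referenced from HBase
    float ratio = 0.5f;
    long mergeableSize = 128L * 1024 * 1024;
    boolean needClean = existingCellsSize < ratio * fileSize; // true: mostly deleted cells
    boolean needMerge = fileSize < mergeableSize;             // true: small enough to merge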
+
+    /**
+     * Flushes the data into mob files and store files, and archives the small
+     * files after they're merged.
+     * @throws IOException
+     */
+    public void close() throws IOException {
+      if (null == id) {
+        return;
+      }
+      // flush the remaining KeyValues into mob files
+      if (memstoreUpdated) {
+        memstore.flushMemStore();
+      }
+      List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
+      // delete the small files after they are merged
+      for (Path path : toBeDeleted) {
+        LOG.info("[In Partition close] Delete the file " + path);
+        storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
+      }
+      if (!storeFiles.isEmpty()) {
+        try {
+          MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
+              storeFiles);
+          context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
+        } catch (IOException e) {
+          LOG.error("Failed to archive the store files " + storeFiles, e);
+        }
+        storeFiles.clear();
+      }
+      fileStatusMap.clear();
+    }
+
+    /**
+     * Merges the small mob files into bigger ones.
+     * @param fileName The current mob file name.
+     * @param values The collection of KeyValues in this mob file.
+     * @throws IOException
+     */
+    public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
+      if (null == values) {
+        return;
+      }
+      MobFileName mobFileName = MobFileName.create(fileName.toString());
+      LOG.info("[In reducer] The file name: " + fileName.toString());
+      MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
+      if (null == mobFileStat) {
+        LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
+        return;
+      }
+      // only handle the files that are older than one day.
+      if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
+          <= mobCompactionDelay) {
+        return;
+      }
+      // write the hfile name
+      writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
+      Set<KeyValue> kvs = new HashSet<KeyValue>();
+      for (KeyValue kv : values) {
+        if (kv.getValueLength() > Bytes.SIZEOF_INT) {
+          mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
+              Bytes.SIZEOF_INT));
+        }
+        kvs.add(kv.createKeyOnly(false));
+      }
+      // If the mob file is an invalid one or a small one, merge it into new/bigger ones.
+      if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
+        context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
+        MobFile file = MobFile.create(fs,
+            new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
+        StoreFileScanner scanner = null;
+        try {
+          scanner = file.getScanner();
+          scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
+          Cell cell;
+          while (null != (cell = scanner.next())) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            KeyValue keyOnly = kv.createKeyOnly(false);
+            if (kvs.contains(keyOnly)) {
+              // write the KeyValues that still exist in HBase to the memstore.
+              memstore.addToMemstore(kv);
+              memstoreUpdated = true;
+            }
+          }
+        } finally {
+          if (scanner != null) {
+            scanner.close();
+          }
+        }
+        toBeDeleted.add(mobFileStat.getFileStatus().getPath());
+      }
+    }
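The liveness test above works on key-only KeyValues: stripping the value lets a cell scanned from the old mob file be matched against the reducer's input purely on its key part (row, family, qualifier, timestamp, type). A minimal sketch of that membership test, where referenceCells and scannedCells are assumed inputs:

    Set<KeyValue> liveKeys = new HashSet<KeyValue>();
    for (KeyValue ref : referenceCells) {      // cells that still exist in HBase
      liveKeys.add(ref.createKeyOnly(false));  // drop the value, keep the key part
    }
    for (KeyValue scanned : scannedCells) {    // cells read from the old mob file
      if (liveKeys.contains(scanned.createKeyOnly(false))) {
        // still referenced: rewrite it into the new, bigger mob file
      }
    }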
+
+    /**
+     * Lists the files with the same prefix.
+     * @param p The file path.
+     * @param prefix The prefix.
+     * @return The files with the same prefix.
+     * @throws IOException
+     */
+    private FileStatus[] listStatus(Path p, String prefix) throws IOException {
+      return fs.listStatus(p, new PathPrefixFilter(prefix));
+    }
+  }
+
+  static class PathPrefixFilter implements PathFilter {
+
+    private final String prefix;
+
+    public PathPrefixFilter(String prefix) {
+      this.prefix = prefix;
+    }
+
+    public boolean accept(Path path) {
+      return path.getName().startsWith(prefix, 0);
+    }
+
+  }
+
+  /**
+   * The sweep partition id.
+   * It consists of the start key and the date.
+   * The start key is a hex string of the checksum of a region start key.
+   * The date is the latest timestamp of the cells in a mob file.
+   */
+  public static class SweepPartitionId {
+    private String date;
+    private String startKey;
+
+    public SweepPartitionId(MobFileName fileName) {
+      this.date = fileName.getDate();
+      this.startKey = fileName.getStartKey();
+    }
+
+    public SweepPartitionId(String date, String startKey) {
+      this.date = date;
+      this.startKey = startKey;
+    }
+
+    public static SweepPartitionId create(String key) {
+      return new SweepPartitionId(MobFileName.create(key));
+    }
+
+    @Override
+    public boolean equals(Object anObject) {
+      if (this == anObject) {
+        return true;
+      }
+      if (anObject instanceof SweepPartitionId) {
+        SweepPartitionId another = (SweepPartitionId) anObject;
+        if (this.date.equals(another.getDate()) && this.startKey.equals(another.getStartKey())) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String getDate() {
+      return this.date;
+    }
+
+    public String getStartKey() {
+      return this.startKey;
+    }
+
+    public void setDate(String date) {
+      this.date = date;
+    }
+
+    public void setStartKey(String startKey) {
+      this.startKey = startKey;
+    }
+  }
+
+  /**
+   * The mob file status used in the sweep reducer.
+   */
+  private static class MobFileStatus {
+    private FileStatus fileStatus;
+    private int validSize;
+    private long size;
+
+    private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
+    private long compactionMergeableSize =
+        MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
+
+    /**
+     * @param fileStatus The current FileStatus.
+     * @param compactionRatio The invalid ratio.
+     *   If too many cells are deleted in a mob file, it's regarded as invalid,
+     *   and needs to be written to a new one.
+     *   If existingCellsSize/fileSize < compactionRatio, it's regarded as an invalid one.
+     * @param compactionMergeableSize If the size of a mob file is less than this value,
+     *   it's regarded as a small file and needs to be merged.
+     */
+    public MobFileStatus(FileStatus fileStatus, float compactionRatio,
+        long compactionMergeableSize) {
+      this.fileStatus = fileStatus;
+      this.size = fileStatus.getLen();
+      validSize = 0;
+      this.compactionRatio = compactionRatio;
+      this.compactionMergeableSize = compactionMergeableSize;
+    }
+
+    /**
+     * Adds the size of a valid cell to this file.
+     * @param size The size to be added.
+     */
+    public void addValidSize(int size) {
+      this.validSize += size;
+    }
+
+    /**
+     * Whether the mob file needs to be cleaned.
+     * If too many cells are deleted in this mob file, it needs to be cleaned.
+     * @return True if it needs to be cleaned.
+     */
+    public boolean needClean() {
+      return validSize < compactionRatio * size;
+    }
+
+    /**
+     * Whether the mob file needs to be merged.
+     * If this mob file is too small, it needs to be merged.
+     * @return True if it needs to be merged.
+     */
+    public boolean needMerge() {
+      return this.size < compactionMergeableSize;
+    }
+
+    /**
+     * Gets the file status.
+ * @return The file status. + */ + public FileStatus getFileStatus() { + return fileStatus; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java new file mode 100644 index 0000000..d71dc83 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java @@ -0,0 +1,108 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.zookeeper.KeeperException; + +import com.google.protobuf.ServiceException; + +/** + * The sweep tool. It deletes the mob files that are not used and merges the small mob files to + * bigger ones. Each run of this sweep tool only handles one column family. The runs on + * the same column family are mutually exclusive. And the major compaction and sweep tool on the + * same column family are mutually exclusive too. + */ +@InterfaceAudience.Public +public class Sweeper extends Configured implements Tool { + + /** + * Sweeps the mob files on one column family. It deletes the unused mob files and merges + * the small mob files into bigger ones. + * @param tableName The current table name in string format. + * @param familyName The column family name. + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + * @throws KeeperException + * @throws ServiceException + */ + void sweepFamily(String tableName, String familyName) throws IOException, InterruptedException, + ClassNotFoundException, KeeperException, ServiceException { + Configuration conf = getConf(); + // make sure the target HBase exists. 
+ HBaseAdmin.checkHBaseAvailable(conf); + HBaseAdmin admin = new HBaseAdmin(conf); + try { + FileSystem fs = FileSystem.get(conf); + TableName tn = TableName.valueOf(tableName); + HTableDescriptor htd = admin.getTableDescriptor(tn); + HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName)); + if (family == null || !MobUtils.isMobFamily(family)) { + throw new IOException("Column family " + familyName + " is not a MOB column family"); + } + SweepJob job = new SweepJob(conf, fs); + // Run the sweeping + job.sweep(tn, family); + } finally { + try { + admin.close(); + } catch (IOException e) { + System.out.println("Fail to close the HBaseAdmin: " + e.getMessage()); + } + } + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + ToolRunner.run(conf, new Sweeper(), args); + } + + private void printUsage() { + System.err.println("Usage:\n" + "--------------------------\n" + Sweeper.class.getName() + + " tableName familyName"); + System.err.println(" tableName The table name"); + System.err.println(" familyName The column family name"); + } + + public int run(String[] args) throws Exception { + if (args.length != 2) { + printUsage(); + return 1; + } + String table = args[0]; + String family = args[1]; + sweepFamily(table, family); + return 0; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 9c6f34e..071b5fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -18,13 +18,17 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.NavigableSet; import java.util.UUID; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -32,7 +36,10 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; @@ -44,7 +51,10 @@ import org.apache.hadoop.hbase.mob.MobFile; import org.apache.hadoop.hbase.mob.MobFileName; import org.apache.hadoop.hbase.mob.MobStoreEngine; import org.apache.hadoop.hbase.mob.MobUtils; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.mob.MobZookeeper; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.zookeeper.KeeperException; /** * The store implementation to save MOBs (medium objects), it extends the HStore. 
@@ -68,6 +78,7 @@ public class HMobStore extends HStore { private MobCacheConfig mobCacheConfig; private Path homePath; private Path mobFamilyPath; + private List mobDirLocations; public HMobStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam) throws IOException { @@ -76,6 +87,11 @@ public class HMobStore extends HStore { this.homePath = MobUtils.getMobHome(conf); this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(), family.getNameAsString()); + mobDirLocations = new ArrayList(); + mobDirLocations.add(mobFamilyPath); + TableName tn = region.getTableDesc().getTableName(); + mobDirLocations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils + .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString())); } /** @@ -87,6 +103,13 @@ public class HMobStore extends HStore { } /** + * Gets current config. + */ + public Configuration getConfiguration() { + return this.conf; + } + + /** * Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, a additional seeks in * the mob files should be performed after the seek in HBase is done. */ @@ -94,6 +117,15 @@ public class HMobStore extends HStore { protected KeyValueScanner createScanner(Scan scan, final NavigableSet targetCols, long readPt, KeyValueScanner scanner) throws IOException { if (scanner == null) { + if (MobUtils.isRefOnlyScan(scan)) { + Filter refOnlyFilter = new MobReferenceOnlyFilter(); + Filter filter = scan.getFilter(); + if (filter != null) { + scan.setFilter(new FilterList(filter, refOnlyFilter)); + } else { + scan.setFilter(refOnlyFilter); + } + } scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan, targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt); } @@ -219,30 +251,10 @@ public class HMobStore extends HStore { */ public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException { Cell result = null; - if (MobUtils.isValidMobRefCellValue(reference)) { + if (MobUtils.hasValidMobRefCellValue(reference)) { String fileName = MobUtils.getMobFileName(reference); - Path targetPath = new Path(mobFamilyPath, fileName); - MobFile file = null; - try { - file = mobCacheConfig.getMobFileCache().openFile(region.getFilesystem(), targetPath, - mobCacheConfig); - result = file.readCell(reference, cacheBlocks); - } catch (IOException e) { - LOG.error("Fail to open/read the mob file " + targetPath.toString(), e); - } catch (NullPointerException e) { - // When delete the file during the scan, the hdfs getBlockRange will - // throw NullPointerException, catch it and manage it. - LOG.error("Fail to read the mob file " + targetPath.toString() - + " since it's already deleted", e); - } finally { - if (file != null) { - mobCacheConfig.getMobFileCache().closeFile(file); - } - } - } else { - LOG.warn("Invalid reference to mob, " + reference.getValueLength() + " bytes is too short"); + result = readCell(fileName, reference, cacheBlocks); } - if (result == null) { LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family," + "qualifier,timestamp,type and tags but with an empty value to return."); @@ -258,10 +270,132 @@ public class HMobStore extends HStore { } /** + * Reads the cell from a mob file. + * The mob file might be located in different directories. + * 1. The working directory. + * 2. The archive directory. + * Reads the cell from the files located in both of the above directories. + * @param fileName The file to be read. 
+   * @param search The cell to be searched.
+   * @param cacheMobBlocks Whether the scanner should cache blocks.
+   * @return The found cell. Null if there's no such cell.
+   * @throws IOException
+   */
+  private Cell readCell(String fileName, Cell search, boolean cacheMobBlocks) throws IOException {
+    FileSystem fs = getFileSystem();
+    for (Path location : mobDirLocations) {
+      MobFile file = null;
+      Path path = new Path(location, fileName);
+      try {
+        file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
+        return file.readCell(search, cacheMobBlocks);
+      } catch (IOException e) {
+        mobCacheConfig.getMobFileCache().evictFile(fileName);
+        if (e instanceof FileNotFoundException) {
+          LOG.warn("Failed to read the cell, the mob file " + path + " doesn't exist", e);
+        } else {
+          throw e;
+        }
+      } finally {
+        if (file != null) {
+          mobCacheConfig.getMobFileCache().closeFile(file);
+        }
+      }
+    }
+    LOG.error("The mob file " + fileName + " could not be found in the locations "
+        + mobDirLocations);
+    return null;
+  }
+
   /**
    * Gets the mob file path.
    * @return The mob file path.
    */
   public Path getPath() {
     return mobFamilyPath;
   }
+
+  /**
+   * The compaction in the mob store.
+   * The cells in this store contain the paths of the mob files. There might be a race
+   * condition between a major compaction and the sweeping of mob files.
+   * In order to avoid this, the running of a major compaction and the sweeping of mob
+   * files must be mutually exclusive.
+   * A minor compaction is not affected.
+   * A major compaction is converted to a minor one when a sweeping is in progress.
+   */
+  @Override
+  public List<StoreFile> compact(CompactionContext compaction) throws IOException {
+    // If it's a major compaction, find out whether a sweeper is running.
+    // If yes, convert the major compaction to a minor one.
+    if (compaction.getRequest().isMajor()) {
+      // Use ZooKeeper to coordinate.
+      // 1. Acquire an operation lock.
+      //   1.1. If the lock cannot be acquired, convert the major compaction to a minor one
+      //        and continue the compaction.
+      //   1.2. If the lock is obtained, look for the sweeping node.
+      //     1.2.1. If the node is there, a sweeping is in progress; convert the major
+      //            compaction to a minor one and continue the compaction.
+      //     1.2.2. If the node is not there, add a child to the major compaction node, and
+      //            run the compaction directly.
+      String compactionName = UUID.randomUUID().toString().replaceAll("-", "");
+      MobZookeeper zk = null;
+      try {
+        zk = MobZookeeper.newInstance(this.conf, compactionName);
+      } catch (KeeperException e) {
+        LOG.error("Cannot connect to ZooKeeper, falling back to a minor compaction", e);
+        // change the major compaction into a minor one
+        compaction.getRequest().setIsMajor(false, false);
+        return super.compact(compaction);
+      }
+      boolean major = false;
+      try {
+        // try to acquire the operation lock.
+        if (zk.lockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString())) {
+          try {
+            LOG.info("Obtained the lock for the store [" + this
+                + "], ready to perform the major compaction");
+            // check the sweeping node to find out whether a sweeping is in progress.
+            boolean hasSweeper = zk.isSweeperZNodeExist(getTableName().getNameAsString(),
+                getFamily().getNameAsString());
+            if (!hasSweeper) {
+              // if not, add a child to the major compaction node of this store.
+ major = zk.addMajorCompactionZNode(getTableName().getNameAsString(), getFamily() + .getNameAsString(), compactionName); + } + } catch (Exception e) { + LOG.error("Fail to handle the Zookeeper", e); + } finally { + // release the operation lock + zk.unlockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString()); + } + } + try { + if (major) { + return super.compact(compaction); + } else { + LOG.warn("Cannot obtain the lock or a sweep tool is running on this store[" + + this + "], ready to perform the minor compaction instead"); + // change the major compaction into a minor one + compaction.getRequest().setIsMajor(false, false); + return super.compact(compaction); + } + } finally { + if (major) { + try { + zk.deleteMajorCompactionZNode(getTableName().getNameAsString(), getFamily() + .getNameAsString(), compactionName); + } catch (KeeperException e) { + LOG.error("Fail to delete the compaction znode" + compactionName, e); + } + } + } + } finally { + zk.close(); + } + } else { + // If it's not a major compaction, continue the compaction. + return super.compact(compaction); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java new file mode 100644 index 0000000..10aea24 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java @@ -0,0 +1,42 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.mob.MobUtils; + +/** + * A filter that returns the cells which have mob reference tags. It's a server-side filter. + */ +@InterfaceAudience.Private +class MobReferenceOnlyFilter extends FilterBase { + + @Override + public ReturnCode filterKeyValue(Cell cell) { + if (null != cell) { + // If a cell with a mob reference tag, it's included. + if (MobUtils.isMobReferenceCell(cell)) { + return ReturnCode.INCLUDE; + } + } + return ReturnCode.SKIP; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java new file mode 100644 index 0000000..7cba86c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import static org.junit.Assert.assertEquals; + +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestExpiredMobFileCleaner { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner"); + private final static String family = "family"; + private final static byte[] row1 = Bytes.toBytes("row1"); + private final static byte[] row2 = Bytes.toBytes("row2"); + private final static byte[] qf = Bytes.toBytes("qf"); + + private static HTable table; + private static Admin admin; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + + } + + @Before + public void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + } + + @After + public void tearDown() throws Exception { + admin.disableTable(tableName); + admin.deleteTable(tableName); + admin.close(); + TEST_UTIL.shutdownMiniCluster(); + TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true); + } + + private void init() throws Exception { + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L)); + hcd.setMaxVersions(4); + desc.addFamily(hcd); + + admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(desc); + table = new HTable(TEST_UTIL.getConfiguration(), tableName); + table.setAutoFlush(false, false); + } + + private void 
modifyColumnExpiryDays(int expireDays) throws Exception { + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L)); + // change ttl as expire days to make some row expired + int timeToLive = expireDays * secondsOfDay(); + hcd.setTimeToLive(timeToLive); + + admin.modifyColumn(tableName, hcd); + } + + private void putKVAndFlush(HTable table, byte[] row, byte[] value, long ts) + throws Exception { + + Put put = new Put(row, ts); + put.add(Bytes.toBytes(family), qf, value); + table.put(put); + + table.flushCommits(); + admin.flush(tableName); + } + + /** + * Creates a 3 day old hfile and an 1 day old hfile then sets expiry to 2 days. + * Verifies that the 3 day old hfile is removed but the 1 day one is still present + * after the expiry based cleaner is run. + */ + @Test + public void testCleaner() throws Exception { + init(); + + Path mobDirPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family); + + byte[] dummyData = makeDummyData(600); + long ts = System.currentTimeMillis() - 3 * secondsOfDay() * 1000; // 3 days before + putKVAndFlush(table, row1, dummyData, ts); + FileStatus[] firstFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + //the first mob file + assertEquals("Before cleanup without delay 1", 1, firstFiles.length); + String firstFile = firstFiles[0].getPath().getName(); + + ts = System.currentTimeMillis() - 1 * secondsOfDay() * 1000; // 1 day before + putKVAndFlush(table, row2, dummyData, ts); + FileStatus[] secondFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + //now there are 2 mob files + assertEquals("Before cleanup without delay 2", 2, secondFiles.length); + String f1 = secondFiles[0].getPath().getName(); + String f2 = secondFiles[1].getPath().getName(); + String secondFile = f1.equals(firstFile) ? f2 : f1; + + modifyColumnExpiryDays(2); // ttl = 2, make the first row expired + + //run the cleaner + String[] args = new String[2]; + args[0] = tableName.getNameAsString(); + args[1] = family; + ToolRunner.run(TEST_UTIL.getConfiguration(), new ExpiredMobFileCleaner(), args); + + FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + String lastFile = filesAfterClean[0].getPath().getName(); + //the first mob fie is removed + assertEquals("After cleanup without delay 1", 1, filesAfterClean.length); + assertEquals("After cleanup without delay 2", secondFile, lastFile); + } + + private Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) { + Path p = new Path(MobUtils.getMobRegionPath(conf, tableName), familyName); + return p; + } + + private int secondsOfDay() { + return 24 * 3600; + } + + private byte[] makeDummyData(int size) { + byte [] dummyData = new byte[size]; + new Random().nextBytes(dummyData); + return dummyData; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java new file mode 100644 index 0000000..e0b9a83 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
new file mode 100644
index 0000000..e0b9a83
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobSweepJob {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void writeFileNames(FileSystem fs, Configuration conf, Path path,
+      String[] fileNames) throws IOException {
+    // write the names to a sequence file
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
+        String.class, String.class);
+    try {
+      for (String fileName : fileNames) {
+        writer.append(fileName, MobConstants.EMPTY_STRING);
+      }
+    } finally {
+      IOUtils.closeStream(writer);
+    }
+  }
+
+  @Test
+  public void testSweeperJobWithoutUnusedFile() throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/0/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/0/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath,
+        new String[] { "1", "2", "3", "4", "5", "6" });
+
+    Path r0 = new Path(visitedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1", "2", "3" });
+    Path r1 = new Path(visitedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "4", "5" });
+    Path r2 = new Path(visitedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "6" });
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(0, toBeArchived.size());
+  }
+
+  @Test
+  public void testSweeperJobWithUnusedFile() throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/1/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/1/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath,
+        new String[] { "1", "2", "3", "4", "5", "6" });
+
+    Path r0 = new Path(visitedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1", "2", "3" });
+    Path r1 = new Path(visitedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "5" });
+    Path r2 = new Path(visitedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3" });
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(2, toBeArchived.size());
+    assertArrayEquals(new String[] { "4", "6" }, toBeArchived.toArray(new String[0]));
+  }
+
+  @Test
+  public void testSweeperJobWithRedundantFile() throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/2/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/2/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath,
+        new String[] { "1", "2", "3", "4", "5", "6" });
+
+    Path r0 = new Path(visitedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1", "2", "3" });
+    Path r1 = new Path(visitedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "5", "6", "7" });
+    Path r2 = new Path(visitedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "4" });
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(0, toBeArchived.size());
+  }
+}
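The three tests above pin down SweepJob.getUnusedFiles() as a set difference over file names: a mob file may be archived only if no reducer reported visiting it. A hedged sketch of that contract (readNames and visitedPartFiles are illustrative stand-ins for the sequence-file bookkeeping, not the patch's internals):

    Set<String> all = readNames(allFileNamesPath);        // every known mob file name
    Set<String> visited = new TreeSet<String>();
    for (Path part : visitedPartFiles) {                  // r0, r1, r2 in the tests
      visited.addAll(readNames(part));
    }
    Set<String> unused = new TreeSet<String>(all);
    unused.removeAll(visited);                            // { "4", "6" } in the second test

Note the third test: a visited name that is absent from the "all" list ("7") does not disturb the result, the difference is still empty.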
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
new file mode 100644
index 0000000..a7e2538
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+@Category(SmallTests.class)
+public class TestMobSweepMapper {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMap() throws Exception {
+    String prefix = "0000";
+    final String fileName = "19691231f2cd014ea28f42788214560a21a44cef";
+    final String mobFilePath = prefix + fileName;
+
+    ImmutableBytesWritable r = new ImmutableBytesWritable(Bytes.toBytes("r"));
+    final KeyValue[] kvList = new KeyValue[1];
+    kvList[0] = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
+        Bytes.toBytes("column"), Bytes.toBytes(mobFilePath));
+
+    Result columns = mock(Result.class);
+    when(columns.raw()).thenReturn(kvList);
+
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepMapper:family-sweeper");
+
+    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
+    zk.addSweeperZNode("testSweepMapper", "family", Bytes.toBytes("1"));
+
+    Mapper.Context ctx = mock(Mapper.Context.class);
+    when(ctx.getConfiguration()).thenReturn(configuration);
+    SweepMapper map = new SweepMapper();
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Text text = (Text) invocation.getArguments()[0];
+        KeyValue kv = (KeyValue) invocation.getArguments()[1];
+
+        assertEquals(fileName, Bytes.toString(text.getBytes(), 0, text.getLength()));
+        assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
+
+        return null;
+      }
+    }).when(ctx).write(any(Text.class), any(KeyValue.class));
+
+    map.map(r, columns, ctx);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
new file mode 100644
index 0000000..0f4c3ff
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
+
+@Category(MediumTests.class)
+public class TestMobSweepReducer {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String tableName = "testSweepReducer";
+  private final static String row = "row";
+  private final static String family = "family";
+  private final static String qf = "qf";
+  private static HTable table;
+  private static Admin admin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(TableName.valueOf(tableName));
+    admin.deleteTable(TableName.valueOf(tableName));
+    admin.close();
+  }
+
+  private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
+      Configuration conf) throws Exception {
+    List<String> list = new ArrayList<String>();
+    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
+
+    String next = (String) reader.next((String) null);
+    while (next != null) {
+      list.add(next);
+      next = (String) reader.next((String) null);
+    }
+    reader.close();
+    return list;
+  }
+
+  @Test
+  public void testRun() throws Exception {
+    byte[] mobValueBytes = new byte[100];
+
+    // get the path where the mob files lie
+    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(),
+        TableName.valueOf(tableName), family);
+
+    Put put = new Put(Bytes.toBytes(row));
+    put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    Put put2 = new Put(Bytes.toBytes(row + "ignore"));
+    put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    table.put(put);
+    table.put(put2);
+    table.flushCommits();
+    admin.flush(TableName.valueOf(tableName));
+
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // check that one mob file has been generated
+    assertEquals(1, fileStatuses.length);
+
+    String mobFile1 = fileStatuses[0].getPath().getName();
+
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
+    configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
+    configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
+    configuration.setStrings(SweepJob.WORKING_VISITED_DIR_KEY, "jobWorkingNamesDir");
+    configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
+    configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName());
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
+    configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+        System.currentTimeMillis() + 24 * 3600 * 1000);
+
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepReducer:family-sweeper");
+
+    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
+    zk.addSweeperZNode(tableName, family, Bytes.toBytes("1"));
+
+    // use the same counter when mocking
+    Counter counter = new GenericCounter();
+    Reducer.Context ctx = mock(Reducer.Context.class);
+    when(ctx.getConfiguration()).thenReturn(configuration);
+    when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
+    when(ctx.nextKey()).thenReturn(true).thenReturn(false);
+    when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
+
+    byte[] refBytes = Bytes.toBytes(mobFile1);
+    long valueLength = refBytes.length;
+    byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
+    KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family),
+        Bytes.toBytes(qf), 1, KeyValue.Type.Put, newValue);
+    List<KeyValue> list = new ArrayList<KeyValue>();
+    list.add(kv2);
+
+    when(ctx.getValues()).thenReturn(list);
+
+    SweepReducer reducer = new SweepReducer();
+    reducer.run(ctx);
+
+    FileStatus[] fileStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // a new mob file is generated, the old one has been archived
+    assertEquals(1, fileStatuses2.length);
+    String mobFile2 = fileStatuses2[0].getPath().getName();
+    assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
+
+    // test the sequence file
+    String workingPath = configuration.get("mob.compaction.visited.dir");
+    FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
+    Set<String> files = new TreeSet<String>();
+    for (FileStatus st : statuses) {
+      files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
+          st.getPath(), configuration));
+    }
+    assertEquals(1, files.size());
+    assertEquals(true, files.contains(mobFile1));
+  }
+}
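The reducer test never runs a real MapReduce job; it replays exactly one key through a mocked Reducer.Context (nextKey() answers true then false, getCurrentKey() returns the mob file name, getValues() yields one fabricated reference cell). The value layout of that fabricated cell, distilled from testRun() above rather than from a separate API, is an 8-byte length prefix followed by the file name:

    byte[] ref = Bytes.toBytes(mobFile1);                            // the mob file name
    byte[] value = Bytes.add(Bytes.toBytes((long) ref.length), ref); // [8-byte length][name bytes]
    // SweepReducer resolves the name back to a file under the mob family path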
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
new file mode 100644
index 0000000..c43bceb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobSweeper {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private String tableName;
+  private final static String row = "row_";
+  private final static String family = "family";
+  private final static String column = "column";
+  private static HTable table;
+  private static Admin admin;
+
+  private Random random = new Random();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    long tid = System.currentTimeMillis();
+    tableName = "testSweeper" + tid;
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    table.setAutoFlush(false);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(TableName.valueOf(tableName));
+    admin.deleteTable(TableName.valueOf(tableName));
+    admin.close();
+  }
+
+  private Path getMobFamilyPath(Configuration conf, String tableNameStr,
+      String familyName) {
+    Path p = new Path(MobUtils.getMobRegionPath(conf, TableName.valueOf(tableNameStr)),
+        familyName);
+    return p;
+  }
+
+  private String mergeString(Set<String> set) {
+    StringBuilder sb = new StringBuilder();
+    for (String s : set) {
+      sb.append(s);
+    }
+    return sb.toString();
+  }
+
+  private void generateMobTable(int count, int flushStep)
+      throws IOException, InterruptedException {
+    if (count <= 0 || flushStep <= 0) {
+      return;
+    }
+    int index = 0;
+    for (int i = 0; i < count; i++) {
+      byte[] mobVal = new byte[101 * 1024];
+      random.nextBytes(mobVal);
+
+      Put put = new Put(Bytes.toBytes(row + i));
+      put.add(Bytes.toBytes(family), Bytes.toBytes(column), mobVal);
+      table.put(put);
+      if (index++ % flushStep == 0) {
+        table.flushCommits();
+        admin.flush(TableName.valueOf(tableName));
+      }
+    }
+    table.flushCommits();
+    admin.flush(TableName.valueOf(tableName));
+  }
+
+  @Test
+  public void testSweeper() throws Exception {
+    int count = 10;
+    // create a table and generate 10 mob files
+    generateMobTable(count, 1);
+
+    // get the mob files
+    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // mobFilesSet stores the original mob files
+    TreeSet<String> mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    // scan the table, retrieve the references
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScanned = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(column));
+      mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    // there should be 10 mob files
+    assertEquals(10, mobFilesScanned.size());
+    // check that the correct references to the mob files are stored
+    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 24 * 60 * 60 * 1000);
+
+    String[] args = new String[2];
+    args[0] = tableName;
+    args[1] = family;
+    ToolRunner.run(conf, new Sweeper(), args);
+
+    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(10, mobFilesSet.size());
+
+    scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(column));
+      mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    assertEquals(10, mobFilesScannedAfterJob.size());
+
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(10, mobFilesSet.size());
+    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
+        .equalsIgnoreCase(mobFilesSet.iterator().next()));
+  }
+
+  @Test
+  public void testCompactionDelaySweeper() throws Exception {
+    int count = 10;
+    // create a table and generate 10 mob files
+    generateMobTable(count, 1);
+
+    // get the mob files
+    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // mobFilesSet stores the original mob files
+    TreeSet<String> mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    // scan the table, retrieve the references
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScanned = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(column));
+      mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    // there should be 10 mob files
+    assertEquals(10, mobFilesScanned.size());
+    // check that the correct references to the mob files are stored
+    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 0);
+
+    String[] args = new String[2];
+    args[0] = tableName;
+    args[1] = family;
+    ToolRunner.run(conf, new Sweeper(), args);
+
+    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(1, mobFilesSet.size());
+
+    scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(column));
+      mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    assertEquals(1, mobFilesScannedAfterJob.size());
+
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(1, mobFilesSet.size());
+    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
+        .equalsIgnoreCase(mobFilesSet.iterator().next()));
+  }
+}
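Both sweeper tests recover mob file names from reference cells via a raw, ref-only scan. The parsing they rely on assumes the reference value is a 4-byte int holding the original value length, followed by the mob file name; distilled from the scan loops above:

    byte[] v = res.getValue(Bytes.toBytes(family), Bytes.toBytes(column));
    int mobValueLength = Bytes.toInt(v, 0);            // length of the original mob value
    String mobFileName = Bytes.toString(v, Bytes.SIZEOF_INT, v.length - Bytes.SIZEOF_INT);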
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
index f8d6ce4..51f4de3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
@@ -53,10 +53,13 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -70,7 +73,7 @@ public class TestMobCompaction {
   @Rule
   public TestName name = new TestName();
   static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
-  private HBaseTestingUtility UTIL = null;
+  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
   private Configuration conf = null;
   private HRegion region = null;
 
@@ -84,14 +87,22 @@
   private final byte[] STARTROW = Bytes.toBytes(START_KEY);
   private int compactionThreshold;
 
-  private void init(long mobThreshold) throws Exception {
-    this.mobCellThreshold = mobThreshold;
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    UTIL.startMiniCluster(1);
+  }
 
-    UTIL = HBaseTestingUtility.createLocalHTU();
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
 
+  private void init(long mobThreshold) throws Exception {
+    this.mobCellThreshold = mobThreshold;
     conf = UTIL.getConfiguration();
     compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
-    htd = UTIL.createTableDescriptor(name.getMethodName());
     hcd = new HColumnDescriptor(COLUMN_FAMILY);
     hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
 
@@ -158,7 +169,8 @@
     region.getTableDesc().getFamily(COLUMN_FAMILY).setValue(
         MobConstants.MOB_THRESHOLD, Bytes.toBytes(500L));
     region.initialize();
-    region.compactStores(true);
+    region.compactStores();
+
     assertEquals("After compaction: store files", 1, countStoreFiles());
     assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
     assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
@@ -307,7 +319,7 @@
       if (!MobUtils.isMobReferenceCell(kv)) {
        continue;
       }
-      if (!MobUtils.isValidMobRefCellValue(kv)) {
+      if (!MobUtils.hasValidMobRefCellValue(kv)) {
        continue;
       }
       int size = MobUtils.getMobValueLength(kv);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
index 69b9c8f..87147d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
@@ -22,12 +22,14 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.TableName;
@@ -40,6 +42,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -120,6 +124,7 @@ public class TestMobStoreScanner {
     testGetFromMemStore(false);
     testGetReferences(false);
     testMobThreshold(false);
+    testGetFromArchive(false);
   }
 
   @Test
@@ -128,6 +133,7 @@ public class TestMobStoreScanner {
     testGetFromMemStore(true);
     testGetReferences(true);
     testMobThreshold(true);
+    testGetFromArchive(true);
   }
 
   public void testGetFromFiles(boolean reversed) throws Exception {
@@ -282,6 +288,72 @@ public class TestMobStoreScanner {
     results.close();
   }
 
+  public void testGetFromArchive(boolean reversed) throws Exception {
+    String TN = "testGetFromArchive" + reversed;
+    setUp(defaultThreshold, TN);
+    long ts1 = System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+    byte[] value = generateMobValue((int) defaultThreshold + 1);
+    // Put some data
+    Put put1 = new Put(row1);
+    put1.add(family, qf1, ts3, value);
+    put1.add(family, qf2, ts2, value);
+    put1.add(family, qf3, ts1, value);
+    table.put(put1);
+
+    table.flushCommits();
+    admin.flush(TN);
+
+    // Get the files in the mob path
+    Path mobFamilyPath;
+    mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+        TableName.valueOf(TN)), hcd.getNameAsString());
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    FileStatus[] files = fs.listStatus(mobFamilyPath);
+
+    // Get the archive path
+    Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
+    Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf(TN));
+    HRegionInfo regionInfo = MobUtils.getMobRegionInfo(TableName.valueOf(TN));
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(),
+        regionInfo, tableDir, family);
+
+    // Move the files from the mob path to the archive path
+    fs.mkdirs(storeArchiveDir);
+    int fileCount = 0;
+    for (FileStatus file : files) {
+      fileCount++;
+      Path filePath = file.getPath();
+      Path src = new Path(mobFamilyPath, filePath.getName());
+      Path dst = new Path(storeArchiveDir, filePath.getName());
+      fs.rename(src, dst);
+    }
+
+    // Verify the move succeeded
+    FileStatus[] files1 = fs.listStatus(mobFamilyPath);
+    Assert.assertEquals(0, files1.length);
+    FileStatus[] files2 = fs.listStatus(storeArchiveDir);
+    Assert.assertEquals(fileCount, files2.length);
+
+    // Scan from the archive
+    Scan scan = new Scan();
+    setScan(scan, reversed, false);
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    for (Result res : results) {
+      List<Cell> cells = res.listCells();
+      for (Cell cell : cells) {
+        // Verify the value
+        Assert.assertEquals(Bytes.toString(value),
+            Bytes.toString(CellUtil.cloneValue(cell)));
+        count++;
+      }
+    }
+    results.close();
+    Assert.assertEquals(3, count);
+  }
+
   /**
    * Assert the value is not store in mob.
    */