diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
new file mode 100644
index 0000000..6c5c25c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -0,0 +1,75 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class MobConstants {
+
+  public static final String IS_MOB = "is_mob";
+
+  public static final byte MOB_REFERENCE_TAG_TYPE = (byte) 211;
+  public static final byte[] MOB_REFERENCE_TAG_VALUE = Bytes.toBytes("ref");
+
+  public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw";
+  public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks";
+
+  public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size";
+  public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000;
+  public static final String MOB_CELL_SIZE_THRESHOLD = "hbase.mob.cell.size.threshold";
+  public static final int DEFAULT_MOB_CELL_SIZE_THRESHOLD = 0;
+
+  public static final String MOB_ROOTDIR = "hbase.mob.rootdir";
+  public static final String DEFAULT_MOB_ROOTDIR_NAME = "mob";
+  public static final String MOB_REGION_NAME = ".mob";
+
+  public static final String MOB_COMPACTION_START_DATE = "hbase.mob.compaction.start.date";
+
+  public static final String MOB_DELETE_EXPIRED_MOBFILES = "hbase.mob.delete.expired.mobfiles";
+  public static final String MOB_DELETE_EXPIRED_MOBFILES_INTERVAL =
+      "hbase.mob.delete.expired.mobfiles.interval";
+
+  public static final String MOB_COMPACTION_ZOOKEEPER = "hbase.mob.compaction.zookeeper";
+
+  public static final String MOB_COMPACTION_INVALID_FILE_RATIO =
+      "hbase.mob.compaction.invalid.file.ratio";
+  public static final String MOB_COMPACTION_SMALL_FILE_THRESHOLD =
+      "hbase.mob.compaction.small.file.threshold";
+
+  public static final float DEFAULT_MOB_COMPACTION_INVALID_FILE_RATIO = 0.3f;
+  public static final long DEFAULT_MOB_COMPACTION_SMALL_FILE_THRESHOLD = 64 * 1024 * 1024; // 64M
+
+  public static final String MOB_COMPACTION_TEMP_DIR = "hbase.mob.compaction.temp.dir";
+  public static final String DEFAULT_MOB_COMPACTION_TEMP_DIR = "/tmp/hadoop/mobcompaction";
+
+  public static final String MOB_COMPACTION_MEMSTORE_FLUSH_SIZE =
+      "hbase.mob.compaction.memstore.flush.size";
+  public static final long DEFAULT_MOB_COMPACTION_MEMSTORE_FLUSH_SIZE = 1024 * 1024 * 128L;
+  public static final String MOB_COMPACTION_MEMSTORE_BLOCK_MULTIPLIER =
+      "hbase.mob.compaction.memstore.block.multiplier";
+
+  public static final String MOB_COMPACTION_JOB_WORKING_DIR =
+      "hbase.mob.compaction.job.working.dir";
+  public static final String
+      MOB_COMPACTION_OUTPUT_TABLE_FACTORY =
+      "hbase.mob.compaction.output.table.factory";
+
+  private MobConstants() {
+
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
new file mode 100644
index 0000000..74b8d6a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
@@ -0,0 +1,109 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+
+public class MobFile {
+
+  private static final Log LOG = LogFactory.getLog(MobFile.class);
+
+  private StoreFile sf;
+
+  protected MobFile(StoreFile sf) {
+    this.sf = sf;
+  }
+
+  /**
+   * Gets a scanner over the underlying store file. Internal use only, used by the sweeper.
+   * @return the first scanner over the store file, or null if none could be created
+   * @throws IOException
+   */
+  public StoreFileScanner getScanner() throws IOException {
+    List<StoreFile> sfs = new ArrayList<StoreFile>();
+    sfs.add(sf);
+
+    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, false,
+        true, false, null, sf.getMaxMemstoreTS());
+
+    if (!sfScanners.isEmpty()) {
+      return sfScanners.get(0);
+    }
+    return null;
+  }
+
+  public Cell readKeyValue(KeyValue search, boolean cacheMobBlocks) {
+    Cell result = null;
+    StoreFileScanner scanner = null;
+    String errorMsg = null;
+    List<StoreFile> sfs = new ArrayList<StoreFile>();
+    sfs.add(sf);
+    try {
+      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs,
+          cacheMobBlocks, true, false, null, sf.getMaxMemstoreTS());
+
+      if (!sfScanners.isEmpty()) {
+        scanner = sfScanners.get(0);
+        if (scanner.seek(search)) {
+          result = scanner.peek();
+        }
+      }
+    } catch (IOException ioe) {
+      errorMsg = "Failed to read KeyValue!";
+      if (ioe.getCause() instanceof FileNotFoundException) {
+        errorMsg += " The mob file does not exist!";
+      }
+      LOG.error(errorMsg, ioe);
+      result = null;
+    } catch (NullPointerException npe) {
+      // If the file is deleted during the scan, the HDFS getBlockRange will
+      // throw a NullPointerException; catch it and handle it here.
+      errorMsg = "Failed to read KeyValue!";
+      LOG.error(errorMsg, npe);
+      result = null;
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+    return result;
+  }
+
+  public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf)
+      throws IOException {
+    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+    return new MobFile(sf);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
new file mode 100644
index 0000000..f89c31d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
@@ -0,0 +1,150 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.math.BigInteger;
+import java.security.InvalidParameterException;
+import java.util.Date;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class MobFileName {
+
+  private String date;
+  private int startKey;
+  private String uuid;
+  private long count;
+
+  public static MobFileName create(String startKey, long count, Date date, String uuid) {
+    String dateString = MobUtils.formatDate(date);
+    return new MobFileName(dateString, startKey, count, uuid);
+  }
+
+  public static MobFileName create(String startKey, long count, String date, String uuid) {
+    return new MobFileName(date, startKey, count, uuid);
+  }
+
+  public static MobFileName create(String fileName) {
+    // A mob file name is the 8-character date, the 8-character hex start key,
+    // the 16-character hex record count, and the uuid, in that order.
+    String date = fileName.substring(0, 8);
+    int startKey = hexString2Int(fileName.substring(8, 16));
+    long count = new BigInteger(fileName.substring(16, 32), 16).longValue();
+    String uuid = fileName.substring(32);
+    return new MobFileName(date, startKey, count, uuid);
+  }
+
+  public static String int2HexString(int i) {
+    int shift = 4;
+    char[] buf = new char[8];
+
+    int charPos = 8;
+    int radix = 1 << shift;
+    int mask = radix - 1;
+    do {
+      buf[--charPos] = digits[i & mask];
+      i >>>= shift;
+    } while (charPos > 0);
+
+    return new String(buf);
+  }
+
+  public static int hexString2Int(String hex) {
+    byte[] buffer = Bytes.toBytes(hex);
+    if (buffer.length != 8) {
+      throw new InvalidParameterException("hexString2Int length not valid");
+    }
+
+    for (int i = 0; i < buffer.length; i++) {
+      byte ch = buffer[i];
+      if (ch >= 'a' && ch <= 'z') {
+        buffer[i] = (byte) (ch - 'a' + 10);
+      } else {
+        buffer[i] = (byte) (ch - '0');
+      }
+    }
+
+    buffer[0] = (byte) ((buffer[0] << 4) ^ buffer[1]);
+    buffer[1] = (byte) ((buffer[2] << 4) ^ buffer[3]);
+    buffer[2] = (byte) ((buffer[4] << 4) ^ buffer[5]);
+    buffer[3] = (byte) ((buffer[6] << 4) ^ buffer[7]);
+    return Bytes.toInt(buffer, 0, 4);
+  }
+
+  final static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c',
+      'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u',
+      'v', 'w', 'x', 'y', 'z' };
+
+  public MobFileName(String date, String startKey, long count, String uuid) {
+    this(date, hexString2Int(startKey), count, uuid);
+  }
+
+  public MobFileName(String date, int startKey, long count, String uuid) {
+    this.startKey = startKey;
+    this.count = count;
+    this.uuid = uuid;
+    this.date = date;
+  }
+
+  public String getStartKey() {
+    return int2HexString(startKey);
+  }
+
+  public String getDate() {
+    return this.date;
+  }
+
+  @Override
+  public int hashCode() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(date);
+    builder.append(startKey);
+    builder.append(uuid);
+    builder.append(count);
+    return builder.toString().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object anObject) {
+    if (this == anObject) {
+      return true;
+    }
+    if (anObject instanceof MobFileName) {
+      MobFileName another = (MobFileName) anObject;
+      if (this.date.equals(another.date) && this.startKey == another.startKey
+          && this.uuid.equals(another.uuid) && this.count == another.count) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public long getRecordCount() {
+    return this.count;
+  }
+
+  public Path getAbsolutePath(Path rootPath) {
+    return new Path(rootPath, getFileName());
+  }
+
+  public String getFileName() {
+    // Zero-pad the count to 16 hex characters so that create(String) can parse it back out.
+    return this.date + int2HexString(this.startKey) + String.format("%016x", this.count)
+        + this.uuid;
+  }
+}
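Reviewer note: the naming scheme above should round-trip through create(String); a minimal sketch under that assumption (the start key and uuid literals here are arbitrary placeholders, not values the patch generates):

    MobFileName name = MobFileName.create("0000000a", 42L, "20140512",
        "0123456789abcdef0123456789abcdef");
    String encoded = name.getFileName(); // "20140512" + "0000000a" + "000000000000002a" + uuid
    MobFileName parsed = MobFileName.create(encoded);
    assert parsed.equals(name) && parsed.getRecordCount() == 42L;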
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
new file mode 100644
index 0000000..e33d057
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.MobFileStore;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.Strings;
+
+public class MobUtils {
+
+  private static final Log LOG = LogFactory.getLog(MobUtils.class);
+  public final static String TMP = ".tmp";
+
+  private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
+      new ThreadLocal<SimpleDateFormat>() {
+    @Override
+    protected SimpleDateFormat initialValue() {
+      return new SimpleDateFormat("yyyyMMdd");
+    }
+  };
+
+  public static boolean isMobFamily(HColumnDescriptor hcd) {
+    String isMob = hcd.getValue(MobConstants.IS_MOB);
+    if (isMob != null) {
+      return Boolean.parseBoolean(isMob);
+    }
+    return false;
+  }
+
+  public static HColumnDescriptor getColumnDescriptor(FileSystem fs, Path homeDir,
+      TableName tableName, String familyName) throws IOException {
+    TableDescriptors htds = new FSTableDescriptors(fs, homeDir.getParent());
+    HTableDescriptor htd = htds.get(tableName);
+    return htd == null ? null : htd.getFamily(Bytes.toBytes(familyName));
+  }
+
+  public static String formatDate(Date date) {
+    return LOCAL_FORMAT.get().format(date);
+  }
+
+  public static Date parseDate(String dateString) throws ParseException {
+    return LOCAL_FORMAT.get().parse(dateString);
+  }
+
+  public static boolean isMobReferenceKeyValue(KeyValue kv) {
+    boolean isMob = false;
+    List<Tag> tags = kv.getTags();
+    if (!tags.isEmpty()) {
+      for (Tag tag : tags) {
+        if (tag.getType() == MobConstants.MOB_REFERENCE_TAG_TYPE) {
+          isMob = true;
+          break;
+        }
+      }
+    }
+    return isMob;
+  }
+
+  public static boolean isRawMobScan(Scan scan) {
+    byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
+    try {
+      return raw == null ? false : Bytes.toBoolean(raw);
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
+  public static boolean isCacheMobBlocks(Scan scan) {
+    byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
+    try {
+      return cache == null ? false : Bytes.toBoolean(cache);
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
+  public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
+    scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
+  }
+
+  public static void cleanExpiredData(FileSystem fs, MobFileStore store) throws IOException {
+    cleanExpiredDate(fs, store, new Date());
+  }
+
+  public static void cleanExpiredDate(FileSystem fs, MobFileStore store, Date current)
+      throws IOException {
+    HColumnDescriptor columnDescriptor = store.getColumnDescriptor();
+
+    if (current == null) {
+      current = new Date();
+    }
+    // The default value of columnDescriptor.getTimeToLive() is Integer.MAX_VALUE.
+    // It must be read as a long before the multiplication below, in case of overflow.
+    long timeToLive = columnDescriptor.getTimeToLive();
+
+    if (Integer.MAX_VALUE == timeToLive) {
+      // No need to clean, because the TTL is not set.
+      return;
+    }
+
+    Date expireDate = new Date(current.getTime() - timeToLive * 1000);
+    // Truncate the expiration date to a day boundary.
+    expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
+    LOG.info("[MOB] Files older than " + expireDate.toGMTString() + " will be deleted!");
+
+    FileStatus[] stats = fs.listStatus(store.getPath());
+    if (null == stats) {
+      // no file found
+      return;
+    }
+    for (FileStatus file : stats) {
+      if (!file.isDirectory()) {
+        continue;
+      }
+      String fileName = file.getPath().getName();
+      if (TMP.equals(fileName)) {
+        continue;
+      }
+      try {
+        Date fileDate = parseDate(fileName.substring(0, 8));
+        LOG.info("[MOB] Checking folder " + fileName);
+        if (fileDate.getTime() < expireDate.getTime()) {
+          LOG.info("[MOB] Delete expired folder " + fileName);
+          fs.delete(file.getPath(), true);
+        }
+      } catch (ParseException e) {
+        LOG.error("Cannot parse the date from the folder name " + fileName, e);
+        continue;
+      }
+    }
+  }
+
+  public static String getStoreZNodeName(String tableName, String columnName) {
+    return tableName + ":" + columnName;
+  }
+
+  public static Path getMobHome(Configuration conf) {
+    String mobRootdir = conf.get(MobConstants.MOB_ROOTDIR);
+    Path mobHome;
+    if (Strings.isEmpty(mobRootdir)) {
+      Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+      mobHome = new Path(hbaseDir, MobConstants.DEFAULT_MOB_ROOTDIR_NAME);
+    } else {
+      mobHome = new Path(mobRootdir);
+    }
+    return mobHome;
+  }
+
+  public static Path getCompactionOutputPath(String root, String jobName) {
+    Path parent = new Path(root, jobName);
+    return new Path(parent, "output");
+  }
+
+  public static Path getCompactionWorkingPath(String root, String jobName) {
+    Path parent = new Path(root, jobName);
+    return new Path(parent, "working");
+  }
+}
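Reviewer note: a short sketch of how a client would combine the scan attributes defined above; MOB_SCAN_RAW makes the server return the reference cells instead of resolving them into mob data (a sketch, not part of the patch):

    Scan scan = new Scan();
    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
    MobUtils.setCacheMobBlocks(scan, false); // keep mob blocks out of the block cache
    // On the server side, MobUtils.isRawMobScan(scan) now returns true.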
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
new file mode 100644
index 0000000..b172fac
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
@@ -0,0 +1,154 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+public class MobZookeeper {
+
+  private static final Log LOG = LogFactory.getLog(MobZookeeper.class);
+
+  private ZooKeeperWatcher zkw;
+  private String mobZnode;
+  private static final String LOCK_EPHEMERAL = "-lock";
+  private static final String SWEEPER_EPHEMERAL = "-sweeper";
+  private static final String MAJOR_COMPACTION_EPHEMERAL = "-mc-ephemeral";
+
+  public MobZookeeper(Configuration conf) throws IOException, KeeperException {
+    zkw = new ZooKeeperWatcher(conf, "MobZookeeper", new DummyMobAbortable());
+    mobZnode = ZKUtil.joinZNode(zkw.baseZNode, "MOB");
+    if (ZKUtil.checkExists(zkw, mobZnode) == -1) {
+      ZKUtil.createWithParents(zkw, mobZnode);
+    }
+  }
+
+  public static MobZookeeper newInstance(Configuration conf) throws IOException, KeeperException {
+    return new MobZookeeper(conf);
+  }
+
+  public boolean lockStore(String tableName, String storeName) {
+    String znodeName = MobUtils.getStoreZNodeName(tableName, storeName);
+    boolean locked = false;
+    try {
+      locked = ZKUtil.createEphemeralNodeAndWatch(zkw,
+          ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL), null);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(locked ? "Locked the store " + znodeName
+            : "Cannot lock the store " + znodeName);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Failed to lock the store " + znodeName, e);
+    }
+    return locked;
+  }
+
+  public void unlockStore(String tableName, String storeName) {
+    String znodeName = MobUtils.getStoreZNodeName(tableName, storeName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unlocking the store " + znodeName);
+    }
+    try {
+      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL));
+    } catch (KeeperException e) {
+      LOG.warn("Failed to unlock the store " + znodeName, e);
+    }
+  }
+
+  public boolean addSweeperZNode(String tableName, String storeName) {
+    boolean add = false;
+    String znodeName = MobUtils.getStoreZNodeName(tableName, storeName);
+    try {
+      add = ZKUtil.createEphemeralNodeAndWatch(zkw,
+          ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL), null);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(add ? "Added a znode for sweeper " + znodeName
+            : "Cannot add a znode for sweeper " + znodeName);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Failed to add a znode for sweeper " + znodeName, e);
+    }
+    return add;
+  }
+
+  public void deleteSweeperZNode(String tableName, String storeName) {
+    String znodeName = MobUtils.getStoreZNodeName(tableName, storeName);
+    try {
+      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL));
+    } catch (KeeperException e) {
+      LOG.error("Failed to delete a znode for sweeper " + znodeName, e);
+    }
+  }
+
+  public boolean isSweeperZNodeExist(String tableName, String storeName) throws KeeperException {
+    String znodeName = MobUtils.getStoreZNodeName(tableName, storeName);
+    return ZKUtil.checkExists(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)) >= 0;
+  }
+
+  public boolean hasMajorCompactionChildren(String tableName, String storeName)
+      throws KeeperException {
+    String znodeName = MobUtils.getStoreZNodeName(tableName, storeName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    List<String> children = ZKUtil.listChildrenNoWatch(zkw, mcPath);
+    return children != null && !children.isEmpty();
+  }
+
+  public boolean addMajorCompactionZNode(String tableName, String storeName,
+      String compactionName) throws KeeperException {
+    String znodeName = MobUtils.getStoreZNodeName(tableName, storeName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    ZKUtil.createEphemeralNodeAndWatch(zkw, mcPath, null);
+    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
+    return ZKUtil.createEphemeralNodeAndWatch(zkw, eachMcPath, null);
+  }
+
+  public void deleteMajorCompactionZNode(String tableName, String storeName,
+      String compactionName) throws KeeperException {
+    String znodeName = MobUtils.getStoreZNodeName(tableName, storeName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
+    ZKUtil.deleteNode(zkw, eachMcPath);
+  }
+
+  public void close() {
+    this.zkw.close();
+  }
+
+  private static class DummyMobAbortable implements Abortable {
+
+    private boolean abort = false;
+
+    public void abort(String why, Throwable e) {
+      abort = true;
+    }
+
+    public boolean isAborted() {
+      return abort;
+    }
+
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/ExpiredMobFileCleaner.java
new file mode 100644
index 0000000..d7ae440
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/ExpiredMobFileCleaner.java
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.regionserver.MobFileStore;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.protobuf.ServiceException;
+
+public class ExpiredMobFileCleaner extends Configured implements Tool {
+
+  public void cleanExpiredMobFiles(String tableName, String familyName) throws ServiceException,
+      IOException {
+    Configuration conf = getConf();
+    HBaseAdmin.checkHBaseAvailable(conf);
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      if (!admin.tableExists(tableName)) {
+        throw new IOException("Table " + tableName + " does not exist");
+      }
+      HTableDescriptor htd = admin.getTableDescriptor(Bytes.toBytes(tableName));
+      HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
+      if (!MobUtils.isMobFamily(family)) {
+        throw new IOException("It's not a MOB column family");
+      }
+      if (family.getMinVersions() > 0) {
+        throw new IOException(
+            "The minVersions of the column family is not 0, it cannot be handled by this cleaner");
+      }
+      System.out.println("Cleaning the expired MOB files...");
+      MobFileStore mobFileStore = MobFileStore.create(conf, fs, MobUtils.getMobHome(conf),
+          TableName.valueOf(tableName), family);
+      MobUtils.cleanExpiredData(fs, mobFileStore);
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        System.out.println("Failed to close the HBaseAdmin: " + e.getMessage());
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.out.println("Usage:\n" + "--------------------------\n"
+        + ExpiredMobFileCleaner.class.getName() + " [tableName] [familyName]");
+
+    Configuration conf = HBaseConfiguration.create();
+    ToolRunner.run(conf, new ExpiredMobFileCleaner(), args);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length >= 2) {
+      String tableName = args[0];
+      String familyName = args[1];
+      cleanExpiredMobFiles(tableName, familyName);
+    }
+    return 0;
+  }
+
+}
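Reviewer note: as a Tool, the cleaner above would typically be driven through ToolRunner; a minimal sketch (the table and family names are placeholders):

    Configuration conf = HBaseConfiguration.create();
    int exitCode = ToolRunner.run(conf, new ExpiredMobFileCleaner(),
        new String[] { "someTable", "someMobFamily" });
    System.exit(exitCode);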
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobFilePathHashPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobFilePathHashPartitioner.java
new file mode 100644
index 0000000..b5fd35a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobFilePathHashPartitioner.java
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+public class MobFilePathHashPartitioner extends Partitioner<Text, KeyValue> {
+
+  @Override
+  public int getPartition(Text filePath, KeyValue kv, int numPartitions) {
+    MobFileName mobFileName = MobFileName.create(filePath.toString());
+    String date = mobFileName.getDate();
+    int hash = date.hashCode();
+    // Mask off the sign bit so the modulo cannot be negative.
+    return (hash & Integer.MAX_VALUE) % numPartitions;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/ReferenceOnlyFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/ReferenceOnlyFilter.java
new file mode 100644
index 0000000..efe155a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/ReferenceOnlyFilter.java
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+public class ReferenceOnlyFilter extends FilterBase {
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) {
+    if (null != cell) {
+      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+      if (MobUtils.isMobReferenceKeyValue(kv)) {
+        return ReturnCode.INCLUDE;
+      }
+    }
+    return ReturnCode.SKIP;
+  }
+}
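Reviewer note: in the partitioner above, `hash & Integer.MAX_VALUE` clears the sign bit so the modulo can never be negative, even for dates whose hashCode is negative; a minimal illustration (the date literal is arbitrary):

    int hash = "20140512".hashCode();
    int partition = (hash & Integer.MAX_VALUE) % 10;
    assert partition >= 0 && partition < 10;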
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepCounter.java
new file mode 100644
index 0000000..776ccc1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepCounter.java
@@ -0,0 +1,28 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+public enum SweepCounter {
+  INPUT_FILE_COUNT,
+  FILE_TO_BE_MERGE_OR_CLEAN,
+  FILE_NO_REFERENCE,
+  FILE_AFTER_MERGE_OR_CLEAN,
+  RECORDS_UPDATED,
+  DISK_SPACE_RECLAIMED
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepJob.java
new file mode 100644
index 0000000..9ceeb70
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepJob.java
@@ -0,0 +1,167 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.regionserver.MobFileStore;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.zookeeper.KeeperException;
+
+public class SweepJob {
+
+  private FileSystem fs;
+
+  public SweepJob(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  private static final Log LOG = LogFactory.getLog(SweepJob.class);
+
+  public void sweep(MobFileStore store) throws IOException, ClassNotFoundException,
+      InterruptedException, KeeperException {
+    Configuration newConf = new Configuration(store.getConfiguration());
+    ZKUtil.applyClusterKeyToConf(newConf,
+        store.getConfiguration().get(MobConstants.MOB_COMPACTION_ZOOKEEPER));
+    MobZookeeper zk = MobZookeeper.newInstance(newConf);
+    try {
+      if (!zk.lockStore(store.getTableName(), store.getFamilyName())) {
+        LOG.warn("Cannot lock the store " + store.getFamilyName()
+            + ". An HBase major compaction may be in progress. Please re-run the job.");
+        return;
+      }
+      try {
+        boolean hasChildren = zk.hasMajorCompactionChildren(store.getTableName(),
+            store.getFamilyName());
+        if (hasChildren) {
+          LOG.warn("An HBase major compaction may be in progress. Please re-run the job.");
+          return;
+        } else {
+          boolean hasSweeper = zk.isSweeperZNodeExist(store.getTableName(),
+              store.getFamilyName());
+          if (hasSweeper) {
+            LOG.warn("Another sweep job is running");
+            return;
+          } else {
+            // add the sweeper znode
+            zk.addSweeperZNode(store.getTableName(), store.getFamilyName());
+          }
+        }
+      } finally {
+        zk.unlockStore(store.getTableName(), store.getFamilyName());
+      }
+      try {
+        Scan scan = new Scan();
+        // Do not retrieve the mob data when scanning
+        scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+        scan.setFilter(new ReferenceOnlyFilter());
+        scan.setCaching(10000);
+        scan.setMaxVersions(store.getColumnDescriptor().getMaxVersions());
+
+        Job job = prepareTableJob(store, scan, SweepMapper.class, Text.class, KeyValue.class,
+            SweepReducer.class, Text.class, Writable.class, NullOutputFormat.class, newConf);
+        job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, store.getFamilyName());
+        // Record the time at which the compaction began.
+        job.getConfiguration().set(MobConstants.MOB_COMPACTION_START_DATE,
+            String.valueOf(new Date().getTime()));
+
+        job.setPartitionerClass(MobFilePathHashPartitioner.class);
+        job.waitForCompletion(true);
+      } finally {
+        zk.deleteSweeperZNode(store.getTableName(), store.getFamilyName());
+      }
+    } finally {
+      zk.close();
+    }
+  }
+
+  protected Job prepareTableJob(MobFileStore store, Scan scan,
+      Class<? extends TableMapper> mapper, Class<?> mapOutputKey, Class<?> mapOutputValue,
+      Class<? extends Reducer> reducer, Class<?> reduceOutputKey, Class<?> reduceOutputValue,
+      Class<? extends OutputFormat> outputFormat, Configuration conf) throws IOException {
+
+    Job job = Job.getInstance(conf);
+
+    job.setJarByClass(mapper);
+    // The third and fourth class arguments are the mapper output key/value types.
+    TableMapReduceUtil.initTableMapperJob(store.getTableName(), scan, mapper, mapOutputKey,
+        mapOutputValue, job);
+
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setMapOutputKeyClass(mapOutputKey);
+    job.setMapOutputValueClass(mapOutputValue);
+    job.setReducerClass(reducer);
+    job.setOutputFormatClass(outputFormat);
+    String jobName = getCustomJobName(this.getClass().getSimpleName(), mapper, reducer,
+        store.getTableName(), store.getFamilyName());
+    // Set the output directory of the FileOutputFormat.
+    Path outputPath = MobUtils.getCompactionOutputPath(conf.get(
+        MobConstants.MOB_COMPACTION_TEMP_DIR, MobConstants.DEFAULT_MOB_COMPACTION_TEMP_DIR),
+        jobName);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    job.setJobName(jobName);
+    // Delete the output path in case it was left behind by a previous execution.
+    fs.delete(outputPath, true);
+    // Delete the temp directory of the mob files in case a previous execution failed.
+    Path workingPath = MobUtils.getCompactionWorkingPath(conf.get(
+        MobConstants.MOB_COMPACTION_TEMP_DIR, MobConstants.DEFAULT_MOB_COMPACTION_TEMP_DIR),
+        jobName);
+    job.getConfiguration().set(MobConstants.MOB_COMPACTION_JOB_WORKING_DIR,
+        workingPath.toString());
+    fs.delete(workingPath, true);
+    return job;
+  }
+
+  private static String getCustomJobName(String className, Class<?> mapper, Class<?> reducer,
+      String tableName, String familyName) {
+    StringBuilder name = new StringBuilder();
+    name.append(className);
+    name.append('-').append(mapper.getSimpleName());
+    name.append('-').append(reducer.getSimpleName());
+    name.append('-').append(tableName);
+    name.append('-').append(familyName);
+    return name.toString();
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepMapper.java
new file mode 100644
index 0000000..c3f26e0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepMapper.java
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+
+public class SweepMapper extends TableMapper<Text, KeyValue> {
+
+  @Override
+  public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
+      InterruptedException {
+    if (columns != null) {
+      KeyValue[] kvList = columns.raw();
+      if (kvList != null && kvList.length > 0) {
+        for (KeyValue kv : kvList) {
+          // The value of a reference cell is the name of the mob file that holds the data.
+          String fileName = Bytes.toString(kv.getValue());
+          KeyValue keyOnly = kv.createKeyOnly(false);
+          context.write(new Text(fileName), keyOnly);
+        }
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepReducer.java
new file mode 100644
index 0000000..c4d39da
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/SweepReducer.java
@@ -0,0 +1,416 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableFactory;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTableInterfaceFactory;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
+import org.apache.hadoop.hbase.regionserver.MemStoreWrapper;
+import org.apache.hadoop.hbase.regionserver.MobFileStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
+
+  private static final Log LOG = LogFactory.getLog(SweepReducer.class);
+
+  private MemStoreWrapper memstore;
+  private Configuration conf;
+  private FileSystem fs;
+
+  private String tableName;
+  private String familyName;
+  private Path familyDir;
+  private MobFileStore mobFileStore;
+  private CacheConfig cacheConf;
+  private HTableInterface table;
+  private String compactionBeginString;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    this.conf = context.getConfiguration();
+    this.fs = FileSystem.get(conf);
+    this.tableName = conf.get(TableInputFormat.INPUT_TABLE);
+    this.familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
+    Path mobPath = new Path(MobUtils.getMobHome(conf), new Path(tableName,
+        MobConstants.MOB_REGION_NAME));
+    this.familyDir = new Path(mobPath, familyName);
+
+    HBaseAdmin admin = new HBaseAdmin(this.conf);
+    HColumnDescriptor family = null;
+    try {
+      family = admin.getTableDescriptor(Bytes.toBytes(tableName)).getFamily(
+          Bytes.toBytes(familyName));
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close the HBaseAdmin", e);
+      }
+    }
+    this.mobFileStore = MobFileStore.create(conf, fs, MobUtils.getMobHome(conf),
+        TableName.valueOf(tableName), family);
+    this.cacheConf = new CacheConfig(conf, mobFileStore.getColumnDescriptor());
+
+    String htableFactoryClassName = conf.get(MobConstants.MOB_COMPACTION_OUTPUT_TABLE_FACTORY,
+        HTableFactory.class.getName());
+    HTableInterfaceFactory htableFactory = null;
+    try {
+      htableFactory = (HTableInterfaceFactory) Class.forName(htableFactoryClassName)
+          .newInstance();
+    } catch (Exception e) {
+      throw new IOException("Failed to instantiate the HTableInterfaceFactory", e);
+    }
+    this.table = htableFactory.createHTableInterface(this.conf, Bytes.toBytes(tableName));
+    this.table.setAutoFlush(false, false);
+    this.table.setWriteBufferSize(1 * 1024 * 1024); // 1MB
+    memstore = new MemStoreWrapper(context, table, new DefaultMemStore(), mobFileStore);
+
+    long compactionBegin = Long.parseLong(conf.get(MobConstants.MOB_COMPACTION_START_DATE, "0"));
+    this.compactionBeginString = MobUtils.formatDate(new Date(compactionBegin));
+  }
+
+  private SweepPartition createPartition(SweepPartitionId id, Context context) throws IOException {
+    String partitionDate = id.getDate();
+
+    // Skip compacting data written on or after the day the compaction began;
+    // only data from earlier days is swept.
+    if (partitionDate.compareTo(compactionBeginString) >= 0) {
+      return null;
+    } else {
+      return new SweepPartition(id, context);
+    }
+  }
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    setup(context);
+    try {
+      SweepPartitionId id = null;
+      SweepPartition partition = null;
+      while (context.nextKey()) {
+        Text key = context.getCurrentKey();
+        id = SweepPartitionId.create(key.toString());
+        if (null == partition || !id.equals(partition.getId())) {
+          if (null != partition) {
+            partition.close();
+          }
+          partition = createPartition(id, context);
+        }
+        if (partition != null) {
+          partition.execute(key, context.getValues());
+        }
+      }
+      if (null != partition) {
+        partition.close();
+      }
+    } finally {
+      cleanup(context);
+    }
+  }
+
+  public class SweepPartition {
+
+    private SweepPartitionId id;
+    private Context context;
+    private boolean memstoreUpdated = false;
+    private boolean mergeSmall = false;
+    private Map<MobFileName, MobFileStatus> fileInfos =
+        new HashMap<MobFileName, MobFileStatus>();
+    private List<MobFileName> toBeDeleted;
+
+    public SweepPartition(SweepPartitionId id, Context context) throws IOException {
+      this.id = id;
+      this.context = context;
+      this.toBeDeleted = new ArrayList<MobFileName>();
+      memstore.setPartitionId(id);
+      init();
+    }
+
+    public SweepPartitionId getId() {
+      return this.id;
+    }
+
+    private void init() throws IOException {
+      Path partitionPath = new Path(familyDir, id.getDate());
+
+      FileStatus[] fileStatus = listStatus(partitionPath, id.getStartKey());
+      if (null == fileStatus) {
+        return;
+      }
+
+      int smallFileCount = 0;
+      float invalidFileRatio = conf.getFloat(MobConstants.MOB_COMPACTION_INVALID_FILE_RATIO,
+          MobConstants.DEFAULT_MOB_COMPACTION_INVALID_FILE_RATIO);
+      long smallFileThreshold = conf.getLong(MobConstants.MOB_COMPACTION_SMALL_FILE_THRESHOLD,
+          MobConstants.DEFAULT_MOB_COMPACTION_SMALL_FILE_THRESHOLD);
+      for (FileStatus status : fileStatus) {
+        MobFileStatus info = new MobFileStatus(status);
+        info.setInvalidFileRatio(invalidFileRatio).setSmallFileThreshold(smallFileThreshold);
+        if (info.needMerge()) {
+          smallFileCount++;
+        }
+        fileInfos.put(info.getPath(), info);
+      }
+      if (smallFileCount >= 2) {
+        this.mergeSmall = true;
+      }
+    }
+
+    public void close() throws IOException {
+      if (null == id) {
+        return;
+      }
+      // Delete the files that have no reference.
+      for (MobFileName path : fileInfos.keySet()) {
+        MobFileStatus fileInfo = fileInfos.get(path);
+        if (fileInfo.getReferenceCount() <= 0) {
+          fs.delete(path.getAbsolutePath(familyDir), false);
+          context.getCounter(SweepCounter.FILE_NO_REFERENCE).increment(1);
+          context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(1);
+        }
+      }
+
+      // Flush the remaining KeyValues into mob files.
+      if (memstoreUpdated) {
+        memstore.flushMemStore();
+      }
+
+      // Delete the old files after the compaction.
+      for (MobFileName fileName : toBeDeleted) {
+        Path path = fileName.getAbsolutePath(familyDir);
+        LOG.info("[In Partition close] Delete the file " + path.getName()
+            + " in partition close");
+        context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(1);
+        fs.delete(path, false);
+      }
+      fileInfos.clear();
+    }
+
+    public void execute(Text path, Iterable<KeyValue> values) throws IOException {
+      if (null == values) {
+        return;
+      }
+      MobFileName mobFileName = MobFileName.create(path.toString());
+      LOG.info("[In reducer] The file path: " + path.toString());
+      MobFileStatus info = fileInfos.get(mobFileName);
+      if (null == info) {
+        LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
+        return;
+      }
+      Set<KeyValue> kvs = new HashSet<KeyValue>();
+      for (KeyValue kv : values) {
+        kvs.add(kv);
+        info.addReference();
+      }
+      if (info.needClean() || (mergeSmall && info.needMerge())) {
+        context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
+        MobFile file = MobFile.create(fs, mobFileName.getAbsolutePath(familyDir), conf,
+            cacheConf);
+        StoreFileScanner scanner = null;
+        try {
+          scanner = file.getScanner();
+          scanner.seek(KeyValueUtil.createFirstOnRow(new byte[] {}));
+
+          Cell cell = null;
+          while (null != (cell = scanner.next())) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            KeyValue keyOnly = kv.createKeyOnly(false);
+            if (kvs.contains(keyOnly)) {
+              // The cell is still referenced; carry it over into a new mob file.
+              memstore.addToMemstore(kv);
+              memstoreUpdated = true;
+              memstore.flushMemStoreIfNecessary();
+            }
+          }
+        } finally {
+          if (scanner != null) {
+            scanner.close();
+          }
+        }
+        toBeDeleted.add(mobFileName);
+      }
+    }
+
+    private FileStatus[] listStatus(Path p, String prefix) throws IOException {
+      return fs.listStatus(p, new PathPrefixFilter(prefix));
+    }
+  }
+
+  static class PathPrefixFilter implements PathFilter {
+
+    private String prefix;
+
+    public PathPrefixFilter(String prefix) {
+      this.prefix = prefix;
+    }
+
+    public boolean accept(Path path) {
+      // The start key begins at offset 8 of the file name, right after the 8-character date.
+      return path.getName().startsWith(prefix, 8);
+    }
+
+  }
+
+  public static class SweepPartitionId {
+    private String date;
+    private String startKey;
+
+    public SweepPartitionId(MobFileName fileName) {
+      this.date = fileName.getDate();
+      this.startKey = fileName.getStartKey();
+    }
+
+    public SweepPartitionId(String date, String startKey) {
+      this.date = date;
+      this.startKey = startKey;
+    }
+
+    public static SweepPartitionId create(String key) {
+      return new SweepPartitionId(MobFileName.create(key));
+    }
+
+    @Override
+    public int hashCode() {
+      return date.hashCode() ^ startKey.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object anObject) {
+      if (this == anObject) {
+        return true;
+      }
+      if (anObject instanceof SweepPartitionId) {
+        SweepPartitionId another = (SweepPartitionId) anObject;
+        if (this.date.equals(another.getDate()) && this.startKey.equals(another.getStartKey())) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String getDate() {
+      return this.date;
+    }
+
+    public String getStartKey() {
+      return this.startKey;
+    }
+
+    public void setDate(String date) {
+      this.date = date;
+    }
+
+    public void setStartKey(String startKey) {
+      this.startKey = startKey;
+    }
+  }
+
+  private static class MobFileStatus {
+    private long count;
+    private int referenceCount;
+    private MobFileName mobFileName;
+    private long length;
+
+    private float invalidFileRatio = MobConstants.DEFAULT_MOB_COMPACTION_INVALID_FILE_RATIO;
+    private long smallFileThreshold = MobConstants.DEFAULT_MOB_COMPACTION_SMALL_FILE_THRESHOLD;
+
+    public MobFileStatus(FileStatus status) {
+      Path path = status.getPath();
+      String fileName = path.getName();
+
+      this.length = status.getLen();
+
+      this.mobFileName = MobFileName.create(fileName);
+      this.count = mobFileName.getRecordCount();
+      referenceCount = 0;
+    }
+
+    public MobFileStatus setInvalidFileRatio(float invalidFileRatio) {
+      this.invalidFileRatio = invalidFileRatio;
+      return this;
+    }
+
+    public MobFileStatus setSmallFileThreshold(long smallFileThreshold) {
+      this.smallFileThreshold = smallFileThreshold;
+      return this;
+    }
+
+    public void addReference() {
+      this.referenceCount++;
+    }
+
+    public int getReferenceCount() {
+      return this.referenceCount;
+    }
+
+    public boolean needClean() {
+      if (referenceCount >= count) {
+        return false;
+      }
+      // Rewrite the file once the ratio of unreferenced records exceeds the threshold.
+      return count - referenceCount > invalidFileRatio * count;
+    }
+
+    public boolean needMerge() {
+      return this.length < smallFileThreshold;
+    }
+
+    public MobFileName getPath() {
+      return mobFileName;
+    }
+  }
+}
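Reviewer note: a worked example of the two predicates above, under the default thresholds (0.3f invalid-file ratio, 64MB small-file threshold; the numbers are illustrative only). Take a mob file whose name records count = 1000 and whose length is 10MB, with 650 of its cells still referenced after the map phase: needClean() sees 1000 - 650 = 350 unreferenced records, which exceeds 0.3 * 1000 = 300, so the file is rewritten; needMerge() sees 10MB < 64MB, so the file is also a merge candidate once at least two such small files exist in the partition.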
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/Sweeper.java
new file mode 100644
index 0000000..33d45f9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/Sweeper.java
@@ -0,0 +1,87 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.regionserver.MobFileStore;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.ServiceException;
+
+public class Sweeper extends Configured implements Tool {
+
+  public void sweepFamily(String table, String familyName) throws IOException,
+      InterruptedException, ClassNotFoundException, KeeperException, ServiceException {
+    Configuration conf = getConf();
+    HBaseAdmin.checkHBaseAvailable(conf);
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      if (!admin.tableExists(table)) {
+        throw new IOException("Table " + table + " does not exist");
+      }
+      HTableDescriptor htd = admin.getTableDescriptor(Bytes.toBytes(table));
+      HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
+      if (!MobUtils.isMobFamily(family)) {
+        throw new IOException("It's not a MOB column family");
+      }
+      MobFileStore store = MobFileStore.create(conf, fs, MobUtils.getMobHome(conf),
+          TableName.valueOf(table), family);
+      SweepJob job = new SweepJob(fs);
+      job.sweep(store);
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        System.out.println("Failed to close the HBaseAdmin: " + e.getMessage());
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.out.println("Usage:\n" + "--------------------------\n" + Sweeper.class.getName()
+        + " [tableName] [familyName]");
+
+    Configuration conf = HBaseConfiguration.create();
+    ToolRunner.run(conf, new Sweeper(), args);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length >= 2) {
+      String table = args[0];
+      String family = args[1];
+      sweepFamily(table, family);
+    }
+    return 0;
+  }
+}
\ No newline at end of file
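Reviewer note: the flusher below keys off the is_mob family attribute and the cell-size threshold; a minimal sketch of the table-side configuration it expects (the family name and the 100KB threshold are placeholder values):

    HColumnDescriptor hcd = new HColumnDescriptor("f1");
    hcd.setValue(MobConstants.IS_MOB, "true"); // MobUtils.isMobFamily(hcd) now returns true
    // Cells whose value is at least this many bytes are written to mob files on flush.
    conf.setInt(MobConstants.MOB_CELL_SIZE_THRESHOLD, 100 * 1024);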
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * The StoreFlusher used by MOB-enabled stores.
+ *
+ * Cells whose value size reaches the configured threshold are written into mob files,
+ * while small reference cells that carry the mob file name are flushed into the normal
+ * store files instead.
+ */
+public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
+
+  private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class);
+  private final Object flushLock = new Object();
+  private boolean isMob = false;
+  private int mobCellSizeThreshold = 0;
+  private Path targetPath;
+  // Lazily created in currentMobFileStore(); volatile for double-checked locking.
+  private volatile MobFileStore mobFileStore;
+  private final Object lock = new Object();
+  private Path mobHome;
+
+  public DefaultMobStoreFlusher(Configuration conf, Store store) {
+    super(conf, store);
+    isMob = MobUtils.isMobFamily(store.getFamily());
+    mobCellSizeThreshold = conf.getInt(MobConstants.MOB_CELL_SIZE_THRESHOLD,
+        MobConstants.DEFAULT_MOB_CELL_SIZE_THRESHOLD);
+    mobHome = MobUtils.getMobHome(conf);
+    Path mobPath = new Path(mobHome, new Path(store.getTableName().getNameAsString(),
+        MobConstants.MOB_REGION_NAME));
+    this.targetPath = new Path(mobPath, store.getColumnFamilyName());
+  }
+
+  @Override
+  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
+      MonitoredTask status) throws IOException {
+    ArrayList<Path> result = new ArrayList<Path>();
+    if (snapshot.getSize() == 0) {
+      return result; // don't flush if there are no entries
+    }
+
+    // Use a store scanner to find which rows to flush.
+    long smallestReadPoint = store.getSmallestReadPoint();
+    InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
+    if (scanner == null) {
+      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
+    }
+    if (!isMob) {
+      StoreFile.Writer writer;
+      try {
+        // TODO: We can fail in the below block before we complete adding this flush to
+        //       list of store files. Add cleanup of anything put on filesystem if we fail.
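+        // flushLock serializes writer creation and the flush itself; the MOB branch
+        // below synchronizes on the same lock for the same reason.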
+        synchronized (flushLock) {
+          status.setStatus("Flushing " + store + ": creating writer");
+          // Write the map out to the disk
+          writer = store.createWriterInTmp(snapshot.getSize(), store.getFamily().getCompression(),
+              false, true, true);
+          writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
+          try {
+            performFlush(scanner, writer, smallestReadPoint);
+          } finally {
+            finalizeWriter(writer, cacheFlushSeqNum, status);
+          }
+        }
+      } finally {
+        scanner.close();
+      }
+      LOG.info("Flushed, sequenceid=" + cacheFlushSeqNum + ", memsize="
+          + snapshot.getSize() + ", hasBloomFilter=" + writer.hasGeneralBloom()
+          + ", into tmp file " + writer.getPath());
+      result.add(writer.getPath());
+      return result;
+    } else {
+      StoreFile.Writer writer;
+      mobFileStore = currentMobFileStore();
+      StoreFile.Writer mobFileWriter = null;
+      long flushed = 0;
+      try {
+        synchronized (flushLock) {
+          int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
+              HConstants.COMPACTION_KV_MAX_DEFAULT);
+          status.setStatus("Flushing " + store + ": creating writer");
+          // Write the map out to the disk
+          writer = store.createWriterInTmp(snapshot.getSize(), store.getFamily().getCompression(),
+              false, true, true);
+          writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
+
+          // First pass over the snapshot: count the cells that will go to the mob file
+          // and find the latest timestamp among them, so the mob file writer can be
+          // sized and named accordingly.
+          KeyValueScanner snapshotScanner = snapshot.getScanner();
+          int mobKVCount = 0;
+          long time = 0;
+          Cell cell = snapshotScanner.next();
+          while (cell != null) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            if (kv.getValueLength() >= mobCellSizeThreshold && !MobUtils.isMobReferenceKeyValue(kv)
+                && kv.getTypeByte() == KeyValue.Type.Put.getCode()) {
+              mobKVCount++;
+              time = Math.max(time, kv.getTimestamp());
+            }
+            cell = snapshotScanner.next();
+          }
+
+          mobFileWriter = mobFileStore.createWriterInTmp(new Date(time), mobKVCount, store
+              .getFamily().getCompression(), store.getRegionInfo().getStartKey());
+          // The mob file lives under {mobHome}/{tableName}/.mob/{cfName}; only its file
+          // name is stored as the value of the reference cell.
+          String relativePath = mobFileWriter.getPath().getName();
+          byte[] referenceValue = Bytes.toBytes(relativePath);
+          try {
+            List<Cell> kvs = new ArrayList<Cell>();
+            boolean hasMore;
+            do {
+              hasMore = scanner.next(kvs, compactionKVMax);
+              if (!kvs.isEmpty()) {
+                for (Cell c : kvs) {
+                  // If we know that this KV is going to be included always, then let us
+                  // set its memstoreTS to 0. This will help us save space when writing to
+                  // disk.
+                  KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+                  if (kv.getMvccVersion() <= smallestReadPoint) {
+                    // Do not change the original KV: it could still be in the memstore,
+                    // and changing its memstoreTS could affect other threads/scanners.
+                    kv = kv.shallowCopy();
+                    kv.setSequenceId(0);
+                  }
+
+                  if (kv.getValueLength() < mobCellSizeThreshold
+                      || MobUtils.isMobReferenceKeyValue(kv)
+                      || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
+                    writer.append(kv);
+                  } else {
+                    // Append the original KeyValue to the mob file.
+                    mobFileWriter.append(kv);
+
+                    // Tag the KeyValue and write a reference into the store file.
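+                    // Illustrative layout (row/family/qualifier names are hypothetical):
+                    // the store file keeps a small cell such as
+                    //   r1/f1:q1/ts/Put/value=<mob file name>, tag=MOB_REFERENCE_TAG_TYPE
+                    // while the large value lives only in the mob file.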
+                    // The key stays the same; the value becomes the name of the mob file.
+                    List<Tag> existingTags = kv.getTags();
+                    if (existingTags.isEmpty()) {
+                      existingTags = new ArrayList<Tag>();
+                    }
+                    Tag mobRefTag = new Tag(MobConstants.MOB_REFERENCE_TAG_TYPE,
+                        MobConstants.MOB_REFERENCE_TAG_VALUE);
+                    existingTags.add(mobRefTag);
+                    KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(),
+                        kv.getRowLength(), kv.getFamilyArray(), kv.getFamilyOffset(),
+                        kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(),
+                        kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put,
+                        referenceValue, 0, referenceValue.length, existingTags);
+                    writer.append(reference);
+                  }
+
+                  flushed += DefaultMemStore.heapSizeChange(kv, true);
+                }
+                kvs.clear();
+              }
+            } while (hasMore);
+          } finally {
+            // Write out the log sequence number that corresponds to this output hfile.
+            // The hfile is current up to and including cacheFlushSeqNum.
+            status.setStatus("Flushing " + store + ": appending metadata");
+            mobFileWriter.appendMetadata(cacheFlushSeqNum, false);
+            writer.appendMetadata(cacheFlushSeqNum, false);
+            status.setStatus("Flushing " + store + ": closing flushed file");
+            mobFileWriter.close();
+            writer.close();
+
+            // Commit the mob file from the temp folder to the target folder.
+            mobFileStore.commitFile(mobFileWriter.getPath(), targetPath);
+          }
+        }
+      } finally {
+        scanner.close();
+      }
+      LOG.info("Flushed, sequenceid=" + cacheFlushSeqNum + ", memsize="
+          + StringUtils.humanReadableInt(flushed) + ", hasBloomFilter=" + writer.hasGeneralBloom()
+          + ", into tmp file " + writer.getPath());
+      result.add(writer.getPath());
+      return result;
+    }
+  }
+
+  private MobFileStore currentMobFileStore() throws IOException {
+    // Double-checked locking; mobFileStore is declared volatile to make this safe.
+    if (null == mobFileStore) {
+      synchronized (lock) {
+        if (null == mobFileStore) {
+          this.store.getFileSystem().mkdirs(targetPath);
+          mobFileStore = MobFileStore.create(conf, this.store.getFileSystem(), mobHome,
+              this.store.getTableName(), this.store.getFamily());
+        }
+      }
+    }
+    return mobFileStore;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobRegion.java
new file mode 100644
index 0000000..92feac5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobRegion.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+/**
+ * A customized HRegion for MOB (Medium Object) data.
+ *
+ * The difference between HRegion and HMobRegion is that HMobRegion instantiates a
+ * different kind of Store depending on the type of the column family:
+ * <ul>
+ * <li>If the column family is a MOB one, the method instantiateHStore creates an
+ * HMobStore.</li>
+ * <li>Otherwise, it uses HRegion.instantiateHStore to create a normal HStore.</li>
+ * </ul>
+ */
+public class HMobRegion extends HRegion {
+
+  @Deprecated
+  public HMobRegion(final Path tableDir, final HLog log, final FileSystem fs,
+      final Configuration confParam, final HRegionInfo regionInfo, final HTableDescriptor htd,
+      final RegionServerServices rsServices) {
+    super(tableDir, log, fs, confParam, regionInfo, htd, rsServices);
+  }
+
+  public HMobRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
+      final HTableDescriptor htd, final RegionServerServices rsServices) {
+    super(fs, log, confParam, htd, rsServices);
+  }
+
+  @Override
+  protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
+    if (MobUtils.isMobFamily(family)) {
+      return new HMobStore(this, family, this.conf);
+    }
+    return super.instantiateHStore(family);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
new file mode 100644
index 0000000..4e31fd4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.zookeeper.KeeperException;
+
+public class HMobStore extends HStore {
+
+  private MobFileStore mobFileStore;
+
+  public HMobStore(final HRegion region, final HColumnDescriptor family,
+      final Configuration confParam) throws IOException {
+    super(region, family, confParam);
+    Path home = MobUtils.getMobHome(region.conf);
+    mobFileStore = MobFileStore.create(region.conf, region.getFilesystem(), home,
+        this.getTableName(), this.getFamily());
+  }
+
+  @Override
+  public KeyValueScanner getScanner(Scan scan, NavigableSet<byte[]> targetCols, long readPt)
+      throws IOException {
+    lock.readLock().lock();
+    try {
+      KeyValueScanner scanner = null;
+      if (this.getCoprocessorHost() != null) {
+        scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
+      }
+      if (scanner == null) {
+        scanner = scan.isReversed() ?
+            new MobReversedStoreScanner(this, getScanInfo(), scan, targetCols, readPt,
+                mobFileStore) :
+            new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt, mobFileStore);
+      }
+      return scanner;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<StoreFile> compact(CompactionContext compaction) throws IOException {
+    // For a major compaction, first check whether a sweeper is running on this store.
+    // If one is, downgrade the major compaction to a minor one.
+    if (compaction.getRequest().isMajor() && MobUtils.isMobFamily(getFamily())) {
+      MobZookeeper zk = null;
+      try {
+        zk = MobZookeeper.newInstance(this.getHRegion().conf);
+      } catch (KeeperException e) {
+        LOG.error("Cannot connect to ZooKeeper, performing a minor compaction instead", e);
+        // Change the major compaction into a minor one.
+        compaction.getRequest().setIsMajor(false, false);
+        return super.compact(compaction);
+      }
+      boolean major = false;
+      String compactionName = UUID.randomUUID().toString().replaceAll("-", "");
+      try {
+        if (zk.lockStore(getTableName().getNameAsString(), getFamily().getNameAsString())) {
+          try {
+            LOG.info("Obtained the lock for the store [" + this
+                + "], ready to perform the major compaction");
+            // Check the sweeper znode.
+            boolean hasSweeper = zk.isSweeperZNodeExist(getTableName().getNameAsString(),
+                getFamily().getNameAsString());
+            if (!hasSweeper) {
+              // If there is none, add a child region znode to the family znode.
+              major = zk.addMajorCompactionZNode(getTableName().getNameAsString(), getFamily()
+                  .getNameAsString(), compactionName);
+            }
+          } catch (Exception e) {
+            LOG.error("Failed to access ZooKeeper", e);
+          } finally {
+            zk.unlockStore(getTableName().getNameAsString(), getFamily().getNameAsString());
+          }
+        }
+        try {
+          if (major) {
+            return super.compact(compaction);
+          } else {
+            LOG.info("Cannot obtain the lock, or another major compaction is running on the"
+                + " store [" + this + "], performing a minor compaction instead");
+            // Change the major compaction into a minor one.
+            compaction.getRequest().setIsMajor(false, false);
+            return super.compact(compaction);
+          }
+        } finally {
+          if (major) {
+            try {
+              zk.deleteMajorCompactionZNode(getTableName().getNameAsString(), getFamily()
+                  .getNameAsString(), compactionName);
+            } catch (KeeperException e) {
+              LOG.error("Failed to delete the compaction znode " + compactionName, e);
+            }
+          }
+        }
+      } finally {
+        zk.close();
+      }
+    } else {
+      return super.compact(compaction);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreWrapper.java
new file mode 100644
index 0000000..631d50a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreWrapper.java
@@ -0,0 +1,155 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.compactions.SweepCounter;
+import org.apache.hadoop.hbase.mob.compactions.SweepReducer.SweepPartitionId;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+
+/**
+ * A wrapper around a memstore used by the sweep reducer: it accumulates the cells of a
+ * sweep partition, flushes them into a new mob file, and writes the reference cells back
+ * to the table.
+ */
+public class MemStoreWrapper {
+
+  private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
+
+  private DefaultMemStore memstore;
+  private long blockingMemStoreSize;
+  private SweepPartitionId partitionId;
+  private Context context;
+  private Configuration conf;
+  private MobFileStore mobFileStore;
+  private HTableInterface table;
+
+  public MemStoreWrapper(Context context, HTableInterface table, DefaultMemStore memstore,
+      MobFileStore mobFileStore) {
+    this.memstore = memstore;
+    this.context = context;
+    this.mobFileStore = mobFileStore;
+    this.table = table;
+    this.conf = context.getConfiguration();
+    long flushSize = this.conf.getLong(MobConstants.MOB_COMPACTION_MEMSTORE_FLUSH_SIZE,
+        MobConstants.DEFAULT_MOB_COMPACTION_MEMSTORE_FLUSH_SIZE);
+    this.blockingMemStoreSize = flushSize
+        * this.conf.getLong(MobConstants.MOB_COMPACTION_MEMSTORE_BLOCK_MULTIPLIER, 2);
+  }
+
+  public void setPartitionId(SweepPartitionId partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  public void flushMemStoreIfNecessary() throws IOException {
+    if (memstore.heapSize() >= blockingMemStoreSize) {
+      flushMemStore();
+    }
+  }
+
+  public void flushMemStore() throws IOException {
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    internalFlushCache(snapshot, Long.MAX_VALUE);
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  private void internalFlushCache(final MemStoreSnapshot snapshot, final long logCacheFlushId)
+      throws IOException {
+    if (snapshot.getSize() == 0) {
+      return;
+    }
+    // Generate the temp file under the compaction working directory.
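+    // The working directory comes from hbase.mob.compaction.job.working.dir; the file is
+    // committed from there into the mob family directory once it is closed.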
+    String tempPathString = context.getConfiguration().get(
+        MobConstants.MOB_COMPACTION_JOB_WORKING_DIR);
+    StoreFile.Writer mobFileWriter = mobFileStore.createWriterInTmp(partitionId.getDate(),
+        new Path(tempPathString), snapshot.getSize(), mobFileStore.getColumnDescriptor()
+            .getCompactionCompression(), partitionId.getStartKey());
+
+    Path targetPath = mobFileStore.getPath();
+
+    String relativePath = mobFileWriter.getPath().getName();
+    LOG.info("Creating a temp mob file under " + mobFileWriter.getPath().toString());
+
+    byte[] referenceValue = Bytes.toBytes(relativePath);
+    int keyValueCount = 0;
+    KeyValueScanner scanner = snapshot.getScanner();
+    scanner.seek(KeyValueUtil.createFirstOnRow(new byte[] {}));
+    Cell cell = null;
+    // First pass: write every cell of the snapshot into the new mob file.
+    while (null != (cell = scanner.next())) {
+      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+      kv.setSequenceId(0);
+      mobFileWriter.append(kv);
+      keyValueCount++;
+    }
+    scanner.close();
+    // Write out the log sequence number that corresponds to this output hfile.
+    // The hfile is current up to and including logCacheFlushId.
+    mobFileWriter.appendMetadata(logCacheFlushId, false);
+    mobFileWriter.close();
+
+    mobFileStore.commitFile(mobFileWriter.getPath(), targetPath);
+    context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
+    // Second pass: write a tagged reference cell back to the table for each cell that
+    // moved into the new mob file.
+    scanner = snapshot.getScanner();
+    scanner.seek(KeyValueUtil.createFirstOnRow(new byte[] {}));
+    cell = null;
+    while (null != (cell = scanner.next())) {
+      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+      kv.setSequenceId(0);
+      List<Tag> existingTags = kv.getTags();
+      if (existingTags.isEmpty()) {
+        existingTags = new ArrayList<Tag>();
+      }
+      Tag mobRefTag = new Tag(MobConstants.MOB_REFERENCE_TAG_TYPE,
+          MobConstants.MOB_REFERENCE_TAG_VALUE);
+      existingTags.add(mobRefTag);
+
+      KeyValue reference = new KeyValue(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
+          kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getBuffer(),
+          kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put,
+          referenceValue, 0, referenceValue.length, existingTags);
+
+      Put put = new Put(reference.getRow());
+      put.add(reference);
+      table.put(put);
+      context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1);
+    }
+    if (keyValueCount > 0) {
+      table.flushCommits();
+    }
+    scanner.close();
+  }
+
+  public void addToMemstore(KeyValue kv) {
+    memstore.add(kv);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobFileStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobFileStore.java
new file mode 100644
index 0000000..9734c88
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobFileStore.java
@@ -0,0 +1,212 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
+import java.util.zip.CRC32;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class MobFileStore {
+
+  private static final Log LOG = LogFactory.getLog(MobFileStore.class);
+  private Configuration conf;
+  private FileSystem fs;
+  private Path homePath;
+  private CacheConfig cacheConf;
+  private String tableName;
+  private HColumnDescriptor family;
+  private Path mobFamilyPath;
+  private static final int MIN_BLOCK_SIZE = 1024;
+  private static final String TMP = ".tmp";
+
+  private MobFileStore(Configuration conf, FileSystem fs, Path homePath, String tableName,
+      HColumnDescriptor family) {
+    this.fs = fs;
+    this.homePath = homePath;
+    this.conf = conf;
+    this.cacheConf = new CacheConfig(conf, family);
+    this.tableName = tableName;
+    this.family = family;
+    Path mobPath = new Path(homePath, new Path(tableName, MobConstants.MOB_REGION_NAME));
+    this.mobFamilyPath = new Path(mobPath, family.getNameAsString());
+  }
+
+  public static MobFileStore create(Configuration conf, FileSystem fs, Path familyPath,
+      String tableName, HColumnDescriptor family) throws IOException {
+    if (null == family) {
+      LOG.warn("Failed to create the MobFileStore because the family is null in table ["
+          + tableName + "]");
+      return null;
+    }
+    String familyName = family.getNameAsString();
+    if (!MobUtils.isMobFamily(family)) {
+      LOG.warn("Failed to create the MobFileStore because the family [" + familyName
+          + "] in table [" + tableName + "] is not a mob one");
+      return null;
+    }
+    return new MobFileStore(conf, fs, familyPath, tableName, family);
+  }
+
+  public static MobFileStore create(Configuration conf, FileSystem fs, Path mobHome,
+      TableName tableName, HColumnDescriptor family) throws IOException {
+    return create(conf, fs, mobHome, tableName.getNameAsString(), family);
+  }
+
+  public HColumnDescriptor getColumnDescriptor() {
+    return this.family;
+  }
+
+  public Path getPath() {
+    return mobFamilyPath;
+  }
+
+  public Path getTmpDir() {
+    return new Path(homePath, TMP);
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getFamilyName() {
+    return family.getNameAsString();
+  }
+
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
+  public StoreFile.Writer createWriterInTmp(Date date, int maxKeyCount,
+      Compression.Algorithm compression, byte[] startKey) throws IOException {
+    if (null == startKey || startKey.length == 0) {
+      startKey = new byte[] { 0x00 };
+    }
+
+    // The start key is CRC32-hashed into a hex string used in the mob file name.
+    CRC32 crc = new CRC32();
+    crc.update(startKey);
+    int checksum = (int) crc.getValue();
+    return createWriterInTmp(date, maxKeyCount, compression, MobFileName.int2HexString(checksum));
+  }
+
+  public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
+      Compression.Algorithm compression, String startKey) throws IOException {
+    Path path = getTmpDir();
+    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
+  }
+
+  public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, String startKey) throws IOException {
+    MobFileName mobFileName = MobFileName.create(startKey, maxKeyCount, date, UUID.randomUUID()
+        .toString().replaceAll("-", ""));
+    // Write under basePath first; the file is moved into the family directory by commitFile.
+    Path mobFilePath = new Path(basePath, mobFileName.getFileName());
+    final CacheConfig writerCacheConf = cacheConf;
+    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withBlockSize(MIN_BLOCK_SIZE)
+        .withHBaseCheckSum(true).withDataBlockEncoding(DataBlockEncoding.NONE).build();
+
+    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, fs)
+        .withFilePath(mobFilePath).withComparator(KeyValue.COMPARATOR)
+        .withBloomType(BloomType.NONE).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext)
+        .build();
+    return w;
+  }
+
+  public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
+    if (null == sourceFile) {
+      throw new NullPointerException("The source file to commit is null");
+    }
+
+    Path dstPath = new Path(targetPath, sourceFile.getName());
+    validateStoreFile(sourceFile);
+    String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
+    LOG.info(msg);
+
+    Path parent = dstPath.getParent();
+    if (!fs.exists(parent)) {
+      fs.mkdirs(parent);
+    }
+
+    if (!fs.rename(sourceFile, dstPath)) {
+      LOG.warn("Unable to rename " + sourceFile + " to " + dstPath);
+    }
+  }
+
+  private void validateStoreFile(Path path) throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile = new StoreFile(this.fs, path, conf, this.cacheConf, BloomType.NONE);
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Failed to open mob store file[" + path + "], keeping it in tmp location["
+          + getTmpDir() + "].", e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeReader(false);
+      }
+    }
+  }
+
+  public Cell resolve(KeyValue reference, boolean cacheBlocks) throws IOException {
+    byte[] referenceValue = reference.getValue();
+    String fileName = Bytes.toString(referenceValue);
+    Path targetPath = new Path(mobFamilyPath, fileName);
+    MobFile file = MobFile.create(fs, targetPath, conf, cacheConf);
+    Cell result = file.readKeyValue(reference, cacheBlocks);
+
+    if (result == null) {
+      LOG.warn("The KeyValue in the mob file is null; assembling a new KeyValue with the same "
+          + "row, family, qualifier, timestamp, type and tags but with an empty value.");
+      result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
+          reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
+          reference.getFamilyLength(), reference.getQualifierArray(),
+          reference.getQualifierOffset(), reference.getQualifierLength(),
+          reference.getTimestamp(), Type.codeToType(reference.getTypeByte()),
+          HConstants.EMPTY_BYTE_ARRAY, 0, 0, reference.getTags());
+    }
+    return result;
+  }
+}
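As a quick illustration of the start-key hashing in createWriterInTmp above: the region start key is CRC32-hashed and the checksum is turned into a hex string for the mob file name. The standalone sketch below mimics that step under stated assumptions; the start key is hypothetical, and MobFileName.int2HexString is approximated with String.format, which may format differently than the real implementation.

    import java.util.zip.CRC32;

    public class StartKeyHashDemo {
      public static void main(String[] args) {
        byte[] startKey = "region-start-key".getBytes(); // hypothetical start key
        CRC32 crc = new CRC32();
        crc.update(startKey);
        int checksum = (int) crc.getValue();
        // Stand-in for MobFileName.int2HexString(checksum); formatting may differ.
        System.out.println(String.format("%08x", checksum));
      }
    }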
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReversedStoreScanner.java
new file mode 100644
index 0000000..704c722
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReversedStoreScanner.java
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+public class MobReversedStoreScanner extends ReversedStoreScanner {
+
+  private boolean cacheMobBlocks = false;
+  private MobFileStore mobFileStore;
+
+  MobReversedStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+      NavigableSet<byte[]> columns, long readPt, MobFileStore mobFileStore) throws IOException {
+    super(store, scanInfo, scan, columns, readPt);
+    cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+    this.mobFileStore = mobFileStore;
+  }
+
+  @Override
+  public boolean next(List<Cell> outResult, int limit) throws IOException {
+    boolean result = super.next(outResult, limit);
+    if (!MobUtils.isRawMobScan(scan)) {
+      // Retrieve the mob data.
+      if (outResult.isEmpty()) {
+        return result;
+      }
+      for (int i = 0; i < outResult.size(); i++) {
+        Cell cell = outResult.get(i);
+        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+        if (MobUtils.isMobReferenceKeyValue(kv)) {
+          outResult.set(i, mobFileStore.resolve(kv, cacheMobBlocks));
+        }
+      }
+    }
+    return result;
+  }
+}
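The scanners in this patch consult per-scan attributes (see MobUtils.isRawMobScan and the MobConstants.MOB_SCAN_RAW key). A minimal sketch of how a client might ask for raw reference cells instead of resolved MOB values follows; the boolean encoding of the attribute value is an assumption, since MobUtils.isRawMobScan is not part of this hunk.

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mob.MobConstants;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RawMobScanExample {
      public static Scan rawMobScan() {
        Scan scan = new Scan();
        // Assumed encoding: a string "true"/"false"; the real check may differ.
        scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE.toString()));
        return scan;
      }
    }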
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
new file mode 100644
index 0000000..a969b2e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * A scanner that scans both the memstore and the MOB store, coalescing the KeyValue
+ * stream into a list of cells for a single row. Reference cells are resolved from the
+ * mob files unless the scan requests the raw references.
+ */
+public class MobStoreScanner extends StoreScanner {
+
+  private boolean cacheMobBlocks = false;
+  private MobFileStore mobFileStore;
+
+  protected MobStoreScanner(Store store, boolean cacheBlocks, Scan scan,
+      final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt,
+      MobFileStore mobFileStore) {
+    super(store, cacheBlocks, scan, columns, ttl, minVersions, readPt);
+    cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+    this.mobFileStore = mobFileStore;
+  }
+
+  public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+      List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
+      long earliestPutTs, MobFileStore mobFileStore) throws IOException {
+    super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs);
+    cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+    this.mobFileStore = mobFileStore;
+  }
+
+  public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+      List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
+      byte[] dropDeletesFromRow, byte[] dropDeletesToRow, MobFileStore mobFileStore)
+      throws IOException {
+    super(store, scanInfo, scan, scanners, smallestReadPoint, earliestPutTs, dropDeletesFromRow,
+        dropDeletesToRow);
+    cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+    this.mobFileStore = mobFileStore;
+  }
+
+  public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+      final NavigableSet<byte[]> columns, long readPt, MobFileStore mobFileStore)
+      throws IOException {
+    super(store, scanInfo, scan, columns, readPt);
+    cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+    this.mobFileStore = mobFileStore;
+  }
+
+  @Override
+  public boolean next(List<Cell> outResult, int limit) throws IOException {
+    boolean result = super.next(outResult, limit);
+    if (!MobUtils.isRawMobScan(scan)) {
+      // Retrieve the mob data.
+      if (outResult.isEmpty()) {
+        return result;
+      }
+      for (int i = 0; i < outResult.size(); i++) {
+        Cell cell = outResult.get(i);
+        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+        if (MobUtils.isMobReferenceKeyValue(kv)) {
+          outResult.set(i, mobFileStore.resolve(kv, cacheMobBlocks));
+        }
+      }
+    }
+    return result;
+  }
+}
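Finally, a minimal sketch of driving the sweep tool programmatically, mirroring what Sweeper.main does with command-line arguments; the table name "t1" and family name "f1" are hypothetical, and the family must be a MOB-enabled one.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mob.compactions.Sweeper;
    import org.apache.hadoop.util.ToolRunner;

    public class SweeperUsage {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // "t1" and "f1" are hypothetical; the family must have MOB enabled.
        int exitCode = ToolRunner.run(conf, new Sweeper(), new String[] { "t1", "f1" });
        System.exit(exitCode);
      }
    }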