diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
deleted file mode 100644
index 5f13502..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.regionserver.HMobStore;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
-import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Compact passed set of files in the mob-enabled column family.
- */
-@InterfaceAudience.Private
-public class DefaultMobCompactor extends DefaultCompactor {
-
- private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class);
- private long mobSizeThreshold;
- private HMobStore mobStore;
- public DefaultMobCompactor(Configuration conf, Store store) {
- super(conf, store);
- // The mob cells reside in the mob-enabled column family which is held by HMobStore.
- // During the compaction, the compactor reads the cells from the mob files and
- // probably creates new mob files. All of these operations are included in HMobStore,
- // so we need to cast the Store to HMobStore.
- if (!(store instanceof HMobStore)) {
- throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
- }
- mobStore = (HMobStore) store;
- mobSizeThreshold = MobUtils.getMobThreshold(store.getFamily());
- }
-
- /**
- * Creates a writer for a new file in a temporary directory.
- * @param fd The file details.
- * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
- * @return Writer for a new StoreFile in the tmp dir.
- * @throws IOException
- */
- @Override
- protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
- // make this writer with tags always because of possible new cells with tags.
- StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
- true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
- return writer;
- }
-
- /**
- * Performs compaction on a column family with the mob flag enabled.
- * This is for when the mob threshold size has changed or if the mob
- * column family mode has been toggled via an alter table statement.
- * Compacts the files by the following rules.
- * 1. If the cell has a mob reference tag, the cell's value is the path of the mob file.
- * <ol>
- * <li>
- * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
- * directly copy the (with mob tag) cell into the new store file.
- * </li>
- * <li>
- * Otherwise, retrieve the mob cell from the mob file, and write a copy of the cell into
- * the new store file.
- * </li>
- * </ol>
- * 2. If the cell doesn't have a reference tag.
- * <ol>
- * <li>
- * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
- * write this cell to a mob file, and write the path of this mob file to the store file.
- * </li>
- * <li>
- * Otherwise, directly write this cell into the store file.
- * </li>
- * </ol>
- * @param fd File details
- * @param scanner Where to read from.
- * @param writer Where to write to.
- * @param smallestReadPoint Smallest read point.
- * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
- * @param major Is a major compaction.
- * @return Whether compaction ended; false if it was interrupted for any reason.
- */
- @Override
- protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
- long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
- int bytesWritten = 0;
- // Since scanner.next() can return 'false' but still be delivering data,
- // we have to use a do/while loop.
- List<Cell> cells = new ArrayList<Cell>();
- // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
- int closeCheckInterval = HStore.getCloseCheckInterval();
- boolean hasMore;
- Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
- byte[] fileName = null;
- StoreFile.Writer mobFileWriter = null;
- long mobCells = 0;
- Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
- .getName());
- try {
- try {
- // If the mob file writer could not be created, directly write the cell to the store file.
- mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
- store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
- fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
- } catch (IOException e) {
- LOG.error(
- "Fail to create mob writer, "
- + "we will continue the compaction by writing MOB cells directly in store files",
- e);
- }
- do {
- hasMore = scanner.next(cells, compactionKVMax);
- // output to writer:
- for (Cell c : cells) {
- // TODO remove the KeyValueUtil.ensureKeyValue before merging back to trunk.
- KeyValue kv = KeyValueUtil.ensureKeyValue(c);
- resetSeqId(smallestReadPoint, cleanSeqId, kv);
- if (mobFileWriter == null || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
- // If the mob file writer is null or the kv type is not put, directly write the cell
- // to the store file.
- writer.append(kv);
- } else if (MobUtils.isMobReferenceCell(kv)) {
- if (MobUtils.isValidMobRefCellValue(kv)) {
- int size = MobUtils.getMobValueLength(kv);
- if (size > mobSizeThreshold) {
- // If the value size is larger than the threshold, it's regarded as a mob. Since
- // its value is already in the mob file, directly write this cell to the store file
- writer.append(kv);
- } else {
- // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
- // the mob cell from the mob file, and write it back to the store file.
- Cell cell = mobStore.resolve(kv, false);
- if (cell.getValueLength() != 0) {
- // put the mob data back to the store file
- KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell);
- mobKv.setSequenceId(kv.getSequenceId());
- writer.append(mobKv);
- } else {
- // If the value of a file is empty, there might be issues when retrieving,
- // directly write the cell to the store file, and leave it to be handled by the
- // next compaction.
- writer.append(kv);
- }
- }
- } else {
- LOG.warn("The value format of the KeyValue " + kv
- + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
- writer.append(kv);
- }
- } else if (kv.getValueLength() <= mobSizeThreshold) {
- // If the value size of a cell is not larger than the threshold, directly write it to
- // the store file.
- writer.append(kv);
- } else {
- // If the value size of a cell is larger than the threshold, it's regarded as a mob,
- // write this cell to a mob file, and write the path to the store file.
- mobCells++;
- // append the original keyValue in the mob file.
- mobFileWriter.append(kv);
- KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
- // write the cell whose value is the path of a mob file to the store file.
- writer.append(reference);
- }
- ++progress.currentCompactedKVs;
-
- // check periodically to see if a system stop is requested
- if (closeCheckInterval > 0) {
- bytesWritten += kv.getLength();
- if (bytesWritten > closeCheckInterval) {
- bytesWritten = 0;
- if (!store.areWritesEnabled()) {
- progress.cancel();
- return false;
- }
- }
- }
- }
- cells.clear();
- } while (hasMore);
- } finally {
- if (mobFileWriter != null) {
- appendMetadataAndCloseWriter(mobFileWriter, fd, major);
- }
- }
- if (mobFileWriter != null) {
- if (mobCells > 0) {
- // If the mob file is not empty, commit it.
- mobStore.commitFile(mobFileWriter.getPath(), path);
- } else {
- try {
- // If the mob file is empty, delete it instead of committing.
- store.getFileSystem().delete(mobFileWriter.getPath(), true);
- } catch (IOException e) {
- LOG.error("Fail to delete the temp mob file", e);
- }
- }
- }
- progress.complete();
- return true;
- }
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
index 2d5f1ad..d5e6f2e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
@@ -37,12 +37,4 @@ public class MobStoreEngine extends DefaultStoreEngine {
// specific compactor and policy when that is implemented.
storeFlusher = new DefaultMobStoreFlusher(conf, store);
}
-
- /**
- * Creates the DefaultMobCompactor.
- */
- @Override
- protected void createCompactor(Configuration conf, Store store) throws IOException {
- compactor = new DefaultMobCompactor(conf, store);
- }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index e52d336..c106e0b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -260,44 +260,4 @@ public class MobUtils {
reference.setSequenceId(kv.getSequenceId());
return reference;
}
-
- /**
- * Indicates whether the current mob ref cell has a valid value.
- * A mob ref cell has a mob reference tag.
- * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
- * The real mob value length takes 4 bytes.
- * The remaining part is the mob file name.
- * @param cell The mob ref cell.
- * @return True if the cell has a valid value.
- */
- public static boolean isValidMobRefCellValue(Cell cell) {
- return cell.getValueLength() > Bytes.SIZEOF_INT;
- }
-
- /**
- * Gets the mob value length from the mob ref cell.
- * A mob ref cell has a mob reference tag.
- * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
- * The real mob value length takes 4 bytes.
- * The remaining part is the mob file name.
- * @param cell The mob ref cell.
- * @return The real mob value length.
- */
- public static int getMobValueLength(Cell cell) {
- return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT);
- }
-
- /**
- * Gets the mob file name from the mob ref cell.
- * A mob ref cell has a mob reference tag.
- * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
- * The real mob value length takes 4 bytes.
- * The remaining part is the mob file name.
- * @param cell The mob ref cell.
- * @return The mob file name.
- */
- public static String getMobFileName(Cell cell) {
- return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
- cell.getValueLength() - Bytes.SIZEOF_INT);
- }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 9c6f34e..17d3802 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -219,8 +219,9 @@ public class HMobStore extends HStore {
*/
public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
Cell result = null;
- if (MobUtils.isValidMobRefCellValue(reference)) {
- String fileName = MobUtils.getMobFileName(reference);
+ if (reference.getValueLength() > Bytes.SIZEOF_INT) {
+ String fileName = Bytes.toString(reference.getValueArray(), reference.getValueOffset()
+ + Bytes.SIZEOF_INT, reference.getValueLength() - Bytes.SIZEOF_INT);
Path targetPath = new Path(mobFamilyPath, fileName);
MobFile file = null;
try {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 13967c2..2b053a6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -43,9 +43,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils;
/**
@@ -59,7 +57,7 @@ public abstract class Compactor {
protected Configuration conf;
protected Store store;
- protected int compactionKVMax;
+ private int compactionKVMax;
protected Compression.Algorithm compactionCompression;
/** specify how many days to keep MVCC values during major compaction **/
@@ -94,8 +92,6 @@ public abstract class Compactor {
public long maxKeyCount = 0;
/** Earliest put timestamp if major compaction */
public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
- /** Latest put timestamp */
- public long latestPutTs = HConstants.LATEST_TIMESTAMP;
/** The last key in the files we're compacting. */
public long maxSeqId = 0;
/** Latest memstore read point found in any of the involved files */
@@ -162,14 +158,6 @@ public abstract class Compactor {
fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
}
}
- tmp = fileInfo.get(StoreFile.TIMERANGE_KEY);
- TimeRangeTracker trt = new TimeRangeTracker();
- if (tmp == null) {
- fd.latestPutTs = HConstants.LATEST_TIMESTAMP;
- } else {
- Writables.copyWritable(tmp, trt);
- fd.latestPutTs = trt.getMaximumTimestamp();
- }
if (LOG.isDebugEnabled()) {
LOG.debug("Compacting " + file +
", keycount=" + keyCount +
@@ -228,16 +216,14 @@ public abstract class Compactor {
/**
* Performs the compaction.
- * @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
- * @param major Is a major compaction.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
- protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
- long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
+ protected boolean performCompaction(InternalScanner scanner,
+ CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
@@ -255,7 +241,9 @@ public abstract class Compactor {
// output to writer:
for (Cell c : kvs) {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
- resetSeqId(smallestReadPoint, cleanSeqId, kv);
+ if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
+ CellUtil.setSequenceId(kv, 0);
+ }
writer.append(kv);
++progress.currentCompactedKVs;
progress.totalCompactedSize += kv.getLength();
@@ -321,29 +309,4 @@ public abstract class Compactor {
return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
-
- /**
- * Resets the sequence id.
- * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
- * @param cleanSeqId Should clean the sequence id.
- * @param kv The current KeyValue.
- */
- protected void resetSeqId(long smallestReadPoint, boolean cleanSeqId, KeyValue kv) {
- if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
- kv.setSequenceId(0);
- }
- }
-
- /**
- * Appends the metadata and closes the writer.
- * @param writer The current store writer.
- * @param fd The file details.
- * @param isMajor Is a major compaction.
- * @throws IOException
- */
- protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
- boolean isMajor) throws IOException {
- writer.appendMetadata(fd.maxSeqId, isMajor);
- writer.close();
- }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 8056dd0..d5b2b63 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -76,9 +76,9 @@ public class DefaultCompactor extends Compactor {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
- writer = createTmpWriter(fd, smallestReadPoint);
- boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
- request.isAllFiles());
+ writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
+ fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
+ boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
@@ -94,7 +94,8 @@ public class DefaultCompactor extends Compactor {
}
} finally {
if (writer != null) {
- appendMetadataAndCloseWriter(writer, fd, request.isAllFiles());
+ writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
+ writer.close();
newFiles.add(writer.getPath());
}
}
@@ -102,20 +103,6 @@ public class DefaultCompactor extends Compactor {
}
/**
- * Creates a writer for a new file in a temporary directory.
- * @param fd The file details.
- * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
- * @return Writer for a new StoreFile in the tmp dir.
- * @throws IOException
- */
- protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
- throws IOException {
- StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
- true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
- return writer;
- }
-
- /**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 3109015..487ff46 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -127,8 +127,7 @@ public class StripeCompactor extends Compactor {
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
- finished = performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId,
- request.isMajor());
+ finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId);
if (!finished) {
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
deleted file mode 100644
index f8d6ce4..0000000
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-/**
- * Test mob compaction
- */
-@Category(MediumTests.class)
-public class TestMobCompaction {
- @Rule
- public TestName name = new TestName();
- static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
- private HBaseTestingUtility UTIL = null;
- private Configuration conf = null;
-
- private HRegion region = null;
- private HTableDescriptor htd = null;
- private HColumnDescriptor hcd = null;
- private long mobCellThreshold = 1000;
-
- private FileSystem fs;
-
- private static final byte[] COLUMN_FAMILY = fam1;
- private final byte[] STARTROW = Bytes.toBytes(START_KEY);
- private int compactionThreshold;
-
- private void init(long mobThreshold) throws Exception {
- this.mobCellThreshold = mobThreshold;
-
- UTIL = HBaseTestingUtility.createLocalHTU();
-
- conf = UTIL.getConfiguration();
- compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
-
- htd = UTIL.createTableDescriptor(name.getMethodName());
- hcd = new HColumnDescriptor(COLUMN_FAMILY);
- hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
- hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(mobThreshold));
- hcd.setMaxVersions(1);
- htd.addFamily(hcd);
-
- region = UTIL.createLocalHRegion(htd, null, null);
- fs = FileSystem.get(conf);
- }
-
- @After
- public void tearDown() throws Exception {
- region.close();
- fs.delete(UTIL.getDataTestDir(), true);
- }
-
- /**
- * During compaction, cells smaller than the threshold won't be affected.
- */
- @Test
- public void testSmallerValue() throws Exception {
- init(500);
- byte[] dummyData = makeDummyData(300); // smaller than mob threshold
- HRegionIncommon loader = new HRegionIncommon(region);
- // one hfile per row
- for (int i = 0; i < compactionThreshold; i++) {
- Put p = createPut(i, dummyData);
- loader.put(p);
- loader.flushcache();
- }
- assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
- assertEquals("Before compaction: mob file count", 0, countMobFiles());
- assertEquals("Before compaction: rows", compactionThreshold, countRows());
- assertEquals("Before compaction: mob rows", 0, countMobRows());
-
- region.compactStores();
-
- assertEquals("After compaction: store files", 1, countStoreFiles());
- assertEquals("After compaction: mob file count", 0, countMobFiles());
- assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
- assertEquals("After compaction: rows", compactionThreshold, countRows());
- assertEquals("After compaction: mob rows", 0, countMobRows());
- }
-
- /**
- * During compaction, the mob threshold size is changed.
- */
- @Test
- public void testLargerValue() throws Exception {
- init(200);
- byte[] dummyData = makeDummyData(300); // larger than mob threshold
- HRegionIncommon loader = new HRegionIncommon(region);
- for (int i = 0; i < compactionThreshold; i++) {
- Put p = createPut(i, dummyData);
- loader.put(p);
- loader.flushcache();
- }
- assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
- assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
- assertEquals("Before compaction: rows", compactionThreshold, countRows());
- assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
- // Change the threshold larger than the data size
- region.getTableDesc().getFamily(COLUMN_FAMILY).setValue(
- MobConstants.MOB_THRESHOLD, Bytes.toBytes(500L));
- region.initialize();
- region.compactStores(true);
- assertEquals("After compaction: store files", 1, countStoreFiles());
- assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
- assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
- assertEquals("After compaction: rows", compactionThreshold, countRows());
- assertEquals("After compaction: mob rows", 0, countMobRows());
- }
-
- /**
- * This test will first generate store files, then bulk load them and trigger the compaction.
- * During the compaction, the cell values are larger than the threshold.
- */
- @Test
- public void testMobCompactionWithBulkload() throws Exception {
- // The following will produce store files of 600 bytes each.
- init(300);
- byte[] dummyData = makeDummyData(600);
-
- Path hbaseRootDir = FSUtils.getRootDir(conf);
- Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
- List<Pair<byte[], String>> hfiles = new ArrayList<Pair<byte[], String>>(1);
- for (int i = 0; i < compactionThreshold; i++) {
- Path hpath = new Path(basedir, "hfile" + i);
- hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
- createHFile(hpath, i, dummyData);
- }
-
- // The following will bulk load the above generated store files and compact, with 600(fileSize)
- // > 300(threshold)
- boolean result = region.bulkLoadHFiles(hfiles, true);
- assertTrue("Bulkload result:", result);
- assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
- assertEquals("Before compaction: mob file count", 0, countMobFiles());
- assertEquals("Before compaction: rows", compactionThreshold, countRows());
- assertEquals("Before compaction: mob rows", 0, countMobRows());
- assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
-
- region.compactStores();
-
- assertEquals("After compaction: store files", 1, countStoreFiles());
- assertEquals("After compaction: mob file count:", 1, countMobFiles());
- assertEquals("After compaction: rows", compactionThreshold, countRows());
- assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
- assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
- }
-
- private int countStoreFiles() throws IOException {
- Store store = region.getStore(COLUMN_FAMILY);
- return store.getStorefilesCount();
- }
-
- private int countMobFiles() throws IOException {
- Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
- hcd.getNameAsString());
- if (fs.exists(mobDirPath)) {
- FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
- return files.length;
- }
- return 0;
- }
-
- private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
- Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
- p.setDurability(Durability.SKIP_WAL);
- p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
- return p;
- }
-
- /**
- * Create an HFile with the given number of bytes
- */
- private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
- HFileContext meta = new HFileContextBuilder().build();
- HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
- .withFileContext(meta).create();
- long now = System.currentTimeMillis();
- try {
- KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
- Bytes.toBytes("colX"), now, dummyData);
- writer.append(kv);
- } finally {
- writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
- writer.close();
- }
- }
-
- private int countMobRows() throws IOException {
- Scan scan = new Scan();
- // Do not retrieve the mob data when scanning
- scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
- InternalScanner scanner = region.getScanner(scan);
-
- int scannedCount = 0;
- List<Cell> results = new ArrayList<Cell>();
- boolean hasMore = scanner.next(results);
- while (hasMore) {
- for (Cell c : results) {
- if (MobUtils.isMobReferenceCell(c)) {
- scannedCount++;
- }
- }
- hasMore = scanner.next(results);
- }
- scanner.close();
-
- return scannedCount;
- }
-
- private int countRows() throws IOException {
- Scan scan = new Scan();
- // Do not retrieve the mob data when scanning
- InternalScanner scanner = region.getScanner(scan);
-
- int scannedCount = 0;
- List<Cell> results = new ArrayList<Cell>();
- boolean hasMore = scanner.next(results);
- while (hasMore) {
- scannedCount += results.size();
- hasMore = scanner.next(results);
- }
- scanner.close();
-
- return scannedCount;
- }
-
- private byte[] makeDummyData(int size) {
- byte[] dummyData = new byte[size];
- new Random().nextBytes(dummyData);
- return dummyData;
- }
-
- private int countReferencedMobFiles() throws IOException {
- Scan scan = new Scan();
- // Do not retrieve the mob data when scanning
- scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
- InternalScanner scanner = region.getScanner(scan);
-
- List<Cell> kvs = new ArrayList<Cell>();
- boolean hasMore = true;
- String fileName;
- Set<String> files = new HashSet<String>();
- do {
- kvs.clear();
- hasMore = scanner.next(kvs);
- for (Cell c : kvs) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(c);
- if (!MobUtils.isMobReferenceCell(kv)) {
- continue;
- }
- if (!MobUtils.isValidMobRefCellValue(kv)) {
- continue;
- }
- int size = MobUtils.getMobValueLength(kv);
- if (size <= mobCellThreshold) {
- continue;
- }
- fileName = MobUtils.getMobFileName(kv);
- if (fileName.isEmpty()) {
- continue;
- }
- files.add(fileName);
- Path familyPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(),
- hcd.getNameAsString());
- assertTrue(fs.exists(new Path(familyPath, fileName)));
- }
- } while (hasMore);
-
- scanner.close();
-
- return files.size();
- }
-}
diff --git src/main/docbkx/hbase_mob.xml src/main/docbkx/hbase_mob.xml
new file mode 100644
index 0000000..54672ad
--- /dev/null
+++ src/main/docbkx/hbase_mob.xml
@@ -0,0 +1,227 @@
+
+
+
+
+ HBase Medium Object (MOB) Storage
+ Data comes in many sizes, and saving all of your data in HBase, including binary data such
+ as images and documents, is ideal. HBase can technically handle binary objects with cells
+ that are up to 10MB in size. However, HBase's normal read and write paths are optimized for
+ values smaller than 100KB in size. When HBase deals with large numbers of values up to 10MB,
+ referred to here as medium objects, or MOBs,
+ performance is degraded due to write amplification caused by splits and compactions. HBase
+ 2.0+ adds support for better managing large numbers of MOBs while maintaining performance,
+ consistency, and low operational overhead. MOB support is provided by the work done in HBASE-11339.
+
+ To take advantage of MOB, you need to use HFile version 3. Optionally, configure the MOB
+ file reader's cache settings for each RegionServer (see Configure the MOB Cache, below), then configure specific columns to hold MOB data. Currently, you also need to configure
+ a periodic re-optimization of MOB data layout, but this requirement is expected to be
+ removed at a later date.
+ Client code does not need to change to take advantage of HBase MOB support. The feature is
+ transparent to the client.
+
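+ For instance, writing and reading a value in a MOB-enabled column uses the
+ ordinary client API; whether a value is stored inline or in a MOB file is
+ decided server-side by the threshold. A minimal sketch follows (the table and
+ column names, conf, and largeValue are illustrative, not part of the feature):
+
+HTable table = new HTable(conf, "t1");
+Put put = new Put(Bytes.toBytes("row1"));
+// The client gives no hint about MOB; a value larger than MOB_THRESHOLD
+// is simply routed to a MOB file by the RegionServer.
+put.add(Bytes.toBytes("f1"), Bytes.toBytes("q1"), largeValue);
+table.put(put);
+Result result = table.get(new Get(Bytes.toBytes("row1")));
+byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1")); // MOB resolved transparently
+table.close();
+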
+
+ Limitations of MOB Functionality
+ Work on HBase MOB is ongoing; support is still needed for snapshots (HBASE-11645),
+ metrics (HBASE-11683), and a native compaction mechanism (HBASE-11861).
+
+
+
+ Configure Columns for MOB
+ You can configure columns to support MOB during table creation or alteration, either
+ in HBase Shell or via the Java API. The two relevant properties are the boolean
+ IS_MOB and the MOB_THRESHOLD, which is the number of bytes
+ at which an object is considered to be a MOB. Only IS_MOB is required. If
+ you do not specify the MOB_THRESHOLD, the default threshold value of 100 KB
+ is used.
+
+ Configure a Column for MOB Using HBase Shell
+
+hbase> create 't1', 'f1', {IS_MOB => true, MOB_THRESHOLD => 102400}
+hbase> alter 't1', {NAME => 'f1', IS_MOB => true, MOB_THRESHOLD => 102400}
+
+
+
+ Configure a Column for MOB Using the API
+
+...
+HColumnDescriptor hcd = new HColumnDescriptor("f");
+hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+...
+HColumnDescriptor hcd;
+hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(102400L));
+...
+
+
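+ Putting the pieces together, a complete creation of a MOB-enabled table via the
+ admin API might look like the following sketch (connection handling is simplified
+ for illustration; the table and family names are assumptions):
+
+Configuration conf = HBaseConfiguration.create();
+HBaseAdmin admin = new HBaseAdmin(conf);
+HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
+HColumnDescriptor hcd = new HColumnDescriptor("f1");
+// Flag the family as MOB-enabled and store values larger than 100 KB in MOB files.
+hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(102400L));
+htd.addFamily(hcd);
+admin.createTable(htd);
+admin.close();
+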
+
+
+
+ Testing MOB
+ The utility org.apache.hadoop.hbase.IntegrationTestIngestMOB is
+ provided to assist with testing the MOB feature. The utility is run as follows:
+ $ sudo -u hbase hbase org.apache.hadoop.hbase.IntegrationTestIngestMOB \
+ -threshold 100*1024 \
+ -minMobDataSize 100*1024*4/5 \
+ -maxMobDataSize 100*1024*50
+
+
+ threshold is the threshold at which cells are considered to
+ be MOBs. The default is 100 KB.
+
+
+ minMobDataSize is the minimum value for the size of MOB
+ data. The default is 80 KB.
+
+
+ maxMobDataSize is the maximum value for the size of MOB
+ data. The default is 5 MB.
+
+
+
+
+
+ Set Up MOB Re-Optimization Tasks
+ The MOB feature introduces a new read and write path to HBase and currently requires
+ an external tool, the sweeper tool, for housekeeping and
+ optimization. The sweeper tool uses MapReduce to coalesce small MOB
+ files, or MOB files with many deletions or updates.
+
+
+ Configure and Run the sweeper Tool
+
+ First, configure the sweeper's properties in the
+ RegionServer's hbase-site.xml file. Adjust these properties
+ to suit your environment.
+
+ hbase.mob.sweep.tool.compaction.ratio
+ 0.5f
+
+ If too many cells are deleted in a mob file, it's regarded
+ as an invalid file and needs to be merged.
+ A file is regarded as invalid when existingCellsSize/mobFileSize is less
+ than this ratio. The default value is 0.5f.
+
+
+
+ hbase.mob.sweep.tool.compaction.mergeable.size
+ 134217728
+
+ If the size of a mob file is less than this value, it's regarded as a small
+ file and needs to be merged. The default value is 128MB.
+
+
+
+ hbase.mob.sweep.tool.compaction.memstore.flush.size
+ 134217728
+
+ The flush size for the memstore used by the sweep job. Each sweep reducer owns such a memstore.
+ The default value is 128MB.
+
+
+
+ hbase.mob.cleaner.interval
+ 86400000
+
+ The period, in milliseconds, at which the ExpiredMobFileCleaner runs.
+ The default value is one day.
+
+]]>
+
+
+
+ Next, add the HBase install directory, $HBASE_HOME/*, and the HBase
+ library directory to yarn-site.xml. Adjust this example to
+ suit your environment.
+
+ Classpath for typical applications.
+ yarn.application.classpath
+
+ $HADOOP_CONF_DIR
+ $HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*
+ $HADOOP_HDFS_HOME/*,$HADOOP_HDFS_HOME/lib/*,
+ $HADOOP_MAPRED_HOME/*,$HADOOP_MAPRED_HOME/lib/*
+ $HADOOP_YARN_HOME/*,$HADOOP_YARN_HOME/lib/*
+ $HBASE_HOME/*, $HBASE_HOME/lib/*
+
+]]>
+
+
+
+ Finally, run the sweeper tool for each column family which is
+ configured for MOB.
+ $ hbase org.apache.hadoop.hbase.mob.compactions.Sweeper \
+ tableName \
+ familyName
+
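+ For example, for the table and family used earlier in this chapter (names are
+ assumptions carried over from the examples above):
+ $ hbase org.apache.hadoop.hbase.mob.compactions.Sweeper t1 f1
+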
+
+
+
+ Configure the MOB Cache
+ Because there can be a large number of MOB files at any time, as compared to the
+ number of HFiles, MOB files are not always kept open. The MOB file reader cache is an LRU
+ cache which keeps the most recently used MOB files open. To configure the MOB file
+ reader's cache on each RegionServer, add the following properties to the RegionServer's
+ hbase-site.xml, customize the configuration to suit your
+ environment, and restart or rolling restart the RegionServer.
+
+ hbase.mob.file.cache.size
+ 1000
+
+ Number of opened file handlers to cache.
+ A larger value will benefit reads by providing more file handlers per mob
+ file cache and would reduce frequent file opening and closing.
+ However, if this is set too high, it could lead to a "too many opened file
+ handlers" error. The default value is 1000.
+
+
+
+ hbase.mob.cache.evict.period
+ 3600
+
+ The amount of time in seconds before the mob cache evicts cached mob files.
+ The default value is 3600 seconds.
+
+
+
+ hbase.mob.cache.evict.remain.ratio
+ 0.5f
+
+ The ratio (between 0.0 and 1.0) of files that remain cached after an eviction
+ is triggered when the number of cached mob files exceeds the hbase.mob.file.cache.size.
+ The default value is 0.5f.
+
+
+]]>
+
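+ These settings are ordinary Configuration entries; for reference, a minimal
+ sketch of how code could read them back (key names and defaults taken from the
+ listing above):
+
+Configuration conf = HBaseConfiguration.create();
+int cacheSize = conf.getInt("hbase.mob.file.cache.size", 1000);          // open readers to cache
+long evictPeriod = conf.getLong("hbase.mob.cache.evict.period", 3600L);  // seconds between evictions
+float remainRatio = conf.getFloat("hbase.mob.cache.evict.remain.ratio", 0.5f); // fraction kept cached
+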
+
+
\ No newline at end of file