diff --git src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
new file mode 100644
index 0000000..adca0bd
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -0,0 +1,250 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Compact passed set of files.
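+ * <p>
+ * Hypothetical invocation sketch; the argument handling in
+ * {@link #run(String[])} is still a stub, so the command line below is
+ * illustrative only:
+ * <pre>$ ./bin/hbase org.apache.hadoop.hbase.regionserver.CompactionTool</pre>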
+ */
+public class CompactionTool implements Tool {
+ private static final Log LOG = LogFactory.getLog(CompactionTool.class);
+ private Configuration conf;
+
+ CompactionTool() {
+ super();
+ }
+
+ CompactionTool(final Configuration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Do a minor/major compaction on an explicit set of storefiles in a Store.
+ *
+   * @param hcd Descriptor for the column family we're compacting
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes,
+   *   max versions, etc)
+   * @param maxId Current readers' maximum sequence id.
+   * @return Product of compaction, or null if all cells were expired or
+   *   deleted and nothing made it through the compaction.
+ * @throws IOException
+ */
+  StoreFile.Writer compact(final HColumnDescriptor hcd,
+      final Collection<StoreFile> filesToCompact, final boolean majorCompaction,
+      final long maxId)
+  throws IOException {
+ int compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10);
+ Compression.Algorithm compression = hcd.getCompression();
+ // Avoid overriding compression setting for major compactions if user has
+ // not specified it separately
+ Compression.Algorithm compactionCompression =
+ (hcd.getCompactionCompression() != Compression.Algorithm.NONE)?
+ hcd.getCompactionCompression(): compression;
+ // Calculate maximum key count after compaction (for blooms)
+ int maxKeyCount = 0;
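+    // Seed with the newest possible timestamp; the Math.min below can only
+    // lower it towards the true earliest put.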
+ long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+ for (StoreFile file: filesToCompact) {
+ StoreFile.Reader r = file.getReader();
+ if (r == null) {
+ LOG.warn("Failed get a Reader on " + file.getPath());
+ continue;
+ }
+ // NOTE: getFilterEntries could cause under-sized blooms if the user
+ // switches bloom type (e.g. from ROW to ROWCOL)
+ long keyCount = (r.getBloomFilterType() == hcd.getBloomFilterType())?
+ r.getFilterEntries() : r.getEntries();
+ maxKeyCount += keyCount;
+
+      // For major compactions calculate the earliest put timestamp of all
+      // involved storefiles. This is used to remove family delete markers
+      // during the compaction.
+ if (majorCompaction) {
+ byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+ if (tmp == null) {
+          // There is a file with no information; must be an old one, so
+          // assume we have very old puts
+ earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+ } else {
+ earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compacting " + file +
+ ", keycount=" + keyCount +
+ ", bloomtype=" + r.getBloomFilterType().toString() +
+ ", size=" + StringUtils.humanReadableInt(r.length()) +
+ ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
+ ", earliestPutTs=" + earliestPutTs);
+ }
+ }
+
+ // keep track of compaction progress
+ CompactionProgress progress = new CompactionProgress(maxKeyCount);
+
+ // Make the instantiation lazy in case compaction produces no product; i.e.
+ // where all source cells are expired or deleted.
+ StoreFile.Writer writer = null;
+ // Find the smallest read point across all the Scanners.
+ long smallestReadPoint = getSmallestReadPoint();
+ MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
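+    // Pin this thread's MVCC read point so the scanners below agree on
+    // which KeyValues are visible during the compaction.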
+ try {
+ InternalScanner scanner = null;
+ try {
+ scanner = getScanner(hcd, majorCompaction, filesToCompact,
+ smallestReadPoint, earliestPutTs);
+
+ int bytesWritten = 0;
+ // since scanner.next() can return 'false' but still be delivering data,
+ // we have to use a do/while loop.
+      List<KeyValue> kvs = new ArrayList<KeyValue>();
+ // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+ boolean hasMore;
+ do {
+ hasMore = scanner.next(kvs, compactionKVMax);
+ if (writer == null && !kvs.isEmpty()) {
+ writer = createWriterInTmp(maxKeyCount, compactionCompression, true);
+ }
+ if (writer != null) {
+ // output to writer:
+ for (KeyValue kv : kvs) {
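+          // KVs at or below the smallest read point are visible to every
+          // scanner, so their memstoreTS is no longer needed and can be
+          // zeroed rather than persisted.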
+ if (kv.getMemstoreTS() <= smallestReadPoint) {
+ kv.setMemstoreTS(0);
+ }
+ writer.append(kv);
+ // update progress per key
+ ++progress.currentCompactedKVs;
+
+ // check periodically to see if a system stop is requested
+ if (Store.closeCheckInterval > 0) {
+ bytesWritten += kv.getLength();
+              if (bytesWritten > Store.closeCheckInterval) {
+                bytesWritten = 0;
+                abort(writer);
+ }
+ }
+ }
+ }
+ kvs.clear();
+ } while (hasMore);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ } finally {
+ if (writer != null) {
+ writer.appendMetadata(maxId, majorCompaction);
+ writer.close();
+ }
+ }
+ return writer;
+ }
+
+  /**
+   * @param hcd
+   * @param majorCompaction
+   * @param filesToCompact
+   * @param smallestReadPoint
+   * @param earliestPutTs
+   * @return Scanner instance to use while compacting
+   * @throws IOException
+   */
+  InternalScanner getScanner(final HColumnDescriptor hcd,
+      final boolean majorCompaction, final Collection<StoreFile> filesToCompact,
+      final long smallestReadPoint, final long earliestPutTs)
+  throws IOException {
+    // For each file, obtain a scanner; no block caching, no pread, flagged
+    // as a compaction read:
+    List<StoreFileScanner> scanners =
+      StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true);
+    Scan scan = new Scan();
+    scan.setMaxVersions(hcd.getMaxVersions());
+    /* include deletes, unless we are doing a major compaction */
+    // TODO: this relies on a StoreScanner constructor that takes the tool
+    // itself rather than a Store; see the accompanying StoreScanner changes.
+    InternalScanner scanner = new StoreScanner(this, scan, scanners,
+      majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+      smallestReadPoint, earliestPutTs);
+    return precompact(scanner);
+  }
+
+ long getSmallestReadPoint() {
+ return -1; // TODO region.getSmallestReadPoint();
+  }
+
+  StoreFile.Writer createWriterInTmp(int maxKeyCount,
+ Compression.Algorithm compression, boolean isCompaction)
+ throws IOException {
+    // TODO: create a StoreFile.Writer under the store's tmp directory
+ return null;
+ }
+
+ /**
+ * Called after we made the compaction scanner but before we actually start
+ * in on the compaction.
+ * @param scanner
+ * @return Scanner to proceed compacting with.
+ */
+ InternalScanner precompact(final InternalScanner scanner) {
+    // TODO: run the coprocessor preCompact hook, along these lines:
+    /*
+    if (region.getCoprocessorHost() != null) {
+      InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
+          this, scanner);
+      // NULL scanner returned from coprocessor hooks means skip normal processing
+      if (cpScanner == null) {
+        return null;
+      }
+      scanner = cpScanner;
+    }
+    */
+    return scanner;
+ }
+
+ void abort(final StoreFile.Writer writer) throws IOException {
+    // TODO: abort only if a stop was requested; until the region plumbing
+    // exists this is a no-op. Closing the writer unconditionally here would
+    // break subsequent appends. The intended logic:
+    /*
+    if (!this.region.areWritesEnabled()) {
+      writer.close();
+      fs.delete(writer.getPath(), false);
+      throw new InterruptedIOException(
+          "Aborting compaction of store " + this +
+          " in region " + this.region +
+          " because user requested stop.");
+    }
+    */
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public void setConf(Configuration c) {
+ this.conf = c;
+ }
+
+ @Override
+ public int run(final String[] args) throws Exception {
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(HBaseConfiguration.create(),
+ new CompactionTool(), args));
+ }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 859d3c2..9e623e7 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5317,11 +5317,11 @@ public class HRegion implements HeapSize { // , Writable{
final HLog log = new HLog(fs, logdir, oldLogDir, c);
try {
processTable(fs, tableDir, log, c, majorCompact);
- } finally {
+ } finally {
log.close();
// TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache();
if (bc != null) bc.shutdown();
- }
+ }
}
}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 919d814..135db1d 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.NavigableSet;
@@ -31,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
@@ -45,7 +47,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
class StoreScanner extends NonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class);
- private Store store;
private ScanQueryMatcher matcher;
private KeyValueHeap heap;
private boolean cacheBlocks;
@@ -195,6 +196,10 @@ class StoreScanner extends NonLazyKeyValueScanner
heap = new KeyValueHeap(scanners, scanInfo.getComparator());
}
+ KVComparator getKVComparator() {
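+    // TODO: supply the comparator without going through a Store reference;
+    // a null comparator disables the ordering sanity check in next().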
+ return null;
+ }
+
/**
* Method used internally to initialize metric names throughout the
* constructors.
@@ -218,8 +223,7 @@ class StoreScanner extends NonLazyKeyValueScanner
*/
private List getScannersNoCompaction() throws IOException {
final boolean isCompaction = false;
- return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
- isCompaction, matcher));
+ return selectScannersFrom(store.getScanners(cacheBlocks, isGet, isCompaction, matcher));
}
/**
@@ -294,7 +298,7 @@ class StoreScanner extends NonLazyKeyValueScanner
List scanners = getScannersNoCompaction();
- heap = new KeyValueHeap(scanners, store.comparator);
+ heap = new KeyValueHeap(scanners, getKVComparator());
}
return this.heap.seek(key);
@@ -349,15 +353,14 @@ class StoreScanner extends NonLazyKeyValueScanner
List results = new ArrayList();
// Only do a sanity-check if store and comparator are available.
- KeyValue.KVComparator comparator =
- store != null ? store.getComparator() : null;
+ KeyValue.KVComparator comparator = getKVComparator();
LOOP: while((kv = this.heap.peek()) != null) {
// Check that the heap gives us KVs in an increasing order.
if (prevKV != null && comparator != null
&& comparator.compare(prevKV, kv) > 0) {
throw new IOException("Key " + prevKV + " followed by a " +
- "smaller key " + kv + " in cf " + store);
+ "smaller key " + kv + " in cf " + this.hcd.get);
}
prevKV = kv;
ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
@@ -494,8 +497,8 @@ class StoreScanner extends NonLazyKeyValueScanner
private boolean checkReseek() throws IOException {
if (this.heap == null && this.lastTop != null) {
resetScannerStack(this.lastTop);
- if (this.heap.peek() == null
- || store.comparator.compare(this.lastTop, this.heap.peek()) != 0) {
+ if (this.heap.peek() == null ||
+ getKVComparator().compare(this.lastTop, this.heap.peek()) != 0) {
LOG.debug("Storescanner.peek() is changed where before = "
+ this.lastTop.toString() + ",and after = " + this.heap.peek());
this.lastTop = null;
@@ -522,7 +525,7 @@ class StoreScanner extends NonLazyKeyValueScanner
}
// Combine all seeked scanners with a heap
- heap = new KeyValueHeap(scanners, store.comparator);
+ heap = new KeyValueHeap(scanners, getKVComparator());
// Reset the state of the Query Matcher and set to top row.
// Only reset and call setRow if the row changes; avoids confusing the