diff --git src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
new file mode 100644
index 0000000..adca0bd
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -0,0 +1,250 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Compact a passed set of files.
+ */
+public class CompactionTool implements Tool {
+  private static final Log LOG = LogFactory.getLog(CompactionTool.class);
+  private Configuration conf;
+
+  CompactionTool() {
+    super();
+  }
+
+  CompactionTool(final Configuration conf) {
+    setConf(conf);
+  }
+
+  /**
+   * Do a minor/major compaction on an explicit set of storefiles in a Store.
+   *
+   * @param hcd Descriptor for the family we're compacting
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes,
+   *   max versions, etc)
+   * @param maxId Current readers' maximum sequence id.
+   * @return Product of the compaction, or null if all cells expired or were
+   *   deleted and nothing made it through the compaction.
+   * @throws IOException
+   */
+  StoreFile.Writer compact(final HColumnDescriptor hcd,
+      final Collection<StoreFile> filesToCompact, final boolean majorCompaction,
+      final long maxId)
+  throws IOException {
+    int compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10);
+    Compression.Algorithm compression = hcd.getCompression();
+    // Avoid overriding the compression setting for major compactions if the
+    // user has not specified it separately.
+    Compression.Algorithm compactionCompression =
+      (hcd.getCompactionCompression() != Compression.Algorithm.NONE)?
+        hcd.getCompactionCompression(): compression;
+    // Calculate the maximum key count after compaction (for blooms).
+    int maxKeyCount = 0;
+    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    for (StoreFile file: filesToCompact) {
+      StoreFile.Reader r = file.getReader();
+      if (r == null) {
+        LOG.warn("Failed to get a Reader on " + file.getPath());
+        continue;
+      }
+      // NOTE: getFilterEntries could cause under-sized blooms if the user
+      // switches bloom type (e.g. from ROW to ROWCOL).
+      long keyCount = (r.getBloomFilterType() == hcd.getBloomFilterType())?
+        r.getFilterEntries() : r.getEntries();
+      maxKeyCount += keyCount;
+
+      // For major compactions calculate the earliest put timestamp of all
+      // involved storefiles. This is used to remove family delete markers
+      // during the compaction.
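+      // earliestPutTs is the minimum over every file in the set; a single
+      // file lacking EARLIEST_PUT_TS metadata (written before that key
+      // existed) forces the conservative OLDEST_TIMESTAMP fallback below.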
+      if (majorCompaction) {
+        byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        if (tmp == null) {
+          // There is a file with no information; it must be an old one, so
+          // assume it may hold very old puts.
+          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+        } else {
+          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Compacting " + file +
+          ", keycount=" + keyCount +
+          ", bloomtype=" + r.getBloomFilterType().toString() +
+          ", size=" + StringUtils.humanReadableInt(r.length()) +
+          ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
+          ", earliestPutTs=" + earliestPutTs);
+      }
+    }
+
+    // Keep track of compaction progress.
+    CompactionProgress progress = new CompactionProgress(maxKeyCount);
+
+    // Make the instantiation lazy in case compaction produces no product;
+    // i.e. where all source cells are expired or deleted.
+    StoreFile.Writer writer = null;
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = getSmallestReadPoint();
+    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+    try {
+      InternalScanner scanner = null;
+      try {
+        scanner = getScanner(hcd, majorCompaction, filesToCompact,
+          smallestReadPoint, earliestPutTs);
+
+        int bytesWritten = 0;
+        // Since scanner.next() can return 'false' but still be delivering
+        // data, we have to use a do/while loop.
+        ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME.
+        boolean hasMore;
+        do {
+          hasMore = scanner.next(kvs, compactionKVMax);
+          if (writer == null && !kvs.isEmpty()) {
+            writer = createWriterInTmp(maxKeyCount, compactionCompression, true);
+          }
+          if (writer != null) {
+            // Output to writer:
+            for (KeyValue kv : kvs) {
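+              // Once every open scanner reads at or above smallestReadPoint
+              // the per-KV memstore timestamp is no longer needed, so zero
+              // it before the KV is rewritten into the new file.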
+              if (kv.getMemstoreTS() <= smallestReadPoint) {
+                kv.setMemstoreTS(0);
+              }
+              writer.append(kv);
+              // Update progress per key.
+              ++progress.currentCompactedKVs;
+
+              // Check periodically to see if a system stop is requested.
+              if (Store.closeCheckInterval > 0) {
+                bytesWritten += kv.getLength();
+                if (bytesWritten > Store.closeCheckInterval) {
+                  bytesWritten = 0;
+                  abort(writer);
+                }
+              }
+            }
+          }
+          kvs.clear();
+        } while (hasMore);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+      }
+    } finally {
+      if (writer != null) {
+        writer.appendMetadata(maxId, majorCompaction);
+        writer.close();
+      }
+    }
+    return writer;
+  }
+
+  /**
+   * @param hcd
+   * @param majorCompaction
+   * @param filesToCompact
+   * @param smallestReadPoint
+   * @param earliestPutTs
+   * @return Scanner instance to use when compacting
+   * @throws IOException
+   */
+  InternalScanner getScanner(final HColumnDescriptor hcd,
+      final boolean majorCompaction, final Collection<StoreFile> filesToCompact,
+      final long smallestReadPoint, final long earliestPutTs)
+  throws IOException {
+    // For each file, obtain a scanner:
+    List<StoreFileScanner> scanners =
+      StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true);
+    Scan scan = new Scan();
+    scan.setMaxVersions(hcd.getMaxVersions());
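+    // A major compaction sees every file in the store, so the matcher may
+    // prune deletes, expired cells and excess versions outright; a minor
+    // compaction must carry delete markers forward.
+    // TODO: StoreScanner's compaction constructor takes a Store as its
+    // first argument; passing the tool itself below assumes a decoupled
+    // constructor that does not exist yet.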
+    /* Include deletes, unless we are doing a major compaction. */
+    InternalScanner scanner = new StoreScanner(this, scan, scanners,
+      majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+      smallestReadPoint, earliestPutTs);
+    return precompact(scanner);
+  }
+
+  long getSmallestReadPoint() {
+    return -1; // TODO region.getSmallestReadPoint();
+  }
+
+  StoreFile.Writer createWriterInTmp(int maxKeyCount,
+      Compression.Algorithm compression, boolean isCompaction)
+  throws IOException {
+    // TODO
+    return null;
+  }
+
+  /**
+   * Called after we make the compaction scanner but before we actually start
+   * in on the compaction.
+   * @param scanner
+   * @return Scanner to proceed compacting with.
+   */
+  InternalScanner precompact(final InternalScanner scanner) {
+    return scanner;
+    // TODO
+    /*
+    if (region.getCoprocessorHost() != null) {
+      InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
+          this, scanner);
+      // NULL scanner returned from coprocessor hooks means skip normal processing
+      if (cpScanner == null) {
+        return null;
+      }
+      scanner = cpScanner;
+    }
+    */
+  }
+
+  void abort(final StoreFile.Writer writer) throws IOException {
+    // TODO
+    /*
+    if (!this.region.areWritesEnabled()) {
+      writer.close();
+      fs.delete(writer.getPath(), false);
+      throw new InterruptedIOException(
+          "Aborting compaction of store " + this +
+          " in region " + this.region +
+          " because user requested stop.");
+    }
+    */
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration c) {
+    this.conf = c;
+  }
+
+  @Override
+  public int run(final String[] args) throws Exception {
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(HBaseConfiguration.create(),
+      new CompactionTool(), args));
+  }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 859d3c2..9e623e7 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5317,11 +5317,11 @@ public class HRegion implements HeapSize { // , Writable{
       final HLog log = new HLog(fs, logdir, oldLogDir, c);
       try {
         processTable(fs, tableDir, log, c, majorCompact);
-      } finally {
+      } finally {
         log.close();
         // TODO: is this still right?
         BlockCache bc = new CacheConfig(c).getBlockCache();
         if (bc != null) bc.shutdown();
-      }
+      }
     }
   }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 919d814..135db1d 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.NavigableSet;
 
@@ -31,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
@@ -45,7 +47,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 class StoreScanner extends NonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
-  private Store store;
   private ScanQueryMatcher matcher;
   private KeyValueHeap heap;
   private boolean cacheBlocks;
@@ -195,6 +196,10 @@
     heap = new KeyValueHeap(scanners, scanInfo.getComparator());
   }
 
+  KVComparator getKVComparator() {
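+    // TODO: with the Store reference gone, the comparator must be supplied
+    // some other way (e.g. at construction); returning null here will NPE
+    // in KeyValueHeap and checkReseek().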
+    return null;
+  }
+
   /**
    * Method used internally to initialize metric names throughout the
    * constructors.
@@ -218,8 +223,7 @@
    */
   private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
     final boolean isCompaction = false;
-    return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
-        isCompaction, matcher));
+    return selectScannersFrom(store.getScanners(cacheBlocks, isGet, isCompaction, matcher));
   }
@@ -294,7 +298,7 @@
 
       List<KeyValueScanner> scanners = getScannersNoCompaction();
 
-      heap = new KeyValueHeap(scanners, store.comparator);
+      heap = new KeyValueHeap(scanners, getKVComparator());
     }
 
     return this.heap.seek(key);
@@ -349,15 +353,14 @@
     List<KeyValue> results = new ArrayList<KeyValue>();
 
     // Only do a sanity-check if store and comparator are available.
-    KeyValue.KVComparator comparator =
-        store != null ? store.getComparator() : null;
+    KeyValue.KVComparator comparator = getKVComparator();
 
     LOOP: while((kv = this.heap.peek()) != null) {
       // Check that the heap gives us KVs in an increasing order.
       if (prevKV != null && comparator != null &&
           comparator.compare(prevKV, kv) > 0) {
         throw new IOException("Key " + prevKV + " followed by a " +
-            "smaller key " + kv + " in cf " + store);
+            "smaller key " + kv);
       }
       prevKV = kv;
       ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
@@ -494,8 +497,8 @@
   private boolean checkReseek() throws IOException {
     if (this.heap == null && this.lastTop != null) {
       resetScannerStack(this.lastTop);
-      if (this.heap.peek() == null
-          || store.comparator.compare(this.lastTop, this.heap.peek()) != 0) {
+      if (this.heap.peek() == null ||
+          getKVComparator().compare(this.lastTop, this.heap.peek()) != 0) {
         LOG.debug("Storescanner.peek() is changed where before = " +
             this.lastTop.toString() + ",and after = " + this.heap.peek());
         this.lastTop = null;
@@ -522,7 +525,7 @@
     }
 
     // Combine all seeked scanners with a heap
-    heap = new KeyValueHeap(scanners, store.comparator);
+    heap = new KeyValueHeap(scanners, getKVComparator());
 
     // Reset the state of the Query Matcher and set to top row.
     // Only reset and call setRow if the row changes; avoids confusing the