Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1417558) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -132,9 +131,6 @@ private volatile long totalUncompressedBytes = 0L; private final Object flushLock = new Object(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final String storeNameStr; - private CompactionProgress progress; - private final int compactionKVMax; private final boolean verifyBulkLoads; /* The default priority for user-specified compaction requests. @@ -158,10 +154,6 @@ new CopyOnWriteArraySet(); private final int blocksize; - /** Compression algorithm for flush files and minor compaction */ - private final Compression.Algorithm compression; - /** Compression algorithm for major compaction */ - private final Compression.Algorithm compactionCompression; private HFileDataBlockEncoder dataBlockEncoder; /** Checksum configuration */ @@ -171,6 +163,8 @@ // Comparing KeyValues final KeyValue.KVComparator comparator; + private final Compactor compactor; + /** * Constructor * @param basedir qualified path under which the region directory lives; @@ -185,25 +179,16 @@ protected Store(Path basedir, HRegion region, HColumnDescriptor family, FileSystem fs, Configuration conf) throws IOException { - super(conf, region.getTableDesc().getNameAsString(), + super(conf, region.getRegionInfo().getTableNameAsString(), Bytes.toString(family.getName())); - HRegionInfo info = region.regionInfo; + HRegionInfo info = region.getRegionInfo(); this.fs = fs; - this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName()); - if (!this.fs.exists(this.homedir)) { - if (!this.fs.mkdirs(this.homedir)) - throw new IOException("Failed create of: " + this.homedir.toString()); - } + Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName()); + this.homedir = createStoreHomeDir(this.fs, p); this.region = region; this.family = family; this.conf = conf; this.blocksize = family.getBlocksize(); - this.compression = family.getCompression(); - // avoid overriding compression setting for major compactions if the user - // has not specified it separately - this.compactionCompression = - (family.getCompactionCompression() != Compression.Algorithm.NONE) ? 
- family.getCompactionCompression() : this.compression; this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(), @@ -228,7 +213,6 @@ "ms in store " + this); scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator); this.memstore = new MemStore(conf, this.comparator); - this.storeNameStr = getColumnFamilyName(); // By default, compact if storefile.count >= minFilesToCompact this.minFilesToCompact = Math.max(2, @@ -245,10 +229,8 @@ this.region.memstoreFlushSize); this.maxCompactSize = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE); - this.compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10); - this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", - false); + this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); if (Store.closeCheckInterval == 0) { Store.closeCheckInterval = conf.getInt( @@ -260,9 +242,50 @@ this.checksumType = getChecksumType(conf); // initilize bytes per checksum this.bytesPerChecksum = getBytesPerChecksum(conf); + // Create a compaction tool instance + this.compactor = new Compactor(this.conf); } /** + * @param family + * @return + */ + long getTTL(final HColumnDescriptor family) { + // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds. + long ttl = family.getTimeToLive(); + if (ttl == HConstants.FOREVER) { + // Default is unlimited ttl. + ttl = Long.MAX_VALUE; + } else if (ttl == -1) { + ttl = Long.MAX_VALUE; + } else { + // Second -> ms adjust for user data + ttl *= 1000; + } + return ttl; + } + + /** + * Create this store's homedir + * @param fs + * @param homedir + * @return Return homedir + * @throws IOException + */ + Path createStoreHomeDir(final FileSystem fs, + final Path homedir) throws IOException { + if (!fs.exists(homedir)) { + if (!fs.mkdirs(homedir)) + throw new IOException("Failed create of: " + homedir.toString()); + } + return homedir; + } + + FileSystem getFileSystem() { + return this.fs; + } + + /** * Returns the configured bytesPerChecksum value. * @param conf The configuration * @return The bytesPerChecksum that is set in the configuration @@ -320,7 +343,7 @@ * Return the directory in which this store stores its * StoreFiles */ - public Path getHomedir() { + Path getHomedir() { return homedir; } @@ -339,6 +362,10 @@ this.dataBlockEncoder = blockEncoder; } + FileStatus [] getStoreFiles() throws IOException { + return FSUtils.listStatus(this.fs, this.homedir, null); + } + /** * Creates an unsorted list of StoreFile loaded in parallel * from the given directory. @@ -346,7 +373,7 @@ */ private List loadStoreFiles() throws IOException { ArrayList results = new ArrayList(); - FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null); + FileStatus files[] = getStoreFiles(); if (files == null || files.length == 0) { return results; @@ -637,7 +664,7 @@ storeFileCloserThreadPool.shutdownNow(); } } - LOG.debug("closed " + this.storeNameStr); + LOG.info("Closed " + this); return result; } finally { this.lock.writeLock().unlock(); @@ -723,6 +750,7 @@ scanner = cpScanner; } try { + int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10); // TODO: We can fail in the below block before we complete adding this // flush to list of store files. Add cleanup of anything put on filesystem // if we fail. 
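// A minimal sketch of the seconds-to-milliseconds TTL conversion performed by the
// new Store.getTTL(HColumnDescriptor) in the hunk above, assuming the
// HColumnDescriptor/HConstants APIs referenced by this patch; the standalone class
// and the sample column family below are illustrative only, not part of the patch.
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;

public class TtlConversionSketch {
  // Mirrors the conversion in Store.getTTL(): HColumnDescriptor stores TTL in
  // seconds, the store works in milliseconds, and FOREVER / -1 mean "no expiry".
  static long ttlMillis(final HColumnDescriptor family) {
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER || ttl == -1) {
      return Long.MAX_VALUE;
    }
    return ttl * 1000;
  }

  public static void main(String[] args) {
    HColumnDescriptor family = new HColumnDescriptor(Bytes.toBytes("f"));
    family.setTimeToLive(60); // one minute, expressed in seconds
    System.out.println("ttl in ms: " + ttlMillis(family)); // prints 60000
  }
}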
@@ -736,7 +764,7 @@ List kvs = new ArrayList(); boolean hasMore; do { - hasMore = scanner.next(kvs, this.compactionKVMax); + hasMore = scanner.next(kvs, compactionKVMax); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { // If we know that this KV is going to be included always, then let us @@ -828,7 +856,7 @@ */ private StoreFile.Writer createWriterInTmp(int maxKeyCount) throws IOException { - return createWriterInTmp(maxKeyCount, this.compression, false); + return createWriterInTmp(maxKeyCount, this.family.getCompression(), false); } /* @@ -981,16 +1009,12 @@ * @param cr * compaction details obtained from requestCompaction() * @throws IOException + * @return Storefile we compacted into or null if we failed or opted out early. */ - void compact(CompactionRequest cr) throws IOException { - if (cr == null || cr.getFiles().isEmpty()) { - return; - } - Preconditions.checkArgument(cr.getStore().toString() - .equals(this.toString())); - + StoreFile compact(CompactionRequest cr) throws IOException { + if (cr == null || cr.getFiles().isEmpty()) return null; + Preconditions.checkArgument(cr.getStore().toString().equals(this.toString())); List filesToCompact = cr.getFiles(); - synchronized (filesCompacting) { // sanity check: we're compacting files that this store knows about // TODO: change this to LOG.error() after more debugging @@ -1002,19 +1026,26 @@ // Ready to go. Have list of files to compact. LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in " - + this.storeNameStr + " of " + + this + " of " + this.region.getRegionInfo().getRegionNameAsString() + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize=" + StringUtils.humanReadableInt(cr.getSize())); StoreFile sf = null; try { - StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(), - maxId); + StoreFile.Writer writer = + this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId); // Move the compaction into place. - sf = completeCompaction(filesToCompact, writer); - if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postCompact(this, sf); + if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { + sf = completeCompaction(filesToCompact, writer); + if (region.getCoprocessorHost() != null) { + region.getCoprocessorHost().postCompact(this, sf); + } + } else { + // Create storefile around what we wrote with a reader on it. + sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf, + this.family.getBloomFilterType(), this.dataBlockEncoder); + sf.createReader(); } } finally { synchronized (filesCompacting) { @@ -1023,7 +1054,7 @@ } LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of " - + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of " + + filesToCompact.size() + " file(s) in " + this + " of " + this.region.getRegionInfo().getRegionNameAsString() + " into " + (sf == null ? "none" : sf.getPath().getName()) + @@ -1031,6 +1062,7 @@ StringUtils.humanReadableInt(sf.getReader().length())) + "; total size for store is " + StringUtils.humanReadableInt(storeSize)); + return sf; } /** @@ -1070,7 +1102,8 @@ try { // Ready to go. Have list of files to compact. - StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId); + StoreFile.Writer writer = + this.compactor.compact(this, filesToCompact, isMajor, maxId); // Move the compaction into place. 
StoreFile sf = completeCompaction(filesToCompact, writer); if (region.getCoprocessorHost() != null) { @@ -1119,10 +1152,10 @@ } /** getter for CompactionProgress object - * @return CompactionProgress object + * @return CompactionProgress object; can be null */ public CompactionProgress getCompactionProgress() { - return this.progress; + return this.compactor.getProgress(); } /* @@ -1174,19 +1207,19 @@ if (sf.isMajorCompaction() && (this.ttl == HConstants.FOREVER || oldest < this.ttl)) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping major compaction of " + this.storeNameStr + + LOG.debug("Skipping major compaction of " + this + " because one (major) compacted file only and oldestTime " + oldest + "ms is < ttl=" + this.ttl); } } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) { - LOG.debug("Major compaction triggered on store " + this.storeNameStr + + LOG.debug("Major compaction triggered on store " + this + ", because keyvalues outdated; time since last major compaction " + (now - lowTimestamp) + "ms"); result = true; } } else { if (LOG.isDebugEnabled()) { - LOG.debug("Major compaction triggered on store " + this.storeNameStr + + LOG.debug("Major compaction triggered on store " + this + "; time since last major compaction " + (now - lowTimestamp) + "ms"); } result = true; @@ -1376,12 +1409,12 @@ compactSelection.getFilesToCompact().get(pos).getReader().length() > maxCompactSize && !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos; - compactSelection.clearSubList(0, pos); + if (pos != 0) compactSelection.clearSubList(0, pos); } if (compactSelection.getFilesToCompact().isEmpty()) { LOG.debug(this.getHRegionInfo().getEncodedName() + " - " + - this.storeNameStr + ": no store files to compact"); + this + ": no store files to compact"); compactSelection.emptyFileList(); return compactSelection; } @@ -1468,7 +1501,7 @@ // if we don't have enough files to compact, just wait if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipped compaction of " + this.storeNameStr + LOG.debug("Skipped compaction of " + this + ". Only " + (end - start) + " file(s) of size " + StringUtils.humanReadableInt(totalSize) + " have met compaction criteria."); @@ -1495,149 +1528,6 @@ } /** - * Do a minor/major compaction on an explicit set of storefiles in a Store. - * Uses the scan infrastructure to make it easy. - * - * @param filesToCompact which files to compact - * @param majorCompaction true to major compact (prune all deletes, max versions, etc) - * @param maxId Readers maximum sequence id. - * @return Product of compaction or null if all cells expired or deleted and - * nothing made it through the compaction. - * @throws IOException - */ - StoreFile.Writer compactStore(final Collection filesToCompact, - final boolean majorCompaction, final long maxId) - throws IOException { - // calculate maximum key count after compaction (for blooms) - int maxKeyCount = 0; - long earliestPutTs = HConstants.LATEST_TIMESTAMP; - for (StoreFile file : filesToCompact) { - StoreFile.Reader r = file.getReader(); - if (r != null) { - // NOTE: getFilterEntries could cause under-sized blooms if the user - // switches bloom type (e.g. from ROW to ROWCOL) - long keyCount = (r.getBloomFilterType() == family.getBloomFilterType()) - ? 
r.getFilterEntries() : r.getEntries(); - maxKeyCount += keyCount; - if (LOG.isDebugEnabled()) { - LOG.debug("Compacting " + file + - ", keycount=" + keyCount + - ", bloomtype=" + r.getBloomFilterType().toString() + - ", size=" + StringUtils.humanReadableInt(r.length()) + - ", encoding=" + r.getHFileReader().getEncodingOnDisk()); - } - } - // For major compactions calculate the earliest put timestamp - // of all involved storefiles. This is used to remove - // family delete marker during the compaction. - if (majorCompaction) { - byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS); - if (tmp == null) { - // there's a file with no information, must be an old one - // assume we have very old puts - earliestPutTs = HConstants.OLDEST_TIMESTAMP; - } else { - earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp)); - } - } - } - - // keep track of compaction progress - progress = new CompactionProgress(maxKeyCount); - - // For each file, obtain a scanner: - List scanners = StoreFileScanner - .getScannersForStoreFiles(filesToCompact, false, false, true); - - // Make the instantiation lazy in case compaction produces no product; i.e. - // where all source cells are expired or deleted. - StoreFile.Writer writer = null; - // Find the smallest read point across all the Scanners. - long smallestReadPoint = region.getSmallestReadPoint(); - MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); - try { - InternalScanner scanner = null; - try { - if (getHRegion().getCoprocessorHost() != null) { - scanner = getHRegion() - .getCoprocessorHost() - .preCompactScannerOpen(this, scanners, - majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs); - } - if (scanner == null) { - Scan scan = new Scan(); - scan.setMaxVersions(getFamily().getMaxVersions()); - /* Include deletes, unless we are doing a major compaction */ - scanner = new StoreScanner(this, getScanInfo(), scan, scanners, - majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, - smallestReadPoint, earliestPutTs); - } - if (getHRegion().getCoprocessorHost() != null) { - InternalScanner cpScanner = - getHRegion().getCoprocessorHost().preCompact(this, scanner); - // NULL scanner returned from coprocessor hooks means skip normal processing - if (cpScanner == null) { - return null; - } - scanner = cpScanner; - } - - int bytesWritten = 0; - // since scanner.next() can return 'false' but still be delivering data, - // we have to use a do/while loop. 
- ArrayList kvs = new ArrayList(); - // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME - boolean hasMore; - do { - hasMore = scanner.next(kvs, this.compactionKVMax); - if (writer == null && !kvs.isEmpty()) { - writer = createWriterInTmp(maxKeyCount, this.compactionCompression, - true); - } - if (writer != null) { - // output to writer: - for (KeyValue kv : kvs) { - if (kv.getMemstoreTS() <= smallestReadPoint) { - kv.setMemstoreTS(0); - } - writer.append(kv); - // update progress per key - ++progress.currentCompactedKVs; - - // check periodically to see if a system stop is requested - if (Store.closeCheckInterval > 0) { - bytesWritten += kv.getLength(); - if (bytesWritten > Store.closeCheckInterval) { - bytesWritten = 0; - if (!this.region.areWritesEnabled()) { - writer.close(); - fs.delete(writer.getPath(), false); - throw new InterruptedIOException( - "Aborting compaction of store " + this + - " in region " + this.region + - " because user requested stop."); - } - } - } - } - } - kvs.clear(); - } while (hasMore); - } finally { - if (scanner != null) { - scanner.close(); - } - } - } finally { - if (writer != null) { - writer.appendMetadata(maxId, majorCompaction); - writer.close(); - } - } - return writer; - } - - /** * Validates a store file by opening and closing it. In HFileV2 this should * not be an expensive operation. * @@ -1741,7 +1631,7 @@ } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); - LOG.error("Failed replacing compacted files in " + this.storeNameStr + + LOG.error("Failed replacing compacted files in " + this + ". Compacted file is " + (result == null? "none": result.toString()) + ". Files replaced " + compactedFiles.toString() + " some of which may have been already removed", e); @@ -2027,7 +1917,7 @@ return mk.getRow(); } } catch(IOException e) { - LOG.warn("Failed getting store size for " + this.storeNameStr, e); + LOG.warn("Failed getting store size for " + this, e); } finally { this.lock.readLock().unlock(); } @@ -2080,7 +1970,7 @@ @Override public String toString() { - return this.storeNameStr; + return getColumnFamilyName(); } /** @@ -2196,7 +2086,7 @@ } HRegionInfo getHRegionInfo() { - return this.region.regionInfo; + return this.region.getRegionInfo(); } /** @@ -2324,8 +2214,8 @@ public static final long FIXED_OVERHEAD = ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE + - + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG) - + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); + + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG) + + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1417558) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.EOFException; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; import java.io.UnsupportedEncodingException; @@ -62,6 +63,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -230,12 +232,12 @@ * The directory for the table this region is part of. * This directory contains the directory for this region. */ - final Path tableDir; + private final Path tableDir; - final HLog log; - final FileSystem fs; - final Configuration conf; - final int rowLockWaitDuration; + private final HLog log; + private final FileSystem fs; + private final Configuration conf; + private final int rowLockWaitDuration; static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; // The internal wait duration to acquire a lock before read/update @@ -256,8 +258,8 @@ // purge timeout, when a RPC call will be terminated by the RPC engine. final long maxBusyWaitDuration; - final HRegionInfo regionInfo; - final Path regiondir; + private final HRegionInfo regionInfo; + private final Path regiondir; KeyValue.KVComparator comparator; private ConcurrentHashMap scannerReadPoints; @@ -724,7 +726,7 @@ public long addAndGetGlobalMemstoreSize(long memStoreSize) { if (this.rsAccounting != null) { rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize); - } + } return this.memstoreSize.getAndAdd(memStoreSize); } @@ -750,7 +752,7 @@ // and then create the file Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE); - + // if datanode crashes or if the RS goes down just before the close is called while trying to // close the created regioninfo file in the .tmp directory then on next // creation we will be getting AlreadyCreatedException. @@ -758,7 +760,7 @@ if (FSUtils.isExists(fs, tmpPath)) { FSUtils.delete(fs, tmpPath, true); } - + FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms); try { @@ -775,6 +777,26 @@ } } + /** + * @param fs + * @param dir + * @return An HRegionInfo instance gotten from the .regioninfo file under region dir + * @throws IOException + */ + public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir) + throws IOException { + Path regioninfo = new Path(dir, HRegion.REGIONINFO_FILE); + if (!fs.exists(regioninfo)) throw new FileNotFoundException(regioninfo.toString()); + FSDataInputStream in = fs.open(regioninfo); + try { + HRegionInfo hri = new HRegionInfo(); + hri.readFields(in); + return hri; + } finally { + in.close(); + } + } + /** @return a HRegionInfo object for this region */ public HRegionInfo getRegionInfo() { return this.regionInfo; @@ -1021,19 +1043,16 @@ return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix); } - private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads, + static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads, final String threadNamePrefix) { - ThreadPoolExecutor openAndCloseThreadPool = Threads - .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, - new ThreadFactory() { - private int count = 1; + return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, + new ThreadFactory() { + private int count = 1; - public Thread newThread(Runnable r) { - Thread t = new Thread(r, threadNamePrefix + "-" + count++); - return t; - } - }); - return openAndCloseThreadPool; + public Thread newThread(Runnable r) { + return new Thread(r, threadNamePrefix + "-" + count++); + } + }); } /** @@ -1979,7 +1998,7 @@ System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length); return batchMutate(mutationsAndLocks); } - + /** * Perform a batch of mutations. * It supports only Put and Delete mutations and will ignore other types passed. 
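// A short usage sketch for the HRegion.loadDotRegionInfoFileContent() helper added
// in the hunk above: it recovers an HRegionInfo from the .regioninfo file inside a
// region directory. The configuration setup and the region-dir argument are
// illustrative assumptions, not part of the patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;

public class RegionInfoLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path regionDir = new Path(args[0]); // e.g. a region dir under a table dir
    HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
    System.out.println("Region: " + hri.getRegionNameAsString()
        + " table: " + hri.getTableNameAsString());
  }
}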
@@ -2333,7 +2352,7 @@ // do after lock final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis()- startTimeMs; - + // See if the column families were consistent through the whole thing. // if they were then keep them. If they were not then pass a null. // null will be treated as unknown. @@ -2636,7 +2655,7 @@ // do after lock final long after = EnvironmentEdgeManager.currentTimeMillis(); this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now); - + if (flush) { // Request a cache flush. Do it outside update lock. requestFlush(); @@ -3754,6 +3773,7 @@ * @param conf * @param hTableDescriptor * @param hlog shared HLog + * @param boolean initialize - true to initialize the region * @return new HRegion * * @throws IOException @@ -3761,8 +3781,37 @@ public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Configuration conf, final HTableDescriptor hTableDescriptor, - final HLog hlog) + final HLog hlog, + final boolean initialize) throws IOException { + return createHRegion(info, rootDir, conf, hTableDescriptor, + hlog, initialize, false); + } + + /** + * Convenience method creating new HRegions. Used by createTable. + * The {@link HLog} for the created region needs to be closed + * explicitly, if it is not null. + * Use {@link HRegion#getLog()} to get access. + * + * @param info Info for region to create. + * @param rootDir Root directory for HBase instance + * @param conf + * @param hTableDescriptor + * @param hlog shared HLog + * @param boolean initialize - true to initialize the region + * @param boolean ignoreHLog + - true to skip generate new hlog if it is null, mostly for createTable + * @return new HRegion + * + * @throws IOException + */ + public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, + final Configuration conf, + final HTableDescriptor hTableDescriptor, + final HLog hlog, + final boolean initialize, final boolean ignoreHLog) + throws IOException { LOG.info("creating HRegion " + info.getTableNameAsString() + " HTD == " + hTableDescriptor + " RootDir = " + rootDir + " Table name == " + info.getTableNameAsString()); @@ -3773,16 +3822,26 @@ FileSystem fs = FileSystem.get(conf); fs.mkdirs(regionDir); HLog effectiveHLog = hlog; - if (hlog == null) { + if (hlog == null && !ignoreHLog) { effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf); } HRegion region = HRegion.newHRegion(tableDir, effectiveHLog, fs, conf, info, hTableDescriptor, null); - region.initialize(); + if (initialize) { + region.initialize(); + } return region; } + public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, + final Configuration conf, + final HTableDescriptor hTableDescriptor, + final HLog hlog) + throws IOException { + return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true); + } + /** * Open a Region. * @param info Info for region to be opened. @@ -4294,7 +4353,7 @@ // do after lock final long after = EnvironmentEdgeManager.currentTimeMillis(); this.opMetrics.updateGetMetrics(get.familySet(), after - now); - + return results; } @@ -4622,10 +4681,10 @@ closeRegionOperation(); } - + long after = EnvironmentEdgeManager.currentTimeMillis(); this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before); - + if (flush) { // Request a cache flush. Do it outside update lock. 
requestFlush(); @@ -4750,7 +4809,7 @@ long after = EnvironmentEdgeManager.currentTimeMillis(); this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before); } - + if (flush) { // Request a cache flush. Do it outside update lock. requestFlush(); @@ -5243,7 +5302,7 @@ */ private void recordPutWithoutWal(final Map> familyMap) { if (numPutsWithoutWAL.getAndIncrement() == 0) { - LOG.info("writing data to region " + this + + LOG.info("writing data to region " + this + " with WAL disabled. Data may be lost in the event of a crash."); } @@ -5355,11 +5414,11 @@ final HLog log = new HLog(fs, logdir, oldLogDir, c); try { processTable(fs, tableDir, log, c, majorCompact); - } finally { + } finally { log.close(); // TODO: is this still right? BlockCache bc = new CacheConfig(c).getBlockCache(); if (bc != null) bc.shutdown(); - } + } } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (revision 1417559) @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; + +/** + * Compact passed set of files. + * Create an instance and then call {@ink #compact(Store, Collection, boolean, long)}. + */ +@InterfaceAudience.Private +class Compactor extends Configured { + private static final Log LOG = LogFactory.getLog(Compactor.class); + private CompactionProgress progress; + + Compactor(final Configuration c) { + super(c); + } + + /** + * Do a minor/major compaction on an explicit set of storefiles from a Store. + * + * @param store Store the files belong to + * @param filesToCompact which files to compact + * @param majorCompaction true to major compact (prune all deletes, max versions, etc) + * @param maxId Readers maximum sequence id. 
+ * @return Product of compaction or null if all cells expired or deleted and + * nothing made it through the compaction. + * @throws IOException + */ + StoreFile.Writer compact(final Store store, + final Collection filesToCompact, + final boolean majorCompaction, final long maxId) + throws IOException { + // Calculate maximum key count after compaction (for blooms) + // Also calculate earliest put timestamp if major compaction + int maxKeyCount = 0; + long earliestPutTs = HConstants.LATEST_TIMESTAMP; + for (StoreFile file: filesToCompact) { + StoreFile.Reader r = file.getReader(); + if (r == null) { + LOG.warn("Null reader for " + file.getPath()); + continue; + } + // NOTE: getFilterEntries could cause under-sized blooms if the user + // switches bloom type (e.g. from ROW to ROWCOL) + long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())? + r.getFilterEntries() : r.getEntries(); + maxKeyCount += keyCount; + // For major compactions calculate the earliest put timestamp of all + // involved storefiles. This is used to remove family delete marker during + // compaction. + if (majorCompaction) { + byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS); + if (tmp == null) { + // There's a file with no information, must be an old one + // assume we have very old puts + earliestPutTs = HConstants.OLDEST_TIMESTAMP; + } else { + earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp)); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Compacting " + file + + ", keycount=" + keyCount + + ", bloomtype=" + r.getBloomFilterType().toString() + + ", size=" + StringUtils.humanReadableInt(r.length()) + + ", encoding=" + r.getHFileReader().getEncodingOnDisk() + + (majorCompaction? ", earliestPutTs=" + earliestPutTs: "")); + } + } + + // keep track of compaction progress + this.progress = new CompactionProgress(maxKeyCount); + + // For each file, obtain a scanner: + List scanners = StoreFileScanner + .getScannersForStoreFiles(filesToCompact, false, false, true); + + // Get some configs + int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10); + Compression.Algorithm compression = store.getFamily().getCompression(); + // Avoid overriding compression setting for major compactions if the user + // has not specified it separately + Compression.Algorithm compactionCompression = + (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ? + store.getFamily().getCompactionCompression(): compression; + // Make the instantiation lazy in case compaction produces no product; i.e. + // where all source cells are expired or deleted. + StoreFile.Writer writer = null; + // Find the smallest read point across all the Scanners. + long smallestReadPoint = store.getHRegion().getSmallestReadPoint(); + MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); + try { + InternalScanner scanner = null; + try { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + /* Include deletes, unless we are doing a major compaction */ + scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners, + majorCompaction? 
ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, + smallestReadPoint, earliestPutTs); + if (store.getHRegion().getCoprocessorHost() != null) { + InternalScanner cpScanner = + store.getHRegion().getCoprocessorHost().preCompact(store, scanner); + // NULL scanner returned from coprocessor hooks means skip normal processing + if (cpScanner == null) { + return null; + } + scanner = cpScanner; + } + + int bytesWritten = 0; + // Since scanner.next() can return 'false' but still be delivering data, + // we have to use a do/while loop. + List kvs = new ArrayList(); + // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME + boolean hasMore; + do { + hasMore = scanner.next(kvs, compactionKVMax); + if (writer == null && !kvs.isEmpty()) { + writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true); + } + if (writer != null) { + // output to writer: + for (KeyValue kv : kvs) { + if (kv.getMemstoreTS() <= smallestReadPoint) { + kv.setMemstoreTS(0); + } + writer.append(kv); + // update progress per key + ++progress.currentCompactedKVs; + + // check periodically to see if a system stop is requested + if (Store.closeCheckInterval > 0) { + bytesWritten += kv.getLength(); + if (bytesWritten > Store.closeCheckInterval) { + bytesWritten = 0; + isInterrupted(store, writer); + } + } + } + } + kvs.clear(); + } while (hasMore); + } finally { + if (scanner != null) { + scanner.close(); + } + } + } finally { + if (writer != null) { + writer.appendMetadata(maxId, majorCompaction); + writer.close(); + } + } + return writer; + } + + void isInterrupted(final Store store, final StoreFile.Writer writer) + throws IOException { + if (store.getHRegion().areWritesEnabled()) return; + // Else cleanup. + writer.close(); + store.getFileSystem().delete(writer.getPath(), false); + throw new InterruptedIOException( "Aborting compaction of store " + store + + " in region " + store.getHRegion() + " because user requested stop."); + } + + CompactionProgress getProgress() { + return this.progress; + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (revision 1417559) @@ -0,0 +1,468 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.LineReader; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; + +/* + * The CompactionTool allows to execute a compaction specifying a: + *
<ul> + *  <li>table folder (all regions and families will be compacted) + *  <li>region folder (all families in the region will be compacted) + *  <li>family folder (the store files will be compacted) + * </ul>
+ */ +@InterfaceAudience.Public +public class CompactionTool extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(CompactionTool.class); + + private final static String CONF_TMP_DIR = "hbase.tmp.dir"; + private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once"; + private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete"; + private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete"; + + /** + * Class responsible to execute the Compaction on the specified path. + * The path can be a table, region or family directory. + */ + private static class CompactionWorker { + private final boolean keepCompactedFiles; + private final boolean deleteCompacted; + private final Configuration conf; + private final FileSystem fs; + private final Path tmpDir; + + public CompactionWorker(final FileSystem fs, final Configuration conf) { + this.conf = conf; + this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true); + this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false); + this.tmpDir = new Path(conf.get(CONF_TMP_DIR)); + this.fs = fs; + } + + /** + * Execute the compaction on the specified path. + * + * @param path Directory path on which run a + * @param compactOnce Execute just a single step of compaction. + */ + public void compact(final Path path, final boolean compactOnce) throws IOException { + if (isFamilyDir(fs, path)) { + Path regionDir = path.getParent(); + Path tableDir = regionDir.getParent(); + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir); + HRegion region = loadRegion(fs, conf, htd, regionDir); + compactStoreFiles(region, path, compactOnce); + } else if (isRegionDir(fs, path)) { + Path tableDir = path.getParent(); + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir); + compactRegion(htd, path, compactOnce); + } else if (isTableDir(fs, path)) { + compactTable(path, compactOnce); + } else { + throw new IOException( + "Specified path is not a table, region or family directory. path=" + path); + } + } + + private void compactTable(final Path tableDir, final boolean compactOnce) + throws IOException { + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir); + LOG.info("Compact table=" + htd.getNameAsString()); + for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) { + compactRegion(htd, regionDir, compactOnce); + } + } + + private void compactRegion(final HTableDescriptor htd, final Path regionDir, + final boolean compactOnce) throws IOException { + HRegion region = loadRegion(fs, conf, htd, regionDir); + LOG.info("Compact table=" + htd.getNameAsString() + + " region=" + region.getRegionNameAsString()); + for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) { + compactStoreFiles(region, familyDir, compactOnce); + } + } + + /** + * Execute the actual compaction job. + * If the compact once flag is not specified, execute the compaction until + * no more compactions are needed. Uses the Configuration settings provided. 
+ */ + private void compactStoreFiles(final HRegion region, final Path familyDir, + final boolean compactOnce) throws IOException { + LOG.info("Compact table=" + region.getTableDesc().getNameAsString() + + " region=" + region.getRegionNameAsString() + + " family=" + familyDir.getName()); + Store store = getStore(region, familyDir); + do { + CompactionRequest cr = store.requestCompaction(); + StoreFile storeFile = store.compact(cr); + if (storeFile != null) { + if (keepCompactedFiles && deleteCompacted) { + fs.delete(storeFile.getPath(), false); + } + } + } while (store.needsCompaction() && !compactOnce); + } + + /** + * Create a "mock" HStore that uses the tmpDir specified by the user and + * the store dir to compact as source. + */ + private Store getStore(final HRegion region, final Path storeDir) throws IOException { + byte[] familyName = Bytes.toBytes(storeDir.getName()); + HColumnDescriptor hcd = region.getTableDesc().getFamily(familyName); + // Create a Store w/ check of hbase.rootdir blanked out and return our + // list of files instead of have Store search its home dir. + return new Store(tmpDir, region, hcd, fs, conf) { + @Override + public FileStatus[] getStoreFiles() throws IOException { + return this.fs.listStatus(getHomedir()); + } + + @Override + Path createStoreHomeDir(FileSystem fs, Path homedir) throws IOException { + return storeDir; + } + }; + } + + private static HRegion loadRegion(final FileSystem fs, final Configuration conf, + final HTableDescriptor htd, final Path regionDir) throws IOException { + Path rootDir = regionDir.getParent().getParent(); + HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir); + return HRegion.createHRegion(hri, rootDir, conf, htd, null, false, true); + } + } + + private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException { + Path regionInfo = new Path(path, HRegion.REGIONINFO_FILE); + return fs.exists(regionInfo); + } + + private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException { + return FSTableDescriptors.getTableInfoPath(fs, path) != null; + } + + private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException { + return isRegionDir(fs, path.getParent()); + } + + private static class CompactionMapper + extends Mapper { + private CompactionWorker compactor = null; + private boolean compactOnce = false; + + @Override + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false); + + try { + FileSystem fs = FileSystem.get(conf); + this.compactor = new CompactionWorker(fs, conf); + } catch (IOException e) { + throw new RuntimeException("Could not get the input FileSystem", e); + } + } + + @Override + public void map(LongWritable key, Text value, Context context) + throws InterruptedException, IOException { + Path path = new Path(value.toString()); + this.compactor.compact(path, compactOnce); + } + } + + /** + * Input format that uses store files block location as input split locality. + */ + private static class CompactionInputFormat extends TextInputFormat { + @Override + protected boolean isSplitable(JobContext context, Path file) { + return true; + } + + /** + * Returns a split for each store files directory using the block location + * of each file as locality reference. 
+ */ + @Override + public List getSplits(JobContext job) throws IOException { + List splits = new ArrayList(); + List files = listStatus(job); + + Text key = new Text(); + for (FileStatus file: files) { + Path path = file.getPath(); + FileSystem fs = path.getFileSystem(job.getConfiguration()); + LineReader reader = new LineReader(fs.open(path)); + long pos = 0; + int n; + try { + while ((n = reader.readLine(key)) > 0) { + String[] hosts = getStoreDirHosts(fs, path); + splits.add(new FileSplit(path, pos, n, hosts)); + pos += n; + } + } finally { + reader.close(); + } + } + + return splits; + } + + /** + * return the top hosts of the store files, used by the Split + */ + private static String[] getStoreDirHosts(final FileSystem fs, final Path path) + throws IOException { + FileStatus[] files = FSUtils.listStatus(fs, path, null); + if (files == null) { + return new String[] {}; + } + + HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); + for (FileStatus hfileStatus: files) { + HDFSBlocksDistribution storeFileBlocksDistribution = + FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen()); + hdfsBlocksDistribution.add(storeFileBlocksDistribution); + } + + List hosts = hdfsBlocksDistribution.getTopHosts(); + return hosts.toArray(new String[hosts.size()]); + } + + /** + * Create the input file for the given directories to compact. + * The file is a TextFile with each line corrisponding to a + * store files directory to compact. + */ + public static void createInputFile(final FileSystem fs, final Path path, + final Set toCompactDirs) throws IOException { + // Extract the list of store dirs + List storeDirs = new LinkedList(); + for (Path compactDir: toCompactDirs) { + if (isFamilyDir(fs, compactDir)) { + storeDirs.add(compactDir); + } else if (isRegionDir(fs, compactDir)) { + for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) { + storeDirs.add(familyDir); + } + } else if (isTableDir(fs, compactDir)) { + // Lookup regions + for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) { + for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) { + storeDirs.add(familyDir); + } + } + } else { + throw new IOException( + "Specified path is not a table, region or family directory. path=" + compactDir); + } + } + + // Write Input File + FSDataOutputStream stream = fs.create(path); + LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact."); + try { + final byte[] newLine = Bytes.toBytes("\n"); + for (Path storeDir: storeDirs) { + stream.write(Bytes.toBytes(storeDir.toString())); + stream.write(newLine); + } + } finally { + stream.close(); + } + } + } + + /** + * Execute compaction, using a Map-Reduce job. 
+ */ + private int doMapReduce(final FileSystem fs, final Set toCompactDirs, + final boolean compactOnce) throws Exception { + Configuration conf = getConf(); + conf.setBoolean(CONF_COMPACT_ONCE, compactOnce); + + Job job = new Job(conf); + job.setJobName("CompactionTool"); + job.setJarByClass(CompactionTool.class); + job.setMapperClass(CompactionMapper.class); + job.setInputFormatClass(CompactionInputFormat.class); + job.setOutputFormatClass(NullOutputFormat.class); + job.setMapSpeculativeExecution(false); + job.setNumReduceTasks(0); + + String stagingName = "compact-" + EnvironmentEdgeManager.currentTimeMillis(); + Path stagingDir = new Path(conf.get(CONF_TMP_DIR), stagingName); + fs.mkdirs(stagingDir); + try { + // Create input file with the store dirs + Path inputPath = new Path(stagingDir, stagingName); + CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs); + CompactionInputFormat.addInputPath(job, inputPath); + + // Initialize credential for secure cluster + TableMapReduceUtil.initCredentials(job); + + // Start the MR Job and wait + return job.waitForCompletion(true) ? 0 : 1; + } finally { + fs.delete(stagingDir, true); + } + } + + /** + * Execute compaction, from this client, one path at the time. + */ + private int doClient(final FileSystem fs, final Set toCompactDirs, + final boolean compactOnce) throws IOException { + CompactionWorker worker = new CompactionWorker(fs, getConf()); + for (Path path: toCompactDirs) { + worker.compact(path, compactOnce); + } + return 0; + } + + @Override + public int run(String[] args) throws Exception { + Set toCompactDirs = new HashSet(); + boolean compactOnce = false; + boolean mapred = false; + + Configuration conf = getConf(); + FileSystem fs = FileSystem.get(conf); + + try { + for (int i = 0; i < args.length; ++i) { + String opt = args[i]; + if (opt.equals("-compactOnce")) { + compactOnce = true; + } else if (opt.equals("-mapred")) { + mapred = true; + } else if (!opt.startsWith("-")) { + Path path = new Path(opt); + FileStatus status = fs.getFileStatus(path); + if (!status.isDir()) { + printUsage("Specified path is not a directory. path=" + path); + return 1; + } + toCompactDirs.add(path); + } else { + printUsage(); + } + } + } catch (Exception e) { + printUsage(e.getMessage()); + return 1; + } + + if (toCompactDirs.size() == 0) { + printUsage("No directories to compact specified."); + return 1; + } + + // Execute compaction! + if (mapred) { + return doMapReduce(fs, toCompactDirs, compactOnce); + } else { + return doClient(fs, toCompactDirs, compactOnce); + } + } + + private void printUsage() { + printUsage(null); + } + + private void printUsage(final String message) { + if (message != null && message.length() > 0) { + System.err.println(message); + } + System.err.println("Usage: java " + this.getClass().getName() + " \\"); + System.err.println(" [-compactOnce] [-mapred] [-D]* files..."); + System.err.println(); + System.err.println("Options:"); + System.err.println(" mapred Use MapReduce to run compaction."); + System.err.println(" compactOnce Execute just one compaction step. (default: while needed)"); + System.err.println(); + System.err.println("Note: -D properties will be applied to the conf used. 
"); + System.err.println("For example: "); + System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false"); + System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false"); + System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR"); + System.err.println(); + System.err.println("Examples:"); + System.err.println(" To compact the full 'TestTable' using MapReduce:"); + System.err.println(" $ bin/hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/TestTable"); + System.err.println(); + System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':"); + System.err.println(" $ bin/hbase " + this.getClass().getName() + " hdfs:///hbase/TestTable/abc/x"); + } + + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args)); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java (revision 1417558) +++ src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java (working copy) @@ -49,5 +49,4 @@ public float getProgressPct() { return currentCompactedKVs / totalCompactingKVs; } - } Index: src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (revision 1417558) +++ src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (working copy) @@ -142,16 +142,12 @@ List regionInfos = new ArrayList(); final int batchSize = this.conf.getInt("hbase.master.createtable.batchsize", 100); - HLog hlog = null; for (int regionIdx = 0; regionIdx < this.newRegions.length; regionIdx++) { HRegionInfo newRegion = this.newRegions[regionIdx]; // 1. Create HRegion HRegion region = HRegion.createHRegion(newRegion, this.fileSystemManager.getRootDir(), this.conf, - this.hTableDescriptor, hlog); - if (hlog == null) { - hlog = region.getLog(); - } + this.hTableDescriptor, null, false, true); regionInfos.add(region.getRegionInfo()); if (regionIdx % batchSize == 0) { @@ -163,7 +159,6 @@ // 3. Close the new region to flush to disk. Close log file too. region.close(); } - hlog.closeAndDelete(); if (regionInfos.size() > 0) { MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos); } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (revision 1417558) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (working copy) @@ -645,7 +645,7 @@ // Log size long totalSize = heapSize(); long freeSize = maxSize - totalSize; - LruBlockCache.LOG.debug("LRU Stats: " + + LruBlockCache.LOG.debug("Stats: " + "total=" + StringUtils.byteDesc(totalSize) + ", " + "free=" + StringUtils.byteDesc(freeSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", " + @@ -653,11 +653,11 @@ "accesses=" + stats.getRequestCount() + ", " + "hits=" + stats.getHitCount() + ", " + "hitRatio=" + - (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + + (stats.getHitCount() == 0 ? 
"0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits=" + stats.getHitCachingCount() + ", " + "cachingHitsRatio=" + - (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) + + (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) + ", " + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount() + ", " + "evictedPerRun=" + stats.evictedPerEviction()); Index: src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java (revision 1417558) +++ src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java (working copy) @@ -25,9 +25,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.util.ChecksumFactory; - /** * Checksum types. The Checksum type is a one byte number * that stores a representation of the checksum algorithm @@ -70,7 +67,7 @@ ctor = ChecksumFactory.newConstructor(PURECRC32); LOG.info("Checksum using " + PURECRC32); } catch (Exception e) { - LOG.info(PURECRC32 + " not available."); + LOG.trace(PURECRC32 + " not available."); } try { // The default checksum class name is java.util.zip.CRC32. @@ -80,7 +77,7 @@ LOG.info("Checksum can use " + JDKCRC); } } catch (Exception e) { - LOG.warn(JDKCRC + " not available. ", e); + LOG.trace(JDKCRC + " not available."); } } @@ -113,7 +110,7 @@ ctor = ChecksumFactory.newConstructor(PURECRC32C); LOG.info("Checksum can use " + PURECRC32C); } catch (Exception e) { - LOG.info(PURECRC32C + " not available. "); + LOG.trace(PURECRC32C + " not available."); } } Index: src/main/java/org/apache/hadoop/hbase/util/FSUtils.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 1417558) +++ src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy) @@ -151,7 +151,7 @@ */ public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm, boolean overwrite) throws IOException { - LOG.debug("Creating file:" + path + "with permission:" + perm); + LOG.debug("Creating file=" + path + " with permission=" + perm); return fs.create(path, perm, overwrite, fs.getConf().getInt("io.file.buffer.size", 4096), @@ -1013,6 +1013,25 @@ } /** + * Given a particular region dir, return all the familydirs inside it + * + * @param fs A file system for the Path + * @param regionDir Path to a specific region directory + * @return List of paths to valid family directories in region dir. + * @throws IOException + */ + public static List getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException { + // assumes we are in a region dir. + FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs)); + List familyDirs = new ArrayList(fds.length); + for (FileStatus fdfs: fds) { + Path fdPath = fdfs.getPath(); + familyDirs.add(fdPath); + } + return familyDirs; + } + + /** * Filter for HFiles that excludes reference files. 
*/ public static class HFileFilter implements PathFilter { Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (revision 1417558) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (working copy) @@ -587,8 +587,10 @@ List storeFiles = store.getStorefiles(); long maxId = StoreFile.getMaxSequenceIdInList(storeFiles); + Compactor tool = new Compactor(this.conf); - StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId); + StoreFile.Writer compactedFile = + tool.compact(store, storeFiles, false, maxId); // Now lets corrupt the compacted file. FileSystem fs = FileSystem.get(conf);
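// A hedged sketch of driving the new CompactionTool added by this patch from Java.
// The same can be done from the shell as printUsage() above documents, e.g.
//   bin/hbase org.apache.hadoop.hbase.regionserver.CompactionTool -compactOnce \
//       hdfs:///hbase/TestTable/abc/x
// The HDFS path below is the illustrative one from the usage text, not a real table.
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.CompactionTool;
import org.apache.hadoop.util.ToolRunner;

public class CompactionToolDriverSketch {
  public static void main(String[] args) throws Exception {
    // -compactOnce runs a single compaction step against the given family dir.
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(),
        new String[] { "-compactOnce", "hdfs:///hbase/TestTable/abc/x" });
    System.exit(ret);
  }
}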