diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index 13aff26..fea8606 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -130,9 +130,6 @@ public interface HConstants { /** Like the previous, but for old logs that are about to be deleted */ static final String HREGION_OLDLOGDIR_NAME = ".oldlogs"; - /** Name of old log file for reconstruction */ - static final String HREGION_OLDLOGFILE_NAME = "oldlogfile.log"; - /** Used to construct the name of the compaction directory during compaction */ static final String HREGION_COMPACTIONDIR_NAME = "compaction.dir"; diff --git a/src/main/java/org/apache/hadoop/hbase/HMerge.java b/src/main/java/org/apache/hadoop/hbase/HMerge.java index 63afdd5..73cbe21 100644 --- a/src/main/java/org/apache/hadoop/hbase/HMerge.java +++ b/src/main/java/org/apache/hadoop/hbase/HMerge.java @@ -153,12 +153,12 @@ class HMerge implements HConstants { if (currentRegion == null) { currentRegion = HRegion.newHRegion(tabledir, hlog, fs, conf, info[i], null); - currentRegion.initialize(null, null); + currentRegion.initialize(); currentSize = currentRegion.getLargestHStoreSize(); } nextRegion = HRegion.newHRegion(tabledir, hlog, fs, conf, info[i + 1], null); - nextRegion.initialize(null, null); + nextRegion.initialize(); nextSize = nextRegion.getLargestHStoreSize(); if ((currentSize + nextSize) <= (maxFilesize / 2)) { @@ -328,7 +328,7 @@ class HMerge implements HConstants { root = HRegion.newHRegion(rootTableDir, hlog, fs, conf, HRegionInfo.ROOT_REGIONINFO, null); - root.initialize(null, null); + root.initialize(); Scan scan = new Scan(); scan.addColumn(CATALOG_FAMILY, REGIONINFO_QUALIFIER); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 6b6d098..83200cb 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.io.Reference.Range; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -59,6 +60,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; +import java.io.EOFException; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.lang.reflect.Constructor; @@ -217,7 +219,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock(); private final Object splitLock = new Object(); - private long minSequenceId; private boolean splitRequest; private final ReadWriteConsistencyControl rwcc = @@ -300,51 +301,42 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } /** - * Initialize this region and get it ready to roll. - * Called after construction. + * Initialize this region. + * @return What the next sequence (edit) id should be. + * @throws IOException e + */ + public long initialize() throws IOException { + return initialize(null); + } + + /** + * Initialize this region. 
* - * @param initialFiles path - * @param reporter progressable + * @param reporter Tickled every so often if initialize is taking a while. + * @return What the next sequence (edit) id should be. * @throws IOException e */ - public void initialize(Path initialFiles, final Progressable reporter) + public long initialize(final Progressable reporter) throws IOException { - Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME); - - moveInitialFilesIntoPlace(this.fs, initialFiles, this.regiondir); - // Write HRI to a file in case we need to recover .META. checkRegioninfoOnFilesystem(); - // Load in all the HStores. + // Load in all the HStores. Get min and max seqids across all families. long maxSeqId = -1; - long minSeqIdToRecover = Integer.MAX_VALUE; - + long minSeqId = Integer.MAX_VALUE; for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) { - Store store = instantiateHStore(this.basedir, c, oldLogFile, reporter); + Store store = instantiateHStore(this.basedir, c); this.stores.put(c.getName(), store); long storeSeqId = store.getMaxSequenceId(); if (storeSeqId > maxSeqId) { maxSeqId = storeSeqId; } - - long storeSeqIdBeforeRecovery = store.getMaxSeqIdBeforeLogRecovery(); - if (storeSeqIdBeforeRecovery < minSeqIdToRecover) { - minSeqIdToRecover = storeSeqIdBeforeRecovery; + if (minSeqId < storeSeqId) { + minSeqId = storeSeqId; } } - - // Play log if one. Delete when done. - doReconstructionLog(oldLogFile, minSeqIdToRecover, maxSeqId, reporter); - if (fs.exists(oldLogFile)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting old log file: " + oldLogFile); - } - fs.delete(oldLogFile, false); - } - - // Add one to the current maximum sequence id so new edits are beyond. - this.minSequenceId = maxSeqId + 1; + // Recover any edits if available. + long seqid = replayRecoveredEdits(this.regiondir, minSeqId, maxSeqId, reporter); // Get rid of any splits or merges that were lost in-progress. Clean out // these directories here on open. We may be opening a region that was @@ -360,8 +352,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // HRegion is ready to go! this.writestate.compacting = false; this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis(); - LOG.info("region " + this + - " available; sequence id is " + this.minSequenceId); + LOG.info(this.toString() + " is online; sequenceid=" + seqid); + return seqid + 1; } /* @@ -417,14 +409,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } } - /** - * @return Updates to this region need to have a sequence id that is >= to - * the this number. - */ - long getMinSequenceId() { - return this.minSequenceId; - } - /** @return a HRegionInfo object for this region */ public HRegionInfo getRegionInfo() { return this.regionInfo; @@ -607,7 +591,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } /** @return the last time the region was flushed */ - @SuppressWarnings({"UnusedDeclaration"}) public long getLastFlushTime() { return this.lastFlushTime; } @@ -1274,7 +1257,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ List kvs = e.getValue(); Map kvCount = new TreeMap(Bytes.BYTES_COMPARATOR); - Store store = getStore(family); for (KeyValue kv: kvs) { // Check if time is LATEST, change to time of most recent addition if so // This is expensive. 
@@ -1710,19 +1692,186 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ return size > this.memstoreFlushSize; } - // Do any reconstruction needed from the log - protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId, - Progressable reporter) + /** + * @param regiondir + * @param minSeqId + * @param maxSeqId + * @param reporter + * @return the new max sequence id or -1 if no log recovered + * @throws UnsupportedEncodingException + * @throws IOException + */ + protected long replayRecoveredEdits(final Path regiondir, + final long minSeqId, final long maxSeqId, final Progressable reporter) + throws UnsupportedEncodingException, IOException { + long seqid = -1; + Path oldedits = new Path(regiondir, HLog.RECOVERED_EDITS); + try { + seqid = replayRecoveredEdits(oldedits, minSeqId, reporter); + } catch (EOFException e) { + // Presume we got here because of lack of append or because we are at the + // last record and did a partial read. Lets keep + LOG.warn("Exception processing recovered edits " + oldedits + + " -- continuing. Presume partial edit at end of file.", e); + } catch (IOException e) { + // Presume we got here because of some HDFS issue. Don't just keep going. + // Fail to open the HStore. Probably means we'll fail over and over + // again until human intervention but alternative has us skipping logs + // and losing edits: HBASE-642. TODO: Revisit. + LOG.warn("Exception processing recovered edits " + oldedits, e); + throw e; + } + // Clean up old edits. + if (this.fs.exists(oldedits)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting recovered edits file: " + oldedits); + } + this.fs.delete(oldedits, false); + } + return seqid; + } + + /* + * Read the edits log put under this region by wal log splitting process. Put + * the recovered edits back up into this region. + * + * We can ignore any log message that has a sequence ID that's equal to or + * lower than minSeqId. (Because we know such log messages are already + * reflected in the HFiles.) + * + * @return the new max sequence id or -1 if no log recovered + */ + private long replayRecoveredEdits(final Path edits, final long minSeqId, + final Progressable reporter) throws UnsupportedEncodingException, IOException { - // Nothing to do (Replaying is done in HStores) - // Used by subclasses; e.g. THBase. + if (edits == null || !this.fs.exists(edits)) return -1; + if (isZeroLengthThenDelete(this.fs, edits)) return -1; + + long maxSeqIdInLog = -1; + long firstSeqIdInLog = -1; + long skippedEdits = 0; + long editsCount = 0; + HLog.Entry entry; + Store store = null; + // Get map of family name to maximum sequence id. Do it here up front + // because as we progress, the sequence id can change if we happen to flush + // and we can then skip edits. + Map familyToOriginalMaxSeqId = familyToMaxSeqId(this.stores); + // How many edits to apply before we send a progress report. + int reportInterval = + this.conf.getInt("hbase.hstore.report.interval.edits", 2000); + HLog.Reader logReader = HLog.getReader(this.fs, edits, conf); + try { + // TBD: Need to add an exception handler around logReader.next. + // A transaction now appears as a single edit. If logReader.next() + // returns an exception, then it must be a incomplete/partial + // transaction at the end of the file. Rather than bubble up + // the exception, we should catch it and simply ignore the + // partial transaction during this recovery phase. 
+ while ((entry = logReader.next()) != null) { + HLogKey key = entry.getKey(); + WALEdit val = entry.getEdit(); + if (firstSeqIdInLog == -1) { + firstSeqIdInLog = key.getLogSeqNum(); + } + // Now, figure if we should skip this edit. + maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum()); + if (key.getLogSeqNum() <= minSeqId) { + skippedEdits++; + continue; + } + for (KeyValue kv : val.getKeyValues()) { + // Check this edit is for me. Also, guard against writing the special + // METACOLUMN info such as HBASE::CACHEFLUSH entries + if (kv.matchingFamily(HLog.METAFAMILY) || + !Bytes.equals(key.getRegionName(), this.regionInfo.getRegionName())) { + skippedEdits++; + continue; + } + if (store == null || !kv.matchingFamily(store.getFamily().getName())) { + store = this.stores.get(kv.getFamily()); + } + if (store == null) { + LOG.warn("No family for " + kv); + skippedEdits++; + continue; + } + long storeMaxSeqId = familyToOriginalMaxSeqId.get(store.getFamily().getName()); + if (storeMaxSeqId >= maxSeqIdInLog) { + skippedEdits++; + continue; + } + apply(kv); + editsCount++; + } + + // Every 2k edits, tell the reporter we're making progress. + // Have seen 60k edits taking 3minutes to complete. + if (reporter != null && (editsCount % reportInterval) == 0) { + reporter.progress(); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits + + ", firstSeqIdInLog=" + firstSeqIdInLog + + ", maxSeqIdInLog=" + maxSeqIdInLog); + } + } finally { + logReader.close(); + } + return maxSeqIdInLog; + } + + /* + * @param stores + * @return Map of family name to maximum sequenceid. + */ + private Map familyToMaxSeqId(final Map stores) { + Map map = new TreeMap(Bytes.BYTES_COMPARATOR); + for (Map.Entry e: stores.entrySet()) { + map.put(e.getKey(), e.getValue().getMaxSequenceId()); + } + return map; + } + + /* + * @param kv Apply this value to this region. + * @throws IOException + */ + private void apply(final KeyValue kv) throws IOException { + // This is really expensive to do per edit. Loads of object creation. + // TODO: Optimization. Apply edits batched by family. + Map> map = + new TreeMap>(Bytes.BYTES_COMPARATOR); + List list = new ArrayList(1); + list.add(kv); + map.put(kv.getFamily(), list); + LOG.info("Applied=" + kv); + if (kv.isDelete()) { + delete(map, true); + } else { + put(map, true); + } + } + + /* + * @param fs + * @param p File to check. + * @return True if file was zero-length (if so, we'll delete it). 
+ * @throws IOException + */ + private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p) + throws IOException { + FileStatus stat = fs.getFileStatus(p); + if (stat.getLen() > 0) return false; + LOG.warn("File " + p + " is zero-length, deleting."); + fs.delete(p, false); + return true; } - protected Store instantiateHStore(Path baseDir, - HColumnDescriptor c, Path oldLogFile, Progressable reporter) + protected Store instantiateHStore(Path baseDir, HColumnDescriptor c) throws IOException { - return new Store(baseDir, this, c, this.fs, oldLogFile, - this.conf, reporter); + return new Store(baseDir, this, c, this.fs, this.conf); } /** @@ -2180,7 +2329,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), new Path(regionDir, HREGION_OLDLOGDIR_NAME), conf, null), fs, conf, info, null); - region.initialize(null, null); + region.initialize(); return region; } @@ -2209,9 +2358,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ HRegion r = HRegion.newHRegion( HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()), log, FileSystem.get(conf), conf, info, null); - r.initialize(null, null); + long seqid = r.initialize(); if (log != null) { - log.setSequenceNumber(r.getMinSequenceId()); + log.setSequenceNumber(seqid); } return r; } @@ -2514,7 +2663,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ listPaths(fs, newRegionDir); } HRegion dstRegion = HRegion.newHRegion(basedir, log, fs, conf, newRegionInfo, null); - dstRegion.initialize(null, null); + dstRegion.initialize(); dstRegion.compactStores(); if (LOG.isDebugEnabled()) { LOG.debug("Files for new region"); @@ -2784,7 +2933,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ throw new IOException("Not a known catalog table: " + p.toString()); } try { - region.initialize(null, null); + region.initialize(); if (majorCompact) { region.compactStores(true); } else { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a1baff4..f76e901 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1417,7 +1417,7 @@ public class HRegionServer implements HConstants, HRegionInterface, HRegion region = this.onlineRegions.get(mapKey); if (region == null) { try { - region = instantiateRegion(regionInfo); + region = instantiateRegion(regionInfo, this.hlog); // Startup a compaction early if one is needed, if region has references // or if a store has too many store files if (region.hasReferences() || region.hasTooManyStoreFiles()) { @@ -1436,7 +1436,6 @@ public class HRegionServer implements HConstants, HRegionInterface, } this.lock.writeLock().lock(); try { - this.hlog.setSequenceNumber(region.getMinSequenceId()); this.onlineRegions.put(mapKey, region); } finally { this.lock.writeLock().unlock(); @@ -1445,16 +1444,28 @@ public class HRegionServer implements HConstants, HRegionInterface, reportOpen(regionInfo); } - protected HRegion instantiateRegion(final HRegionInfo regionInfo) + /* + * @param regionInfo RegionInfo for the Region we're to instantiate and + * initialize. + * @param wal Set into here the regions' seqid. 
+ * @return + * @throws IOException + */ + protected HRegion instantiateRegion(final HRegionInfo regionInfo, final HLog wal) throws IOException { HRegion r = HRegion.newHRegion(HTableDescriptor.getTableDir(rootDir, regionInfo .getTableDesc().getName()), this.hlog, this.fs, conf, regionInfo, this.cacheFlusher); - r.initialize(null, new Progressable() { + long seqid = r.initialize(new Progressable() { public void progress() { addProcessingMessage(regionInfo); } }); + // If a wal and if its current seqid is < that of new region, use new + // regions seqid. + if (wal != null) { + if (seqid > wal.getSequenceNumber()) wal.setSequenceNumber(seqid); + } return r; } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 131f8c3..bf5360b 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -19,6 +19,18 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NavigableSet; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -30,46 +42,23 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import java.io.EOFException; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.locks.ReentrantReadWriteLock; - /** * A Store holds a column family in a Region. Its a memstore and a set of zero * or more StoreFiles, which stretch backwards over time. 
@@ -79,28 +68,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * services to manage sets of StoreFiles. One of the most important of those * services is compaction services where files are aggregated once they pass * a configurable threshold. - * - *
<p>
The only thing having to do with logs that Store needs to deal with is - * the reconstructionLog. This is a segment of an HRegion's log that might - * NOT be present upon startup. If the param is NULL, there's nothing to do. - * If the param is non-NULL, we need to process the log to reconstruct - * a TreeMap that might not have been written to disk before the process - * died. - * - *
<p>
It's assumed that after this constructor returns, the reconstructionLog - * file will be deleted (by whoever has instantiated the Store). * *
<p>
Locking and transactions are handled at a higher level. This API should * not be called directly but by an HRegion manager. */ public class Store implements HConstants, HeapSize { static final Log LOG = LogFactory.getLog(Store.class); - /** - * Comparator that looks at columns and compares their family portions. - * Presumes columns have already been checked for presence of delimiter. - * If no delimiter present, presume the buffer holds a store name so no need - * of a delimiter. - */ protected final MemStore memstore; // This stores directory in the filesystem. private final Path homedir; @@ -116,7 +89,6 @@ public class Store implements HConstants, HeapSize { private volatile long storeSize = 0L; private final Object flushLock = new Object(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - final byte [] storeName; private final String storeNameStr; private final boolean inMemory; @@ -131,14 +103,6 @@ public class Store implements HConstants, HeapSize { private final CopyOnWriteArraySet changedReaderObservers = new CopyOnWriteArraySet(); - // The most-recent log-seq-ID. The most-recent such ID means we can ignore - // all log messages up to and including that ID (because they're already - // reflected in the TreeMaps). - private volatile long maxSeqId = -1; - - // The most-recent log-seq-id before we recovered from the LOG. - private long maxSeqIdBeforeLogRecovery = -1; - private final Path regionCompactionDir; private final Object compactLock = new Object(); private final int compactionThreshold; @@ -156,21 +120,16 @@ public class Store implements HConstants, HeapSize { * @param region * @param family HColumnDescriptor for this column * @param fs file system object - * @param reconstructionLog existing log file to apply if any * @param conf configuration object - * @param reporter Call on a period so hosting server can report we're - * making progress to master -- otherwise master might think region deploy * failed. Can be null. * @throws IOException */ protected Store(Path basedir, HRegion region, HColumnDescriptor family, - FileSystem fs, Path reconstructionLog, Configuration conf, - final Progressable reporter) + FileSystem fs, Configuration conf) throws IOException { HRegionInfo info = region.regionInfo; this.fs = fs; - this.homedir = getStoreHomedir(basedir, info.getEncodedName(), - family.getName()); + this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName()); if (!this.fs.exists(this.homedir)) { if (!this.fs.mkdirs(this.homedir)) throw new IOException("Failed create of: " + this.homedir.toString()); @@ -196,8 +155,7 @@ public class Store implements HConstants, HeapSize { this.memstore = new MemStore(this.comparator); this.regionCompactionDir = new Path(HRegion.getCompactionDir(basedir), info.getEncodedName()); - this.storeName = this.family.getName(); - this.storeNameStr = Bytes.toString(this.storeName); + this.storeNameStr = Bytes.toString(this.family.getName()); // By default, we compact if an HStore has more than // MIN_COMMITS_FOR_COMPACTION map files @@ -225,28 +183,19 @@ public class Store implements HConstants, HeapSize { this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10); - // loadStoreFiles calculates this.maxSeqId. as side-effect. + // loadStoreFiles and calculate maximum sequence/edit id. this.storefiles = ImmutableList.copyOf(loadStoreFiles()); - - this.maxSeqIdBeforeLogRecovery = this.maxSeqId; - - // Do reconstruction log. 
- long newId = runReconstructionLog(reconstructionLog, this.maxSeqId, reporter); - if (newId != -1) { - this.maxSeqId = newId; // start with the log id we just recovered. - } } HColumnDescriptor getFamily() { return this.family; } + /** + * @return The maximum sequence id in all store files. + */ long getMaxSequenceId() { - return this.maxSeqId; - } - - long getMaxSeqIdBeforeLogRecovery() { - return maxSeqIdBeforeLogRecovery; + return StoreFile.getMaxSequenceIdInList(this.getStorefiles()); } /** @@ -262,140 +211,6 @@ public class Store implements HConstants, HeapSize { } /* - * Run reconstruction log - * @param reconstructionLog - * @param msid - * @param reporter - * @return the new max sequence id as per the log - * @throws IOException - */ - private long runReconstructionLog(final Path reconstructionLog, - final long msid, final Progressable reporter) - throws IOException { - try { - return doReconstructionLog(reconstructionLog, msid, reporter); - } catch (EOFException e) { - // Presume we got here because of lack of HADOOP-1700; for now keep going - // but this is probably not what we want long term. If we got here there - // has been data-loss - LOG.warn("Exception processing reconstruction log " + reconstructionLog + - " opening " + Bytes.toString(this.storeName) + - " -- continuing. Probably lack-of-HADOOP-1700 causing DATA LOSS!", e); - } catch (IOException e) { - // Presume we got here because of some HDFS issue. Don't just keep going. - // Fail to open the HStore. Probably means we'll fail over and over - // again until human intervention but alternative has us skipping logs - // and losing edits: HBASE-642. - LOG.warn("Exception processing reconstruction log " + reconstructionLog + - " opening " + Bytes.toString(this.storeName), e); - throw e; - } - return -1; - } - - /* - * Read the reconstructionLog and put into memstore. - * - * We can ignore any log message that has a sequence ID that's equal to or - * lower than maxSeqID. (Because we know such log messages are already - * reflected in the HFiles.) - * - * @return the new max sequence id as per the log, or -1 if no log recovered - */ - private long doReconstructionLog(final Path reconstructionLog, - final long maxSeqID, final Progressable reporter) - throws UnsupportedEncodingException, IOException { - if (reconstructionLog == null || !this.fs.exists(reconstructionLog)) { - // Nothing to do. - return -1; - } - FileStatus stat = this.fs.getFileStatus(reconstructionLog); - if (stat.getLen() <= 0) { - LOG.warn("Passed reconstruction log " + reconstructionLog + - " is zero-length. Deleting existing file"); - fs.delete(reconstructionLog, false); - return -1; - } - - // TODO: This could grow large and blow heap out. Need to get it into - // general memory usage accounting. - long maxSeqIdInLog = -1; - long firstSeqIdInLog = -1; - HLog.Reader logReader = HLog.getReader(this.fs, reconstructionLog, conf); - try { - long skippedEdits = 0; - long editsCount = 0; - // How many edits to apply before we send a progress report. - int reportInterval = - this.conf.getInt("hbase.hstore.report.interval.edits", 2000); - HLog.Entry entry; - // TBD: Need to add an exception handler around logReader.next. - // - // A transaction now appears as a single edit. If logReader.next() - // returns an exception, then it must be a incomplete/partial - // transaction at the end of the file. Rather than bubble up - // the exception, we should catch it and simply ignore the - // partial transaction during this recovery phase. 
- // - while ((entry = logReader.next()) != null) { - HLogKey key = entry.getKey(); - WALEdit val = entry.getEdit(); - if (firstSeqIdInLog == -1) { - firstSeqIdInLog = key.getLogSeqNum(); - } - maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum()); - if (key.getLogSeqNum() <= maxSeqID) { - skippedEdits++; - continue; - } - - for (KeyValue kv : val.getKeyValues()) { - // Check this edit is for me. Also, guard against writing the special - // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (kv.matchingFamily(HLog.METAFAMILY) || - !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) || - !kv.matchingFamily(family.getName())) { - continue; - } - if (kv.isDelete()) { - this.memstore.delete(kv); - } else { - this.memstore.add(kv); - } - editsCount++; - } - - // Every 2k edits, tell the reporter we're making progress. - // Have seen 60k edits taking 3minutes to complete. - if (reporter != null && (editsCount % reportInterval) == 0) { - reporter.progress(); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits + - "; store maxSeqID=" + maxSeqID + - ", firstSeqIdInLog=" + firstSeqIdInLog + - ", maxSeqIdInLog=" + maxSeqIdInLog); - } - } finally { - logReader.close(); - } - - if (maxSeqIdInLog > -1) { - // We read some edits, so we should flush the memstore - StoreFlusher flusher = getStoreFlusher(maxSeqIdInLog); - flusher.prepare(); - flusher.flushCache(); - boolean needCompaction = flusher.commit(); - - if (needCompaction) { - this.compact(false); - } - } - return maxSeqIdInLog; - } - - /* * Creates a series of StoreFile loaded from the given directory. * @throws IOException */ @@ -433,7 +248,6 @@ public class Store implements HConstants, HeapSize { } results.add(curfile); } - maxSeqId = StoreFile.getMaxSequenceIdInList(results); Collections.sort(results, StoreFile.Comparators.FLUSH_TIME); return results; } @@ -589,7 +403,6 @@ public class Store implements HConstants, HeapSize { // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. 
return internalFlushCache(snapshot, logCacheFlushId); - } /* diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 034690e..309ec47 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -35,7 +35,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.ListIterator; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -142,6 +141,11 @@ public class HLog implements HConstants, Syncable { private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas final static Object [] NO_ARGS = new Object []{}; + /** Name of file that holds recovered edits written by the wal log splitting + * code, one per region + */ + public static final String RECOVERED_EDITS = "recovered.edits"; + // used to indirectly tell syncFs to force the sync private boolean forceSync = false; @@ -1254,8 +1258,7 @@ public class HLog implements HConstants, Syncable { // Number of logs in a read batch // More means faster but bigger mem consumption //TODO make a note on the conf rename and update hbase-site.xml if needed - int logFilesPerStep = - conf.getInt("hbase.hlog.split.batch.size", 3); + int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3); boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false); @@ -1623,7 +1626,7 @@ public class HLog implements HConstants, Syncable { HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename()); Path regionDir = HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName())); - return new Path(regionDir, HREGION_OLDLOGFILE_NAME); + return new Path(regionDir, RECOVERED_EDITS); } diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java index dbb21d4..dc38b3b 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java @@ -168,7 +168,7 @@ public abstract class HBaseTestCase extends TestCase { HRegion r = new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(), closedRegion.getFilesystem(), closedRegion.getConf(), closedRegion.getRegionInfo(), null); - r.initialize(null, null); + r.initialize(); return r; } diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 2f2f306..cb5637f 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -19,8 +19,11 @@ */ package org.apache.hadoop.hbase; +import static org.junit.Assert.assertTrue; + import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; @@ -48,11 +51,13 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.mapred.MiniMRCluster; +import 
org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.zookeeper.ZooKeeper; -import static org.junit.Assert.*; /** * Facility for testing HBase. Added as tool to abet junit4 testing. Replaces @@ -827,4 +832,50 @@ public class HBaseTestingUtility { } } -} + + /** + * This method clones the passed c configuration setting a new + * user into the clone. Use it getting new instances of FileSystem. + * @param c Initial configuration + * @param differentiatingSuffix Suffix to differentiate this user from others. + * @return A new configuration instance with a different user set into it. + * @throws IOException + */ + public static Configuration setDifferentUser(final Configuration c, + final String differentiatingSuffix) + throws IOException { + FileSystem currentfs = FileSystem.get(c); + if (!(currentfs instanceof DistributedFileSystem)) return c; + // Else distributed filesystem. Make a new instance per daemon. Below + // code is taken from the AppendTestUtil over in hdfs. + Configuration c2 = new Configuration(c); + String username = UserGroupInformation.getCurrentUGI().getUserName() + + differentiatingSuffix; + UnixUserGroupInformation.saveToConf(c2, + UnixUserGroupInformation.UGI_PROPERTY_NAME, + new UnixUserGroupInformation(username, new String[]{"supergroup"})); + return c2; + } + + /** + * Set soft and hard limits in namenode. + * You'll get a NPE if you call before you've started a minidfscluster. + * @param int soft Soft limit + * @param int hard Hard limit + * @throws NoSuchFieldException + * @throws SecurityException + * @throws IllegalAccessException + * @throws IllegalArgumentException + */ + public void setNameNodeNameSystemLeasePeriod(final int soft, final int hard) + throws SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException { + // TODO: If 0.20 hadoop do one thing, if 0.21 hadoop do another. + // Not available in 0.20 hdfs. Use reflection to make it happen. 
+ + // private NameNode nameNode; + Field field = this.dfsCluster.getClass().getDeclaredField("nameNode"); + field.setAccessible(true); + NameNode nn = (NameNode)field.get(this.dfsCluster); + nn.namesystem.leaseManager.setLeasePeriod(100, 50000); + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 6509c8f..bfb7403 100644 --- a/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -179,7 +179,7 @@ public class MiniHBaseCluster implements HConstants { new UnixUserGroupInformation(username, new String[]{"supergroup"})); return c2; } - + @Override protected void init(MapWritable c) throws IOException { super.init(c); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 43a8a28..a65e947 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -20,30 +20,30 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListSet; + import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.io.hfile.HFile.Writer; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.Progressable; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableSet; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentSkipListSet; /** * Test class fosr the Store @@ -96,10 +96,8 @@ public class TestStore extends TestCase { Path logdir = new Path(DIR+methodName+"/logs"); Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME); HColumnDescriptor hcd = new HColumnDescriptor(family); - HBaseConfiguration conf = new HBaseConfiguration(); + Configuration conf = HBaseConfiguration.create(); FileSystem fs = FileSystem.get(conf); - Path reconstructionLog = null; - Progressable reporter = null; fs.delete(logdir, true); @@ -109,8 +107,7 @@ public class TestStore extends TestCase { HLog hlog = new HLog(fs, logdir, oldLogDir, conf, null); HRegion region = new HRegion(basedir, hlog, fs, conf, info, null); - store = new Store(basedir, region, hcd, fs, reconstructionLog, conf, - reporter); + store = new Store(basedir, region, hcd, fs, conf); } @@ -133,7 +130,7 @@ public class TestStore extends TestCase { StoreFile f = this.store.getStorefiles().get(0); Path storedir = f.getPath().getParent(); long seqid = f.getMaxSequenceId(); - HBaseConfiguration c = new 
HBaseConfiguration(); + Configuration c = HBaseConfiguration.create(); FileSystem fs = FileSystem.get(c); StoreFile.Writer w = StoreFile.createWriter(fs, storedir, StoreFile.DEFAULT_BLOCKSIZE_SMALL); @@ -143,7 +140,7 @@ public class TestStore extends TestCase { // Reopen it... should pick up two files this.store = new Store(storedir.getParent().getParent(), this.store.getHRegion(), - this.store.getFamily(), fs, null, c, null); + this.store.getFamily(), fs, c); System.out.println(this.store.getHRegionInfo().getEncodedName()); assertEquals(2, this.store.getStorefilesCount()); this.store.get(get, qualifiers, result); @@ -318,4 +315,4 @@ public class TestStore extends TestCase { storeFlusher.flushCache(); storeFlusher.commit(); } -} +} \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java deleted file mode 100644 index 4f0417d..0000000 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Copyright 2009 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.*; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; -import java.util.NavigableSet; -import java.util.concurrent.ConcurrentSkipListSet; - -public class TestStoreReconstruction { - - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private Path dir; - private MiniDFSCluster cluster; - private static final String TABLE = "testtable"; - private static final int TOTAL_EDITS = 10000; - private Configuration conf; - - /** - * @throws java.lang.Exception - */ - @BeforeClass - public static void setUpBeforeClass() throws Exception { - } - - /** - * @throws java.lang.Exception - */ - @AfterClass - public static void tearDownAfterClass() throws Exception { } - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - - conf = TEST_UTIL.getConfiguration(); - cluster = new MiniDFSCluster(conf, 3, true, (String[])null); - // Set the hbase.rootdir to be the home directory in mini dfs. 
- TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, - this.cluster.getFileSystem().getHomeDirectory().toString()); - this.dir = new Path("/hbase", TABLE); - conf.setInt("hbase.regionserver.flushlogentries", 1); - - if (cluster.getFileSystem().exists(dir)) { - cluster.getFileSystem().delete(dir, true); - } - } - - /** - * @throws java.lang.Exception - */ - @After - public void tearDown() throws Exception {} - - /** - * Create a Store with the result of a HLog split and test we only - * see the good edits - * @throws Exception - */ - @Test - public void runReconstructionLog() throws Exception { - byte[] family = Bytes.toBytes("column"); - HColumnDescriptor hcd = new HColumnDescriptor(family); - HTableDescriptor htd = new HTableDescriptor(TABLE); - htd.addFamily(hcd); - HRegionInfo info = new HRegionInfo(htd, null, null, false); - Path oldLogDir = new Path(this.dir, HConstants.HREGION_OLDLOGDIR_NAME); - Path logDir = new Path(this.dir, HConstants.HREGION_LOGDIR_NAME); - HLog log = new HLog(cluster.getFileSystem(), logDir, oldLogDir, conf, null); - HRegion region = new HRegion(dir, log, - cluster.getFileSystem(),conf, info, null); - List result = new ArrayList(); - - // Empty set to get all columns - NavigableSet qualifiers = - new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); - - final byte[] tableName = Bytes.toBytes(TABLE); - final byte[] rowName = tableName; - final byte[] regionName = info.getRegionName(); - - // Add 10 000 edits to HLog on the good family - for (int j = 0; j < TOTAL_EDITS; j++) { - byte[] qualifier = Bytes.toBytes(Integer.toString(j)); - byte[] column = Bytes.toBytes("column:" + Integer.toString(j)); - WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, qualifier, - System.currentTimeMillis(), column)); - log.append(info, tableName, edit, - System.currentTimeMillis()); - } - // Add a cache flush, shouldn't have any effect - long logSeqId = log.startCacheFlush(); - log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion()); - - // Add an edit to another family, should be skipped. - WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, - System.currentTimeMillis(), rowName)); - log.append(info, tableName, edit, - System.currentTimeMillis()); - log.sync(); - - // TODO dont close the file here. 
- log.close(); - - List splits = HLog.splitLog(new Path(conf.get(HConstants.HBASE_DIR)), - logDir, oldLogDir, cluster.getFileSystem(), conf); - - // Split should generate only 1 file since there's only 1 region - assertEquals(1, splits.size()); - - // Make sure the file exists - assertTrue(cluster.getFileSystem().exists(splits.get(0))); - - // This will run the log reconstruction - Store store = new Store(dir, region, hcd, cluster.getFileSystem(), - splits.get(0), conf, null); - - Get get = new Get(rowName); - store.get(get, qualifiers, result); - // Make sure we only see the good edits - assertEquals(TOTAL_EDITS, result.size()); - } -} diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index 86cf4ea..3fff2fa 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue; import java.io.FileNotFoundException; import java.io.IOException; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -39,15 +38,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.ipc.RemoteException; import org.junit.After; import org.junit.AfterClass; @@ -125,17 +121,7 @@ public class TestHLogSplit { Collections.addAll(regions, "bbb", "ccc"); InstrumentedSequenceFileLogWriter.activateFailure = false; // Set the soft lease for hdfs to be down from default of 5 minutes or so. - // TODO: If 0.20 hadoop do one thing, if 0.21 hadoop do another. - // Not available in 0.20 hdfs - // TEST_UTIL.getDFSCluster().getNamesystem().leaseManager. - // setLeasePeriod(100, 50000); - // Use reflection to get at the 0.20 version of above. - MiniDFSCluster dfsCluster = TEST_UTIL.getDFSCluster(); - // private NameNode nameNode; - Field field = dfsCluster.getClass().getDeclaredField("nameNode"); - field.setAccessible(true); - NameNode nn = (NameNode)field.get(dfsCluster); - nn.namesystem.leaseManager.setLeasePeriod(100, 50000); + TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 50000); } @After @@ -628,7 +614,7 @@ public class TestHLogSplit { return new Path(HRegion.getRegionDir(HTableDescriptor .getTableDir(rootdir, table), HRegionInfo.encodeRegionName(region.getBytes())), - HConstants.HREGION_OLDLOGFILE_NAME); + HLog.RECOVERED_EDITS); } private void corruptHLog(Path path, Corruptions corruption, boolean close, @@ -736,8 +722,8 @@ public class TestHLogSplit { FileStatus[] f2 = fs.listStatus(p2); for (int i=0; i splits = HLog.splitLog(hbaseRootDir, logDir, oldLogDir, newFS, + newConf); + + // Split should generate only 1 file since there's only 1 region + assertEquals(1, splits.size()); + + // Make sure the file exists + assertTrue(newFS.exists(splits.get(0))); + Log.info("Split file=" + splits.get(0)); + + // Open the region. 
Set memstore small in size so we flush. + Path basedir = new Path(hbaseRootDir, table); + // 100k seems to make for about 4 flushes during HRegion#initialize. + newConf.setInt("hbase.hregion.memstore.flush.size", 1024 * 1); + // Make a new wal for new region. + HLog newWal = new HLog(newFS, logDir, oldLogDir, newConf, null); + try { + TestFlusher flusher = new TestFlusher(); + final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, + flusher); + flusher.r = region; + long seqid = region.initialize(); + // Assert we flushed. + assertTrue(flusher.count > 0); + assertTrue(seqid > wal.getSequenceNumber()); + + Get get = new Get(rowName); + Result result = region.get(get, -1); + // Make sure we only see the good edits + assertEquals(countPerFamily * (htd.getFamilies().size() - 1), result.size()); + } finally { + newWal.close(); + } + } + + // Flusher used in this test. Keep count of how often we are called and + // actually run the flush inside here. + class TestFlusher implements FlushRequester { + private int count = 0; + private HRegion r; + + @Override + public void request(HRegion region) { + count++; + try { + r.flushcache(); + } catch (IOException e) { + throw new RuntimeException("Exception flushing", e); + } + } + } + + // Add edits to a family + private void addEdits (final byte [] tableName, final HRegionInfo hri, + final byte [] rowName, final byte [] family, + final int count, EnvironmentEdge ee, final HLog wal) + throws IOException { + String familyStr = Bytes.toString(family); + for (int j = 0; j < count; j++) { + byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); + byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, qualifierBytes, + ee.currentTimeMillis(), columnBytes)); + wal.append(hri, tableName, edit, ee.currentTimeMillis()); + } + } +} \ No newline at end of file