diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6b6d098..75691ad 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -59,6 +60,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Constructor;
@@ -321,21 +323,22 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     long minSeqIdToRecover = Integer.MAX_VALUE;
 
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
-      Store store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
+      Store store = instantiateHStore(this.basedir, c);
       this.stores.put(c.getName(), store);
       long storeSeqId = store.getMaxSequenceId();
       if (storeSeqId > maxSeqId) {
        maxSeqId = storeSeqId;
       }
-      long storeSeqIdBeforeRecovery = store.getMaxSeqIdBeforeLogRecovery();
-      if (storeSeqIdBeforeRecovery < minSeqIdToRecover) {
-        minSeqIdToRecover = storeSeqIdBeforeRecovery;
+      long seqid = store.getMaxSequenceId();
+      if (seqid < minSeqIdToRecover) {
+        minSeqIdToRecover = seqid;
       }
     }
 
     // Play log if one.  Delete when done.
-    doReconstructionLog(oldLogFile, minSeqIdToRecover, maxSeqId, reporter);
+    long sequenceid =
+      doReconstructionLog(oldLogFile, minSeqIdToRecover, maxSeqId, reporter);
     if (fs.exists(oldLogFile)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Deleting old log file: " + oldLogFile);
@@ -344,7 +347,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     }
 
     // Add one to the current maximum sequence id so new edits are beyond.
-    this.minSequenceId = maxSeqId + 1;
+    this.minSequenceId = Math.max(maxSeqId, sequenceid) + 1;
 
     // Get rid of any splits or merges that were lost in-progress.  Clean out
     // these directories here on open.  We may be opening a region that was
@@ -607,7 +610,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
   }
 
   /** @return the last time the region was flushed */
-  @SuppressWarnings({"UnusedDeclaration"})
   public long getLastFlushTime() {
     return this.lastFlushTime;
   }
@@ -1274,7 +1276,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       List kvs = e.getValue();
       Map kvCount =
         new TreeMap(Bytes.BYTES_COMPARATOR);
-      Store store = getStore(family);
       for (KeyValue kv: kvs) {
         // Check if time is LATEST, change to time of most recent addition if so
         // This is expensive.
@@ -1710,19 +1711,129 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     return size > this.memstoreFlushSize;
   }
 
-  // Do any reconstruction needed from the log
-  protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId,
+  /*
+   * @return the new max sequence id or -1 if no log recovered
+   */
+  protected long doReconstructionLog(Path edits, long minSeqId, long maxSeqId,
     Progressable reporter)
   throws UnsupportedEncodingException, IOException {
-    // Nothing to do (Replaying is done in HStores)
-    // Used by subclasses; e.g. THBase.
+    try {
+      return doReconstructionLog(edits, minSeqId, reporter);
+    } catch (EOFException e) {
+      // Presume we got here because of lack of HADOOP-1700; for now keep going
+      // but this is probably not what we want long term.  If we got here there
+      // has been data-loss
+      LOG.warn("Exception processing reconstruction log " + edits +
+        " -- continuing.  Probably lack-of-HADOOP-1700 causing DATA LOSS!", e);
+    } catch (IOException e) {
+      // Presume we got here because of some HDFS issue. Don't just keep going.
+      // Fail to open the HStore.  Probably means we'll fail over and over
+      // again until human intervention but alternative has us skipping logs
+      // and losing edits: HBASE-642.
+      LOG.warn("Exception processing reconstruction log " + edits, e);
+      throw e;
+    }
+    return -1;
+  }
+
+  /*
+   * Read the reconstructionLog and put into memstore.
+   *
+   * We can ignore any log message that has a sequence ID that's equal to or
+   * lower than minSeqId.  (Because we know such log messages are already
+   * reflected in the HFiles.)
+   *
+   * @return the new max sequence id or -1 if no log recovered
+   */
+  private long doReconstructionLog(final Path edits, final long minSeqId,
+    final Progressable reporter)
+  throws UnsupportedEncodingException, IOException {
+    if (edits == null || !this.fs.exists(edits)) {
+      // Nothing to do.
+      return -1;
+    }
+    FileStatus stat = this.fs.getFileStatus(edits);
+    if (stat.getLen() <= 0) {
+      LOG.warn("Passed edits log " + edits + " is zero-length, deleting.");
+      fs.delete(edits, false);
+      return -1;
+    }
+
+    long maxSeqIdInLog = -1;
+    long firstSeqIdInLog = -1;
+    HLog.Reader logReader = HLog.getReader(this.fs, edits, conf);
+    try {
+      long skippedEdits = 0;
+      long editsCount = 0;
+      // How many edits to apply before we send a progress report.
+      int reportInterval =
+        this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
+      HLog.Entry entry;
+      // TBD: Need to add an exception handler around logReader.next.
+      //
+      // A transaction now appears as a single edit.  If logReader.next()
+      // throws an exception, then it must be an incomplete/partial
+      // transaction at the end of the file.  Rather than bubble up
+      // the exception, we should catch it and simply ignore the
+      // partial transaction during this recovery phase.
+      //
+      Store store = null;
+      while ((entry = logReader.next()) != null) {
+        HLogKey key = entry.getKey();
+        WALEdit val = entry.getEdit();
+        if (firstSeqIdInLog == -1) {
+          firstSeqIdInLog = key.getLogSeqNum();
+        }
+        maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
+        if (key.getLogSeqNum() <= minSeqId) {
+          skippedEdits++;
+          continue;
+        }
+
+        for (KeyValue kv : val.getKeyValues()) {
+          // Check this edit is for me.
Also, guard against writing the special + // METACOLUMN info such as HBASE::CACHEFLUSH entries + if (kv.matchingFamily(HLog.METAFAMILY) || + !Bytes.equals(key.getRegionName(), this.regionInfo.getRegionName())) { + skippedEdits++; + continue; + } + // TODO: Making too many byte arrays here doing getFamily? + if (store == null || !kv.matchingFamily(store.getFamily().getName())) { + store = this.stores.get(kv.getFamily()); + } + if (store.getMaxSequenceId() >= maxSeqIdInLog) { + skippedEdits++; + continue; + } + if (kv.isDelete()) { + store.delete(kv); + } else { + store.add(kv); + } + editsCount++; + } + + // Every 2k edits, tell the reporter we're making progress. + // Have seen 60k edits taking 3minutes to complete. + if (reporter != null && (editsCount % reportInterval) == 0) { + reporter.progress(); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits + + ", firstSeqIdInLog=" + firstSeqIdInLog + + ", maxSeqIdInLog=" + maxSeqIdInLog); + } + } finally { + logReader.close(); + } + return maxSeqIdInLog; } - protected Store instantiateHStore(Path baseDir, - HColumnDescriptor c, Path oldLogFile, Progressable reporter) + protected Store instantiateHStore(Path baseDir, HColumnDescriptor c) throws IOException { - return new Store(baseDir, this, c, this.fs, oldLogFile, - this.conf, reporter); + return new Store(baseDir, this, c, this.fs, this.conf); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 131f8c3..bf5360b 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -19,6 +19,18 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NavigableSet; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -30,46 +42,23 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import 
java.io.EOFException; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.locks.ReentrantReadWriteLock; - /** * A Store holds a column family in a Region. Its a memstore and a set of zero * or more StoreFiles, which stretch backwards over time. @@ -79,28 +68,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * services to manage sets of StoreFiles. One of the most important of those * services is compaction services where files are aggregated once they pass * a configurable threshold. - * - *

<p>The only thing having to do with logs that Store needs to deal with is
- * the reconstructionLog.  This is a segment of an HRegion's log that might
- * NOT be present upon startup.  If the param is NULL, there's nothing to do.
- * If the param is non-NULL, we need to process the log to reconstruct
- * a TreeMap that might not have been written to disk before the process
- * died.
- *
- * <p>It's assumed that after this constructor returns, the reconstructionLog
- * file will be deleted (by whoever has instantiated the Store).
 *
- *
Locking and transactions are handled at a higher level. This API should * not be called directly but by an HRegion manager. */ public class Store implements HConstants, HeapSize { static final Log LOG = LogFactory.getLog(Store.class); - /** - * Comparator that looks at columns and compares their family portions. - * Presumes columns have already been checked for presence of delimiter. - * If no delimiter present, presume the buffer holds a store name so no need - * of a delimiter. - */ protected final MemStore memstore; // This stores directory in the filesystem. private final Path homedir; @@ -116,7 +89,6 @@ public class Store implements HConstants, HeapSize { private volatile long storeSize = 0L; private final Object flushLock = new Object(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - final byte [] storeName; private final String storeNameStr; private final boolean inMemory; @@ -131,14 +103,6 @@ public class Store implements HConstants, HeapSize { private final CopyOnWriteArraySet changedReaderObservers = new CopyOnWriteArraySet(); - // The most-recent log-seq-ID. The most-recent such ID means we can ignore - // all log messages up to and including that ID (because they're already - // reflected in the TreeMaps). - private volatile long maxSeqId = -1; - - // The most-recent log-seq-id before we recovered from the LOG. - private long maxSeqIdBeforeLogRecovery = -1; - private final Path regionCompactionDir; private final Object compactLock = new Object(); private final int compactionThreshold; @@ -156,21 +120,16 @@ public class Store implements HConstants, HeapSize { * @param region * @param family HColumnDescriptor for this column * @param fs file system object - * @param reconstructionLog existing log file to apply if any * @param conf configuration object - * @param reporter Call on a period so hosting server can report we're - * making progress to master -- otherwise master might think region deploy * failed. Can be null. * @throws IOException */ protected Store(Path basedir, HRegion region, HColumnDescriptor family, - FileSystem fs, Path reconstructionLog, Configuration conf, - final Progressable reporter) + FileSystem fs, Configuration conf) throws IOException { HRegionInfo info = region.regionInfo; this.fs = fs; - this.homedir = getStoreHomedir(basedir, info.getEncodedName(), - family.getName()); + this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName()); if (!this.fs.exists(this.homedir)) { if (!this.fs.mkdirs(this.homedir)) throw new IOException("Failed create of: " + this.homedir.toString()); @@ -196,8 +155,7 @@ public class Store implements HConstants, HeapSize { this.memstore = new MemStore(this.comparator); this.regionCompactionDir = new Path(HRegion.getCompactionDir(basedir), info.getEncodedName()); - this.storeName = this.family.getName(); - this.storeNameStr = Bytes.toString(this.storeName); + this.storeNameStr = Bytes.toString(this.family.getName()); // By default, we compact if an HStore has more than // MIN_COMMITS_FOR_COMPACTION map files @@ -225,28 +183,19 @@ public class Store implements HConstants, HeapSize { this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10); - // loadStoreFiles calculates this.maxSeqId. as side-effect. + // loadStoreFiles and calculate maximum sequence/edit id. this.storefiles = ImmutableList.copyOf(loadStoreFiles()); - - this.maxSeqIdBeforeLogRecovery = this.maxSeqId; - - // Do reconstruction log. 
- long newId = runReconstructionLog(reconstructionLog, this.maxSeqId, reporter); - if (newId != -1) { - this.maxSeqId = newId; // start with the log id we just recovered. - } } HColumnDescriptor getFamily() { return this.family; } + /** + * @return The maximum sequence id in all store files. + */ long getMaxSequenceId() { - return this.maxSeqId; - } - - long getMaxSeqIdBeforeLogRecovery() { - return maxSeqIdBeforeLogRecovery; + return StoreFile.getMaxSequenceIdInList(this.getStorefiles()); } /** @@ -262,140 +211,6 @@ public class Store implements HConstants, HeapSize { } /* - * Run reconstruction log - * @param reconstructionLog - * @param msid - * @param reporter - * @return the new max sequence id as per the log - * @throws IOException - */ - private long runReconstructionLog(final Path reconstructionLog, - final long msid, final Progressable reporter) - throws IOException { - try { - return doReconstructionLog(reconstructionLog, msid, reporter); - } catch (EOFException e) { - // Presume we got here because of lack of HADOOP-1700; for now keep going - // but this is probably not what we want long term. If we got here there - // has been data-loss - LOG.warn("Exception processing reconstruction log " + reconstructionLog + - " opening " + Bytes.toString(this.storeName) + - " -- continuing. Probably lack-of-HADOOP-1700 causing DATA LOSS!", e); - } catch (IOException e) { - // Presume we got here because of some HDFS issue. Don't just keep going. - // Fail to open the HStore. Probably means we'll fail over and over - // again until human intervention but alternative has us skipping logs - // and losing edits: HBASE-642. - LOG.warn("Exception processing reconstruction log " + reconstructionLog + - " opening " + Bytes.toString(this.storeName), e); - throw e; - } - return -1; - } - - /* - * Read the reconstructionLog and put into memstore. - * - * We can ignore any log message that has a sequence ID that's equal to or - * lower than maxSeqID. (Because we know such log messages are already - * reflected in the HFiles.) - * - * @return the new max sequence id as per the log, or -1 if no log recovered - */ - private long doReconstructionLog(final Path reconstructionLog, - final long maxSeqID, final Progressable reporter) - throws UnsupportedEncodingException, IOException { - if (reconstructionLog == null || !this.fs.exists(reconstructionLog)) { - // Nothing to do. - return -1; - } - FileStatus stat = this.fs.getFileStatus(reconstructionLog); - if (stat.getLen() <= 0) { - LOG.warn("Passed reconstruction log " + reconstructionLog + - " is zero-length. Deleting existing file"); - fs.delete(reconstructionLog, false); - return -1; - } - - // TODO: This could grow large and blow heap out. Need to get it into - // general memory usage accounting. - long maxSeqIdInLog = -1; - long firstSeqIdInLog = -1; - HLog.Reader logReader = HLog.getReader(this.fs, reconstructionLog, conf); - try { - long skippedEdits = 0; - long editsCount = 0; - // How many edits to apply before we send a progress report. - int reportInterval = - this.conf.getInt("hbase.hstore.report.interval.edits", 2000); - HLog.Entry entry; - // TBD: Need to add an exception handler around logReader.next. - // - // A transaction now appears as a single edit. If logReader.next() - // returns an exception, then it must be a incomplete/partial - // transaction at the end of the file. Rather than bubble up - // the exception, we should catch it and simply ignore the - // partial transaction during this recovery phase. 
- // - while ((entry = logReader.next()) != null) { - HLogKey key = entry.getKey(); - WALEdit val = entry.getEdit(); - if (firstSeqIdInLog == -1) { - firstSeqIdInLog = key.getLogSeqNum(); - } - maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum()); - if (key.getLogSeqNum() <= maxSeqID) { - skippedEdits++; - continue; - } - - for (KeyValue kv : val.getKeyValues()) { - // Check this edit is for me. Also, guard against writing the special - // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (kv.matchingFamily(HLog.METAFAMILY) || - !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) || - !kv.matchingFamily(family.getName())) { - continue; - } - if (kv.isDelete()) { - this.memstore.delete(kv); - } else { - this.memstore.add(kv); - } - editsCount++; - } - - // Every 2k edits, tell the reporter we're making progress. - // Have seen 60k edits taking 3minutes to complete. - if (reporter != null && (editsCount % reportInterval) == 0) { - reporter.progress(); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits + - "; store maxSeqID=" + maxSeqID + - ", firstSeqIdInLog=" + firstSeqIdInLog + - ", maxSeqIdInLog=" + maxSeqIdInLog); - } - } finally { - logReader.close(); - } - - if (maxSeqIdInLog > -1) { - // We read some edits, so we should flush the memstore - StoreFlusher flusher = getStoreFlusher(maxSeqIdInLog); - flusher.prepare(); - flusher.flushCache(); - boolean needCompaction = flusher.commit(); - - if (needCompaction) { - this.compact(false); - } - } - return maxSeqIdInLog; - } - - /* * Creates a series of StoreFile loaded from the given directory. * @throws IOException */ @@ -433,7 +248,6 @@ public class Store implements HConstants, HeapSize { } results.add(curfile); } - maxSeqId = StoreFile.getMaxSequenceIdInList(results); Collections.sort(results, StoreFile.Comparators.FLUSH_TIME); return results; } @@ -589,7 +403,6 @@ public class Store implements HConstants, HeapSize { // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. 
return internalFlushCache(snapshot, logCacheFlushId); - } /* diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 43a8a28..a65e947 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -20,30 +20,30 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListSet; + import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.io.hfile.HFile.Writer; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.Progressable; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableSet; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentSkipListSet; /** * Test class fosr the Store @@ -96,10 +96,8 @@ public class TestStore extends TestCase { Path logdir = new Path(DIR+methodName+"/logs"); Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME); HColumnDescriptor hcd = new HColumnDescriptor(family); - HBaseConfiguration conf = new HBaseConfiguration(); + Configuration conf = HBaseConfiguration.create(); FileSystem fs = FileSystem.get(conf); - Path reconstructionLog = null; - Progressable reporter = null; fs.delete(logdir, true); @@ -109,8 +107,7 @@ public class TestStore extends TestCase { HLog hlog = new HLog(fs, logdir, oldLogDir, conf, null); HRegion region = new HRegion(basedir, hlog, fs, conf, info, null); - store = new Store(basedir, region, hcd, fs, reconstructionLog, conf, - reporter); + store = new Store(basedir, region, hcd, fs, conf); } @@ -133,7 +130,7 @@ public class TestStore extends TestCase { StoreFile f = this.store.getStorefiles().get(0); Path storedir = f.getPath().getParent(); long seqid = f.getMaxSequenceId(); - HBaseConfiguration c = new HBaseConfiguration(); + Configuration c = HBaseConfiguration.create(); FileSystem fs = FileSystem.get(c); StoreFile.Writer w = StoreFile.createWriter(fs, storedir, StoreFile.DEFAULT_BLOCKSIZE_SMALL); @@ -143,7 +140,7 @@ public class TestStore extends TestCase { // Reopen it... 
should pick up two files this.store = new Store(storedir.getParent().getParent(), this.store.getHRegion(), - this.store.getFamily(), fs, null, c, null); + this.store.getFamily(), fs, c); System.out.println(this.store.getHRegionInfo().getEncodedName()); assertEquals(2, this.store.getStorefilesCount()); this.store.get(get, qualifiers, result); @@ -318,4 +315,4 @@ public class TestStore extends TestCase { storeFlusher.flushCache(); storeFlusher.commit(); } -} +} \ No newline at end of file /** * Copyright 2009 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; /** * Test reconstruction log replay. */ public class TestReconstruction { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private Path dir; private static final String TABLE = "testtable"; private static final int TOTAL_EDITS = 10000; private MiniDFSCluster cluster; /** * @throws java.lang.Exception */ @BeforeClass public static void setUpBeforeClass() throws Exception { } /** * @throws java.lang.Exception */ @AfterClass public static void tearDownAfterClass() throws Exception { } /** * @throws java.lang.Exception */ @Before public void setUp() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setBoolean("dfs.support.append", true); this.cluster = new MiniDFSCluster(conf, 3, true, (String[])null); // Set the hbase.rootdir to be the home directory in mini dfs. 
TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, cluster.getFileSystem().getHomeDirectory().toString()); this.dir = new Path("/hbase", TABLE); conf.setInt("hbase.regionserver.flushlogentries", 1); if (this.cluster.getFileSystem().exists(dir)) { this.cluster.getFileSystem().delete(dir, true); } } /** * @throws java.lang.Exception */ @After public void tearDown() throws Exception {} /** * Create a Store with the result of a HLog split and test we only * see the good edits * @throws Exception */ @Test public void runReconstructionLog() throws Exception { byte [] family = Bytes.toBytes("column"); HColumnDescriptor hcd = new HColumnDescriptor(family); HTableDescriptor htd = new HTableDescriptor(TABLE); htd.addFamily(hcd); HRegionInfo info = new HRegionInfo(htd, null, null, false); Path oldLogDir = new Path(this.dir, HConstants.HREGION_OLDLOGDIR_NAME); Path logDir = new Path(this.dir, HConstants.HREGION_LOGDIR_NAME); Configuration conf = TEST_UTIL.getConfiguration(); HLog log = new HLog(cluster.getFileSystem(), logDir, oldLogDir, conf, null); final byte[] tableName = Bytes.toBytes(TABLE); final byte[] rowName = tableName; final byte[] regionName = info.getRegionName(); // Add 10 000 edits to HLog on the good family for (int j = 0; j < TOTAL_EDITS; j++) { byte[] qualifier = Bytes.toBytes(Integer.toString(j)); byte[] column = Bytes.toBytes("column:" + Integer.toString(j)); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifier, System.currentTimeMillis(), column)); log.append(info, tableName, edit, System.currentTimeMillis()); } // Add a cache flush, shouldn't have any effect long logSeqId = log.startCacheFlush(); log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion()); // Add an edit to another family, should be skipped. WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, System.currentTimeMillis(), rowName)); log.append(info, tableName, edit, System.currentTimeMillis()); log.sync(); // TODO: Remove this close log.close(); List splits = HLog.splitLog(new Path(conf.get(HConstants.HBASE_DIR)), logDir, oldLogDir, cluster.getFileSystem(), conf); // Split should generate only 1 file since there's only 1 region assertEquals(1, splits.size()); // Make sure the file exists assertTrue(cluster.getFileSystem().exists(splits.get(0))); // Open the region. HRegion region = new HRegion(dir, log, cluster.getFileSystem(), conf, info, null); region.initialize(splits.get(0), null); Get get = new Get(rowName); Result result = region.get(get, -1); // Make sure we only see the good edits assertEquals(TOTAL_EDITS, result.size()); } }
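Summary of the replay logic this patch introduces: replay of a split HLog now happens once per region in HRegion.doReconstructionLog instead of once per Store, and each logged KeyValue is applied directly to the owning store's memstore unless it can be skipped. The stand-alone Java sketch below is illustrative only, not HBase code; the class, method, and field names in it (LoggedEdit, shouldReplay, and so on) are made up for illustration. It models the skip rules the new code applies to each WAL entry: entries at or below the minimum sequence id already flushed by every store are dropped, META marker edits such as HBASE::CACHEFLUSH are never replayed, edits belonging to other regions are ignored, and a store that has already flushed past the entry's sequence number does not re-apply it.

// Minimal, self-contained sketch of the replay skip rules; illustrative code
// only, not part of the patch, and none of these types exist in HBase.
public class ReplaySkipRulesSketch {

  /** Simplified stand-in for one KeyValue carried by a WAL entry. */
  static final class LoggedEdit {
    final long seqNum;         // sequence number from the log key
    final String regionName;   // region the entry was appended for
    final boolean metaFamily;  // true for HBASE::CACHEFLUSH style marker edits
    LoggedEdit(long seqNum, String regionName, boolean metaFamily) {
      this.seqNum = seqNum;
      this.regionName = regionName;
      this.metaFamily = metaFamily;
    }
  }

  /**
   * Returns true if the edit should be added to the target store's memstore,
   * false if it can be skipped during reconstruction-log replay.
   *
   * @param minSeqIdToRecover smallest max-sequence-id across all stores; edits
   *   at or below it are already reflected in every store's HFiles
   * @param storeMaxSeqId max sequence id already flushed by the target store
   */
  static boolean shouldReplay(LoggedEdit e, String thisRegion,
      long minSeqIdToRecover, long storeMaxSeqId) {
    if (e.seqNum <= minSeqIdToRecover) return false;    // already flushed everywhere
    if (e.metaFamily) return false;                     // never replay META markers
    if (!e.regionName.equals(thisRegion)) return false; // edit belongs to another region
    if (storeMaxSeqId >= e.seqNum) return false;        // this store already has it
    return true;
  }

  public static void main(String[] args) {
    // An old edit (seq 5, below the recovered minimum) is skipped;
    // a newer one (seq 25) for this region is replayed.
    System.out.println(shouldReplay(new LoggedEdit(5, "r1", false), "r1", 10, 20));  // false
    System.out.println(shouldReplay(new LoggedEdit(25, "r1", false), "r1", 10, 20)); // true
  }
}

Note that in the patch itself the per-store check compares against the running maximum sequence id seen so far in the log (maxSeqIdInLog), which for a log written in increasing sequence order equals the current entry's id; the sketch uses the entry's own id for clarity.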