commit 45763da831dc81744e8f4bfc2289a9b0a4a20ad4
Author: Todd Lipcon
Date:   Mon May 17 00:06:43 2010 -0700

    HBASE-2231. Fencing around compactions

diff --git core/src/main/java/org/apache/hadoop/hbase/FaultInjector.java core/src/main/java/org/apache/hadoop/hbase/FaultInjector.java
new file mode 100644
index 0000000..e55d268
--- /dev/null
+++ core/src/main/java/org/apache/hadoop/hbase/FaultInjector.java
@@ -0,0 +1,70 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.EnumMap;
+
+public class FaultInjector {
+  static EnumMap<Point, Fault> injected =
+    new EnumMap<Point, Fault>(Point.class);
+
+  public static Fault THROW_IOEXCEPTION = new Fault() {
+    public void run(Object... args) throws IOException {
+      throw new IOException("Injected fault");
+    }
+  };
+
+  public enum Point {
+    COMPACTION_BEFORE_RENAME, COMPACTION_DELETE_OLD
+  }
+
+  public static void runPoint(Point point, Object ...args)
+      throws IOException {
+    Fault f = injected.get(point);
+    if (f != null) {
+      f.run(args);
+    }
+  }
+
+  public static void inject(Point p, Fault f) {
+    injected.put(p, f);
+  }
+
+  public static void clear() {
+    injected.clear();
+  }
+
+  public static Fault throwOnNthInvocation(final int n) {
+    return new Fault() {
+      private int invocation = 0;
+      @Override
+      public void run(Object... args) throws IOException {
+        if (invocation++ == n) {
+          throw new IOException("Injected fault");
+        }
+      }
+    };
+  }
+
+  public interface Fault {
+    public void run(Object... args) throws IOException;
+  }
+}
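FaultInjector is a process-global test hook: production code calls runPoint() at a named Point, and a test arms a Fault at that point beforehand. A minimal usage sketch (JUnit-style; the region variable and surrounding test are hypothetical, but the FaultInjector API is exactly the one above):

    // Arm the injection point, run the code under test, then always disarm.
    FaultInjector.inject(FaultInjector.Point.COMPACTION_BEFORE_RENAME,
        FaultInjector.THROW_IOEXCEPTION);
    try {
      region.compactStores(true);   // runPoint() fires inside the compaction
      fail("expected the injected IOException to bubble up");
    } catch (IOException expected) {
      // compaction aborted after the WAL marker, before the rename
    } finally {
      FaultInjector.clear();        // never leak faults into other tests
    }
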
diff --git core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 00fafc9..f29c572 100644
--- core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -754,7 +754,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
    * Clean out any vestiges of previous failed compactions.
    * @throws IOException
    */
-  private void doRegionCompactionPrep() throws IOException {
+  protected void doRegionCompactionPrep() throws IOException {
     doRegionCompactionCleanup();
   }

diff --git core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 02a6c16..34060e8 100644
--- core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.FaultInjector;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.CompactionMarker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -108,7 +110,21 @@ public class Store implements HConstants, HeapSize {
   private int maxFilesToCompact;
   private final long desiredMaxFileSize;
   private volatile long storeSize = 0L;
+
+  /**
+   * Lock that allows only one flush to be ongoing at a time.
+   */
   private final Object flushLock = new Object();
+
+  /**
+   * RWLock for store operations.
+   * Locked in shared mode when the list of component stores is looked at:
+   *    - all reads/writes to table data
+   *    - checking for split
+   * Locked in exclusive mode when the list of component stores is modified:
+   *    - closing
+   *    - completing a compaction
+   */
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   final byte [] storeName;
   private final String storeNameStr;
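The new comment pins down the lock discipline that the rest of the patch relies on. Schematically (pattern only, not a literal excerpt from Store):

    // Shared mode: operations that only consult the store file list.
    lock.readLock().lock();
    try {
      // read memstore and store files, or check whether to split
    } finally {
      lock.readLock().unlock();
    }

    // Exclusive mode: operations that swap the store file list.
    lock.writeLock().lock();
    try {
      // e.g. completeCompaction(): drop the inputs, add the new StoreFile
    } finally {
      lock.writeLock().unlock();
    }
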
@@ -350,7 +366,7 @@ public class Store implements HConstants, HeapSize {
       for (KeyValue kv : val.getKeyValues()) {
         // Check this edit is for me. Also, guard against writing the special
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
-        if (kv.matchingFamily(HLog.METAFAMILY) ||
+        if (kv.matchingFamily(HLog.MarkerEntries.METAFAMILY) ||
           !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
           !kv.matchingFamily(family.getName())) {
           continue;
@@ -958,19 +974,31 @@ public class Store implements HConstants, HeapSize {
   private StoreFile completeCompaction(final List<StoreFile> compactedFiles,
                                        final HFile.Writer compactedFile)
   throws IOException {
+
+    List<Path> inputPaths = new ArrayList<Path>();
+    for (StoreFile f : compactedFiles) {
+      inputPaths.add(f.getPath());
+    }
+    Path outputPath = (compactedFile != null) ?
+      compactedFile.getPath() : null;
+    CompactionMarker marker = new CompactionMarker(
+      region.getTableDesc().getName(),
+      region.getRegionName(),
+      getFamily().getName(),
+      inputPaths,
+      outputPath,
+      getHomeDir());
+
+    region.getLog().writeCompactionMarker(marker);
+
     // 1. Moving the new files into place -- if there is a new file (may not
     //    be if all cells were expired or deleted).
     StoreFile result = null;
     if (compactedFile != null) {
-      Path p = null;
-      try {
-        p = StoreFile.rename(this.fs, compactedFile.getPath(),
-          StoreFile.getRandomFilename(fs, this.homedir));
-      } catch (IOException e) {
-        LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
-        return null;
-      }
-      result = new StoreFile(this.fs, p, blockcache, this.conf, this.inMemory);
+      Path dstPath = moveCompactedFileIntoHomeDir(
+        fs, compactedFile.getPath(), homedir);
+
+      result = new StoreFile(this.fs, dstPath, blockcache, this.conf, this.inMemory);
     }
     this.lock.writeLock().lock();
     try {
@@ -1000,14 +1028,15 @@ public class Store implements HConstants, HeapSize {
       notifyChangedReadersObservers();
       // Finally, delete old store files.
       for (StoreFile hsf: compactedFiles) {
+        FaultInjector.runPoint(FaultInjector.Point.COMPACTION_DELETE_OLD, hsf);
         hsf.delete();
       }
     } catch (IOException e) {
-      e = RemoteExceptionHandler.checkIOException(e);
       LOG.error("Failed replacing compacted files in " + this.storeNameStr +
         ". Compacted file is " + (result == null? "none": result.toString()) +
         ".  Files replaced " + compactedFiles.toString() +
         " some of which may have been already removed", e);
+      throw e;
     }
     // 4. Compute new store size
     this.storeSize = 0L;
@@ -1024,6 +1053,40 @@ public class Store implements HConstants, HeapSize {
     }
     return result;
   }
+
+  private static Path moveCompactedFileIntoHomeDir(
+      FileSystem fs, Path srcPath, Path homeDir) throws IOException {
+    Path dstPath = StoreFile.getRandomFilename(fs, homeDir);
+    try {
+      FaultInjector.runPoint(FaultInjector.Point.COMPACTION_BEFORE_RENAME,
+        srcPath, dstPath);
+
+      StoreFile.rename(fs, srcPath, dstPath);
+    } catch (IOException e) {
+      LOG.error("Failed move of compacted file " + srcPath, e);
+      throw e;
+    }
+    return dstPath;
+  }
+
+  /**
+   * Roll forward a compaction recorded in a WAL compaction marker: if the
+   * compacted output file never made it into the store directory, move it
+   * there, then delete whatever input files still remain. Both steps are
+   * idempotent, so this is safe to call for a compaction that had already
+   * completed.
+   */
+  public static void completeCompactionMarker(
+      FileSystem fs,
+      HLog.CompactionMarker marker) throws IOException {
+    List<Path> inputPaths = marker.getCompactionInput();
+    Path compactedFile = marker.getCompactedFile();
+    Path homedir = marker.getStoreHomeDir();
+    if (compactedFile != null && fs.exists(compactedFile)) {
+      // We didn't finish moving it
+      LOG.info("Moving compacted file " + compactedFile +
+        " into store directory " + homedir);
+      moveCompactedFileIntoHomeDir(fs, compactedFile, homedir);
+    }
+    for (Path p : inputPaths) {
+      LOG.info("Removing already-compacted file " + p);
+      fs.delete(p, true);
+    }
+  }

   ////////////////////////////////////////////////////////////////////////////
   // Accessors.
@@ -1318,6 +1381,10 @@ public class Store implements HConstants, HeapSize {
     return this.storefiles.size();
   }

+  Path getHomeDir() {
+    return this.homedir;
+  }
+
   /**
    * @return The size of the store files, in bytes.
    */
@@ -1424,8 +1491,8 @@ public class Store implements HConstants, HeapSize {
     // Column matching and version enforcement
     QueryMatcher matcher = new QueryMatcher(get, this.family.getName(),
       columns, this.ttl, keyComparator, versionsToReturn(get.getMaxVersions()));
-    this.lock.readLock().lock();
     try {
+      this.lock.readLock().lock();
       // Read from memstore
       if(this.memstore.get(matcher, result)) {
         // Received early-out from memstore
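The key ordering here is that completeCompaction() writes the marker to the WAL before any file is renamed or deleted, so a crash at either injection point can be rolled forward, and completeCompactionMarker() stays idempotent: once the rename has happened, fs.exists(compactedFile) is false and the move is skipped, while re-deleting an already-deleted input is a harmless no-op. A self-contained sketch of that filesystem behavior (local FS stand-in for HDFS; all paths are made up):

    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path src = new Path("/tmp/compaction.tmp/newfile");
    Path dst = new Path("/tmp/store/newfile");
    fs.mkdirs(src.getParent());
    fs.mkdirs(dst.getParent());
    fs.create(src).close();
    fs.rename(src, dst);                        // the "commit" step
    // Replaying the marker after the rename already happened:
    System.out.println(fs.exists(src));         // false -> the move is skipped
    System.out.println(fs.delete(src, true));   // false, but no exception
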
diff --git core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index c43e5e9..f4f8539 100644
--- core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -19,7 +19,9 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;

+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -69,8 +72,10 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;

 /**
  * HLog stores all the edits to the HStore. Its the hbase write-ahead-log
@@ -114,8 +119,6 @@ import org.apache.hadoop.io.Writable;
 public class HLog implements HConstants, Syncable {
   static final Log LOG = LogFactory.getLog(HLog.class);
   private static final String HLOG_DATFILE = "hlog.dat.";
-  public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
-  static final byte [] METAROW = Bytes.toBytes("METAROW");
   private final FileSystem fs;
   private final Path dir;
   private final Configuration conf;
@@ -204,13 +207,20 @@ public class HLog implements HConstants, Syncable {
    */
   private final LogSyncer logSyncerThread;

-  static byte [] COMPLETE_CACHE_FLUSH;
-  static {
-    try {
-      COMPLETE_CACHE_FLUSH = "HBASE::CACHEFLUSH".getBytes(UTF8_ENCODING);
-    } catch (UnsupportedEncodingException e) {
-      assert(false);
-    }
+  /**
+   * Special log entries used to signify transitions taken by the
+   * region server.
+   */
+  abstract public static class MarkerEntries {
+    public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
+    static final byte [] METAROW = Bytes.toBytes("METAROW");
+    static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
+    static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
+    // TODO this isn't very good
+    static HTableDescriptor MARKER_TABLE_DESC =
+      HTableDescriptor.META_TABLEDESC;
+    static HRegionInfo MARKER_REGIONINFO =
+      new HRegionInfo(MARKER_TABLE_DESC, null, null);
   }

   // For measuring latency of writes
@@ -325,6 +335,13 @@ public class HLog implements HConstants, Syncable {
   }

   /**
+   * @return the directory this hlog writes into
+   */
+  public Path getDir() {
+    return dir;
+  }
+
+  /**
    * Called by HRegionServer when it opens a new region to ensure that log
    * sequence numbers are always greater than the latest sequence number of the
    * region being brought on-line.
@@ -623,12 +640,14 @@ public class HLog implements HConstants, Syncable {
   public void closeAndDelete() throws IOException {
     close();
     FileStatus[] files = fs.listStatus(this.dir);
-    for(FileStatus file : files) {
-      fs.rename(file.getPath(),
-        getHLogArchivePath(this.oldLogDir, file.getPath()));
+    if (files != null) {
+      for(FileStatus file : files) {
+        fs.rename(file.getPath(),
+          getHLogArchivePath(this.oldLogDir, file.getPath()));
+      }
+      LOG.debug("Moved " + files.length + " log files to " +
+        FSUtils.getPath(this.oldLogDir));
     }
-    LOG.debug("Moved " + files.length + " log files to " +
-      FSUtils.getPath(this.oldLogDir));
     fs.delete(dir, true);
   }

@@ -725,7 +744,7 @@ public class HLog implements HConstants, Syncable {
    *
    * Later, if we sort by these keys, we obtain all the relevant edits for a
    * given key-range of the HRegion (TODO). Any edits that do not have a
-   * matching COMPLETE_CACHEFLUSH message can be discarded.
+   * matching COMPLETE_CACHE_FLUSH message can be discarded.
    *
    * <p>
    * Logs cannot be restarted once closed, or once the HLog process dies. Each
@@ -956,6 +975,7 @@ public class HLog implements HConstants, Syncable {
     }
   }

+  // TODO remove "info" param (unused)
   protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
   throws IOException {
     if (!this.enabled) {
@@ -1016,7 +1036,8 @@ public class HLog implements HConstants, Syncable {
   /**
    * Complete the cache flush
    *
-   * Protected by cacheFlushLock
+   * Protected by cacheFlushLock, and releases this lock when done!
+   * TODO this is awful.
    *
    * @param regionName
    * @param tableName
@@ -1033,7 +1054,7 @@ public class HLog implements HConstants, Syncable {
     }
     synchronized (updateLock) {
       long now = System.currentTimeMillis();
-      WALEdit edit = completeCacheFlushLogEdit();
+      WALEdit edit = createCompleteCacheFlushLogEdit();
       HLogKey key = makeKey(regionName, tableName, logSeqId,
           System.currentTimeMillis());
       this.writer.append(new Entry(key, edit));
@@ -1053,9 +1074,31 @@ public class HLog implements HConstants, Syncable {
     }
   }

-  private WALEdit completeCacheFlushLogEdit() {
-    KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
-      System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
+  /**
+   * Write the marker that a compaction has succeeded and is about to be committed.
+   * This provides info to the HMaster to allow it to recover the compaction if
+   * this regionserver dies in the middle. It also prevents the compaction from
+   * finishing if this regionserver has already lost its lease on the log.
+   */
+  public void writeCompactionMarker(CompactionMarker marker)
+      throws IOException {
+    byte[] markerBytes = WritableUtils.toByteArray(marker);
+    KeyValue metaKv = new KeyValue(
+      MarkerEntries.METAROW, MarkerEntries.METAFAMILY,
+      MarkerEntries.COMPACTION,
+      markerBytes);
+
+    WALEdit edits = new WALEdit();
+    edits.add(metaKv);
+    long now = System.currentTimeMillis();
+    append(MarkerEntries.MARKER_REGIONINFO, edits, now, true);
+    LOG.info("Appended compaction marker " + marker);
+  }
+
+
+  private WALEdit createCompleteCacheFlushLogEdit() {
+    KeyValue kv = new KeyValue(MarkerEntries.METAROW, MarkerEntries.METAFAMILY, null,
+      System.currentTimeMillis(), MarkerEntries.COMPLETE_CACHE_FLUSH);
     WALEdit e = new WALEdit();
     e.add(kv);
     return e;
@@ -1072,14 +1115,6 @@ public class HLog implements HConstants, Syncable {
   }

   /**
-   * @param family
-   * @return true if the column is a meta column
-   */
-  public static boolean isMetaFamily(byte [] family) {
-    return Bytes.equals(METAFAMILY, family);
-  }
-
-  /**
    * Split up a bunch of regionserver commit log files that are no longer
    * being written to, into new files, one per region for region to replay on
    * startup. Delete the old log files when finished.
@@ -1087,7 +1122,7 @@ public class HLog implements HConstants, Syncable {
    * @param rootDir qualified root directory of the HBase instance
    * @param srcDir Directory of log files to split: e.g.
    *                <code>${ROOTDIR}/log_HOST_PORT</code>
-   * @param oldLogDir
+   * @param oldLogDir directory into which post-split logs get moved
    * @param fs FileSystem
    * @param conf Configuration
    * @throws IOException
@@ -1229,6 +1264,13 @@ public class HLog implements HConstants, Syncable {
         Entry entry;
         while ((entry = in.next()) != null) {
           byte [] regionName = entry.getKey().getRegionName();
+
+          if (Bytes.equals(regionName,
+              MarkerEntries.MARKER_REGIONINFO.getRegionName())) {
+            handleMarkerEntryRecovery(fs, entry.getKey(), entry.getEdit());
+            continue;
+          }
+
           LinkedList<Entry> queue = logEntries.get(regionName);
           if (queue == null) {
             queue = new LinkedList<Entry>();
@@ -1447,6 +1489,28 @@ public class HLog implements HConstants, Syncable {
     return splits;
   }

+  private static void handleMarkerEntryRecovery(
+      FileSystem fs, HLogKey key, WALEdit val) throws IOException {
+    LOG.info("Handling recovery of marker entry: " + key + " val=" + val);
+    List<KeyValue> kvs = val.getKeyValues();
+    if (kvs.size() != 1) {
+      throw new IOException("Invalid marker kv: " + val);
+    }
+    KeyValue kv = kvs.get(0);
+    assert Bytes.equals(kv.getFamily(), MarkerEntries.METAFAMILY);
+    assert Bytes.equals(kv.getRow(), MarkerEntries.METAROW);
+    byte[] qual = kv.getQualifier();
+    if (Bytes.equals(qual, MarkerEntries.COMPACTION)) {
+      CompactionMarker compaction = new CompactionMarker();
+      compaction.readFields(
+        new DataInputStream(new ByteArrayInputStream(kv.getValue())));
+      LOG.info("Finishing possibly incomplete compaction: " + compaction);
+      Store.completeCompactionMarker(fs, compaction);
+    } else {
+      LOG.error("Unknown log marker entry type: " + Bytes.toString(qual));
+    }
+  }
+
   /*
    * @param conf
    * @return True if append enabled and we have the syncFs in our path.
@@ -1633,6 +1697,98 @@ public class HLog implements HConstants, Syncable {
   }

   /**
+   * A marker that a compaction has occurred. This marker has enough
+   * information to allow a recovering regionserver to "finish" the
+   * compaction if the first fails in the middle.
+ */ + public static class CompactionMarker implements Writable { + private BytesWritable tableName, regionName, familyName; + private List compactionInput; + private Path compactedFile; + private Path storeHomeDir; + private static final int CUR_VERSION = 1; + + public CompactionMarker() { + tableName = new BytesWritable(); + regionName = new BytesWritable(); + familyName = new BytesWritable(); + } + + public CompactionMarker( + byte[] table, byte[] region, byte[] family, + List compactionInput, Path compactedFile, + Path storeHomeDir) { + tableName = new BytesWritable(table); + regionName = new BytesWritable(region); + familyName = new BytesWritable(family); + this.compactionInput = new ArrayList(compactionInput); + this.compactedFile = compactedFile; + this.storeHomeDir = storeHomeDir; + } + + public List getCompactionInput() { + return compactionInput; + } + + public Path getCompactedFile() { + return compactedFile; + } + + @Override + public void readFields(DataInput in) throws IOException { + int version = in.readInt(); + if (version != 1) { + throw new IOException("Unexpected version: " + version); + } + tableName.readFields(in); + regionName.readFields(in); + familyName.readFields(in); + if (in.readBoolean()) { + compactedFile = new Path(WritableUtils.readString(in)); + } else { + compactedFile = null; + } + storeHomeDir = new Path(WritableUtils.readString(in)); + int numInput = in.readInt(); + compactionInput = new ArrayList(numInput); + for (int i = 0; i < numInput; i++) { + compactionInput.add(new Path(WritableUtils.readString(in))); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(CUR_VERSION); + tableName.write(out); + regionName.write(out); + familyName.write(out); + if (compactedFile != null) { + out.writeBoolean(true); + WritableUtils.writeString(out, compactedFile.toString()); + } else { + out.writeBoolean(false); + } + WritableUtils.writeString(out, storeHomeDir.toString()); + out.writeInt(compactionInput.size()); + for (Path p : compactionInput) { + WritableUtils.writeString(out, p.toString()); + } + } + + public String toString() { + return "CompactionMarker(region=" + + Bytes.toString(regionName.getBytes(), 0, regionName.getLength()) + + ",family=" + + Bytes.toString(familyName.getBytes(), 0, familyName.getLength()) + + ")"; + } + + public Path getStoreHomeDir() { + return storeHomeDir; + } + } + + /** * Pass one or more log file names and it will either dump out a text version * on stdout or split the specified log files. * diff --git core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 03814eb..a78e364 100644 --- core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -409,6 +409,27 @@ public class HBaseTestingUtility { return rowCount; } + public void loadNumericRows(final HTable t, final byte[] f, + int startRow, int endRow) throws IOException { + for (int i = startRow; i < endRow; i++) { + byte[] data = Bytes.toBytes(String.valueOf(i)); + Put put = new Put(data); + put.add(f, null, data); + t.put(put); + } + } + + public int countRows(final HTable table) throws IOException { + Scan scan = new Scan(); + ResultScanner results = table.getScanner(scan); + int count = 0; + for (@SuppressWarnings("unused") Result res : results) { + count++; + } + results.close(); + return count; + } + /** * Creates many regions names "aaa" to "zzz". 
diff --git core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 03814eb..a78e364 100644
--- core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -409,6 +409,27 @@ public class HBaseTestingUtility {
     return rowCount;
   }

+  public void loadNumericRows(final HTable t, final byte[] f,
+      int startRow, int endRow) throws IOException {
+    for (int i = startRow; i < endRow; i++) {
+      byte[] data = Bytes.toBytes(String.valueOf(i));
+      Put put = new Put(data);
+      put.add(f, null, data);
+      t.put(put);
+    }
+  }
+
+  public int countRows(final HTable table) throws IOException {
+    Scan scan = new Scan();
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    for (@SuppressWarnings("unused") Result res : results) {
+      count++;
+    }
+    results.close();
+    return count;
+  }
+
   /**
    * Creates many regions names "aaa" to "zzz".
    *
diff --git core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index 4e290a1..d02a7d9 100644
--- core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -378,7 +379,20 @@ public class MiniHBaseCluster implements HConstants {
     }
     return index;
   }
-
+
+  public List<HRegion> findRegionsForTable(byte[] tableName) {
+    ArrayList<HRegion> ret = new ArrayList<HRegion>();
+    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
+      HRegionServer hrs = rst.getRegionServer();
+
+      for (HRegion region : hrs.getOnlineRegions()) {
+        if (Bytes.equals(region.getTableDesc().getName(), tableName)) {
+          ret.add(region);
+        }
+      }
+    }
+    return ret;
+  }
   /**
    * Add a message to include in the responses send a regionserver when it
    * checks back in.
diff --git core/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java core/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
index aad84ee..dd4b97f 100644
--- core/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
+++ core/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
@@ -98,13 +98,7 @@ public class TestFullLogReconstruction {

     // Load up the table with simple rows and count them
     int initialCount = TEST_UTIL.loadTable(table, FAMILY);
-    Scan scan = new Scan();
-    ResultScanner results = table.getScanner(scan);
-    int count = 0;
-    for (Result res : results) {
-      count++;
-    }
-    results.close();
+    int count = TEST_UTIL.countRows(table);

     assertEquals(initialCount, count);

@@ -113,12 +107,7 @@ public class TestFullLogReconstruction {
     }

     TEST_UTIL.expireRegionServerSession(0);
-    scan = new Scan();
-    results = table.getScanner(scan);
-    int newCount = 0;
-    for (Result res : results) {
-      newCount++;
-    }
+    int newCount = TEST_UTIL.countRows(table);
     assertEquals(count, newCount);
   }
 }
diff --git core/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java core/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
new file mode 100644
index 0000000..093402b
--- /dev/null
+++ core/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -0,0 +1,237 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.log4j.Level; +import org.junit.Test; + +public class TestIOFencing { + static final Log LOG = LogFactory.getLog(TestIOFencing.class); + static { + ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); + } + + public static class CompactionBlockerRegion extends HRegion { + boolean compactionsBlocked = false; + Object compactionsBlockedLock = new Object(); + + Object compactionWaitingLock = new Object(); + boolean compactionWaiting = false; + + volatile int compactCount = 0; + + public CompactionBlockerRegion(Path basedir, HLog log, FileSystem fs, Configuration conf, + HRegionInfo regionInfo, FlushRequester flushListener) { + super(basedir, log, fs, conf, regionInfo, flushListener); + } + + public void stopCompactions() { + synchronized (compactionsBlockedLock) { + compactionsBlocked = true; + } + } + + public void allowCompactions() { + synchronized (compactionsBlockedLock) { + compactionsBlocked = false; + compactionsBlockedLock.notifyAll(); + } + } + + public void waitForCompactionToBlock() throws InterruptedException { + synchronized (compactionWaitingLock) { + while (!compactionWaiting) { + compactionWaitingLock.wait(); + } + } + } + + @Override + protected void doRegionCompactionPrep() throws IOException { + synchronized (compactionWaitingLock) { + compactionWaiting = true; + compactionWaitingLock.notifyAll(); + } + synchronized (compactionsBlockedLock) { + while (compactionsBlocked) { + try { + compactionsBlockedLock.wait(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + } + synchronized (compactionWaitingLock) { + compactionWaiting = false; + compactionWaitingLock.notifyAll(); + } + super.doRegionCompactionPrep(); + } + + @Override + public byte [] compactStores() throws IOException { + try { + return super.compactStores(); + } finally { + compactCount++; + } + } + + public int countStoreFiles() { + int count = 0; + for (Store store : stores.values()) { + count += store.getNumberOfstorefiles(); + } + return count; + } + } + + private final static HBaseTestingUtility + TEST_UTIL = new HBaseTestingUtility(); + + private final static byte[] TABLE_NAME = Bytes.toBytes("tabletest"); + private final static byte[] FAMILY = Bytes.toBytes("family"); + + private static final int FIRST_BATCH_COUNT = 4000; + private static final int SECOND_BATCH_COUNT = 4000; + + @Test + public void testFencingAroundCompaction() throws Exception { + Configuration c = TEST_UTIL.getConfiguration(); + c.setClass(HConstants.REGION_IMPL, + CompactionBlockerRegion.class, + HRegion.class); + 
+    c.setBoolean("dfs.support.append", true);
+    // encourage plenty of flushes
+    c.setLong("hbase.hregion.memstore.flush.size", 200000);
+    // Only run compaction when we tell it to
+    c.setInt("hbase.hstore.compactionThreshold", 1000);
+    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
+    // Compact quickly after we tell it to!
+    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
+
+    LOG.info("Starting mini cluster");
+    TEST_UTIL.startMiniCluster(1);
+    CompactionBlockerRegion compactingRegion = null;
+
+    try {
+      LOG.info("Creating admin");
+      HBaseAdmin admin = new HBaseAdmin(c);
+      LOG.info("Creating table");
+      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+      HTable table = new HTable(TABLE_NAME);
+
+      LOG.info("Loading test table");
+      // Load some rows
+      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
+
+      // Find the region
+
+      List<HRegion> testRegions =
+        TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
+      assertEquals(1, testRegions.size());
+      compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
+      assertTrue(compactingRegion.countStoreFiles() > 1);
+      byte REGION_NAME[] = compactingRegion.getRegionName();
+
+      LOG.info("Blocking compactions");
+      compactingRegion.stopCompactions();
+
+      LOG.info("Asking for compaction");
+      admin.majorCompact(TABLE_NAME);
+
+      LOG.info("Waiting for compaction to be about to start");
+      compactingRegion.waitForCompactionToBlock();
+
+      LOG.info("Starting a new server");
+      RegionServerThread newServerThread =
+        TEST_UTIL.getMiniHBaseCluster().startRegionServer();
+      HRegionServer newServer = newServerThread.getRegionServer();
+
+      LOG.info("Killing region server ZK lease");
+      TEST_UTIL.expireRegionServerSession(0);
+
+
+      CompactionBlockerRegion newRegion = null;
+      long startWaitTime = System.currentTimeMillis();
+      while (newRegion == null) {
+        LOG.info("Waiting for the new server to pick up the region");
+        Thread.sleep(1000);
+        newRegion =
+          (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
+        assertTrue("Timed out waiting for new server to open region",
+          System.currentTimeMillis() - startWaitTime < 60000);
+      }
+
+      LOG.info("Allowing compaction to proceed");
+      compactingRegion.allowCompactions();
+
+      while (compactingRegion.compactCount == 0) {
+        Thread.sleep(1000);
+      }
+
+      LOG.info("Compaction finished, loading more data");
+
+      // Now we make sure that the region isn't totally confused
+      TEST_UTIL.loadNumericRows(
+        table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
+
+      admin.majorCompact(TABLE_NAME);
+      startWaitTime = System.currentTimeMillis();
+      while (newRegion.compactCount == 0) {
+        Thread.sleep(1000);
+        assertTrue("New region never compacted",
+          System.currentTimeMillis() - startWaitTime < 30000);
+      }
+
+      assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT,
+        TEST_UTIL.countRows(table));
+    } finally {
+      if (compactingRegion != null) {
+        compactingRegion.allowCompactions();
+      }
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+}
diff --git core/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionCompletionDuringRecovery.java core/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionCompletionDuringRecovery.java
new file mode 100644
index 0000000..f06eec0
--- /dev/null
+++ core/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionCompletionDuringRecovery.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.FaultInjector;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+
+@SuppressWarnings("deprecation")
+public class TestCompactionCompletionDuringRecovery extends HBaseTestCase {
+  static final Log LOG =
+    LogFactory.getLog(TestCompactionCompletionDuringRecovery.class.getName());
+
+  private MiniDFSCluster cluster;
+  private HRegion r;
+  private Path oldLogPath = new Path("/old-logs");
+
+  public TestCompactionCompletionDuringRecovery() {
+    super();
+    conf.setBoolean("dfs.support.append", true);
+    this.cluster = null;
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    this.cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
+    this.conf.set(HConstants.HBASE_DIR,
+      this.cluster.getFileSystem().getHomeDirectory().toString());
+    super.setUp();
+    HTableDescriptor htd = createTableDescriptor(getName());
+    this.r = createNewHRegion(htd, null, null);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    HLog hlog = r.getLog();
+    this.r.close();
+    hlog.closeAndDelete();
+    if (this.cluster != null) {
+      shutdownDfs(cluster);
+    }
+    super.tearDown();
+  }
+
+  private void makeStoreFiles(HRegion r, int numFiles) throws IOException {
+    Put p = new Put(Bytes.toBytes("row"));
+    p.add(fam1, Bytes.toBytes("qualifier"), Bytes.toBytes("value"));
+    for (int i = 0; i < numFiles; i++) {
+      r.put(p);
+      r.flushcache();
+    }
+  }
+
+  public void testFaultBeforeRename() throws IOException {
+    makeStoreFiles(r, 5);
+
+    FaultInjector.inject(FaultInjector.Point.COMPACTION_BEFORE_RENAME,
+      FaultInjector.THROW_IOEXCEPTION);
+    try {
+      Path storeDir = r.getStore(fam1).getHomeDir();
+      assertEquals(5, fs.listStatus(storeDir).length);
+      try {
+        r.compactStores(true);
+        fail("Fault did not cause IOE to bubble up!");
+      } catch (IOException ioe) {
+        LOG.info("Expected", ioe);
+      }
+      FaultInjector.clear();
+
+      HLog hlog = r.getLog();
+      this.r.close();
+      hlog.close();
+
+      assertEquals(5, fs.listStatus(storeDir).length);
+      HLog.splitLog(this.testDir, hlog.getDir(), oldLogPath, this.fs, this.conf);
+      assertEquals(1, fs.listStatus(storeDir).length);
+    } finally {
+      FaultInjector.clear();
+    }
+  }
+
+
+  public void testFaultBeforeDelete() throws IOException {
+    makeStoreFiles(r, 5);
+
+    FaultInjector.inject(FaultInjector.Point.COMPACTION_DELETE_OLD,
+      FaultInjector.throwOnNthInvocation(3));
+    try {
+      Path storeDir = r.getStore(fam1).getHomeDir();
+      assertEquals(5, fs.listStatus(storeDir).length);
+      try {
+        r.compactStores(true);
+        fail("Fault did not cause IOE to bubble up!");
+      } catch (IOException ioe) {
+        LOG.info("Expected", ioe);
+      }
+      FaultInjector.clear();
+
+      HLog hlog = r.getLog();
+      this.r.close();
+      hlog.close();
+
+      // We now have fewer files, since we got past deleting 2 of them
+      assertEquals(3, fs.listStatus(storeDir).length);
+      HLog.splitLog(this.testDir, hlog.getDir(), oldLogPath, this.fs, this.conf);
+      // Splitting the log finishes us back to the 1 that we expect
+      assertEquals(1, fs.listStatus(storeDir).length);
+    } finally {
+      FaultInjector.clear();
+    }
+  }
+
+}
diff --git core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 76a6b86..674639c 100644
--- core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -307,9 +307,9 @@ public class TestHLog extends HBaseTestCase implements HConstants {
         assertTrue(Bytes.equals(regionName, key.getRegionName()));
         assertTrue(Bytes.equals(tableName, key.getTablename()));
         KeyValue kv = val.getKeyValues().get(0);
-        assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
-        assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
-        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
+        assertTrue(Bytes.equals(HLog.MarkerEntries.METAROW, kv.getRow()));
+        assertTrue(Bytes.equals(HLog.MarkerEntries.METAFAMILY, kv.getFamily()));
+        assertEquals(0, Bytes.compareTo(HLog.MarkerEntries.COMPLETE_CACHE_FLUSH,
           val.getKeyValues().get(0).getValue()));
         System.out.println(key + " " + val);
       }
@@ -373,9 +373,9 @@ public class TestHLog extends HBaseTestCase implements HConstants {
       assertTrue(Bytes.equals(hri.getRegionName(),
         entry.getKey().getRegionName()));
       assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
-      assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
-      assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
-      assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
+      assertTrue(Bytes.equals(HLog.MarkerEntries.METAROW, val.getRow()));
+      assertTrue(Bytes.equals(HLog.MarkerEntries.METAFAMILY, val.getFamily()));
+      assertEquals(0, Bytes.compareTo(HLog.MarkerEntries.COMPLETE_CACHE_FLUSH,
         val.getValue()));
       System.out.println(entry.getKey() + " " + val);
     }