commit a6d4475d3451dc4d46c6c40fc06103b360fd3619
Author: Todd Lipcon
Date:   Mon Jun 21 23:55:26 2010 -0700

    HBASE-2729. Flushes should write to a temporary directory and only move completed files into place

diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 3b8f095..717272e 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -163,7 +163,6 @@ public class HRegion implements HeapSize { // , Writable{
   final Configuration conf;
   final HRegionInfo regionInfo;
   final Path regiondir;
-  private final Path regionCompactionDir;
   KeyValue.KVComparator comparator;
 
   /*
@@ -245,7 +244,6 @@ public class HRegion implements HeapSize { // , Writable{
     this.fs = null;
     this.memstoreFlushSize = 0L;
     this.log = null;
-    this.regionCompactionDir = null;
     this.regiondir = null;
     this.regionInfo = null;
     this.threadWakeFrequency = 0L;
@@ -294,8 +292,6 @@ public class HRegion implements HeapSize { // , Writable{
       // Write out region name as string and its encoded name.
       LOG.debug("Creating region " + this);
     }
-    this.regionCompactionDir =
-      new Path(getCompactionDir(basedir), encodedNameStr);
     long flushSize = regionInfo.getTableDesc().getMemStoreFlushSize();
     if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
       flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
@@ -327,6 +323,9 @@ public class HRegion implements HeapSize { // , Writable{
     // Write HRI to a file in case we need to recover .META.
     checkRegioninfoOnFilesystem();
 
+    // Remove temporary data left over from old regions
+    cleanupTmpDir();
+
     // Load in all the HStores. Get min and max seqids across all families.
     long maxSeqId = -1;
     long minSeqId = Integer.MAX_VALUE;
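
The hunk above wires crash recovery into region open: before a region serves any data, leftovers from a flush or compaction that died mid-write are removed. FSUtils.deleteDirectory (used by cleanupTmpDir() below) amounts to a recursive delete; a minimal hedged equivalent with the stock Hadoop FileSystem API, where regiondir stands in for the region's directory:

    FileSystem fs = FileSystem.get(conf);
    // recursively remove <regiondir>/_tmp; returns false if it does not exist
    fs.delete(new Path(regiondir, "_tmp"), true);
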
@@ -730,28 +729,25 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /*
-   * @param dir
-   * @return compaction directory for the passed in dir
-   */
-  static Path getCompactionDir(final Path dir) {
-    return new Path(dir, HConstants.HREGION_COMPACTIONDIR_NAME);
-  }
-
-  /*
    * Do preparation for pending compaction.
-   * Clean out any vestiges of previous failed compactions.
    * @throws IOException
    */
   private void doRegionCompactionPrep() throws IOException {
-    doRegionCompactionCleanup();
   }
 
   /*
-   * Removes the compaction directory for this Store.
-   * @throws IOException
+   * Removes the temporary directory for this Store.
+   */
+  private void cleanupTmpDir() throws IOException {
+    FSUtils.deleteDirectory(this.fs, getTmpDir());
+  }
+
+  /**
+   * Get the temporary directory for this region. This directory
+   * will have its contents removed when the region is reopened.
    */
-  private void doRegionCompactionCleanup() throws IOException {
-    FSUtils.deleteDirectory(this.fs, this.regionCompactionDir);
+  Path getTmpDir() {
+    return new Path(this.basedir, "_tmp");
   }
 
   void setForceMajorCompaction(final boolean b) {
@@ -832,7 +828,6 @@ public class HRegion implements HeapSize { // , Writable{
           splitRow = ss.getSplitRow();
         }
       }
-      doRegionCompactionCleanup();
       String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
           startTime);
       LOG.info("compaction completed on region " + this + " in " + timeTaken);
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index e8983c1..2fbe7b7 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.ImmutableList;
@@ -103,7 +102,6 @@ public class Store implements HeapSize {
   private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
     new CopyOnWriteArraySet<ChangedReadersObserver>();
 
-  private final Path regionCompactionDir;
   private final Object compactLock = new Object();
   private final int compactionThreshold;
   private final int blocksize;
@@ -153,8 +151,6 @@ public class Store implements HeapSize {
       this.ttl *= 1000;
     }
     this.memstore = new MemStore(this.comparator);
-    this.regionCompactionDir = new Path(HRegion.getCompactionDir(basedir),
-      info.getEncodedName());
    this.storeNameStr = Bytes.toString(this.family.getName());
 
     // By default, we compact if an HStore has more than
@@ -207,6 +203,14 @@ public class Store implements HeapSize {
     return new Path(tabledir, new Path(encodedName,
       new Path(Bytes.toString(family))));
   }
+
+  /**
+   * Return the directory in which this store stores its
+   * StoreFiles
+   */
+  public Path getHomedir() {
+    return homedir;
+  }
 
   /*
    * Creates a series of StoreFile loaded from the given directory.
@@ -322,8 +326,7 @@ public class Store implements HeapSize {
     if (!srcFs.equals(fs)) {
       LOG.info("File " + srcPath + " on different filesystem than " +
           "destination store - moving to this filesystem.");
-      Path tmpDir = new Path(homedir, "_tmp");
-      Path tmpPath = StoreFile.getRandomFilename(fs, tmpDir);
+      Path tmpPath = getTmpPath();
       FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
       LOG.info("Copied to temporary path on dst filesystem: " + tmpPath);
       srcPath = tmpPath;
@@ -355,6 +358,16 @@ public class Store implements HeapSize {
   }
 
   /**
+   * Get a temporary path in this region. These temporary files
+   * will get cleaned up when the region is re-opened if they are
+   * still around.
+   */
+  private Path getTmpPath() throws IOException {
+    return StoreFile.getRandomFilename(
+        fs, region.getTmpDir());
+  }
+
+  /**
   * Close all the readers
   *
   * We don't need to worry about subsequent requests because the HRegion holds
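
getTmpPath() above is the hinge of the whole change: every new store file is born under the region-level _tmp directory and made visible only by rename. A hedged sketch of that invariant, using stock Hadoop FileSystem calls plus the StoreFile.getRandomFilename helper seen in the hunk; regionDir, storeDir, and payload are illustrative names, not part of the patch:

    FileSystem fs = FileSystem.get(conf);
    Path tmpDir = new Path(regionDir, "_tmp");            // wiped on region reopen
    Path draft = StoreFile.getRandomFilename(fs, tmpDir);
    FSDataOutputStream out = fs.create(draft);
    try {
      out.write(payload);     // a crash here strands bytes only under _tmp
    } finally {
      out.close();
    }
    // publish: readers of the store directory never see a partial file
    fs.rename(draft, new Path(storeDir, draft.getName()));
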
@@ -424,7 +437,7 @@ public class Store implements HeapSize {
     // if we fail.
     synchronized (flushLock) {
       // A. Write the map out to the disk
-      writer = createWriter(this.homedir, set.size());
+      writer = createWriterInTmp(set.size());
       int entries = 0;
       try {
         for (KeyValue kv: set) {
@@ -441,7 +454,13 @@
         writer.close();
       }
     }
-    StoreFile sf = new StoreFile(this.fs, writer.getPath(), blockcache,
+
+    // Write-out finished successfully, move into the right spot
+    Path dstPath = StoreFile.getUniqueFile(fs, homedir);
+    LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
+    fs.rename(writer.getPath(), dstPath);
+
+    StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
       this.conf, this.family.getBloomFilterType(), this.inMemory);
     Reader r = sf.createReader();
     this.storeSize += r.length();
@@ -456,13 +475,11 @@
   }
 
   /*
-   * @return Writer for this store.
-   * @param basedir Directory to put writer in.
-   * @throws IOException
+   * @return Writer for a new StoreFile in the tmp dir.
    */
-  private StoreFile.Writer createWriter(final Path basedir, int maxKeyCount)
+  private StoreFile.Writer createWriterInTmp(int maxKeyCount)
   throws IOException {
-    return StoreFile.createWriter(this.fs, basedir, this.blocksize,
+    return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
       this.compression, this.comparator, this.conf,
       this.family.getBloomFilterType(), maxKeyCount);
   }
@@ -570,12 +587,6 @@ public class Store implements HeapSize {
         return checkSplit(forceSplit);
       }
 
-      if (!fs.exists(this.regionCompactionDir) &&
-          !fs.mkdirs(this.regionCompactionDir)) {
-        LOG.warn("Mkdir on " + this.regionCompactionDir.toString() + " failed");
-        return checkSplit(forceSplit);
-      }
-
       // HBASE-745, preparing all store file sizes for incremental compacting
       // selection.
       int countOfFiles = filesToCompact.size();
@@ -641,7 +652,7 @@
     LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
         this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
         (references? ", hasReferences=true,": " ") + " into " +
-        FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId);
+        region.getTmpDir() + ", seqid=" + maxId);
     HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
     // Move the compaction into place.
     StoreFile sf = completeCompaction(filesToCompact, writer);
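
Condensed, the flush path after this patch reads as write-then-publish. Everything below is lifted from the hunks above with error handling and metrics elided, so treat it as a summary rather than the literal method body:

    StoreFile.Writer writer = createWriterInTmp(set.size());  // under <region>/_tmp
    try {
      for (KeyValue kv : set) {
        writer.append(kv);              // a failure here strands only tmp data
      }
    } finally {
      writer.close();
    }
    // write-out succeeded: claim a unique name in the store dir and move in
    Path dstPath = StoreFile.getUniqueFile(fs, homedir);
    fs.rename(writer.getPath(), dstPath);
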
@@ -783,7 +794,7 @@ public class Store implements HeapSize {
         // output to writer:
         for (KeyValue kv : kvs) {
           if (writer == null) {
-            writer = createWriter(this.regionCompactionDir, maxKeyCount);
+            writer = createWriterInTmp(maxKeyCount);
           }
           writer.append(kv);
         }
@@ -798,7 +809,7 @@ public class Store implements HeapSize {
       MinorCompactingStoreScanner scanner = null;
       try {
         scanner = new MinorCompactingStoreScanner(this, scanners);
-        writer = createWriter(this.regionCompactionDir, maxKeyCount);
+        writer = createWriterInTmp(maxKeyCount);
         while (scanner.next(writer)) {
           // Nothing to do
         }
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 9e5ca46..34b8044 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -74,9 +74,6 @@ public class TestCompaction extends HBaseTestCase {
     super.setUp();
     HTableDescriptor htd = createTableDescriptor(getName());
     this.r = createNewHRegion(htd, null, null);
-    this.compactionDir = HRegion.getCompactionDir(this.r.getBaseDir());
-    this.regionCompactionDir = new Path(this.compactionDir,
-      this.r.getRegionInfo().getEncodedName());
   }
 
   @Override
@@ -150,10 +147,6 @@ public class TestCompaction extends HBaseTestCase {
     // assertEquals(cellValues.length, 3);
     r.flushcache();
     r.compactStores();
-    // check compaction dir is exists
-    assertTrue(this.cluster.getFileSystem().exists(this.compactionDir));
-    // check Compaction Dir for this Regions is cleaned up
-    assertTrue(!this.cluster.getFileSystem().exists(this.regionCompactionDir));
     // Always 3 versions if that is what max versions is.
     byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
     // Increment the least significant character so we get to next row.
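
The deleted assertions checked for the old compaction directory, which no longer exists. A check of the new invariant, had one been added, might assert that nothing lingers under the region's _tmp directory once compaction completes; this snippet is illustrative only and not part of the patch (it assumes the package-private HRegion.getTmpDir() seen earlier is reachable from the test):

    r.flushcache();
    r.compactStores();
    // compaction output was renamed into place, so _tmp should be empty or gone
    FileSystem fs = this.cluster.getFileSystem();
    Path tmpDir = this.r.getTmpDir();
    assertTrue(!fs.exists(tmpDir) || fs.listStatus(tmpDir).length == 0);
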
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index a65e947..b15ae53 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -31,8 +32,15 @@ import java.util.concurrent.ConcurrentSkipListSet;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -44,11 +52,17 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+
+import com.google.common.base.Joiner;
 
 /**
  * Test class for the Store
 */
 public class TestStore extends TestCase {
+  public static final Log LOG = LogFactory.getLog(TestStore.class);
+
   Store store;
   byte [] table = Bytes.toBytes("table");
   byte [] family = Bytes.toBytes("family");
@@ -91,12 +105,16 @@ public class TestStore extends TestCase {
   }
 
   private void init(String methodName) throws IOException {
+    init(methodName, HBaseConfiguration.create());
+  }
+
+  private void init(String methodName, Configuration conf)
+  throws IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path logdir = new Path(DIR+methodName+"/logs");
     Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
     HColumnDescriptor hcd = new HColumnDescriptor(family);
-    Configuration conf = HBaseConfiguration.create();
     FileSystem fs = FileSystem.get(conf);
 
     fs.delete(logdir, true);
@@ -309,6 +327,93 @@ public class TestStore extends TestCase {
   }
 
+  public void testHandleErrorsInFlush() throws Exception {
+    LOG.info("Setting up a faulty file system that cannot write");
+
+    Configuration conf = HBaseConfiguration.create();
+    // Set a different UGI so we don't get the same cached LocalFS instance
+    conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
+        "testhandleerrorsinflush,foo");
+    // Inject our faulty LocalFileSystem
+    conf.setClass("fs.file.impl", FaultyFileSystem.class,
+        FileSystem.class);
+    // Make sure it worked (above is sensitive to caching details in hadoop core)
+    FileSystem fs = FileSystem.get(conf);
+    assertEquals(FaultyFileSystem.class, fs.getClass());
+
+    // Initialize region
+    init(getName(), conf);
+
+    LOG.info("Adding some data");
+    this.store.add(new KeyValue(row, family, qf1, null));
+    this.store.add(new KeyValue(row, family, qf2, null));
+    this.store.add(new KeyValue(row, family, qf3, null));
+
+    LOG.info("Before flush, we should have no files");
+    FileStatus[] files = fs.listStatus(store.getHomedir());
+    Path[] paths = FileUtil.stat2Paths(files);
+    System.err.println("Got paths: " + Joiner.on(",").join(paths));
+    assertEquals(0, paths.length);
+
+    //flush
+    try {
+      LOG.info("Flushing");
+      flush(1);
+      fail("Didn't bubble up IOE!");
+    } catch (IOException ioe) {
+      assertTrue(ioe.getMessage().contains("Fault injected"));
+    }
+
+    LOG.info("After failed flush, we should still have no files!");
+    files = fs.listStatus(store.getHomedir());
+    paths = FileUtil.stat2Paths(files);
+    System.err.println("Got paths: " + Joiner.on(",").join(paths));
+    assertEquals(0, paths.length);
+  }
+
+
+  static class FaultyFileSystem extends FilterFileSystem {
+    List<SoftReference<FSDataOutputStream>> outStreams =
+      new ArrayList<SoftReference<FSDataOutputStream>>();
+    private long faultPos = 200;
+
+    public FaultyFileSystem() {
+      super(new LocalFileSystem());
+      System.err.println("Creating faulty!");
+    }
+
+    @Override
+    public FSDataOutputStream create(Path p) throws IOException {
+      return new FaultyOutputStream(super.create(p), faultPos);
+    }
+
+  }
+
+  static class FaultyOutputStream extends FSDataOutputStream {
+    volatile long faultPos = Long.MAX_VALUE;
+
+    public FaultyOutputStream(FSDataOutputStream out,
+        long faultPos) throws IOException {
+      super(out, null);
+      this.faultPos = faultPos;
+    }
+
+    @Override
+    public void write(byte[] buf, int offset, int length) throws IOException {
+      System.err.println("faulty stream write at pos " + getPos());
+      injectFault();
+      super.write(buf, offset, length);
+    }
+
+    private void injectFault() throws IOException {
+      if (getPos() >= faultPos) {
+        throw new IOException("Fault injected");
+      }
+    }
+  }
+
+
   private static void flushStore(Store store, long id) throws IOException {
     StoreFlusher storeFlusher = store.getStoreFlusher(id);
     storeFlusher.prepare();
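
A note on the plumbing in testHandleErrorsInFlush(): FileSystem.get() caches instances keyed in part by the caller's UGI, which is why the test sets a throwaway UGI string before asking for the injected FaultyFileSystem. The same FilterFileSystem wrapper trick works for any error-path test; a stripped-down hedged variant (BrokenFS is an illustrative name, not a class from the patch):

    // Any code that obtains its FileSystem via FileSystem.get(conf) will now
    // fail on file creation, exercising cleanup and rollback paths.
    static class BrokenFS extends FilterFileSystem {
      public BrokenFS() {
        super(new LocalFileSystem());
      }
      @Override
      public FSDataOutputStream create(Path p) throws IOException {
        throw new IOException("Fault injected");  // fail every create up front
      }
    }

    Configuration conf = HBaseConfiguration.create();
    conf.setClass("fs.file.impl", BrokenFS.class, FileSystem.class);
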