From 983031c6026c89618f98fd062b5444456b3cb049 Mon Sep 17 00:00:00 2001
From: Pritam Damania
Date: Fri, 5 Aug 2011 00:23:32 -0700
Subject: [PATCH] Validate store files.

---
 .../hadoop/hbase/regionserver/HRegionServer.java   |   10 ++-
 .../apache/hadoop/hbase/regionserver/Store.java    |   31 +++++++-
 .../java/org/apache/hadoop/hbase/util/FSUtils.java |   24 ++++--
 .../hadoop/hbase/regionserver/TestCompaction.java  |   44 ++++++++++
 .../TestHRegionServerFileSystemFailure.java        |   87 ++++++++++++++++++++
 5 files changed, 187 insertions(+), 9 deletions(-)
 create mode 100644 src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerFileSystemFailure.java

diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 23225d7..687675c 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1054,9 +1054,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   public boolean checkFileSystem() {
     if (this.fsOk && this.fs != null) {
       try {
-        FSUtils.checkFileSystemAvailable(this.fs);
+        FSUtils.checkFileSystemAvailable(this.fs, false);
       } catch (IOException e) {
         abort("File System not available", e);
+        // Wait for all threads to exit cleanly.
+        join();
+        // Finally attempt to close the Filesystem, to flush out any open streams.
+        try {
+          this.fs.close();
+        } catch (IOException ie) {
+          LOG.error("Could not close FileSystem", ie);
+        }
         this.fsOk = false;
       }
     }
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 5aedc47..31b1b3a 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -512,6 +512,7 @@ public class Store implements HeapSize {
 
     // Write-out finished successfully, move into the right spot
     Path dstPath = StoreFile.getUniqueFile(fs, homedir);
+    validateStoreFile(writer.getPath());
     String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
     LOG.info(msg);
     status.setStatus("Flushing " + this + ": " + msg);
@@ -1053,7 +1054,7 @@ public class Store implements HeapSize {
    * nothing made it through the compaction.
    * @throws IOException
    */
-  private StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
+  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
                                final boolean majorCompaction, final long maxId)
       throws IOException {
     // calculate maximum key count after compaction (for blooms)
@@ -1136,6 +1137,31 @@ public class Store implements HeapSize {
     return writer;
   }
 
+  /**
+   * Validates a store file by opening and closing it. In HFileV2 this should
+   * not be an expensive operation.
+   *
+   * @param path
+   *          the path to the store file
+   */
+  private void validateStoreFile(Path path)
+      throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile = new StoreFile(this.fs, path, blockcache, this.conf,
+          this.family.getBloomFilterType(), this.inMemory);
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Failed to open store file : " + path
+          + ", keeping it in tmp location", e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeReader();
+      }
+    }
+  }
+
   /*
    * <p>It works by processing a compaction that's been written to disk.
    *
@@ -1155,13 +1181,14 @@
    * @return StoreFile created. May be null.
    * @throws IOException
    */
-  private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
+  StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
     final StoreFile.Writer compactedFile)
   throws IOException {
     // 1. Moving the new files into place -- if there is a new file (may not
     //    be if all cells were expired or deleted).
     StoreFile result = null;
     if (compactedFile != null) {
+      validateStoreFile(compactedFile.getPath());
       Path p = null;
       try {
         p = StoreFile.rename(this.fs, compactedFile.getPath(),
diff --git a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 3c361de..b1434e5 100644
--- a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -111,13 +111,23 @@ public class FSUtils {
     return p;
   }
 
+  public static void checkFileSystemAvailable(final FileSystem fs)
+      throws IOException {
+    checkFileSystemAvailable(fs, true);
+  }
+
   /**
    * Checks to see if the specified file system is available
    *
-   * @param fs filesystem
-   * @throws IOException e
+   * @param fs
+   *          filesystem
+   * @param shutdown
+   *          whether or not to shutdown the filesystem.
+   * @throws IOException
+   *           e
    */
-  public static void checkFileSystemAvailable(final FileSystem fs)
+  public static void checkFileSystemAvailable(final FileSystem fs,
+      boolean shutdown)
   throws IOException {
     if (!(fs instanceof DistributedFileSystem)) {
       return;
@@ -131,10 +141,12 @@ public class FSUtils {
     } catch (IOException e) {
       exception = RemoteExceptionHandler.checkIOException(e);
     }
-    try {
-      fs.close();
-    } catch (Exception e) {
+    if (shutdown) {
+      try {
+        fs.close();
+      } catch (Exception e) {
         LOG.error("file system close failed: ", e);
+      }
     }
     IOException io = new IOException("File system is not available");
     io.initCause(exception);
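The two Store changes above share one idea: a freshly written file is validated by opening a real reader over it before it is renamed from the tmp location into the store directory, so a corrupt flush or compaction result never becomes visible. A minimal self-contained sketch of that validate-then-rename pattern against the plain Hadoop FileSystem API follows; ValidateThenRename and commit are illustrative names, and the open/close check is a simplified stand-in for StoreFile.createReader(), which actually parses the file format:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ValidateThenRename {
      /**
       * Promotes a finished file from its tmp path to its final path, but
       * only after verifying it can be opened. On failure the file stays in
       * tmp for inspection, mirroring validateStoreFile() in the patch.
       */
      public static Path commit(FileSystem fs, Path tmpPath, Path finalPath)
          throws IOException {
        try {
          // Open and close as a cheap sanity check. The patch notes this is
          // inexpensive for HFileV2; a real check parses the format the way
          // StoreFile.createReader() does.
          fs.open(tmpPath).close();
        } catch (IOException e) {
          throw new IOException("Validation failed, keeping " + tmpPath
              + " in tmp location", e);
        }
        if (!fs.rename(tmpPath, finalPath)) {
          throw new IOException("Rename of " + tmpPath + " to " + finalPath
              + " failed");
        }
        return finalPath;
      }
    }

Because the rename happens only after a successful open, the failure mode exercised by testCompactionWithCorruptResult below (corrupt file stays in tmp, nothing appears at the destination) falls out naturally.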
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 04a2d13..dd01109 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -21,16 +21,22 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
+import static org.junit.Assert.fail;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -457,4 +463,42 @@ public class TestCompaction extends HBaseTestCase {
         "bbb").getBytes(), null);
     loader.flushcache();
   }
+
+  public void testCompactionWithCorruptResult() throws Exception {
+    int nfiles = 10;
+    for (int i = 0; i < nfiles; i++) {
+      createStoreFile(r);
+    }
+    Store store = r.getStore(COLUMN_FAMILY);
+
+    List<StoreFile> storeFiles = store.getStorefiles();
+    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+
+    StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);
+
+    // Now let's corrupt the compacted file.
+    FileSystem fs = cluster.getFileSystem();
+    Path origPath = compactedFile.getPath();
+    Path homedir = store.getHomedir();
+    Path dstPath = new Path(homedir, origPath.getName());
+    FSDataOutputStream stream = fs.create(origPath, null, true, 512, (short) 3,
+        (long) 1024,
+        null);
+    stream.writeChars("CORRUPT FILE!!!!");
+    stream.close();
+
+    try {
+      store.completeCompaction(storeFiles, compactedFile);
+    } catch (Exception e) {
+      // The complete compaction should fail and the corrupt file should remain
+      // in the 'tmp' directory.
+      assert (fs.exists(origPath));
+      assert (!fs.exists(dstPath));
+      e.printStackTrace();
+      System.out.println("testCompactionWithCorruptResult Passed");
+      return;
+    }
+    fail("testCompactionWithCorruptResult failed since no exception was "
+        + "thrown while completing a corrupt file");
+  }
 }
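The corruption step in testCompactionWithCorruptResult above relies on FileSystem.create() with overwrite=true clobbering the finished compaction output before completeCompaction() runs. The same trick, stripped to its essentials (a hypothetical standalone snippet, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class Corrupter {
      /** Replaces the file at path with a few garbage bytes. */
      public static void corrupt(FileSystem fs, Path path) throws IOException {
        // overwrite=true truncates the existing file before writing.
        FSDataOutputStream out = fs.create(path, true);
        try {
          out.writeChars("CORRUPT FILE!!!!");
        } finally {
          out.close();
        }
      }
    }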
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerFileSystemFailure.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerFileSystemFailure.java
new file mode 100644
index 0000000..845b17f
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerFileSystemFailure.java
@@ -0,0 +1,87 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import static org.junit.Assert.*;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHRegionServerFileSystemFailure {
+  private static final Log LOG = LogFactory
+      .getLog(TestHRegionServerFileSystemFailure.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte[][] FAMILIES = { Bytes.toBytes("f1"),
+      Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4") };
+  private static final int nLoaders = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private static class TableLoader extends Thread {
+    private final HTable table;
+
+    public TableLoader(HTable table) {
+      this.table = table;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          for (int i = 0; i < FAMILIES.length; i++) {
+            byte[] columnFamily = FAMILIES[i];
+            TEST_UTIL.loadTable(table, columnFamily);
+          }
+        } catch (IOException e) {
+          LOG.warn(e);
+          break;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testHRegionServerFileSystemFailure() throws Exception {
+    // Build some data.
+    byte[] tableName = Bytes.toBytes("testCloseHRegion");
+    TEST_UTIL.createTable(tableName, FAMILIES);
+    HTable table = new HTable(tableName);
+    for (int i = 0; i < FAMILIES.length; i++) {
+      byte[] columnFamily = FAMILIES[i];
+      TEST_UTIL.createMultiRegions(table, columnFamily);
+    }
+
+    for (int i = 0; i < nLoaders; i++) {
+      new TableLoader(table).start();
+    }
+
+    // Wait for loaders to build up some data.
+    Thread.sleep(10000);
+
+    // Pick a regionserver.
+    Configuration conf = TEST_UTIL.getConfiguration();
+    HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+
+    // Bring down HDFS.
+    TEST_UTIL.shutdownMiniDFSCluster();
+
+    // Verify checkFileSystem returns false and doesn't throw exceptions.
+    assertFalse(server.checkFileSystem());
+  }
+}
-- 
1.7.4.4
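Taken together, the FSUtils and HRegionServer changes separate filesystem health checking from filesystem shutdown: the old single-argument checkFileSystemAvailable() still closes the filesystem on failure, while a caller that passes shutdown=false can quiesce its own threads first and close the filesystem last to flush any open streams, which is the ordering the new test verifies via checkFileSystem(). A sketch of such a caller; FsHealthCheck, check, and quiesceThreads are illustrative names, and in the patch the quiescing step is HRegionServer.join():

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class FsHealthCheck {
      private static final Log LOG = LogFactory.getLog(FsHealthCheck.class);

      /**
       * Returns true if the filesystem is healthy. On failure, runs the
       * caller-supplied quiesce step, then closes the filesystem, following
       * the same ordering as HRegionServer.checkFileSystem() above.
       */
      public static boolean check(FileSystem fs, Runnable quiesceThreads) {
        try {
          // shutdown=false: FSUtils must not close the filesystem itself,
          // because we still need it open while our threads wind down.
          FSUtils.checkFileSystemAvailable(fs, false);
          return true;
        } catch (IOException e) {
          LOG.error("File system not available", e);
          quiesceThreads.run(); // analogous to HRegionServer.join()
          try {
            fs.close();
          } catch (IOException ie) {
            LOG.error("Could not close FileSystem", ie);
          }
          return false;
        }
      }
    }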