From 04d3ab08b8007630553d449b89925c0594b4a60e Mon Sep 17 00:00:00 2001
From: Pritam Damania
Date: Fri, 5 Aug 2011 00:23:32 -0700
Subject: [PATCH] Validate store files after compactions/flushes.

---
 .../apache/hadoop/hbase/regionserver/Store.java    |   30 +++++++++++++-
 .../hadoop/hbase/regionserver/TestCompaction.java  |   43 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 2 deletions(-)

diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 5aedc47..4fc61c8 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -512,6 +512,7 @@ public class Store implements HeapSize {
 
     // Write-out finished successfully, move into the right spot
     Path dstPath = StoreFile.getUniqueFile(fs, homedir);
+    validateStoreFile(writer.getPath());
     String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
     LOG.info(msg);
     status.setStatus("Flushing " + this + ": " + msg);
@@ -1053,7 +1054,7 @@ public class Store implements HeapSize {
    *   nothing made it through the compaction.
    * @throws IOException
    */
-  private StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
+  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
                                final boolean majorCompaction, final long maxId)
       throws IOException {
     // calculate maximum key count after compaction (for blooms)
@@ -1136,6 +1137,30 @@ public class Store implements HeapSize {
     return writer;
   }
 
+  /**
+   * Validates a store file by opening and closing it. In HFileV2 this should
+   * not be an expensive operation.
+   *
+   * @param path the path to the store file
+   */
+  private void validateStoreFile(Path path)
+      throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile = new StoreFile(this.fs, path, blockcache, this.conf,
+          this.family.getBloomFilterType(), this.inMemory);
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Failed to open store file : " + path
+          + ", keeping it in tmp location", e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeReader();
+      }
+    }
+  }
+
   /*
    * It works by processing a compaction that's been written to disk.
    *
@@ -1155,13 +1180,14 @@ public class Store implements HeapSize {
    * @return StoreFile created. May be null.
    * @throws IOException
    */
-  private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
+  StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
                                        final StoreFile.Writer compactedFile)
       throws IOException {
     // 1. Moving the new files into place -- if there is a new file (may not
     //    be if all cells were expired or deleted).
     StoreFile result = null;
     if (compactedFile != null) {
+      validateStoreFile(compactedFile.getPath());
       Path p = null;
       try {
         p = StoreFile.rename(this.fs, compactedFile.getPath(),
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 04a2d13..c4cbfc1 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -21,16 +21,22 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+
+import static org.junit.Assert.fail;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -457,4 +463,41 @@ public class TestCompaction extends HBaseTestCase {
         "bbb").getBytes(), null);
     loader.flushcache();
   }
+
+  public void testCompactionWithCorruptResult() throws Exception {
+    int nfiles = 10;
+    for (int i = 0; i < nfiles; i++) {
+      createStoreFile(r);
+    }
+    Store store = r.getStore(COLUMN_FAMILY);
+
+    List<StoreFile> storeFiles = store.getStorefiles();
+    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+
+    StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);
+
+    // Now let's corrupt the compacted file.
+    FileSystem fs = cluster.getFileSystem();
+    Path origPath = compactedFile.getPath();
+    Path homedir = store.getHomedir();
+    Path dstPath = new Path(homedir, origPath.getName());
+    FSDataOutputStream stream = fs.create(origPath, null, true, 512, (short) 3,
+        (long) 1024,
+        null);
+    stream.writeChars("CORRUPT FILE!!!!");
+    stream.close();
+
+    try {
+      store.completeCompaction(storeFiles, compactedFile);
+    } catch (Exception e) {
+      // The complete compaction should fail and the corrupt file should remain
+      // in the 'tmp' directory.
+      assert (fs.exists(origPath));
+      assert (!fs.exists(dstPath));
+      System.out.println("testCompactionWithCorruptResult Passed");
+      return;
+    }
+    fail("testCompactionWithCorruptResult failed since no exception was" +
+        " thrown while completing a corrupt file");
+  }
 }
-- 
1.7.4.4
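
Note for reviewers (appended after the patch, not part of it): the new guard amounts to
"open the freshly written HFile while it still sits in the tmp location, and only rename
it into the store directory if that open succeeds." The sketch below illustrates that
validate-then-move flow using only the plain Hadoop FileSystem API; the class and method
names (ValidateThenMoveSketch, validate, moveIntoPlace) are hypothetical, and the
zero-length check merely stands in for the real HFile open done by validateStoreFile().

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ValidateThenMoveSketch {

      // Stand-in for validateStoreFile(): throw if the candidate file cannot be read.
      static void validate(FileSystem fs, Path path) throws IOException {
        if (!fs.exists(path) || fs.getFileStatus(path).getLen() == 0) {
          throw new IOException("Failed to open store file : " + path
              + ", keeping it in tmp location");
        }
      }

      // Validate first; when validation throws, the rename is never attempted,
      // so the suspect file stays in tmp instead of entering the store directory.
      static Path moveIntoPlace(FileSystem fs, Path tmpFile, Path storeDir) throws IOException {
        validate(fs, tmpFile);
        Path dst = new Path(storeDir, tmpFile.getName());
        if (!fs.rename(tmpFile, dst)) {
          throw new IOException("Failed rename of " + tmpFile + " to " + dst);
        }
        return dst;
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path tmp = new Path("/tmp/teststore/.tmp/compacted");
        Path storeDir = new Path("/tmp/teststore/cf");
        fs.mkdirs(tmp.getParent());
        fs.mkdirs(storeDir);
        fs.create(tmp).close();  // zero-length file plays the "corrupt" compaction result
        try {
          moveIntoPlace(fs, tmp, storeDir);
        } catch (IOException e) {
          // Mirrors the new test: the bad file is still in tmp, nothing was moved.
          System.out.println("kept in tmp: " + fs.exists(tmp)
              + ", moved into store: " + fs.exists(new Path(storeDir, tmp.getName())));
        }
      }
    }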