commit 33e55dd13ec16b94c081bcd1af1ed197068e8692
Author: Todd Lipcon
Date:   Thu Dec 9 23:33:05 2010 -0800

    HBASE-3323 rewrite log splitter

diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 5670247..ab1c326 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -160,6 +160,16 @@ public class HLogKey implements WritableComparable<HLogKey> {
     return result;
   }
 
+  public void internTableName(byte []tablename) {
+    assert Bytes.equals(tablename, this.tablename);
+    this.tablename = tablename;
+  }
+
+  public void internEncodedRegionName(byte []encodedRegionName) {
+    assert Bytes.equals(this.encodedRegionName, encodedRegionName);
+    this.encodedRegionName = encodedRegionName;
+  }
+
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.encodedRegionName);
     Bytes.writeByteArray(out, this.tablename);
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index ad104e1..7ed75aa 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -25,19 +25,12 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -53,10 +47,9 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
  * files that are no longer being written to, into new files, one per region for
@@ -229,30 +222,19 @@ public class HLogSplitter {
       final Configuration conf) throws IOException {
     List<Path> processedLogs = new ArrayList<Path>();
     List<Path> corruptedLogs = new ArrayList<Path>();
-    final Map<byte[], WriterAndPath> logWriters = Collections
-        .synchronizedMap(new TreeMap<byte[], WriterAndPath>(
-            Bytes.BYTES_COMPARATOR));
     List<Path> splits = null;
 
-    // Number of logs in a read batch
-    // More means faster but bigger mem consumption
-    // TODO make a note on the conf rename and update hbase-site.xml if needed
-    int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
     splitSize = 0;
 
+    EntrySink entrySink = new EntrySink(128*1024*1024); // 128MB TODO
+    OutputSink outputSink = new OutputSink(conf, fs, rootDir);
+    List<WriterThread> writerThreads = startWriterThreads(entrySink, outputSink, conf);
+
     try {
-      int i = -1;
-      while (i < logfiles.length) {
-        final Map<byte[], LinkedList<Entry>> editsByRegion = new TreeMap<byte[], LinkedList<Entry>>(
-            Bytes.BYTES_COMPARATOR);
-        for (int j = 0; j < logFilesPerStep; j++) {
-          i++;
-          if (i == logfiles.length) {
-            break;
-          }
-          FileStatus log = logfiles[i];
+      int i = 0;
+      for (FileStatus log : logfiles) {
         Path logPath = log.getPath();
         long logLength = log.getLen();
         splitSize += logLength;
@@ -260,7 +242,7 @@ public class HLogSplitter {
             + ": " + logPath + ", length=" + logLength);
         try {
           recoverFileLease(fs, logPath, conf);
-          parseHLog(log, editsByRegion, fs, conf);
+          parseHLog(log, entrySink, fs, conf);
           processedLogs.add(logPath);
         } catch (EOFException eof) {
           // truncated files are expected if a RS crashes (see HBASE-2643)
@@ -282,8 +264,6 @@ public class HLogSplitter {
           }
         }
       }
-      }
-      writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
     }
       if (fs.listStatus(srcDir).length > processedLogs.size()
           + corruptedLogs.size()) {
@@ -291,84 +271,208 @@
             + "HRegionServer was not dead when we started");
       }
       archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+
+      finishWriterThreads(writerThreads);
     } finally {
-      splits = new ArrayList<Path>(logWriters.size());
-      for (WriterAndPath wap : logWriters.values()) {
-        wap.w.close();
-        splits.add(wap.p);
-        LOG.debug("Closed " + wap.p);
-      }
+      splits = outputSink.close();
     }
     return splits;
   }
-
-  /**
-   * Takes splitLogsMap and concurrently writes them to region directories using a thread pool
-   *
-   * @param splitLogsMap map that contains the log splitting result indexed by region
-   * @param logWriters map that contains a writer per region
-   * @param rootDir hbase root dir
-   * @param fs
-   * @param conf
-   * @throws IOException
-   */
-  private void writeEditsBatchToRegions(
-      final Map<byte[], LinkedList<Entry>> splitLogsMap,
-      final Map<byte[], WriterAndPath> logWriters, final Path rootDir,
-      final FileSystem fs, final Configuration conf)
-      throws IOException {
+  private List<WriterThread> startWriterThreads(EntrySink entrySink,
+      OutputSink outSink,
+      Configuration conf) {
     // Number of threads to use when log splitting to rewrite the logs.
     // More means faster but bigger mem consumption.
-    int logWriterThreads = conf.getInt(
+    int numThreads = conf.getInt(
         "hbase.regionserver.hlog.splitlog.writer.threads", 3);
-    boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
-    HashMap<byte[], Future<Void>> writeFutureResult = new HashMap<byte[], Future<Void>>();
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    builder.setNameFormat("SplitWriter-%1$d");
-    ThreadFactory factory = builder.build();
-    ThreadPoolExecutor threadPool =
-      (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory);
-    for (final byte[] region : splitLogsMap.keySet()) {
-      Callable<Void> splitter = createNewSplitter(rootDir, logWriters, splitLogsMap,
-        region, fs, conf);
-      writeFutureResult.put(region, threadPool.submit(splitter));
-    }
-
-    threadPool.shutdown();
-    // Wait for all threads to terminate
-    try {
-      for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
-        String message = "Waiting for hlog writers to terminate, elapsed " + j
-            * 5 + " seconds";
-        if (j < 30) {
-          LOG.debug(message);
-        } else {
-          LOG.info(message);
-        }
+    List<WriterThread> threads = new ArrayList<WriterThread>();
+    for (int i = 0; i < numThreads; i++) {
+      WriterThread t = new WriterThread(conf, i, entrySink, outSink);
+      t.start();
+      threads.add(t);
+    }
+    return threads;
+  }
+
+
+  private void finishWriterThreads(List<WriterThread> threads)
+      throws IOException {
+    LOG.info("Waiting for split writer threads to finish");
+    for (WriterThread t : threads) {
+      t.finish();
+    }
+    for (WriterThread t: threads) {
+      try {
+        t.join();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
       }
-    } catch (InterruptedException ex) {
-      LOG.warn("Hlog writers were interrupted, possible data loss!");
-      if (!skipErrors) {
-        throw new IOException("Could not finish writing log entries", ex);
-        // TODO maybe we should fail here regardless if skipErrors is active or not
+      t.checkForErrors();
+    }
+    LOG.info("Split writers finished");
+  }
+
+  class OutputSink {
+    private final Map<byte[], WriterAndPath> logWriters =
+      new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR);
+    private final FileSystem fs;
+    private final Path rootDir;
+    private final Configuration conf;
+
+    OutputSink(Configuration conf, FileSystem fs, Path rootDir) {
+      this.conf = conf;
+      this.fs = fs;
+      this.rootDir = rootDir;
+    }
+
+    public List<Path> close() throws IOException {
+      List<Path> paths = new ArrayList<Path>();
+      for (WriterAndPath wap : logWriters.values()) {
+        wap.w.close();
+        paths.add(wap.p);
+        LOG.info("Closed path " + wap.p);
       }
+      return paths;
     }
-    for (Map.Entry<byte[], Future<Void>> entry : writeFutureResult.entrySet()) {
+    WriterAndPath getWriterAndPath(
+        Entry entry) throws IOException {
+
+      byte region[] = entry.getKey().getEncodedRegionName();
+      WriterAndPath ret = logWriters.get(region);
+      if (ret == null) {
+        Path regionedits = getRegionSplitEditsPath(fs,
+            entry, rootDir);
+        if (regionedits == null) {
+          // Edits dir doesn't exist
+          return null;
+        }
+
+        if (fs.exists(regionedits)) {
+          LOG.warn("Found existing old edits file. It could be the "
+              + "result of a previous failed split attempt. Deleting "
+              + regionedits + ", length="
+              + fs.getFileStatus(regionedits).getLen());
+          if (!fs.delete(regionedits, false)) {
+            LOG.warn("Failed delete of old " + regionedits);
+          }
+        }
+        Writer w = createWriter(fs, regionedits, conf);
+        ret = new WriterAndPath(regionedits, w);
+        logWriters.put(region, ret);
+        LOG.debug("Creating writer path=" + regionedits + " region="
+            + Bytes.toStringBinary(region));
+      }
+      return ret;
+    }
+  }
+
+
+  static class WriterThread extends Thread {
+    private volatile boolean shouldStop = false;
+    private final EntrySink sink;
+    private volatile Throwable thrown = null;
+    private final OutputSink outputSink;
+
+    WriterThread(Configuration conf, int i,
+        EntrySink sink,
+        OutputSink outputSink) {
+      // TODO conf unused
+      super("WriterThread-" + i);
+      this.sink = sink;
+      this.outputSink = outputSink;
+    }
+
+    public void run() {
+      try {
+        doRun();
+      } catch (Throwable t) {
+        LOG.error("Error in log splitting write thread", t);
+        thrown = t;
+      }
+    }
+
+    public void checkForErrors() throws IOException {
+      if (thrown == null) return;
+      if (thrown instanceof IOException) {
+        throw (IOException)thrown;
+      } else {
+        throw new RuntimeException(thrown);
+      }
+    }
+
+    private void doRun() throws IOException {
+      LOG.debug("Writer thread " + this + ": starting");
+      while (true) {
+        RegionEntryBuffer buffer = sink.getChunkToWrite();
+        LOG.debug("Writer thread got buffer: " + buffer);
+        if (buffer == null) {
+          if (shouldStop) return;
+          LOG.debug("Writer thread " + this + ": nothing to write!");
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) {
+            if (!shouldStop) {
+              throw new RuntimeException(ie);
+            }
+          }
+          continue;
+        }
+        try {
+          writeBuffer(buffer);
+        } finally {
+          LOG.debug("Writer thread " + this + ": finished buffer" + buffer);
+          sink.doneWriting(buffer);
+        }
+      }
+    }
+
+    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
+      List<Entry> entries = buffer.entryBuffer;
+      byte[] region = buffer.encodedRegionName;
+
+      if (entries.isEmpty()) {
+        LOG.debug(this.getName() + " got an empty buffer, skipping");
+        return;
+      }
+
+      WriterAndPath wap = null;
+
+      LOG.debug(this.getName() + " got " + entries.size() + " to process");
+      long threadTime = System.currentTimeMillis();
       try {
-        entry.getValue().get();
-      } catch (ExecutionException e) {
-        throw (new IOException(e.getCause()));
-      } catch (InterruptedException e1) {
-        LOG.warn("Writer for region " + Bytes.toString(entry.getKey())
-          + " was interrupted, however the write process should have "
-          + "finished. Throwing up ", e1);
-        throw (new IOException(e1.getCause()));
+        int editsCount = 0;
+
+        for (Entry logEntry : entries) {
+          if (wap == null) {
+            wap = outputSink.getWriterAndPath(logEntry);
+            if (wap == null) {
+              // getWriterAndPath decided we don't need to write these edits
+              // Message was already logged
+              return;
+            }
+          }
+
+          wap.w.append(logEntry);
+          editsCount++;
+        }
+        LOG.debug(this.getName() + " Applied " + editsCount
+            + " total edits to " + Bytes.toStringBinary(region) + " in "
+            + (System.currentTimeMillis() - threadTime) + "ms");
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.fatal(this.getName() + " Got while writing log entry to log", e);
+        throw e;
       }
     }
+
+    public void finish() {
+      shouldStop = true;
+    }
   }
+
   /**
    * Moves processed logs to a oldLogDir after successful processing Moves
@@ -459,7 +563,7 @@
    * @throws IOException if hlog is corrupted, or can't be open
    */
   private void parseHLog(final FileStatus logfile,
-      final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
+      EntrySink entrySink, final FileSystem fs,
       final Configuration conf)
       throws IOException {
     // Check for possibly empty file. With appends, currently Hadoop reports a
@@ -489,15 +593,11 @@
     try {
       Entry entry;
       while ((entry = in.next()) != null) {
-        byte[] region = entry.getKey().getEncodedRegionName();
-        LinkedList<Entry> queue = splitLogsMap.get(region);
-        if (queue == null) {
-          queue = new LinkedList<Entry>();
-          splitLogsMap.put(region, queue);
-        }
-        queue.addLast(entry);
+        entrySink.appendEntry(entry);
         editsCount++;
       }
+    } catch (InterruptedException ie) {
+      throw new RuntimeException(ie);
     } finally {
       LOG.debug("Pushed=" + editsCount + " entries from " + path);
       try {
@@ -511,61 +611,6 @@
       }
     }
   }
-
-  private Callable<Void> createNewSplitter(final Path rootDir,
-      final Map<byte[], WriterAndPath> logWriters,
-      final Map<byte[], LinkedList<Entry>> logEntries, final byte[] region,
-      final FileSystem fs, final Configuration conf) {
-    return new Callable<Void>() {
-      public String getName() {
-        return "Split writer thread for region " + Bytes.toStringBinary(region);
-      }
-
-      @Override
-      public Void call() throws IOException {
-        LinkedList<Entry> entries = logEntries.get(region);
-        LOG.debug(this.getName() + " got " + entries.size() + " to process");
-        long threadTime = System.currentTimeMillis();
-        try {
-          int editsCount = 0;
-          WriterAndPath wap = logWriters.get(region);
-          for (Entry logEntry : entries) {
-            if (wap == null) {
-              Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
-              if (regionedits == null) {
-                // we already print a message if it's null in getRegionSplitEditsPath
-                break;
-              }
-              if (fs.exists(regionedits)) {
-                LOG.warn("Found existing old edits file. It could be the "
-                    + "result of a previous failed split attempt. Deleting "
-                    + regionedits + ", length="
-                    + fs.getFileStatus(regionedits).getLen());
-                if (!fs.delete(regionedits, false)) {
-                  LOG.warn("Failed delete of old " + regionedits);
-                }
-              }
-              Writer w = createWriter(fs, regionedits, conf);
-              wap = new WriterAndPath(regionedits, w);
-              logWriters.put(region, wap);
-              LOG.debug("Creating writer path=" + regionedits + " region="
-                  + Bytes.toStringBinary(region));
-            }
-            wap.w.append(logEntry);
-            editsCount++;
-          }
-          LOG.debug(this.getName() + " Applied " + editsCount
-              + " total edits to " + Bytes.toStringBinary(region) + " in "
-              + (System.currentTimeMillis() - threadTime) + "ms");
-        } catch (IOException e) {
-          e = RemoteExceptionHandler.checkIOException(e);
-          LOG.fatal(this.getName() + " Got while writing log entry to log", e);
-          throw e;
-        }
-        return null;
-      }
-    };
-  }
 
   /**
    * Create a new {@link Writer} for writing log splits.
@@ -576,7 +621,7 @@
    * @return A new Writer instance
    * @throws IOException
    */
-  protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+  protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
     return HLog.createWriter(fs, logfile, conf);
   }
@@ -595,4 +640,108 @@
     return HLog.getReader(fs, curLogFile, conf);
   }
 
+
+
+  static class RegionEntryBuffer implements HeapSize {
+    long heapInBuffer = 0;
+    List<Entry> entryBuffer;
+    byte[] tableName;
+    byte[] encodedRegionName;
+
+    RegionEntryBuffer(byte[] table, byte[] region) {
+      this.tableName = table;
+      this.encodedRegionName = region;
+      this.entryBuffer = new LinkedList<Entry>();
+    }
+
+    long appendEntry(Entry entry) {
+      internify(entry);
+      entryBuffer.add(entry);
+      long incrHeap = entry.getEdit().heapSize() +
+        ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
+        0; // TODO linkedlist entry
+      heapInBuffer += incrHeap;
+      return incrHeap;
+    }
+
+    private void internify(Entry entry) {
+      HLogKey k = entry.getKey();
+      k.internTableName(this.tableName);
+      k.internEncodedRegionName(this.encodedRegionName);
+    }
+
+    public long heapSize() {
+      return heapInBuffer;
+    }
+  }
+
+  static class EntrySink {
+    Map<byte[], RegionEntryBuffer> buffers =
+      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
+
+    /* Track which regions are currently in the middle of writing. We don't allow
+       an IO thread to pick up bytes from a region if we're already writing
+       data for that region in a different IO thread. */
+    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+    long totalBuffered = 0;
+    long maxHeapUsage;
+
+    public EntrySink(long maxHeapUsage) {
+      this.maxHeapUsage = maxHeapUsage;
+    }
+
+    synchronized void appendEntry(Entry entry) throws InterruptedException {
+      HLogKey key = entry.getKey();
+      RegionEntryBuffer buffer = buffers.get(key.getEncodedRegionName());
+      if (buffer == null) {
+        buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
+        buffers.put(key.getEncodedRegionName(), buffer);
+      }
+      long incrHeap = buffer.appendEntry(entry);
+      totalBuffered += incrHeap;
+
+      // If we crossed the chunk threshold
+      while (totalBuffered > maxHeapUsage) {
+        LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads");
+        this.wait(3000);
+      }
+    }
+
+    synchronized RegionEntryBuffer getChunkToWrite() {
+      long biggestSize=0;
+      byte[] biggestBufferKey=null;
+
+      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
+        long size = entry.getValue().heapSize();
+        if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
+          biggestSize = size;
+          biggestBufferKey = entry.getKey();
+        }
+      }
+      if (biggestBufferKey == null) {
+        return null;
+      }
+
+      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
+      currentlyWriting.add(biggestBufferKey);
+      LOG.debug("IO Thread " + Thread.currentThread() + " picked up " + biggestSize + " bytes of data");
+      return buffer;
+    }
+
+    synchronized void doneWriting(RegionEntryBuffer buffer) {
+      boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
+      assert removed;
+      long size = buffer.heapSize();
+      totalBuffered -= size;
+      LOG.debug("IO Thread " + Thread.currentThread() + " reported having finished writing " + size + " bytes");
+      notify();
+    }
+
+    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
+      return currentlyWriting.contains(region);
+    }
+  }
+
+
 }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 52c2446..e1117ef 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -66,7 +67,7 @@ import org.apache.hadoop.io.Writable;
  * is an old style KeyValue or the new style WALEdit.
  *
  */
-public class WALEdit implements Writable {
+public class WALEdit implements Writable, HeapSize {
 
   private final int VERSION_2 = -1;
 
@@ -154,7 +155,19 @@ public class WALEdit {
         out.writeInt(scopes.get(key));
       }
     }
+  }
 
+  public long heapSize() {
+    long ret = 0;
+    for (KeyValue kv : kvs) {
+      ret += kv.heapSize();
+    }
+    if (scopes != null) {
+      ret += ClassSize.TREEMAP;
+      ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
+      // TODO this isn't quite right, need help here
+    }
+    return ret;
   }
 
   public String toString() {
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
index 01154ab..3eb8b75 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
@@ -28,12 +28,19 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntrySink;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 
 /**
  * Simple testing of a few HLog methods.
  */
 public class TestHLogMethods {
+  private static final byte[] TEST_REGION = Bytes.toBytes("test_region");
+  private static final byte[] TEST_TABLE = Bytes.toBytes("test_table");
+
   private final HBaseTestingUtility util = new HBaseTestingUtility();
 
   /**
@@ -84,4 +91,67 @@ public class TestHLogMethods {
     FSDataOutputStream fdos = fs.create(new Path(testdir, name), true);
     fdos.close();
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testRegionEntryBuffer() throws Exception {
+    HLogSplitter.RegionEntryBuffer reb = new HLogSplitter.RegionEntryBuffer(
+        TEST_TABLE, TEST_REGION);
+    assertEquals(0, reb.heapSize());
+
+    reb.appendEntry(createTestLogEntry(1));
+    assertTrue(reb.heapSize() > 0);
+  }
+
+  @Test
+  public void testEntrySink() throws Exception {
+    // First make a sink that allows 1MB but only put 1000 entries in
+    EntrySink sink = new EntrySink(1*1024*1024);
+    for (int i = 0; i < 1000; i++) {
+      HLog.Entry entry = createTestLogEntry(i);
+      sink.appendEntry(entry);
+    }
+
+    assertTrue(sink.totalBuffered > 0);
+    long amountInChunk = sink.totalBuffered;
+    // Get a chunk
+    RegionEntryBuffer chunk = sink.getChunkToWrite();
+    assertEquals(chunk.heapSize(), amountInChunk);
+
+    // Make sure it got marked that a thread is "working on this"
+    assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));
+
+    // Insert some more entries
+    for (int i = 0; i < 500; i++) {
+      HLog.Entry entry = createTestLogEntry(i);
+      sink.appendEntry(entry);
+    }
+    // Asking for another chunk shouldn't work since the first one
+    // is still writing
+    assertNull(sink.getChunkToWrite());
+
+    // If we say we're done writing the first chunk, then we should be able
+    // to get the second
+    sink.doneWriting(chunk);
+
+    RegionEntryBuffer chunk2 = sink.getChunkToWrite();
+    assertNotNull(chunk2);
+    assertNotSame(chunk, chunk2);
+    long amountInChunk2 = sink.totalBuffered;
+    // The second chunk had fewer rows than the first
+    assertTrue(amountInChunk2 < amountInChunk);
+
+    sink.doneWriting(chunk2);
+    assertEquals(0, sink.totalBuffered);
+  }
+
+  private HLog.Entry createTestLogEntry(int i) {
+    long seq = i;
+    long now = i * 1000;
+
+    WALEdit edit = new WALEdit();
+    edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val"));
+    HLogKey key = new HLogKey(TEST_REGION, TEST_TABLE, seq, now);
+    HLog.Entry entry = new HLog.Entry(key, edit);
+    return entry;
+  }
+}
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index e300dcc..3aefd7e 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -553,7 +553,7 @@ public class TestHLogSplit {
       logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
     } catch (IOException e) {
-      assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing", e.getMessage());
+      assertEquals("This exception is instrumented and should only be thrown for testing", e.getMessage());
       throw e;
     } finally {
       InstrumentedSequenceFileLogWriter.activateFailure = false;