commit 121687818d35d9c1bf4a42aea9e75a89a404de1d Author: Todd Lipcon Date: Thu Dec 9 23:33:05 2010 -0800 HBASE-3323 rewrite log splitter diff --git src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index ea3924a..b90b25b 100644 --- src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -189,8 +189,9 @@ public class MasterFileSystem { long splitTime = 0, splitLogSize = 0; Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName)); try { - HLogSplitter splitter = HLogSplitter.createLogSplitter(conf); - splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf); + HLogSplitter splitter = HLogSplitter.createLogSplitter( + conf, rootdir, logDir, oldLogDir, this.fs); + splitter.splitLog(); splitTime = splitter.getTime(); splitLogSize = splitter.getSize(); } catch (IOException e) { diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index acaa770..6acfe6f 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -1438,8 +1438,9 @@ public class HLog implements Syncable { throw new IOException(p + " is not a directory"); } - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(baseDir, p, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter( + conf, baseDir, p, oldLogDir, fs); + logSplitter.splitLog(); } /** diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 5670247..5cb31b4 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -160,6 +160,32 @@ public class HLogKey implements WritableComparable<HLogKey> { return result; } + /** + * Drop this instance's tablename byte array and instead + * hold a reference to the provided tablename. This is not + * meant to be a general purpose setter - it's only used + * to collapse references to conserve memory. + */ + void internTableName(byte []tablename) { + // We should not use this as a setter - only to swap + // in a new reference to the same table name. + assert Bytes.equals(tablename, this.tablename); + this.tablename = tablename; + } + + /** + * Drop this instance's region name byte array and instead + * hold a reference to the provided region name. This is not + * meant to be a general purpose setter - it's only used + * to collapse references to conserve memory. + */ + void internEncodedRegionName(byte []encodedRegionName) { + // We should not use this as a setter - only to swap + // in a new reference to the same encoded region name.
+ assert Bytes.equals(this.encodedRegionName, encodedRegionName); + this.encodedRegionName = encodedRegionName; + } + public void write(DataOutput out) throws IOException { Bytes.writeByteArray(out, this.encodedRegionName); Bytes.writeByteArray(out, this.tablename); diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index ad104e1..b3176c1 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -23,21 +23,18 @@ import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease; import java.io.EOFException; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.text.ParseException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RemoteExceptionHandler; @@ -53,9 +51,11 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.io.MultipleIOException; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; /** * This class is responsible for splitting up a bunch of regionserver commit log @@ -66,74 +66,115 @@ public class HLogSplitter { private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl"; - static final Log LOG = LogFactory.getLog(HLogSplitter.class); - - private long splitTime = 0; - private long splitSize = 0; - /** * Name of file that holds recovered edits written by the wal log splitting * code, one per region */ public static final String RECOVERED_EDITS = "recovered.edits"; + + static final Log LOG = LogFactory.getLog(HLogSplitter.class); + + private boolean hasSplit = false; + private long splitTime = 0; + private long splitSize = 0; + + + // Parameters for split process + protected final Path rootDir; + protected final Path srcDir; + protected final Path oldLogDir; + protected final FileSystem fs; + protected final Configuration conf; + + // Major subcomponents of the split process. + // These are separated into inner classes to make testing easier. 
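+ // Data flows through these in a producer/consumer pattern: the main
+ // thread parses each log and appends its entries to entryBuffers, while
+ // the writer threads managed by outputSink drain per-region chunks and
+ // write them out as one recovered.edits file per region.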
+ OutputSink outputSink; + EntryBuffers entryBuffers; + + // If an exception is thrown by one of the other threads, it will be + // stored here. + protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>(); + + // Wait/notify for when data has been produced by the reader thread, + // consumed by the writer threads, or an exception occurred + Object dataAvailable = new Object(); + + /** * Create a new HLogSplitter using the given {@link Configuration} and the * hbase.hlog.splitter.impl property to derive the instance * class to use. - * - * @param conf - * @return New HLogSplitter instance + * + * @param rootDir hbase directory + * @param srcDir logs directory + * @param oldLogDir directory where processed logs are archived to + * @param fs FileSystem */ - public static HLogSplitter createLogSplitter(Configuration conf) { + public static HLogSplitter createLogSplitter(Configuration conf, + final Path rootDir, final Path srcDir, + Path oldLogDir, final FileSystem fs) { + @SuppressWarnings("unchecked") Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class); try { - return splitterClass.newInstance(); + Constructor<? extends HLogSplitter> constructor = + splitterClass.getConstructor( + Configuration.class, // conf + Path.class, // rootDir + Path.class, // srcDir + Path.class, // oldLogDir + FileSystem.class); // fs + return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs); + } catch (IllegalArgumentException e) { + throw new RuntimeException(e); } catch (InstantiationException e) { throw new RuntimeException(e); } catch (IllegalAccessException e) { throw new RuntimeException(e); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } catch (SecurityException e) { + throw new RuntimeException(e); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); } } - - - // Private immutable datastructure to hold Writer and its Path. - private final static class WriterAndPath { - final Path p; - final Writer w; - - WriterAndPath(final Path p, final Writer w) { - this.p = p; - this.w = w; - } + public HLogSplitter(Configuration conf, Path rootDir, Path srcDir, + Path oldLogDir, FileSystem fs) { + this.conf = conf; + this.rootDir = rootDir; + this.srcDir = srcDir; + this.oldLogDir = oldLogDir; + this.fs = fs; + + entryBuffers = new EntryBuffers( + conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", + 128*1024*1024)); + outputSink = new OutputSink(); } - + /** * Split up a bunch of regionserver commit log files that are no longer being * written to, into new files, one per region for region to replay on startup. * Delete the old log files when finished. * - * @param rootDir - * qualified root directory of the HBase instance - * @param srcDir - * Directory of log files to split: e.g.
* ${ROOTDIR}/log_HOST_PORT - * @param oldLogDir - * directory where processed (split) logs will be archived to - * @param fs - * FileSystem - * @param conf - * Configuration - * @throws IOException - * will throw if corrupted hlogs aren't tolerated + * @throws IOException will throw if corrupted hlogs aren't tolerated * @return the list of splits */ - public List<Path> splitLog(final Path rootDir, final Path srcDir, - Path oldLogDir, final FileSystem fs, final Configuration conf) + public List<Path> splitLog() throws IOException { + Preconditions.checkState(!hasSplit, + "An HLogSplitter instance may only be used once"); + hasSplit = true; long startTime = System.currentTimeMillis(); List<Path> splits = null; @@ -148,29 +189,8 @@ public class HLogSplitter { } LOG.info("Splitting " + logfiles.length + " hlog(s) in " + srcDir.toString()); - splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf); - try { - FileStatus[] files = fs.listStatus(srcDir); - for (FileStatus file : files) { - Path newPath = HLog.getHLogArchivePath(oldLogDir, file.getPath()); - LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to " - + FSUtils.getPath(newPath)); - if (!fs.rename(file.getPath(), newPath)) { - throw new IOException("Unable to rename " + file.getPath() + - " to " + newPath); - } - } - LOG.debug("Moved " + files.length + " log files to " - + FSUtils.getPath(oldLogDir)); - if (!fs.delete(srcDir, true)) { - throw new IOException("Unable to delete " + srcDir); - } - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - IOException io = new IOException("Cannot delete: " + srcDir); - io.initCause(e); - throw io; - } + splits = splitLog(logfiles); + splitTime = System.currentTimeMillis() - startTime; LOG.info("hlog file splitting completed in " + splitTime + " ms for " + srcDir.toString()); @@ -190,186 +210,94 @@ public class HLogSplitter { public long getSize() { return this.splitSize; } + + + /** + * @return a map from encoded region ID to the number of edits written out + * for that region. + */ + Map<byte[], Long> getOutputCounts() { + Preconditions.checkState(hasSplit); + return outputSink.getOutputCounts(); + } + /** - * Sorts the HLog edits in the given list of logfiles (that are a mix of edits + * Splits the HLog edits in the given list of logfiles (that are a mix of edits * on multiple regions) by region and then splits them per region directories, - * in batches of (hbase.hlog.split.batch.size) - * - * A batch consists of a set of log files that will be sorted in a single map - * of edits indexed by region the resulting map will be concurrently written - * by multiple threads to their corresponding regions * - * Each batch consists of more more log files that are - recovered (files is - * opened for append then closed to ensure no process is writing into it) - - * parsed (each edit in the log is appended to a list of edits indexed by - * region see {@link #parseHLog} for more details) - marked as either - * processed or corrupt depending on parsing outcome - the resulting edits - * indexed by region are concurrently written to their corresponding region - * region directories - original files are then archived to a different - * directory + * This process is split into multiple threads. In the main thread, we loop + * through the logs to be split.
For each log, we: + * - Recover it (take and drop HDFS lease) to ensure no other process can write + * - Read each edit (see {@link #parseHLog}) + * - Mark as "processed" or "corrupt" depending on outcome * + * Each edit is passed into the EntryBuffers instance, which takes care of + * memory accounting and splitting the edits by region. * + * The OutputSink object then manages N other WriterThreads which pull chunks + * of edits from EntryBuffers and write them to the output region directories. * - * @param rootDir - * hbase directory - * @param srcDir - * logs directory - * @param oldLogDir - * directory where processed logs are archived to - * @param logfiles - * the list of log files to split - * @param fs - * @param conf - * @return - * @throws IOException + * After the process is complete, the log files are archived to a separate + * directory. */ - private List<Path> splitLog(final Path rootDir, final Path srcDir, - Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs, - final Configuration conf) throws IOException { + private List<Path> splitLog(final FileStatus[] logfiles) throws IOException { List<Path> processedLogs = new ArrayList<Path>(); List<Path> corruptedLogs = new ArrayList<Path>(); - final Map<byte[], WriterAndPath> logWriters = Collections - .synchronizedMap(new TreeMap<byte[], WriterAndPath>( - Bytes.BYTES_COMPARATOR)); List<Path> splits = null; - // Number of logs in a read batch - // More means faster but bigger mem consumption - // TODO make a note on the conf rename and update hbase-site.xml if needed - int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3); boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false); splitSize = 0; + outputSink.startWriterThreads(entryBuffers); + try { - int i = -1; - while (i < logfiles.length) { - final Map<byte[], LinkedList<Entry>> editsByRegion = new TreeMap<byte[], LinkedList<Entry>>( - Bytes.BYTES_COMPARATOR); - for (int j = 0; j < logFilesPerStep; j++) { - i++; - if (i == logfiles.length) { - break; - } - FileStatus log = logfiles[i]; - Path logPath = log.getPath(); - long logLength = log.getLen(); - splitSize += logLength; - LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length - + ": " + logPath + ", length=" + logLength); - try { - recoverFileLease(fs, logPath, conf); - parseHLog(log, editsByRegion, fs, conf); + int i = 0; + for (FileStatus log : logfiles) { + Path logPath = log.getPath(); + long logLength = log.getLen(); + splitSize += logLength; + LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length + + ": " + logPath + ", length=" + logLength); + try { + recoverFileLease(fs, logPath, conf); + parseHLog(log, entryBuffers, fs, conf); + processedLogs.add(logPath); + } catch (EOFException eof) { + // truncated files are expected if a RS crashes (see HBASE-2643) + LOG.info("EOF from hlog " + logPath + ". continuing"); processedLogs.add(logPath); - } catch (EOFException eof) { - // truncated files are expected if a RS crashes (see HBASE-2643) - LOG.info("EOF from hlog " + logPath + ". continuing"); - processedLogs.add(logPath); - } catch (IOException e) { - // If the IOE resulted from bad file format, - // then this problem is idempotent and retrying won't help - if (e.getCause() instanceof ParseException) { - LOG.warn("ParseException from hlog " + logPath + ". continuing"); - processedLogs.add(logPath); + } catch (IOException e) { + // If the IOE resulted from bad file format, + // then this problem is idempotent and retrying won't help + if (e.getCause() instanceof ParseException) { + LOG.warn("ParseException from hlog " + logPath + ". 
continuing"); + processedLogs.add(logPath); + } else { + if (skipErrors) { + LOG.info("Got while parsing hlog " + logPath + + ". Marking as corrupted", e); + corruptedLogs.add(logPath); } else { - if (skipErrors) { - LOG.info("Got while parsing hlog " + logPath + - ". Marking as corrupted", e); - corruptedLogs.add(logPath); - } else { - throw e; - } + throw e; } } } - writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf); } if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) { throw new IOException("Discovered orphan hlog after split. Maybe " + "HRegionServer was not dead when we started"); } - archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); + archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf); } finally { - splits = new ArrayList(logWriters.size()); - for (WriterAndPath wap : logWriters.values()) { - wap.w.close(); - splits.add(wap.p); - LOG.debug("Closed " + wap.p); - } + splits = outputSink.finishWritingAndClose(); } return splits; } - - /** - * Takes splitLogsMap and concurrently writes them to region directories using a thread pool - * - * @param splitLogsMap map that contains the log splitting result indexed by region - * @param logWriters map that contains a writer per region - * @param rootDir hbase root dir - * @param fs - * @param conf - * @throws IOException - */ - private void writeEditsBatchToRegions( - final Map> splitLogsMap, - final Map logWriters, final Path rootDir, - final FileSystem fs, final Configuration conf) - throws IOException { - // Number of threads to use when log splitting to rewrite the logs. - // More means faster but bigger mem consumption. - int logWriterThreads = conf.getInt( - "hbase.regionserver.hlog.splitlog.writer.threads", 3); - boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); - HashMap writeFutureResult = new HashMap(); - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - builder.setNameFormat("SplitWriter-%1$d"); - ThreadFactory factory = builder.build(); - ThreadPoolExecutor threadPool = - (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory); - for (final byte[] region : splitLogsMap.keySet()) { - Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, - region, fs, conf); - writeFutureResult.put(region, threadPool.submit(splitter)); - } - - threadPool.shutdown(); - // Wait for all threads to terminate - try { - for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) { - String message = "Waiting for hlog writers to terminate, elapsed " + j - * 5 + " seconds"; - if (j < 30) { - LOG.debug(message); - } else { - LOG.info(message); - } - - } - } catch (InterruptedException ex) { - LOG.warn("Hlog writers were interrupted, possible data loss!"); - if (!skipErrors) { - throw new IOException("Could not finish writing log entries", ex); - // TODO maybe we should fail here regardless if skipErrors is active or not - } - } - - for (Map.Entry entry : writeFutureResult.entrySet()) { - try { - entry.getValue().get(); - } catch (ExecutionException e) { - throw (new IOException(e.getCause())); - } catch (InterruptedException e1) { - LOG.warn("Writer for region " + Bytes.toString(entry.getKey()) - + " was interrupted, however the write process should have " - + "finished. 
Throwing up ", e1); - throw (new IOException(e1.getCause())); - } - } - } - + /** * Moves processed logs to a oldLogDir after successful processing Moves * corrupted logs (any log that couldn't be successfully parsed to corruptDir @@ -382,7 +310,9 @@ public class HLogSplitter { * @param conf * @throws IOException */ - private static void archiveLogs(final List corruptedLogs, + private static void archiveLogs( + final Path srcDir, + final List corruptedLogs, final List processedLogs, final Path oldLogDir, final FileSystem fs, final Configuration conf) throws IOException { final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get( @@ -410,6 +340,10 @@ public class HLogSplitter { LOG.info("Archived processed log " + p + " to " + newPath); } } + + if (!fs.delete(srcDir, true)) { + throw new IOException("Unable to delete src dir: " + srcDir); + } } /** @@ -459,7 +393,7 @@ public class HLogSplitter { * @throws IOException if hlog is corrupted, or can't be open */ private void parseHLog(final FileStatus logfile, - final Map> splitLogsMap, final FileSystem fs, + EntryBuffers entryBuffers, final FileSystem fs, final Configuration conf) throws IOException { // Check for possibly empty file. With appends, currently Hadoop reports a @@ -489,15 +423,11 @@ public class HLogSplitter { try { Entry entry; while ((entry = in.next()) != null) { - byte[] region = entry.getKey().getEncodedRegionName(); - LinkedList queue = splitLogsMap.get(region); - if (queue == null) { - queue = new LinkedList(); - splitLogsMap.put(region, queue); - } - queue.addLast(entry); + entryBuffers.appendEntry(entry); editsCount++; } + } catch (InterruptedException ie) { + throw new RuntimeException(ie); } finally { LOG.debug("Pushed=" + editsCount + " entries from " + path); try { @@ -505,76 +435,30 @@ public class HLogSplitter { in.close(); } } catch (IOException e) { - LOG - .warn("Close log reader in finally threw exception -- continuing", - e); + LOG.warn("Close log reader in finally threw exception -- continuing", + e); } } } - - private Callable createNewSplitter(final Path rootDir, - final Map logWriters, - final Map> logEntries, final byte[] region, - final FileSystem fs, final Configuration conf) { - return new Callable() { - public String getName() { - return "Split writer thread for region " + Bytes.toStringBinary(region); - } - @Override - public Void call() throws IOException { - LinkedList entries = logEntries.get(region); - LOG.debug(this.getName() + " got " + entries.size() + " to process"); - long threadTime = System.currentTimeMillis(); - try { - int editsCount = 0; - WriterAndPath wap = logWriters.get(region); - for (Entry logEntry : entries) { - if (wap == null) { - Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir); - if (regionedits == null) { - // we already print a message if it's null in getRegionSplitEditsPath - break; - } - if (fs.exists(regionedits)) { - LOG.warn("Found existing old edits file. It could be the " - + "result of a previous failed split attempt. 
Deleting " - + regionedits + ", length=" - + fs.getFileStatus(regionedits).getLen()); - if (!fs.delete(regionedits, false)) { - LOG.warn("Failed delete of old " + regionedits); - } - } - Writer w = createWriter(fs, regionedits, conf); - wap = new WriterAndPath(regionedits, w); - logWriters.put(region, wap); - LOG.debug("Creating writer path=" + regionedits + " region=" - + Bytes.toStringBinary(region)); - } - wap.w.append(logEntry); - editsCount++; - } - LOG.debug(this.getName() + " Applied " + editsCount - + " total edits to " + Bytes.toStringBinary(region) + " in " - + (System.currentTimeMillis() - threadTime) + "ms"); - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.fatal(this.getName() + " Got while writing log entry to log", e); - throw e; - } - return null; - } - }; + private void writerThreadError(Throwable t) { + thrown.compareAndSet(null, t); + } + + /** + * Check for errors in the writer threads. If any is found, rethrow it. + */ + private void checkForErrors() throws IOException { + Throwable thrown = this.thrown.get(); + if (thrown == null) return; + if (thrown instanceof IOException) { + throw (IOException)thrown; + } else { + throw new RuntimeException(thrown); + } } - /** * Create a new {@link Writer} for writing log splits. - * - * @param fs - * @param logfile - * @param conf - * @return A new Writer instance - * @throws IOException */ protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { @@ -583,16 +467,408 @@ public class HLogSplitter { /** * Create a new {@link Reader} for reading logs to split. - * - * @param fs - * @param curLogFile - * @param conf - * @return A new Reader instance - * @throws IOException */ protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf) throws IOException { return HLog.getReader(fs, curLogFile, conf); } + + /** + * Class which accumulates edits and separates them into a buffer per region + * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses + * a predefined threshold. + * + * Writer threads then pull region-specific buffers from this class. + */ + class EntryBuffers { + Map buffers = + new TreeMap(Bytes.BYTES_COMPARATOR); + + /* Track which regions are currently in the middle of writing. We don't allow + an IO thread to pick up bytes from a region if we're already writing + data for that region in a different IO thread. */ + Set currentlyWriting = new TreeSet(Bytes.BYTES_COMPARATOR); + + long totalBuffered = 0; + long maxHeapUsage; + + EntryBuffers(long maxHeapUsage) { + this.maxHeapUsage = maxHeapUsage; + } + + /** + * Append a log entry into the corresponding region buffer. + * Blocks if the total heap usage has crossed the specified threshold. 
+ * + * @throws InterruptedException + * @throws IOException + */ + void appendEntry(Entry entry) throws InterruptedException, IOException { + HLogKey key = entry.getKey(); + + RegionEntryBuffer buffer; + synchronized (this) { + buffer = buffers.get(key.getEncodedRegionName()); + if (buffer == null) { + buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName()); + buffers.put(key.getEncodedRegionName(), buffer); + } + long incrHeap = buffer.appendEntry(entry); + totalBuffered += incrHeap; + } + + // If we crossed the chunk threshold, wait for more space to be available + synchronized (dataAvailable) { + while (totalBuffered > maxHeapUsage && thrown.get() == null) { + LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads..."); + dataAvailable.wait(3000); + } + dataAvailable.notifyAll(); + } + checkForErrors(); + } + + synchronized RegionEntryBuffer getChunkToWrite() { + long biggestSize = 0; + byte[] biggestBufferKey = null; + + for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) { + long size = entry.getValue().heapSize(); + if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) { + biggestSize = size; + biggestBufferKey = entry.getKey(); + } + } + if (biggestBufferKey == null) { + return null; + } + + RegionEntryBuffer buffer = buffers.remove(biggestBufferKey); + currentlyWriting.add(biggestBufferKey); + return buffer; + } + + void doneWriting(RegionEntryBuffer buffer) { + synchronized (this) { + boolean removed = currentlyWriting.remove(buffer.encodedRegionName); + assert removed; + } + long size = buffer.heapSize(); + + synchronized (dataAvailable) { + totalBuffered -= size; + // We may unblock writers + dataAvailable.notifyAll(); + } + } + + synchronized boolean isRegionCurrentlyWriting(byte[] region) { + return currentlyWriting.contains(region); + } + } + + /** + * A buffer of some number of edits for a given region. + * This accumulates edits and also provides a memory optimization in order to + * share a single byte array instance for the table and region name. + * Also tracks memory usage of the accumulated edits. 
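+ * Interning is safe because all entries routed to a given buffer share the
+ * same table and encoded region name; HLogKey asserts equality before
+ * swapping in the canonical reference.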
+ */ + static class RegionEntryBuffer implements HeapSize { + long heapInBuffer = 0; + List<Entry> entryBuffer; + byte[] tableName; + byte[] encodedRegionName; + + RegionEntryBuffer(byte[] table, byte[] region) { + this.tableName = table; + this.encodedRegionName = region; + this.entryBuffer = new LinkedList<Entry>(); + } + + long appendEntry(Entry entry) { + internify(entry); + entryBuffer.add(entry); + long incrHeap = entry.getEdit().heapSize() + + ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers + 0; // TODO linkedlist entry + heapInBuffer += incrHeap; + return incrHeap; + } + + private void internify(Entry entry) { + HLogKey k = entry.getKey(); + k.internTableName(this.tableName); + k.internEncodedRegionName(this.encodedRegionName); + } + + public long heapSize() { + return heapInBuffer; + } + } + + + class WriterThread extends Thread { + private volatile boolean shouldStop = false; + + WriterThread(int i) { + super("WriterThread-" + i); + } + + public void run() { + try { + doRun(); + } catch (Throwable t) { + LOG.error("Error in log splitting write thread", t); + writerThreadError(t); + } + } + + private void doRun() throws IOException { + LOG.debug("Writer thread " + this + ": starting"); + while (true) { + RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); + if (buffer == null) { + // No data currently available, wait on some more to show up + synchronized (dataAvailable) { + if (shouldStop) return; + try { + dataAvailable.wait(1000); + } catch (InterruptedException ie) { + if (!shouldStop) { + throw new RuntimeException(ie); + } + } + } + continue; + } + + assert buffer != null; + try { + writeBuffer(buffer); + } finally { + entryBuffers.doneWriting(buffer); + } + } + } + + private void writeBuffer(RegionEntryBuffer buffer) throws IOException { + List<Entry> entries = buffer.entryBuffer; + if (entries.isEmpty()) { + LOG.warn(this.getName() + " got an empty buffer, skipping"); + return; + } + + WriterAndPath wap = null; + + long startTime = System.nanoTime(); + try { + int editsCount = 0; + + for (Entry logEntry : entries) { + if (wap == null) { + wap = outputSink.getWriterAndPath(logEntry); + if (wap == null) { + // getWriterAndPath decided we don't need to write these edits + // Message was already logged + return; + } + } + wap.w.append(logEntry); + editsCount++; + } + // Pass along summary statistics + wap.incrementEdits(editsCount); + wap.incrementNanoTime(System.nanoTime() - startTime); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.fatal(this.getName() + " Got while writing log entry to log", e); + throw e; + } + } + + void finish() { + shouldStop = true; + } + } + + /** + * Class that manages the output streams from the log splitting process. + */ + class OutputSink { + private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap( + new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR)); + private final List<WriterThread> writerThreads = Lists.newArrayList(); + + /* Set of regions which we've decided should not output edits */ + private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet( + new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR)); + + private boolean hasClosed = false; + + /** + * Start the threads that will pump data from the entryBuffers + * to the output files. + */ + synchronized void startWriterThreads(EntryBuffers entryBuffers) { + // More threads could potentially write faster at the expense + // of causing more disk seeks as the logs are split. + //
After a certain setting (probably around 3) the + // process will be bound on the reader in the current + // implementation anyway. + int numThreads = conf.getInt( + "hbase.regionserver.hlog.splitlog.writer.threads", 3); + + for (int i = 0; i < numThreads; i++) { + WriterThread t = new WriterThread(i); + t.start(); + writerThreads.add(t); + } + } + + List<Path> finishWritingAndClose() throws IOException { + LOG.info("Waiting for split writer threads to finish"); + for (WriterThread t : writerThreads) { + t.finish(); + } + for (WriterThread t : writerThreads) { + try { + t.join(); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + checkForErrors(); + } + LOG.info("Split writers finished"); + + return closeStreams(); + } + + /** + * Close all of the output streams. + * @return the list of paths written. + */ + private List<Path> closeStreams() throws IOException { + Preconditions.checkState(!hasClosed); + + List<Path> paths = new ArrayList<Path>(); + List<IOException> thrown = Lists.newArrayList(); + + for (WriterAndPath wap : logWriters.values()) { + try { + wap.w.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close log at " + wap.p, ioe); + thrown.add(ioe); + continue; + } + paths.add(wap.p); + LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in " + + (wap.nanosSpent / 1000 / 1000) + "ms)"); + } + if (!thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } + + hasClosed = true; + return paths; + } + + /** + * Get a writer and path for a log starting at the given entry. + * + * This function is threadsafe so long as multiple threads are always + * acting on different regions. + * + * @return null if this region shouldn't output any logs + */ + WriterAndPath getWriterAndPath(Entry entry) throws IOException { + + byte region[] = entry.getKey().getEncodedRegionName(); + WriterAndPath ret = logWriters.get(region); + if (ret != null) { + return ret; + } + + // If we already decided that this region doesn't get any output + // we don't need to check again. + if (blacklistedRegions.contains(region)) { + return null; + } + + // Need to create writer + Path regionedits = getRegionSplitEditsPath(fs, + entry, rootDir); + if (regionedits == null) { + // Edits dir doesn't exist + blacklistedRegions.add(region); + return null; + } + deletePreexistingOldEdits(regionedits); + Writer w = createWriter(fs, regionedits, conf); + ret = new WriterAndPath(regionedits, w); + logWriters.put(region, ret); + LOG.debug("Creating writer path=" + regionedits + " region=" + + Bytes.toStringBinary(region)); + + return ret; + } + + /** + * If the specified path exists, issue a warning and delete it. + */ + private void deletePreexistingOldEdits(Path regionedits) throws IOException { + if (fs.exists(regionedits)) { + LOG.warn("Found existing old edits file. It could be the " + + "result of a previous failed split attempt. Deleting " + + regionedits + ", length=" + + fs.getFileStatus(regionedits).getLen()); + if (!fs.delete(regionedits, false)) { + LOG.warn("Failed delete of old " + regionedits); + } + } + } + + /** + * @return a map from encoded region ID to the number of edits written out + * for that region. + */ + private Map<byte[], Long> getOutputCounts() { + TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>( + Bytes.BYTES_COMPARATOR); + for (Map.Entry<byte[], WriterAndPath> entry : logWriters.entrySet()) { + ret.put(entry.getKey(), entry.getValue().editsWritten); + } + return ret; + } + } + + /** + * Private data structure that wraps a Writer and its Path, + * also collecting statistics about the data written to this + * output. 
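+ * The edit count feeds OutputSink#getOutputCounts(), and both counters
+ * appear in the per-file summary logged when the stream is closed.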
+ */ + private final static class WriterAndPath { + final Path p; + final Writer w; + + /* Count of edits written to this path */ + long editsWritten = 0; + /* Number of nanos spent writing to this log */ + long nanosSpent = 0; + + WriterAndPath(final Path p, final Writer w) { + this.p = p; + this.w = w; + } + + void incrementEdits(int edits) { + editsWritten += edits; + } + + void incrementNanoTime(long nanos) { + nanosSpent += nanos; + } + } } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index 52c2446..e1117ef 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; +import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -66,7 +67,7 @@ import org.apache.hadoop.io.Writable; * is an old style KeyValue or the new style WALEdit. * */ -public class WALEdit implements Writable { +public class WALEdit implements Writable, HeapSize { private final int VERSION_2 = -1; @@ -154,7 +155,19 @@ public class WALEdit implements Writable { out.writeInt(scopes.get(key)); } } + } + public long heapSize() { + long ret = 0; + for (KeyValue kv : kvs) { + ret += kv.heapSize(); + } + if (scopes != null) { + ret += ClassSize.TREEMAP; + ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY); + // TODO this isn't quite right, need help here + } + return ret; } public String toString() { diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 66b79c4..08ba8cb 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -164,10 +164,10 @@ public class TestHLog { log.rollWriter(); } log.close(); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, logdir, this.oldLogDir, this.fs); List splits = - logSplitter.splitLog(hbaseDir, logdir, - this.oldLogDir, this.fs, conf); + logSplitter.splitLog(); verifySplits(splits, howmany); log = null; } finally { diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java index 01154ab..50d297b 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java @@ -24,16 +24,28 @@ import static org.junit.Assert.*; import java.io.IOException; import java.util.NavigableSet; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValueTestUtil; +import org.apache.hadoop.hbase.MultithreadedTestUtil; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer; +import 
org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import static org.mockito.Mockito.mock; /** * Simple testing of a few HLog methods. */ public class TestHLogMethods { + private static final byte[] TEST_REGION = Bytes.toBytes("test_region"); + private static final byte[] TEST_TABLE = Bytes.toBytes("test_table"); + private final HBaseTestingUtility util = new HBaseTestingUtility(); /** @@ -84,4 +96,71 @@ public class TestHLogMethods { FSDataOutputStream fdos = fs.create(new Path(testdir, name), true); fdos.close(); } -} \ No newline at end of file + + @Test + public void testRegionEntryBuffer() throws Exception { + HLogSplitter.RegionEntryBuffer reb = new HLogSplitter.RegionEntryBuffer( + TEST_TABLE, TEST_REGION); + assertEquals(0, reb.heapSize()); + + reb.appendEntry(createTestLogEntry(1)); + assertTrue(reb.heapSize() > 0); + } + + @Test + public void testEntrySink() throws Exception { + Configuration conf = new Configuration(); + HLogSplitter splitter = HLogSplitter.createLogSplitter( + conf, mock(Path.class), mock(Path.class), mock(Path.class), + mock(FileSystem.class)); + + EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024); + for (int i = 0; i < 1000; i++) { + HLog.Entry entry = createTestLogEntry(i); + sink.appendEntry(entry); + } + + assertTrue(sink.totalBuffered > 0); + long amountInChunk = sink.totalBuffered; + // Get a chunk + RegionEntryBuffer chunk = sink.getChunkToWrite(); + assertEquals(chunk.heapSize(), amountInChunk); + + // Make sure it got marked that a thread is "working on this" + assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION)); + + // Insert some more entries + for (int i = 0; i < 500; i++) { + HLog.Entry entry = createTestLogEntry(i); + sink.appendEntry(entry); + } + // Asking for another chunk shouldn't work since the first one + // is still writing + assertNull(sink.getChunkToWrite()); + + // If we say we're done writing the first chunk, then we should be able + // to get the second + sink.doneWriting(chunk); + + RegionEntryBuffer chunk2 = sink.getChunkToWrite(); + assertNotNull(chunk2); + assertNotSame(chunk, chunk2); + long amountInChunk2 = sink.totalBuffered; + // The second chunk had fewer rows than the first + assertTrue(amountInChunk2 < amountInChunk); + + sink.doneWriting(chunk2); + assertEquals(0, sink.totalBuffered); + } + + private HLog.Entry createTestLogEntry(int i) { + long seq = i; + long now = i * 1000; + + WALEdit edit = new WALEdit(); + edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val")); + HLogKey key = new HLogKey(TEST_REGION, TEST_TABLE, seq, now); + HLog.Entry entry = new HLog.Entry(key, edit); + return entry; + } +} diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index e300dcc..0c6fd44 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -19,16 +19,14 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@
-39,6 +37,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -52,9 +51,16 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; /** * Testing {@link HLog} splitting code. @@ -119,11 +125,15 @@ public class TestHLogSplit { @Before public void setUp() throws Exception { + flushToConsole("Cleaning up cluster for new test\n" + + "--------------------------"); conf = TEST_UTIL.getConfiguration(); fs = TEST_UTIL.getDFSCluster().getFileSystem(); FileStatus[] entries = fs.listStatus(new Path("/")); + flushToConsole("Num entries in /:" + entries.length); for (FileStatus dir : entries){ - fs.delete(dir.getPath(), true); + assertTrue("Deleting " + dir.getPath(), + fs.delete(dir.getPath(), true)); } seq = 0; regions = new ArrayList(); @@ -161,18 +171,23 @@ public class TestHLogSplit { public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted() throws IOException { AtomicBoolean stop = new AtomicBoolean(false); + + FileStatus[] stats = fs.listStatus(new Path("/hbase/t1")); + assertTrue("Previous test should clean up table dir", + stats == null || stats.length == 0); + generateHLogs(-1); - fs.initialize(fs.getUri(), conf); + try { (new ZombieNewLogWriterRegionServer(stop)).start(); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); } finally { stop.set(true); } } - @Test public void testSplitPreservesEdits() throws IOException{ final String REGION = "region__1"; @@ -181,8 +196,9 @@ public class TestHLogSplit { generateHLogs(1, 10, -1); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); Path originalLog = (fs.listStatus(oldLogDir))[0].getPath(); Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION); @@ -202,8 +218,9 @@ public class TestHLogSplit { // initialize will create a new DFSClient with a new client ID fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); for (String region : regions) { @@ -224,8 +241,9 @@ public class TestHLogSplit { // initialize will create a new DFSClient with a new client ID fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = 
HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); for (String region : regions) { Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); @@ -240,8 +258,9 @@ public class TestHLogSplit { fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); for (String region : regions) { Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); @@ -260,8 +279,9 @@ public class TestHLogSplit { Corruptions.APPEND_GARBAGE, true, fs); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); for (String region : regions) { Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); @@ -278,8 +298,9 @@ public class TestHLogSplit { Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); for (String region : regions) { Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf)); @@ -296,8 +317,9 @@ public class TestHLogSplit { corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); for (String region : regions) { Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); @@ -323,13 +345,13 @@ public class TestHLogSplit { Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0"); conf.setClass("hbase.regionserver.hlog.reader.impl", FaultySequenceFileLogReader.class, HLog.Reader.class); - String[] failureTypes = { "begin", "middle", "end" }; for (FaultySequenceFileLogReader.FailureType failureType : FaultySequenceFileLogReader.FailureType.values()) { conf.set("faultysequencefilelogreader.failuretype", failureType.name()); generateHLogs(1, ENTRIES, -1); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); FileStatus[] archivedLogs = fs.listStatus(corruptDir); assertEquals("expected a different file", c1.getName(), archivedLogs[0] .getPath().getName()); @@ -358,8 +380,9 @@ public class TestHLogSplit { conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name()); generateHLogs(Integer.MAX_VALUE); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, 
oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); } finally { conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, Reader.class); @@ -383,9 +406,10 @@ public class TestHLogSplit { conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name()); generateHLogs(-1); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); try { - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + logSplitter.splitLog(); } catch (IOException e) { assertEquals( "if skip.errors is false all files should remain in place", @@ -413,8 +437,9 @@ public class TestHLogSplit { corruptHLog(c1, Corruptions.TRUNCATE, true, fs); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); Path originalLog = (fs.listStatus(oldLogDir))[0].getPath(); Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION); @@ -437,8 +462,9 @@ public class TestHLogSplit { generateHLogs(-1); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); FileStatus[] archivedLogs = fs.listStatus(oldLogDir); @@ -449,8 +475,9 @@ public class TestHLogSplit { public void testSplit() throws IOException { generateHLogs(-1); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); for (String region : regions) { Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); @@ -464,12 +491,16 @@ public class TestHLogSplit { throws IOException { generateHLogs(-1); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); FileStatus [] statuses = null; try { statuses = fs.listStatus(hlogDir); - assertNull(statuses); + if (statuses != null) { + Assert.fail("Files left in log dir: " + + Joiner.on(",").join(FileUtil.stat2Paths(statuses))); + } } catch (FileNotFoundException e) { // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null } @@ -516,8 +547,9 @@ public class TestHLogSplit { try { zombie.start(); try { - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); } catch (IOException ex) {/* expected */} int logFilesNumber = fs.listStatus(hlogDir).length; @@ -549,11 +581,12 @@ public class TestHLogSplit { try { InstrumentedSequenceFileLogWriter.activateFailure = true; - HLogSplitter logSplitter = 
HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); } catch (IOException e) { - assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing", e.getMessage()); + assertEquals("This exception is instrumented and should only be thrown for testing", e.getMessage()); throw e; } finally { InstrumentedSequenceFileLogWriter.activateFailure = false; @@ -561,7 +594,10 @@ public class TestHLogSplit { } -// @Test + // @Test TODO this test has been disabled since it was created! + // It currently fails because the second split doesn't output anything + // -- because there are no region dirs after we move aside the first + // split result public void testSplittingLargeNumberOfRegionsConsistency() throws IOException { regions.removeAll(regions); @@ -572,8 +608,9 @@ public class TestHLogSplit { generateHLogs(1, 100, -1); fs.initialize(fs.getUri(), conf); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); fs.rename(oldLogDir, hlogDir); Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first"); Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME)); @@ -582,7 +619,9 @@ public class TestHLogSplit { fs.initialize(fs.getUri(), conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath)); } @@ -600,11 +639,147 @@ public class TestHLogSplit { Path regiondir = new Path(tabledir, region); fs.delete(regiondir, true); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf); - logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + hbaseDir, hlogDir, oldLogDir, fs); + logSplitter.splitLog(); assertFalse(fs.exists(regiondir)); } + + @Test + public void testIOEOnOutputThread() throws Exception { + conf.setBoolean(HBASE_SKIP_ERRORS, false); + + generateHLogs(-1); + + fs.initialize(fs.getUri(), conf); + // Set up a splitter that will throw an IOE on the output side + HLogSplitter logSplitter = new HLogSplitter( + conf, hbaseDir, hlogDir, oldLogDir, fs) { + protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) + throws IOException { + HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); + Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.any()); + return mockWriter; + + } + }; + try { + logSplitter.splitLog(); + fail("Didn't throw!"); + } catch (IOException ioe) { + assertTrue(ioe.toString().contains("Injected")); + } + } + + /** + * Test log split process with fake data and lots of edits to trigger threading + * issues. + */ + @Test + public void testThreading() throws Exception { + doTestThreading(20000, 128*1024*1024, 0); + } + + /** + * Test blocking behavior of the log split process if writers are writing slower + * than the reader is reading. + */ + @Test + public void testThreadingSlowWriterSmallBuffer() throws Exception { + doTestThreading(200, 1024, 50); + } + + /** + * Sets up a log splitter with a mock reader and writer. 
The mock reader generates + * a specified number of edits spread across 5 regions. The mock writer optionally + * sleeps for each edit it is fed. + * * + * After the split is complete, verifies that the statistics show the correct number + * of edits output into each region. + * + * @param numFakeEdits number of fake edits to push through pipeline + * @param bufferSize size of in-memory buffer + * @param writerSlowness writer threads will sleep this many ms per edit + */ + private void doTestThreading(final int numFakeEdits, + final int bufferSize, + final int writerSlowness) throws Exception { + + Configuration localConf = new Configuration(conf); + localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize); + + // Create a fake log file (we'll override the reader to produce a stream of edits) + FSDataOutputStream out = fs.create(new Path(hlogDir, HLOG_FILE_PREFIX + ".fake")); + out.close(); + + // Make region dirs for our destination regions so the output doesn't get skipped + final List regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); + makeRegionDirs(fs, regions); + + // Create a splitter that reads and writes the data without touching disk + HLogSplitter logSplitter = new HLogSplitter( + localConf, hbaseDir, hlogDir, oldLogDir, fs) { + + /* Produce a mock writer that doesn't write anywhere */ + protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) + throws IOException { + HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); + if (writerSlowness > 0) { + Mockito.doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + try { + Thread.sleep(writerSlowness); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + return null; + } + }).when(mockWriter).append(Mockito.any()); + } + return mockWriter; + } + + + /* Produce a mock reader that generates fake entries */ + protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf) + throws IOException { + Reader mockReader = Mockito.mock(Reader.class); + Mockito.doAnswer(new Answer() { + int index = 0; + + @Override + public HLog.Entry answer(InvocationOnMock invocation) throws Throwable { + if (++index > numFakeEdits) return null; + + // Generate r0 through r4 in round robin fashion + int regionIdx = index % regions.size(); + byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)}; + + return createTestEntry(TABLE_NAME, region, + Bytes.toBytes(index), FAMILY, QUALIFIER, VALUE, index); + } + }).when(mockReader).next(); + return mockReader; + } + }; + + logSplitter.splitLog(); + + // Verify number of written edits per region + + Map outputCounts = logSplitter.getOutputCounts(); + for (Map.Entry entry : outputCounts.entrySet()) { + LOG.info("Got " + entry.getValue() + " output edits for region " + + Bytes.toString(entry.getKey())); + + assertEquals((long)entry.getValue(), numFakeEdits / regions.size()); + } + assertEquals(regions.size(), outputCounts.size()); + } + + /** * This thread will keep writing to the file after the split process has started @@ -677,29 +852,19 @@ public class TestHLogSplit { if (stop.get()) { return; } - boolean splitStarted = false; - Path p = new Path(hbaseDir, new String(TABLE_NAME)); - while (!splitStarted) { - try { - FileStatus [] statuses = fs.listStatus(p); - // In 0.20, listStatus comes back with a null if file doesn't exit. - // In 0.21, it throws FNFE. - if (statuses != null && statuses.length > 0) { - // Done. 
- break; - } - } catch (FileNotFoundException e) { - // Expected in hadoop 0.21 - } catch (IOException e1) { - assertTrue("Failed to list status ", false); - } - flushToConsole("Juliet: split not started, sleeping a bit..."); - Threads.sleep(100); - } + Path tableDir = new Path(hbaseDir, new String(TABLE_NAME)); + Path regionDir = new Path(tableDir, regions.get(0)); + Path recoveredEdits = new Path(regionDir, HLogSplitter.RECOVERED_EDITS); String region = "juliet"; Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet"); try { - fs.mkdirs(new Path(new Path(hbaseDir, region), region)); + + while (!fs.exists(recoveredEdits) && !stop.get()) { + flushToConsole("Juliet: split not started, sleeping a bit..."); + Threads.sleep(10); + } + + fs.mkdirs(new Path(tableDir, region)); HLog.Writer writer = HLog.createWriter(fs, julietLog, conf); appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(), @@ -722,10 +887,15 @@ public class TestHLogSplit { generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen); } - private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException { + private void makeRegionDirs(FileSystem fs, List regions) throws IOException { for (String region : regions) { + flushToConsole("Creating dir for region " + region); fs.mkdirs(new Path(tabledir, region)); } + } + + private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException { + makeRegionDirs(fs, regions); for (int i = 0; i < writers; i++) { writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf); for (int j = 0; j < entries; j++) { @@ -835,14 +1005,20 @@ public class TestHLogSplit { byte[] value, long seq) throws IOException { + writer.append(createTestEntry(table, region, row, family, qualifier, value, seq)); + writer.sync(); + return seq; + } + + private HLog.Entry createTestEntry( + byte[] table, byte[] region, + byte[] row, byte[] family, byte[] qualifier, + byte[] value, long seq) { long time = System.nanoTime(); WALEdit edit = new WALEdit(); seq++; edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value)); - writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit)); - writer.sync(); - return seq; - + return new HLog.Entry(new HLogKey(region, table, seq, time), edit); } @@ -864,6 +1040,14 @@ public class TestHLogSplit { private int compareHLogSplitDirs(Path p1, Path p2) throws IOException { FileStatus[] f1 = fs.listStatus(p1); FileStatus[] f2 = fs.listStatus(p2); + assertNotNull("Path " + p1 + " doesn't exist", f1); + assertNotNull("Path " + p2 + " doesn't exist", f2); + + System.out.println("Files in " + p1 + ": " + + Joiner.on(",").join(FileUtil.stat2Paths(f1))); + System.out.println("Files in " + p2 + ": " + + Joiner.on(",").join(FileUtil.stat2Paths(f2))); + assertEquals(f1.length, f2.length); for (int i = 0; i < f1.length; i++) { // Regions now have a directory named RECOVERED_EDITS_DIR and in here diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 4acea51..d10ab13 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -487,9 +487,9 @@ public class TestWALReplay { */ private Path runWALSplit(final Configuration c) throws IOException { FileSystem fs = FileSystem.get(c); - HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c); - List splits = 
logSplitter.splitLog(this.hbaseRootDir, this.logDir, - this.oldLogDir, fs, c); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c, + this.hbaseRootDir, this.logDir, this.oldLogDir, fs); + List splits = logSplitter.splitLog(); // Split should generate only 1 file since there's only 1 region assertEquals(1, splits.size()); // Make sure the file exists
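Every call site updated by this patch follows the same construct-then-run pattern: bind a splitter to its directories up front via createLogSplitter, then invoke the single-use splitLog(). A minimal sketch of that calling convention, assuming an 0.90-era HBase classpath (the paths and the printed summary are illustrative placeholders, not values from this patch):

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;

public class SplitLogSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path rootDir = new Path("/hbase");                  // illustrative path
    Path logDir = new Path(rootDir, "log_host_60020");  // illustrative path
    Path oldLogDir = new Path(rootDir, ".oldlogs");     // illustrative path

    // One splitter per split attempt; splitLog() may only be called once.
    HLogSplitter splitter = HLogSplitter.createLogSplitter(
        conf, rootDir, logDir, oldLogDir, fs);
    List<Path> splits = splitter.splitLog();

    // The splitter retains summary statistics, as MasterFileSystem uses above.
    System.out.println("Wrote " + splits.size() + " recovered.edits files in "
        + splitter.getTime() + " ms (" + splitter.getSize() + " bytes of hlogs)");
  }
}

Moving the directories into the constructor is what lets the subcomponents (EntryBuffers, OutputSink, the writer threads) share state through instance fields instead of long parameter lists, at the cost of making each splitter instance single-use.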