diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index e479eae..2fb4897 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -128,7 +128,9 @@ public class HLog implements HConstants, Syncable { private final List actionListeners = Collections.synchronizedList(new ArrayList()); + @SuppressWarnings("unchecked") private static Class logWriterClass; + @SuppressWarnings("unchecked") private static Class logReaderClass; private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer @@ -470,7 +472,6 @@ public class HLog implements HConstants, Syncable { * @return A WAL reader. Close when done with it. * @throws IOException */ - @SuppressWarnings("unchecked") public static Reader getReader(final FileSystem fs, final Path path, Configuration conf) throws IOException { @@ -483,10 +484,11 @@ public class HLog implements HConstants, Syncable { HLog.Reader reader = (HLog.Reader) logReaderClass.newInstance(); reader.init(fs, path, conf); return reader; - } catch (Exception e) { - IOException ie = new IOException("cannot get log reader"); - ie.initCause(e); - throw ie; + } catch (IOException e) { + throw e; + } + catch (Exception e) { + throw new IOException("Cannot get log reader", e); } } @@ -497,7 +499,6 @@ public class HLog implements HConstants, Syncable { * @return A WAL writer. Close when done with it. * @throws IOException */ - @SuppressWarnings("unchecked") public static Writer createWriter(final FileSystem fs, final Path path, Configuration conf) throws IOException { @@ -1130,10 +1131,11 @@ public class HLog implements HConstants, Syncable { * @param rootDir qualified root directory of the HBase instance * @param srcDir Directory of log files to split: e.g. * ${ROOTDIR}/log_HOST_PORT - * @param oldLogDir + * @param oldLogDir directory where processed (split) logs will be archived to * @param fs FileSystem * @param conf Configuration - * @throws IOException + * @throws IOException will throw if corrupted hlogs aren't tolerated + * @return the list of splits */ public static List splitLog(final Path rootDir, final Path srcDir, Path oldLogDir, final FileSystem fs, final Configuration conf) @@ -1152,18 +1154,10 @@ public class HLog implements HConstants, Syncable { } LOG.info("Splitting " + logfiles.length + " hlog(s) in " + srcDir.toString()); - splits = splitLog(rootDir, oldLogDir, logfiles, fs, conf); + splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf); try { - FileStatus[] files = fs.listStatus(srcDir); - for(FileStatus file : files) { - Path newPath = getHLogArchivePath(oldLogDir, file.getPath()); - LOG.debug("Moving " + FSUtils.getPath(file.getPath()) + " to " + - FSUtils.getPath(newPath)); - fs.rename(file.getPath(), newPath); - } - LOG.debug("Moved " + files.length + " log files to " + - FSUtils.getPath(oldLogDir)); - fs.delete(srcDir, true); + LOG.info("Spliting is done. Removing old log dir "+srcDir); + fs.delete(srcDir, false); } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); IOException io = new IOException("Cannot delete: " + srcDir); @@ -1203,296 +1197,77 @@ public class HLog implements HConstants, Syncable { } } - /* - * @param rootDir - * @param logfiles - * @param fs - * @param conf - * @throws IOException - * @return List of splits made. 
- */ - private static List splitLog(final Path rootDir, + private static List splitLog(final Path rootDir, final Path srcDir, Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs, final Configuration conf) throws IOException { + List processedLogs = new ArrayList(); + List corruptedLogs = new ArrayList(); final Map logWriters = Collections.synchronizedMap( new TreeMap(Bytes.BYTES_COMPARATOR)); List splits = null; - // Number of threads to use when log splitting to rewrite the logs. - // More means faster but bigger mem consumption. - int logWriterThreads = - conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); - - // Number of logs to read into memory before writing to their appropriate - // regions when log splitting. More means faster but bigger mem consumption + // Number of logs in a read batch + // More means faster but bigger mem consumption int logFilesPerStep = conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3); + boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); // append support = we can avoid data loss (yay) // we open for append, then close to recover the correct file length final boolean appendSupport = isAppend(conf); - // store corrupt logs for post-mortem analysis (empty string = discard) - final String corruptDir = - conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"); - - List finishedFiles = new LinkedList(); - List corruptFiles = new LinkedList(); - try { - int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) / - logFilesPerStep)).intValue(); - for (int step = 0; step < maxSteps; step++) { - - // Step 1: read N log files into memory - final Map> logEntries = - new TreeMap>(Bytes.BYTES_COMPARATOR); - int endIndex = step == maxSteps - 1? logfiles.length: - step * logFilesPerStep + logFilesPerStep; - for (int i = (step * logFilesPerStep); i < endIndex; i++) { - Path curLogFile = logfiles[i].getPath(); - - // make sure we get the right file length before opening for read - recoverLog(fs, curLogFile, appendSupport); - - long length = fs.getFileStatus(curLogFile).getLen(); - if (LOG.isDebugEnabled()) { - LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length + - ": " + curLogFile + ", length=" + length); + int i = -1; + while (i < logfiles.length) { + final Map> editsByRegion = + new TreeMap>(Bytes.BYTES_COMPARATOR); + for (int j = 0; j < logFilesPerStep; j++) { + i++; + if (i == logfiles.length) { + break; } - - Reader in = null; - boolean cleanRead = false; - int count = 0; + FileStatus log = logfiles[i]; + Path logPath = log.getPath(); + long logLength = log.getLen(); + LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length + + ": " + logPath + ", length=" + logLength ); try { - in = HLog.getReader(fs, curLogFile, conf); - try { - Entry entry; - while ((entry = in.next()) != null) { - byte [] regionName = entry.getKey().getRegionName(); - LinkedList queue = logEntries.get(regionName); - if (queue == null) { - queue = new LinkedList(); - LOG.debug("Adding queue for " + Bytes.toStringBinary(regionName)); - logEntries.put(regionName, queue); - } - queue.push(entry); - count++; - } - LOG.debug("Pushed=" + count + " entries from " + curLogFile); - cleanRead = true; - } catch (IOException e) { - LOG.debug("IOE Pushed=" + count + " entries from " + curLogFile); - e = RemoteExceptionHandler.checkIOException(e); - if (!(e instanceof EOFException)) { - String msg = "Exception processing " + curLogFile + - " -- continuing. 
Possible DATA LOSS!"; - if (corruptDir.length() > 0) { - msg += " Storing in hlog corruption directory."; - } - LOG.warn(msg, e); - } - } - } catch (IOException e) { - if (length <= 0) { - LOG.warn("Empty hlog, continuing: " + logfiles[i]); - cleanRead = true; - continue; - } - throw e; - } finally { - try { - if (in != null) { - in.close(); - } - } catch (IOException e) { - LOG.warn("File.close() threw exception -- continuing, " - + "but marking file as corrupt.", e); - cleanRead = false; - } - if (cleanRead) { - finishedFiles.add(curLogFile); - } else { - corruptFiles.add(curLogFile); - } - /* TODO FOR J-D REVIEW - // Archive the input file now so we do not replay edits. We could - // have gotten here because of an exception. If so, probably - // nothing we can do about it. Replaying it, it could work but we - // could be stuck replaying for ever. Just continue though we - // could have lost some edits. - fs.rename(logfiles[i].getPath(), - getHLogArchivePath(oldLogDir, logfiles[i].getPath())); - */ - } - } - - // Step 2: Some regionserver log files have been read into memory. - // Assign them to the appropriate region directory. - class ThreadWithException extends Thread { - ThreadWithException(String name) { super(name); } - public IOException exception = null; - } - List threadList = - new ArrayList(logEntries.size()); - ExecutorService threadPool = - Executors.newFixedThreadPool(logWriterThreads); - for (final byte [] region: logEntries.keySet()) { - ThreadWithException thread = - new ThreadWithException(Bytes.toStringBinary(region)) { - @Override - public void run() { - LinkedList entries = logEntries.get(region); - LOG.debug("Thread got " + entries.size() + " to process"); - if(entries.size() <= 0) { - LOG.warn("Got a region with no entries to process."); - return; - } - long threadTime = System.currentTimeMillis(); - try { - int count = 0; - // get the logfile associated with this region. 2 logs often - // write to the same region, so persist this info across logs - WriterAndPath wap = logWriters.get(region); - if (wap == null) { - // first write to this region, make new logfile - assert entries.size() > 0; - Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor - .getTableDir(rootDir, - entries.getFirst().getKey().getTablename()), - HRegionInfo.encodeRegionName(region)), - HREGION_OLDLOGFILE_NAME); - - // If splitLog() was running when the user restarted his - // cluster, then we could already have a 'logfile'. - // Since we don't delete logs until everything is written to - // their respective regions, we can safely remove this tmp. - if (fs.exists(logfile)) { - LOG.warn("Deleting old hlog file: " + logfile); - // TODO: Archive? - fs.delete(logfile, true); - } - - // associate an OutputStream with this logfile - Writer w = createWriter(fs, logfile, conf); - wap = new WriterAndPath(logfile, w); - logWriters.put(region, wap); - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new hlog file writer for path " - + logfile + " and region " + Bytes.toStringBinary(region)); - } - } - - // Items were added to the linkedlist oldest first. Pull them - // out in that order. 
- for (ListIterator i = entries.listIterator(entries.size()); - i.hasPrevious();) { - wap.w.append(i.previous()); - count++; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Applied " + count + " total edits to " - + Bytes.toStringBinary(region) + " in " - + (System.currentTimeMillis() - threadTime) + "ms"); - } - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.warn("Got while writing region " - + Bytes.toStringBinary(region) + " log " + e); - e.printStackTrace(); - exception = e; - } - } - }; - threadList.add(thread); - threadPool.execute(thread); - } - threadPool.shutdown(); - // Wait for all threads to terminate - try { - for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) { - LOG.debug("Waiting for hlog writers to terminate, iteration #" + i); - } - } catch (InterruptedException ex) { - LOG.warn("Hlog writers were interrupted during splitLog(). " - +"Retaining log files to avoid data loss."); - throw new IOException(ex.getMessage(), ex.getCause()); - } - // throw an exception if one of the threads reported one - for (ThreadWithException t : threadList) { - if (t.exception != null) { - throw t.exception; + recoverLog(appendSupport, fs, logPath); + parseHLog(log, editsByRegion, fs, conf); + processedLogs.add(logPath); + } catch (IOException e) { + if (skipErrors) { + LOG.warn("Got "+ e +" while parsing hlog " + logPath + + ". Marking as corrupted"); + corruptedLogs.add(logPath); + } else { + throw e; + } } } - - // End of for loop. Rinse and repeat + writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf); } + if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) { + throw new IOException("Discovered orphan hlog after split. Maybe " + + "HRegionServer was not dead when we started"); + } + archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); } finally { splits = new ArrayList(logWriters.size()); for (WriterAndPath wap : logWriters.values()) { wap.w.close(); - LOG.debug("Closed " + wap.p); splits.add(wap.p); + LOG.debug("Closed " + wap.p); } } - - // Step 3: All writes succeeded! Get rid of the now-unnecessary logs - for(Path p : finishedFiles) { - if (LOG.isDebugEnabled()) { - LOG.debug("Successfully split Hlog file. Deleting " + p); - } - try { - if (!fs.delete(p, true) && LOG.isDebugEnabled()) { - LOG.debug("Delete of split Hlog (" + p + ") failed."); - } - } catch (IOException e) { - // don't throw error here. worst case = double-read - LOG.warn("Error deleting successfully split Hlog (" + p + ") -- " + e); - } - } - for (Path p : corruptFiles) { - if (corruptDir.length() > 0) { - // store any corrupt logs for later analysis - Path cp = new Path(conf.get(HBASE_DIR), corruptDir); - if(!fs.exists(cp)) { - fs.mkdirs(cp); - } - Path newp = new Path(cp, p.getName()); - if (!fs.exists(newp)) { - if (!fs.rename(p, newp)) { - LOG.warn("Rename of " + p + " to " + newp + " failed."); - } else { - LOG.warn("Corrupt Hlog (" + p + ") moved to " + newp); - } - } else { - LOG.warn("Corrupt Hlog (" + p + ") already moved to " + newp + - ". 
Ignoring"); - } - } else { - // data loss is less important than disk space, delete - try { - if (!fs.delete(p, true) ) { - if (LOG.isDebugEnabled()) { - LOG.debug("Delete of split Hlog " + p + " failed."); - } - } else { - LOG.warn("Corrupt Hlog (" + p + ") deleted!"); - } - } catch (IOException e) { - LOG.warn("Error deleting corrupt Hlog (" + p + ") -- " + e); - } - } - } - return splits; } /* * @param conf - * @return True if append enabled and we have the syncFs in our path. + * @return True if append support */ static boolean isAppend(final Configuration conf) { boolean append = conf.getBoolean("dfs.support.append", false); @@ -1507,6 +1282,8 @@ public class HLog implements HConstants, Syncable { } catch (NoSuchMethodException e) { append = false; } + } else { + // TODO: Test that was are on hadoop 0.21 w/ hdfs-265 support. How? } return append; } @@ -1567,64 +1344,6 @@ public class HLog implements HConstants, Syncable { } } - /* - * Recover log. - * Try and open log in append mode. - * Doing this, we get a hold of the file that crashed writer - * was writing to. Once we have it, close it. This will - * allow subsequent reader to see up to last sync. - * @param fs - * @param p - * @param append - */ - public static void recoverLog(final FileSystem fs, final Path p, - final boolean append) throws IOException { - if (!append) { - return; - } - - // lease recovery not needed for local file system case. - // currently, local file system doesn't implement append either. - if (!(fs instanceof DistributedFileSystem)) { - return; - } - - LOG.debug("Recovering DFS lease for path " + p); - long startWaiting = System.currentTimeMillis(); - - // Trying recovery - boolean recovered = false; - while (!recovered) { - try { - FSDataOutputStream out = fs.append(p); - out.close(); - recovered = true; - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - if (e instanceof AlreadyBeingCreatedException) { - // We expect that we'll get this message while the lease is still - // within its soft limit, but if we get it past that, it means - // that the RS is holding onto the file even though it lost its - // znode. We could potentially abort after some time here. - long waitedFor = System.currentTimeMillis() - startWaiting; - - if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) { - LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p - + ":" + e.getMessage()); - } - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - // ignore it and try again - } - } else { - throw new IOException("Failed to open " + p + " for append", e); - } - } - } - LOG.info("Past out lease recovery"); - } - /** * Construct the HLog directory name * @@ -1668,28 +1387,244 @@ public class HLog implements HConstants, Syncable { return new Path(oldLogDir, System.currentTimeMillis() + "." + p.getName()); } - private static void usage() { - System.err.println("Usage: java org.apache.hbase.HLog" + - " {--dump ... | --split ...}"); + private static void writeEditsBatchToRegions( + final Map> splitLogsMap, + final Map logWriters, + final Path rootDir, final FileSystem fs, final Configuration conf) + throws IOException { + // Number of threads to use when log splitting to rewrite the logs. + // More means faster but bigger mem consumption. 
+ int logWriterThreads = + conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); + boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); + ExecutorService threadPool = Executors.newFixedThreadPool(logWriterThreads); + for (final byte[] key : splitLogsMap.keySet()) { + Thread thread = createNewSplitThread(rootDir, logWriters, splitLogsMap, key, fs, conf); + threadPool.execute(thread); + } + threadPool.shutdown(); + // Wait for all threads to terminate + try { + for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) { + LOG.debug("Waiting for hlog writers to terminate, iteration #" + j); + } + } catch(InterruptedException ex) { + if (skipErrors) { + LOG.warn("Hlog writers were interrupted, possible data loss!"); + } else { + throw new IOException("Could not finish writing log entries", ex); + } + } } - /** + /* + * Parse a single hlog and put the edits in @splitLogsMap * - * @param list + * @param logfile to split + * @param splitLogsMap output parameter: a map with region names as keys and a + * list of edits as values + * @param fs the filesystem + * @param conf the configuration + * @throws IOException if hlog is corrupted, or can't be open */ + private static void parseHLog(final FileStatus logfile, + final Map> splitLogsMap, final FileSystem fs, + final Configuration conf) + throws IOException { + // Check for possibly empty file. With appends, currently Hadoop reports a + // zero length even if the file has been sync'd. Revisit if HDFS-376 or + // HDFS-878 is committed. + long length = logfile.getLen(); + if (length <= 0) { + LOG.warn("File " + logfile + " might be still open, length is 0"); + } + Path path = logfile.getPath(); + Reader in; + int editsCount = 0; + try { + in = HLog.getReader(fs, path, conf); + } catch (EOFException e) { + if (length <= 0) { + //TODO should we ignore an empty file if skip.errors is false? + //The caller should decide what to do. E.g. ignore if this is the last + //log in sequence. + //TODO is this scenario still possible if the log has been recovered (i.e. closed) + LOG.warn("Could not open " + path + " for reading. File is empty" + e); + return; + } else { + throw e; + } + } + try { + Entry entry; + while ((entry = in.next()) != null) { + byte[] region = entry.getKey().getRegionName(); + LinkedList queue = splitLogsMap.get(region); + if (queue == null) { + queue = new LinkedList(); + splitLogsMap.put(region, queue); + } + queue.push(entry); + editsCount++; + } + LOG.debug("Pushed=" + editsCount + " entries from " + path); + } finally { + try { + if (in != null) { + in.close(); + } + } catch (IOException e) { + LOG.warn("Close log reader in finally threw exception -- continuing", e); + } + } + } + + private static Thread createNewSplitThread(final Path rootDir, + final Map logWriters, + final Map> logEntries, + final byte[] key, final FileSystem fs, final Configuration conf) { + return new Thread(Bytes.toStringBinary(key)) { + @Override + public void run() { + LinkedList entries = logEntries.get(key); + LOG.debug("Thread " + this.getId() + " got " + entries.size() + " to process"); + long threadTime = System.currentTimeMillis(); + try { + int editsCount = 0; + // Items were added to the linkedlist oldest first. Pull them + // out in that order. 
+ for (ListIterator iterator = entries.listIterator(entries.size()); + iterator.hasPrevious();) { + Entry logEntry = iterator.previous(); + WriterAndPath wap = logWriters.get(key); + if (wap == null) { + Path logFile = getRegionLogPath(logEntry, rootDir, key); + if (fs.exists(logFile)) { + LOG.warn("Deleting old hlog file " + logFile + + ", length=" + fs.getFileStatus(logFile).getLen()); + fs.delete(logFile, false); + } + Writer w = createWriter(fs, logFile, conf); + wap = new WriterAndPath(logFile, w); + logWriters.put(key, wap); + LOG.debug("Creating writer path=" + logFile + + " region=" + Bytes.toStringBinary(key)); + } + wap.w.append(logEntry); + wap.w.sync(); + editsCount++; + } + LOG.debug(this.getId() + " Applied " + editsCount + + " total edits to " + Bytes.toStringBinary(key) + + " in " + (System.currentTimeMillis() - threadTime) + "ms"); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.warn(this.getId() + " Got while writing log entry " + + Bytes.toStringBinary(key) + " to log", e); + } + } + }; + } + + private static void archiveLogs(final List corruptedLogs, + final List processedLogs, final Path oldLogDir, + final FileSystem fs, final Configuration conf) + throws IOException{ + final Path corruptDir = new Path(conf.get(HBASE_DIR), + conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt")); + if (!fs.exists(corruptDir)) { + fs.mkdirs(corruptDir); + } + if (!fs.exists(oldLogDir)) { + fs.mkdirs(oldLogDir); + } + for (Path corrupted: corruptedLogs) { + Path p = new Path(corruptDir, corrupted.getName()); + LOG.info("Moving corrupted log " + corrupted + " to " + p); + fs.rename(corrupted, p); + } + + for (Path p: processedLogs) { + Path newPath = getHLogArchivePath(oldLogDir, p); + fs.rename(p, newPath); + LOG.info("Archived processed log " + p + " to " + newPath); + } + } + + private static Path getRegionLogPath(Entry logEntry, Path rootDir, byte[] key) { + Path tableDir = + HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename()); + Path regionDir = + HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(key)); + return new Path(regionDir, HREGION_OLDLOGFILE_NAME); + } + + /* + * Recover log p + * @param fs + * @param p + * @param append True if append supported + * @throws IOException + */ + public static void recoverLog(final boolean append, final FileSystem fs, final Path p) + throws IOException{ + if (!append) { + return; + } + // lease recovery not needed for local file system case. + // currently, local file system doesn't implement append either. + if (!(fs instanceof DistributedFileSystem)) { + return; + } + LOG.info("Recovering hlog" + p); + long startWaiting = System.currentTimeMillis(); + + // Trying recovery + boolean recovered = false; + while (!recovered) { + try { + FSDataOutputStream out = fs.append(p); + out.close(); + recovered = true; + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + if (e instanceof AlreadyBeingCreatedException) { + // We expect that we'll get this message while the lease is still + // within its soft limit, but if we get it past that, it means + // that the RS is holding onto the file even though it lost its + // znode. We could potentially abort after some time here. 
+ long waitedFor = System.currentTimeMillis() - startWaiting; + if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) { + LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p + + ":" + e.getMessage()); + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + // ignore it and try again + } + } else { + throw new IOException("Failed to open " + p + " for append", e); + } + } + } + LOG.info("Past out lease recovery off " + p); + } + public void addLogActionsListerner(LogActionsListener list) { LOG.info("Adding a listener"); this.actionListeners.add(list); } - /** - * - * @param list - */ public boolean removeLogActionsListener(LogActionsListener list) { return this.actionListeners.remove(list); } + private static void usage() { + System.err.println("Usage: java org.apache.hbase.HLog" + + " {--dump ... | --split ...}"); + } + /** * Pass one or more log file names and it will either dump out a text version * on stdout or split the specified log files. diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 3feaedd..aa81b3f 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -46,22 +46,24 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.zookeeper.ZooKeeper; /** * Facility for testing HBase. Added as tool to abet junit4 testing. Replaces * old HBaseTestCase and HBaseCluserTestCase functionality. * Create an instance and keep it around doing HBase testing. This class is - * meant to be your one-stop shop for anything you mind need testing. Manages - * one cluster at a time only. Depends on log4j on classpath and hbase-site.xml - * for logging and test-run configuration. It does not set logging levels nor - * make changes to configuration parameters. + * meant to be your one-stop shop for anything you might need testing. Manages + * one cluster at a time only. Depends on log4j being on classpath and + * hbase-site.xml for logging and test-run configuration. It does not set + * logging levels nor make changes to configuration parameters. */ public class HBaseTestingUtility { private final Log LOG = LogFactory.getLog(getClass()); - private final Configuration conf; private MiniZooKeeperCluster zkCluster = null; private MiniDFSCluster dfsCluster = null; @@ -71,6 +73,23 @@ public class HBaseTestingUtility { private File clusterTestBuildDir = null; private HBaseAdmin hbaseAdmin = null; + // Cache this. For some reason only works first time I get it. TODO: Figure + // out why. + private final static UserGroupInformation UGI; + static { + UGI = UserGroupInformation.getCurrentUGI(); + } + + /** + * System property key to get test directory value. + */ + public static final String TEST_DIRECTORY_KEY = "test.build.data"; + + /** + * Default parent directory for test output. 
+ */ + public static final String DEFAULT_TEST_DIRECTORY = "target/build/data"; + public HBaseTestingUtility() { this(HBaseConfiguration.create()); } @@ -79,10 +98,6 @@ public class HBaseTestingUtility { this.conf = conf; } - /** System property key to get test directory value. - */ - public static final String TEST_DIRECTORY_KEY = "test.build.data"; - /** * @return Instance of Configuration. */ @@ -91,25 +106,38 @@ public class HBaseTestingUtility { } /** - * @return Where to write test data on local filesystem; usually build/test/data + * @return Where to write test data on local filesystem; usually + * {@link #DEFAULT_TEST_DIRECTORY} + * @see #setupClusterTestBuildDir() */ public static Path getTestDir() { - return new Path(System.getProperty(TEST_DIRECTORY_KEY, "target/test/data")); + return new Path(System.getProperty(TEST_DIRECTORY_KEY, + DEFAULT_TEST_DIRECTORY)); + } + + /** + * @param subdirName + * @return Path to a subdirectory named subdirName under + * {@link #getTestDir()}. + * @see #setupClusterTestBuildDir() + */ + public static Path getTestDir(final String subdirName) { + return new Path(getTestDir(), subdirName); } /** - * Home our cluster in a dir under build/test. Give it a random name + * Home our cluster in a dir under target/test. Give it a random name * so can have many concurrent clusters running if we need to. Need to * amend the test.build.data System property. Its what minidfscluster bases * it data dir on. Moding a System property is not the way to do concurrent * instances -- another instance could grab the temporary - * value unintentionally -- but not anything can do about it at moment; its - * how the minidfscluster works. + * value unintentionally -- but not anything can do about it at moment; + * single instance only is how the minidfscluster works. * @return The calculated cluster test build directory. */ File setupClusterTestBuildDir() { String oldTestBuildDir = - System.getProperty(TEST_DIRECTORY_KEY, "build/test/data"); + System.getProperty(TEST_DIRECTORY_KEY, DEFAULT_TEST_DIRECTORY); String randomStr = UUID.randomUUID().toString(); String dirStr = oldTestBuildDir + "." + randomStr; File dir = new File(dirStr).getAbsoluteFile(); @@ -128,20 +156,47 @@ public class HBaseTestingUtility { } /** - * @param subdirName - * @return Path to a subdirectory named subdirName under - * {@link #getTestDir()}. + * Start up a minicluster of hbase, dfs, and zookeeper. + * @throws Exception + * @see {@link #shutdownMiniDFSCluster()} */ - public Path getTestDir(final String subdirName) { - return new Path(getTestDir(), subdirName); + public void startMiniCluster() throws Exception { + startMiniCluster(1); } /** - * Start up a minicluster of hbase, dfs, and zookeeper. + * Start a minidfscluster. + * @param servers How many DNs to start. * @throws Exception + * @see {@link #shutdownMiniDFSCluster()} */ - public void startMiniCluster() throws Exception { - startMiniCluster(1); + public void startMiniDFSCluster(int servers) throws Exception { + startMiniDFSCluster(servers, null); + } + + /** + * Start a minidfscluster. + * @param dir Where to home your dfs cluster. + * @param servers How many DNs to start. 
+ * @throws Exception + * @see {@link #shutdownMiniDFSCluster()} + */ + public void startMiniDFSCluster(int servers, final File dir) + throws Exception { + // This does the following to home the minidfscluster + // base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/"); + if (dir == null) this.clusterTestBuildDir = setupClusterTestBuildDir(); + else this.clusterTestBuildDir = dir; + System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.toString()); + this.dfsCluster = new MiniDFSCluster(12345, this.conf, servers, true, + true, true, null, null, null, null); + } + + public void shutdownMiniDFSCluster() throws Exception { + if (this.dfsCluster != null) { + // The below throws an exception per dn, AsynchronousCloseException. + this.dfsCluster.shutdown(); + } } /** @@ -174,9 +229,10 @@ public class HBaseTestingUtility { } /** - * Start up a minicluster of hbase, optinally dfs, and zookeeper. + * Start up a minicluster of hbase, optionally dfs, and zookeeper. * Modifies Configuration. Homes the cluster data directory under a random * subdirectory in a directory under System property test.build.data. + * Directory is cleaned up on exit. * @param servers Number of servers to start up. We'll start this many * datanodes and regionservers. If servers is > 1, then make sure * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise @@ -189,29 +245,23 @@ public class HBaseTestingUtility { LOG.info("Starting up minicluster"); // If we already put up a cluster, fail. isRunningCluster(); - String oldBuildTestDir = - System.getProperty(TEST_DIRECTORY_KEY, "build/test/data"); + // Make a new random dir to home everything in. Set it as system property. + // minidfs reads home from system property. this.clusterTestBuildDir = setupClusterTestBuildDir(); - - // Set our random dir while minidfscluster is being constructed. System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath()); // Bring up mini dfs cluster. This spews a bunch of warnings about missing - // scheme. TODO: fix. - // Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. - this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, - true, true, null, null, null, null); - // Restore System property. minidfscluster accesses content of - // the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using, - // but otherwise, just in constructor. - System.setProperty(TEST_DIRECTORY_KEY, oldBuildTestDir); + // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. + startMiniDFSCluster(servers, this.clusterTestBuildDir); // Mangle conf so fs parameter points to minidfs we just started up FileSystem fs = this.dfsCluster.getFileSystem(); this.conf.set("fs.defaultFS", fs.getUri().toString()); + // Do old style too just to be safe. + this.conf.set("fs.default.name", fs.getUri().toString()); this.dfsCluster.waitClusterUp(); - - // It could be created before the cluster - if(this.zkCluster == null) { + + // Start up a zk cluster. + if (this.zkCluster == null) { startMiniZKCluster(this.clusterTestBuildDir); } @@ -503,25 +553,6 @@ public class HBaseTestingUtility { } /** - * Removes all rows from the .META. in preparation to add custom ones. - * - * @throws IOException When removing the rows fails. 
- */ - private void emptyMetaTable() throws IOException { - HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME); - ArrayList deletes = new ArrayList(); - ResultScanner s = t.getScanner(new Scan()); - for (Result result : s) { - LOG.info("emptyMetaTable: remove row -> " + - Bytes.toStringBinary(result.getRow())); - Delete del = new Delete(result.getRow()); - deletes.add(del); - } - s.close(); - t.delete(deletes); - } - - /** * Starts a MiniMRCluster with a default number of * TaskTracker's. * @@ -688,4 +719,23 @@ public class HBaseTestingUtility { public MiniDFSCluster getDFSCluster() { return dfsCluster; } -} + + /** + * Create a copy of the passed configuration laden with a new user. Use it + * to do things like get a new FileSystem instance. + * @param c + * @param index Some unique number used to make a unique user. + * @return Copy of c with new user stuff loaded into it. + * @throws IOException + */ + public static Configuration setDifferentUser(final Configuration c, + final int index) + throws IOException { + Configuration c2 = new Configuration(c); + String username = UGI.getUserName() + ".hrs." + index; + UnixUserGroupInformation.saveToConf(c2, + UnixUserGroupInformation.UGI_PROPERTY_NAME, + new UnixUserGroupInformation(username, new String[]{"supergroup"})); + return c2; + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 92f6577..a26dd50 100644 --- a/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase; import java.io.IOException; -import java.net.BindException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -36,10 +35,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.security.UnixUserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation; /** * This class creates a single process HBase cluster. @@ -49,13 +45,6 @@ import org.apache.hadoop.security.UserGroupInformation; */ public class MiniHBaseCluster implements HConstants { static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName()); - // Cache this. For some reason only works first time I get it. TODO: Figure - // out why. - private final static UserGroupInformation UGI; - static { - UGI = UserGroupInformation.getCurrentUGI(); - } - private Configuration conf; public LocalHBaseCluster hbaseCluster; @@ -133,28 +122,7 @@ public class MiniHBaseCluster implements HConstants { public MiniHBaseClusterRegionServer(Configuration conf) throws IOException { - super(setDifferentUser(conf)); - } - - /* - * @param c - * @param currentfs We return this if we did not make a new one. - * @param uniqueName Same name used to help identify the created fs. - * @return A new fs instance if we are up on DistributeFileSystem. - * @throws IOException - */ - private static Configuration setDifferentUser(final Configuration c) - throws IOException { - FileSystem currentfs = FileSystem.get(c); - if (!(currentfs instanceof DistributedFileSystem)) return c; - // Else distributed filesystem. Make a new instance per daemon. 
Below - // code is taken from the AppendTestUtil over in hdfs. - Configuration c2 = new Configuration(c); - String username = UGI.getUserName() + ".hrs." + index++; - UnixUserGroupInformation.saveToConf(c2, - UnixUserGroupInformation.UGI_PROPERTY_NAME, - new UnixUserGroupInformation(username, new String[]{"supergroup"})); - return c2; + super(HBaseTestingUtility.setDifferentUser(conf, index++)); } @Override diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java new file mode 100644 index 0000000..5a259e4 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java @@ -0,0 +1,579 @@ +package org.apache.hadoop.hbase.master; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.ipc.RemoteException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestHLogSplit { + + private Configuration conf; + private FileSystem fs; + + private final static HBaseTestingUtility + TEST_UTIL = new HBaseTestingUtility(); + + + private static final Path hbaseDir = new Path("/hbase"); + private static final Path hlogDir = new Path(hbaseDir, "hlog"); + private static final Path oldLogDir = new Path(hbaseDir, "hlog.old"); + private static final Path corruptDir = new Path(hbaseDir, ".corrupt"); + + private static final int NUM_WRITERS = 10; + private static final int ENTRIES = 10; // entries per writer per region + + private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS]; + private long seq = 0; + private static final byte[] TABLE_NAME = "t1".getBytes(); + private static final byte[] FAMILY = "f1".getBytes(); + private static final byte[] QUALIFIER = "q1".getBytes(); + private static final byte[] VALUE = "v1".getBytes(); + private static final String HLOG_FILE_PREFIX = "hlog.dat."; + private static List regions; + + + static enum Corruptions { + INSERT_GARBAGE_ON_FIRST_LINE, + INSERT_GARBAGE_IN_THE_MIDDLE, + APPEND_GARBAGE, + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration(). + setInt("hbase.regionserver.flushlogentries", 1); + TEST_UTIL.getConfiguration(). + setBoolean("dfs.support.append", true); + TEST_UTIL.getConfiguration(). 
+ setStrings("hbase.rootdir", hbaseDir.toString()); + TEST_UTIL.startMiniDFSCluster(2); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniDFSCluster(); + } + + @Before + public void setUp() throws Exception { + conf = TEST_UTIL.getConfiguration(); + fs = TEST_UTIL.getDFSCluster().getFileSystem(); + FileStatus[] entries = fs.listStatus(new Path("/")); + for (FileStatus dir : entries){ + fs.delete(dir.getPath(), true); + } + seq = 0; + regions = new ArrayList(); + Collections.addAll(regions, "bbb", "ccc"); + /* Not available in 0.20 hdfs + TEST_UTIL.getDFSCluster().getNamesystem().leaseManager. + setLeasePeriod(100, 50000); + */ + } + + @After + public void tearDown() throws Exception { + } + + + //TODO: check the edits order is respected (scenarios) + //TODO: test the split of a large (lots of regions > 500 file). In my tests it seems without hflush + // we're losing entries while writing to regions + + @Test + public void testEmptyLogFiles() throws IOException { + + injectEmptyFile(".empty", true); + generateHLogs(Integer.MAX_VALUE); + injectEmptyFile("empty", true); + + // make fs act as a different client now + // initialize will create a new DFSClient with a new client ID + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + } + + + @Test + public void testEmptyOpenLogFiles() throws IOException { + injectEmptyFile(".empty", false); + generateHLogs(Integer.MAX_VALUE); + injectEmptyFile("empty", false); + Configuration otherConf = + HBaseTestingUtility.setDifferentUser(this.conf, 999); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, FileSystem.get(otherConf), + otherConf); + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + @Test + public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException { + // generate logs but leave hlog.dat.5 open. 
+ generateHLogs(5); + + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + + @Test + public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + @Test + public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + + @Test + public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + // the entries in the original logs are alternating regions + // considering the sequence file header, the middle corruption should + // affect at least half of the entries + int goodEntries = (NUM_WRITERS - 1) * ENTRIES; + int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1; + assertTrue("The file up to the corrupted area hasn't been parsed", + goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf)); + } + } + + @Test + public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + + Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0"); + Path c2 = new Path(hlogDir, HLOG_FILE_PREFIX + "5"); + Path c3 = new Path(hlogDir, HLOG_FILE_PREFIX + (NUM_WRITERS - 1)); + generateHLogs(-1); + corruptHLog(c1, Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs); + corruptHLog(c2, Corruptions.APPEND_GARBAGE, true, fs); + corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs); + + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + FileStatus[] archivedLogs = fs.listStatus(corruptDir); + + assertEquals("expected a different file", c1.getName(), archivedLogs[0].getPath().getName()); + assertEquals("expected a different file", c2.getName(), archivedLogs[1].getPath().getName()); + assertEquals("expected a different file", c3.getName(), archivedLogs[2].getPath().getName()); + assertEquals(archivedLogs.length, 3); + + } + + @Test + public void testLogsGetArchivedAfterSplit() throws IOException { + conf.setBoolean("hbase.skip.errors", false); + + generateHLogs(-1); + + fs.initialize(fs.getUri(), conf); + 
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + FileStatus[] archivedLogs = fs.listStatus(oldLogDir); + + assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); + } + + + + @Test(expected = IOException.class) + public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException { + conf.setBoolean("hbase.skip.errors", false); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } + + @Test + public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException { + conf.setBoolean("hbase.skip.errors", false); + generateHLogs(-1); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + fs.initialize(fs.getUri(), conf); + try { + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } catch (IOException e) {/* expected */} + + assertEquals("if skip.errors is false all files should remain in place", + NUM_WRITERS, fs.listStatus(hlogDir).length); + } + + + @Test + public void testSplit() throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + + } + } + + @Test(expected = FileNotFoundException.class) + public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + fs.listStatus(hlogDir); + + } + + @Test + public void testLogCannotBeWrittenOnceParsed() throws IOException { + generateHLogs(9); + fs.initialize(fs.getUri(), conf); + + + AtomicLong counter = new AtomicLong(0); + (new ZombieLastLogWriterRegionServer(counter)).start(); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet"); + + // It's possible that the writer got an error while appending and didn't count it + // however the entry will in fact be written to file and split with the rest + long numberOfEditsInRegion = countHLog(logfile, fs, conf); + assertTrue("The log file could have at most 1 extra log entry, but " + + "can't have less" + logfile, counter.get() == numberOfEditsInRegion || + counter.get() + 1 == numberOfEditsInRegion); + } + + + @Test(expected = IOException.class) + public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted() throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + (new ZombieNewLogWriterRegionServer()).start(); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } + + @Test + public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted() throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + (new ZombieNewLogWriterRegionServer()).start(); + try { + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } catch (IOException ex) {/* expected */} + int logFilesNumber = fs.listStatus(hlogDir).length; + + assertEquals("Log files should not be archived if there's an extra file after split", + NUM_WRITERS + 1, logFilesNumber); + return; + + } + + /** + * This thread will keep writing to the file after the split process has started + * It simulates a region server that was 
considered dead but woke up and wrote + * some more to he last log entry + */ + class ZombieLastLogWriterRegionServer extends Thread { + AtomicLong editsCount; + Path log; + HLog.Writer lastLogWriter = writer[NUM_WRITERS - 1]; + public ZombieLastLogWriterRegionServer(AtomicLong counter) { + this.editsCount = counter; + } + + @Override + public void run() { + flushToConsole("starting"); + while (true) { + try { + appendEntry(lastLogWriter, TABLE_NAME, "juliet".getBytes(), + ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0); + lastLogWriter.sync(); + editsCount.incrementAndGet(); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // + } + + + } catch (IOException ex) { + if (ex instanceof RemoteException) { + flushToConsole("Juliet: got RemoteException " + + ex.getMessage() + " while writing " + (editsCount.get() + 1)); + break; + } else { + assertTrue("Failed to write " + editsCount.get(), false); + } + + } + } + + + } + } + + /** + * This thread will keep adding new log files + * It simulates a region server that was considered dead but woke up and wrote + * some more to a new hlog + */ + class ZombieNewLogWriterRegionServer extends Thread { + + + @Override + public void run() { + + boolean splitStarted = false; + while (!splitStarted) { + try { + splitStarted = fs.listStatus(new Path(hbaseDir, new String(TABLE_NAME))).length > 0; + } catch (FileNotFoundException e) { + try { + flushToConsole("Juliet: split not started, sleeping a bit..."); + Thread.sleep(100); + } catch (InterruptedException e1) { + // + } + } catch (IOException e1) { + assertTrue("Failed to list status ", false); + } + } + + Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet"); + try { + HLog.Writer writer = HLog.createWriter(fs, + julietLog, conf); + appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(), + ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0); + writer.close(); + flushToConsole("Juliet file creator: created file " + julietLog); + } catch (IOException e1) { + assertTrue("Failed to create file " + julietLog, false); + } + + } + + } + + private void flushToConsole(String s) { + System.out.println(s); + System.out.flush(); + } + + + private void generateHLogs(int leaveOpen) throws IOException { + for (int i = 0; i < NUM_WRITERS; i++) { + writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf); + for (int j = 0; j < ENTRIES; j++) { + int prefix = 0; + for (String region : regions) { + String row_key = region + prefix++ + i + j; + appendEntry(writer[i], TABLE_NAME, region.getBytes(), + row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq); + } + } + if (i != leaveOpen) { + writer[i].close(); + flushToConsole("Closing writer " + i); + } + } + } + + private Path getLogForRegion(Path rootdir, byte[] table, String region) { + return new Path(HRegion.getRegionDir(HTableDescriptor + .getTableDir(rootdir, table), + HRegionInfo.encodeRegionName(region.getBytes())), + "oldlogfile.log"); + } + + private void corruptHLog(Path path, Corruptions corruption, boolean close, + FileSystem fs) throws IOException { + + FSDataOutputStream out; + int fileSize = (int) fs.listStatus(path)[0].getLen(); + + FSDataInputStream in = fs.open(path); + byte[] corrupted_bytes = new byte[fileSize]; + in.readFully(0, corrupted_bytes, 0, fileSize); + in.close(); + + switch (corruption) { + case APPEND_GARBAGE: + out = fs.append(path); + out.write("-----".getBytes()); + closeOrFlush(close, out); + break; + + case INSERT_GARBAGE_ON_FIRST_LINE: + fs.delete(path, false); + out = 
fs.create(path); + out.write(0); + out.write(corrupted_bytes); + closeOrFlush(close, out); + break; + + case INSERT_GARBAGE_IN_THE_MIDDLE: + fs.delete(path, false); + out = fs.create(path); + int middle = (int) Math.floor(corrupted_bytes.length / 2); + out.write(corrupted_bytes, 0, middle); + out.write(0); + out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); + closeOrFlush(close, out); + break; + } + + + } + + private void closeOrFlush(boolean close, FSDataOutputStream out) + throws IOException { + if (close) { + out.close(); + } else { + out.sync(); + // Not in 0out.hflush(); + } + } + + @SuppressWarnings("unused") + private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException { + HLog.Entry entry; + HLog.Reader in = HLog.getReader(fs, log, conf); + while ((entry = in.next()) != null) { + System.out.println(entry); + } + } + + private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException { + int count = 0; + HLog.Reader in = HLog.getReader(fs, log, conf); + while (in.next() != null) { + count++; + } + return count; + } + + + public long appendEntry(HLog.Writer writer, byte[] table, byte[] region, + byte[] row, byte[] family, byte[] qualifier, + byte[] value, long seq) + throws IOException { + + long time = System.nanoTime(); + WALEdit edit = new WALEdit(); + seq++; + edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value)); + writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit)); + writer.sync(); + return seq; + + } + + private void injectEmptyFile(String suffix, boolean closeFile) + throws IOException { + HLog.Writer writer = HLog.createWriter( + fs, new Path(hlogDir, HLOG_FILE_PREFIX + suffix), conf); + if (closeFile) writer.close(); + } + + @SuppressWarnings("unused") + private void listLogs(FileSystem fs, Path dir) throws IOException { + for (FileStatus file : fs.listStatus(dir)) { + System.out.println(file.getPath()); + } + + } + +}
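
Note (not part of the patch): the sketch below shows how the reworked split entry point introduced above is typically exercised, mirroring what TestHLogSplit does. The HLog.splitLog(rootDir, srcDir, oldLogDir, fs, conf) signature, the HLog.getReader()/Reader.next() loop, the per-region "oldlogfile.log" output and the "hbase.skip.errors" flag all come from the patch itself; the wrapper class, the concrete paths and the printed summary are illustrative assumptions only.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

public class HLogSplitExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Tolerate corrupted hlogs: with this flag set, a log that fails to parse
    // is moved to the ".corrupt" directory instead of aborting the whole split
    // (the patch defaults it to false).
    conf.setBoolean("hbase.skip.errors", true);
    FileSystem fs = FileSystem.get(conf);

    Path rootDir   = new Path("/hbase");             // illustrative HBase rootdir
    Path srcDir    = new Path(rootDir, "hlog");      // logs of the dead regionserver
    Path oldLogDir = new Path(rootDir, "hlog.old");  // archive for processed logs

    // Parse the source logs in batches, write the edits out per region,
    // archive processed/corrupted inputs, then delete srcDir.
    List splits = HLog.splitLog(rootDir, srcDir, oldLogDir, fs, conf);

    // Each returned path is a per-region "oldlogfile.log"; count its edits
    // the same way countHLog() does in TestHLogSplit.
    for (Object o : splits) {
      Path p = (Path) o;
      HLog.Reader in = HLog.getReader(fs, p, conf);
      int edits = 0;
      while (in.next() != null) {
        edits++;
      }
      in.close();
      System.out.println(p + " => " + edits + " replayable edits");
    }
  }
}

With "hbase.skip.errors" left at its default of false, a corrupted hlog makes splitLog throw and the source logs stay where they are, which is the behaviour testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs asserts; with it set to true, corrupted files end up under the ".corrupt" directory as in testCorruptedFileGetsArchivedIfSkipErrors.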