commit 7a97d4427a02950f23b35ce161f075ee4065608e
Author: Nicolas Spiegelberg
Date:   7 days ago

    HBASE-2312 [jira] Possible data loss when RS goes into GC pause while
    rolling HLog

    Summary:
    There is a very rare corner case in which bad things (i.e., data loss)
    could happen:

    1) RS #1 is about to roll its HLog - it has not yet created the new one,
       and the old one will get no more writes
    2) RS #1 enters the GC pause of death
    3) The master lists the HLog files of RS #1 that it has to split (as RS #1
       is dead) and starts splitting
    4) RS #1 wakes up, creates the new HLog (the previous one was rolled), and
       appends an edit - which is lost

    The following seems like a possible solution:

    1) The master detects that RS #1 is dead
    2) The master renames the /hbase/.logs/<regionserver name> directory to
       something else (say /hbase/.logs/<regionserver name>-dead)
    3) Add mkdir support (as opposed to mkdirs) to HDFS, so that a file create
       fails if the directory doesn't exist. Dhruba tells me this is very
       doable.
    4) RS #1 comes back up and is not able to create the new HLog. It restarts
       itself.

    Test Plan: EMPTY

    Reviewers: JIRA, stack, khemani

    Reviewed By: khemani

    CC: tedyu, nspiegelberg, stack, Kannan, khemani, jgray

    Differential Revision: 99

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1196773 13f79535-47bb-0310-9956-ffa450edef68
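
    To make the fix concrete, a minimal sketch of the fencing step the master
    performs follows. Only Path#suffix and the "-splitting" extension come
    from this patch; the class and method names are hypothetical.

        import java.io.IOException;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class LogFencingSketch {
          public static final String SPLITTING_EXT = "-splitting";

          /** Master side: move the dead RS's log dir aside before splitting. */
          public static Path fenceDeadServerLogs(FileSystem fs, Path logDir)
              throws IOException {
            Path splitDir = logDir.suffix(SPLITTING_EXT);
            if (fs.exists(logDir) && !fs.rename(logDir, splitDir)) {
              throw new IOException("Failed fs.rename for log split: " + logDir);
            }
            // From here on, a non-recursive file create under logDir fails,
            // because the directory no longer exists; a rogue RS waking up
            // from a GC pause can no longer add HLogs there.
            return splitDir;
          }
        }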
diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index baf25a7..7216a04 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -183,55 +183,87 @@ public class MasterFileSystem {
    * {@link ServerName}
    */
   void splitLogAfterStartup(final Set<ServerName> onlineServers) {
+    boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
+        HLog.SPLIT_SKIP_ERRORS_DEFAULT);
     Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
-    try {
-      if (!this.fs.exists(logsDirPath)) {
-        return;
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("Failed exists test on " + logsDirPath, e);
-    }
-    FileStatus[] logFolders;
-    try {
-      logFolders = this.fs.listStatus(logsDirPath);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed listing " + logsDirPath.toString(), e);
-    }
-    if (logFolders == null || logFolders.length == 0) {
-      LOG.debug("No log files to split, proceeding...");
-      return;
-    }
-    List<ServerName> serverNames = new ArrayList<ServerName>();
-    for (FileStatus status : logFolders) {
-      String sn = status.getPath().getName();
-      // Is this old or new style servername?  If old style, it will be
-      // hostname, colon, and port.  If new style, it will be formatted as
-      // ServerName.toString.
-      ServerName serverName = ServerName.parseServerName(sn);
-      if (!onlineServers.contains(serverName)) {
-        LOG.info("Log folder " + status.getPath() + " doesn't belong " +
-          "to a known region server, splitting");
-        serverNames.add(serverName);
-      } else {
-        LOG.info("Log folder " + status.getPath() +
-          " belongs to an existing region server");
+    do {
+      List<ServerName> serverNames = new ArrayList<ServerName>();
+      try {
+        if (!this.fs.exists(logsDirPath)) return;
+        FileStatus[] logFolders = this.fs.listStatus(logsDirPath);
+
+        if (logFolders == null || logFolders.length == 0) {
+          LOG.debug("No log files to split, proceeding...");
+          return;
+        }
+        for (FileStatus status : logFolders) {
+          String sn = status.getPath().getName();
+          // truncate splitting suffix if present (for ServerName parsing)
+          if (sn.endsWith(HLog.SPLITTING_EXT)) {
+            sn = sn.substring(0, sn.length() - HLog.SPLITTING_EXT.length());
+          }
+          ServerName serverName = ServerName.parseServerName(sn);
+          if (!onlineServers.contains(serverName)) {
+            LOG.info("Log folder " + status.getPath() + " doesn't belong " +
+              "to a known region server, splitting");
+            serverNames.add(serverName);
+          } else {
+            LOG.info("Log folder " + status.getPath() +
+              " belongs to an existing region server");
+          }
+        }
+        splitLog(serverNames);
+        retrySplitting = false;
+      } catch (IOException ioe) {
+        LOG.warn("Failed splitting of " + serverNames, ioe);
+        if (!checkFileSystem()) {
+          LOG.warn("Bad Filesystem, exiting");
+          Runtime.getRuntime().halt(1);
+        }
+        try {
+          if (retrySplitting) {
+            Thread.sleep(conf.getInt(
+              "hbase.hlog.split.failure.retry.interval", 30 * 1000));
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted, returning w/o splitting at startup");
+          Thread.currentThread().interrupt();
+          retrySplitting = false;
+        }
       }
-    }
-    splitLog(serverNames);
+    } while (retrySplitting);
   }

-  public void splitLog(final ServerName serverName){
+  public void splitLog(final ServerName serverName) throws IOException {
     List<ServerName> serverNames = new ArrayList<ServerName>();
     serverNames.add(serverName);
     splitLog(serverNames);
   }

-  public void splitLog(final List<ServerName> serverNames) {
+  public void splitLog(final List<ServerName> serverNames) throws IOException {
     long splitTime = 0, splitLogSize = 0;
     List<Path> logDirs = new ArrayList<Path>();
     for(ServerName serverName: serverNames){
-      Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName.toString()));
-      logDirs.add(logDir);
+      Path logDir = new Path(this.rootdir,
+          HLog.getHLogDirectoryName(serverName.toString()));
+      Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
+      // rename the directory so a rogue RS doesn't create more HLogs
+      if (fs.exists(logDir)) {
+        if (!this.fs.rename(logDir, splitDir)) {
+          throw new IOException("Failed fs.rename for log split: " + logDir);
+        }
+        logDir = splitDir;
+        LOG.debug("Renamed region directory: " + splitDir);
+      } else if (!fs.exists(splitDir)) {
+        LOG.info("Log dir for server " + serverName + " does not exist");
+        continue;
+      }
+      logDirs.add(splitDir);
+    }
+
+    if (logDirs.isEmpty()) {
+      LOG.info("No logs to split");
+      return;
     }

     if (distributedLogSplitting) {
@@ -240,15 +272,10 @@ public class MasterFileSystem {
       }
       splitTime = EnvironmentEdgeManager.currentTimeMillis();
       try {
-        try {
-          splitLogSize = splitLogManager.splitLogDistributed(logDirs);
-        } catch (OrphanHLogAfterSplitException e) {
-          LOG.warn("Retrying distributed splitting for " +
-            serverNames + "because of:", e);
-          splitLogManager.splitLogDistributed(logDirs);
-        }
-      } catch (IOException e) {
-        LOG.error("Failed distributed splitting " + serverNames, e);
+        splitLogSize = splitLogManager.splitLogDistributed(logDirs);
+      } catch (OrphanHLogAfterSplitException e) {
+        LOG.warn("Retrying distributed splitting for " + serverNames, e);
+        splitLogManager.splitLogDistributed(logDirs);
       }
       splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
     } else {
@@ -272,8 +299,6 @@ public class MasterFileSystem {
         }
         splitTime = splitter.getTime();
         splitLogSize = splitter.getSize();
-      } catch (IOException e) {
-        LOG.error("Failed splitting " + logDir.toString(), e);
       } finally {
         this.splitLogLock.unlock();
       }
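
    The rename in splitLog() above only fences a rogue regionserver if HLog
    creation refuses to recreate the renamed directory. A rough sketch of the
    regionserver-side behaviour this relies on, using the (deprecated)
    FileSystem#createNonRecursive as a stand-in for the HDFS-617/HADOOP-6840
    non-recursive create; the patch itself gets the same effect through the
    SequenceFile reflection further below.

        import java.io.IOException;
        import org.apache.hadoop.fs.FSDataOutputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class NonRecursiveCreateSketch {
          static FSDataOutputStream openNewHLog(FileSystem fs, Path logDir,
              String fileName) throws IOException {
            // Fails (FileNotFoundException on HDFS) if logDir was renamed away;
            // a plain create() would silently re-create logDir and defeat the
            // master's fencing rename.
            return fs.createNonRecursive(new Path(logDir, fileName), false, 4096,
                fs.getDefaultReplication(), fs.getDefaultBlockSize(), null);
          }
        }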
diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index 0718479..184d348 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -171,9 +171,14 @@ public class ServerShutdownHandler extends EventHandler {
     try {
-      if ( this.shouldSplitHlog ) {
+      try {
         LOG.info("Splitting logs for " + serverName);
         this.services.getMasterFileSystem().splitLog(serverName);
+      } catch (IOException ioe) {
+        this.services.getExecutorService().submit(this);
+        this.deadServers.add(serverName);
+        throw new IOException("failed log splitting for " +
+          serverName + ", will retry", ioe);
       }

       // Assign root and meta if we were carrying them.
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 8e05b07..6d6ec93 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -115,6 +115,10 @@ public class HLog implements Syncable {
   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
   static final byte [] METAROW = Bytes.toBytes("METAROW");

+  /** File Extension used while splitting an HLog into regions (HBASE-2312) */
+  public static final String SPLITTING_EXT = "-splitting";
+  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
+
   /*
    * Name of directory that holds recovered edits written by the wal log
    * splitting code, one per region
@@ -793,8 +797,7 @@ public class HLog implements Syncable {
    * @return Path to current writer or null if none.
    * @throws IOException
    */
-  private Path cleanupCurrentWriter(final long currentfilenum)
-  throws IOException {
+  Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
     Path oldFile = null;
     if (this.writer != null) {
       // Close the current writer, get a new one.
@@ -809,6 +812,7 @@ public class HLog implements Syncable {
           sync();
         }
         this.writer.close();
+        this.writer = null;
         closeErrorCount.set(0);
       } catch (IOException e) {
         LOG.error("Failed close of HLog writer", e);
@@ -944,7 +948,9 @@ public class HLog implements Syncable {
         if (LOG.isDebugEnabled()) {
           LOG.debug("closing hlog writer in " + this.dir.toString());
         }
-        this.writer.close();
+        if (this.writer != null) {
+          this.writer.close();
+        }
       }
     } finally {
       cacheFlushLock.unlock();
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 8986b2c..21747b1 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -367,7 +367,8 @@ public class HLogSplitter {
     boolean progress_failed = false;
-    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
+    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
+        HLog.SPLIT_SKIP_ERRORS_DEFAULT);
     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
     // How often to send a progress report (default 1/2 master timeout)
     int period = conf.getInt("hbase.splitlog.report.period",
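
    The writer = null assignment above, together with the new null check in
    close(), makes closing the writer idempotent: the test added below closes
    the writer via cleanupCurrentWriter() and later calls close() again. A
    generic sketch of the idiom, independent of HBase:

        import java.io.Closeable;
        import java.io.IOException;

        public class IdempotentCloseSketch {
          private Closeable writer;

          synchronized void closeWriter() throws IOException {
            if (this.writer != null) {
              this.writer.close();
              // Null the field right after a successful close so any other
              // cleanup path that runs later becomes a no-op.
              this.writer = null;
            }
          }
        }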
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 2fee321..a3eb32f 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;

 import org.apache.commons.logging.Log;
@@ -32,7 +33,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;

 /**
@@ -78,17 +81,50 @@ public class SequenceFileLogWriter implements HLog.Writer {
     }

     // Create a SF.Writer instance.
-    this.writer = SequenceFile.createWriter(fs, conf, path,
-      keyClass, WALEdit.class,
-      fs.getConf().getInt("io.file.buffer.size", 4096),
-      (short) conf.getInt("hbase.regionserver.hlog.replication",
-        fs.getDefaultReplication()),
-      conf.getLong("hbase.regionserver.hlog.blocksize",
-        fs.getDefaultBlockSize()),
-      SequenceFile.CompressionType.NONE,
-      new DefaultCodec(),
-      null,
-      new Metadata());
+    try {
+      // reflection for a version of SequenceFile.createWriter that doesn't
+      // automatically create the parent directory (see HBASE-2312)
+      this.writer = (SequenceFile.Writer) SequenceFile.class
+        .getMethod("createWriter", new Class[] {FileSystem.class,
+          Configuration.class, Path.class, Class.class, Class.class,
+          Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
+          CompressionType.class, CompressionCodec.class, Metadata.class})
+        .invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf),
+          WALEdit.class,
+          new Integer(fs.getConf().getInt("io.file.buffer.size", 4096)),
+          new Short((short)
+            conf.getInt("hbase.regionserver.hlog.replication",
+            fs.getDefaultReplication())),
+          new Long(conf.getLong("hbase.regionserver.hlog.blocksize",
+            fs.getDefaultBlockSize())),
+          new Boolean(false) /*createParent*/,
+          SequenceFile.CompressionType.NONE, new DefaultCodec(),
+          new Metadata()
+          });
+    } catch (InvocationTargetException ite) {
+      // function was properly called, but threw its own exception
+      throw new IOException(ite.getCause());
+    } catch (Exception e) {
+      // ignore all other exceptions; they indicate a reflection failure
+    }
+
+    // if reflection failed, use the old createWriter
+    if (this.writer == null) {
+      LOG.debug("new createWriter -- HADOOP-6840 -- not available");
+      this.writer = SequenceFile.createWriter(fs, conf, path,
+        HLog.getKeyClass(conf), WALEdit.class,
+        fs.getConf().getInt("io.file.buffer.size", 4096),
+        (short) conf.getInt("hbase.regionserver.hlog.replication",
+          fs.getDefaultReplication()),
+        conf.getLong("hbase.regionserver.hlog.blocksize",
+          fs.getDefaultBlockSize()),
+        SequenceFile.CompressionType.NONE,
+        new DefaultCodec(),
+        null,
+        new Metadata());
+    } else {
+      LOG.debug("using new createWriter -- HADOOP-6840");
+    }

     this.writer_out = getSequenceFilePrivateFSDataOutputStreamAccessible();
     this.syncFs = getSyncFs();
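
    The reflection above probes for the HADOOP-6840 createWriter overload (the
    one taking a createParent boolean) and falls back to the old signature when
    running against an older Hadoop. A stripped-down sketch of that
    probe-then-fall-back pattern; the method names here are illustrative, not
    real APIs.

        import java.io.IOException;
        import java.lang.reflect.InvocationTargetException;
        import java.lang.reflect.Method;

        public class ReflectiveFallbackSketch {
          static Object invokeNewerOrFallBack(Object target) throws IOException {
            try {
              // Probe for the newer overload; absent in older library versions.
              Method m = target.getClass().getMethod("newerOverload", Boolean.TYPE);
              return m.invoke(target, Boolean.FALSE);
            } catch (NoSuchMethodException nsme) {
              // Older library: fall through to the legacy entry point below.
            } catch (InvocationTargetException ite) {
              // The newer method ran but threw; surface the real cause.
              throw new IOException(ite.getCause());
            } catch (IllegalAccessException iae) {
              // Treat like an absent method and use the fallback.
            }
            try {
              return target.getClass().getMethod("legacyOverload").invoke(target);
            } catch (Exception e) {
              throw new IOException(e);
            }
          }
        }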
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 5fd467d..30ec38a 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -482,15 +482,20 @@ public class ReplicationSource extends Thread
           Path deadRsDirectory = new Path(manager.getLogDir().getParent(),
               this.deadRegionServers[i]);
-          Path possibleLogLocation =
-              new Path(deadRsDirectory, currentPath.getName());
-          LOG.info("Possible location " + possibleLogLocation.toUri().toString());
-          if (this.manager.getFs().exists(possibleLogLocation)) {
-            // We found the right new location
-            LOG.info("Log " + this.currentPath + " still exists at " +
-                possibleLogLocation);
-            // Breaking here will make us sleep since reader is null
-            return true;
+          Path[] locs = new Path[] {
+              new Path(deadRsDirectory, currentPath.getName()),
+              new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
+                  currentPath.getName()),
+          };
+          for (Path possibleLogLocation : locs) {
+            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+            if (this.manager.getFs().exists(possibleLogLocation)) {
+              // We found the right new location
+              LOG.info("Log " + this.currentPath + " still exists at " +
+                  possibleLogLocation);
+              // Breaking here will make us sleep since reader is null
+              return true;
+            }
           }
         }
         // TODO What happens if the log was missing from every single location?
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index 7db8248..8131b92 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -57,6 +58,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -136,6 +138,8 @@ public class TestHLogSplit {
       assertTrue("Deleting " + dir.getPath(), fs.delete(dir.getPath(), true));
     }
+    // create the HLog directory because recursive log creates are not allowed
+    fs.mkdirs(hlogDir);
     seq = 0;
     regions = new ArrayList<String>();
     Collections.addAll(regions, "bbb", "ccc");
@@ -820,7 +824,72 @@ public class TestHLogSplit {
     assertEquals(regions.size(), outputCounts.size());
   }

+  // HBASE-2312: tests the case where a RegionServer enters a GC pause,
+  // comes back online after the master declared it dead and started to split.
+  // Want log rolling after a master split to fail
+  @Test
+  @Ignore("Need HADOOP-6886, HADOOP-6840, & HDFS-617 for this. HDFS 0.20.205.1+ should have this")
+  public void testLogRollAfterSplitStart() throws IOException {
+    // set flush interval to a large number so it doesn't interrupt us
+    final String F_INTERVAL = "hbase.regionserver.optionallogflushinterval";
+    long oldFlushInterval = conf.getLong(F_INTERVAL, 1000);
+    conf.setLong(F_INTERVAL, 1000*1000*100);
+    HLog log = null;
+    Path thisTestsDir = new Path(hbaseDir, "testLogRollAfterSplitStart");
+
+    try {
+      // put some entries in an HLog
+      byte [] tableName = Bytes.toBytes(this.getClass().getName());
+      HRegionInfo regioninfo = new HRegionInfo(tableName,
+          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      log = new HLog(fs, thisTestsDir, oldLogDir, conf);
+      final int total = 20;
+      for (int i = 0; i < total; i++) {
+        WALEdit kvs = new WALEdit();
+        kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName));
+        HTableDescriptor htd = new HTableDescriptor(tableName);
+        htd.addFamily(new HColumnDescriptor("column"));
+        log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd);
+      }
+      // Send the data to HDFS datanodes and close the HDFS writer
+      log.sync();
+      log.cleanupCurrentWriter(log.getFilenum());
+
+      /* code taken from ProcessServerShutdown.process()
+       * handles RS shutdowns (as observed by the Master)
+       */
+      // rename the directory so a rogue RS doesn't create more HLogs
+      Path rsSplitDir = new Path(thisTestsDir.getParent(),
+          thisTestsDir.getName() + "-splitting");
+      fs.rename(thisTestsDir, rsSplitDir);
+      LOG.debug("Renamed region directory: " + rsSplitDir);
+
+      // Process the old log files
+      HLogSplitter splitter = HLogSplitter.createLogSplitter(conf,
+          hbaseDir, rsSplitDir, oldLogDir, fs);
+      splitter.splitLog();
+
+      // Now, try to roll the HLog and verify failure
+      try {
+        log.rollWriter();
+        Assert.fail("rollWriter() did not throw any exception.");
+      } catch (IOException ioe) {
+        if (ioe.getCause().getMessage().contains("FileNotFound")) {
+          LOG.info("Got the expected exception: ", ioe.getCause());
+        } else {
+          Assert.fail("Unexpected exception: " + ioe);
+        }
+      }
+    } finally {
+      conf.setLong(F_INTERVAL, oldFlushInterval);
+      if (log != null) {
+        log.close();
+      }
+      if (fs.exists(thisTestsDir)) {
+        fs.delete(thisTestsDir, true);
+      }
+    }
+  }

   /**
    * This thread will keep writing to the file after the split process has started
@@ -1056,6 +1125,7 @@ public class TestHLogSplit {
   private void generateHLogs(int writers, int entries, int leaveOpen)
       throws IOException {
     makeRegionDirs(fs, regions);
+    fs.mkdirs(hlogDir);
     for (int i = 0; i < writers; i++) {
       writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
       for (int j = 0; j < entries; j++) {