diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 5ac421c..a356f63 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -297,7 +298,13 @@ public class WALProcedureStore extends ProcedureStoreBase { FileStatus[] oldLogs = getLogFiles(); while (isRunning()) { // Get Log-MaxID and recover lease on old logs - flushLogId = initOldLogs(oldLogs); + try { + flushLogId = initOldLogs(oldLogs); + } catch (FileNotFoundException e) { + LOG.warn("someone else is active and deleted logs. retrying.", e); + oldLogs = getLogFiles(); + continue; + } // Create new state-log if (!rollWriter(flushLogId + 1)) { @@ -929,8 +936,9 @@ public class WALProcedureStore extends ProcedureStoreBase { } private FileStatus[] getLogFiles() throws IOException { + FileStatus[] logs = null; try { - return fs.listStatus(logDir, new PathFilter() { + logs = fs.listStatus(logDir, new PathFilter() { @Override public boolean accept(Path path) { String name = path.getName(); @@ -941,6 +949,15 @@ public class WALProcedureStore extends ProcedureStoreBase { LOG.warn("Log directory not found: " + e.getMessage()); return null; } + Arrays.sort(logs, new Comparator() { + @Override + public int compare(FileStatus o1, FileStatus o2) { + final long id1 = getLogIdFromName(o1.getPath().getName()); + final long id2 = getLogIdFromName(o2.getPath().getName()); + return Long.compare(id1, id2); + } + }); + return logs; } private static long getMaxLogId(final FileStatus[] logFiles) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index f964d86..8a244dd 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -468,6 +468,48 @@ public class TestWALProcedureStore { assertEquals(1, procStore.getActiveLogs().size()); } + @Test + public void testFileNotFoundDuringLeaseRecovery() throws IOException { + TestProcedure[] procs = new TestProcedure[3]; + for (int i = 0; i < procs.length; ++i) { + procs[i] = new TestProcedure(i + 1, 0); + procStore.insert(procs[i], null); + } + procStore.rollWriterForTesting(); + for (int i = 0; i < procs.length; ++i) { + procStore.update(procs[i]); + procStore.rollWriterForTesting(); + } + procStore.stop(false); + + FileStatus[] status = fs.listStatus(logDir); + assertEquals(procs.length + 2, status.length); + + // simulate another active master removing the wals + procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir, + new WALProcedureStore.LeaseRecovery() { + private int count = 0; + + @Override + public void recoverFileLease(FileSystem fs, Path path) throws IOException { + if (++count <= 2) { + fs.delete(path, false); + LOG.debug("Simulate FileNotFound at count=" + count + " for " + path); + throw new FileNotFoundException("test file not found " + path); + } + LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path); + } + }); + + final LoadCounter loader = new LoadCounter(); + procStore.start(PROCEDURE_STORE_SLOTS); + procStore.recoverLease(); + procStore.load(loader); + assertEquals(procs.length, loader.getMaxProcId()); + assertEquals(procs.length - 1, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); + } + private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException { assertTrue(logFile.getLen() > dropBytes);