diff --git hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 4c3b2ac..84dedd1 100644
--- hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
 import org.apache.hadoop.hbase.util.Threads;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * WAL implementation of the ProcedureStore.
  */
@@ -222,19 +224,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
       flushLogId = initOldLogs(oldLogs);
 
       // Create new state-log
-      if (!rollWriter(flushLogId + 1)) {
+      if (!rollWriter()) {
         // someone else has already created this log
         LOG.debug("someone else has already created log " + flushLogId);
-        continue;
-      }
-
-      // We have the lease on the log
-      oldLogs = getLogFiles();
-      if (getMaxLogId(oldLogs) > flushLogId) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
-        }
-        logs.getLast().removeFile();
+        Threads.sleepWithoutInterrupt(waitBeforeRoll);
         continue;
       }
 
@@ -576,7 +569,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return totalSynced;
   }
 
-  private boolean rollWriterOrDie() {
+  @VisibleForTesting
+  public boolean rollWriterOrDie() {
     for (int i = 1; i <= rollRetries; ++i) {
       try {
         return rollWriter();
@@ -591,7 +585,20 @@
   }
 
   protected boolean rollWriter() throws IOException {
-    return rollWriter(flushLogId + 1);
+    if (!rollWriter(flushLogId + 1)) {
+      return false;
+    }
+
+    // We have the lease on the log
+    FileStatus[] oldLogs = getLogFiles();
+    if (getMaxLogId(oldLogs) > flushLogId) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
+      }
+      logs.getLast().removeFile();
+      return false;
+    }
+    return true;
   }
 
   private boolean rollWriter(final long logId) throws IOException {
diff --git hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index a90e056..b5e9541 100644
--- hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase.procedure2;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +35,6 @@ import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class ProcedureTestingUtility {
   private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class);
@@ -175,4 +176,34 @@
 
     assertTrue("expected abort exception, got "+ cause, cause instanceof ProcedureAbortedException);
   }
+
+  /**
+   * Helper for testing
+   */
+  public static class SimpleProcedure extends Procedure<Void> {
+
+    public SimpleProcedure() {
+    }
+    public SimpleProcedure(long procId) {
+      setProcId(procId);
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) throws ProcedureYieldException, InterruptedException {
+      return null;
+    }
+    @Override
+    protected void rollback(Void env) throws IOException, InterruptedException {
+    }
+    @Override
+    protected boolean abort(Void env) {
+      return false;
+    }
+    @Override
+    protected void serializeStateData(OutputStream stream) throws IOException {
+    }
+    @Override
+    protected void deserializeStateData(InputStream stream) throws IOException {
+    }
+  }
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
index 7d58ee5..5aeb7c0 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.master.procedure;
 
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
@@ -31,10 +32,18 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.SimpleProcedure;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
@@ -45,7 +54,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,7 +61,6 @@ import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -159,6 +166,69 @@ public class TestMasterFailoverWithProcedures {
     backupStore3Abort.await();
   }
 
+  /**
+   * Tests proper fencing in case the current WAL store is fenced out by a new store without any
+   * WAL rolls in between.
+   */
+  @Test
+  public void testWALfencingWithoutWALRolling() throws IOException {
+    testWALfencing(false);
+  }
+
+  /**
+   * Tests proper fencing in case the current WAL store does not receive writes until after the
+   * new WAL does a couple of WAL rolls.
+   */
+  @Test
+  public void testWALfencingWithWALRolling() throws IOException {
+    testWALfencing(true);
+  }
+
+  public void testWALfencing(boolean walRolls) throws IOException {
+    final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
+    assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
+
+    HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
+
+    // cause WAL rolling after a delete in WAL:
+    firstMaster.getConfiguration().setLong("hbase.procedure.store.wal.roll.threshold", 1);
+
+    HMaster backupMaster3 = Mockito.mock(HMaster.class);
+    Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
+    Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
+    final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
+        firstMaster.getMasterFileSystem().getFileSystem(),
+        ((WALProcedureStore)procStore).getLogDir(),
+        new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
+
+    // start a second store which should fence the first one out
+    LOG.info("Starting new WALProcedureStore");
+    procStore2.start(1);
+    procStore2.recoverLease();
+
+    // before writing back to the first WAL store, optionally do a couple of WAL rolls (which
+    // cause the old WAL files to be deleted).
+    if (walRolls) {
+      LOG.info("Inserting into second WALProcedureStore, causing WAL rolls");
+      for (int i = 0; i < 512; i++) {
+        // insert something into the second store, then delete it, causing WAL roll(s)
+        Procedure proc2 = new SimpleProcedure(i);
+        procStore2.insert(proc2, null);
+        procStore2.delete(proc2.getProcId()); // delete the procedure so that the WAL is removed later
+      }
+    }
+
+    // Now, insert something into the first store; it should fail.
+    // If the store does a WAL roll and continues with another logId without checking for higher
+    // logIds, it will incorrectly succeed.
+    LOG.info("Inserting into first WALProcedureStore");
+    try {
+      procStore.insert(new SimpleProcedure(11), null);
+      fail("Inserting into Procedure Store should have failed");
+    } catch (Exception ex) {
+      LOG.info("Received expected exception", ex);
+    }
+  }
+
   // ==========================================================================
   //  Test Create Table
   // ==========================================================================
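
Note (not part of the patch): the fencing idea behind the rollWriter() change is roll-then-verify: after creating the log for flushLogId + 1, the store re-lists the log directory and, if a log with a higher id already exists, it removes the file it just created and reports the roll as failed, because another store has taken over the lease. The standalone Java sketch below is a minimal illustration of that pattern only; the class name, the "state-<id>.log" file naming, and the directory layout are assumptions made for the example, not the actual WALProcedureStore internals.

import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Illustrative sketch of roll-then-verify fencing over a shared log directory.
public class FencedRollSketch {
  private final Path logDir;
  private long flushLogId; // highest logId this store believes it owns

  public FencedRollSketch(Path logDir, long flushLogId) {
    this.logDir = logDir;
    this.flushLogId = flushLogId;
  }

  // Roll to flushLogId + 1, then verify nobody created a newer log in the meantime.
  public boolean rollWriter() throws IOException {
    final long newLogId = flushLogId + 1;
    final Path newLog = logDir.resolve("state-" + newLogId + ".log");
    try {
      Files.createFile(newLog);          // fails if that logId already exists
    } catch (FileAlreadyExistsException e) {
      return false;                      // someone else has already created this log
    }
    // Fencing check: a higher logId means another store took over the lease.
    if (maxLogId() > newLogId) {
      Files.deleteIfExists(newLog);      // back off and let the new owner win
      return false;
    }
    flushLogId = newLogId;
    return true;
  }

  // Highest logId currently present in the log directory (0 if none).
  private long maxLogId() throws IOException {
    try (Stream<Path> files = Files.list(logDir)) {
      return files.map(p -> p.getFileName().toString())
          .filter(n -> n.startsWith("state-") && n.endsWith(".log"))
          .map(n -> n.substring("state-".length(), n.length() - ".log".length()))
          .mapToLong(Long::parseLong)
          .max().orElse(0L);
    }
  }
}

In this sketch, a second process that wins a roll leaves a higher-numbered file behind, so the first process's next rollWriter() returns false instead of silently continuing on a stale logId, which is the failure mode the testWALfencing test above exercises.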