diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 54b53dc..4c3b2ac 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader; +import org.apache.hadoop.hbase.util.Threads; /** * WAL implementation of the ProcedureStore. @@ -64,7 +65,21 @@ public class WALProcedureStore extends ProcedureStoreBase { void recoverFileLease(FileSystem fs, Path path) throws IOException; } - private static final int MAX_RETRIES_BEFORE_ABORT = 3; + private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY = + "hbase.procedure.store.wal.max.retries.before.roll"; + private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3; + + private static final String WAIT_BEFORE_ROLL_CONF_KEY = + "hbase.procedure.store.wal.wait.before.roll"; + private static final int DEFAULT_WAIT_BEFORE_ROLL = 500; + + private static final String ROLL_RETRIES_CONF_KEY = + "hbase.procedure.store.wal.max.roll.retries"; + private static final int DEFAULT_ROLL_RETRIES = 3; + + private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY = + "hbase.procedure.store.wal.sync.failure.roll.max"; + private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3; private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; private static final int DEFAULT_SYNC_WAIT_MSEC = 100; @@ -98,6 +113,10 @@ public class WALProcedureStore extends ProcedureStoreBase { private Thread syncThread; private ByteSlot[] slots; + private int maxRetriesBeforeRoll; + private int maxSyncFailureRoll; + private int waitBeforeRoll; + private int rollRetries; private long rollThreshold; private boolean useHsync; private int syncWaitMsec; @@ -124,6 +143,10 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Tunings + maxRetriesBeforeRoll = conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL); + maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL); + waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL); + rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES); rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD); syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); @@ -155,6 +178,7 @@ public class WALProcedureStore extends ProcedureStoreBase { if (lock.tryLock()) { try { waitCond.signalAll(); + syncCond.signalAll(); } finally { lock.unlock(); } @@ -399,9 +423,14 @@ public class WALProcedureStore extends ProcedureStoreBase { } private long pushData(final ByteSlot slot) { - assert isRunning() && !logs.isEmpty() : "recoverLease() must be called before inserting data"; - long logId = -1; + if (!isRunning()) { + throw new RuntimeException("the store must be running before inserting data"); + } + if (logs.isEmpty()) { + throw new RuntimeException("recoverLease() must be called before inserting data"); + } + long logId = -1; lock.lock(); try { // Wait for the sync to be completed @@ -434,72 +463,91 @@ public class WALProcedureStore extends ProcedureStoreBase { } catch (InterruptedException e) { Thread.currentThread().interrupt(); sendAbortProcessSignal(); + throw new RuntimeException(e); } finally { lock.unlock(); + if (!isRunning()) { + throw new RuntimeException("sync aborted"); + } } return logId; } private void syncLoop() throws IOException { inSync.set(false); - while (isRunning()) { - lock.lock(); - try { - // Wait until new data is available - if (slotIndex == 0) { - if (LOG.isTraceEnabled()) { - float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; - LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", - StringUtils.humanSize(totalSynced.get()), - StringUtils.humanSize(totalSynced.get() / rollTsSec))); - } - waitCond.await(); + lock.lock(); + try { + while (isRunning()) { + try { + // Wait until new data is available if (slotIndex == 0) { - // no data.. probably a stop() - continue; + if (LOG.isTraceEnabled()) { + float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; + LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", + StringUtils.humanSize(totalSynced.get()), + StringUtils.humanSize(totalSynced.get() / rollTsSec))); + } + waitCond.await(); + if (slotIndex == 0) { + // no data.. probably a stop() + continue; + } } - } - // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing - long syncWaitSt = System.currentTimeMillis(); - if (slotIndex != slots.length) { - slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); - } - long syncWaitMs = System.currentTimeMillis() - syncWaitSt; - if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { - float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; - LOG.trace("Sync wait " + StringUtils.humanTimeDiff(syncWaitMs) + - ", slotIndex=" + slotIndex + - ", totalSynced=" + StringUtils.humanSize(totalSynced.get()) + - " " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec"); - } + // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing + long syncWaitSt = System.currentTimeMillis(); + if (slotIndex != slots.length) { + slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); + } + long syncWaitMs = System.currentTimeMillis() - syncWaitSt; + if (LOG.isDebugEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { + float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; + LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s/sec", + StringUtils.humanTimeDiff(syncWaitMs), slotIndex, + StringUtils.humanSize(totalSynced.get()), + StringUtils.humanSize(totalSynced.get() / rollSec))); + } - inSync.set(true); - totalSynced.addAndGet(syncSlots()); - slotIndex = 0; - inSync.set(false); - syncCond.signalAll(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - sendAbortProcessSignal(); - } finally { - lock.unlock(); + inSync.set(true); + totalSynced.addAndGet(syncSlots()); + slotIndex = 0; + inSync.set(false); + syncCond.signalAll(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + sendAbortProcessSignal(); + } } + } finally { + lock.unlock(); } } private long syncSlots() { int retry = 0; + int logRolled = 0; long totalSynced = 0; do { try { totalSynced = syncSlots(stream, slots, 0, slotIndex); break; } catch (Throwable e) { - if (++retry == MAX_RETRIES_BEFORE_ABORT) { - LOG.error("Sync slot failed, abort.", e); - sendAbortProcessSignal(); + if (++retry >= maxRetriesBeforeRoll) { + if (logRolled >= maxSyncFailureRoll) { + LOG.error("Sync slots after log roll failed, abort.", e); + sendAbortProcessSignal(); + while (isRunning()) Thread.yield(); + break; + } + + if (!rollWriterOrDie()) { + while (isRunning()) Thread.yield(); + break; + } + + logRolled++; + retry = 0; } } } while (isRunning()); @@ -529,13 +577,17 @@ public class WALProcedureStore extends ProcedureStoreBase { } private boolean rollWriterOrDie() { - try { - return rollWriter(); - } catch (IOException e) { - LOG.warn("Unable to roll the log", e); - sendAbortProcessSignal(); - return false; + for (int i = 1; i <= rollRetries; ++i) { + try { + return rollWriter(); + } catch (IOException e) { + LOG.warn("Unable to roll the log, attempt=" + i, e); + Threads.sleepWithoutInterrupt(waitBeforeRoll); + } } + LOG.fatal("Unable to roll the log"); + sendAbortProcessSignal(); + throw new RuntimeException("unable to roll the log"); } protected boolean rollWriter() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java index 2576302..7d58ee5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java @@ -64,6 +64,11 @@ public class TestMasterFailoverWithProcedures { protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static void setupConf(Configuration conf) { + // don't waste time retrying with the roll, the test is already slow enough. + conf.setInt("hbase.procedure.store.wal.max.retries.before.roll", 1); + conf.setInt("hbase.procedure.store.wal.wait.before.roll", 0); + conf.setInt("hbase.procedure.store.wal.max.roll.retries", 1); + conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 1); } @Before @@ -127,8 +132,13 @@ public class TestMasterFailoverWithProcedures { HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf("mtb"), "f"); HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null); LOG.debug("submit proc"); - getMasterProcedureExecutor().submitProcedure( - new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions)); + try { + getMasterProcedureExecutor().submitProcedure( + new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions)); + fail("expected RuntimeException 'sync aborted'"); + } catch (RuntimeException e) { + LOG.info("got " + e.getMessage()); + } LOG.debug("wait master store abort"); masterStoreAbort.await(); @@ -140,7 +150,12 @@ public class TestMasterFailoverWithProcedures { // wait the store in here to abort (the test will fail due to timeout if it doesn't) LOG.debug("wait the store to abort"); backupStore3.getStoreTracker().setDeleted(1, false); - backupStore3.delete(1); + try { + backupStore3.delete(1); + fail("expected RuntimeException 'sync aborted'"); + } catch (RuntimeException e) { + LOG.info("got " + e.getMessage()); + } backupStore3Abort.await(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java new file mode 100644 index 0000000..d9aa690 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.DataNode; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MasterTests.class, LargeTests.class}) +public class TestWALProcedureStoreOnHDFS { + private static final Log LOG = LogFactory.getLog(TestWALProcedureStoreOnHDFS.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private WALProcedureStore store; + + private static void setupConf(Configuration conf) { + conf.setInt("dfs.replication", 3); + conf.setInt("dfs.namenode.replication.min", 3); + } + + @Before + public void setup() throws Exception { + setupConf(UTIL.getConfiguration()); + MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3); + + Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs"); + store = ProcedureTestingUtility.createWalStore( + UTIL.getConfiguration(), dfs.getFileSystem(), logDir); + store.registerListener(new ProcedureStore.ProcedureStoreListener() { + @Override + public void abortProcess() { + LOG.fatal("Abort the Procedure Store"); + store.stop(true); + } + }); + store.start(8); + store.recoverLease(); + } + + @After + public void tearDown() throws Exception { + store.stop(false); + UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true); + + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.warn("failure shutting down cluster", e); + } + } + + @Test(timeout=60000, expected=RuntimeException.class) + public void testWalAbortOnLowReplication() throws Exception { + assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); + + LOG.info("Stop DataNode"); + UTIL.getDFSCluster().stopDataNode(0); + assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + + store.insert(new TestProcedure(1, -1), null); + for (long i = 2; store.isRunning(); ++i) { + assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + store.insert(new TestProcedure(i, -1), null); + Thread.sleep(100); + } + assertFalse(store.isRunning()); + fail("The store.insert() should throw an exeption"); + } + + @Test(timeout=60000) + public void testWalRollOnLowReplication() throws Exception { + int dnCount = 0; + store.insert(new TestProcedure(1, -1), null); + UTIL.getDFSCluster().restartDataNode(dnCount); + for (long i = 2; i < 100; ++i) { + store.insert(new TestProcedure(i, -1), null); + Thread.sleep(100); + if ((i % 30) == 0) { + LOG.info("Restart Data Node"); + UTIL.getDFSCluster().restartDataNode(++dnCount % 3); + } + } + assertTrue(store.isRunning()); + } + + // ========================================================================== + // Helpers + // ========================================================================== + public static class TestProcedure extends Procedure { + public TestProcedure() {} + + public TestProcedure(long procId, long parentId) { + setProcId(procId); + if (parentId > 0) { + setParentProcId(parentId); + } + } + + public void addStackId(final int index) { + addStackIndex(index); + } + + @Override + protected Procedure[] execute(Void env) { return null; } + + @Override + protected void rollback(Void env) { } + + @Override + protected boolean abort(Void env) { return false; } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { } + } +}