diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 54b53dc..b8c96ec 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -51,6 +51,9 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader; +import org.apache.hadoop.hbase.util.Threads; + +import com.google.common.annotations.VisibleForTesting; /** * WAL implementation of the ProcedureStore. @@ -64,7 +67,25 @@ public class WALProcedureStore extends ProcedureStoreBase { void recoverFileLease(FileSystem fs, Path path) throws IOException; } - private static final int MAX_RETRIES_BEFORE_ABORT = 3; + private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY = + "hbase.procedure.store.wal.max.retries.before.roll"; + private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3; + + private static final String WAIT_BEFORE_ROLL_CONF_KEY = + "hbase.procedure.store.wal.wait.before.roll"; + private static final int DEFAULT_WAIT_BEFORE_ROLL = 500; + + private static final String ROLL_RETRIES_CONF_KEY = + "hbase.procedure.store.wal.max.roll.retries"; + private static final int DEFAULT_ROLL_RETRIES = 3; + + private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY = + "hbase.procedure.store.wal.sync.failure.roll.max"; + private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3; + + private static final String PERIODIC_ROLL_CONF_KEY = + "hbase.procedure.store.wal.periodic.roll.msec"; + private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; private static final int DEFAULT_SYNC_WAIT_MSEC = 100; @@ -91,13 +112,18 @@ public class WALProcedureStore extends ProcedureStoreBase { private LinkedTransferQueue slotsCache = null; private Set corruptedLogs = null; private AtomicLong totalSynced = new AtomicLong(0); + private AtomicLong lastRollTs = new AtomicLong(0); private FSDataOutputStream stream = null; - private long lastRollTs = 0; private long flushLogId = 0; private int slotIndex = 0; private Thread syncThread; private ByteSlot[] slots; + private int maxRetriesBeforeRoll; + private int maxSyncFailureRoll; + private int waitBeforeRoll; + private int rollRetries; + private int periodicRollMsec; private long rollThreshold; private boolean useHsync; private int syncWaitMsec; @@ -124,7 +150,12 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Tunings + maxRetriesBeforeRoll = conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL); + maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL); + waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL); + rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES); rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD); + periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL); syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); @@ -155,6 +186,7 @@ public class WALProcedureStore extends ProcedureStoreBase { if (lock.tryLock()) { try { waitCond.signalAll(); + syncCond.signalAll(); } finally { lock.unlock(); } @@ -310,6 +342,9 @@ public class WALProcedureStore extends ProcedureStoreBase { // Update the store tracker synchronized (storeTracker) { storeTracker.insert(proc, subprocs); + if (logId == flushLogId) { + checkAndTryRoll(); + } } } @@ -342,6 +377,7 @@ public class WALProcedureStore extends ProcedureStoreBase { storeTracker.update(proc); if (logId == flushLogId) { removeOldLogs = storeTracker.isUpdated(); + checkAndTryRoll(); } } @@ -377,8 +413,10 @@ public class WALProcedureStore extends ProcedureStoreBase { synchronized (storeTracker) { storeTracker.delete(procId); if (logId == flushLogId) { - if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) { - removeOldLogs = rollWriterOrDie(); + if (storeTracker.isEmpty() || storeTracker.isUpdated()) { + removeOldLogs = checkAndTryRoll(); + } else { + checkAndTryRoll(); } } } @@ -399,9 +437,14 @@ public class WALProcedureStore extends ProcedureStoreBase { } private long pushData(final ByteSlot slot) { - assert isRunning() && !logs.isEmpty() : "recoverLease() must be called before inserting data"; - long logId = -1; + if (!isRunning()) { + throw new RuntimeException("the store must be running before inserting data"); + } + if (logs.isEmpty()) { + throw new RuntimeException("recoverLease() must be called before inserting data"); + } + long logId = -1; lock.lock(); try { // Wait for the sync to be completed @@ -434,72 +477,93 @@ public class WALProcedureStore extends ProcedureStoreBase { } catch (InterruptedException e) { Thread.currentThread().interrupt(); sendAbortProcessSignal(); + throw new RuntimeException(e); } finally { lock.unlock(); + if (!isRunning()) { + throw new RuntimeException("sync aborted"); + } } return logId; } private void syncLoop() throws IOException { inSync.set(false); - while (isRunning()) { - lock.lock(); - try { - // Wait until new data is available - if (slotIndex == 0) { - if (LOG.isTraceEnabled()) { - float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; - LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", - StringUtils.humanSize(totalSynced.get()), - StringUtils.humanSize(totalSynced.get() / rollTsSec))); - } - waitCond.await(); + lock.lock(); + try { + while (isRunning()) { + try { + // Wait until new data is available if (slotIndex == 0) { - // no data.. probably a stop() - continue; + if (LOG.isTraceEnabled()) { + float rollTsSec = getMillisFromLastRoll() / 1000.0f; + LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", + StringUtils.humanSize(totalSynced.get()), + StringUtils.humanSize(totalSynced.get() / rollTsSec))); + } + + waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS); + if (slotIndex == 0) { + // no data.. probably a stop() + checkAndTryRoll(); + continue; + } } - } - // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing - long syncWaitSt = System.currentTimeMillis(); - if (slotIndex != slots.length) { - slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); - } - long syncWaitMs = System.currentTimeMillis() - syncWaitSt; - if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { - float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; - LOG.trace("Sync wait " + StringUtils.humanTimeDiff(syncWaitMs) + - ", slotIndex=" + slotIndex + - ", totalSynced=" + StringUtils.humanSize(totalSynced.get()) + - " " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec"); - } + // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing + long syncWaitSt = System.currentTimeMillis(); + if (slotIndex != slots.length) { + slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); + } + long syncWaitMs = System.currentTimeMillis() - syncWaitSt; + if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { + float rollSec = getMillisFromLastRoll() / 1000.0f; + LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s/sec", + StringUtils.humanTimeDiff(syncWaitMs), slotIndex, + StringUtils.humanSize(totalSynced.get()), + StringUtils.humanSize(totalSynced.get() / rollSec))); + } - inSync.set(true); - totalSynced.addAndGet(syncSlots()); - slotIndex = 0; - inSync.set(false); - syncCond.signalAll(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - sendAbortProcessSignal(); - } finally { - lock.unlock(); + inSync.set(true); + totalSynced.addAndGet(syncSlots()); + slotIndex = 0; + inSync.set(false); + syncCond.signalAll(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + sendAbortProcessSignal(); + } } + } finally { + lock.unlock(); } } private long syncSlots() { int retry = 0; + int logRolled = 0; long totalSynced = 0; do { try { totalSynced = syncSlots(stream, slots, 0, slotIndex); break; } catch (Throwable e) { - if (++retry == MAX_RETRIES_BEFORE_ABORT) { - LOG.error("Sync slot failed, abort.", e); - sendAbortProcessSignal(); + if (++retry >= maxRetriesBeforeRoll) { + if (logRolled >= maxSyncFailureRoll) { + LOG.error("Sync slots after log roll failed, abort.", e); + sendAbortProcessSignal(); + while (isRunning()) Thread.yield(); + break; + } + + if (!rollWriterOrDie()) { + while (isRunning()) Thread.yield(); + break; + } + + logRolled++; + retry = 0; } } } while (isRunning()); @@ -528,14 +592,45 @@ public class WALProcedureStore extends ProcedureStoreBase { return totalSynced; } - private boolean rollWriterOrDie() { - try { - return rollWriter(); - } catch (IOException e) { - LOG.warn("Unable to roll the log", e); - sendAbortProcessSignal(); - return false; + @VisibleForTesting + public boolean rollWriterOrDie() { + for (int i = 1; i <= rollRetries; ++i) { + try { + if (rollWriter()) { + return true; + } + } catch (IOException e) { + LOG.warn("Unable to roll the log, attempt=" + i, e); + Threads.sleepWithoutInterrupt(waitBeforeRoll); + } + } + LOG.fatal("Unable to roll the log"); + sendAbortProcessSignal(); + throw new RuntimeException("unable to roll the log"); + } + + protected boolean checkAndTryRoll() { + if (!isRunning()) return false; + + if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { + try { + return rollWriter(); + } catch (IOException e) { + LOG.warn("Unable to roll the log", e); + } + } + return false; + } + + private long getMillisToNextPeriodicRoll() { + if (lastRollTs.get() > 0 && periodicRollMsec > 0) { + return periodicRollMsec - getMillisFromLastRoll(); } + return Long.MAX_VALUE; + } + + private long getMillisFromLastRoll() { + return (System.currentTimeMillis() - lastRollTs.get()); } protected boolean rollWriter() throws IOException { @@ -573,7 +668,7 @@ public class WALProcedureStore extends ProcedureStoreBase { stream = newStream; flushLogId = logId; totalSynced.set(0); - lastRollTs = System.currentTimeMillis(); + lastRollTs.set(System.currentTimeMillis()); logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); } finally { lock.unlock(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java index 2576302..1103ff0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.procedure; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; @@ -31,10 +32,12 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.wal.TestWALProcedureStore.TestSequentialProcedure; import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState; import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState; import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState; @@ -45,7 +48,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.ModifyRegionUtils; - import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -53,7 +55,6 @@ import org.junit.experimental.categories.Category; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -64,6 +65,11 @@ public class TestMasterFailoverWithProcedures { protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static void setupConf(Configuration conf) { + // don't waste time retrying with the roll, the test is already slow enough. + conf.setInt("hbase.procedure.store.wal.max.retries.before.roll", 1); + conf.setInt("hbase.procedure.store.wal.wait.before.roll", 0); + conf.setInt("hbase.procedure.store.wal.max.roll.retries", 1); + conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 1); } @Before @@ -127,8 +133,13 @@ public class TestMasterFailoverWithProcedures { HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf("mtb"), "f"); HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null); LOG.debug("submit proc"); - getMasterProcedureExecutor().submitProcedure( - new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions)); + try { + getMasterProcedureExecutor().submitProcedure( + new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions)); + fail("expected RuntimeException 'sync aborted'"); + } catch (RuntimeException e) { + LOG.info("got " + e.getMessage()); + } LOG.debug("wait master store abort"); masterStoreAbort.await(); @@ -140,10 +151,52 @@ public class TestMasterFailoverWithProcedures { // wait the store in here to abort (the test will fail due to timeout if it doesn't) LOG.debug("wait the store to abort"); backupStore3.getStoreTracker().setDeleted(1, false); - backupStore3.delete(1); + try { + backupStore3.delete(1); + fail("expected RuntimeException 'sync aborted'"); + } catch (RuntimeException e) { + LOG.info("got " + e.getMessage()); + } backupStore3Abort.await(); } + @Test(timeout=60000) + public void testWALfencingWithWALRolling() throws IOException { + final ProcedureStore procStore = getMasterProcedureExecutor().getStore(); + assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore); + + HMaster firstMaster = UTIL.getHBaseCluster().getMaster(); + + HMaster backupMaster3 = Mockito.mock(HMaster.class); + Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration(); + Mockito.doReturn(true).when(backupMaster3).isActiveMaster(); + final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(), + firstMaster.getMasterFileSystem().getFileSystem(), + ((WALProcedureStore)procStore).getLogDir(), + new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3)); + + // start a second store which should fence the first one out + LOG.info("Starting new WALProcedureStore"); + procStore2.start(1); + procStore2.recoverLease(); + + LOG.info("Inserting into second WALProcedureStore"); + // insert something to the second store then delete it, causing a WAL roll + Procedure proc2 = new TestSequentialProcedure(); + procStore2.insert(proc2, null); + procStore2.rollWriterOrDie(); + + LOG.info("Inserting into first WALProcedureStore"); + // insert something to the first store + proc2 = new TestSequentialProcedure(); + try { + procStore.insert(proc2, null); + fail("expected RuntimeException 'sync aborted'"); + } catch (RuntimeException e) { + LOG.info("got " + e.getMessage()); + } + } + // ========================================================================== // Test Create Table // ========================================================================== diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java new file mode 100644 index 0000000..d9aa690 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.DataNode; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MasterTests.class, LargeTests.class}) +public class TestWALProcedureStoreOnHDFS { + private static final Log LOG = LogFactory.getLog(TestWALProcedureStoreOnHDFS.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private WALProcedureStore store; + + private static void setupConf(Configuration conf) { + conf.setInt("dfs.replication", 3); + conf.setInt("dfs.namenode.replication.min", 3); + } + + @Before + public void setup() throws Exception { + setupConf(UTIL.getConfiguration()); + MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3); + + Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs"); + store = ProcedureTestingUtility.createWalStore( + UTIL.getConfiguration(), dfs.getFileSystem(), logDir); + store.registerListener(new ProcedureStore.ProcedureStoreListener() { + @Override + public void abortProcess() { + LOG.fatal("Abort the Procedure Store"); + store.stop(true); + } + }); + store.start(8); + store.recoverLease(); + } + + @After + public void tearDown() throws Exception { + store.stop(false); + UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true); + + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.warn("failure shutting down cluster", e); + } + } + + @Test(timeout=60000, expected=RuntimeException.class) + public void testWalAbortOnLowReplication() throws Exception { + assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); + + LOG.info("Stop DataNode"); + UTIL.getDFSCluster().stopDataNode(0); + assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + + store.insert(new TestProcedure(1, -1), null); + for (long i = 2; store.isRunning(); ++i) { + assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + store.insert(new TestProcedure(i, -1), null); + Thread.sleep(100); + } + assertFalse(store.isRunning()); + fail("The store.insert() should throw an exeption"); + } + + @Test(timeout=60000) + public void testWalRollOnLowReplication() throws Exception { + int dnCount = 0; + store.insert(new TestProcedure(1, -1), null); + UTIL.getDFSCluster().restartDataNode(dnCount); + for (long i = 2; i < 100; ++i) { + store.insert(new TestProcedure(i, -1), null); + Thread.sleep(100); + if ((i % 30) == 0) { + LOG.info("Restart Data Node"); + UTIL.getDFSCluster().restartDataNode(++dnCount % 3); + } + } + assertTrue(store.isRunning()); + } + + // ========================================================================== + // Helpers + // ========================================================================== + public static class TestProcedure extends Procedure { + public TestProcedure() {} + + public TestProcedure(long procId, long parentId) { + setProcId(procId); + if (parentId > 0) { + setParentProcId(parentId); + } + } + + public void addStackId(final int index) { + addStackIndex(index); + } + + @Override + protected Procedure[] execute(Void env) { return null; } + + @Override + protected void rollback(Void env) { } + + @Override + protected boolean abort(Void env) { return false; } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { } + } +}