diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 8eb1108..d1d2c98 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -174,6 +174,9 @@ public class ProcedureExecutor { private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl"; private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min + private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size"; + private static final int DEFAULT_BATCH_SIZE = 32; + private final Map completed; private final Map nonceKeysToProcIdsMap; private final ProcedureStore store; @@ -201,6 +204,10 @@ public class ProcedureExecutor { final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL); final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL); + final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE); + + final long[] batchIds = new long[batchSize]; + int batchCount = 0; final long now = EnvironmentEdgeManager.currentTime(); final Iterator> it = completed.entrySet().iterator(); @@ -215,15 +222,22 @@ public class ProcedureExecutor { if (isDebugEnabled) { LOG.debug("Evict completed procedure: " + procInfo); } - store.delete(entry.getKey()); + batchIds[batchCount++] = entry.getKey(); + if (batchCount == batchIds.length) { + store.delete(batchIds, 0, batchCount); + batchCount = 0; + } it.remove(); - NonceKey nonceKey = procInfo.getNonceKey(); + final NonceKey nonceKey = procInfo.getNonceKey(); if (nonceKey != null) { nonceKeysToProcIdsMap.remove(nonceKey); } } } + if (batchCount > 0) { + store.delete(batchIds, 0, batchCount); + } } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index c9808a1..82ef8f0 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -75,4 +75,9 @@ public class NoopProcedureStore extends ProcedureStoreBase { public void delete(Procedure proc, long[] subprocs) { // no-op } + + @Override + public void delete(long[] procIds, int offset, int count) { + // no-op + } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 11216d8..7df5226 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -196,4 +196,14 @@ public interface ProcedureStore { * @param subProcIds the IDs of the sub-procedure to remove. */ void delete(Procedure parentProc, long[] subProcIds); + + /** + * The specified procIds were removed from the executor, + * due to completion, abort or failure. + * The store implementor should remove all the information about the specified procIds. + * @param procIds the IDs of the procedures to remove. + * @param offset the array offset from where to start to delete + * @param count the number of IDs to delete + */ + void delete(long[] procIds, int offset, int count); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 1e60402..3a46f8f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -465,6 +465,8 @@ public class WALProcedureStore extends ProcedureStoreBase { @Override public void delete(final Procedure proc, final long[] subProcIds) { + assert proc != null : "expected a non-null procedure"; + assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; if (LOG.isTraceEnabled()) { LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds)); } @@ -486,6 +488,42 @@ public class WALProcedureStore extends ProcedureStoreBase { } } + @Override + public void delete(final long[] procIds, final int offset, final int count) { + if (count == 0) return; + if (offset == 0 && count == procIds.length) { + delete(procIds); + } else if (count == 1) { + delete(procIds[offset]); + } else { + delete(Arrays.copyOfRange(procIds, offset, offset + count)); + } + } + + private void delete(final long[] procIds) { + if (LOG.isTraceEnabled()) { + LOG.trace("Delete " + Arrays.toString(procIds)); + } + + final ByteSlot slot = acquireSlot(); + try { + // Serialize the delete + for (int i = 0; i < procIds.length; ++i) { + ProcedureWALFormat.writeDelete(slot, procIds[i]); + } + + // Push the transaction data and wait until it is persisted + pushData(PushType.DELETE, slot, -1, procIds); + } catch (IOException e) { + // We are not able to serialize the procedure. + // this is a code error, and we are not able to go on. + LOG.fatal("Unable to serialize the procedures: " + Arrays.toString(procIds), e); + throw new RuntimeException(e); + } finally { + releaseSlot(slot); + } + } + private ByteSlot acquireSlot() { ByteSlot slot = slotsCache.poll(); return slot != null ? slot : new ByteSlot(); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index d767a0f..0b85ff8 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -97,7 +97,7 @@ public class ProcedureTestingUtility { procStore.load(loader); } - public static void storeRestartAndAssert(ProcedureStore procStore, long maxProcId, + public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId, long runnableCount, int completedCount, int corruptedCount) throws Exception { final LoadCounter loader = new LoadCounter(); storeRestart(procStore, loader); @@ -105,6 +105,7 @@ public class ProcedureTestingUtility { assertEquals(runnableCount, loader.getRunnableCount()); assertEquals(completedCount, loader.getCompletedCount()); assertEquals(corruptedCount, loader.getCorruptedCount()); + return loader; } public static void setKillBeforeStoreUpdate(ProcedureExecutor procExecutor, @@ -366,6 +367,15 @@ public class ProcedureTestingUtility { return corrupted.size(); } + public boolean isRunnable(final long procId) { + for (Procedure proc: runnable) { + if (proc.getProcId() == procId) { + return true; + } + } + return false; + } + @Override public void setMaxProcId(long maxProcId) { this.maxProcId = maxProcId; diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 5353d62..009b6bc 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -404,13 +404,13 @@ public class TestWALProcedureStore { procStore.insert(procs[1], null); procStore.insert(procs[2], null); procStore.insert(procs[3], null); - procStore.delete(procs[0], null); + procStore.delete(procs[0].getProcId()); procStore.rollWriterForTesting(); - procStore.delete(procs[2], null); + procStore.delete(procs[2].getProcId()); procStore.update(procs[3]); procStore.insert(procs[4], null); procStore.rollWriterForTesting(); - procStore.delete(procs[4], null); + procStore.delete(procs[4].getProcId()); procStore.insert(procs[5], null); // Stop the store @@ -737,9 +737,22 @@ public class TestWALProcedureStore { restartAndAssert(3, 0, 1, 0); } - private void restartAndAssert(long maxProcId, long runnableCount, + @Test + public void testBatchDelete() throws Exception { + for (int i = 1; i < 10; ++i) { + procStore.insert(new TestProcedure(i), null); + } + final long[] toDelete = new long[] { 2, 4, 6, 8 }; + procStore.delete(toDelete, 0, toDelete.length); + LoadCounter loader = restartAndAssert(9, 5, 0, 0); + for (int i = 1; i < 10; ++i) { + assertEquals(i % 2 != 0, loader.isRunnable(i)); + } + } + + private LoadCounter restartAndAssert(long maxProcId, long runnableCount, int completedCount, int corruptedCount) throws Exception { - ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, + return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, runnableCount, completedCount, corruptedCount); }