From c7bb66fb1e2640931b28f84ce881ecda08ee7a0f Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 17 Oct 2018 20:51:19 +0800 Subject: [PATCH] HBASE-21323 Should not skip force updating for a sub procedure even if it has been finished Reapplication after fixing failing test. --- .../hadoop/hbase/procedure2/ProcedureExecutor.java | 22 ++++++++++++- .../store/wal/TestForceUpdateProcedure.java | 36 +++++++++++++++++++--- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 9ad73c4e73..f4da3458d8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Deque; import java.util.HashSet; import java.util.Iterator; @@ -376,6 +377,11 @@ public class ProcedureExecutor { this(conf, environment, store, new SimpleProcedureScheduler()); } + private boolean isRootFinished(Procedure proc) { + Procedure rootProc = procedures.get(proc.getRootProcId()); + return rootProc == null || rootProc.isFinished(); + } + private void forceUpdateProcedure(long procId) throws IOException { IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); try { @@ -384,7 +390,9 @@ public class ProcedureExecutor { LOG.debug("No pending procedure with id = {}, skip force updating.", procId); return; } - if (proc.isFinished()) { + // For a sub procedure which root parent has not been finished, we still need to retain the + // wal even if the procedure itself is finished. + if (proc.isFinished() && (!proc.hasParent() || isRootFinished(proc))) { LOG.debug("Procedure {} has already been finished, skip force updating.", proc); return; } @@ -1396,6 +1404,18 @@ public class ProcedureExecutor { return false; } + + /** + * Should only be used when starting up, where the procedure workers have not been started. + *

+ * If the procedure works has been started, the return values maybe changed when you are + * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as + * it will do a copy, and also include the finished procedures. + */ + public Collection> getActiveProceduresNoCopy() { + return procedures.values(); + } + /** * Get procedures. * @return the procedures in a list diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java index 1e27158ae3..df6ee51efe 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java @@ -123,7 +123,34 @@ public class TestForceUpdateProcedure { @Override protected Procedure[] execute(Void env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - return new Procedure[] { new WaitingProcedure() }; + return new Procedure[] { new DummyProcedure(), new WaitingProcedure() }; + } + + @Override + protected void rollback(Void env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(Void env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + } + + public static final class DummyProcedure extends Procedure { + + @Override + protected Procedure[] execute(Void env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + return null; } @Override @@ -207,12 +234,13 @@ public class TestForceUpdateProcedure { stopStoreAndExecutor(); createStoreAndExecutor(); Map, Procedure> procMap = new HashMap<>(); - EXEC.getProcedures().stream().filter(p -> !p.isFinished()) - .forEach(p -> procMap.put(p.getClass(), p)); - assertEquals(2, procMap.size()); + EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p)); + assertEquals(3, procMap.size()); ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class); assertEquals(ProcedureState.WAITING, parentProc.getState()); WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class); assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState()); + DummyProcedure dummyProc = (DummyProcedure) procMap.get(DummyProcedure.class); + assertEquals(ProcedureState.SUCCESS, dummyProc.getState()); } } -- 2.16.3