From a2ba873cfe3cfb5c1c6409346c66df3ece0828c1 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 20 Sep 2018 16:53:58 -0700 Subject: [PATCH] HBASE-21269 Forward-port to branch-2 " HBASE-21213 [hbck2] bypass leaves behind state in RegionStates when assign/unassign" Below is commit message for forward-port. Needs amendment. This patch is currently unfinished too... HBASE-21213 [hbck2] bypass leaves behind state in RegionStates when assign/unassign Adds override to assigns and unassigns. Changes bypass 'force' to align calling the param 'override' instead. Adds recursive to 'bypass', a means of calling bypass on parent and its subprocedures (usually bypass works on leaf nodes rippling the bypass up to parent -- recursive has us work in the opposite direction): EXPERIMENTAL. bypass on an assign/unassign leaves region in RIT and the RegionStateNode loaded with the bypassed procedure. First implementation had assign/unassign cleanup leftover state. Second implementation, on feedback, keeps the state in place as a fence against other Procedures assuming the region entity, and instead adds an 'override' function that hbck2 can set on assigns/unassigns to override the fencing. Note that the below also converts ProcedureExceptions that come out of the Pv2 system into DoNotRetryIOEs. It is a little awkward because DNRIOE is in client-module, not in procedure module. Previously, we'd just keep retrying the bypass, etc. M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java Have bypass take an environment like all other methods so subclasses. Fix javadoc issues. M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java Javadoc issues. Pass environment when we invoke bypass. M hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Rename waitUntilNamespace... etc. to align with how these method types are named elsewhere .. i.e. waitFor rather than waitUntil.. 
M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java Cleanup message we emit when we find an existing procedure working against this entity. Add support for a force function which allows Assigns/Unassigns to force ownership of the Region entity. A hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionBypass.java Test bypass and force. M hbase-shell/src/main/ruby/shell/commands/list_procedures.rb Minor cleanup of the json output... do iso8601 timestamps. --- .../org/apache/hadoop/hbase/client/HBaseHbck.java | 15 +- .../java/org/apache/hadoop/hbase/client/Hbck.java | 35 ++- .../hbase/shaded/protobuf/RequestConverter.java | 10 +- .../apache/hadoop/hbase/procedure2/Procedure.java | 29 +- .../hadoop/hbase/procedure2/ProcedureExecutor.java | 58 +++- .../hadoop/hbase/procedure2/ProcedureUtil.java | 2 +- .../hbase/procedure2/RemoteProcedureException.java | 6 +- .../hbase/procedure2/TestProcedureBypass.java | 23 +- .../src/main/protobuf/Master.proto | 6 +- .../src/main/protobuf/MasterProcedure.proto | 2 + .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 4 +- .../balancer/TestRSGroupBasedLoadBalancer.java | 333 +++++++++++++++++++++ .../org/apache/hadoop/hbase/master/HMaster.java | 8 +- .../hadoop/hbase/master/MasterRpcServices.java | 51 ++-- .../hbase/master/assignment/AssignProcedure.java | 4 + .../assignment/RegionTransitionProcedure.java | 24 +- .../hbase/master/assignment/UnassignProcedure.java | 5 +- .../hbase/master/balancer/BaseLoadBalancer.java | 13 +- .../master/procedure/MasterProcedureUtil.java | 16 + .../hbase/master/procedure/ProcedureDescriber.java | 3 +- .../master/procedure/ProcedurePrepareLatch.java | 2 +- .../hbase/master/procedure/ProcedureSyncWait.java | 5 +- .../apache/hadoop/hbase/TestMetaTableAccessor.java | 4 +- .../org/apache/hadoop/hbase/client/TestHbck.java | 3 +- .../hbase/master/assignment/TestRegionBypass.java | 181 +++++++++++ .../main/ruby/shell/commands/list_procedures.rb 
| 7 +- .../org/apache/hadoop/hbase/client/TestShell.java | 1 - hbase-shell/src/test/ruby/shell/shell_test.rb | 6 +- 28 files changed, 752 insertions(+), 104 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionBypass.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java index a8daa7b5db..2d088253a4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java @@ -102,11 +102,12 @@ public class HBaseHbck implements Hbck { } @Override - public List assigns(List encodedRegionNames) throws IOException { + public List assigns(List encodedRegionNames, boolean override) + throws IOException { try { MasterProtos.AssignsResponse response = this.hbck.assigns(rpcControllerFactory.newController(), - RequestConverter.toAssignRegionsRequest(encodedRegionNames)); + RequestConverter.toAssignRegionsRequest(encodedRegionNames, override)); return response.getPidList(); } catch (ServiceException se) { LOG.debug(toCommaDelimitedString(encodedRegionNames), se); @@ -115,11 +116,12 @@ public class HBaseHbck implements Hbck { } @Override - public List unassigns(List encodedRegionNames) throws IOException { + public List unassigns(List encodedRegionNames, boolean override) + throws IOException { try { MasterProtos.UnassignsResponse response = this.hbck.unassigns(rpcControllerFactory.newController(), - RequestConverter.toUnassignRegionsRequest(encodedRegionNames)); + RequestConverter.toUnassignRegionsRequest(encodedRegionNames, override)); return response.getPidList(); } catch (ServiceException se) { LOG.debug(toCommaDelimitedString(encodedRegionNames), se); @@ -132,7 +134,8 @@ public class HBaseHbck implements Hbck { } @Override - public List bypassProcedure(List pids, long waitTime, boolean force) + public List 
bypassProcedure(List pids, long waitTime, boolean override, + boolean recursive) throws IOException { MasterProtos.BypassProcedureResponse response = ProtobufUtil.call( new Callable() { @@ -141,7 +144,7 @@ public class HBaseHbck implements Hbck { try { return hbck.bypassProcedure(rpcControllerFactory.newController(), MasterProtos.BypassProcedureRequest.newBuilder().addAllProcId(pids). - setWaitTime(waitTime).setForce(force).build()); + setWaitTime(waitTime).setOverride(override).setRecursive(recursive).build()); } catch (Throwable t) { LOG.error(pids.stream().map(i -> i.toString()). collect(Collectors.joining(", ")), t); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Hbck.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Hbck.java index 5c9a862feb..5c97d97daa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Hbck.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Hbck.java @@ -28,11 +28,14 @@ import org.apache.yetus.audience.InterfaceAudience; /** * Hbck fixup tool APIs. Obtain an instance from {@link ClusterConnection#getHbck()} and call * {@link #close()} when done. - *

WARNING: the below methods can damage the cluster. For experienced users only. + *

WARNING: the below methods can damage the cluster. It may leave the cluster in an + * indeterminate state, e.g. region not assigned, or some hdfs files left behind. After running + * any of the below, operators may have to do some clean up on hdfs or schedule some assign + * procedures to get regions back online. DO AT YOUR OWN RISK. For experienced users only. * * @see ConnectionFactory * @see ClusterConnection - * @since 2.2.0 + * @since 2.0.2, 2.1.1 */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK) public interface Hbck extends Abortable, Closeable { @@ -49,22 +52,38 @@ public interface Hbck extends Abortable, Closeable { * -- good if many Regions to online -- and it will schedule the assigns even in the case where * Master is initializing (as long as the ProcedureExecutor is up). Does NOT call Coprocessor * hooks. + * @param override You need to add the override for case where a region has previously been + * bypassed. When a Procedure has been bypassed, a Procedure will have completed + * but no other Procedure will be able to make progress on the target entity + * (intentionally). This override flag will override this fencing mechanism. * @param encodedRegionNames Region encoded names; e.g. 1588230740 is the hard-coded encoding * for hbase:meta region and de00010733901a05f5a2a3a382e27dd4 is an * example of what a random user-space encoded Region name looks like. */ - List assigns(List encodedRegionNames) throws IOException; + List assigns(List encodedRegionNames, boolean override) throws IOException; + + default List assigns(List encodedRegionNames) throws IOException { + return assigns(encodedRegionNames, false); + } /** * Like {@link Admin#unassign(byte[], boolean)} but 'raw' in that it can do more than one Region * at a time -- good if many Regions to offline -- and it will schedule the assigns even in the * case where Master is initializing (as long as the ProcedureExecutor is up). Does NOT call * Coprocessor hooks. 
+ * @param override You need to add the override for case where a region has previously been + * bypassed. When a Procedure has been bypassed, a Procedure will have completed + * but no other Procedure will be able to make progress on the target entity + * (intentionally). This override flag will override this fencing mechanism. * @param encodedRegionNames Region encoded names; e.g. 1588230740 is the hard-coded encoding * for hbase:meta region and de00010733901a05f5a2a3a382e27dd4 is an * example of what a random user-space encoded Region name looks like. */ - List unassigns(List encodedRegionNames) throws IOException; + List unassigns(List encodedRegionNames, boolean override) throws IOException; + + default List unassigns(List encodedRegionNames) throws IOException { + return unassigns(encodedRegionNames, false); + } /** * Bypass specified procedure and move it to completion. Procedure is marked completed but @@ -73,9 +92,13 @@ public interface Hbck extends Abortable, Closeable { * * @param pids of procedures to complete. * @param waitTime wait time in ms for acquiring lock for a procedure - * @param force if force set to true, we will bypass the procedure even if it is executing. + * @param override if override set to true, we will bypass the procedure even if it is executing. * This is for procedures which can't break out during execution (bugs?). + * @param recursive If set, if a parent procedure, we will find and bypass children and then + * the parent procedure (Dangerous but useful in case where child procedure has been 'lost'). + * Does not always work. Experimental. 
* @return true if procedure is marked for bypass successfully, false otherwise */ - List bypassProcedure(List pids, long waitTime, boolean force) throws IOException; + List bypassProcedure(List pids, long waitTime, boolean override, boolean recursive) + throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 6ab81e22f2..7db8b0225d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -1883,16 +1883,18 @@ public final class RequestConverter { // HBCK2 public static MasterProtos.AssignsRequest toAssignRegionsRequest( - List encodedRegionNames) { + List encodedRegionNames, boolean override) { MasterProtos.AssignsRequest.Builder b = MasterProtos.AssignsRequest.newBuilder(); - return b.addAllRegion(toEncodedRegionNameRegionSpecifiers(encodedRegionNames)).build(); + return b.addAllRegion(toEncodedRegionNameRegionSpecifiers(encodedRegionNames)). + setOverride(override).build(); } public static MasterProtos.UnassignsRequest toUnassignRegionsRequest( - List encodedRegionNames) { + List encodedRegionNames, boolean override) { MasterProtos.UnassignsRequest.Builder b = MasterProtos.UnassignsRequest.newBuilder(); - return b.addAllRegion(toEncodedRegionNameRegionSpecifiers(encodedRegionNames)).build(); + return b.addAllRegion(toEncodedRegionNameRegionSpecifiers(encodedRegionNames)). 
+ setOverride(override).build(); } private static List toEncodedRegionNameRegionSpecifiers( diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 08c7ce3bab..2b66961d7d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -145,12 +145,16 @@ public abstract class Procedure implements Comparable

Bypassing a procedure is not like aborting. Aborting a procedure will trigger * a rollback. And since the {@link #abort(Object)} method is overrideable * Some procedures may have chosen to ignore the aborting. */ @@ -175,12 +179,15 @@ public abstract class Procedure implements Comparable implements Comparable { private Configuration conf; /** - * Created in the {@link #start(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing + * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing * resource handling rather than observing in a #join is unexpected). * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery * (Should be ok). @@ -322,7 +322,7 @@ public class ProcedureExecutor { private ThreadGroup threadGroup; /** - * Created in the {@link #start(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing + * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing * resource handling rather than observing in a #join is unexpected). * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery * (Should be ok). @@ -330,7 +330,7 @@ public class ProcedureExecutor { private CopyOnWriteArrayList workerThreads; /** - * Created in the {@link #start(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing + * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing * resource handling rather than observing in a #join is unexpected). * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery * (Should be ok). @@ -976,7 +976,7 @@ public class ProcedureExecutor { * Bypass a procedure. If the procedure is set to bypass, all the logic in * execute/rollback will be ignored and it will return success, whatever. * It is used to recover buggy stuck procedures, releasing the lock resources - * and letting other procedures to run. 
Bypassing one procedure (and its ancestors will + * and letting other procedures run. Bypassing one procedure (and its ancestors will * be bypassed automatically) may leave the cluster in a middle state, e.g. region * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, * the operators may have to do some clean up on hdfs or schedule some assign procedures @@ -1003,34 +1003,38 @@ public class ProcedureExecutor { * there. We need to restart the master after bypassing, and letting the problematic * procedure to execute wth bypass=true, so in that condition, the procedure can be * successfully bypassed. + * @param recursive We will do an expensive search for children of each pid. EXPENSIVE! * @return true if bypass success * @throws IOException IOException */ - public List bypassProcedure(List pids, long lockWait, boolean force) + public List bypassProcedure(List pids, long lockWait, boolean force, + boolean recursive) throws IOException { List result = new ArrayList(pids.size()); for(long pid: pids) { - result.add(bypassProcedure(pid, lockWait, force)); + result.add(bypassProcedure(pid, lockWait, force, recursive)); } return result; } - boolean bypassProcedure(long pid, long lockWait, boolean force) throws IOException { - Procedure procedure = getProcedure(pid); + boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) + throws IOException { + final Procedure procedure = getProcedure(pid); if (procedure == null) { - LOG.debug("Procedure with id={} does not exist, skipping bypass", pid); + LOG.debug("Procedure pid={} does not exist, skipping bypass", pid); return false; } - LOG.debug("Begin bypass {} with lockWait={}, force={}", procedure, lockWait, force); + LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", + procedure, lockWait, override, recursive); IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait); - if (lockEntry == null && !force) { + 
if (lockEntry == null && !override) { LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", - lockWait, procedure, force); + lockWait, procedure, override); return false; } else if (lockEntry == null) { LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", - lockWait, procedure, force); + lockWait, procedure, override); } try { // check whether the procedure is already finished @@ -1040,8 +1044,29 @@ public class ProcedureExecutor { } if (procedure.hasChildren()) { - LOG.debug("{} has children, skipping bypass", procedure); - return false; + if (recursive) { + // EXPENSIVE. Checks each live procedure of which there could be many!!! + // Is there another way to get children of a procedure? + LOG.info("Recursive bypass on children of pid={}", procedure.getProcId()); + this.procedures.forEachValue(1 /*Single-threaded*/, + // Transformer + v -> { + return v.getParentProcId() == procedure.getProcId()? v: null; + }, + // Consumer + v -> { + boolean result = false; + IOException ioe = null; + try { + result = bypassProcedure(v.getProcId(), lockWait, override, recursive); + } catch (IOException e) { + LOG.warn("Recursive bypass of pid={}", v.getProcId(), e); + } + }); + } else { + LOG.debug("{} has children, skipping bypass", procedure); + return false; + } } // If the procedure has no parent or no child, we are safe to bypass it in whatever state @@ -1051,6 +1076,7 @@ public class ProcedureExecutor { LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states " + "(with no parent), {}", procedure); + // Question: how is the bypass done here? 
return false; } @@ -1060,7 +1086,7 @@ public class ProcedureExecutor { Procedure current = procedure; while (current != null) { LOG.debug("Bypassing {}", current); - current.bypass(); + current.bypass(getEnvironment()); store.update(procedure); long parentID = current.getParentProcId(); current = getProcedure(parentID); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java index 27802efecd..b7362200d0 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java @@ -272,7 +272,7 @@ public final class ProcedureUtil { } if (proto.getBypass()) { - proc.bypass(); + proc.bypass(null); } ProcedureStateSerializer serializer = null; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java index 6f4795d9dc..612ccf27cc 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java @@ -21,7 +21,6 @@ import java.io.IOException; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; @@ -37,7 +36,6 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil; * their stacks traces and messages overridden to reflect the original 'remote' exception. 
*/ @InterfaceAudience.Private -@InterfaceStability.Evolving @SuppressWarnings("serial") public class RemoteProcedureException extends ProcedureException { @@ -74,6 +72,10 @@ public class RemoteProcedureException extends ProcedureException { return new Exception(cause); } + // NOTE: Does not throw DoNotRetryIOE because it does not + // have access (DNRIOE is in the client module). Use + // MasterProcedureUtil.unwrapRemoteIOException if need to + // throw DNRIOE. public IOException unwrapRemoteIOException() { final Exception cause = unwrapRemoteException(); if (cause instanceof IOException) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java index d58d57e76e..739d1612c3 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java @@ -87,7 +87,7 @@ public class TestProcedureBypass { long id = procExecutor.submitProcedure(proc); Thread.sleep(500); //bypass the procedure - assertTrue(procExecutor.bypassProcedure(id, 30000, false)); + assertTrue(procExecutor.bypassProcedure(id, 30000, false, false)); htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); LOG.info("{} finished", proc); } @@ -98,7 +98,7 @@ public class TestProcedureBypass { long id = procExecutor.submitProcedure(proc); Thread.sleep(500); //bypass the procedure - assertTrue(procExecutor.bypassProcedure(id, 1000, true)); + assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); //Since the procedure is stuck there, we need to restart the executor to recovery. 
ProcedureTestingUtility.restart(procExecutor); htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); @@ -114,12 +114,24 @@ public class TestProcedureBypass { .size() > 0); SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream() .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0); - assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false)); + assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false, false)); htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); LOG.info("{} finished", proc); } - + @Test + public void testBypassingProcedureWithParentRecursive() throws Exception { + final RootProcedure proc = new RootProcedure(); + long rootId = procExecutor.submitProcedure(proc); + htu.waitFor(5000, () -> procExecutor.getProcedures().stream() + .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()) + .size() > 0); + SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream() + .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0); + assertTrue(procExecutor.bypassProcedure(rootId, 1000, false, true)); + htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); + LOG.info("{} finished", proc); + } @AfterClass public static void tearDown() throws Exception { @@ -179,7 +191,4 @@ public class TestProcedureBypass { } } } - - - } diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index b2a3bda095..afc6e64cd3 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -99,6 +99,7 @@ message MergeTableRegionsResponse { message AssignRegionRequest { required RegionSpecifier region = 1; + optional bool override = 2 [default = false]; } message AssignRegionResponse { @@ -1005,6 +1006,7 @@ message SetTableStateInMetaRequest { // Region 
at a time. message AssignsRequest { repeated RegionSpecifier region = 1; + optional bool override = 2 [default = false]; } /** Like Admin's AssignRegionResponse except it can @@ -1019,6 +1021,7 @@ message AssignsResponse { */ message UnassignsRequest { repeated RegionSpecifier region = 1; + optional bool override = 2 [default = false]; } /** Like Admin's UnassignRegionResponse except it can @@ -1031,7 +1034,8 @@ message UnassignsResponse { message BypassProcedureRequest { repeated uint64 proc_id = 1; optional uint64 waitTime = 2; // wait time in ms to acquire lock on a procedure - optional bool force = 3; // if true, procedure is marked for bypass even if its executing + optional bool override = 3 [default = false]; // if true, procedure is marked for bypass even if its executing + optional bool recursive = 4; } message BypassProcedureResponse { diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index d839167278..67b5b69516 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -330,6 +330,7 @@ message AssignRegionStateData { optional ServerName target_server = 4; // Current attempt index used for expotential backoff when stuck optional int32 attempt = 5; + optional bool override = 6 [default = false]; } message UnassignRegionStateData { @@ -341,6 +342,7 @@ message UnassignRegionStateData { // This is the server currently hosting the Region, the // server we will send the unassign rpc too. optional ServerName hosting_server = 5; + // We hijacked an old param named 'force' and use it as 'override'. 
optional bool force = 4 [default = false]; optional bool remove_after_unassigning = 6 [default = false]; // Current attempt index used for expotential backoff when stuck diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index 3f7af6ba22..ce16061ad9 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.ServerListener; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.master.assignment.RegionStateNode; +import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; @@ -873,7 +875,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { Procedure result = masterServices.getMasterProcedureExecutor().getResult(procId); if (result != null && result.isFailed()) { throw new IOException("Failed to create group table. 
" + - result.getException().unwrapRemoteIOException()); + MasterProcedureUtil.unwrapRemoteIOException(result)); } } } diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java index 2e3a2f702d..ca2cd6f0f8 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; @@ -196,4 +198,335 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase { .roundRobinAssignment(regions, onlineServers); assertEquals(bogusRegion, assignments.get(LoadBalancer.BOGUS_SERVER_NAME).size()); } + + /** + * Asserts a valid retained assignment plan. + *

+ * Must meet the following conditions: + *

    + *
  • Every input region has an assignment, and to an online server + *
  • If a region had an existing assignment to a server with the same + * address a a currently online server, it will be assigned to it + *
+ */ + private void assertRetainedAssignment( + Map existing, List servers, + Map> assignment) + throws FileNotFoundException, IOException { + // Verify condition 1, every region assigned, and to online server + Set onlineServerSet = new TreeSet<>(servers); + Set assignedRegions = new TreeSet<>(RegionInfo.COMPARATOR); + for (Map.Entry> a : assignment.entrySet()) { + assertTrue( + "Region assigned to server that was not listed as online", + onlineServerSet.contains(a.getKey())); + for (RegionInfo r : a.getValue()) { + assignedRegions.add(r); + } + } + assertEquals(existing.size(), assignedRegions.size()); + + // Verify condition 2, every region must be assigned to correct server. + Set onlineHostNames = new TreeSet<>(); + for (ServerName s : servers) { + onlineHostNames.add(s.getHostname()); + } + + for (Map.Entry> a : assignment.entrySet()) { + ServerName currentServer = a.getKey(); + for (RegionInfo r : a.getValue()) { + ServerName oldAssignedServer = existing.get(r); + TableName tableName = r.getTable(); + String groupName = + getMockedGroupInfoManager().getRSGroupOfTable(tableName); + assertTrue(StringUtils.isNotEmpty(groupName)); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( + groupName); + assertTrue( + "Region is not correctly assigned to group servers.", + gInfo.containsServer(currentServer.getAddress())); + if (oldAssignedServer != null + && onlineHostNames.contains(oldAssignedServer + .getHostname())) { + // this region was previously assigned somewhere, and that + // host is still around, then the host must have been is a + // different group. 
+ if (!oldAssignedServer.getAddress().equals(currentServer.getAddress())) { + assertFalse(gInfo.containsServer(oldAssignedServer.getAddress())); + } + } + } + } + } + + private String printStats( + ArrayListMultimap groupBasedLoad) { + StringBuffer sb = new StringBuffer(); + sb.append("\n"); + for (String groupName : groupBasedLoad.keySet()) { + sb.append("Stats for group: " + groupName); + sb.append("\n"); + sb.append(groupMap.get(groupName).getServers()); + sb.append("\n"); + List groupLoad = groupBasedLoad.get(groupName); + int numServers = groupLoad.size(); + int totalRegions = 0; + sb.append("Per Server Load: \n"); + for (ServerAndLoad sLoad : groupLoad) { + sb.append("Server :" + sLoad.getServerName() + " Load : " + + sLoad.getLoad() + "\n"); + totalRegions += sLoad.getLoad(); + } + sb.append(" Group Statistics : \n"); + float average = (float) totalRegions / numServers; + int max = (int) Math.ceil(average); + int min = (int) Math.floor(average); + sb.append("[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + + average + " max=" + max + " min=" + min + "]"); + sb.append("\n"); + sb.append("==============================="); + sb.append("\n"); + } + return sb.toString(); + } + + private ArrayListMultimap convertToGroupBasedMap( + final Map> serversMap) throws IOException { + ArrayListMultimap loadMap = ArrayListMultimap + .create(); + for (RSGroupInfo gInfo : getMockedGroupInfoManager().listRSGroups()) { + Set
groupServers = gInfo.getServers(); + for (Address hostPort : groupServers) { + ServerName actual = null; + for(ServerName entry: servers) { + if(entry.getAddress().equals(hostPort)) { + actual = entry; + break; + } + } + List regions = serversMap.get(actual); + assertTrue("No load for " + actual, regions != null); + loadMap.put(gInfo.getName(), + new ServerAndLoad(actual, regions.size())); + } + } + return loadMap; + } + + private ArrayListMultimap reconcile( + ArrayListMultimap previousLoad, + List plans) { + ArrayListMultimap result = ArrayListMultimap + .create(); + result.putAll(previousLoad); + if (plans != null) { + for (RegionPlan plan : plans) { + ServerName source = plan.getSource(); + updateLoad(result, source, -1); + ServerName destination = plan.getDestination(); + updateLoad(result, destination, +1); + } + } + return result; + } + + private void updateLoad( + ArrayListMultimap previousLoad, + final ServerName sn, final int diff) { + for (String groupName : previousLoad.keySet()) { + ServerAndLoad newSAL = null; + ServerAndLoad oldSAL = null; + for (ServerAndLoad sal : previousLoad.get(groupName)) { + if (ServerName.isSameAddress(sn, sal.getServerName())) { + oldSAL = sal; + newSAL = new ServerAndLoad(sn, sal.getLoad() + diff); + break; + } + } + if (newSAL != null) { + previousLoad.remove(groupName, oldSAL); + previousLoad.put(groupName, newSAL); + break; + } + } + } + + private Map> mockClusterServers() throws IOException { + assertTrue(servers.size() == regionAssignment.length); + Map> assignment = new TreeMap<>(); + for (int i = 0; i < servers.size(); i++) { + int numRegions = regionAssignment[i]; + List regions = assignedRegions(numRegions, servers.get(i)); + assignment.put(servers.get(i), regions); + } + return assignment; + } + + /** + * Generate a list of regions evenly distributed between the tables. + * + * @param numRegions The number of regions to be generated. + * @return List of RegionInfo. 
+ */ + private List randomRegions(int numRegions) { + List regions = new ArrayList<>(numRegions); + byte[] start = new byte[16]; + byte[] end = new byte[16]; + rand.nextBytes(start); + rand.nextBytes(end); + int regionIdx = rand.nextInt(tables.length); + for (int i = 0; i < numRegions; i++) { + Bytes.putInt(start, 0, numRegions << 1); + Bytes.putInt(end, 0, (numRegions << 1) + 1); + int tableIndex = (i + regionIdx) % tables.length; + regions.add(RegionInfoBuilder.newBuilder(tables[tableIndex]) + .setStartKey(start) + .setEndKey(end) + .setSplit(false) + .setRegionId(regionId++) + .build()); + } + return regions; + } + + /** + * Generate assigned regions to a given server using group information. + * + * @param numRegions the num regions to generate + * @param sn the servername + * @return the list of regions + * @throws java.io.IOException Signals that an I/O exception has occurred. + */ + private List assignedRegions(int numRegions, ServerName sn) throws IOException { + List regions = new ArrayList<>(numRegions); + byte[] start = new byte[16]; + byte[] end = new byte[16]; + Bytes.putInt(start, 0, numRegions << 1); + Bytes.putInt(end, 0, (numRegions << 1) + 1); + for (int i = 0; i < numRegions; i++) { + TableName tableName = getTableName(sn); + regions.add(RegionInfoBuilder.newBuilder(tableName) + .setStartKey(start) + .setEndKey(end) + .setSplit(false) + .setRegionId(regionId++) + .build()); + } + return regions; + } + + private static List generateServers(int numServers) { + List servers = new ArrayList<>(numServers); + for (int i = 0; i < numServers; i++) { + String host = "server" + rand.nextInt(100000); + int port = rand.nextInt(60000); + servers.add(ServerName.valueOf(host, port, -1)); + } + return servers; + } + + /** + * Construct group info, with each group having at least one server. 
+ * + * @param servers the servers + * @param groups the groups + * @return the map + */ + private static Map constructGroupInfo( + List servers, String[] groups) { + assertTrue(servers != null); + assertTrue(servers.size() >= groups.length); + int index = 0; + Map groupMap = new HashMap<>(); + for (String grpName : groups) { + RSGroupInfo RSGroupInfo = new RSGroupInfo(grpName); + RSGroupInfo.addServer(servers.get(index).getAddress()); + groupMap.put(grpName, RSGroupInfo); + index++; + } + while (index < servers.size()) { + int grpIndex = rand.nextInt(groups.length); + groupMap.get(groups[grpIndex]).addServer( + servers.get(index).getAddress()); + index++; + } + return groupMap; + } + + /** + * Construct table descriptors evenly distributed between the groups. + * + * @return the list + */ + private static List constructTableDesc() { + List tds = Lists.newArrayList(); + int index = rand.nextInt(groups.length); + for (int i = 0; i < tables.length; i++) { + HTableDescriptor htd = new HTableDescriptor(tables[i]); + int grpIndex = (i + index) % groups.length ; + String groupName = groups[grpIndex]; + tableMap.put(tables[i], groupName); + tds.add(htd); + } + tableMap.put(table0, ""); + tds.add(new HTableDescriptor(table0)); + return tds; + } + + private static MasterServices getMockedMaster() throws IOException { + TableDescriptors tds = Mockito.mock(TableDescriptors.class); + Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(0)); + Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(1)); + Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(2)); + Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(3)); + MasterServices services = Mockito.mock(HMaster.class); + Mockito.when(services.getTableDescriptors()).thenReturn(tds); + AssignmentManager am = Mockito.mock(AssignmentManager.class); + Mockito.when(services.getAssignmentManager()).thenReturn(am); + RegionStates rss = Mockito.mock(RegionStates.class); + 
Mockito.when(am.getRegionStates()).thenReturn(rss); + return services; + } + + private static RSGroupInfoManager getMockedGroupInfoManager() throws IOException { + RSGroupInfoManager gm = Mockito.mock(RSGroupInfoManager.class); + Mockito.when(gm.getRSGroup(groups[0])).thenReturn( + groupMap.get(groups[0])); + Mockito.when(gm.getRSGroup(groups[1])).thenReturn( + groupMap.get(groups[1])); + Mockito.when(gm.getRSGroup(groups[2])).thenReturn( + groupMap.get(groups[2])); + Mockito.when(gm.getRSGroup(groups[3])).thenReturn( + groupMap.get(groups[3])); + Mockito.when(gm.listRSGroups()).thenReturn( + Lists.newLinkedList(groupMap.values())); + Mockito.when(gm.isOnline()).thenReturn(true); + Mockito.when(gm.getRSGroupOfTable(Mockito.any())) + .thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return tableMap.get(invocation.getArgument(0)); + } + }); + return gm; + } + + private TableName getTableName(ServerName sn) throws IOException { + TableName tableName = null; + RSGroupInfoManager gm = getMockedGroupInfoManager(); + RSGroupInfo groupOfServer = null; + for(RSGroupInfo gInfo : gm.listRSGroups()){ + if(gInfo.containsServer(sn.getAddress())){ + groupOfServer = gInfo; + break; + } + } + + for(HTableDescriptor desc : tableDescs){ + if(gm.getRSGroupOfTable(desc.getTableName()).endsWith(groupOfServer.getName())){ + tableName = desc.getTableName(); + } + } + return tableName; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0678bfe947..6a013f49fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1046,7 +1046,7 @@ public class HMaster extends HRegionServer implements MasterServices { // as procedures run -- in particular SCPs for crashed servers... 
One should put up hbase:meta // if it is down. It may take a while to come online. So, wait here until meta if for sure // available. Thats what waitUntilMetaOnline does. - if (!waitUntilMetaOnline()) { + if (!waitForMetaOnline()) { return; } this.assignmentManager.joinCluster(); @@ -1078,7 +1078,7 @@ public class HMaster extends HRegionServer implements MasterServices { // Here we expect hbase:namespace to be online. See inside initClusterSchemaService. // TODO: Fix this. Namespace is a pain being a sort-of system table. Fold it in to hbase:meta. // isNamespace does like isMeta and waits until namespace is onlined before allowing progress. - if (!waitUntilNamespaceOnline()) { + if (!waitForNamespaceOnline()) { return; } status.setStatus("Starting cluster schema service"); @@ -1162,7 +1162,7 @@ public class HMaster extends HRegionServer implements MasterServices { * and we will hold here until operator intervention. */ @VisibleForTesting - public boolean waitUntilMetaOnline() throws InterruptedException { + public boolean waitForMetaOnline() throws InterruptedException { return isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO); } @@ -1203,7 +1203,7 @@ public class HMaster extends HRegionServer implements MasterServices { * @return True if namespace table is up/online. */ @VisibleForTesting - public boolean waitUntilNamespaceOnline() throws InterruptedException { + public boolean waitForNamespaceOnline() throws InterruptedException { List ris = this.assignmentManager.getRegionStates(). 
getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME); if (ris.isEmpty()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 4702ad9bb0..6cc4fb049c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -29,7 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetricsBuilder; @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; @@ -595,7 +594,7 @@ public class MasterRpcServices extends RSRpcServices BalanceRequest request) throws ServiceException { try { return BalanceResponse.newBuilder().setBalancerRan(master.balance( - request.hasForce() ? request.getForce() : false)).build(); + request.hasForce()? 
request.getForce(): false)).build(); } catch (IOException ex) { throw new ServiceException(ex); } @@ -1181,7 +1180,8 @@ public class MasterRpcServices extends RSRpcServices if (executor.isFinished(procId)) { builder.setState(GetProcedureResultResponse.State.FINISHED); if (result.isFailed()) { - IOException exception = result.getException().unwrapRemoteIOException(); + IOException exception = + MasterProcedureUtil.unwrapRemoteIOException(result); builder.setException(ForeignExceptionUtil.toProtoForeignException(exception)); } byte[] resultData = result.getResult(); @@ -2331,6 +2331,23 @@ public class MasterRpcServices extends RSRpcServices return ri; } + /** + * Submit the Procedure that gets created by f + * @return pid of the submitted Procedure. + */ + private long submitProcedure(HBaseProtos.RegionSpecifier rs, boolean override, + BiFunction f) + throws UnknownRegionException { + RegionInfo ri = getRegionInfo(rs); + long pid = Procedure.NO_PROC_ID; + if (ri == null) { + LOG.warn("No RegionInfo found to match {}", rs); + } else { + pid = this.master.getMasterProcedureExecutor().submitProcedure(f.apply(ri, override)); + } + return pid; + } + /** * A 'raw' version of assign that does bulk and skirts Master state checks (assigns can be made * during Master startup). For use by Hbck2. @@ -2346,15 +2363,11 @@ public class MasterRpcServices extends RSRpcServices MasterProtos.AssignsResponse.Builder responseBuilder = MasterProtos.AssignsResponse.newBuilder(); try { + boolean override = request.getOverride(); for (HBaseProtos.RegionSpecifier rs: request.getRegionList()) { - // Assign is synchronous as of hbase-2.2. Need an asynchronous one. 
- RegionInfo ri = getRegionInfo(rs); - if (ri == null) { - LOG.info("Unknown={}", rs); - responseBuilder.addPid(Procedure.NO_PROC_ID); - continue; - } - responseBuilder.addPid(this.master.getAssignmentManager().assign(ri)); + long pid = submitProcedure(rs, override, + (r, b) -> this.master.getAssignmentManager().createAssignProcedure(r, b)); + responseBuilder.addPid(pid); } return responseBuilder.build(); } catch (IOException ioe) { @@ -2377,15 +2390,11 @@ public class MasterRpcServices extends RSRpcServices MasterProtos.UnassignsResponse.Builder responseBuilder = MasterProtos.UnassignsResponse.newBuilder(); try { + boolean override = request.getOverride(); for (HBaseProtos.RegionSpecifier rs: request.getRegionList()) { - // Unassign is synchronous as of hbase-2.2. Need an asynchronous one. - RegionInfo ri = getRegionInfo(rs); - if (ri == null) { - LOG.info("Unknown={}", rs); - responseBuilder.addPid(Procedure.NO_PROC_ID); - continue; - } - responseBuilder.addPid(this.master.getAssignmentManager().unassign(ri)); + long pid = submitProcedure(rs, override, + (r, b) -> this.master.getAssignmentManager().createUnassignProcedure(r, b)); + responseBuilder.addPid(pid); } return responseBuilder.build(); } catch (IOException ioe) { @@ -2410,7 +2419,7 @@ public class MasterRpcServices extends RSRpcServices try { List ret = master.getMasterProcedureExecutor().bypassProcedure(request.getProcIdList(), - request.getWaitTime(), request.getForce()); + request.getWaitTime(), request.getOverride(), request.getRecursive()); return MasterProtos.BypassProcedureResponse.newBuilder().addAllBypassed(ret).build(); } catch (IOException e) { throw new ServiceException(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java index 33a35453e0..ab772c77b2 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java @@ -79,6 +79,9 @@ public class AssignProcedure extends RegionTransitionProcedure { if (getAttempt() > 0) { state.setAttempt(getAttempt()); } + if (isOverride()) { + state.setOverride(isOverride()); + } serializer.serialize(state.build()); } @@ -88,6 +91,7 @@ public class AssignProcedure extends RegionTransitionProcedure { setTransitionState(state.getTransitionState()); setRegionInfo(ProtobufUtil.toRegionInfo(state.getRegionInfo())); forceNewPlan = state.getForceNewPlan(); + setOverride(state.getOverride()); if (state.hasTargetServer()) { this.targetServer = ProtobufUtil.toServerName(state.getTargetServer()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index 2f947656f7..dc8ab8a217 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -59,8 +59,9 @@ public abstract class RegionTransitionProcedure extends Procedure pids = Arrays.asList(procId); - List results = TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false); + List results = + TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false, false); assertTrue("Failed to by pass procedure!", results.get(0)); TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); LOG.info("{} finished", proc); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionBypass.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionBypass.java new file mode 100644 index 0000000000..4bc9f272da --- /dev/null +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionBypass.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.assignment; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Tests bypass on a region assign/unassign + */ +@Category({LargeTests.class}) +public class TestRegionBypass { + private final static Logger LOG = LoggerFactory.getLogger(TestRegionBypass.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionBypass.class); + + @Rule + public TestName name = new TestName(); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private TableName tableName; + + @BeforeClass + public static void startCluster() throws Exception { + TEST_UTIL.startMiniCluster(2); + } + + @AfterClass + public static void stopCluster() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws IOException { + this.tableName = TableName.valueOf(this.name.getMethodName()); + // Create a table. Has one region at least. + TEST_UTIL.createTable(this.tableName, Bytes.toBytes("cf")); + + } + + @Test + public void testBypass() throws IOException { + Admin admin = TEST_UTIL.getAdmin(); + List regions = admin.getRegions(this.tableName); + for (RegionInfo ri: regions) { + admin.unassign(ri.getRegionName(), false); + } + List pids = new ArrayList<>(regions.size()); + for (RegionInfo ri: regions) { + Procedure p = new StallingAssignProcedure(ri); + pids.add(TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(). + submitProcedure(p)); + } + for (Long pid: pids) { + while (!TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().isStarted(pid)) { + Thread.currentThread().yield(); + } + } + // Call bypass on all. We should be stuck in the dispatch at this stage. + List> ps = + TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().getProcedures(); + for (Procedure p: ps) { + if (p instanceof StallingAssignProcedure) { + List bs = TEST_UTIL.getHbck(). 
+ bypassProcedure(Arrays.asList(p.getProcId()), 0, false, false); + for (Boolean b: bs) { + LOG.info("BYPASSED {} {}", p.getProcId(), b); + } + } + } + // Countdown the latch so its not hanging out. + for (Procedure p: ps) { + if (p instanceof StallingAssignProcedure) { + ((StallingAssignProcedure)p).latch.countDown(); + } + } + // Try and assign WITHOUT override flag. Should fail!. + for (RegionInfo ri: regions) { + try { + admin.assign(ri.getRegionName()); + } catch (Throwable dnrioe) { + // Expected + LOG.info("Expected {}", dnrioe); + } + } + while (!TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(). + getActiveProcIds().isEmpty()) { + Thread.currentThread().yield(); + } + // Now assign with the override flag. + for (RegionInfo ri: regions) { + TEST_UTIL.getHbck().assigns(Arrays.asList(ri.getEncodedName()), true); + } + while (!TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(). + getActiveProcIds().isEmpty()) { + Thread.currentThread().yield(); + } + for (RegionInfo ri: regions) { + assertTrue(ri.toString(), TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager(). + getRegionStates().isRegionOnline(ri)); + } + } + + /** + * An AssignProcedure that Stalls just before the finish. 
+ */ + public static class StallingAssignProcedure extends AssignProcedure { + public final CountDownLatch latch = new CountDownLatch(2); + + public StallingAssignProcedure() { + super(); + } + + public StallingAssignProcedure(RegionInfo regionInfo) { + super(regionInfo); + } + + @Override + void setTransitionState(MasterProcedureProtos.RegionTransitionState state) { + if (state == MasterProcedureProtos.RegionTransitionState.REGION_TRANSITION_DISPATCH) { + try { + LOG.info("LATCH2 {}", this.latch.getCount()); + this.latch.await(); + LOG.info("LATCH3 {}", this.latch.getCount()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else if (state == MasterProcedureProtos.RegionTransitionState.REGION_TRANSITION_QUEUE) { + // Set latch. + LOG.info("LATCH1 {}", this.latch.getCount()); + this.latch.countDown(); + } + super.setTransitionState(state); + } + } +} diff --git a/hbase-shell/src/main/ruby/shell/commands/list_procedures.rb b/hbase-shell/src/main/ruby/shell/commands/list_procedures.rb index 68842a0410..cd6e7516f9 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_procedures.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_procedures.rb @@ -31,12 +31,11 @@ EOF end def command - formatter.header(%w[Id Name State Submitted_Time Last_Update Parameters]) - + formatter.header(%w[PID Name State Submitted Last_Update Parameters]) list = JSON.parse(admin.list_procedures) list.each do |proc| - submitted_time = Time.at(Integer(proc['submittedTime']) / 1000).to_s - last_update = Time.at(Integer(proc['lastUpdate']) / 1000).to_s + submitted_time = Time.at(Integer(proc['submittedTime'])/1000).to_s + last_update = Time.at(Integer(proc['lastUpdate'])/1000).to_s formatter.row([proc['procId'], proc['className'], proc['state'], submitted_time, last_update, proc['stateMessage']]) end diff --git a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestShell.java b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestShell.java index 
ab36edc278..9bb8eacce8 100644 --- a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestShell.java +++ b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestShell.java @@ -39,5 +39,4 @@ public class TestShell extends AbstractTestShell { // Start all ruby tests jruby.runScriptlet(PathType.ABSOLUTE, "src/test/ruby/tests_runner.rb"); } - } diff --git a/hbase-shell/src/test/ruby/shell/shell_test.rb b/hbase-shell/src/test/ruby/shell/shell_test.rb index c1e9017edb..abf92357c4 100644 --- a/hbase-shell/src/test/ruby/shell/shell_test.rb +++ b/hbase-shell/src/test/ruby/shell/shell_test.rb @@ -85,9 +85,9 @@ class ShellTest < Test::Unit::TestCase define_test "Shell::Shell interactive mode should not throw" do # incorrect number of arguments - @shell.command('create', 'foo') - @shell.command('create', 'foo', 'family_1') + @shell.command('create', 'nothrow_table') + @shell.command('create', 'nothrow_table', 'family_1') # create a table that exists - @shell.command('create', 'foo', 'family_1') + @shell.command('create', 'nothrow_table', 'family_1') end end -- 2.16.3