From fd4c8625179410402cdb6486333cfd4375b4763f Mon Sep 17 00:00:00 2001
From: Sean Busbey
Date: Sun, 14 Apr 2019 23:31:18 -0500
Subject: [PATCH] HBASE-22263 WIP

* add additional logging to GeneralBulkAssigner's timeout handling.
* more debug info about SCPs on master init.
* don't schedule an SCP for ServerName instances that already have one.
* restore processed status on SCP resume.
---
 .../hadoop/hbase/master/AssignmentManager.java     |  2 ++
 .../hadoop/hbase/master/GeneralBulkAssigner.java   | 19 ++++++++++--
 .../org/apache/hadoop/hbase/master/HMaster.java    | 35 +++++++++++++++++++++-
 .../apache/hadoop/hbase/master/ServerManager.java  |  4 +++
 .../master/procedure/ServerCrashProcedure.java     | 19 ++++++++++++
 5 files changed, 75 insertions(+), 4 deletions(-)
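
Not part of the patch: a minimal, self-contained sketch of the bulk-assignment
timeout that the GeneralBulkAssigner change below computes, shown here so the
newly logged pieces are easier to relate to the final value. It uses plain Java
stand-ins (String server and region names, a hypothetical BulkAssignTimeoutSketch
class) rather than the HBase types, and plugs in the config defaults from the
patch.

// Illustrative only; not applied by this patch.
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BulkAssignTimeoutSketch {
  // Mirrors the formula assembled from the values logged below:
  // perRegionOpenTime * maxRegionsPerServer + rpcStartupWaittime
  //   + perServerRpcWaittime * (number of servers in the plan)
  static long timeoutOnRIT(Map<String, List<String>> bulkPlan, long perRegionOpenTime,
      long rpcStartupWaittime, long perServerRpcWaittime) {
    int maxRegionsPerServer = 1;
    for (List<String> regionList : bulkPlan.values()) {
      maxRegionsPerServer = Math.max(maxRegionsPerServer, regionList.size());
    }
    return perRegionOpenTime * maxRegionsPerServer
        + rpcStartupWaittime
        + perServerRpcWaittime * bulkPlan.size();
  }

  public static void main(String[] args) {
    Map<String, List<String>> plan = new HashMap<>();
    plan.put("rs1,16020,1", Arrays.asList("r1", "r2", "r3"));
    plan.put("rs2,16020,2", Arrays.asList("r4"));
    // Defaults from the patch: 1000 ms per region open, 60000 ms rpc startup wait,
    // 30000 ms per-regionserver rpc wait.
    System.out.println("Timeout-on-RIT=" + timeoutOnRIT(plan, 1000, 60000, 30000));
  }
}
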
conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime", + 30000); + LOG.debug("hbase.bulk.assignment.perregionserver.rpc.waittime = " + bulkPerRegionServerRpcWaittime); + LOG.debug("bulkPlan.size() = " + bulkPlan.size()); long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer - + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000) - + conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime", - 30000) * bulkPlan.size(); + + rpcStartupWaittime + + bulkPerRegionServerRpcWaittime * bulkPlan.size(); LOG.debug("Timeout-on-RIT=" + timeout); return timeout; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index e8b9a8193df..750f9c4fee1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; @@ -134,6 +135,7 @@ import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan; import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo; @@ -782,6 +784,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { // start up all service threads. status.setStatus("Initializing master service threads"); + // after this procedure executor should be up and waiting startServiceThreads(); // Wake up this server to check in @@ -846,10 +849,36 @@ public class HMaster extends HRegionServer implements MasterServices, Server { if (isStopped()) return; status.setStatus("Submitting log splitting work for previously failed region servers"); + + // grab the list of procedures once. SCP fom pre-crash should all be loaded, and can't progress + // until AM joins the cluster any SCPs that got added after we get the log folder list should be + // for a different start code. 
+    final Set<ServerName> alreadyHasSCP = new HashSet<>();
+    long scpCount = 0;
+    for (ProcedureInfo procInfo : this.procedureExecutor.listProcedures()) {
+      final Procedure proc = this.procedureExecutor.getProcedure(procInfo.getProcId());
+      if (proc != null) {
+        if (proc instanceof ServerCrashProcedure) {
+          scpCount++;
+          alreadyHasSCP.add(((ServerCrashProcedure)proc).getServerName());
+        }
+      }
+    }
+    LOG.info("Restored procedures include " + scpCount + " SCPs covering " + alreadyHasSCP.size() +
+        " ServerNames.");
+
+
+    LOG.info("Checking " + previouslyFailedServers.size() + " previously failed servers (seen via WALs) for existing SCPs.");
+    // AM should be in "not yet init" and these should all be queued
     // Master has recovered hbase:meta region server and we put
     // other failed region servers in a queue to be handled later by SSH
     for (ServerName tmpServer : previouslyFailedServers) {
-      this.serverManager.processDeadServer(tmpServer, true);
+      if (alreadyHasSCP.contains(tmpServer)) {
+        LOG.info("Skipping failed server in FS because it already has a queued SCP: " + tmpServer);
+      } else {
+        LOG.info("Processing failed server in FS that has no queued SCP: " + tmpServer);
+        this.serverManager.processDeadServer(tmpServer, true);
+      }
     }
 
     // Update meta with new PB serialization if required. i.e migrate all HRI to PB serialization
@@ -860,6 +889,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
 
     // Fix up assignment manager status
     status.setStatus("Starting assignment manager");
+    // I think we die somewhere in here during an SCP flood.
+    // previouslyFailedServers will get checked here to see if we can skip SCPs
     this.assignmentManager.joinCluster();
 
     // set cluster status again after user regions are assigned
@@ -911,6 +942,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
     int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
         HConstants.DEFAULT_META_REPLICA_NUM);
+    // the first meta replica will call enableCrashedServerProcessing
     for (int i = 1; i < numReplicas; i++) {
       assignMeta(status, EMPTY_SET, i);
     }
@@ -2728,6 +2760,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     return serverCrashProcessingEnabled.isReady();
   }
 
+  // XXX what
   @VisibleForTesting
   public void setServerCrashProcessingEnabled(final boolean b) {
     procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index c8893956854..e35167a7380 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -681,11 +681,15 @@ public class ServerManager {
     // the handler threads and meta table could not be re-assigned in case
     // the corresponding server is down. So we queue them up here instead.
     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
+      LOG.debug("AssignmentManager isn't done cleaning up from failover. Requeue server " + serverName);
       requeuedDeadServers.put(serverName, shouldSplitWal);
       return;
     }
 
+    // we don't check whether deadservers already includes this server?
+    // when a server is already in the dead server list (including start code), do we need to schedule an SCP?
     this.deadservers.add(serverName);
+    // scheduling an SCP means the AM must be going?
     ProcedureExecutor<MasterProcedureEnv> procExec = this.services.getMasterProcedureExecutor();
     procExec.submitProcedure(new ServerCrashProcedure(
       procExec.getEnvironment(), serverName, shouldSplitWal, false));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index b6e7a7c9744..1469127b5de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -194,6 +194,7 @@ implements ServerProcedureInterface {
     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
       throwProcedureYieldException("Waiting on master failover to complete");
     }
+    // XXX We haven't notified but are already scheduled?
     // HBASE-14802
     // If we have not yet notified that we are processing a dead server, we should do now.
     if (!notifiedDeadServer) {
@@ -257,6 +258,18 @@ implements ServerProcedureInterface {
         break;
 
       case SERVER_CRASH_ASSIGN:
+        // If we're starting here after a procedure restore and we would have split logs
+        // on the prior master instance, then make sure this instance of the AM
+        // knows that the server has been processed.
+        if (this.shouldSplitWal) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("In case we're resuming, mark as processed (either split or DLR r-i-c) "
+                + "logs for " + serverName + "; region count=" + size(this.regionsOnCrashedServer));
+          }
+          final AssignmentManager am = env.getMasterServices().getAssignmentManager();
+          am.getRegionStates().logSplit(this.serverName);
+        }
+
         List<HRegionInfo> regionsToAssign = calcRegionsToAssign(env);
 
         // Assign may not be idempotent. SSH used to requeue the SSH if we got an IOE assigning
@@ -426,6 +439,9 @@ implements ServerProcedureInterface {
     MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
     mfs.prepareLogReplay(this.serverName, regions);
+    // If the master doesn't fail, we'll set this again in SERVER_CRASH_ASSIGN.
+    // Can we skip doing it here? Depends on how fast another observer
+    // needs to see that things were processed, since we yield between now and then.
     am.getRegionStates().logSplit(this.serverName);
   }
 
@@ -441,6 +457,9 @@ implements ServerProcedureInterface {
     if (!carryingMeta) {
       mfs.archiveMetaLog(this.serverName);
     }
+    // If the master doesn't fail, we'll set this again in SERVER_CRASH_ASSIGN.
+    // Can we skip doing it here? Depends on how fast another observer
+    // needs to see that things were processed, since we yield between now and then.
     am.getRegionStates().logSplit(this.serverName);
   }
 
-- 
2.16.1