From aa7c32c32be448cafac66027b37d6965136c84db Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Mon, 26 Feb 2018 11:44:39 -0800 Subject: [PATCH] HBASE-20087 Periodically attempt redeploy of regions in FAILED_OPEN state --- .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 73 ---------------------- .../hadoop/hbase/master/AssignmentManager.java | 28 +++++++++ .../apache/hadoop/hbase/master/RegionStates.java | 7 +++ 3 files changed, 35 insertions(+), 73 deletions(-) diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index caa7fc6738..be15fa5641 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.ServerListener; import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -121,7 +120,6 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene private volatile Set prevRSGroups; private RSGroupSerDe rsGroupSerDe; private DefaultServerUpdater defaultServerUpdater; - private FailedOpenUpdater failedOpenUpdater; private boolean isInit = false; public RSGroupInfoManagerImpl(MasterServices master) throws IOException { @@ -139,8 +137,6 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene refresh(); defaultServerUpdater = new DefaultServerUpdater(this); Threads.setDaemonThreadRunning(defaultServerUpdater); - failedOpenUpdater = new FailedOpenUpdater(this); - Threads.setDaemonThreadRunning(failedOpenUpdater); master.getServerManager().registerListener(this); isInit = true; } @@ -493,7 +489,6 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene @Override public void serverAdded(ServerName serverName) { defaultServerUpdater.serverChanged(); - failedOpenUpdater.serverChanged(); } @Override @@ -561,74 +556,6 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene } } - private static class FailedOpenUpdater extends Thread { - private static final Log LOG = LogFactory.getLog(FailedOpenUpdater.class); - - private final RSGroupInfoManagerImpl mgr; - private final long waitInterval; - private volatile boolean hasChanged = false; - - public FailedOpenUpdater(RSGroupInfoManagerImpl mgr) { - this.mgr = mgr; - this.waitInterval = mgr.master.getConfiguration().getLong(REASSIGN_WAIT_INTERVAL_KEY, - DEFAULT_REASSIGN_WAIT_INTERVAL); - setName(FailedOpenUpdater.class.getName()+"-" + mgr.master.getServerName()); - setDaemon(true); - } - - @Override - public void run() { - while (!mgr.master.isAborted() && !mgr.master.isStopped()) { - boolean interrupted = false; - try { - synchronized (this) { - while (!hasChanged) { - wait(); - } - hasChanged = false; - } - } catch (InterruptedException e) { - LOG.warn("Interrupted", e); - interrupted = true; - } - if (mgr.master.isAborted() || mgr.master.isStopped() || interrupted) { - continue; - } - - // First, wait a while in case more servers are about to rejoin the cluster - try { - Thread.sleep(waitInterval); - } catch (InterruptedException e) { - LOG.warn("Interrupted", e); - } - if (mgr.master.isAborted() || mgr.master.isStopped()) { - continue; - } - - // Kick all regions in FAILED_OPEN state - List failedAssignments = Lists.newArrayList(); - for (RegionState state: - mgr.master.getAssignmentManager().getRegionStates().getRegionsInTransition()) { - if (state.isFailedOpen()) { - failedAssignments.add(state.getRegion()); - } - } - for (HRegionInfo region: failedAssignments) { - LOG.info("Retrying assignment of " + region); - mgr.master.getAssignmentManager().unassign(region); - } - } - } - - // Only called for server additions - public void serverChanged() { - synchronized (this) { - hasChanged = true; - this.notify(); - } - } - } - @Override public void waiting() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index bc1e02fd0b..60b84308d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -140,6 +140,9 @@ public class AssignmentManager extends ZooKeeperListener { = "hbase.assignment.already.intransition.waittime"; static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute + static final String FAILED_OPEN_RETRY_KEY = "hbase.assignment.failed.open.retry.period"; + static final int FAILED_OPEN_RETRY_DEFAULT = 300000; // 5 minutes + protected final MasterServices server; private ServerManager serverManager; @@ -352,6 +355,12 @@ public class AssignmentManager extends ZooKeeperListener { this.retryConfig.setMaxSleepTime(conf.getLong("hbase.assignment.retry.sleep.max", retryConfig.getSleepInterval())); this.backoffPolicy = getBackoffPolicy(); + + int failedOpenRetryPeriod = conf.getInt(FAILED_OPEN_RETRY_KEY, FAILED_OPEN_RETRY_DEFAULT); + if (failedOpenRetryPeriod > 0) { + scheduledThreadPoolExecutor.scheduleWithFixedDelay(new FailedOpenRetryRunnable(), + failedOpenRetryPeriod, failedOpenRetryPeriod, TimeUnit.MILLISECONDS); + } } /** @@ -4753,4 +4762,23 @@ public class AssignmentManager extends ZooKeeperListener { public static void setTestSkipMergeHandling(boolean skipMergeHandling) { TEST_SKIP_MERGE_HANDLING = skipMergeHandling; } + + /** + * Scheduled task that will attempt to redeploy regions that have transitioned permanently into + * FAILED_OPEN state. + */ + class FailedOpenRetryRunnable implements Runnable { + @Override + public void run() { + // Kick regions that have been transitioned into permanent FAILED_OPEN state + for (RegionState s: getRegionStates().getAllRegions()) { + if (s.isFailedOpen()) { + LOG.info("Retrying assignment of " + s.toDescriptiveString()); + // Run the entire unassign protocol for safety's sake + unassign(s.getRegion()); + } + } + } + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index f47d555db3..4ce1db3601 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -231,6 +231,13 @@ public class RegionStates { return new HashSet(regionsInTransition.values()); } + /** + * Get all regions and their states + */ + public synchronized Set getAllRegions() { + return new HashSet(regionStates.values()); + } + /** * @return a set of the regions in transition that are sorted by timestamp */ -- 2.15.1