From 2cabd4821f5d2ba41423ea9597d652ee9e1ea3d2 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Wed, 1 Nov 2017 17:12:11 -0700 Subject: [PATCH] HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster --- .../hadoop/hbase/rsgroup/RSGroupInfoManager.java | 4 ++ .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 79 +++++++++++++++++++++- 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java index ab423e9565..87a87f6194 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java @@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address; */ @InterfaceAudience.Private public interface RSGroupInfoManager { + + String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait"; + long DEFAULT_REASSIGN_WAIT_INTERVAL = 10 * 1000L; + //Assigned before user tables TableName RSGROUP_TABLE_NAME = TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup"); diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index 80eaefb036..49871cff55 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.ServerListener; import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -81,6 +82,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; import org.apache.hadoop.hbase.security.access.AccessControlLists; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -119,6 +121,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene private volatile Set prevRSGroups; private RSGroupSerDe rsGroupSerDe; private DefaultServerUpdater defaultServerUpdater; + private FailedOpenUpdater failedOpenUpdater; private boolean isInit = false; public RSGroupInfoManagerImpl(MasterServices master) throws IOException { @@ -136,8 +139,10 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene refresh(); rsGroupStartupWorker.start(); defaultServerUpdater = new DefaultServerUpdater(this); + Threads.setDaemonThreadRunning(defaultServerUpdater); + failedOpenUpdater = new FailedOpenUpdater(this); + Threads.setDaemonThreadRunning(failedOpenUpdater); master.getServerManager().registerListener(this); - defaultServerUpdater.start(); isInit = true; } @@ -493,6 +498,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene @Override public void serverAdded(ServerName serverName) { defaultServerUpdater.serverChanged(); + failedOpenUpdater.serverChanged(); } @Override @@ -507,14 +513,18 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) { this.mgr = mgr; + setName(DefaultServerUpdater.class.getName()+"-" + mgr.master.getServerName()); + setDaemon(true); } @Override public void run() { List
prevDefaultServers = new LinkedList
(); - while(!mgr.master.isAborted() || !mgr.master.isStopped()) { + while (!mgr.master.isAborted() && !mgr.master.isStopped()) { try { - LOG.info("Updating default servers."); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating default servers"); + } List
servers = mgr.getDefaultServers(); Collections.sort(servers, new Comparator
() { @Override @@ -554,6 +564,69 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene } } + private static class FailedOpenUpdater extends Thread { + private static final Log LOG = LogFactory.getLog(FailedOpenUpdater.class); + + private final RSGroupInfoManagerImpl mgr; + private final long waitInterval; + private boolean hasChanged = false; + + public FailedOpenUpdater(RSGroupInfoManagerImpl mgr) { + this.mgr = mgr; + this.waitInterval = mgr.master.getConfiguration().getLong(REASSIGN_WAIT_INTERVAL_KEY, + DEFAULT_REASSIGN_WAIT_INTERVAL); + setName(FailedOpenUpdater.class.getName()+"-" + mgr.master.getServerName()); + setDaemon(true); + } + + @Override + public void run() { + while (!mgr.master.isAborted() && !mgr.master.isStopped()) { + try { + synchronized (this) { + if (!hasChanged) { + wait(); + } + hasChanged = false; + } + } catch (InterruptedException e) { + } + if (mgr.master.isAborted() || mgr.master.isStopped()) { + continue; + } + + // First, wait a while in case more servers are about to rejoin the cluster + try { + Thread.sleep(waitInterval); + } catch (InterruptedException e) { + } + if (mgr.master.isAborted() || mgr.master.isStopped()) { + continue; + } + + // Kick all regions in FAILED_OPEN state + List failedAssignments = Lists.newArrayList(); + for (RegionState state: + mgr.master.getAssignmentManager().getRegionStates().getRegionsInTransition()) { + if (state.isFailedOpen()) { + failedAssignments.add(state.getRegion()); + } + } + for (HRegionInfo region: failedAssignments) { + LOG.info("Retrying assignment of " + region); + mgr.master.getAssignmentManager().unassign(region); + } + } + } + + public void serverChanged() { + synchronized (this) { + hasChanged = true; + this.notify(); + } + } + } + @Override public void waiting() { -- 2.13.4