From 3c195886b896c60efb99fe7e223bb0a7a2ff992d Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 21 Jul 2017 17:24:22 -0700 Subject: [PATCH] HBASE-18370 Port HBASE-16209 to branch-1.3 (cherry picked from commit 7c97acf6e345023f043964d023816d5b3329dde9) (cherry picked from commit 39153b88249ed63ed35b2b88fcc75eb1452951a8) (cherry picked from commit 1b303ad75d4fed773eeab1f5c3d63691843ae9ac) (cherry picked from commit cbdc9fcb8a705f4e5ee28a917a335c6f1ef5df42) --- .../hadoop/hbase/master/AssignmentManager.java | 77 ++++++++++++++++++++-- .../apache/hadoop/hbase/master/RegionStates.java | 17 ++++- .../hbase/master/handler/ClosedRegionHandler.java | 2 +- 3 files changed, 90 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index d247b11046..e9c194dad5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -37,6 +37,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -100,6 +101,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.wal.DefaultWALProvider; @@ -200,6 +202,7 @@ public class AssignmentManager extends ZooKeeperListener { //Thread pool executor service for timeout monitor private java.util.concurrent.ExecutorService threadPoolExecutorService; + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; // A bunch of ZK events workers. Each is a single thread executor service private final java.util.concurrent.ExecutorService zkEventWorkers; @@ -264,6 +267,8 @@ public class AssignmentManager extends ZooKeeperListener { NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN, } + private RetryCounter.BackoffPolicy backoffPolicy; + private RetryCounter.RetryConfig retryConfig; /** * Constructs a new assignment manager. * @@ -309,8 +314,13 @@ public class AssignmentManager extends ZooKeeperListener { "hbase.meta.assignment.retry.sleeptime", 1000l); this.balancer = balancer; int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); + this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( - maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); + maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); + + this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, + Threads.newDaemonThreadFactory("AM.Scheduler")); + this.regionStates = new RegionStates( server, tableStateManager, serverManager, regionStateStore); @@ -329,6 +339,22 @@ public class AssignmentManager extends ZooKeeperListener { this.metricsAssignmentManager = new MetricsAssignmentManager(); useZKForAssignment = ConfigUtil.useZKForAssignment(conf); + // Configurations for retrying opening a region on receiving a FAILED_OPEN + this.retryConfig = new RetryCounter.RetryConfig(); + this.retryConfig.setSleepInterval(conf.getLong("hbase.assignment.retry.sleep.initial", 0l)); + // Set the max time limit to the initial sleep interval so we use a constant time sleep strategy + // if the user does not set a max sleep time + this.retryConfig.setMaxSleepTime(conf.getLong("hbase.assignment.retry.sleep.max", + retryConfig.getSleepInterval())); + this.backoffPolicy = getBackoffPolicy(); + } + + /** + * Returns the backoff policy used for Failed Region Open retries + * @return the backoff policy used for Failed Region Open retries + */ + RetryCounter.BackoffPolicy getBackoffPolicy() { + return new RetryCounter.ExponentialBackoffPolicyWithLimit(); } MetricsAssignmentManager getAssignmentManagerMetrics() { @@ -3393,12 +3419,39 @@ public class AssignmentManager extends ZooKeeperListener { return true; } - void invokeAssign(HRegionInfo regionInfo) { + void invokeAssignNow(HRegionInfo regionInfo, boolean forceNewPlan) { + threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, forceNewPlan)); + } + + void invokeAssignLater(HRegionInfo regionInfo, boolean forceNewPlan, long sleepMillis) { + scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable(new AssignCallable(this, + regionInfo, forceNewPlan)), sleepMillis, TimeUnit.MILLISECONDS); + } + + public void invokeAssign(HRegionInfo regionInfo) { invokeAssign(regionInfo, true); } - void invokeAssign(HRegionInfo regionInfo, boolean newPlan) { - threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan)); + public void invokeAssign(HRegionInfo regionInfo, boolean forceNewPlan) { + if (failedOpenTracker.containsKey(regionInfo.getEncodedName())) { + // Sleep before reassigning if this region has failed to open before + long sleepTime = backoffPolicy.getBackoffTime(retryConfig, + getFailedAttempts(regionInfo.getEncodedName())); + invokeAssignLater(regionInfo, forceNewPlan, sleepTime); + } else { + // Immediately reassign if this region has never failed an open before + invokeAssignNow(regionInfo, forceNewPlan); + } + } + + private int getFailedAttempts(String regionName) { + AtomicInteger failedCount = failedOpenTracker.get(regionName); + if (failedCount != null) { + return failedCount.get(); + } else { + // If we do not have a failed open tracker for a region assume it has never failed before + return 0; + } } void invokeUnAssign(HRegionInfo regionInfo) { @@ -4258,6 +4311,8 @@ public class AssignmentManager extends ZooKeeperListener { return replicasToClose; } + public Map getFailedOpenTracker() {return failedOpenTracker;} + /** * A region is offline. The new state should be the specified one, * if not null. If the specified state is null, the new state is Offline. @@ -4483,4 +4538,18 @@ public class AssignmentManager extends ZooKeeperListener { void setRegionStateListener(RegionStateListener listener) { this.regionStateListener = listener; } + + private class DelayedAssignCallable implements Runnable { + + Callable callable; + + public DelayedAssignCallable(Callable callable) { + this.callable = callable; + } + + @Override + public void run() { + threadPoolExecutorService.submit(callable); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index 9fda8da515..8c90b08e51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -66,6 +66,21 @@ import com.google.common.base.Preconditions; public class RegionStates { private static final Log LOG = LogFactory.getLog(RegionStates.class); + public final static RegionStateStampComparator REGION_STATE_COMPARATOR = + new RegionStateStampComparator(); + + // This comparator sorts the RegionStates by time stamp then Region name. + // Comparing by timestamp alone can lead us to discard different RegionStates that happen + // to share a timestamp. + private static class RegionStateStampComparator implements Comparator { + @Override + public int compare(RegionState l, RegionState r) { + return Long.compare(l.getStamp(), r.getStamp()) == 0 ? + Bytes.compareTo(l.getRegion().getRegionName(), r.getRegion().getRegionName()) : + Long.compare(l.getStamp(), r.getStamp()); + } + } + /** * Regions currently in transition. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java index 74b45bc8a1..389a738c7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java @@ -102,6 +102,6 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf assignmentManager.getRegionStates().setRegionStateTOCLOSED(regionInfo, null); // This below has to do w/ online enable/disable of a table assignmentManager.removeClosedRegion(regionInfo); - assignmentManager.assign(regionInfo, true); + assignmentManager.invokeAssign(regionInfo, false); } }