From 044f06d98a403546023e34e17261e38c70da696c Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Mon, 11 Jul 2016 13:17:56 -0700 Subject: [PATCH] HBASE-16081 Wait for Replication Tasks to complete before killing the ThreadPoolExecutor inside of HBaseInterClusterReplicationEndpoint --- .../HBaseInterClusterReplicationEndpoint.java | 25 ++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 28340b5..37183c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -34,6 +34,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -82,6 +83,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private int maxRetriesMultiplier; // Socket timeouts require even bolder actions since we don't want to DDOS private int socketTimeoutMultiplier; + // Amount of time for shutdown to wait for all tasks to complete + private long maxTerminationWait; //Metrics for this source private MetricsSource metrics; // Handles connecting to peer region servers @@ -93,6 +96,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private Path baseNamespaceDir; private Path hfileArchiveDir; private boolean replicationBulkLoadDataEnabled; + private volatile AtomicBoolean shutDownNow; @Override public void init(Context context) throws IOException { @@ -102,6 +106,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier); + // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator + // tasks to terminate when doStop() is called. + this.maxTerminationWait = this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. @@ -117,6 +125,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue()); this.exec.allowCoreThreadTimeOut(true); + this.shutDownNow = new AtomicBoolean(false); this.replicationBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, @@ -211,7 +220,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e); } } - while (this.isRunning()) { + while (this.isRunning() && !shutDownNow.get()) { if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { sleepMultiplier++; @@ -321,7 +330,19 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi LOG.warn("Failed to close the connection"); } } - exec.shutdownNow(); + shutDownNow.set(true); + // Allow currently submitted replication tasks to finish termination + exec.shutdown(); + try { + exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + // Log an error if the tasks did not terminate in time + if (!exec.isTerminated()) { + LOG.error("HBaseInterClusterReplicationEndpoint failed to finish all ThreadPool tasks in " + + maxTerminationWait + " milliseconds. HBaseInterClusterReplicationEndpoint.replicate()" + + "may be blocked on pool.take(), consider restarting the server."); + } notifyStopped(); } -- 2.8.0-rc2