From 0829364cecbd1d8ed7f9de161998faeadb8c82c0 Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Mon, 11 Jul 2016 13:17:56 -0700 Subject: [PATCH] HBASE-16081 Wait for Replication Tasks to complete before killing the ThreadPoolExecutor inside of HBaseInterClusterReplicationEndpoint --- .../HBaseInterClusterReplicationEndpoint.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 28340b5..6e9f58c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -82,6 +82,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private int maxRetriesMultiplier; // Socket timeouts require even bolder actions since we don't want to DDOS private int socketTimeoutMultiplier; + // Amount of time for shutdown to wait for all tasks to complete + private long maxTerminationWait; //Metrics for this source private MetricsSource metrics; // Handles connecting to peer region servers @@ -102,6 +104,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier); + this.maxTerminationWait = this.conf.getLong("replication.source.maxterminationwait", + Long.MAX_VALUE); // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. @@ -321,7 +325,18 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi LOG.warn("Failed to close the connection"); } } - exec.shutdownNow(); + // Allow currently submitted replication tasks to finish termination + exec.shutdown(); + try { + exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + // Log an error if the tasks did not terminate in time + if (!exec.isTerminated()) { + LOG.error("HBaseInterClusterReplicationEndpoint failed to finish all ThreadPool tasks in " + + maxTerminationWait + " milliseconds. HBaseInterClusterReplicationEndpoint.replicate()" + + "may be blocked on pool.take(), consider restarting the server."); + } notifyStopped(); } -- 2.8.0-rc2