From 237e7d5f5f638c819d03ae3abf302dcc27234067 Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Mon, 11 Jul 2016 13:17:56 -0700 Subject: [PATCH] HBASE-16081 Wait for Replication Tasks to complete before killing the ThreadPoolExecutor inside of HBaseInterClusterReplicationEndpoint --- .../hbase/replication/ReplicationEndpoint.java | 7 +++++- .../HBaseInterClusterReplicationEndpoint.java | 29 ++++++++++++++++++++-- .../regionserver/ReplicationSourceManager.java | 2 +- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index c92b53d..69db31c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import java.util.UUID; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -57,6 +58,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe private final String peerId; private final UUID clusterId; private final MetricsSource metrics; + private final Abortable abortable; @InterfaceAudience.Private public Context( @@ -66,7 +68,8 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe final UUID clusterId, final ReplicationPeer replicationPeer, final MetricsSource metrics, - final TableDescriptors tableDescriptors) { + final TableDescriptors tableDescriptors, + final Abortable abortable) { this.conf = conf; this.fs = fs; this.clusterId = clusterId; @@ -74,6 +77,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe this.replicationPeer = replicationPeer; this.metrics = metrics; this.tableDescriptors = tableDescriptors; + this.abortable = abortable; } public Configuration getConfiguration() { return conf; @@ -99,6 +103,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe public TableDescriptors getTableDescriptors() { return tableDescriptors; } + public Abortable getAbortable() { return abortable; } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 28340b5..4b6dbd1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -34,12 +34,14 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableNotFoundException; @@ -82,6 +84,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private int maxRetriesMultiplier; // Socket timeouts require even bolder actions since we don't want to DDOS private int socketTimeoutMultiplier; + // Amount of time for shutdown to wait for all tasks to complete + private long maxTerminationWait; //Metrics for this source private MetricsSource metrics; // Handles connecting to peer region servers @@ -93,6 +97,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private Path baseNamespaceDir; private Path hfileArchiveDir; private boolean replicationBulkLoadDataEnabled; + private Abortable abortable; + private volatile AtomicBoolean shutDownNow; @Override public void init(Context context) throws IOException { @@ -102,6 +108,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier); + // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator + // tasks to terminate when doStop() is called. + this.maxTerminationWait = this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. @@ -117,6 +127,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue()); this.exec.allowCoreThreadTimeOut(true); + this.shutDownNow = new AtomicBoolean(false); + this.abortable = ctx.getAbortable(); this.replicationBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, @@ -211,7 +223,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e); } } - while (this.isRunning()) { + while (this.isRunning() && !shutDownNow.get()) { if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { sleepMultiplier++; @@ -321,7 +333,20 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi LOG.warn("Failed to close the connection"); } } - exec.shutdownNow(); + shutDownNow.set(true); + // Allow currently running replication tasks to finish + exec.shutdown(); + try { + exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + // Abort if the tasks did not terminate in time + if (!exec.isTerminated()) { + String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " + + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " + + "Aborting to prevent Replication from deadlocking. See HBASE-16081."; + abortable.abort(errMsg, new IOException(errMsg)); + } notifyStopped(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 07ee46a..ce29f24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -483,7 +483,7 @@ public class ReplicationSourceManager implements ReplicationListener { // init replication endpoint replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), - fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors)); + fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server)); return src; } -- 2.8.0-rc2