From 53c56e8120f6709a9fb86d7079ec0515d2a03f2f Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Tue, 5 Jul 2016 13:04:39 -0700 Subject: [PATCH] HBASE-16081 Fixed a race condition in HBaseInterClusterReplicationEndpoint: if the exec was shut down while we were waiting on a Future from pool with pool.take(), we would deadlock. Changed pool.take() to a timed pool.poll(). On timeout, HBaseInterClusterReplicationEndpoint retries. --- .../HBaseInterClusterReplicationEndpoint.java | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 28340b5..f1638fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -82,6 +82,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private int maxRetriesMultiplier; // Socket timeouts require even bolder actions since we don't want to DDOS private int socketTimeoutMultiplier; + // Amount of time to poll for a completed Replication, before retrying the entire batch of + // Replications + private long replicationTimeout; + //Metrics for this source private MetricsSource metrics; // Handles connecting to peer region servers @@ -102,6 +106,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier); + this.replicationTimeout = this.conf.getLong("replication.source.replicationTimeout", 100000); // TODO: This 
connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. @@ -242,7 +247,20 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi try { // wait for all futures, remove successful parts // (only the remaining parts will be retried) - Future f = pool.take(); + // We use pool.poll() with a timeout instead of pool.take(), because of the race + // condition mentioned in HBASE-16081 when HBaseInterClusterReplicationEndpoint.doStop() + // is called concurrently with HBaseInterClusterReplicationEndpoint.replicate(). If + // exec.shutdownNow() is called in doStop() while we are blocking on pool.take() + // in replicate() the submitted Replication tasks will never be completed and + // pool.take() will just block indefinitely. + Future f = pool.poll(replicationTimeout, TimeUnit.MILLISECONDS); + // The future f will be null if none of the Replicator tasks have completed by the + // timeout + if (null == f) { + iox = new IOException("HBaseClusterReplicationEndpoint failed to poll a new update " + + "within " + replicationTimeout + " ms. Retrying."); + break; + } int index = f.get().intValue(); int batchSize = entryLists.get(index).size(); entryLists.set(index, Collections.emptyList()); -- 2.8.0-rc2