From ec22af467fe181c6a600c75e89c6ac0833c8de99 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 28 Dec 2017 21:38:12 +0800 Subject: [PATCH] HBASE-19623 Create replication endpoint asynchronously when adding a replication source --- .../regionserver/RecoveredReplicationSource.java | 7 +- .../regionserver/ReplicationSource.java | 84 ++++++++++++++-------- .../regionserver/ReplicationSourceInterface.java | 4 +- .../regionserver/ReplicationSourceManager.java | 41 +---------- .../hbase/client/TestAsyncReplicationAdminApi.java | 2 - .../client/replication/TestReplicationAdmin.java | 2 - .../hbase/replication/ReplicationSourceDummy.java | 4 +- .../hbase/replication/TestReplicationSource.java | 24 +++---- .../regionserver/TestReplicationSourceManager.java | 5 +- 9 files changed, 78 insertions(+), 95 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index e0c45d5..37041af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.FSUtils; @@ -52,10 +51,10 @@ public class RecoveredReplicationSource extends ReplicationSource { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, - String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + MetricsSource metrics) throws IOException { super.init(conf, fs, manager, queueStorage, replicationPeers, server, peerClusterZnode, - clusterId, replicationEndpoint, walFileLengthProvider, metrics); + clusterId, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index fef9054..5669ec0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -39,8 +39,11 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -112,8 +115,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private volatile boolean sourceRunning = false; // Metrics for this source private MetricsSource metrics; - //WARN threshold for the number of queued logs, defaults to 2 + // WARN threshold for the number of queued logs, defaults to 2 private int logQueueWarnThreshold; + // whether the replication endpoint has been initialized + private volatile boolean endpointInitialized = false; // ReplicationEndpoint which will handle the actual replication private ReplicationEndpoint replicationEndpoint; // A filter (or a chain of filters) for the WAL entries. @@ -149,8 +154,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, - String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + MetricsSource metrics) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); this.waitOnEndpointSeconds = @@ -173,7 +178,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); - this.replicationEndpoint = replicationEndpoint; defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); @@ -198,7 +202,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf if (queue == null) { queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); queues.put(logPrefix, queue); - if (this.sourceRunning) { + if (this.isSourceActive() && this.endpointInitialized) { // new wal group observed after source startup, start a new worker thread to track it // notice: it's possible that log enqueued when this.running is set but worker thread // still not launched, so it's necessary to check workerThreads before start the worker @@ -243,21 +247,59 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } } + private void initAndStartReplicationEndpoint() throws Exception { + RegionServerCoprocessorHost rsServerHost = null; + TableDescriptors tableDescriptors = null; + if (server instanceof HRegionServer) { + rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); + tableDescriptors = ((HRegionServer) server).getTableDescriptors(); + } + ReplicationPeer peer = replicationPeers.getConnectedPeer(peerId); + String replicationEndpointImpl = peer.getPeerConfig().getReplicationEndpointImpl(); + if (replicationEndpointImpl == null) { + // Default to HBase inter-cluster replication endpoint + replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); + } + replicationEndpoint = + Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance(); + if (rsServerHost != null) { + ReplicationEndpoint newReplicationEndPoint = + rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); + if (newReplicationEndPoint != null) { + // Override the newly created endpoint from the hook with configured end point + replicationEndpoint = newReplicationEndPoint; + } + } + replicationEndpoint.init(new ReplicationEndpoint.Context(conf, peer.getConfiguration(), fs, + peerId, clusterId, peer, metrics, tableDescriptors, server)); + replicationEndpoint.start(); + replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS); + } + @Override public void run() { // mark we are running now this.sourceRunning = true; - try { - // start the endpoint, connect to the cluster - this.replicationEndpoint.start(); - this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS); - } catch (Exception ex) { - LOG.warn("Error starting ReplicationEndpoint, exiting", ex); - uninitialize(); - throw new RuntimeException(ex); - } int sleepMultiplier = 1; + while (this.isSourceActive()) { + try { + initAndStartReplicationEndpoint(); + break; + } catch (Exception e) { + LOG.warn("Error starting ReplicationEndpoint, retrying", e); + if (replicationEndpoint != null) { + replicationEndpoint.stop(); + replicationEndpoint = null; + } + if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { + sleepMultiplier++; + } + } + } + this.endpointInitialized = true; + + sleepMultiplier = 1; // delay this until we are in an asynchronous thread while (this.isSourceActive() && this.peerClusterId == null) { this.peerClusterId = replicationEndpoint.getPeerUUID(); @@ -311,7 +353,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf worker.startup(getUncaughtExceptionHandler()); worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); - workerThreads.put(walGroupId, worker); } } @@ -378,19 +419,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; } - private void uninitialize() { - LOG.debug("Source exiting " + this.peerId); - metrics.clear(); - if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) { - this.replicationEndpoint.stop(); - try { - this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS); - } catch (TimeoutException e) { - LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds."); - } - } - } - /** * Do the sleeping logic * @param msg Why we sleep diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 4b9ed74..32a8846 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -51,8 +51,8 @@ public interface ReplicationSourceInterface { */ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, - String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; + String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + MetricsSource metrics) throws IOException; /** * Add a log to the list of logs to replicate diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index f5e0e1b..887c284 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -50,13 +50,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; -import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationPeer; @@ -503,47 +499,12 @@ public class ReplicationSourceManager implements ReplicationListener { */ private ReplicationSourceInterface getReplicationSource(String peerId, ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException { - RegionServerCoprocessorHost rsServerHost = null; - TableDescriptors tableDescriptors = null; - if (server instanceof HRegionServer) { - rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); - tableDescriptors = ((HRegionServer) server).getTableDescriptors(); - } - ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId); - ReplicationEndpoint replicationEndpoint = null; - try { - String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); - if (replicationEndpointImpl == null) { - // Default to HBase inter-cluster replication endpoint - replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); - } - replicationEndpoint = Class.forName(replicationEndpointImpl) - .asSubclass(ReplicationEndpoint.class).newInstance(); - if(rsServerHost != null) { - ReplicationEndpoint newReplicationEndPoint = rsServerHost - .postCreateReplicationEndPoint(replicationEndpoint); - if(newReplicationEndPoint != null) { - // Override the newly created endpoint from the hook with configured end point - replicationEndpoint = newReplicationEndPoint; - } - } - } catch (Exception e) { - LOG.warn("Passed replication endpoint implementation throws errors" - + " while initializing ReplicationSource for peer: " + peerId, e); - throw new IOException(e); - } - MetricsSource metrics = new MetricsSource(peerId); // init replication source src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, clusterId, - replicationEndpoint, walFileLengthProvider, metrics); - - // init replication endpoint - replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), - fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server)); - + walFileLengthProvider, metrics); return src; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index b28eaaf..5225add 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; @@ -70,7 +69,6 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); - TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1); TEST_UTIL.startMiniCluster(); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index a198d20..9b71595 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -83,7 +82,6 @@ public class TestReplicationAdmin { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1); TEST_UTIL.startMiniCluster(); admin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); hbaseAdmin = TEST_UTIL.getAdmin(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 14c5e56..540cd96 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -49,8 +49,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId, - UUID clusterId, ReplicationEndpoint replicationEndpoint, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; this.metrics = metrics; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 48d8924..9d12c07 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -32,28 +32,28 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.replication.regionserver.Replication; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALKeyImpl; -import org.apache.hadoop.hbase.wal.WALProvider; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; +import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -167,7 +167,7 @@ public class TestReplicationSource { ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null, - replicationEndpoint, p -> OptionalLong.empty(), null); + p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(new Runnable() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index ed0814d..83af5e3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; @@ -714,8 +713,8 @@ public abstract class TestReplicationSourceManager { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId, - UUID clusterId, ReplicationEndpoint replicationEndpoint, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { throw new IOException("Failing deliberately"); } } -- 2.7.4