diff --git hbase-protocol/src/main/protobuf/ZooKeeper.proto hbase-protocol/src/main/protobuf/ZooKeeper.proto
index 082e1f7..0e82238 100644
--- hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -113,6 +113,9 @@ message ReplicationPeer {
   // clusterkey is the concatenation of the slave cluster's
   // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
   required string clusterkey = 1;
+  optional string replicationConsumerImpl = 2;
+  repeated BytesBytesPair data = 3;
+  repeated NameStringPair configuration = 4;
 }
 
 /**
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationConsumer.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationConsumer.java
new file mode 100644
index 0000000..efc4c0d
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationConsumer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+public interface ReplicationConsumer extends Stoppable {
+
+  class Context {
+    public final Configuration conf;
+    public final FileSystem fs;
+    public final Stoppable stopper;
+    public final ReplicationPeers replicationPeers;
+    public final String peerId;
+    public final UUID clusterId;
+
+    public Context(final Configuration conf, final FileSystem fs,
+        final Stoppable stopper, final ReplicationPeers replicationPeers,
+        final String peerId, final UUID clusterId) {
+      this.conf = conf;
+      this.fs = fs;
+      this.stopper = stopper;
+      this.replicationPeers = replicationPeers;
+      this.clusterId = clusterId;
+      this.peerId = peerId;
+    }
+  }
+
+  void init(Context context) throws IOException;
+
+  void start() throws IOException;
+
+  void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries);
+}
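
[Editor's note, not part of the patch: the interface above is the extension point this change introduces. As a rough illustration, a minimal consumer plugged in through it might look like the sketch below. The class name LoggingReplicationConsumer and its log-only behavior are hypothetical; a real consumer must not return from shipEdits() until the batch is durably delivered, because ReplicationSource records the WAL position as soon as the call returns.]

import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.ReplicationConsumer;

/** Hypothetical consumer that only logs batches instead of shipping them. */
public class LoggingReplicationConsumer implements ReplicationConsumer {
  private static final Log LOG = LogFactory.getLog(LoggingReplicationConsumer.class);
  private Context ctx;
  private volatile boolean stopped = false;

  @Override
  public void init(Context context) throws IOException {
    this.ctx = context;  // keep the Context around for peerId, conf, etc.
  }

  @Override
  public void start() throws IOException {
    LOG.info("Starting consumer for peer " + ctx.peerId);
  }

  @Override
  public void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
    // A real implementation must block here until delivery succeeds;
    // ReplicationSource advances the log position once this returns.
    LOG.info("Would ship " + entries.size() + " entries for peer " + ctx.peerId);
  }

  @Override
  public void stop(String why) {
    this.stopped = true;
  }

  @Override
  public boolean isStopped() {
    return stopped;
  }
}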
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseClusterReplicator.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseClusterReplicator.java
new file mode 100644
index 0000000..675651d
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseClusterReplicator.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationConsumer;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * A ReplicationConsumer implementation for replicating to another HBase cluster.
+ */
+public class HBaseClusterReplicator implements ReplicationConsumer {
+
+  public static final Log LOG = LogFactory.getLog(HBaseClusterReplicator.class);
+  private HConnection conn;
+
+  private Configuration conf;
+  private Context ctx;
+
+  // How long should we sleep for each retry
+  private long sleepForRetries;
+
+  // total number of edits we replicated
+  private long totalReplicatedEdits = 0;
+  // total number of operations (Put/Delete) we replicated
+  private long totalReplicatedOperations = 0;
+  // The znode we currently play with
+  private String peerClusterZnode;
+  // Maximum number of retries before taking bold actions
+  private int maxRetriesMultiplier;
+  // Socket timeouts require even bolder actions since we don't want to DDOS
+  private int socketTimeoutMultiplier;
+  // Current number of operations (Put/Delete) that we need to replicate
+  private int currentNbOperations = 0;
+  // Current size of data we need to replicate
+  private int currentSize = 0;
+  // Metrics for this source
+  private MetricsSource metrics;
+  // Handles connecting to peer region servers
+  private ReplicationSinkManager replicationSinkMgr;
+
+  @Override
+  public void init(Context context) throws IOException {
+    this.ctx = context;
+    this.conf = HBaseConfiguration.create(ctx.conf);
+    decorateConf();
+    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
+    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
+        maxRetriesMultiplier * maxRetriesMultiplier);
+    // TODO: This connection is replication specific or we should make it particular to
+    // replication and make replication specific settings such as compression or codec to use
+    // passing Cells.
+    this.conn = HConnectionManager.getConnection(this.conf);
+    this.sleepForRetries =
+        this.conf.getLong("replication.source.sleepforretries", 1000);
+    this.metrics = new MetricsSource(ctx.peerId);
+    // the peerId was already parsed out of the znode by ReplicationSource
+    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.peerId, ctx.replicationPeers, this.conf);
+  }
+
+  private void decorateConf() {
+    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
+    if (StringUtils.isNotEmpty(replicationCodec)) {
+      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
+    }
+  }
+
+  @Override
+  public void start() {
+    connectToPeers();
+  }
+
+  private void connectToPeers() {
+    int sleepMultiplier = 1;
+
+    // Connect to peer cluster first, unless we have to stop
+    while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
+      replicationSinkMgr.chooseSinks();
+      if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
+        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+  }
+
+  /**
+   * Do the sleeping logic
+   * @param msg Why we sleep
+   * @param sleepMultiplier by how many times the default sleeping time is augmented
+   * @return True if sleepMultiplier is < maxRetriesMultiplier
+   */
+  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+      }
+      Thread.sleep(this.sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
+
+  /**
+   * We only want KVs that are scoped other than local
+   * @param entry The entry to check for replication
+   */
+  protected void removeNonReplicableEdits(HLog.Entry entry) {
+    NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
+    ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+    int size = kvs.size();
+    for (int i = size - 1; i >= 0; i--) {
+      KeyValue kv = kvs.get(i);
+      // The scope will be null or empty if
+      // there's nothing to replicate in that WALEdit
+      if (scopes == null || !scopes.containsKey(kv.getFamily())) {
+        kvs.remove(i);
+      }
+    }
+    if (kvs.size() < size / 2) {
+      kvs.trimToSize();
+    }
+  }
+
+  /**
+   * Count the number of different row keys in the given edit because of
+   * mini-batching. We assume that there's at least one KV in the WALEdit.
+   * @param edit edit to count row keys from
+   * @return number of different row keys
+   */
+  private int countDistinctRowKeys(WALEdit edit) {
+    List<KeyValue> kvs = edit.getKeyValues();
+    int distinctRowKeys = 1;
+    KeyValue lastKV = kvs.get(0);
+    for (int i = 0; i < edit.size(); i++) {
+      if (!kvs.get(i).matchingRow(lastKV)) {
+        distinctRowKeys++;
+      }
+    }
+    return distinctRowKeys;
+  }
+
+  /**
+   * Do the shipping logic
+   * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
+   *          written to when this method was called
+   */
+  @Override
+  public void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
+    int sleepMultiplier = 1;
+    while (this.isActive()) {
+      if (!isPeerEnabled()) {
+        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+      SinkPeer sinkPeer = null;
+      try {
+        sinkPeer = replicationSinkMgr.getReplicationSink();
+        BlockingInterface rrs = sinkPeer.getRegionServer();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Replicating " + entries.size() +
+              " entries of total size " + currentSize);
+        }
+        ReplicationProtbufUtil.replicateWALEntry(rrs,
+            entries.toArray(new HLog.Entry[entries.size()]));
+
+        // update metrics
+        this.totalReplicatedEdits += entries.size();
+        this.totalReplicatedOperations += currentNbOperations;
+        this.metrics.shipBatch(this.currentNbOperations);
+        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime());
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
+              + this.totalReplicatedOperations + " operations");
+        }
+        break;
+
+      } catch (IOException ioe) {
+        // Didn't ship anything, but must still age the last time we did
+        this.metrics.refreshAgeOfLastShippedOp();
+        if (ioe instanceof RemoteException) {
+          ioe = ((RemoteException) ioe).unwrapRemoteException();
+          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
+          if (ioe instanceof TableNotFoundException) {
+            if (sleepForRetries("A table is missing in the peer cluster. "
+                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
+              sleepMultiplier++;
+            }
+          }
+        } else {
+          if (ioe instanceof SocketTimeoutException) {
+            // This exception means we waited for more than 60s and nothing
+            // happened, the cluster is alive and calling it right away
+            // even for a test just makes things worse.
Since the " + + "call to the remote cluster timed out, which is usually " + + "caused by a machine failure or a massive slowdown", + this.socketTimeoutMultiplier); + } else if (ioe instanceof ConnectException) { + LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); + replicationSinkMgr.chooseSinks(); + } else { + LOG.warn("Can't replicate because of a local or network error: ", ioe); + } + } + + if (sinkPeer != null) { + replicationSinkMgr.reportBadSink(sinkPeer); + } + if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { + sleepMultiplier++; + } + } + } + } + + protected boolean isPeerEnabled() { + return ctx.replicationPeers.getStatusOfConnectedPeer(ctx.peerId); + } + + private boolean isActive() { + return !ctx.stopper.isStopped(); + } + + @Override + public void stop(String why) { + if (this.conn != null) { + try { + this.conn.close(); + this.conn = null; + } catch (IOException e) { + LOG.debug("Attempt to close connection failed", e); + } + } + } + + @Override + public boolean isStopped() { + return ctx.stopper.isStopped(); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 020b30f..3438cf4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -39,16 +37,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -56,9 +48,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; +import org.apache.hadoop.hbase.replication.ReplicationConsumer; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.ipc.RemoteException; /** * Class that handles the source of a replication stream. 
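
[Editor's note, not part of the patch: the retry behavior carried over into HBaseClusterReplicator backs off linearly, not exponentially: the sleep is the base delay times the current multiplier, and callers stop bumping the multiplier once it reaches replication.source.maxretriesmultiplier. A small self-contained demo of the resulting schedule follows; the class and loop are ours, only the two default values come from the code above.]

public class RetryScheduleDemo {
  public static void main(String[] args) {
    long sleepForRetries = 1000L;   // replication.source.sleepforretries default
    int maxRetriesMultiplier = 10;  // replication.source.maxretriesmultiplier default
    int sleepMultiplier = 1;
    long total = 0;
    // Mirrors the pattern in shipEdits(): sleep, then bump the multiplier
    // only while sleepForRetries() reports it is still below the cap.
    for (int attempt = 1; attempt <= 12; attempt++) {
      long sleepMs = sleepForRetries * sleepMultiplier;
      total += sleepMs;
      System.out.println("attempt " + attempt + ": sleep " + sleepMs + " ms");
      if (sleepMultiplier < maxRetriesMultiplier) {
        sleepMultiplier++;
      }
    }
    // Prints 1s, 2s, ... 10s, then stays at 10s per attempt.
    System.out.println("total backoff after 12 attempts: " + total + " ms");
  }
}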
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 020b30f..3438cf4 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -39,16 +37,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -56,9 +48,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.replication.ReplicationConsumer;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.ipc.RemoteException;
 
 /**
  * Class that handles the source of a replication stream.
@@ -74,14 +65,14 @@ import org.apache.hadoop.ipc.RemoteException;
  */
 @InterfaceAudience.Private
 public class ReplicationSource extends Thread
-    implements ReplicationSourceInterface {
+    implements ReplicationSourceInterface, Stoppable {
 
   public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
   // Queue of logs to process
   private PriorityBlockingQueue<Path> queue;
-  private HConnection conn;
   private ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
+  private Configuration conf;
 
   private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
@@ -115,8 +106,6 @@
   private String peerClusterZnode;
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
-  // Socket timeouts require even bolder actions since we don't want to DDOS
-  private int socketTimeoutMultiplier;
   // Current number of operations (Put/Delete) that we need to replicate
   private int currentNbOperations = 0;
   // Current size of data we need to replicate
@@ -127,10 +116,10 @@
   private MetricsSource metrics;
   // Handle on the log reader helper
   private ReplicationHLogReaderManager repLogReader;
-  // Handles connecting to peer region servers
-  private ReplicationSinkManager replicationSinkMgr;
   //WARN threshold for the number of queued logs, defaults to 2
   private int logQueueWarnThreshold;
+  // ReplicationConsumer which will handle the actual replication
+  private ReplicationConsumer replicationConsumer;
 
   /**
    * Instantiation method used by region servers
@@ -142,28 +131,25 @@
    * @param peerClusterZnode the name of our znode
    * @throws IOException
    */
+  @Override
   public void init(final Configuration conf, final FileSystem fs,
       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId) throws IOException {
+      final String peerClusterZnode, final UUID clusterId, ReplicationConsumer replicationConsumer) throws IOException {
     this.stopper = stopper;
-    this.conf = HBaseConfiguration.create(conf);
+    this.conf = conf;
     decorateConf();
     this.replicationQueueSizeCapacity =
         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
     this.replicationQueueNbCapacity =
         this.conf.getInt("replication.source.nb.capacity", 25000);
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
-    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
-        maxRetriesMultiplier * maxRetriesMultiplier);
+
     this.queue =
         new PriorityBlockingQueue<Path>(
             this.conf.getInt("hbase.regionserver.maxlogs", 32),
             new LogsComparator());
-    // TODO: This connection is replication specific or we should make it particular to
-    // replication and make replication specific settings such as compression or codec to use
-    // passing Cells.
-    this.conn = HConnectionManager.getConnection(this.conf);
+
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
     this.manager = manager;
@@ -178,8 +164,9 @@
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
-    this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
+    this.replicationConsumer = replicationConsumer;
+    this.replicationConsumer.init(new ReplicationConsumer.Context(conf, fs, this, replicationPeers, peerId, clusterId));
   }
 
   private void decorateConf() {
@@ -203,7 +190,13 @@
 
   @Override
   public void run() {
-    connectToPeers();
+    try {
+      replicationConsumer.start();
+    } catch (IOException ex) {
+      LOG.warn("Error starting ReplicationConsumer, exiting", ex);
+      throw new RuntimeException(ex);
+    }
+
     // We were stopped while looping to connect to sinks, just abort
     if (!this.isActive()) {
       metrics.clear();
@@ -359,13 +352,8 @@
         sleepMultiplier = 1;
         shipEdits(currentWALisBeingWrittenTo, entries);
       }
-      if (this.conn != null) {
-        try {
-          this.conn.close();
-        } catch (IOException e) {
-          LOG.debug("Attempt to close connection failed", e);
-        }
-      }
+      replicationConsumer.stop("exiting");
+
       LOG.debug("Source exiting " + this.peerId);
       metrics.clear();
     }
@@ -433,20 +421,6 @@
     return seenEntries == 0 && processEndOfFile();
   }
 
-  private void connectToPeers() {
-    int sleepMultiplier = 1;
-
-    // Connect to peer cluster first, unless we have to stop
-    while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
-      replicationSinkMgr.chooseSinks();
-      if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
-        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-    }
-  }
-
   /**
    * Poll for the next path
    * @return true if a path was obtained, false if not
@@ -647,88 +621,29 @@
    * written to when this method was called
    */
   protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
-    int sleepMultiplier = 1;
     if (entries.isEmpty()) {
       LOG.warn("Was given 0 edits to ship");
       return;
     }
     while (this.isActive()) {
-      if (!isPeerEnabled()) {
-        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-      SinkPeer sinkPeer = null;
       try {
-        sinkPeer = replicationSinkMgr.getReplicationSink();
-        BlockingInterface rrs = sinkPeer.getRegionServer();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicating " + entries.size() +
-              " entries of total size " + currentSize);
-        }
-        ReplicationProtbufUtil.replicateWALEntry(rrs,
-            entries.toArray(new HLog.Entry[entries.size()]));
+        // send the edits to the consumer. Will block until the edits are actually sent
+        replicationConsumer.shipEdits(currentWALisBeingWrittenTo, entries);
+
         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-            this.peerClusterZnode, this.repLogReader.getPosition(),
-            this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
+              this.peerClusterZnode, this.repLogReader.getPosition(),
+              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
           this.lastLoggedPosition = this.repLogReader.getPosition();
         }
-        this.totalReplicatedEdits += entries.size();
-        this.totalReplicatedOperations += currentNbOperations;
-        this.metrics.shipBatch(this.currentNbOperations);
-        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
-              + this.totalReplicatedOperations + " operations");
-        }
-        break;
-      } catch (IOException ioe) {
-        // Didn't ship anything, but must still age the last time we did
-        this.metrics.refreshAgeOfLastShippedOp();
-        if (ioe instanceof RemoteException) {
-          ioe = ((RemoteException) ioe).unwrapRemoteException();
-          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
-          if (ioe instanceof TableNotFoundException) {
-            if (sleepForRetries("A table is missing in the peer cluster. "
-                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
-              sleepMultiplier++;
-            }
-          }
-        } else {
-          if (ioe instanceof SocketTimeoutException) {
-            // This exception means we waited for more than 60s and nothing
-            // happened, the cluster is alive and calling it right away
-            // even for a test just makes things worse.
Since the " + - "call to the remote cluster timed out, which is usually " + - "caused by a machine failure or a massive slowdown", - this.socketTimeoutMultiplier); - } else if (ioe instanceof ConnectException) { - LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); - replicationSinkMgr.chooseSinks(); - } else { - LOG.warn("Can't replicate because of a local or network error: ", ioe); - } - } + break; + } finally { - if (sinkPeer != null) { - replicationSinkMgr.reportBadSink(sinkPeer); - } - if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { - sleepMultiplier++; - } } } } - /** - * check whether the peer is enabled or not - * - * @return true if the peer is enabled, otherwise false - */ protected boolean isPeerEnabled() { return this.replicationPeers.getStatusOfConnectedPeer(this.peerId); } @@ -764,10 +679,12 @@ public class ReplicationSource extends Thread return false; } + @Override public void startup() { String n = Thread.currentThread().getName(); Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { + @Override public void uncaughtException(final Thread t, final Throwable e) { LOG.error("Unexpected exception in ReplicationSource," + " currentPath=" + currentPath, e); @@ -778,10 +695,12 @@ public class ReplicationSource extends Thread this.peerClusterZnode, handler); } + @Override public void terminate(String reason) { terminate(reason, null); } + @Override public void terminate(String reason, Exception cause) { if (cause == null) { LOG.info("Closing source " @@ -791,18 +710,34 @@ public class ReplicationSource extends Thread LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason, cause); } + if (this.replicationConsumer != null) { + this.replicationConsumer.stop(reason); + } this.running = false; Threads.shutdown(this, this.sleepForRetries); } + @Override + public void stop(String why) { + terminate(why); + } + + @Override + public boolean isStopped() { + return !isActive(); + }; + + @Override public String getPeerClusterZnode() { return this.peerClusterZnode; } + @Override public String getPeerClusterId() { return this.peerId; } + @Override public Path getCurrentPath() { return this.currentPath; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index df599f0..6d5f3f9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.replication.ReplicationConsumer; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -50,7 +51,7 @@ public interface ReplicationSourceInterface { public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, - final String peerClusterZnode, final UUID clusterId) throws IOException; + final String peerClusterZnode, final UUID clusterId, ReplicationConsumer replicationConsumer) throws 
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index df599f0..6d5f3f9 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationConsumer;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 
@@ -50,7 +51,7 @@ public interface ReplicationSourceInterface {
   public void init(final Configuration conf, final FileSystem fs,
       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId) throws IOException;
+      final String peerClusterZnode, final UUID clusterId, ReplicationConsumer replicationConsumer) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b9ac55b..823d10c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -42,13 +42,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationConsumer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.zookeeper.KeeperException;
-
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -351,7 +350,11 @@ public class ReplicationSourceManager implements ReplicationListener {
       src = new ReplicationSource();
     }
 
-    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
+
+    // TODO: make pluggable
+    ReplicationConsumer replicationConsumer = new HBaseClusterReplicator();
+
+    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId, replicationConsumer);
     return src;
   }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index d0868d6..97fb8b3 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,7 +39,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
       ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
-      UUID clusterId) throws IOException {
+      UUID clusterId, ReplicationConsumer replicationConsumer) throws IOException {
     this.manager = manager;
     this.peerClusterId = peerClusterId;
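
[Editor's note, not part of the patch: the "// TODO: make pluggable" in ReplicationSourceManager above is presumably where the new replicationConsumerImpl field added to ReplicationPeer in ZooKeeper.proto would eventually be consulted. A hedged sketch of one way that lookup could work follows; the factory class, the config key "hbase.replication.consumer.impl", and the fallback policy are assumptions of ours, not anything this patch defines.]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationConsumer;
import org.apache.hadoop.hbase.replication.regionserver.HBaseClusterReplicator;

public final class ReplicationConsumerFactory {
  // Hypothetical config key; the patch itself hardwires HBaseClusterReplicator.
  public static final String CONSUMER_IMPL_KEY = "hbase.replication.consumer.impl";

  private ReplicationConsumerFactory() {}

  /**
   * Instantiates the consumer class named in the configuration (which could in
   * turn be populated from the peer's replicationConsumerImpl proto field),
   * falling back to the built-in HBase-to-HBase replicator.
   */
  public static ReplicationConsumer create(Configuration conf) {
    Class<? extends ReplicationConsumer> clazz = conf.getClass(
        CONSUMER_IMPL_KEY, HBaseClusterReplicator.class, ReplicationConsumer.class);
    try {
      return clazz.newInstance();  // consumers need a no-arg constructor
    } catch (Exception e) {
      throw new RuntimeException("Cannot instantiate " + clazz.getName(), e);
    }
  }
}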