From 5a328befb0a0cbf9444039d45d7d2f84e02794d9 Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Thu, 19 May 2016 17:14:33 -0700 Subject: [PATCH] Implemented ReplicationQueuesHBaseImpl that tracks WAL offsets and replication queues in an HBase table. Only wrote the basic tracking methods, have not implemented claimQueue() or HFileRef methods yet. Wrote a basic unit test for ReplicationQueueHBaseImpl that tests the implemented functions on a single Region Server --- .../hbase/replication/ReplicationFactory.java | 11 +- .../hbase/replication/ReplicationQueues.java | 8 +- .../replication/ReplicationQueuesArguments.java | 66 +++ .../replication/ReplicationQueuesHBaseImpl.java | 491 +++++++++++++++++++++ .../hbase/replication/ReplicationQueuesZKImpl.java | 13 +- .../replication/regionserver/Replication.java | 12 +- .../regionserver/ReplicationSourceManager.java | 5 +- .../client/replication/TestReplicationAdmin.java | 3 +- .../hbase/master/cleaner/TestLogsCleaner.java | 3 +- .../cleaner/TestReplicationHFileCleaner.java | 4 +- .../replication/TestReplicationStateBasic.java | 2 +- .../replication/TestReplicationStateHBaseImpl.java | 243 ++++++++++ .../replication/TestReplicationStateZKImpl.java | 13 +- .../regionserver/TestReplicationSourceManager.java | 36 +- .../hadoop/hbase/util/TestHBaseFsckOneRS.java | 4 +- 15 files changed, 871 insertions(+), 43 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 91e77ca..e264a4d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication; +import org.apache.commons.lang.reflect.ConstructorUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; @@ -30,9 +31,11 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @InterfaceAudience.Private public class ReplicationFactory { - public static ReplicationQueues getReplicationQueues(final ZooKeeperWatcher zk, - Configuration conf, Abortable abortable) { - return new ReplicationQueuesZKImpl(zk, conf, abortable); + public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args) + throws Exception { + Class classToBuild = args.getConf().getClass("hbase.region.replica." + + "replication.ReplicationQueuesType", ReplicationQueuesZKImpl.class); + return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args); } public static ReplicationQueuesClient getReplicationQueuesClient(final ZooKeeperWatcher zk, @@ -44,7 +47,7 @@ public class ReplicationFactory { Abortable abortable) { return getReplicationPeers(zk, conf, null, abortable); } - + public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, final ReplicationQueuesClient queuesClient, Abortable abortable) { return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 0d47a88..db6da91 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -83,13 +83,13 @@ public interface ReplicationQueues { /** * Get a list of all WALs in the given queue. * @param queueId a String that identifies the queue - * @return a list of WALs, null if this region server is dead and has no outstanding queues + * @return a list of WALs, null if no such queue exists for this server */ List getLogsInQueue(String queueId); /** * Get a list of all queues for this region server. - * @return a list of queueIds, null if this region server is dead and has no outstanding queues + * @return a list of queueIds, an empty list if this region server is dead and has no outstanding queues */ List getAllQueues(); @@ -110,10 +110,10 @@ public interface ReplicationQueues { /** * Checks if the provided znode is the same as this region server's - * @param znode to check + * @param regionserver the id of the region server * @return if this is this rs's znode */ - boolean isThisOurZnode(String znode); + boolean isThisOurRegionServer(String regionserver); /** * Add a peer to hfile reference queue if peer does not exist. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java new file mode 100644 index 0000000..4907b73 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +@InterfaceAudience.Private +public class ReplicationQueuesArguments { + + private ZooKeeperWatcher zk; + private Configuration conf; + private Abortable abort; + + public ReplicationQueuesArguments(Configuration conf, Abortable abort) { + this.conf = conf; + this.abort = abort; + } + + public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZooKeeperWatcher zk) { + this(conf, abort); + setZk(zk); + } + + public ZooKeeperWatcher getZk() { + return zk; + } + + public void setZk(ZooKeeperWatcher zk) { + this.zk = zk; + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Abortable getAbort() { + return abort; + } + + public void setAbort(Abortable abort) { + this.abort = abort; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java new file mode 100644 index 0000000..bbc9e32 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java @@ -0,0 +1,491 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.SortedSet; + +@InterfaceAudience.Private +public class ReplicationQueuesHBaseImpl implements ReplicationQueues { + + /** Name of the HBase Table used for tracking replication*/ + public static final TableName REPLICATION_TABLE_NAME = + TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); + + // Column family and column names for the Replication Table + private static final byte[] CF = Bytes.toBytes("r"); + private static final byte[] COL_OWNER = Bytes.toBytes("o"); + private static final byte[] COL_QUEUE_ID = Bytes.toBytes("q"); + + // Column Descriptor for the Replication Table + private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR = + new HColumnDescriptor(CF).setMaxVersions(1) + .setInMemory(true) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + // TODO: Figure out which bloom filter to use + .setBloomFilterType(BloomType.NONE) + .setCacheDataInL1(true); + + // Common byte values used in replication offset tracking + private static final byte[] INITIAL_OFFSET = Bytes.toBytes(0L); + + /* + * Make sure that HBase table operations for replication have a high number of retries. This is + * because the server is aborted if any HBase table operation fails. Each RPC will be attempted + * 3600 times before exiting. This provides each operation with 2 hours of retries + * before the server is aborted. + */ + private static final int CLIENT_RETRIES = 3600; + private static final int RPC_TIMEOUT = 2000; + private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; + + private final Configuration conf; + private final Admin admin; + private final Connection connection; + private final Table replicationTable; + private final Abortable abortable; + private String serverName = null; + private byte[] serverNameBytes = null; + + public ReplicationQueuesHBaseImpl(ReplicationQueuesArguments args) throws IOException { + this(args.getConf(), args.getAbort()); + } + + public ReplicationQueuesHBaseImpl(Configuration conf, Abortable abort) throws IOException { + this.conf = new Configuration(conf); + // Modify the connection's config so that the Replication Table it returns has a much higher + // number of client retries + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); + this.connection = ConnectionFactory.createConnection(conf); + this.admin = connection.getAdmin(); + this.abortable = abort; + replicationTable = createAndGetReplicationTable(); + replicationTable.setRpcTimeout(RPC_TIMEOUT); + replicationTable.setOperationTimeout(OPERATION_TIMEOUT); + } + + @Override + public void init(String serverName) throws ReplicationException { + this.serverName = serverName; + this.serverNameBytes = Bytes.toBytes(serverName); + } + + @Override + public void removeQueue(String queueId) { + try { + byte[] rowKey = this.queueIdToRowKey(queueId); + // The rowkey will be null if the queue cannot be found in the Replication Table + if (rowKey == null) { + String errMsg = "Could not remove non-existent queue with queueId=" + queueId; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return; + } + Delete deleteQueue = new Delete(rowKey); + safeQueueUpdate(deleteQueue); + } catch (IOException e) { + abortable.abort("Could not remove queue with queueId=" + queueId, e); + } + } + + @Override + public void addLog(String queueId, String filename) throws ReplicationException { + try { + // Check if the queue info (Owner, QueueId) is currently stored in the Replication Table + if (this.queueIdToRowKey(queueId) == null) { + // Each queue will have an Owner, QueueId, and a collection of [WAL:offset] key values. + Put putNewQueue = new Put(Bytes.toBytes(buildServerQueueName(queueId))); + putNewQueue.addColumn(CF, COL_OWNER, Bytes.toBytes(serverName)); + putNewQueue.addColumn(CF, COL_QUEUE_ID, Bytes.toBytes(queueId)); + putNewQueue.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET); + replicationTable.put(putNewQueue); + } else { + // Otherwise simply add the new log and offset as a new column + Put putNewLog = new Put(this.queueIdToRowKey(queueId)); + putNewLog.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET); + safeQueueUpdate(putNewLog); + } + } catch (IOException e) { + abortable.abort("Could not add queue queueId=" + queueId + " filename=" + filename, e); + } + } + + @Override + public void removeLog(String queueId, String filename) { + try { + byte[] rowKey = this.queueIdToRowKey(queueId); + if (rowKey == null) { + String errMsg = "Could not remove log from non-existent queueId=" + queueId + ", filename=" + + filename; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return; + } + Delete delete = new Delete(rowKey); + delete.addColumns(CF, Bytes.toBytes(filename)); + safeQueueUpdate(delete); + } catch (IOException e) { + abortable.abort("Could not remove log from queueId=" + queueId + ", filename=" + filename, e); + } + } + + @Override + public void setLogPosition(String queueId, String filename, long position) { + try { + byte[] rowKey = this.queueIdToRowKey(queueId); + if (rowKey == null) { + String errMsg = "Could not set position of log from non-existent queueId=" + queueId + + ", filename=" + filename; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return; + } + // Check that the log exists. addLog() must have been called before setLogPosition(). + Get checkLogExists = new Get(rowKey); + checkLogExists.addColumn(CF, Bytes.toBytes(filename)); + if (!replicationTable.exists(checkLogExists)) { + String errMsg = "Could not set position of non-existent log from queueId=" + queueId + + ", filename=" + filename; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return; + } + // Update the log offset if it exists + Put walAndOffset = new Put(rowKey); + walAndOffset.addColumn(CF, Bytes.toBytes(filename), Bytes.toBytes(position)); + safeQueueUpdate(walAndOffset); + } catch (IOException e) { + abortable.abort("Failed to write replication wal position (filename=" + filename + + ", position=" + position + ")", e); + } + } + + @Override + public long getLogPosition(String queueId, String filename) throws ReplicationException { + try { + byte[] rowKey = this.queueIdToRowKey(queueId); + if (rowKey == null) { + throw new ReplicationException("Could not get position in log for non-existent queue " + + "queueId=" + queueId + ", filename=" + filename); + } + Get getOffset = new Get(rowKey); + getOffset.addColumn(CF, Bytes.toBytes(filename)); + Result result = replicationTable.get(getOffset); + if (result.isEmpty()) { + throw new ReplicationException("Could not read empty result while getting log position " + + "queueId=" + queueId + ", filename=" + filename); + } + return Bytes.toLong(result.getValue(CF, Bytes.toBytes(filename))); + } catch (IOException e) { + throw new ReplicationException("Could not get position in log for queueId=" + queueId + + ", filename=" + filename); + } + } + + @Override + public void removeAllQueues() { + List myQueueIds = getAllQueues(); + for (String queueId : myQueueIds) { + removeQueue(queueId); + } + } + + @Override + public List getLogsInQueue(String queueId) { + List logs = new ArrayList(); + try { + byte[] rowKey = this.queueIdToRowKey(queueId); + if (rowKey == null) { + String errMsg = "Could not get logs from non-existent queueId=" + queueId; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return null; + } + Get getQueue = new Get(rowKey); + Result queue = replicationTable.get(getQueue); + if (queue.isEmpty()) { + return null; + } + Map familyMap = queue.getFamilyMap(CF); + for (byte[] cQualifier : familyMap.keySet()) { + if (Arrays.equals(cQualifier, COL_OWNER) || Arrays.equals(cQualifier, COL_QUEUE_ID)) { + continue; + } + logs.add(Bytes.toString(cQualifier)); + } + } catch (IOException e) { + abortable.abort("Could not get logs from queue queueId=" + queueId, e); + return null; + } + return logs; + } + + @Override + public List getAllQueues() { + try { + return this.getQueuesBelongingToServer(serverName); + } catch (IOException e) { + abortable.abort("Could not get all replication queues", e); + return null; + } + } + + @Override + public SortedMap> claimQueues(String regionserver) { + // TODO + throw new NotImplementedException(); + } + + @Override + public List getListOfReplicators() { + // TODO + throw new NotImplementedException(); + } + + @Override + public boolean isThisOurRegionServer(String regionserver) { + return this.serverName.equals(regionserver); + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + // TODO + throw new NotImplementedException(); + } + + @Override + public void addHFileRefs(String peerId, List files) throws ReplicationException { + // TODO + throw new NotImplementedException(); + } + + @Override + public void removeHFileRefs(String peerId, List files) { + // TODO + throw new NotImplementedException(); + } + + /** + * Gets the Replication Table. Builds and blocks until the table is available if the Replication + * Table does not exist. + * + * @return the Replication Table + * @throws IOException if the Replication Table takes too long to build + */ + private Table createAndGetReplicationTable() throws IOException { + if (!replicationTableExists()) { + createReplicationTable(); + } + int maxRetries = conf.getInt("replication.queues.createtable.retries.number", 100); + RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 100); + RetryCounter retryCounter = counterFactory.create(); + while (!replicationTableExists()) { + try { + retryCounter.sleepUntilNextRetry(); + if (!retryCounter.shouldRetry()) { + throw new IOException("Unable to acquire the Replication Table"); + } + } catch (InterruptedException e) { + return null; + } + } + return connection.getTable(REPLICATION_TABLE_NAME); + } + + /** + * Checks whether the Replication Table exists yet + * + * @return whether the Replication Table exists + * @throws IOException + */ + private boolean replicationTableExists() { + try { + return admin.tableExists(REPLICATION_TABLE_NAME); + } catch (IOException e) { + return false; + } + } + + /** + * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR + * in ReplicationQueuesHBaseImpl + * @throws IOException + */ + private void createReplicationTable() throws IOException { + HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); + replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); + admin.createTable(replicationTableDescriptor); + } + + /** + * Builds the unique identifier for a queue in the Replication table by appending the queueId to + * the servername + * + * @param queueId a String that identifies the queue + * @return unique identifier for a queue in the Replication table + */ + private String buildServerQueueName(String queueId) { + return serverName + "-" + queueId; + } + + /** + * See safeQueueUpdate(RowMutations mutate) + * @param put Row mutation to perform on the queue + */ + private void safeQueueUpdate(Put put) { + RowMutations mutations = new RowMutations(put.getRow()); + try { + mutations.add(put); + } catch (IOException e){ + abortable.abort("Failed to update Replication Table because of IOException", e); + } + safeQueueUpdate(mutations); + } + + /** + * See safeQueueUpdate(RowMutations mutate) + * @param delete Row mutation to perform on the queue + */ + private void safeQueueUpdate(Delete delete) { + RowMutations mutations = new RowMutations(delete.getRow()); + try { + mutations.add(delete); + } catch (IOException e) { + abortable.abort("Failed to update Replication Table because of IOException", e); + } + safeQueueUpdate(mutations); + } + + /** + * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER column + * of the queue. Abort the server if this checkAndPut fails: which means we have somehow lost + * ownership of the column or an IO Exception has occurred during the transaction. + * + * @param mutate Mutation to perform on a given queue + */ + private void safeQueueUpdate(RowMutations mutate) { + try { + boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF, COL_OWNER, + CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate); + if (!updateSuccess) { + String errMsg = "Failed to update Replication Table because we lost queue ownership"; + abortable.abort(errMsg, new ReplicationException(errMsg)); + } + } catch (IOException e) { + abortable.abort("Failed to update Replication Table because of IOException", e); + } + } + + /** + * Get the QueueIds belonging to the named server from the ReplicationTable + * + * @param server name of the server + * @return a list of the QueueIds belonging to the server + * @throws IOException + */ + private List getQueuesBelongingToServer(String server) throws IOException { + List queues = new ArrayList(); + Scan scan = new Scan(); + SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF, COL_OWNER, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server)); + scan.setFilter(filterMyQueues); + scan.addColumn(CF, COL_QUEUE_ID); + scan.addColumn(CF, COL_OWNER); + ResultScanner results = replicationTable.getScanner(scan); + for (Result result : results) { + queues.add(Bytes.toString(result.getValue(CF, COL_QUEUE_ID))); + } + results.close(); + return queues; + } + + /** + * Finds the row key of the HBase row corresponding to the provided queue. This has to be done, + * because the row key is [original server name + "-" + queueId0]. And the original server will + * make calls to getLog(), getQueue(), etc. with the argument queueId = queueId0. + * On the original server we can build the row key by concatenating servername + queueId0. + * Yet if the queue is claimed by another server, future calls to getLog(), getQueue(), etc. + * will be made with the argument queueId = queueId0 + "-" + pastOwner0 + "-" + pastOwner1 ... + * so we need a way to look up rows by their modified queueId's. + * + * TODO: Consider updating the queueId passed to getLog, getQueue()... inside of ReplicationSource + * TODO: and ReplicationSourceManager or the parsing of the passed in queueId's so that we don't + * TODO have to scan the table for row keys for each update. See HBASE-15956. + * + * TODO: We can also cache queueId's if ReplicationQueuesHBaseImpl becomes a bottleneck. We + * TODO: currently perform scan's over all the rows looking for one with a matching QueueId. + * + * @param queueId string representation of the queue id + * @return the rowkey of the corresponding queue. This returns null if the corresponding queue + * cannot be found. + * @throws IOException + */ + private byte[] queueIdToRowKey(String queueId) throws IOException { + Scan scan = new Scan(); + scan.addColumn(CF, COL_QUEUE_ID); + scan.addColumn(CF, COL_OWNER); + scan.setMaxResultSize(1); + // Search for the queue that matches this queueId + SingleColumnValueFilter filterByQueueId = new SingleColumnValueFilter(CF, COL_QUEUE_ID, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(queueId)); + // Make sure that we are the owners of the queue. QueueId's may overlap. + SingleColumnValueFilter filterByOwner = new SingleColumnValueFilter(CF, COL_OWNER, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(serverName)); + // We only want the row key + FirstKeyOnlyFilter filterOutColumns = new FirstKeyOnlyFilter(); + FilterList filterList = new FilterList(filterByQueueId, filterByOwner, filterOutColumns); + scan.setFilter(filterList); + ResultScanner results = replicationTable.getScanner(scan); + Result result = results.next(); + results.close(); + return (result == null) ? null : result.getRow(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 2bb8ea8..32d0883 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -41,7 +41,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; /** - * This class provides an implementation of the ReplicationQueues interface using ZooKeeper. The + * This class provides an implementation of the + * interface using ZooKeeper. The * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is * the regionserver name (a concatenation of the region server’s hostname, client port and start @@ -71,6 +72,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class); + public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) { + this(args.getZk(), args.getConf(), args.getAbort()); + } + public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { super(zk, conf, abortable); @@ -166,8 +171,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override - public boolean isThisOurZnode(String znode) { - return ZKUtil.joinZNode(this.queuesZNode, znode).equals(this.myQueuesZnode); + public boolean isThisOurRegionServer(String regionserver) { + return ZKUtil.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode); } @Override @@ -223,7 +228,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R this.abortable.abort("Failed to get a list of queues for region server: " + this.myQueuesZnode, e); } - return listOfQueues; + return listOfQueues == null ? new ArrayList() : listOfQueues; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index fa5e222..d55472d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -48,16 +48,17 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.zookeeper.KeeperException; @@ -127,7 +128,8 @@ public class Replication extends WALActionsListener.Base implements if (replication) { try { this.replicationQueues = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server, + server.getZooKeeper())); this.replicationQueues.init(this.server.getServerName().toString()); this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server); @@ -135,7 +137,7 @@ public class Replication extends WALActionsListener.Base implements this.replicationTracker = ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, this.conf, this.server, this.server); - } catch (ReplicationException e) { + } catch (Exception e) { throw new IOException("Failed replication handler create", e); } UUID clusterId = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index b585513..ed2eecc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -315,9 +315,6 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void join() { this.executor.shutdown(); - if (this.sources.size() == 0) { - this.replicationQueues.removeAllQueues(); - } for (ReplicationSourceInterface source : this.sources) { source.terminate("Region server is closing"); } @@ -624,7 +621,7 @@ public class ReplicationSourceManager implements ReplicationListener { @Override public void run() { - if (this.rq.isThisOurZnode(rsZnode)) { + if (this.rq.isThisOurRegionServer(rsZnode)) { return; } // Wait a bit before transferring the queues, we may be shutting down. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index c3241c9..06a3c7e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -160,7 +161,7 @@ public class TestReplicationAdmin { Configuration conf = TEST_UTIL.getConfiguration(); ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(zkw, conf, null); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw)); repQueues.init("server1"); // add queue for ID_ONE diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 47db32b..18950a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.regionserver.Replication; @@ -94,7 +95,7 @@ public class TestLogsCleaner { Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); repQueues.init(server.getServerName().toString()); final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index d4f23c8..1778e73 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; @@ -87,8 +88,7 @@ public class TestReplicationHFileCleaner { Replication.decorateMasterConfiguration(conf); rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server); rp.init(); - - rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); rq.init(server.getServerName().toString()); try { fs = FileSystem.get(conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 144046f4..5ab26ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -121,7 +121,7 @@ public abstract class TestReplicationStateBasic { rq1.removeQueue("bogus"); rq1.removeLog("bogus", "bogus"); rq1.removeAllQueues(); - assertNull(rq1.getAllQueues()); + assertEquals(0, rq1.getAllQueues().size()); assertEquals(0, rq1.getLogPosition("bogus", "bogus")); assertNull(rq1.getLogsInQueue("bogus")); assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java new file mode 100644 index 0000000..8186213 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static junit.framework.TestCase.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({ReplicationTests.class, MediumTests.class}) +public class TestReplicationStateHBaseImpl { + + private static Configuration conf; + private static HBaseTestingUtility utility; + private static Connection connection; + private static ReplicationQueues rqH; + + private final String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + utility = new HBaseTestingUtility(); + utility.startMiniCluster(); + conf = utility.getConfiguration(); + conf.setClass("hbase.region.replica.replication.ReplicationQueuesType", + ReplicationQueuesHBaseImpl.class, ReplicationQueues.class); + connection = ConnectionFactory.createConnection(conf); + } + + @Test + public void checkNamingSchema() throws Exception { + rqH.init(server1); + assertTrue(rqH.isThisOurRegionServer(server1)); + assertTrue(!rqH.isThisOurRegionServer(server1 + "a")); + assertTrue(!rqH.isThisOurRegionServer(null)); + } + + @Test + public void testReplicationStateHBase() { + DummyServer ds = new DummyServer(server1); + try { + rqH = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds, null)); + rqH.init(server1); + // Check that the proper System Tables have been generated + Table replicationTable = connection.getTable( + ReplicationQueuesHBaseImpl.REPLICATION_TABLE_NAME); + assertTrue(replicationTable.getName().isSystemTable()); + + } catch (Exception e) { + e.printStackTrace(); + fail("testReplicationStateHBaseConstruction received an Exception"); + } + try { + // Test adding in WAL files + assertEquals(0, rqH.getAllQueues().size()); + rqH.addLog("Queue1", "WALLogFile1.1"); + assertEquals(1, rqH.getAllQueues().size()); + rqH.addLog("Queue1", "WALLogFile1.2"); + rqH.addLog("Queue1", "WALLogFile1.3"); + rqH.addLog("Queue1", "WALLogFile1.4"); + rqH.addLog("Queue2", "WALLogFile2.1"); + rqH.addLog("Queue3", "WALLogFile3.1"); + assertEquals(3, rqH.getAllQueues().size()); + assertEquals(4, rqH.getLogsInQueue("Queue1").size()); + assertEquals(1, rqH.getLogsInQueue("Queue2").size()); + assertEquals(1, rqH.getLogsInQueue("Queue3").size()); + // Make sure that abortCount is still 0 + assertEquals(0, ds.getAbortCount()); + // Make sure that getting a log from a non-existent queue triggers an abort + assertNull(rqH.getLogsInQueue("Queue4")); + assertEquals(1, ds.getAbortCount()); + } catch (ReplicationException e) { + e.printStackTrace(); + fail("testAddLog received a ReplicationException"); + } + try { + + // Test updating the log positions + assertEquals(0L, rqH.getLogPosition("Queue1", "WALLogFile1.1")); + rqH.setLogPosition("Queue1", "WALLogFile1.1", 123L); + assertEquals(123L, rqH.getLogPosition("Queue1", "WALLogFile1.1")); + rqH.setLogPosition("Queue1", "WALLogFile1.1", 123456789L); + assertEquals(123456789L, rqH.getLogPosition("Queue1", "WALLogFile1.1")); + rqH.setLogPosition("Queue2", "WALLogFile2.1", 242L); + assertEquals(242L, rqH.getLogPosition("Queue2", "WALLogFile2.1")); + rqH.setLogPosition("Queue3", "WALLogFile3.1", 243L); + assertEquals(243L, rqH.getLogPosition("Queue3", "WALLogFile3.1")); + + // Test that setting log positions in non-existing logs will cause an abort + assertEquals(1, ds.getAbortCount()); + rqH.setLogPosition("NotHereQueue", "WALLogFile3.1", 243L); + assertEquals(2, ds.getAbortCount()); + rqH.setLogPosition("NotHereQueue", "NotHereFile", 243L); + assertEquals(3, ds.getAbortCount()); + rqH.setLogPosition("Queue1", "NotHereFile", 243l); + assertEquals(4, ds.getAbortCount()); + + // Test reading log positions for non-existent queues and WAL's + try { + rqH.getLogPosition("Queue1", "NotHereWAL"); + fail("Replication queue should have thrown a ReplicationException for reading from a " + + "non-existent WAL"); + } catch (ReplicationException e) { + } + try { + rqH.getLogPosition("NotHereQueue", "NotHereWAL"); + fail("Replication queue should have thrown a ReplicationException for reading from a " + + "non-existent queue"); + } catch (ReplicationException e) { + } + // Test removing logs + rqH.removeLog("Queue1", "WALLogFile1.1"); + assertEquals(3, rqH.getLogsInQueue("Queue1").size()); + // Test removing queues + rqH.removeQueue("Queue2"); + assertEquals(2, rqH.getAllQueues().size()); + assertNull(rqH.getLogsInQueue("Queue2")); + // Test that getting logs from a non-existent queue aborts + assertEquals(5, ds.getAbortCount()); + // Test removing all queues for a Region Server + rqH.removeAllQueues(); + assertEquals(0, rqH.getAllQueues().size()); + assertNull(rqH.getLogsInQueue("Queue1")); + // Test that getting logs from a non-existent queue aborts + assertEquals(6, ds.getAbortCount()); + } catch (ReplicationException e) { + e.printStackTrace(); + fail("testAddLog received a ReplicationException"); + } + } + + static class DummyServer implements Server { + private String serverName; + private boolean isAborted = false; + private boolean isStopped = false; + private int abortCount = 0; + + public DummyServer(String serverName) { + this.serverName = serverName; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return null; + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return null; + } + + @Override + public ClusterConnection getConnection() { + return null; + } + + @Override + public MetaTableLocator getMetaTableLocator() { + return null; + } + + @Override + public ServerName getServerName() { + return ServerName.valueOf(this.serverName); + } + + @Override + public void abort(String why, Throwable e) { + abortCount++; + this.isAborted = true; + } + + @Override + public boolean isAborted() { + return this.isAborted; + } + + @Override + public void stop(String why) { + this.isStopped = true; + } + + @Override + public boolean isStopped() { + return this.isStopped; + } + + @Override + public ChoreService getChoreService() { + return null; + } + + @Override + public ClusterConnection getClusterConnection() { + return null; + } + + public int getAbortCount() { + return abortCount; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 94dbb25..e731135 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -91,9 +93,14 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { DummyServer ds1 = new DummyServer(server1); DummyServer ds2 = new DummyServer(server2); DummyServer ds3 = new DummyServer(server3); - rq1 = ReplicationFactory.getReplicationQueues(zkw, conf, ds1); - rq2 = ReplicationFactory.getReplicationQueues(zkw, conf, ds2); - rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3); + try { + rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); + rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw)); + rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw)); + } catch (Exception e) { + // This should not occur, because getReplicationQueues() only throws for ReplicationQueuesHBaseImpl + fail("ReplicationFactory.getReplicationQueues() threw an IO Exception"); + } rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 9e950d2..d1db068 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; @@ -284,9 +285,11 @@ public class TestReplicationSourceManager { LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); + + ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), - server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, + server.getZooKeeper())); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); @@ -326,8 +329,8 @@ public class TestReplicationSourceManager { public void testCleanupFailoverQueues() throws Exception { final Server server = new DummyServer("hostname1.example.org"); ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), - server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, + server.getZooKeeper())); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode SortedSet files = new TreeSet(); @@ -341,7 +344,8 @@ public class TestReplicationSourceManager { } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, + s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); @@ -365,7 +369,8 @@ public class TestReplicationSourceManager { conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, + server.getZooKeeper())); repQueues.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); @@ -381,16 +386,19 @@ public class TestReplicationSourceManager { // simulate three servers fail sequentially ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, + s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); SortedMap> testMap = rq1.claimQueues(server.getServerName().getServerName()); ReplicationQueues rq2 = - ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2, + s2.getZooKeeper())); rq2.init(s2.getServerName().toString()); testMap = rq2.claimQueues(s1.getServerName().getServerName()); ReplicationQueues rq3 = - ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3, + s3.getZooKeeper())); rq3.init(s3.getServerName().toString()); testMap = rq3.claimQueues(s2.getServerName().getServerName()); @@ -412,7 +420,8 @@ public class TestReplicationSourceManager { conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server s0 = new DummyServer("cversion-change0.example.org"); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0, + s0.getZooKeeper())); repQueues.init(s0.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); @@ -423,7 +432,8 @@ public class TestReplicationSourceManager { // simulate queue transfer Server s1 = new DummyServer("cversion-change1.example.org"); ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, + s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); ReplicationQueuesClient client = @@ -522,8 +532,8 @@ public class TestReplicationSourceManager { this.deadRsZnode = znode; this.server = s; this.rq = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), - server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, + server.getZooKeeper())); this.rq.init(this.server.getServerName().toString()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java index 866a12d..9d9e3f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker; @@ -1547,7 +1548,8 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck { // create replicator ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", connection); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(zkw, conf, connection); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, connection, + zkw)); repQueues.init("server1"); // queues for current peer, no errors repQueues.addLog("1", "file1"); -- 2.8.0-rc2