From 54c620b17a0f3706944dfa89b2544eba669bec4c Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Thu, 9 Jun 2016 16:16:38 -0700 Subject: [PATCH] Building on HBASE-15958. Provided a TableBasedReplicationQueuesClientImpl that relies on the HBase Replication Table to track WAL queues. Refactored out a large section of ReplicationQueuesHBaseImpl into a ReplicationTableBase class that handles Replication Table operations. --- .../hbase/client/replication/ReplicationAdmin.java | 4 +- .../hbase/replication/ReplicationFactory.java | 9 +- .../hbase/replication/ReplicationQueuesClient.java | 10 +- .../ReplicationQueuesClientArguments.java | 35 ++ .../replication/ReplicationQueuesClientZKImpl.java | 51 +- .../replication/ReplicationQueuesHBaseImpl.java | 644 --------------------- .../hbase/replication/ReplicationTableBase.java | 351 +++++++++++ .../TableBasedReplicationQueuesClientImpl.java | 111 ++++ .../TableBasedReplicationQueuesImpl.java | 437 ++++++++++++++ .../master/ReplicationHFileCleaner.java | 8 +- .../replication/master/ReplicationLogCleaner.java | 46 +- .../hadoop/hbase/util/hbck/ReplicationChecker.java | 6 +- .../hbase/master/cleaner/TestLogsCleaner.java | 3 +- .../replication/TestReplicationStateHBaseImpl.java | 90 ++- .../replication/TestReplicationStateZKImpl.java | 6 +- .../regionserver/TestReplicationSourceManager.java | 7 +- 16 files changed, 1101 insertions(+), 717 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java diff 
--git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index d062448..e0985bd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -122,7 +123,8 @@ public class ReplicationAdmin implements Closeable { zkw = createZooKeeperWatcher(); try { this.replicationQueuesClient = - ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection); + ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, + this.connection, zkw)); this.replicationQueuesClient.init(); this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.replicationQueuesClient, this.connection); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index e264a4d..38f9f30 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -38,9 +38,12 @@ public class ReplicationFactory { return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args); } - public static ReplicationQueuesClient getReplicationQueuesClient(final 
ZooKeeperWatcher zk, - Configuration conf, Abortable abortable) { - return new ReplicationQueuesClientZKImpl(zk, conf, abortable); + public static ReplicationQueuesClient getReplicationQueuesClient( + ReplicationQueuesClientArguments args) + throws Exception { + Class classToBuild = args.getConf().getClass("hbase.region.replica." + + "replication.ReplicationQueuesClientType", ReplicationQueuesClientZKImpl.class); + return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args); } public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java index 7fa3bbb..6d8900e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import java.util.List; +import java.util.Set; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -61,11 +62,12 @@ public interface ReplicationQueuesClient { List getAllQueues(String serverName) throws KeeperException; /** - * Get the cversion of replication rs node. This can be used as optimistic locking to get a - * consistent snapshot of the replication queues. - * @return cversion of replication rs node + * Load all wals in all replication queues from ZK. This method guarantees to return a + * snapshot which contains all WALs in the zookeeper at the start of this call even if there + * is concurrent queue failover. However, some newly created WALs during the call may + * not be included. 
*/ - int getQueuesZNodeCversion() throws KeeperException; + Set getAllWALs() throws KeeperException; /** * Get the change version number of replication hfile references node. This can be used as diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java new file mode 100644 index 0000000..8a61993 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +@InterfaceAudience.Private +public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments { + public ReplicationQueuesClientArguments(Configuration conf, Abortable abort, + ZooKeeperWatcher zk) { + super(conf, abort, zk); + } + public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) { + super(conf, abort); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index cc407e3..b0ded7d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -19,7 +19,12 @@ package org.apache.hadoop.hbase.replication; import java.util.List; +import java.util.Set; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; @@ -32,6 +37,12 @@ import org.apache.zookeeper.data.Stat; public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements ReplicationQueuesClient { + Log LOG = LogFactory.getLog(ReplicationQueuesClientZKImpl.class); + + public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) { + this(args.getZk(), args.getConf(), args.getAbortable()); + } + public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable 
abortable) { super(zk, conf, abortable); @@ -74,7 +85,45 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem return result; } - @Override public int getQueuesZNodeCversion() throws KeeperException { + @Override + public Set getAllWALs() throws KeeperException { + /** + * Load all wals in all replication queues from ZK. This method guarantees to return a + * snapshot which contains all WALs in the zookeeper at the start of this call even if there + * is concurrent queue failover. However, some newly created WALs during the call may + * not be included. + */ + for (int retry = 0; ; retry++) { + int v0 = getQueuesZNodeCversion(); + List rss = getListOfReplicators(); + if (rss == null) { + LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); + return ImmutableSet.of(); + } + Set wals = Sets.newHashSet(); + for (String rs : rss) { + List listOfPeers = getAllQueues(rs); + // if rs just died, this will be null + if (listOfPeers == null) { + continue; + } + for (String id : listOfPeers) { + List peersWals = getLogsInQueue(rs, id); + if (peersWals != null) { + wals.addAll(peersWals); + } + } + } + int v1 = getQueuesZNodeCversion(); + if (v0 == v1) { + return wals; + } + LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", + v0, v1, retry)); + } + } + + public int getQueuesZNodeCversion() throws KeeperException { try { Stat stat = new Stat(); ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java deleted file mode 100644 index 34a5289..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java +++ /dev/null @@ -1,644 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more 
contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.hbase.replication; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; - -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.regionserver.BloomType; -import 
org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.RetryCounter; -import org.apache.hadoop.hbase.util.RetryCounterFactory; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * This class provides an implementation of the ReplicationQueues interface using an HBase table - * "Replication Table". The basic schema of this table will store each individual queue as a - * seperate row. The row key will be a unique identifier of the creating server's name and the - * queueId. Each queue must have the following two columns: - * COL_OWNER: tracks which server is currently responsible for tracking the queue - * COL_QUEUE_ID: tracks the queue's id as stored in ReplicationSource - * They will also have columns mapping [WAL filename : offset] - * One key difference from the ReplicationQueuesZkImpl is that when queues are reclaimed we - * simply return its HBase row key as its new "queueId" - */ - -@InterfaceAudience.Private -public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase - implements ReplicationQueues { - - private static final Log LOG = LogFactory.getLog(ReplicationQueuesHBaseImpl.class); - - /** Name of the HBase Table used for tracking replication*/ - public static final TableName REPLICATION_TABLE_NAME = - TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); - - // Column family and column names for the Replication Table - private static final byte[] CF = Bytes.toBytes("r"); - private static final byte[] COL_OWNER = Bytes.toBytes("o"); - private static final byte[] COL_OWNER_HISTORY = Bytes.toBytes("h"); - - // The value used to delimit the queueId and server 
name inside of a queue's row key. Currently a - // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens. - // See HBASE-11394. - private static String ROW_KEY_DELIMITER = "-"; - - // Column Descriptor for the Replication Table - private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR = - new HColumnDescriptor(CF).setMaxVersions(1) - .setInMemory(true) - .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // TODO: Figure out which bloom filter to use - .setBloomFilterType(BloomType.NONE) - .setCacheDataInL1(true); - - // Common byte values used in replication offset tracking - private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L); - private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes(""); - - /* - * Make sure that HBase table operations for replication have a high number of retries. This is - * because the server is aborted if any HBase table operation fails. Each RPC will be attempted - * 3600 times before exiting. This provides each operation with 2 hours of retries - * before the server is aborted. 
- */ - private static final int CLIENT_RETRIES = 3600; - private static final int RPC_TIMEOUT = 2000; - private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; - - private Configuration modifiedConf; - private Admin admin; - private Connection connection; - private Table replicationTable; - private String serverName = null; - private byte[] serverNameBytes = null; - - public ReplicationQueuesHBaseImpl(ReplicationQueuesArguments args) { - this(args.getConf(), args.getAbortable(), args.getZk()); - } - - public ReplicationQueuesHBaseImpl(Configuration conf, Abortable abort, ZooKeeperWatcher zkw) { - super(zkw, conf, abort); - modifiedConf = new Configuration(conf); - modifiedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); - } - - @Override - public void init(String serverName) throws ReplicationException { - try { - this.serverName = serverName; - this.serverNameBytes = Bytes.toBytes(serverName); - // Modify the connection's config so that the Replication Table it returns has a much higher - // number of client retries - this.connection = ConnectionFactory.createConnection(modifiedConf); - this.admin = connection.getAdmin(); - replicationTable = createAndGetReplicationTable(); - replicationTable.setRpcTimeout(RPC_TIMEOUT); - replicationTable.setOperationTimeout(OPERATION_TIMEOUT); - } catch (IOException e) { - throw new ReplicationException(e); - } - } - - @Override - public void removeQueue(String queueId) { - - try { - byte[] rowKey = queueIdToRowKey(queueId); - Delete deleteQueue = new Delete(rowKey); - safeQueueUpdate(deleteQueue); - } catch (IOException | ReplicationException e) { - String errMsg = "Failed removing queue queueId=" + queueId; - abortable.abort(errMsg, e); - } - } - - @Override - public void addLog(String queueId, String filename) throws ReplicationException { - try { - if (!checkQueueExists(queueId)) { - // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values - Put 
putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId))); - putNewQueue.addColumn(CF, COL_OWNER, serverNameBytes); - putNewQueue.addColumn(CF, COL_OWNER_HISTORY, EMPTY_STRING_BYTES); - putNewQueue.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - replicationTable.put(putNewQueue); - } else { - // Otherwise simply add the new log and offset as a new column - Put putNewLog = new Put(queueIdToRowKey(queueId)); - putNewLog.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - safeQueueUpdate(putNewLog); - } - } catch (IOException | ReplicationException e) { - String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); - } - } - - @Override - public void removeLog(String queueId, String filename) { - try { - byte[] rowKey = queueIdToRowKey(queueId); - Delete delete = new Delete(rowKey); - delete.addColumns(CF, Bytes.toBytes(filename)); - safeQueueUpdate(delete); - } catch (IOException | ReplicationException e) { - String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); - } - } - - @Override - public void setLogPosition(String queueId, String filename, long position) { - try { - byte[] rowKey = queueIdToRowKey(queueId); - // Check that the log exists. addLog() must have been called before setLogPosition(). 
- Get checkLogExists = new Get(rowKey); - checkLogExists.addColumn(CF, Bytes.toBytes(filename)); - if (!replicationTable.exists(checkLogExists)) { - String errMsg = "Could not set position of non-existent log from queueId=" + queueId + - ", filename=" + filename; - abortable.abort(errMsg, new ReplicationException(errMsg)); - return; - } - // Update the log offset if it exists - Put walAndOffset = new Put(rowKey); - walAndOffset.addColumn(CF, Bytes.toBytes(filename), Bytes.toBytes(position)); - safeQueueUpdate(walAndOffset); - } catch (IOException | ReplicationException e) { - String errMsg = "Failed writing log position queueId=" + queueId + "filename=" + - filename + " position=" + position; - abortable.abort(errMsg, e); - } - } - - @Override - public long getLogPosition(String queueId, String filename) throws ReplicationException { - try { - byte[] rowKey = queueIdToRowKey(queueId); - Get getOffset = new Get(rowKey); - getOffset.addColumn(CF, Bytes.toBytes(filename)); - Result result = getResultIfOwner(getOffset); - if (result == null || !result.containsColumn(CF, Bytes.toBytes(filename))) { - throw new ReplicationException("Could not read empty result while getting log position " + - "queueId=" + queueId + ", filename=" + filename); - } - return Bytes.toLong(result.getValue(CF, Bytes.toBytes(filename))); - } catch (IOException e) { - throw new ReplicationException("Could not get position in log for queueId=" + queueId + - ", filename=" + filename); - } - } - - @Override - public void removeAllQueues() { - List myQueueIds = getAllQueues(); - for (String queueId : myQueueIds) { - removeQueue(queueId); - } - } - - @Override - public List getLogsInQueue(String queueId) { - byte[] rowKey = queueIdToRowKey(queueId); - return getLogsInQueue(rowKey); - } - - private List getLogsInQueue(byte[] rowKey) { - String errMsg = "Could not get logs in queue queueId=" + Bytes.toString(rowKey); - try { - Get getQueue = new Get(rowKey); - Result queue = getResultIfOwner(getQueue); 
- // The returned queue could be null if we have lost ownership of it - if (queue == null) { - abortable.abort(errMsg, new ReplicationException(errMsg)); - return null; - } - return readWALsFromResult(queue); - } catch (IOException e) { - abortable.abort(errMsg, e); - return null; - } - } - - @Override - public List getAllQueues() { - List allQueues = new ArrayList(); - ResultScanner queueScanner = null; - try { - queueScanner = this.getQueuesBelongingToServer(serverName); - for (Result queue : queueScanner) { - String rowKey = Bytes.toString(queue.getRow()); - // If the queue does not have a Owner History, then we must be its original owner. So we - // want to return its queueId in raw form - if (Bytes.toString(queue.getValue(CF, COL_OWNER_HISTORY)).length() == 0) { - allQueues.add(getRawQueueIdFromRowKey(rowKey)); - } else { - allQueues.add(rowKey); - } - } - return allQueues; - } catch (IOException e) { - String errMsg = "Failed getting list of all replication queues"; - abortable.abort(errMsg, e); - return null; - } finally { - if (queueScanner != null) { - queueScanner.close(); - } - } - } - - @Override - public Map> claimQueues(String regionserver) { - Map> queues = new HashMap<>(); - if (isThisOurRegionServer(regionserver)) { - return queues; - } - ResultScanner queuesToClaim = null; - try { - queuesToClaim = this.getQueuesBelongingToServer(regionserver); - for (Result queue : queuesToClaim) { - if (attemptToClaimQueue(queue, regionserver)) { - String rowKey = Bytes.toString(queue.getRow()); - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey); - if (peerExists(replicationQueueInfo.getPeerId())) { - Set sortedLogs = new HashSet(); - List logs = getLogsInQueue(queue.getRow()); - for (String log : logs) { - sortedLogs.add(log); - } - queues.put(rowKey, sortedLogs); - LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver); - } else { - // Delete orphaned queues - removeQueue(Bytes.toString(queue.getRow())); - 
LOG.info(serverName + " has deleted abandoned queue " + rowKey + " from " + - regionserver); - } - } - } - } catch (IOException | KeeperException e) { - String errMsg = "Failed claiming queues for regionserver=" + regionserver; - abortable.abort(errMsg, e); - queues.clear(); - } finally { - if (queuesToClaim != null) { - queuesToClaim.close(); - } - } - return queues; - } - - @Override - public List getListOfReplicators() { - // scan all of the queues and return a list of all unique OWNER values - Set peerServers = new HashSet(); - ResultScanner allQueuesInCluster = null; - try { - Scan scan = new Scan(); - scan.addColumn(CF, COL_OWNER); - allQueuesInCluster = replicationTable.getScanner(scan); - for (Result queue : allQueuesInCluster) { - peerServers.add(Bytes.toString(queue.getValue(CF, COL_OWNER))); - } - } catch (IOException e) { - String errMsg = "Failed getting list of replicators"; - abortable.abort(errMsg, e); - } finally { - if (allQueuesInCluster != null) { - allQueuesInCluster.close(); - } - } - return new ArrayList(peerServers); - } - - @Override - public boolean isThisOurRegionServer(String regionserver) { - return this.serverName.equals(regionserver); - } - - @Override - public void addPeerToHFileRefs(String peerId) throws ReplicationException { - // TODO - throw new NotImplementedException(); - } - - @Override - public void removePeerFromHFileRefs(String peerId) { - // TODO - throw new NotImplementedException(); - } - - @Override - public void addHFileRefs(String peerId, List files) throws ReplicationException { - // TODO - throw new NotImplementedException(); - } - - @Override - public void removeHFileRefs(String peerId, List files) { - // TODO - throw new NotImplementedException(); - } - - /** - * Gets the Replication Table. Builds and blocks until the table is available if the Replication - * Table does not exist. 
- * - * @return the Replication Table - * @throws IOException if the Replication Table takes too long to build - */ - private Table createAndGetReplicationTable() throws IOException { - if (!replicationTableExists()) { - createReplicationTable(); - } - int maxRetries = conf.getInt("replication.queues.createtable.retries.number", 100); - RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 100); - RetryCounter retryCounter = counterFactory.create(); - while (!replicationTableExists()) { - try { - retryCounter.sleepUntilNextRetry(); - if (!retryCounter.shouldRetry()) { - throw new IOException("Unable to acquire the Replication Table"); - } - } catch (InterruptedException e) { - return null; - } - } - return connection.getTable(REPLICATION_TABLE_NAME); - } - - /** - * Checks whether the Replication Table exists yet - * - * @return whether the Replication Table exists - * @throws IOException - */ - private boolean replicationTableExists() { - try { - return admin.tableExists(REPLICATION_TABLE_NAME); - } catch (IOException e) { - return false; - } - } - - /** - * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR - * in ReplicationQueuesHBaseImpl - * - * @throws IOException - */ - private void createReplicationTable() throws IOException { - HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); - replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); - admin.createTable(replicationTableDescriptor); - } - - /** - * Build the row key for the given queueId. This will uniquely identify it from all other queues - * in the cluster. 
- * @param serverName The owner of the queue - * @param queueId String identifier of the queue - * @return String representation of the queue's row key - */ - private String buildQueueRowKey(String serverName, String queueId) { - return queueId + ROW_KEY_DELIMITER + serverName; - } - - private String buildQueueRowKey(String queueId) { - return buildQueueRowKey(serverName, queueId); - } - - /** - * Parse the original queueId from a row key - * @param rowKey String representation of a queue's row key - * @return the original queueId - */ - private String getRawQueueIdFromRowKey(String rowKey) { - return rowKey.split(ROW_KEY_DELIMITER)[0]; - } - - /** - * See safeQueueUpdate(RowMutations mutate) - * - * @param put Row mutation to perform on the queue - */ - private void safeQueueUpdate(Put put) throws ReplicationException, IOException { - RowMutations mutations = new RowMutations(put.getRow()); - mutations.add(put); - safeQueueUpdate(mutations); - } - - /** - * See safeQueueUpdate(RowMutations mutate) - * - * @param delete Row mutation to perform on the queue - */ - private void safeQueueUpdate(Delete delete) throws ReplicationException, - IOException{ - RowMutations mutations = new RowMutations(delete.getRow()); - mutations.add(delete); - safeQueueUpdate(mutations); - } - - /** - * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER column - * of the queue. Abort the server if this checkAndPut fails: which means we have somehow lost - * ownership of the column or an IO Exception has occurred during the transaction. 
- * - * @param mutate Mutation to perform on a given queue - */ - private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{ - boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF, COL_OWNER, - CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate); - if (!updateSuccess) { - throw new ReplicationException("Failed to update Replication Table because we lost queue " + - " ownership"); - } - } - - /** - * Returns a queue's row key given either its raw or reclaimed queueId - * - * @param queueId queueId of the queue - * @return byte representation of the queue's row key - */ - private byte[] queueIdToRowKey(String queueId) { - // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen - // then this is not a reclaimed queue. - if (!queueId.contains(ROW_KEY_DELIMITER)) { - return Bytes.toBytes(buildQueueRowKey(queueId)); - // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the - // queue's row key - } else { - return Bytes.toBytes(queueId); - } - } - - /** - * Get the QueueIds belonging to the named server from the ReplicationTable - * - * @param server name of the server - * @return a ResultScanner over the QueueIds belonging to the server - * @throws IOException - */ - private ResultScanner getQueuesBelongingToServer(String server) throws IOException { - Scan scan = new Scan(); - SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF, COL_OWNER, - CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server)); - scan.setFilter(filterMyQueues); - scan.addColumn(CF, COL_OWNER); - scan.addColumn(CF, COL_OWNER_HISTORY); - ResultScanner results = replicationTable.getScanner(scan); - return results; - } - - /** - * Check if the queue specified by queueId is stored in HBase - * - * @param queueId Either raw or reclaimed format of the queueId - * @return Whether the queue is stored in HBase - * @throws IOException - */ - private boolean 
checkQueueExists(String queueId) throws IOException { - byte[] rowKey = queueIdToRowKey(queueId); - return replicationTable.exists(new Get(rowKey)); - } - - /** - * Read all of the WAL's from a queue into a list - * - * @param queue HBase query result containing the queue - * @return a list of all the WAL filenames - */ - private List readWALsFromResult(Result queue) { - List wals = new ArrayList<>(); - Map familyMap = queue.getFamilyMap(CF); - for(byte[] cQualifier : familyMap.keySet()) { - // Ignore the meta data fields of the queue - if (Arrays.equals(cQualifier, COL_OWNER) || Arrays.equals(cQualifier, COL_OWNER_HISTORY)) { - continue; - } - wals.add(Bytes.toString(cQualifier)); - } - return wals; - } - - /** - * Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the - * recently killed server is still the OWNER before we claim it. - * - * @param queue The queue that we are trying to claim - * @param originalServer The server that originally owned the queue - * @return Whether we successfully claimed the queue - * @throws IOException - */ - private boolean attemptToClaimQueue (Result queue, String originalServer) throws IOException{ - Put putQueueNameAndHistory = new Put(queue.getRow()); - putQueueNameAndHistory.addColumn(CF, COL_OWNER, Bytes.toBytes(serverName)); - String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF, - COL_OWNER_HISTORY)), originalServer); - putQueueNameAndHistory.addColumn(CF, COL_OWNER_HISTORY, Bytes.toBytes(newOwnerHistory)); - RowMutations claimAndRenameQueue = new RowMutations(queue.getRow()); - claimAndRenameQueue.add(putQueueNameAndHistory); - // Attempt to claim ownership for this queue by checking if the current OWNER is the original - // server. If it is not then another RS has already claimed it. 
If it is we set ourselves as the - // new owner and update the queue's history - boolean success = replicationTable.checkAndMutate(queue.getRow(), CF, COL_OWNER, - CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), claimAndRenameQueue); - return success; - } - - /** - * Creates a "|" delimited record of the queue's past region server owners. - * - * @param originalHistory the queue's original owner history - * @param oldServer the name of the server that used to own the queue - * @return the queue's new owner history - */ - private String buildClaimedQueueHistory(String originalHistory, String oldServer) { - return originalHistory + "|" + oldServer; - } - - /** - * Attempts to run a Get on some queue. Will only return a non-null result if we currently own - * the queue. - * - * @param get The get that we want to query - * @return The result of the get if this server is the owner of the queue. Else it returns null - * @throws IOException - */ - private Result getResultIfOwner(Get get) throws IOException { - Scan scan = new Scan(get); - // Check if the Get currently contains all columns or only specific columns - if (scan.getFamilyMap().size() > 0) { - // Add the OWNER column if the scan is already only over specific columns - scan.addColumn(CF, COL_OWNER); - } - scan.setMaxResultSize(1); - SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF, COL_OWNER, - CompareFilter.CompareOp.EQUAL, serverNameBytes); - scan.setFilter(checkOwner); - ResultScanner scanner = null; - try { - scanner = replicationTable.getScanner(scan); - Result result = scanner.next(); - return (result == null || result.isEmpty()) ? 
null : result; - } finally { - if (scanner != null) { - scanner.close(); - } - } - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java new file mode 100644 index 0000000..c1506cd --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java @@ -0,0 +1,351 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/* + * Abstract class that provides an interface to the Replication Table, which is currently + * being used for WAL offset tracking. + * The basic schema of this table will store each individual queue as a + * separate row. The row key will be a unique identifier of the creating server's name and the + * queueId. Each queue must have the following two columns: + * COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue + * COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous servers that have owned this + * queue. The most recent previous owner is the leftmost entry.
+ * They will also have columns mapping [WAL filename : offset] + */ + +@InterfaceAudience.Private +abstract class ReplicationTableBase { + + /** Name of the HBase Table used for tracking replication*/ + public static final TableName REPLICATION_TABLE_NAME = + TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); + + // Column family and column names for Queues in the Replication Table + public static final byte[] CF_QUEUE = Bytes.toBytes("q"); + public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o"); + public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h"); + + // Column Descriptor for the Replication Table + private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR = + new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) + .setInMemory(true) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + // TODO: Figure out which bloom filter to use + .setBloomFilterType(BloomType.NONE); + + // The value used to delimit the queueId and server name inside of a queue's row key. Currently a + // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens. + // See HBASE-11394. + public static final String ROW_KEY_DELIMITER = "-"; + + // The value used to delimit server names in the queue history list + public static final String QUEUE_HISTORY_DELIMITER = "|"; + + /* + * Make sure that HBase table operations for replication have a high number of retries. This is + * because the server is aborted if any HBase table operation fails. Each RPC will be attempted + * 3600 times before exiting. This provides each operation with 2 hours of retries + * before the server is aborted.
+ */ + private static final int CLIENT_RETRIES = 3600; + private static final int RPC_TIMEOUT = 2000; + private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; + + protected final Table replicationTable; + protected final Configuration conf; + protected final Abortable abortable; + private final Admin admin; + private final Connection connection; + + public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException { + this.conf = new Configuration(conf); + this.abortable = abort; + decorateConf(); + this.connection = ConnectionFactory.createConnection(this.conf); + this.admin = connection.getAdmin(); + this.replicationTable = createAndGetReplicationTable(); + setTableTimeOuts(); + } + + /** + * Modify the connection's config so that operations run on the Replication Table have a larger + * number of retries + */ + private void decorateConf() { + this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); + } + + /** + * Sets the RPC and operation timeouts for the Replication Table + */ + private void setTableTimeOuts() { + replicationTable.setRpcTimeout(RPC_TIMEOUT); + replicationTable.setOperationTimeout(OPERATION_TIMEOUT); + } + + /** + * Build the row key for the given queueId. This will uniquely identify it from all other queues + * in the cluster.
+ * @param serverName The owner of the queue + * @param queueId String identifier of the queue + * @return String representation of the queue's row key + */ + protected String buildQueueRowKey(String serverName, String queueId) { + return queueId + ROW_KEY_DELIMITER + serverName; + } + + /** + * Parse the original queueId from a row key + * @param rowKey String representation of a queue's row key + * @return the original queueId + */ + protected String getRawQueueIdFromRowKey(String rowKey) { + return rowKey.split(ROW_KEY_DELIMITER)[0]; + } + + /** + * Returns a queue's row key given a server name and either its raw or reclaimed queueId + * + * @param queueId queueId of the queue + * @return byte representation of the queue's row key + */ + protected byte[] queueIdToRowKey(String serverName, String queueId) { + // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen + // then this is not a reclaimed queue. + if (!queueId.contains(ROW_KEY_DELIMITER)) { + return Bytes.toBytes(buildQueueRowKey(serverName, queueId)); + // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the + // queue's row key + } else { + return Bytes.toBytes(queueId); + } + } + + /** + * Creates a "|" delimited record of the queue's past region server owners. + * + * @param originalHistory the queue's original owner history + * @param oldServer the name of the server that used to own the queue + * @return the queue's new owner history + */ + protected String buildClaimedQueueHistory(String originalHistory, String oldServer) { + return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory; + } + + /** + * Get a list of all region servers that have outstanding replication queues. These servers could + * be alive, dead or from a previous run of the cluster.
+ * @return a list of server names + */ + protected List<String> getListOfReplicators() { + // scan all of the queues and return a list of all unique OWNER values + Set<String> peerServers = new HashSet<String>(); + ResultScanner allQueuesInCluster = null; + try { + Scan scan = new Scan(); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); + allQueuesInCluster = replicationTable.getScanner(scan); + for (Result queue : allQueuesInCluster) { + peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER))); + } + } catch (IOException e) { + String errMsg = "Failed getting list of replicators"; + abortable.abort(errMsg, e); + } finally { + if (allQueuesInCluster != null) { + allQueuesInCluster.close(); + } + } + return new ArrayList<String>(peerServers); + } + + protected List<String> getAllQueues(String serverName) { + List<String> allQueues = new ArrayList<String>(); + ResultScanner queueScanner = null; + try { + queueScanner = getQueuesBelongingToServer(serverName); + for (Result queue : queueScanner) { + String rowKey = Bytes.toString(queue.getRow()); + // If the queue does not have an Owner History, then we must be its original owner.
So we + // want to return its queueId in raw form + if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) { + allQueues.add(getRawQueueIdFromRowKey(rowKey)); + } else { + allQueues.add(rowKey); + } + } + return allQueues; + } catch (IOException e) { + String errMsg = "Failed getting list of all replication queues for serverName=" + serverName; + abortable.abort(errMsg, e); + return null; + } finally { + if (queueScanner != null) { + queueScanner.close(); + } + } + } + + protected List<String> getLogsInQueue(String serverName, String queueId) { + String rowKey = queueId; + if (!queueId.contains(ROW_KEY_DELIMITER)) { + rowKey = buildQueueRowKey(serverName, queueId); + } + return getLogsInQueue(Bytes.toBytes(rowKey)); + } + + protected List<String> getLogsInQueue(byte[] rowKey) { + String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey); + try { + Get getQueue = new Get(rowKey); + Result queue = replicationTable.get(getQueue); + if (queue == null || queue.isEmpty()) { + abortable.abort(errMsg, new ReplicationException(errMsg)); + return null; + } + return readWALsFromResult(queue); + } catch (IOException e) { + abortable.abort(errMsg, e); + return null; + } + } + + /** + * Read all of the WAL's from a queue into a list + * + * @param queue HBase query result containing the queue + * @return a list of all the WAL filenames + */ + protected List<String> readWALsFromResult(Result queue) { + List<String> wals = new ArrayList<>(); + Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE); + for (byte[] cQualifier : familyMap.keySet()) { + // Ignore the meta data fields of the queue + if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, + COL_QUEUE_OWNER_HISTORY)) { + continue; + } + wals.add(Bytes.toString(cQualifier)); + } + return wals; + } + + /** + * Get the queue id's and meta data (Owner and History) for the queues belonging to the named + * server + * + * @param server name of the server + * @return a ResultScanner over the
QueueIds belonging to the server + * @throws IOException + */ + private ResultScanner getQueuesBelongingToServer(String server) throws IOException { + Scan scan = new Scan(); + SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server)); + scan.setFilter(filterMyQueues); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY); + return replicationTable.getScanner(scan); + } + + /** + * Gets the Replication Table. Builds and blocks until the table is available if the Replication + * Table does not exist. + * + * @return the Replication Table + * @throws IOException if the Replication Table takes too long to build or we are interrupted + */ + private Table createAndGetReplicationTable() throws IOException { + if (!replicationTableExists()) { + createReplicationTable(); + } + int maxRetries = conf.getInt("replication.queues.createtable.retries.number", 100); + RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 100); + RetryCounter retryCounter = counterFactory.create(); + while (!replicationTableExists()) { + try { + retryCounter.sleepUntilNextRetry(); + if (!retryCounter.shouldRetry()) { + throw new IOException("Unable to acquire the Replication Table"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for the Replication Table", e); + } + } + return connection.getTable(REPLICATION_TABLE_NAME); + } + + /** + * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR + * defined in this class + * @throws IOException + */ + private void createReplicationTable() throws IOException { + HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); + replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); + admin.createTable(replicationTableDescriptor); + } + + /** + * Checks whether the Replication Table exists yet + * + * @return whether the
Replication Table exists + * (also false when the existence check itself fails with an IOException) + */ + private boolean replicationTableExists() { + try { + return admin.tableExists(REPLICATION_TABLE_NAME); + } catch (IOException e) { + return false; + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java new file mode 100644 index 0000000..55dfdd8 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.replication; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes + * the ReplicationTableBase to access the Replication Table. + */ +@InterfaceAudience.Private +public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase + implements ReplicationQueuesClient { + + public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args) + throws IOException { + super(args.getConf(), args.getAbortable()); + } + public TableBasedReplicationQueuesClientImpl(Configuration conf, + Abortable abortable) throws IOException { + super(conf, abortable); + } + + @Override + public void init() throws ReplicationException { + // no-op + } + + @Override + public List<String> getListOfReplicators() { + return super.getListOfReplicators(); + } + + @Override + public List<String> getLogsInQueue(String serverName, String queueId) { + return super.getLogsInQueue(serverName, queueId); + } + + @Override + public List<String> getAllQueues(String serverName) { + return super.getAllQueues(serverName); + } + + @Override + public Set<String> getAllWALs() { + Set<String> allWals = new HashSet<String>(); + ResultScanner allQueues = null; + try { + allQueues = replicationTable.getScanner(new Scan()); + for (Result queue : allQueues) { + for (String wal : readWALsFromResult(queue)) { + allWals.add(wal); + } + } + } catch (IOException e) { + String errMsg = "Failed getting all WAL's in Replication Table"; +
abortable.abort(errMsg, e); + } finally { + if (allQueues != null) { + allQueues.close(); + } + } + return allWals; + } + + @Override + public int getHFileRefsNodeChangeVersion() throws KeeperException { + // TODO + throw new NotImplementedException(); + } + + @Override + public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException { + // TODO + throw new NotImplementedException(); + } + + @Override + public List<String> getReplicableHFiles(String peerId) throws KeeperException { + // TODO + throw new NotImplementedException(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java new file mode 100644 index 0000000..6ea7801 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java @@ -0,0 +1,437 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License.
+*/ +package org.apache.hadoop.hbase.replication; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * This class provides an implementation of the ReplicationQueues interface using an HBase table + * "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table. + */ +@InterfaceAudience.Private +public class TableBasedReplicationQueuesImpl extends ReplicationTableBase + implements ReplicationQueues { + + private static final Log LOG = LogFactory.getLog(TableBasedReplicationQueuesImpl.class); + + // Common byte values used in replication offset tracking + private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L); + private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes(""); + + private String serverName = null; + private byte[] serverNameBytes = null; + + // TODO: Only use this variable temporarily.
Eventually we want to use HBase to store all + // TODO: replication information + private ReplicationStateZKBase replicationState; + + public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException { + this(args.getConf(), args.getAbortable(), args.getZk()); + } + + public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZooKeeperWatcher zkw) + throws IOException { + super(conf, abort); + replicationState = new ReplicationStateZKBase(zkw, conf, abort) {}; + } + + @Override + public void init(String serverName) throws ReplicationException { + this.serverName = serverName; + this.serverNameBytes = Bytes.toBytes(serverName); + } + + @Override + public List<String> getListOfReplicators() { + return super.getListOfReplicators(); + } + + @Override + public void removeQueue(String queueId) { + + try { + byte[] rowKey = queueIdToRowKey(queueId); + Delete deleteQueue = new Delete(rowKey); + safeQueueUpdate(deleteQueue); + } catch (IOException | ReplicationException e) { + String errMsg = "Failed removing queue queueId=" + queueId; + abortable.abort(errMsg, e); + } + } + + @Override + public void addLog(String queueId, String filename) throws ReplicationException { + try { + if (!checkQueueExists(queueId)) { + // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values + Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId))); + putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes); + putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES); + putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); + replicationTable.put(putNewQueue); + } else { + // Otherwise simply add the new log and offset as a new column + Put putNewLog = new Put(queueIdToRowKey(queueId)); + putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); + safeQueueUpdate(putNewLog); + } + } catch (IOException | ReplicationException e) { + String
errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; + abortable.abort(errMsg, e); + } + } + + @Override + public void removeLog(String queueId, String filename) { + try { + byte[] rowKey = queueIdToRowKey(queueId); + Delete delete = new Delete(rowKey); + delete.addColumns(CF_QUEUE, Bytes.toBytes(filename)); + safeQueueUpdate(delete); + } catch (IOException | ReplicationException e) { + String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename; + abortable.abort(errMsg, e); + } + } + + @Override + public void setLogPosition(String queueId, String filename, long position) { + try { + byte[] rowKey = queueIdToRowKey(queueId); + // Check that the log exists. addLog() must have been called before setLogPosition(). + Get checkLogExists = new Get(rowKey); + checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename)); + if (!replicationTable.exists(checkLogExists)) { + String errMsg = "Could not set position of non-existent log from queueId=" + queueId + + ", filename=" + filename; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return; + } + // Update the log offset if it exists + Put walAndOffset = new Put(rowKey); + walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position)); + safeQueueUpdate(walAndOffset); + } catch (IOException | ReplicationException e) { + String errMsg = "Failed writing log position queueId=" + queueId + " filename=" + + filename + " position=" + position; + abortable.abort(errMsg, e); + } + } + + @Override + public long getLogPosition(String queueId, String filename) throws ReplicationException { + try { + byte[] rowKey = queueIdToRowKey(queueId); + Get getOffset = new Get(rowKey); + getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename)); + Result result = getResultIfOwner(getOffset); + if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) { + throw new ReplicationException("Could not read empty result while getting log position " + +
"queueId=" + queueId + ", filename=" + filename); + } + return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename))); + } catch (IOException e) { + throw new ReplicationException("Could not get position in log for queueId=" + queueId + + ", filename=" + filename, e); + } + } + + @Override + public void removeAllQueues() { + List<String> myQueueIds = getAllQueues(); + for (String queueId : myQueueIds) { + removeQueue(queueId); + } + } + + @Override + public List<String> getLogsInQueue(String queueId) { + byte[] rowKey = queueIdToRowKey(queueId); + return getLogsInQueueAndCheckOwnership(rowKey); + } + + @Override + public List<String> getAllQueues() { + return getAllQueues(serverName); + } + + @Override + public Map<String, Set<String>> claimQueues(String regionserver) { + Map<String, Set<String>> queues = new HashMap<>(); + if (isThisOurRegionServer(regionserver)) { + return queues; + } + ResultScanner queuesToClaim = null; + try { + queuesToClaim = getAllQueuesScanner(regionserver); + for (Result queue : queuesToClaim) { + if (attemptToClaimQueue(queue, regionserver)) { + String rowKey = Bytes.toString(queue.getRow()); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey); + if (replicationState.peerExists(replicationQueueInfo.getPeerId())) { + Set<String> sortedLogs = new HashSet<String>(); + List<String> logs = getLogsInQueue(queue.getRow()); + for (String log : logs) { + sortedLogs.add(log); + } + queues.put(rowKey, sortedLogs); + LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver); + } else { + // Delete orphaned queues + removeQueue(Bytes.toString(queue.getRow())); + LOG.info(serverName + " has deleted abandoned queue " + rowKey + " from " + + regionserver); + } + } + } + } catch (IOException | KeeperException e) { + String errMsg = "Failed claiming queues for regionserver=" + regionserver; + abortable.abort(errMsg, e); + queues.clear(); + } finally { + if (queuesToClaim != null) { + queuesToClaim.close(); + } + } + return queues; + } + + /** + * Get the QueueIds belonging to the named
server from the Replication Table + * + * @param server name of the server + * @return a ResultScanner over the QueueIds belonging to the server + * @throws IOException + */ + private ResultScanner getAllQueuesScanner(String server) throws IOException { + Scan scan = new Scan(); + SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server)); + scan.setFilter(filterMyQueues); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY); + ResultScanner results = replicationTable.getScanner(scan); + return results; + } + + @Override + public boolean isThisOurRegionServer(String regionserver) { + return this.serverName.equals(regionserver); + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + // TODO + throw new NotImplementedException(); + } + + @Override + public void removePeerFromHFileRefs(String peerId) { + // TODO + throw new NotImplementedException(); + } + + @Override + public void addHFileRefs(String peerId, List<String> files) throws ReplicationException { + // TODO + throw new NotImplementedException(); + } + + @Override + public void removeHFileRefs(String peerId, List<String> files) { + // TODO + throw new NotImplementedException(); + } + + private List<String> getLogsInQueueAndCheckOwnership(byte[] rowKey) { + String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey); + List<String> logs = new ArrayList<String>(); + try { + Get getQueue = new Get(rowKey); + Result queue = getResultIfOwner(getQueue); + if (queue == null || queue.isEmpty()) { + String errMsgLostOwnership = "Failed getting logs for queue queueId=" + + Bytes.toString(rowKey) + " because the queue was missing or we lost ownership"; + abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership)); + return null; + } + Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE); + for (byte[] cQualifier : familyMap.keySet()) { + if
(Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, + COL_QUEUE_OWNER_HISTORY)) { + continue; + } + logs.add(Bytes.toString(cQualifier)); + } + } catch (IOException e) { + abortable.abort(errMsg, e); + return null; + } + return logs; + } + + private String buildQueueRowKey(String queueId) { + return buildQueueRowKey(serverName, queueId); + } + + /** + * Convenience method that gets the row key of the queue specified by queueId + * @param queueId queueId of a queue in this server + * @return the row key of the queue in the Replication Table + */ + private byte[] queueIdToRowKey(String queueId) { + return queueIdToRowKey(serverName, queueId); + } + + /** + * See safeQueueUpdate(RowMutations mutate) + * + * @param put Row mutation to perform on the queue + */ + private void safeQueueUpdate(Put put) throws ReplicationException, IOException { + RowMutations mutations = new RowMutations(put.getRow()); + mutations.add(put); + safeQueueUpdate(mutations); + } + + /** + * See safeQueueUpdate(RowMutations mutate) + * + * @param delete Row mutation to perform on the queue + */ + private void safeQueueUpdate(Delete delete) throws ReplicationException, + IOException{ + RowMutations mutations = new RowMutations(delete.getRow()); + mutations.add(delete); + safeQueueUpdate(mutations); + } + + /** + * Attempt to mutate a given queue in the Replication Table with a checkAndMutate on the OWNER + * column of the queue. Throws if the check fails, which means we have somehow lost ownership + * of the queue, or if an IOException occurs; the callers abort the server in both cases.
+ * + * @param mutate Mutation to perform on a given queue + */ + private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{ + boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF_QUEUE, + COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate); + if (!updateSuccess) { + throw new ReplicationException("Failed to update Replication Table because we lost queue " + + "ownership"); + } + } + + /** + * Check if the queue specified by queueId is stored in HBase + * + * @param queueId Either raw or reclaimed format of the queueId + * @return Whether the queue is stored in HBase + * @throws IOException + */ + private boolean checkQueueExists(String queueId) throws IOException { + byte[] rowKey = queueIdToRowKey(queueId); + return replicationTable.exists(new Get(rowKey)); + } + + /** + * Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the + * recently killed server is still the OWNER before we claim it. + * + * @param queue The queue that we are trying to claim + * @param originalServer The server that originally owned the queue + * @return Whether we successfully claimed the queue + * @throws IOException + */ + private boolean attemptToClaimQueue (Result queue, String originalServer) throws IOException{ + Put putQueueNameAndHistory = new Put(queue.getRow()); + putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName)); + String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE, + COL_QUEUE_OWNER_HISTORY)), originalServer); + putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, + Bytes.toBytes(newOwnerHistory)); + RowMutations claimAndRenameQueue = new RowMutations(queue.getRow()); + claimAndRenameQueue.add(putQueueNameAndHistory); + // Attempt to claim ownership for this queue by checking if the current OWNER is the original + // server.
If it is not then another RS has already claimed it. If it is we set ourselves as the + // new owner and update the queue's history + boolean success = replicationTable.checkAndMutate(queue.getRow(), CF_QUEUE, COL_QUEUE_OWNER, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), claimAndRenameQueue); + return success; + } + + /** + * Attempts to run a Get on some queue. Will only return a non-null result if we currently own + * the queue. + * + * @param get The Get that we want to query + * @return The result of the Get if this server is the owner of the queue. Else it returns null. + * @throws IOException + */ + private Result getResultIfOwner(Get get) throws IOException { + Scan scan = new Scan(get); + // Check if the Get currently contains all columns or only specific columns + if (scan.getFamilyMap().size() > 0) { + // Add the OWNER column if the scan is already only over specific columns + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); + } + scan.setMaxResultSize(1); + SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, + CompareFilter.CompareOp.EQUAL, serverNameBytes); + scan.setFilter(checkOwner); + ResultScanner scanner = null; + try { + scanner = replicationTable.getScanner(scan); + Result result = scanner.next(); + return (result == null || result.isEmpty()) ?
null : result; + } finally { + if (scanner != null) { + scanner.close(); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java index 5df9379..a7b2f26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -141,15 +142,16 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { super.setConf(conf); try { initReplicationQueuesClient(conf, zk); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } } private void initReplicationQueuesClient(Configuration conf, ZooKeeperWatcher zk) - throws ZooKeeperConnectionException, IOException { + throws Exception { this.zkw = zk; - this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, new WarnOnlyAbortable()); + this.rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments( + conf, new WarnOnlyAbortable(), zkw)); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 9ecba11..9e724db 100644 ---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import java.io.IOException; import java.util.Collections; @@ -67,7 +68,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { try { // The concurrently created new WALs may not be included in the return list, // but they won't be deleted because they're not in the checking set. - wals = loadWALsFromQueues(); + wals = replicationQueues.getAllWALs(); } catch (KeeperException e) { LOG.warn("Failed to read zookeeper, skipping checking deletable files"); return Collections.emptyList(); @@ -88,43 +89,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { }}); } - /** - * Load all wals in all replication queues from ZK. This method guarantees to return a - * snapshot which contains all WALs in the zookeeper at the start of this call even there - * is concurrent queue failover. However, some newly created WALs during the call may - * not be included. 
- */ - private Set loadWALsFromQueues() throws KeeperException { - for (int retry = 0; ; retry++) { - int v0 = replicationQueues.getQueuesZNodeCversion(); - List rss = replicationQueues.getListOfReplicators(); - if (rss == null) { - LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); - return ImmutableSet.of(); - } - Set wals = Sets.newHashSet(); - for (String rs : rss) { - List listOfPeers = replicationQueues.getAllQueues(rs); - // if rs just died, this will be null - if (listOfPeers == null) { - continue; - } - for (String id : listOfPeers) { - List peersWals = replicationQueues.getLogsInQueue(rs, id); - if (peersWals != null) { - wals.addAll(peersWals); - } - } - } - int v1 = replicationQueues.getQueuesZNodeCversion(); - if (v0 == v1) { - return wals; - } - LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", - v0, v1, retry)); - } - } - @Override public void setConf(Configuration config) { // If replication is disabled, keep all members null @@ -148,10 +112,10 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { super.setConf(conf); try { this.zkw = zk; - this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, - new WarnOnlyAbortable()); + this.replicationQueues = ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(conf, new WarnOnlyAbortable(), zkw)); this.replicationQueues.init(); - } catch (ReplicationException e) { + } catch (Exception e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index e472558..8d66c8f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ 
-38,6 +38,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; @@ -67,13 +68,14 @@ public class ReplicationChecker { try { this.zkw = zkw; this.errorReporter = errorReporter; - this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection); + this.queuesClient = ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(conf, connection, zkw)); this.queuesClient.init(); this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient, connection); this.replicationPeers.init(); this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection); - } catch (ReplicationException e) { + } catch (Exception e) { throw new IOException("failed to construct ReplicationChecker", e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 18950a2..eecaae1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import 
org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -167,7 +168,7 @@ public class TestLogsCleaner { ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); cleaner.setConf(conf); - ReplicationQueuesClient rqcMock = Mockito.mock(ReplicationQueuesClient.class); + ReplicationQueuesClientZKImpl rqcMock = Mockito.mock(ReplicationQueuesClientZKImpl.class); Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4); Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java index bd6d070..346ff37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java @@ -39,6 +39,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Set; @@ -58,15 +59,20 @@ public class TestReplicationStateHBaseImpl { private static ReplicationQueues rq1; private static ReplicationQueues rq2; private static ReplicationQueues rq3; + private static ReplicationQueuesClient rqc; private static ReplicationPeers rp; - private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 123L) - .toString(); + + private static final String server0 = ServerName.valueOf("hostname0.example.org", 1234, -1L) + .toString(); + private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 1L) + .toString(); private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L) - .toString(); + 
.toString(); private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L) - .toString(); + .toString(); + private static DummyServer ds0; private static DummyServer ds1; private static DummyServer ds2; private static DummyServer ds3; @@ -77,9 +83,9 @@ public class TestReplicationStateHBaseImpl { utility.startMiniCluster(); conf = utility.getConfiguration(); conf.setClass("hbase.region.replica.replication.ReplicationQueuesType", - ReplicationQueuesHBaseImpl.class, ReplicationQueues.class); - conf.setClass("hbase.region.replica.replication.ReplicationQueuesType", - ReplicationQueuesHBaseImpl.class, ReplicationQueues.class); + TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); + conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType", + TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName); @@ -88,6 +94,9 @@ public class TestReplicationStateHBaseImpl { @Before public void setUp() { try { + ds0 = new DummyServer(server0); + rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments( + conf, ds0)); ds1 = new DummyServer(server1); rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); rq1.init(server1); @@ -99,9 +108,6 @@ public class TestReplicationStateHBaseImpl { rq3.init(server3); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); rp.init(); - rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1")); - rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2")); - rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3")); } catch (Exception e) { fail("testReplicationStateHBaseConstruction received 
an exception" + e.getMessage()); } @@ -165,13 +171,13 @@ public class TestReplicationStateHBaseImpl { try { rq1.getLogPosition("Queue1", "NotHereWAL"); fail("Replication queue should have thrown a ReplicationException for reading from a " + - "non-existent WAL"); + "non-existent WAL"); } catch (ReplicationException e) { } try { rq1.getLogPosition("NotHereQueue", "NotHereWAL"); fail("Replication queue should have thrown a ReplicationException for reading from a " + - "non-existent queue"); + "non-existent queue"); } catch (ReplicationException e) { } // Test removing logs @@ -198,6 +204,13 @@ public class TestReplicationStateHBaseImpl { @Test public void TestMultipleReplicationQueuesHBaseImpl () { try { + rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1")); + rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2")); + rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3")); + } catch (ReplicationException e) { + fail("Failed to add peers to ReplicationPeers"); + } + try { // Test adding in WAL files rq1.addLog("Queue1", "WALLogFile1.1"); rq1.addLog("Queue1", "WALLogFile1.2"); @@ -298,6 +311,56 @@ public class TestReplicationStateHBaseImpl { } } + @Test + public void TestReplicationQueuesClient() throws Exception{ + + // Test ReplicationQueuesClient log tracking + rq1.addLog("Queue1", "WALLogFile1.1"); + assertEquals(1, rqc.getLogsInQueue(server1, "Queue1").size()); + rq1.removeLog("Queue1", "WALLogFile1.1"); + assertEquals(0, rqc.getLogsInQueue(server1, "Queue1").size()); + rq2.addLog("Queue2", "WALLogFile2.1"); + rq2.addLog("Queue2", "WALLogFile2.2"); + assertEquals(2, rqc.getLogsInQueue(server2, "Queue2").size()); + rq3.addLog("Queue1", "WALLogFile1.1"); + rq3.addLog("Queue3", "WALLogFile3.1"); + rq3.addLog("Queue3", "WALLogFile3.2"); + + // Test ReplicationQueueClient log tracking for faulty cases + assertEquals(0, ds0.getAbortCount()); + 
assertNull(rqc.getLogsInQueue("NotHereServer", "NotHereQueue")); + assertNull(rqc.getLogsInQueue(server1, "NotHereQueue")); + assertNull(rqc.getLogsInQueue("NotHereServer", "WALLogFile1.1")); + assertEquals(3, ds0.getAbortCount()); + // Test ReplicationQueueClient replicators + List replicators = rqc.getListOfReplicators(); + assertEquals(3, replicators.size()); + assertTrue(replicators.contains(server1)); + assertTrue(replicators.contains(server2)); + rq1.removeQueue("Queue1"); + assertEquals(2, rqc.getListOfReplicators().size()); + + // Test ReplicationQueuesClient queue tracking + assertEquals(0, rqc.getAllQueues(server1).size()); + rq1.addLog("Queue2", "WALLogFile2.1"); + rq1.addLog("Queue3", "WALLogFile3.1"); + assertEquals(2, rqc.getAllQueues(server1).size()); + rq1.removeAllQueues(); + assertEquals(0, rqc.getAllQueues(server1).size()); + + // Test ReplicationQueuesClient queue tracking for faulty cases + assertEquals(0, rqc.getAllQueues("NotHereServer").size()); + + // Test ReplicationQueuesClient get all WAL's + assertEquals(5 , rqc.getAllWALs().size()); + rq3.removeLog("Queue1", "WALLogFile1.1"); + assertEquals(4, rqc.getAllWALs().size()); + rq3.removeAllQueues(); + assertEquals(2, rqc.getAllWALs().size()); + rq2.removeAllQueues(); + assertEquals(0, rqc.getAllWALs().size()); + } + @After public void clearQueues() throws Exception{ rq1.removeAllQueues(); @@ -306,6 +369,7 @@ public class TestReplicationStateHBaseImpl { assertEquals(0, rq1.getAllQueues().size()); assertEquals(0, rq2.getAllQueues().size()); assertEquals(0, rq3.getAllQueues().size()); + ds0.resetAbortCount(); ds1.resetAbortCount(); ds2.resetAbortCount(); ds3.resetAbortCount(); @@ -313,7 +377,7 @@ public class TestReplicationStateHBaseImpl { @After public void tearDown() throws KeeperException, IOException { - ZKUtil.deleteNodeRecursively(zkw, replicationZNode); + ZKUtil.deleteNodeRecursively(zkw, replicationZNode); } @AfterClass diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 972a400..a357a1f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -96,11 +96,13 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw)); rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw)); + rqc = ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(conf, ds1, zkw)); } catch (Exception e) { - // This should not occur, because getReplicationQueues() only throws for ReplicationQueuesHBaseImpl + // This should not occur, because getReplicationQueues() only throws for + // TableBasedReplicationQueuesImpl fail("ReplicationFactory.getReplicationQueues() threw an IO Exception"); } - rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index e14fd3c..bf47d4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -68,6 +68,8 @@ import 
org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; @@ -436,8 +438,9 @@ public class TestReplicationSourceManager { s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); - ReplicationQueuesClient client = - ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1); + ReplicationQueuesClientZKImpl client = + (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper())); int v0 = client.getQueuesZNodeCversion(); rq1.claimQueues(s0.getServerName().getServerName()); -- 2.8.0-rc2