diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index ebdd335..c9a6217 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -25,11 +25,11 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; import java.io.Closeable; import java.io.IOException; @@ -68,7 +68,7 @@ public class ReplicationAdmin implements Closeable { /** * Constructor that creates a connection to the local ZooKeeper ensemble. * @param conf Configuration to use - * @throws IOException if the connection to ZK cannot be made + * @throws IOException if an internal error occurs * @throws RuntimeException if replication isn't enabled. */ public ReplicationAdmin(Configuration conf) throws IOException { @@ -85,8 +85,8 @@ public class ReplicationAdmin implements Closeable { ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection); this.replicationQueuesClient.init(); - } catch (KeeperException e) { - throw new IOException("Unable setup the ZooKeeper connection", e); + } catch (ReplicationException e) { + throw new IOException("An internal error occurred.", e); } } @@ -116,7 +116,7 @@ public class ReplicationAdmin implements Closeable { * @throws IllegalStateException if there's already one slave since * multi-slave isn't supported yet. */ - public void addPeer(String id, String clusterKey) throws IOException { + public void addPeer(String id, String clusterKey) throws ReplicationException { this.replicationPeers.addPeer(id, clusterKey); } @@ -124,7 +124,7 @@ public class ReplicationAdmin implements Closeable { * Removes a peer cluster and stops the replication to it. * @param id a short that identifies the cluster */ - public void removePeer(String id) throws IOException { + public void removePeer(String id) throws ReplicationException { this.replicationPeers.removePeer(id); } @@ -132,7 +132,7 @@ public class ReplicationAdmin implements Closeable { * Restart the replication stream to the specified peer. * @param id a short that identifies the cluster */ - public void enablePeer(String id) throws IOException { + public void enablePeer(String id) throws ReplicationException { this.replicationPeers.enablePeer(id); } @@ -140,7 +140,7 @@ public class ReplicationAdmin implements Closeable { * Stop the replication stream to the specified peer.
* @param id a short that identifies the cluster */ - public void disablePeer(String id) throws IOException { + public void disablePeer(String id) throws ReplicationException { this.replicationPeers.disablePeer(id); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java new file mode 100644 index 0000000..8d5d364 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.HBaseException; + +/** + * An HBase Replication base exception. + */ +@InterfaceAudience.Public +public class ReplicationException extends HBaseException { + + private static final long serialVersionUID = -8885598603988198062L; + + public ReplicationException() { + super(); + } + + public ReplicationException(final String message) { + super(message); + } + + public ReplicationException(final String message, final Throwable t) { + super(message, t); + } + + public ReplicationException(final Throwable t) { + super(t); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 21f9ee4..5ba1bea 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.replication; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -27,7 +26,6 @@ import java.util.UUID; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; -import org.apache.zookeeper.KeeperException; /** * This provides an interface for maintaining a set of peer clusters. These peers are remote slave @@ -44,9 +42,8 @@ public interface ReplicationPeers { /** * Initialize the ReplicationPeers interface. - * @throws KeeperException */ - void init() throws IOException, KeeperException; + void init() throws ReplicationException; /** * Add a new remote slave cluster for replication. 
@@ -54,25 +51,25 @@ public interface ReplicationPeers { * @param clusterKey the concatenation of the slave cluster's: * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent */ - void addPeer(String peerId, String clusterKey) throws IOException; + void addPeer(String peerId, String clusterKey) throws ReplicationException; /** * Removes a remote slave cluster and stops the replication to it. * @param peerId a short that identifies the cluster */ - void removePeer(String peerId) throws IOException; + void removePeer(String peerId) throws ReplicationException; /** * Restart the replication to the specified remote slave cluster. * @param peerId a short that identifies the cluster */ - void enablePeer(String peerId) throws IOException; + void enablePeer(String peerId) throws ReplicationException; /** * Stop the replication to the specified remote slave cluster. * @param peerId a short that identifies the cluster */ - void disablePeer(String peerId) throws IOException; + void disablePeer(String peerId) throws ReplicationException; /** * Get the replication status for the specified connected remote slave cluster. @@ -106,7 +103,7 @@ public interface ReplicationPeers { * @param peerId a short that identifies the cluster * @return true if a new connection was made, false if no new connection was made. */ - boolean connectToPeer(String peerId) throws IOException, KeeperException; + boolean connectToPeer(String peerId) throws ReplicationException; /** * Disconnect from a remote slave cluster. @@ -142,5 +139,5 @@ public interface ReplicationPeers { * @param peerId a short that identifies the cluster * @return the configuration for the peer cluster, null if it was unable to get the configuration */ - Configuration getPeerConf(String peerId) throws KeeperException; + Configuration getPeerConf(String peerId) throws ReplicationException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index f0d6f14..33ff5d6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -80,16 +80,21 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public void init() throws IOException, KeeperException { - ZKUtil.createWithParents(this.zookeeper, this.peersZNode); - connectExistingPeers(); + public void init() throws ReplicationException { + try { + ZKUtil.createWithParents(this.zookeeper, this.peersZNode); + connectExistingPeers(); + } catch (KeeperException e) { + throw new ReplicationException("Internal error initializing replication peers", e); + } } @Override - public void addPeer(String id, String clusterKey) throws IOException { + public void addPeer(String id, String clusterKey) throws ReplicationException { try { if (peerExists(id)) { - throw new IllegalArgumentException("Cannot add existing peer"); + throw new IllegalArgumentException("Cannot add a peer with id=" + id + + " because that id already exists."); } ZKUtil.createWithParents(this.zookeeper, this.peersZNode); ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id), @@ -101,30 +106,32 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re ENABLED_ZNODE_BYTES); // A peer is enabled by default } catch (KeeperException e) { - throw new 
IOException("Unable to add peer", e); + throw new ReplicationException("Internal error when adding peer with id=" + id + + ", clusterKey=" + clusterKey, e); } } @Override - public void removePeer(String id) throws IOException { + public void removePeer(String id) throws ReplicationException { try { if (!peerExists(id)) { - throw new IllegalArgumentException("Cannot remove inexisting peer"); + throw new IllegalArgumentException("Cannot remove peer with id=" + id + + " because that id does not exist."); } ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)); } catch (KeeperException e) { - throw new IOException("Unable to remove a peer", e); + throw new ReplicationException("Internal error when removing peer with id=" + id, e); } } @Override - public void enablePeer(String id) throws IOException { + public void enablePeer(String id) throws ReplicationException { changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED); LOG.info("peer " + id + " is enabled"); } @Override - public void disablePeer(String id) throws IOException { + public void disablePeer(String id) throws ReplicationException { changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED); LOG.info("peer " + id + " is disabled"); } @@ -132,20 +139,25 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public boolean getStatusOfConnectedPeer(String id) { if (!this.peerClusters.containsKey(id)) { - throw new IllegalArgumentException("peer " + id + " is not connected"); + throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); } return this.peerClusters.get(id).getPeerEnabled().get(); } @Override - public boolean connectToPeer(String peerId) throws IOException, KeeperException { + public boolean connectToPeer(String peerId) throws ReplicationException { if (peerClusters == null) { return false; } if (this.peerClusters.containsKey(peerId)) { return false; } - ReplicationPeer peer = getPeer(peerId); + ReplicationPeer peer = null; + try { + peer = getPeer(peerId); + } catch (Exception e) { + throw new ReplicationException("Error connecting to peer with id=" + peerId, e); + } if (peer == null) { return false; } @@ -227,9 +239,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public Configuration getPeerConf(String peerId) throws KeeperException { + public Configuration getPeerConf(String peerId) throws ReplicationException { String znode = ZKUtil.joinZNode(this.peersZNode, peerId); - byte[] data = ZKUtil.getData(this.zookeeper, znode); + byte[] data = null; + try { + data = ZKUtil.getData(this.zookeeper, znode); + } catch (KeeperException e) { + throw new ReplicationException("Internal error getting configuration for peer with id=" + + peerId, e); + } if (data == null) { LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId); return null; @@ -278,10 +296,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re /** * A private method used during initialization. This method attempts to connect to all registered * peer clusters. This method does not set a watch on the peer cluster znodes. 
- * @throws IOException - * @throws KeeperException */ - private void connectExistingPeers() throws IOException, KeeperException { + private void connectExistingPeers() throws ReplicationException, KeeperException { List znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); if (znodes != null) { for (String z : znodes) { @@ -334,13 +350,13 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re * Update the state znode of a peer cluster. * @param id * @param state - * @throws IOException */ private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state) - throws IOException { + throws ReplicationException { try { if (!peerExists(id)) { - throw new IllegalArgumentException("peer " + id + " is not registered"); + throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id + + " does not exist."); } String peerStateZNode = getPeerStateNode(id); byte[] stateBytes = @@ -351,9 +367,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } else { ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes); } - LOG.info("state of the peer " + id + " changed to " + state.name()); + LOG.info("Peer with id= " + id + " is now " + state.name()); } catch (KeeperException e) { - throw new IOException("Unable to change state of the peer " + id, e); + throw new ReplicationException("Unable to change state of the peer with id=" + id, e); } } @@ -364,7 +380,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re * @throws IOException * @throws KeeperException */ - private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException { + private ReplicationPeer getPeer(String peerId) throws ReplicationException, IOException, + KeeperException { Configuration peerConf = getPeerConf(peerId); if (peerConf == null) { return null; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index d641088..80d1adf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -23,7 +23,6 @@ import java.util.SortedMap; import java.util.SortedSet; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.zookeeper.KeeperException; /** * This provides an interface for maintaining a region server's replication queues. These queues @@ -37,7 +36,7 @@ public interface ReplicationQueues { * @param serverName The server name of the region server that owns the replication queues this * interface manages. */ - void init(String serverName) throws KeeperException; + void init(String serverName) throws ReplicationException; /** * Remove a replication queue. @@ -49,9 +48,8 @@ public interface ReplicationQueues { * Add a new HLog file to the given queue. If the queue does not exist it is created. * @param queueId a String that identifies the queue. * @param filename name of the HLog - * @throws KeeperException */ - void addLog(String queueId, String filename) throws KeeperException; + void addLog(String queueId, String filename) throws ReplicationException; /** * Remove an HLog file from the given queue. 
@@ -74,7 +72,7 @@ public interface ReplicationQueues { * @param filename name of the HLog * @return the current position in the file */ - long getLogPosition(String queueId, String filename) throws KeeperException; + long getLogPosition(String queueId, String filename) throws ReplicationException; /** * Remove all replication queues for this region server. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java index 8fd3277..689afba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication; import java.util.List; -import org.apache.zookeeper.KeeperException; /** * This provides an interface for clients of replication to view replication queues. These queues @@ -31,7 +30,7 @@ public interface ReplicationQueuesClient { /** * Initialize the replication queue client interface. */ - public void init() throws KeeperException; + public void init() throws ReplicationException; /** * Get a list of all region servers that have outstanding replication queues. These servers could diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index 6bac186..97457e4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -35,8 +35,12 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem } @Override - public void init() throws KeeperException { - ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); + public void init() throws ReplicationException { + try { + ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); + } catch (KeeperException e) { + throw new ReplicationException("Internal error while initializing a queues client", e); + } } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 1b55f92..58f81ea 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -44,16 +44,16 @@ import org.apache.zookeeper.KeeperException; * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of * all outstanding HLog files on this region server that need to be replicated. The myQueuesZnode is * the regionserver name (a concatenation of the region server’s hostname, client port and start - * code). For example: + * code). For example: * - * /hbase/replication/rs/hostname.example.org,6020,1234 + * /hbase/replication/rs/hostname.example.org,6020,1234 * * Within this znode, the region server maintains a set of HLog replication queues. These queues are * represented by child znodes named using there give queue id. 
For example: * * /hbase/replication/rs/hostname.example.org,6020,1234/1 * /hbase/replication/rs/hostname.example.org,6020,1234/2 - + * Each queue has one child znode for every HLog that still needs to be replicated. The value of * these HLog child znodes is the latest position that has been replicated. This position is updated * every time a HLog entry is replicated. For example: @@ -75,9 +75,14 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override - public void init(String serverName) throws KeeperException { + public void init(String serverName) throws ReplicationException { this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName); - ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode); + try { + ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode); + } catch (KeeperException e) { + throw new ReplicationException( + "Internal error occurred while initializing replication queues.", e); + } } @Override @@ -90,10 +95,16 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override - public void addLog(String queueId, String filename) throws KeeperException { + public void addLog(String queueId, String filename) throws ReplicationException { String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); znode = ZKUtil.joinZNode(znode, filename); - ZKUtil.createWithParents(this.zookeeper, znode); + try { + ZKUtil.createWithParents(this.zookeeper, znode); + } catch (KeeperException e) { + throw new ReplicationException( + "Internal Error: Could not add log because znode could not be created. queueId=" + + queueId + ", filename=" + filename, e); + } } @Override @@ -122,10 +133,16 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override - public long getLogPosition(String queueId, String filename) throws KeeperException { + public long getLogPosition(String queueId, String filename) throws ReplicationException { String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); String znode = ZKUtil.joinZNode(clusterZnode, filename); - byte[] bytes = ZKUtil.getData(this.zookeeper, znode); + byte[] bytes = null; + try { + bytes = ZKUtil.getData(this.zookeeper, znode); + } catch (KeeperException e) { + throw new ReplicationException("Internal Error: could not get position in log for queueId=" + + queueId + ", filename=" + filename, e); + } try { return ZKUtil.parseHLogPositionFrom(bytes); } catch (DeserializationException de) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 2bf5fda..aebc172 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -36,15 +36,14 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl; import
org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.zookeeper.KeeperException; /** * This map-only job compares the data from a local table with a remote one. @@ -128,8 +127,8 @@ public class VerifyReplication { HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName")); scan.setStartRow(value.getRow()); replicatedScanner = replicatedTable.getScanner(scan); - } catch (KeeperException e) { - throw new IOException("Got a ZK exception", e); + } catch (ReplicationException e) { + throw new IOException("An internal replication error occurred", e); } finally { if (peer != null) { peer.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 423a1fd..bf43fa8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -190,6 +190,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; @@ -2196,8 +2197,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa /** * Load the replication service objects, if any */ - static private void createNewReplicationInstance(Configuration conf, - HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{ + static private void createNewReplicationInstance(Configuration conf, HRegionServer server, + FileSystem fs, Path logDir, Path oldLogDir) throws IOException { // If replication is not enabled, then return immediately.
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 86669fa..42a7c8c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; @@ -125,7 +126,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this); this.replicationQueues.init(); - } catch (KeeperException e) { + } catch (ReplicationException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } catch (IOException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 8692390..7eb9050 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -119,8 +120,8 @@ public class Replication implements WALActionsListener, this.replicationTracker = ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, this.conf, this.server, this.server); - } catch (KeeperException ke) { - throw new IOException("Failed replication handler create", ke); + } catch (ReplicationException e) { + throw new IOException("Failed replication handler create", e); } UUID clusterId = null; try { @@ -197,7 +198,11 @@ public class Replication implements WALActionsListener, */ public void startReplicationService() throws IOException { if (this.replication) { - this.replicationManager.init(); + try { + this.replicationManager.init(); + } catch (ReplicationException e) { + throw new IOException(e); + } this.replicationSink = new ReplicationSink(this.conf, this.server); this.scheduleThreadPool.scheduleAtFixedRate( new ReplicationStatisticsThread(this.replicationSink, this.replicationManager), diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 726bb6b..9565055 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.Block import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -234,7 +235,7 @@ public class ReplicationSource extends Thread LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " + this.repLogReader.getPosition()); } - } catch (KeeperException e) { + } catch (ReplicationException e) { this.terminate("Couldn't get the position of this recovered queue " + this.peerClusterZnode, e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a10e4e6..e984a7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -190,7 +191,7 @@ public class ReplicationSourceManager implements ReplicationListener { * Adds a normal source per registered peer cluster and tries to process all * old region server hlog queues */ - public void init() throws IOException { + protected void init() throws IOException, ReplicationException { for (String id : this.replicationPeers.getConnectedPeers()) { addSource(id); } @@ -216,7 +217,8 @@ public class ReplicationSourceManager implements ReplicationListener { * @return the source that was created * @throws IOException */ - public ReplicationSourceInterface addSource(String id) throws IOException { + protected ReplicationSourceInterface addSource(String id) throws IOException, + ReplicationException { ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, stopper, id, this.clusterId); @@ -229,11 +231,12 @@ public class ReplicationSourceManager implements ReplicationListener { this.hlogsById.get(id).add(name); try { this.replicationQueues.addLog(src.getPeerClusterZnode(), name); - } catch (KeeperException ke) { - String message = "Cannot add log to zk for" + - " replication when creating a new source"; + } catch (ReplicationException e) { + String message = + "Cannot add log to queue when creating a new source, 
queueId=" + + src.getPeerClusterZnode() + ", filename=" + name; stopper.stop(message); - throw new IOException(message, ke); + throw e; } src.enqueueLog(this.latestPath); } @@ -289,8 +292,9 @@ public class ReplicationSourceManager implements ReplicationListener { for (ReplicationSourceInterface source : this.sources) { try { this.replicationQueues.addLog(source.getPeerClusterZnode(), name); - } catch (KeeperException ke) { - throw new IOException("Cannot add log to zk for replication", ke); + } catch (ReplicationException e) { + throw new IOException("Cannot add log to replication queue with id=" + + source.getPeerClusterZnode() + ", filename=" + name, e); } } for (SortedSet hlogs : this.hlogsById.values()) { @@ -323,7 +327,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @return the created source * @throws IOException */ - public ReplicationSourceInterface getReplicationSource(final Configuration conf, + protected ReplicationSourceInterface getReplicationSource(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException { @@ -431,10 +435,7 @@ public class ReplicationSourceManager implements ReplicationListener { if (added) { addSource(id); } - } catch (IOException e) { - // TODO manage better than that ? - LOG.error("Error while adding a new peer", e); - } catch (KeeperException e) { + } catch (Exception e) { LOG.error("Error while adding a new peer", e); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index d79cc60..d07b99f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -68,7 +68,7 @@ public abstract class TestReplicationStateBasic { } @Test - public void testReplicationQueuesClient() throws KeeperException { + public void testReplicationQueuesClient() throws ReplicationException { rqc.init(); // Test methods with empty state assertEquals(0, rqc.getListOfReplicators().size()); @@ -109,7 +109,7 @@ public abstract class TestReplicationStateBasic { } @Test - public void testReplicationQueues() throws KeeperException, IOException { + public void testReplicationQueues() throws ReplicationException { rq1.init(server1); rq2.init(server2); rq3.init(server3); @@ -255,7 +255,7 @@ public abstract class TestReplicationStateBasic { * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, * 3, 4, 5 log files respectively */ - protected void populateQueues() throws KeeperException, IOException { + protected void populateQueues() throws ReplicationException { rq1.addLog("trash", "trash"); rq1.removeQueue("trash");