From e877b45a5e2f35789897cf5d5b9d6d58325685e5 Mon Sep 17 00:00:00 2001 From: chenheng Date: Mon, 23 Nov 2015 12:26:47 +0800 Subject: [PATCH] HBASE-14866 VerifyReplication should use peer configuration in peer connection --- .../mapreduce/replication/VerifyReplication.java | 27 ++++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 28f9f39..214b0a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.mapreduce.replication; import java.io.IOException; +import java.util.Map; +import com.google.gson.Gson; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -127,10 +129,14 @@ public class VerifyReplication extends Configured implements Tool { final TableSplit tableSplit = (TableSplit)(context.getInputSplit()); - String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); + String peerConfig = conf.get(NAME + ".peerConfig"); + Gson gson = new Gson(); + ReplicationPeerConfig rpc = gson.fromJson(peerConfig, ReplicationPeerConfig.class); Configuration peerConf = HBaseConfiguration.create(conf); - ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey); - + ZKUtil.applyClusterKeyToConf(peerConf, rpc.getClusterKey()); + for (Map.Entry entry : rpc.getConfiguration().entrySet()) { + peerConf.set(entry.getKey(), entry.getValue()); + } TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); connection = ConnectionFactory.createConnection(peerConf); replicatedTable = connection.getTable(tableName); @@ -208,7 +214,7 @@ public class VerifyReplication extends Configured implements Tool { } } - private static String getPeerQuorumAddress(final Configuration conf) throws IOException { + private static ReplicationPeerConfig getPeerConfig(final Configuration conf) throws IOException { ZooKeeperWatcher localZKW = null; ReplicationPeerZKImpl peer = null; try { @@ -225,8 +231,7 @@ public class VerifyReplication extends Configured implements Tool { if (pair == null) { throw new IOException("Couldn't get peer conf!"); } - Configuration peerConf = rp.getPeerConf(peerId).getSecond(); - return ZKUtil.getZooKeeperClusterKey(peerConf); + return pair.getFirst(); } catch (ReplicationException e) { throw new IOException( "An error occured while trying to connect to the remove peer cluster", e); @@ -265,9 +270,11 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME+".families", families); } - String peerQuorumAddress = getPeerQuorumAddress(conf); - conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); - LOG.info("Peer Quorum Address: " + peerQuorumAddress); + ReplicationPeerConfig rpc = getPeerConfig(conf); + Gson gson = new Gson(); + String peerConfigStr = gson.toJson(rpc); + conf.set(NAME + ".peerConfig", peerConfigStr); + LOG.info("peerConfig: " + peerConfigStr); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(VerifyReplication.class); @@ -287,7 +294,7 @@ public class VerifyReplication extends Configured implements Tool { Verifier.class, null, null, job); // Obtain the auth token from peer cluster - TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress); + TableMapReduceUtil.initCredentialsForCluster(job, rpc.getClusterKey()); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); -- 1.9.3 (Apple Git-50)