From e3d9bd2c39731d337911e332eff6c2e73ca582fc Mon Sep 17 00:00:00 2001 From: chenheng Date: Mon, 23 Nov 2015 17:06:51 +0800 Subject: [PATCH] HBASE-14866 VerifyReplication should use peer configuration in peer connection --- .../mapreduce/replication/VerifyReplication.java | 30 ++++++++++++++-------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 28f9f39..edaaeef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce.replication; import java.io.IOException; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,6 +54,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; /** * This map-only job compares the data from a local table with a remote one. @@ -76,6 +78,7 @@ public class VerifyReplication extends Configured implements Tool { static String tableName = null; static String families = null; static String peerId = null; + static ObjectMapper objectMapper = new ObjectMapper(); private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; @@ -108,6 +111,7 @@ public class VerifyReplication extends Configured implements Tool { Context context) throws IOException { if (replicatedScanner == null) { + Configuration conf = context.getConfiguration(); final Scan scan = new Scan(); scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1)); @@ -127,10 +131,14 @@ public class VerifyReplication extends Configured implements Tool { final TableSplit tableSplit = (TableSplit)(context.getInputSplit()); - String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); + String clusterKey = conf.get(NAME + ".peerClusterKey"); + String peerConfigStr = conf.get(NAME + ".peerConfig"); + Map peerConfig = objectMapper.readValue(peerConfigStr, Map.class); Configuration peerConf = HBaseConfiguration.create(conf); - ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey); - + ZKUtil.applyClusterKeyToConf(peerConf, clusterKey); + for (Map.Entry entry : peerConfig.entrySet()) { + peerConf.set(entry.getKey(), entry.getValue()); + } TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); connection = ConnectionFactory.createConnection(peerConf); replicatedTable = connection.getTable(tableName); @@ -208,7 +216,7 @@ public class VerifyReplication extends Configured implements Tool { } } - private static String getPeerQuorumAddress(final Configuration conf) throws IOException { + private static ReplicationPeerConfig getPeerConfig(final Configuration conf) throws IOException { ZooKeeperWatcher localZKW = null; ReplicationPeerZKImpl peer = null; try { @@ -225,8 +233,7 @@ public class VerifyReplication extends Configured implements Tool { if (pair == null) { throw new IOException("Couldn't get peer conf!"); } - Configuration peerConf = rp.getPeerConf(peerId).getSecond(); - return ZKUtil.getZooKeeperClusterKey(peerConf); + return pair.getFirst(); } catch (ReplicationException e) { throw new IOException( "An error occured while trying to connect to the remove peer cluster", e); @@ -265,9 +272,12 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME+".families", families); } - String peerQuorumAddress = getPeerQuorumAddress(conf); - conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); - LOG.info("Peer Quorum Address: " + peerQuorumAddress); + ReplicationPeerConfig rpc = getPeerConfig(conf); + conf.set(NAME + ".peerClusterKey", rpc.getClusterKey()); + String peerConfigStr = objectMapper.writeValueAsString(rpc.getConfiguration()); + conf.set(NAME + ".peerConfig", peerConfigStr); + LOG.info("peerClusterKey:" + rpc.getClusterKey() + + ", peerpeerConfig: " + peerConfigStr); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(VerifyReplication.class); @@ -287,7 +297,7 @@ public class VerifyReplication extends Configured implements Tool { Verifier.class, null, null, job); // Obtain the auth token from peer cluster - TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress); + TableMapReduceUtil.initCredentialsForCluster(job, rpc.getClusterKey()); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); -- 1.9.3 (Apple Git-50)