diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index cdca97c..3d33a76 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -324,6 +324,20 @@ public static void initCredentials(Job job) throws IOException { } } + public static void initCredentialsForCluster(Job job, String quorumAddress) + throws IOException { + UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); + if (userProvider.isHBaseSecurityEnabled()) { + try { + Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); + ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); + obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent()); + } catch (InterruptedException e) { + throw new IOException("Got an InterruptedException", e); + } + } + } + private static void obtainAuthTokenForJob(Job job, Configuration conf, User user) throws IOException, InterruptedException { Token authToken = getAuthToken(conf, user); diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index b7d540b..8ad86d3 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.zookeeper.KeeperException; @@ -110,17 +111,13 @@ public class VerifyReplication { HConnectionManager.execute(new HConnectable(conf) { @Override public Void connect(HConnection conn) throws IOException { - try { - ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, - conn.getZooKeeperWatcher()); - ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId")); - HTable replicatedTable = new HTable(peer.getConfiguration(), - conf.get(NAME+".tableName")); - scan.setStartRow(value.getRow()); - replicatedScanner = replicatedTable.getScanner(scan); - } catch (KeeperException e) { - throw new IOException("Got a ZK exception", e); - } + String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); + Configuration peerConf = HBaseConfiguration.create(conf); + ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey); + + HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName")); + scan.setStartRow(value.getRow()); + replicatedScanner = replicatedTable.getScanner(scan); return null; } }); @@ -143,6 +140,26 @@ public class VerifyReplication { } } + private static String getPeerQuorumAddress(final Configuration conf) throws IOException { + HConnection conn = HConnectionManager.getConnection(conf); + try { + ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, + conn.getZooKeeperWatcher()); + + ReplicationPeer peer = zk.getPeer(peerId); + if (peer == null) { + throw new IOException("Couldn't get peer conf!"); + } + + Configuration peerConf = peer.getConfiguration(); + return ZKUtil.getZooKeeperClusterKey(peerConf); + } catch (KeeperException e) { + throw new IOException("Got a ZK exception", e); + } finally { + conn.close(); + } + } + /** * Sets up the actual job. * @@ -185,6 +202,11 @@ public class VerifyReplication { if (families != null) { conf.set(NAME+".families", families); } + + String peerQuorumAddress = getPeerQuorumAddress(conf); + conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); + LOG.info("Peer Quorum Address: " + peerQuorumAddress); + Job job = new Job(conf, NAME + "_" + tableName); job.setJarByClass(VerifyReplication.class); @@ -201,6 +223,10 @@ public class VerifyReplication { } TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); + + // Obtain the auth token from peer cluster + TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress); + job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); return job;