Index: src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (revision 1451134) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (working copy) @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; @@ -201,6 +202,24 @@ } TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); + + /** + * Obtain the auth token from peer cluster for job before start map. + */ + if (User.isHBaseSecurityEnabled(job.getConfiguration())) { + try { + HConnection connection = HConnectionManager.getConnection(conf); + ReplicationZookeeper zk = new ReplicationZookeeper(connection, conf, + connection.getZooKeeperWatcher()); + ReplicationPeer peer = zk.getPeer(conf.get(NAME + ".peerId")); + User.getCurrent().obtainAuthTokenForJob(peer.getConfiguration(), job); + } catch (InterruptedException e) { + throw new IOException("Got a InterruptedException", e); + } catch (KeeperException e) { + throw new IOException("Got a ZK exception", e); + } + } + job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); return job; Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1451134) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -171,9 +171,9 @@ this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); - ZKUtil.createWithParents(this.zookeeper, this.peersZNode); + ZKUtil.createWithParentsIfNotExists(this.zookeeper, this.peersZNode); this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName); - ZKUtil.createWithParents(this.zookeeper, this.rsZNode); + ZKUtil.createWithParentsIfNotExists(this.zookeeper, this.rsZNode); // Set a tracker on replicationStateNodeNode this.statusTracker = Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1451134) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy) @@ -917,14 +917,16 @@ if (isSecureZooKeeper(zkw.getConfiguration())) { // Certain znodes are accessed directly by the client, // so they must be readable by non-authenticated clients - if ((node.equals(zkw.baseZNode) == true) || - (node.equals(zkw.rootServerZNode) == true) || - (node.equals(zkw.masterAddressZNode) == true) || - (node.equals(zkw.clusterIdZNode) == true) || - (node.equals(zkw.rsZNode) == true) || - (node.equals(zkw.backupMasterAddressesZNode) == true) || - (node.startsWith(zkw.masterTableZNode) == true) || - (node.startsWith(zkw.masterTableZNode92) == true)) { + if ((node.equals(zkw.baseZNode) == true) + || (node.equals(zkw.rootServerZNode) == true) + || (node.equals(zkw.masterAddressZNode) == true) + || (node.equals(zkw.clusterIdZNode) == true) + || (node.equals(zkw.rsZNode) == true) + || (node.equals(zkw.backupMasterAddressesZNode) == true) + || (node.startsWith(zkw.masterTableZNode) == true) + || (node.startsWith(joinZNode(zkw.baseZNode, zkw.getConfiguration() + .get("zookeeper.znode.replication", "replication"))) == true) + || (node.startsWith(zkw.masterTableZNode92) == true)) { return ZooKeeperWatcher.CREATOR_ALL_AND_WORLD_READABLE; } return Ids.CREATOR_ALL_ACL; @@ -1135,6 +1137,49 @@ zkw.interruptedException(ie); } } + + /** + * Creates the specified node and all parent nodes required for it to exist. + * + * No watches are set and no errors are thrown if the node already exists, the + * difference with createWithParents is this func check if the node exists before + * create either than to know the node exists by catch NodeExistsException throwed + * by create. + * + * The nodes created are persistent and open access. + * + * @param zkw zk reference + * @param znode path of node + * @throws KeeperException if unexpected zookeeper exception + */ + public static void createWithParentsIfNotExists(ZooKeeperWatcher zkw, + String znode) throws KeeperException { + try { + if (znode == null) { + return; + } + waitForZKConnectionIfAuthenticating(zkw); + try { + if (zkw.getRecoverableZooKeeper().exists(znode, zkw) != null) { + LOG.info("Node " + znode + " already exists, no need create!"); + return; + } + } catch (InterruptedException e) { + zkw.interruptedException(e); + LOG.info("Check if node " + znode + " exists interrupted!"); + return; + } + zkw.getRecoverableZooKeeper().create(znode, new byte[0], + createACL(zkw, znode), CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException nee) { + return; + } catch (KeeperException.NoNodeException nne) { + createWithParents(zkw, getParent(znode)); + createWithParents(zkw, znode); + } catch (InterruptedException ie) { + zkw.interruptedException(ie); + } + } // // Deletes