Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (revision 1452766) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (working copy) @@ -290,6 +290,13 @@ public static void initCredentials(Job job) throws IOException { if (User.isHBaseSecurityEnabled(job.getConfiguration())) { try { + // init credentials for remote cluster + String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); + if (quorumAddress != null) { + Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); + ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); + User.getCurrent().obtainAuthTokenForJob(peerConf, job); + } User.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job); } catch (InterruptedException ie) { LOG.info("Interrupted obtaining user authentication token"); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (revision 1452766) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (working copy) @@ -182,15 +182,21 @@ @Override public void setConf(Configuration otherConf) { this.conf = HBaseConfiguration.create(otherConf); + String tableName = this.conf.get(OUTPUT_TABLE); if(tableName == null || tableName.length() <= 0) { throw new IllegalArgumentException("Must specify table name"); } + String address = this.conf.get(QUORUM_ADDRESS); - int zkClientPort = conf.getInt(QUORUM_PORT, 0); + int zkClientPort = this.conf.getInt(QUORUM_PORT, 0); String serverClass = this.conf.get(REGION_SERVER_CLASS); String serverImpl = this.conf.get(REGION_SERVER_IMPL); + try { + // reset peer cluster's clusterId for token authentication to work + this.conf.set(HConstants.CLUSTER_ID, ""); + if (address != null) { ZKUtil.applyClusterKeyToConf(this.conf, address); } @@ -198,7 +204,7 @@ this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl); } if (zkClientPort != 0) { - conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); + this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); } this.table = new HTable(this.conf, tableName); this.table.setAutoFlush(false); Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1452766) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -631,20 +631,20 @@ } private String clusterId = null; + public final void retrieveClusterId(){ - if (conf.get(HConstants.CLUSTER_ID) != null){ + clusterId = conf.get(HConstants.CLUSTER_ID); + + if (clusterId != null && !clusterId.isEmpty()) { return; } // No synchronized here, worse case we will retrieve it twice, that's // not an issue. - if (this.clusterId == null){ - this.clusterId = conf.get(HConstants.CLUSTER_ID); - if (this.clusterId == null) { ZooKeeperKeepAliveConnection zkw = null; try { zkw = getKeepAliveZooKeeperWatcher(); - this.clusterId = ZKClusterId.readClusterIdZNode(zkw); + clusterId = ZKClusterId.readClusterIdZNode(zkw); if (clusterId == null) { LOG.info("ClusterId read in ZooKeeper is null"); } @@ -657,14 +657,11 @@ zkw.close(); } } - if (this.clusterId == null) { - this.clusterId = "default"; + if (clusterId == null) { + clusterId = "default"; } LOG.info("ClusterId is " + clusterId); - } - } - conf.set(HConstants.CLUSTER_ID, clusterId); }