Index: security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java =================================================================== --- security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java (revision 1426834) +++ security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java (working copy) @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.io.HbaseObjectWritable; @@ -72,33 +73,31 @@ super(); } // no public ctor - /* Cache a client using its socket factory as the hash key */ + /* Cache a client using the configured clusterId */ static private class ClientCache { - private Map clients = - new HashMap(); + private Map clients = + new HashMap(); protected ClientCache() {} /** - * Construct & cache an IPC client with the user-provided SocketFactory - * if no cached client exists. + * Construct & cache an IPC client with the configured + * {@link HConstants#CLUSTER_ID} if no cached client exists. * - * @param conf Configuration - * @param factory socket factory + * @param conf + * Configuration + * @param factory + * socket factory * @return an IPC client */ protected synchronized SecureClient getClient(Configuration conf, SocketFactory factory) { - // Construct & cache client. The configuration is only used for timeout, - // and Clients have connection pools. So we can either (a) lose some - // connection pooling and leak sockets, or (b) use the same timeout for all - // configurations. Since the IPC is usually intended globally, not - // per-job, we choose (a). - SecureClient client = clients.get(factory); + String clusterId = conf.get(HConstants.CLUSTER_ID, "default"); + SecureClient client = clients.get(clusterId); if (client == null) { // Make an hbase client instead of hadoop Client. client = new SecureClient(HbaseObjectWritable.class, conf, factory); - clients.put(factory, client); + clients.put(clusterId, client); } else { client.incCount(); } @@ -106,10 +105,11 @@ } /** - * Construct & cache an IPC client with the default SocketFactory - * if no cached client exists. + * Construct & cache an IPC client with the configured + * {@link HConstants#CLUSTER_ID} if no cached client exists. * - * @param conf Configuration + * @param conf + * Configuration * @return an IPC client */ protected synchronized SecureClient getClient(Configuration conf) { @@ -125,7 +125,7 @@ synchronized (this) { client.decCount(); if (client.isZeroReference()) { - clients.remove(client.getSocketFactory()); + clients.remove(client.getClusterId()); } } if (client.isZeroReference()) { Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (revision 1426834) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (working copy) @@ -219,6 +219,19 @@ public static void initCredentials(Job job) throws IOException { if (User.isHBaseSecurityEnabled(job.getConfiguration())) { try { + // init credentials for remote cluster + String quorumAddress = job.getConfiguration().get( + TableOutputFormat.QUORUM_ADDRESS); + if (quorumAddress != null) { + String[] parts = ZKUtil.transformClusterKey(quorumAddress); + Configuration peerConf = HBaseConfiguration.create(job + .getConfiguration()); + peerConf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]); + peerConf.set("hbase.zookeeper.client.port", parts[1]); + peerConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]); + User.getCurrent().obtainAuthTokenForJob(peerConf, job); + } + User.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job); } catch (InterruptedException ie) { LOG.info("Interrupted obtaining user authentication token"); Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 1426834) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy) @@ -1202,4 +1202,11 @@ (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout; } } + + /** + * @return the clusterId + */ + public String getClusterId() { + return clusterId; + } }