Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java (revision 1482639)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java (working copy)
@@ -28,6 +28,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -76,6 +78,35 @@
     Configuration conf;
     boolean useWriteAheadLogging;
 
+    /**
+     * Applies the remote-cluster settings, if any, to this writer's configuration.
+     * <p>
+     * When {@link TableOutputFormat#QUORUM_ADDRESS} is set, its value is applied to the
+     * configuration as a cluster key. When {@link TableOutputFormat#QUORUM_PORT} is set to a
+     * non-zero value, the ZooKeeper client port is then set to that value.
+     *
+     * @throws RuntimeException if applying the cluster key fails with an IOException
+     */
+    void setZookeeperQuorum() {
+      String address = this.conf.get(TableOutputFormat.QUORUM_ADDRESS);
+      int zkClientPort = this.conf.getInt(TableOutputFormat.QUORUM_PORT, 0);
+
+      try {
+        if (address != null) {
+          ZKUtil.applyClusterKeyToConf(this.conf, address);
+        }
+        if (zkClientPort != 0) {
+          // Set after the cluster key so an explicitly configured port is the final value.
+          this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
+        }
+      } catch (IOException e) {
+        String msg = "Failed to apply cluster key '" + address + "' to the configuration";
+        // Pass the exception as the throwable argument so its stack trace is logged too.
+        LOG.error(msg, e);
+        throw new RuntimeException(msg, e);
+      }
+    }
+
     /**
      * @param conf
      *          HBaseConfiguration to used
@@ -90,6 +121,7 @@
       this.tables = new HashMap();
       this.conf = conf;
       this.useWriteAheadLogging = useWriteAheadLogging;
+      setZookeeperQuorum();
     }
 
     /**