Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java  (revision 1022813)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java  (working copy)
@@ -61,12 +61,12 @@
   private static final char ZNODE_PATH_SEPARATOR = '/';
 
   /**
-   * Creates a new connection to ZooKeeper, pulling settings and quorum config
+   * Creates a new connection to ZooKeeper, pulling settings and ensemble config
    * from the specified configuration object using methods from {@link ZKConfig}.
    *
    * Sets the connection status monitoring watcher to the specified watcher.
    *
-   * @param conf configuration to pull quorum and other settings from
+   * @param conf configuration to pull ensemble and other settings from
    * @param watcher watcher to monitor connection changes
    * @return connection to zookeeper
    * @throws IOException if unable to connect to zk or config problem
@@ -74,26 +74,26 @@
   public static ZooKeeper connect(Configuration conf, Watcher watcher)
   throws IOException {
     Properties properties = ZKConfig.makeZKProps(conf);
-    String quorum = ZKConfig.getZKQuorumServersString(properties);
-    return connect(conf, quorum, watcher);
+    String ensemble = ZKConfig.getZKQuorumServersString(properties);
+    return connect(conf, ensemble, watcher);
   }
 
-  public static ZooKeeper connect(Configuration conf, String quorum,
+  public static ZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher)
   throws IOException {
-    return connect(conf, quorum, watcher, "");
+    return connect(conf, ensemble, watcher, "");
   }
 
-  public static ZooKeeper connect(Configuration conf, String quorum,
+  public static ZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher, final String descriptor)
   throws IOException {
-    if(quorum == null) {
-      throw new IOException("Unable to determine ZooKeeper quorum");
+    if(ensemble == null) {
+      throw new IOException("Unable to determine ZooKeeper ensemble");
     }
     int timeout = conf.getInt("zookeeper.session.timeout", 60 * 1000);
-    LOG.info(descriptor + " opening connection to ZooKeeper with quorum (" +
-      quorum + ")");
-    return new ZooKeeper(quorum, timeout, watcher);
+    LOG.info(descriptor + " opening connection to ZooKeeper with ensemble (" +
+      ensemble + ")");
+    return new ZooKeeper(ensemble, timeout, watcher);
   }
 
   //
@@ -164,9 +164,9 @@
    * @return ensemble key with a name (if any)
    */
   public static String getZooKeeperClusterKey(Configuration conf, String name) {
-    String quorum = conf.get(HConstants.ZOOKEEPER_QUORUM.replaceAll(
+    String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM.replaceAll(
         "[\\t\\n\\x0B\\f\\r]", ""));
-    StringBuilder builder = new StringBuilder(quorum);
+    StringBuilder builder = new StringBuilder(ensemble);
     builder.append(":");
     builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     if (name != null && !name.isEmpty()) {
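For illustration (not part of the patch), a minimal sketch of client code against the renamed "ensemble" overloads of ZKUtil.connect(). It assumes an hbase-site.xml on the classpath that names the ZooKeeper ensemble, and a reachable ZooKeeper:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
  import org.apache.zookeeper.WatchedEvent;
  import org.apache.zookeeper.Watcher;
  import org.apache.zookeeper.ZooKeeper;

  public class ZKConnectExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Watcher that just logs connection state changes.
      Watcher watcher = new Watcher() {
        public void process(WatchedEvent event) {
          System.out.println("ZooKeeper event: " + event);
        }
      };
      // Ensemble string and session timeout are pulled from conf via ZKConfig.
      ZooKeeper zk = ZKUtil.connect(conf, watcher);
      zk.close();
    }
  }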
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java  (revision 1022813)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java  (working copy)
@@ -69,11 +69,10 @@
    * @throws InterruptedException When the job gets interrupted.
    */
   @Override
-  public void reduce(Writable key, Iterable<Writable> values,
-    Context context) throws IOException, InterruptedException {
+  public void reduce(Writable key, Iterable<Writable> values, Context context)
+  throws IOException, InterruptedException {
     for(Writable putOrDelete : values) {
       context.write(key, putOrDelete);
     }
   }
-
-}
+}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java  (revision 1022813)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java  (working copy)
@@ -27,7 +27,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -37,7 +36,6 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
@@ -189,8 +187,6 @@
     return true;
   }
 
-
-
   /**
    * Allows subclasses to get the {@link HTable}.
    */
@@ -235,5 +231,4 @@
   protected void setTableRecordReader(TableRecordReader tableRecordReader) {
     this.tableRecordReader = tableRecordReader;
   }
-
-}
+}
\ No newline at end of file
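For illustration (not part of the patch), a sketch of the subclassing contract TableInputFormatBase expects: supply an HTable and a Scan before splits are computed. The class name and table name below are hypothetical, and real subclasses (see TableInputFormat) do this wiring in setConf() rather than in a constructor:

  import java.io.IOException;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;

  public class FixedTableInputFormat extends TableInputFormatBase {
    public FixedTableInputFormat() throws IOException {
      // Hand the base class the table whose regions become the input splits.
      setHTable(new HTable(HBaseConfiguration.create(), "mytable"));
      Scan scan = new Scan();
      scan.setCaching(500); // fetch rows in batches to cut RPCs per mapper
      setScan(scan);
    }
  }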
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java  (revision 1022813)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java  (working copy)
@@ -26,17 +26,15 @@
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLDecoder;
-import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
@@ -46,7 +44,6 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
@@ -64,13 +61,15 @@
    * @param mapper The mapper class to use.
    * @param outputKeyClass The class of the output key.
    * @param outputValueClass The class of the output value.
-   * @param job The current job to adjust.
+   * @param job The current job to adjust. Make sure the passed job is
+   * carrying all necessary HBase configuration.
    * @throws IOException When setting up the details fails.
    */
   public static void initTableMapperJob(String table, Scan scan,
       Class<? extends TableMapper> mapper,
       Class<? extends WritableComparable> outputKeyClass,
-      Class<? extends Writable> outputValueClass, Job job) throws IOException {
+      Class<? extends Writable> outputValueClass, Job job)
+  throws IOException {
     job.setInputFormatClass(TableInputFormat.class);
     if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
     if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
@@ -148,10 +147,18 @@
    *
    * @param table The output table.
    * @param reducer The reducer class to use.
-   * @param job The current job to adjust.
+   * @param job The current job to adjust. Make sure the passed job is
+   * carrying all necessary HBase configuration.
    * @param partitioner Partitioner to use. Pass null to use
    * default partitioner.
-   * @param quorumAddress Distant cluster to write to
+   * @param quorumAddress Distant cluster to write to; default is null for
+   * output to the cluster that is designated in hbase-site.xml.
+   * Set this String to the zookeeper ensemble of an alternate remote cluster
+   * when you would have the reduce write to a cluster that is other than the
+   * default; e.g. copying tables between clusters, the source would be
+   * designated by hbase-site.xml and this param would have the
+   * ensemble address of the remote cluster. The format to pass is particular.
+   * Pass <hbase.zookeeper.quorum> ':' <ZOOKEEPER_ZNODE_PARENT>.
    * @param serverClass redefined hbase.regionserver.class
    * @param serverImpl redefined hbase.regionserver.impl
    * @throws IOException When determining the region count fails.
@@ -165,12 +172,14 @@
     job.setOutputFormatClass(TableOutputFormat.class);
     if (reducer != null) job.setReducerClass(reducer);
     conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
     if (quorumAddress != null) {
       if (quorumAddress.split(":").length == 2) {
         conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
       } else {
-        throw new IOException("Please specify the peer cluster as " +
-          HConstants.ZOOKEEPER_QUORUM+":"+HConstants.ZOOKEEPER_ZNODE_PARENT);
+        // Not in expected format.
+        throw new IOException("Please specify the peer cluster using the format of " +
+          HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_ZNODE_PARENT);
       }
     }
     if (serverClass != null && serverImpl != null) {
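For illustration (not part of the patch), a driver fragment exercising the new quorumAddress parameter to copy a table to a peer cluster. The ensemble string "zk1,zk2,zk3:/hbase" is hypothetical; note the single ':' between the quorum list and the znode parent, which is exactly what the split(":").length == 2 check above enforces:

  Configuration conf = HBaseConfiguration.create();
  Job job = new Job(conf, "copy-to-peer");
  TableMapReduceUtil.initTableMapperJob("source_table", new Scan(),
      IdentityTableMapper.class, ImmutableBytesWritable.class, Result.class, job);
  TableMapReduceUtil.initTableReducerJob("dest_table",
      IdentityTableReducer.class, job,
      null,                   // use the default partitioner
      "zk1,zk2,zk3:/hbase",   // remote ensemble ':' znode parent
      null, null);            // keep default region server class/impl
  job.waitForCompletion(true);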
+ throw new IOException("Please specify the peer cluster using the format of " + + HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_ZNODE_PARENT); } } if (serverClass != null && serverImpl != null) { Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (revision 1022813) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (working copy) @@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; @@ -141,5 +140,4 @@ setScan(scan); } - -} +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (revision 1022813) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (working copy) @@ -23,7 +23,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; @@ -34,7 +35,6 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.conf.Configuration; /** * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored @@ -43,13 +43,22 @@ * * @param The type of the key. Ignored in this class. */ -public class TableOutputFormat extends OutputFormat { +public class TableOutputFormat extends OutputFormat +implements Configurable { private final Log LOG = LogFactory.getLog(TableOutputFormat.class); + /** Job parameter that specifies the output table. */ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; - /** Optional job parameter to specify a peer cluster */ + + /** + * Optional job parameter to specify a peer cluster. + * Used specifying remote cluster when copying between hbase clusters (the + * source is picked up from hbase-site.xml). + * @see {@link TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)} + */ public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum"; + /** Optional specification of the rs class name of the peer cluster */ public static final String REGION_SERVER_CLASS = "hbase.mapred.output.rs.class"; @@ -57,6 +66,11 @@ public static final String REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl"; + /** The configuration. */ + private Configuration conf = null; + + private HTable table; + /** * Writes the reducer output to an HBase table. 
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java  (revision 1022813)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java  (working copy)
@@ -23,7 +23,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
@@ -34,7 +35,6 @@
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -43,13 +43,22 @@
  *
  * @param <KEY>  The type of the key. Ignored in this class.
  */
-public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
+public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable>
+implements Configurable {
 
   private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
 
+  /** Job parameter that specifies the output table. */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
-  /** Optional job parameter to specify a peer cluster */
+
+  /**
+   * Optional job parameter to specify a peer cluster.
+   * Used to specify the remote cluster when copying between hbase clusters (the
+   * source is picked up from hbase-site.xml).
+   * @see {@link TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)}
+   */
   public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
+
   /** Optional specification of the rs class name of the peer cluster */
   public static final String REGION_SERVER_CLASS =
     "hbase.mapred.output.rs.class";
@@ -57,6 +66,11 @@
   public static final String REGION_SERVER_IMPL =
     "hbase.mapred.output.rs.impl";
 
+  /** The configuration. */
+  private Configuration conf = null;
+
+  private HTable table;
+
   /**
    * Writes the reducer output to an HBase table.
    *
@@ -120,32 +134,7 @@
   public RecordWriter<KEY, Writable> getRecordWriter(
     TaskAttemptContext context)
   throws IOException, InterruptedException {
-    // expecting exactly one path
-    Configuration conf = new Configuration(context.getConfiguration());
-    String tableName = conf.get(OUTPUT_TABLE);
-    String address = conf.get(QUORUM_ADDRESS);
-    String serverClass = conf.get(REGION_SERVER_CLASS);
-    String serverImpl = conf.get(REGION_SERVER_IMPL);
-    HTable table = null;
-    try {
-      HBaseConfiguration.addHbaseResources(conf);
-      if (address != null) {
-        // Check is done in TMRU
-        String[] parts = address.split(":");
-        conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
-        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
-      }
-      if (serverClass != null) {
-        conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
-        conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
-      }
-      table = new HTable(conf, tableName);
-    } catch(IOException e) {
-      LOG.error(e);
-      throw e;
-    }
-    table.setAutoFlush(false);
-    return new TableRecordWriter(table);
+    return new TableRecordWriter<KEY>(this.table);
   }
 
   /**
@@ -178,4 +167,34 @@
     return new TableOutputCommitter();
   }
 
-}
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    // Keep a reference so getConf() returns the configuration we were handed.
+    this.conf = conf;
+    String tableName = conf.get(OUTPUT_TABLE);
+    String address = conf.get(QUORUM_ADDRESS);
+    String serverClass = conf.get(REGION_SERVER_CLASS);
+    String serverImpl = conf.get(REGION_SERVER_IMPL);
+    try {
+      if (address != null) {
+        // Check is done in TMRU
+        String[] parts = address.split(":");
+        conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
+      }
+      if (serverClass != null) {
+        conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
+        conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
+      }
+      this.table = new HTable(conf, tableName);
+      table.setAutoFlush(false);
+      LOG.info("Created table instance for " + tableName);
+    } catch(IOException e) {
+      LOG.error(e);
+    }
+  }
+}
\ No newline at end of file
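For illustration (not part of the patch), a sketch of what moving to Configurable buys: Hadoop instantiates the output format reflectively and calls setConf() immediately, so the HTable is built once from job configuration instead of on every getRecordWriter() call. The table and ensemble values below are hypothetical:

  Configuration conf = HBaseConfiguration.create();
  conf.set(TableOutputFormat.OUTPUT_TABLE, "dest_table");
  // Optional: write to a peer cluster instead of the one in hbase-site.xml.
  conf.set(TableOutputFormat.QUORUM_ADDRESS, "zk1,zk2,zk3:/hbase");
  Job job = new Job(conf, "write-to-peer");
  job.setOutputFormatClass(TableOutputFormat.class);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(Put.class);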