diff --git a/src/contrib/mdc_replication/ivy.xml b/src/contrib/mdc_replication/ivy.xml
index 7d85aa7..ce39f4c 100644
--- a/src/contrib/mdc_replication/ivy.xml
+++ b/src/contrib/mdc_replication/ivy.xml
@@ -86,6 +86,10 @@
       rev="${hadoop-hdfs.version}" conf="common->default" changing="true" >
+
+
+
@@ -99,6 +103,8 @@
       rev="${hadoop-core.version}" conf="test->default" transitive="false" changing="true" />
+
diff --git a/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
index affbf7a..b1fe35a 100644
--- a/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
+++ b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
@@ -19,41 +19,60 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.EmptyWatcher;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.hbase.mapreduce.CopyTable;
+import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assert.assertArrayEquals;
-
-import org.junit.*;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.List;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 public class TestReplication implements HConstants{
 
   protected static final Log LOG = LogFactory.getLog(TestReplication.class);
 
-  private Configuration conf1;
-  private Configuration conf2;
+  private static Configuration conf1;
+  private static Configuration conf2;
 
-  private ZooKeeperWrapper zkw1;
-  private ZooKeeperWrapper zkw2;
+  private static ZooKeeperWrapper zkw1;
+  private static ZooKeeperWrapper zkw2;
 
-  private HBaseTestingUtility utility1;
-  private HBaseTestingUtility utility2;
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
 
-  private final int NB_ROWS_IN_BATCH = 100;
-  private final long SLEEP_TIME = 500;
-  private final int NB_RETRIES = 10;
+  private static final int NB_ROWS_IN_BATCH = 100;
+  private static final long SLEEP_TIME = 500; //ms
+  private static final int NB_RETRIES = 10;
+
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] famName = Bytes.toBytes("f");
+  private static final byte[] row = Bytes.toBytes("row");
 
   /**
@@ -61,7 +80,68 @@ public class TestReplication implements HConstants{
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    try {
+      conf1 = HBaseConfiguration.create();
+      conf1.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+          .getName());
+      conf1.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+          .getName());
+      conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
+
+      utility1 = new HBaseTestingUtility(conf1);
+      utility1.startMiniZKCluster();
+      MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+      zkw1 = new ZooKeeperWrapper(conf1, EmptyWatcher.instance);
+      zkw1.writeZNode("/1", "replication", "");
+      zkw1.writeZNode("/1/replication", "master",
+          conf1.get(ZOOKEEPER_QUORUM)+":" +
+          conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+      setIsReplication("true");
+
+      LOG.info("Setup first Zk");
+
+      conf2 = HBaseConfiguration.create();
+      conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+          .getName());
+      conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+          .getName());
+      conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
+
+      utility2 = new HBaseTestingUtility(conf2);
+      utility2.setZkCluster(miniZK);
+      zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance);
+      zkw2.writeZNode("/2", "replication", "");
+      zkw2.writeZNode("/2/replication", "master",
+          conf1.get(ZOOKEEPER_QUORUM)+":" +
+          conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+
+      zkw1.writeZNode("/1/replication/peers", "test",
+          conf2.get(ZOOKEEPER_QUORUM)+":" +
+          conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+
+      LOG.info("Setup second Zk");
+
+      utility1.startMiniCluster();
+      utility2.startMiniCluster();
+
+      utility1.startMiniMapReduceCluster();
+
+      HTableDescriptor table = new HTableDescriptor(tableName);
+      HColumnDescriptor fam = new HColumnDescriptor(famName);
+      table.addFamily(fam);
+
+      HBaseAdmin admin1 = new HBaseAdmin(conf1);
+      HBaseAdmin admin2 = new HBaseAdmin(conf2);
+      admin1.createTable(table);
+      admin2.createTable(table);
+    } catch (Exception ex) { ex.printStackTrace(); throw ex; }
+  }
+
+  private static void setIsReplication(String bool) throws Exception {
+    zkw1.writeZNode("/1/replication", "state", bool);
+    // Takes some ms for ZK to fire the watcher
+    Thread.sleep(100);
+  }
 
   /**
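The "master" and "peers" znodes written above all carry a cluster address in a quorum:clientPort:znodeParent layout. A minimal standalone sketch of that composition, using the HConstants names the patch already relies on (ClusterKeyExample and its hardcoded values are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class ClusterKeyExample {
      // Builds the same "quorum:clientPort:znodeParent" string the test
      // writes under /1/replication and /2/replication.
      static String clusterKey(Configuration conf, String znodeParent) {
        return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
            conf.get("hbase.zookeeper.property.clientPort") + ":" +
            znodeParent;
      }

      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        System.out.println(clusterKey(conf, "/1")); // localhost:2181:/1
      }
    }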
conf1.get("hbase.zookeeper.property.clientPort")+":/1"); - setIsReplication("true"); - - - LOG.info("Setup first Zk"); - - conf2 = HBaseConfiguration.create(); - conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class - .getName()); - conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class - .getName()); - conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2"); - - utility2 = new HBaseTestingUtility(conf2); - utility2.setZkCluster(miniZK); - zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance); - zkw2.writeZNode("/2", "replication", ""); - zkw2.writeZNode("/2/replication", "master", - conf1.get(ZOOKEEPER_QUORUM)+":" + - conf1.get("hbase.zookeeper.property.clientPort")+":/1"); - - zkw1.writeZNode("/1/replication/peers", "test", - conf2.get(ZOOKEEPER_QUORUM)+":" + - conf2.get("hbase.zookeeper.property.clientPort")+":/2"); - - LOG.info("Setup second Zk"); - } catch (Exception ex) { ex.printStackTrace(); throw ex; } - } + public void setUp() throws Exception {} /** * @throws java.lang.Exception */ @After - public void tearDown() throws Exception {} + public void tearDown() throws Exception { + setIsReplication("false"); + utility1.truncateTable(tableName); + utility2.truncateTable(tableName); + setIsReplication("true"); + } @Test public void testReplication() throws Exception { - utility1.startMiniCluster(); - utility2.startMiniCluster(); - - byte[] tableName = Bytes.toBytes("test"); - byte[] famName = Bytes.toBytes("f"); - byte[] row = Bytes.toBytes("row"); - - HTableDescriptor table = new HTableDescriptor(tableName); - HColumnDescriptor fam = new HColumnDescriptor(famName); - table.addFamily(fam); - - HBaseAdmin admin1 = new HBaseAdmin(conf1); - HBaseAdmin admin2 = new HBaseAdmin(conf2); - admin1.createTable(table); - admin2.createTable(table); - Put put = new Put(row); put.add(famName, row, row); @@ -215,10 +241,6 @@ public class TestReplication implements HConstants{ // Test stopping replication setIsReplication("false"); - // Takes some ms for ZK to fire the watcher - Thread.sleep(100); - - put = new Put(Bytes.toBytes("stop start")); put.add(famName, row, row); table1.put(put); @@ -239,11 +261,8 @@ public class TestReplication implements HConstants{ } // Test restart replication - setIsReplication("true"); - Thread.sleep(100); - table1.put(put); for(int i = 0; i < NB_RETRIES; i++) { @@ -262,7 +281,32 @@ public class TestReplication implements HConstants{ } - private void setIsReplication(String bool) throws Exception{ - zkw1.writeZNode("/1/replication", "state", bool); + @Test + public void testMRCopy() throws Exception { + + // This test will be transformed to care about scoping + // once HBASE-1728 gets in + + setIsReplication("false"); + HTable table1 = new HTable(conf1, tableName); + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.add(famName, row, row); + table1.put(put); + } + String[] args = new String[] { + "--rs.class="+ReplicationRegionInterface.class.getName(), + "--rs.impl="+ReplicationRegionServer.class.getName(), + "--peer.adr="+conf2.get(ZOOKEEPER_QUORUM)+":/2", "test"}; + Job job = CopyTable.createSubmittableJob(conf1, args); + assertTrue(job.waitForCompletion(true)); + + HTable table2 = new HTable(conf2, tableName); + Scan scan = new Scan(); + ResultScanner scanner = table2.getScanner(scan); + Result[] res = scanner.next(NB_ROWS_IN_BATCH); + scanner.close(); + assertEquals(NB_ROWS_IN_BATCH, res.length); + } } diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java 
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
new file mode 100644
index 0000000..d76f936
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -0,0 +1,192 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/**
+ * Tool used to copy a table to another one, which can be on a different
+ * cluster. It is also configurable with a start and end time as well as a
+ * specification of the region server implementation if different from the
+ * local cluster.
+ */
+public class CopyTable {
+
+  final static String NAME = "Copy Table";
+  static String rsClass = null;
+  static String rsImpl = null;
+  static long startTime = 0;
+  static long endTime = 0;
+  static String tableName = null;
+  static String newTableName = null;
+  static String peerAddress = null;
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    if (!doCommandLine(args)) {
+      return null;
+    }
+    Cluster mrCluster = new Cluster(conf);
+    Job job = Job.getInstance(mrCluster, conf);
+    job.setJobName(NAME + "_" + tableName);
+    job.setJarByClass(CopyTable.class);
+    Scan scan = new Scan();
+    if (startTime != 0) {
+      scan.setTimeRange(startTime,
+          endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+    }
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+        Import.Importer.class, null, null, job);
+    TableMapReduceUtil.initTableReducerJob(
+        newTableName == null ? tableName : newTableName, null, job,
+        null, peerAddress, rsClass, rsImpl);
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void printUsage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: CopyTable [--rs.class=CLASS] " +
+        "[--rs.impl=IMPL] [--starttime=X] [--endtime=Y] " +
+        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
+    System.err.println();
+    System.err.println("Options:");
+    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
+    System.err.println("              specify if different from current cluster");
+    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
+    System.err.println(" starttime    beginning of the time range");
+    System.err.println("              without endtime means from starttime to forever");
+    System.err.println(" endtime      end of the time range");
+    System.err.println(" new.name     new table's name");
+    System.err.println(" peer.adr     Address of the peer cluster given in the format");
+    System.err.println("              hbase.zookeeper.quorum:zookeeper.znode.parent");
+    System.err.println();
+    System.err.println("Args:");
+    System.err.println(" tablename    Name of the table to copy");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
+    System.err.println(" $ bin/hbase " +
+        "org.apache.hadoop.hbase.mapreduce.CopyTable --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface " +
+        "--rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer --starttime=1265875194289 --endtime=1265878794289 " +
+        "--peer.adr=server1,server2,server3:/hbase TestTable ");
+  }
+
+  private static boolean doCommandLine(final String[] args) {
+    // Process command-line args. TODO: Better cmd-line processing
+    // (but hopefully something not as painful as cli options).
+    if (args.length < 1) {
+      printUsage(null);
+      return false;
+    }
+    try {
+      for (int i = 0; i < args.length; i++) {
+        String cmd = args[i];
+        if (cmd.equals("-h") || cmd.startsWith("--h")) {
+          printUsage(null);
+          return false;
+        }
+
+        final String rsClassArgKey = "--rs.class=";
+        if (cmd.startsWith(rsClassArgKey)) {
+          rsClass = cmd.substring(rsClassArgKey.length());
+          continue;
+        }
+
+        final String rsImplArgKey = "--rs.impl=";
+        if (cmd.startsWith(rsImplArgKey)) {
+          rsImpl = cmd.substring(rsImplArgKey.length());
+          continue;
+        }
+
+        final String startTimeArgKey = "--starttime=";
+        if (cmd.startsWith(startTimeArgKey)) {
+          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+          continue;
+        }
+
+        final String endTimeArgKey = "--endtime=";
+        if (cmd.startsWith(endTimeArgKey)) {
+          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+          continue;
+        }
+
+        final String newNameArgKey = "--new.name=";
+        if (cmd.startsWith(newNameArgKey)) {
+          newTableName = cmd.substring(newNameArgKey.length());
+          continue;
+        }
+
+        final String peerAdrArgKey = "--peer.adr=";
+        if (cmd.startsWith(peerAdrArgKey)) {
+          peerAddress = cmd.substring(peerAdrArgKey.length());
+          continue;
+        }
+
+        if (i == args.length-1) {
+          tableName = cmd;
+        }
+      }
+      if (newTableName == null && peerAddress == null) {
+        printUsage("At least a new table name or a " +
+            "peer address must be specified");
+        return false;
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      printUsage("Can't start because " + e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    Job job = createSubmittableJob(conf, args);
+    if (job != null) {
+      System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+  }
+}
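Because doCommandLine() accepts either --new.name or --peer.adr, the tool also covers same-cluster copies under a new table name. A sketch of that path, assuming the destination table already exists; LocalCopyExample and both table names are made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.CopyTable;
    import org.apache.hadoop.mapreduce.Job;

    public class LocalCopyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // No --peer.adr: rows are written back to the local cluster,
        // into the table named by --new.name.
        Job job = CopyTable.createSubmittableJob(conf, new String[] {
            "--new.name=TestTableCopy", // destination table (placeholder)
            "TestTable"});              // source table (placeholder)
        System.exit(job != null && job.waitForCompletion(true) ? 0 : 1);
      }
    }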
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index e22e296..fd5b3b0 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -36,6 +36,8 @@ public class Driver {
       "Count rows in HBase table");
     pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
     pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
+    pgd.addClass(CopyTable.NAME, CopyTable.class,
+        "Copy a table from the local cluster to a peer cluster");
     pgd.driver(args);
   }
 }
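ProgramDriver matches the first command-line argument against the registered NAME string, so whatever is registered here is what gets typed on the command line; since NAME is "Copy Table" with a space, it would need quoting in a shell. A minimal sketch of the registration mechanism (MiniDriver is hypothetical, and it sits in the same package because NAME is package-private):

    package org.apache.hadoop.hbase.mapreduce;

    import org.apache.hadoop.util.ProgramDriver;

    public class MiniDriver {
      public static void main(String[] args) throws Throwable {
        ProgramDriver pgd = new ProgramDriver();
        // Maps the program name to CopyTable's main class.
        pgd.addClass(CopyTable.NAME, CopyTable.class,
            "Copy a table from the local cluster to a peer cluster");
        pgd.driver(args); // dispatches on args[0]
      }
    }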
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 6a07de5..d38cc69 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
@@ -112,26 +114,61 @@ public class TableMapReduceUtil {
   /**
    * Use this before submitting a TableReduce job. It will
    * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job to adjust.
+   * @param partitioner  Partitioner to use. Pass null to use
+   * default partitioner.
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReducerJob(String table,
+    Class<? extends TableReducer> reducer, Job job,
+    Class partitioner) throws IOException {
+    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
    *
    * @param table  The output table.
    * @param reducer  The reducer class to use.
    * @param job  The current job to adjust.
    * @param partitioner  Partitioner to use. Pass null to use
    * default partitioner.
+   * @param quorumAddress Distant cluster to write to
+   * @param serverClass redefined hbase.regionserver.class
+   * @param serverImpl redefined hbase.regionserver.impl
    * @throws IOException When determining the region count fails.
    */
   public static void initTableReducerJob(String table,
-    Class<? extends TableReducer> reducer, Job job, Class partitioner)
-  throws IOException {
+    Class<? extends TableReducer> reducer, Job job,
+    Class partitioner, String quorumAddress, String serverClass,
+    String serverImpl) throws IOException {
+
+    Configuration conf = job.getConfiguration();
     job.setOutputFormatClass(TableOutputFormat.class);
     if (reducer != null) job.setReducerClass(reducer);
-    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
+    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    if(quorumAddress != null) {
+      if(quorumAddress.split(":").length == 2) {
+        conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
+      } else {
+        throw new IOException("Please specify the peer cluster as " +
+            HConstants.ZOOKEEPER_QUORUM+":"+HConstants.ZOOKEEPER_ZNODE_PARENT);
+      }
+    }
+    if(serverClass != null && serverImpl != null) {
+      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
+      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
+    }
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(Writable.class);
     if (partitioner == HRegionPartitioner.class) {
-      HBaseConfiguration.addHbaseResources(job.getConfiguration());
+      HBaseConfiguration.addHbaseResources(conf);
       job.setPartitionerClass(HRegionPartitioner.class);
-      HTable outputTable = new HTable(job.getConfiguration(), table);
+      HTable outputTable = new HTable(conf, table);
       int regions = outputTable.getRegionsInfo().size();
       if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(outputTable.getRegionsInfo().size());
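The only validation on quorumAddress is that it splits into exactly two parts on ':' — a comma-separated quorum plus a znode parent, with no client port. A tiny illustrative check mirroring that rule (QuorumAddressCheck is not part of the patch):

    public class QuorumAddressCheck {
      // Mirrors the test in initTableReducerJob above.
      static boolean isValid(String quorumAddress) {
        return quorumAddress != null && quorumAddress.split(":").length == 2;
      }

      public static void main(String[] args) {
        System.out.println(isValid("server1,server2,server3:/hbase")); // true
        System.out.println(isValid("server1:2181:/hbase"));            // false
      }
    }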
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 9f51ea7..41fe3f9 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -33,6 +34,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -46,6 +48,14 @@ public class TableOutputFormat extends OutputFormat {
   private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
   /** Job parameter that specifies the output table. */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+  /** Optional job parameter to specify a peer cluster */
+  public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
+  /** Optional specification of the rs class name of the peer cluster */
+  public static final String
+      REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
+  /** Optional specification of the rs impl name of the peer cluster */
+  public static final String
+      REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
 
   /**
    * Writes the reducer output to an HBase table.
@@ -111,12 +121,25 @@ public class TableOutputFormat extends OutputFormat {
       TaskAttemptContext context)
   throws IOException, InterruptedException {
     // expecting exactly one path
-    String tableName = context.getConfiguration().get(OUTPUT_TABLE);
+    Configuration conf = new Configuration(context.getConfiguration());
+    String tableName = conf.get(OUTPUT_TABLE);
+    String address = conf.get(QUORUM_ADDRESS);
+    String serverClass = conf.get(REGION_SERVER_CLASS);
+    String serverImpl = conf.get(REGION_SERVER_IMPL);
     HTable table = null;
     try {
-      HBaseConfiguration.addHbaseResources(context.getConfiguration());
-      table = new HTable(context.getConfiguration(),
-        tableName);
+      HBaseConfiguration.addHbaseResources(conf);
+      if (address != null) {
+        // Format was already checked in TableMapReduceUtil
+        String[] parts = address.split(":");
+        conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
+      }
+      if (serverClass != null) {
+        conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
+        conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
+      }
+      table = new HTable(conf, tableName);
     } catch(IOException e) {
       LOG.error(e);
       throw e;
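The key move in getRecordWriter() is cloning the configuration before redirecting it, so the peer-cluster override never leaks back into the task's own configuration. A reduced sketch of that redirection using the same HConstants keys as the hunk above (PeerConfExample is illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;

    public class PeerConfExample {
      // Returns a copy of 'base' pointed at the peer cluster given as
      // "zkQuorum:znodeParent"; 'base' itself is left untouched.
      static Configuration peerConf(Configuration base, String address) {
        Configuration conf = new Configuration(base);
        String[] parts = address.split(":");
        conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
        return conf;
      }

      public static void main(String[] args) {
        Configuration peer =
            peerConf(new Configuration(), "server1,server2,server3:/hbase");
        System.out.println(peer.get(HConstants.ZOOKEEPER_QUORUM));       // server1,server2,server3
        System.out.println(peer.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); // /hbase
      }
    }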