diff --git a/src/contrib/mdc_replication/ivy.xml b/src/contrib/mdc_replication/ivy.xml
index 7d85aa7..ce39f4c 100644
--- a/src/contrib/mdc_replication/ivy.xml
+++ b/src/contrib/mdc_replication/ivy.xml
@@ -86,6 +86,10 @@
rev="${hadoop-hdfs.version}" conf="common->default" changing="true" >
+
+
+
@@ -99,6 +103,8 @@
rev="${hadoop-core.version}" conf="test->default" transitive="false" changing="true" />
+
diff --git a/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
index affbf7a..b1fe35a 100644
--- a/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
+++ b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
@@ -19,41 +19,60 @@
*/
package org.apache.hadoop.hbase.replication;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.EmptyWatcher;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.hbase.mapreduce.CopyTable;
+import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.Assert.assertArrayEquals;
-
-import org.junit.*;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.List;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
public class TestReplication implements HConstants{
protected static final Log LOG = LogFactory.getLog(TestReplication.class);
- private Configuration conf1;
- private Configuration conf2;
+ private static Configuration conf1;
+ private static Configuration conf2;
- private ZooKeeperWrapper zkw1;
- private ZooKeeperWrapper zkw2;
+ private static ZooKeeperWrapper zkw1;
+ private static ZooKeeperWrapper zkw2;
- private HBaseTestingUtility utility1;
- private HBaseTestingUtility utility2;
+ private static HBaseTestingUtility utility1;
+ private static HBaseTestingUtility utility2;
- private final int NB_ROWS_IN_BATCH = 100;
- private final long SLEEP_TIME = 500;
- private final int NB_RETRIES = 10;
+ private static final int NB_ROWS_IN_BATCH = 100;
+ private static final long SLEEP_TIME = 500; //ms
+ private static final int NB_RETRIES = 10;
+
+ private static final byte[] tableName = Bytes.toBytes("test");
+ private static final byte[] famName = Bytes.toBytes("f");
+ private static final byte[] row = Bytes.toBytes("row");
/**
@@ -61,7 +80,68 @@ public class TestReplication implements HConstants{
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ try {
+ conf1 = HBaseConfiguration.create();
+ conf1.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+ .getName());
+ conf1.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+ .getName());
+ conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
+
+ utility1 = new HBaseTestingUtility(conf1);
+ utility1.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+ zkw1 = new ZooKeeperWrapper(conf1, EmptyWatcher.instance);
+ zkw1.writeZNode("/1", "replication", "");
+ zkw1.writeZNode("/1/replication", "master",
+ conf1.get(ZOOKEEPER_QUORUM)+":" +
+ conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+ setIsReplication("true");
+
+ LOG.info("Setup first Zk");
+
+ conf2 = HBaseConfiguration.create();
+ conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+ .getName());
+ conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+ .getName());
+ conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
+
+ utility2 = new HBaseTestingUtility(conf2);
+ utility2.setZkCluster(miniZK);
+ zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance);
+ zkw2.writeZNode("/2", "replication", "");
+ zkw2.writeZNode("/2/replication", "master",
+ conf1.get(ZOOKEEPER_QUORUM)+":" +
+ conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+
+ zkw1.writeZNode("/1/replication/peers", "test",
+ conf2.get(ZOOKEEPER_QUORUM)+":" +
+ conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+
+ LOG.info("Setup second Zk");
+
+ utility1.startMiniCluster();
+ utility2.startMiniCluster();
+
+ utility1.startMiniMapReduceCluster();
+
+ HTableDescriptor table = new HTableDescriptor(tableName);
+ HColumnDescriptor fam = new HColumnDescriptor(famName);
+ table.addFamily(fam);
+
+ HBaseAdmin admin1 = new HBaseAdmin(conf1);
+ HBaseAdmin admin2 = new HBaseAdmin(conf2);
+ admin1.createTable(table);
+ admin2.createTable(table);
+ } catch (Exception ex) { ex.printStackTrace(); throw ex; }
+ }
+
+ private static void setIsReplication(String bool) throws Exception {
+ zkw1.writeZNode("/1/replication", "state", bool);
+ // Takes some ms for ZK to fire the watcher
+ Thread.sleep(100);
}
/**
@@ -75,75 +155,21 @@ public class TestReplication implements HConstants{
* @throws java.lang.Exception
*/
@Before
- public void setUp() throws Exception {
- try {
- conf1 = HBaseConfiguration.create();
- conf1.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
- .getName());
- conf1.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
- .getName());
- conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
-
- utility1 = new HBaseTestingUtility(conf1);
- utility1.startMiniZKCluster();
- MiniZooKeeperCluster miniZK = utility1.getZkCluster();
- zkw1 = new ZooKeeperWrapper(conf1, EmptyWatcher.instance);
- zkw1.writeZNode("/1", "replication", "");
- zkw1.writeZNode("/1/replication", "master",
- conf1.get(ZOOKEEPER_QUORUM)+":" +
- conf1.get("hbase.zookeeper.property.clientPort")+":/1");
- setIsReplication("true");
-
-
- LOG.info("Setup first Zk");
-
- conf2 = HBaseConfiguration.create();
- conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
- .getName());
- conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
- .getName());
- conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
-
- utility2 = new HBaseTestingUtility(conf2);
- utility2.setZkCluster(miniZK);
- zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance);
- zkw2.writeZNode("/2", "replication", "");
- zkw2.writeZNode("/2/replication", "master",
- conf1.get(ZOOKEEPER_QUORUM)+":" +
- conf1.get("hbase.zookeeper.property.clientPort")+":/1");
-
- zkw1.writeZNode("/1/replication/peers", "test",
- conf2.get(ZOOKEEPER_QUORUM)+":" +
- conf2.get("hbase.zookeeper.property.clientPort")+":/2");
-
- LOG.info("Setup second Zk");
- } catch (Exception ex) { ex.printStackTrace(); throw ex; }
- }
+ public void setUp() throws Exception {}
/**
* @throws java.lang.Exception
*/
@After
- public void tearDown() throws Exception {}
+ public void tearDown() throws Exception {
+ setIsReplication("false");
+ utility1.truncateTable(tableName);
+ utility2.truncateTable(tableName);
+ setIsReplication("true");
+ }
@Test
public void testReplication() throws Exception {
- utility1.startMiniCluster();
- utility2.startMiniCluster();
-
- byte[] tableName = Bytes.toBytes("test");
- byte[] famName = Bytes.toBytes("f");
- byte[] row = Bytes.toBytes("row");
-
- HTableDescriptor table = new HTableDescriptor(tableName);
- HColumnDescriptor fam = new HColumnDescriptor(famName);
- table.addFamily(fam);
-
- HBaseAdmin admin1 = new HBaseAdmin(conf1);
- HBaseAdmin admin2 = new HBaseAdmin(conf2);
- admin1.createTable(table);
- admin2.createTable(table);
-
Put put = new Put(row);
put.add(famName, row, row);
@@ -215,10 +241,6 @@ public class TestReplication implements HConstants{
// Test stopping replication
setIsReplication("false");
- // Takes some ms for ZK to fire the watcher
- Thread.sleep(100);
-
-
put = new Put(Bytes.toBytes("stop start"));
put.add(famName, row, row);
table1.put(put);
@@ -239,11 +261,8 @@ public class TestReplication implements HConstants{
}
// Test restart replication
-
setIsReplication("true");
- Thread.sleep(100);
-
table1.put(put);
for(int i = 0; i < NB_RETRIES; i++) {
@@ -262,7 +281,32 @@ public class TestReplication implements HConstants{
}
- private void setIsReplication(String bool) throws Exception{
- zkw1.writeZNode("/1/replication", "state", bool);
+ @Test
+ public void testMRCopy() throws Exception {
+
+ // This test will be transformed to care about scoping
+ // once HBASE-1728 gets in
+
+ setIsReplication("false");
+ HTable table1 = new HTable(conf1, tableName);
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put put = new Put(Bytes.toBytes(i));
+ put.add(famName, row, row);
+ table1.put(put);
+ }
+ String[] args = new String[] {
+ "--rs.class="+ReplicationRegionInterface.class.getName(),
+ "--rs.impl="+ReplicationRegionServer.class.getName(),
+ "--peer.adr="+conf2.get(ZOOKEEPER_QUORUM)+":/2", "test"};
+ Job job = CopyTable.createSubmittableJob(conf1, args);
+ assertTrue(job.waitForCompletion(true));
+
+ HTable table2 = new HTable(conf2, tableName);
+ Scan scan = new Scan();
+ ResultScanner scanner = table2.getScanner(scan);
+ Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+ scanner.close();
+ assertEquals(NB_ROWS_IN_BATCH, res.length);
+
}
}
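
A note on the two address formats exercised by this test: the replication znodes written in setUpBeforeClass ("/1/replication" master, "/1/replication/peers") hold a three-part "quorum:clientPort:znodeParent" string, while the --peer.adr argument passed to CopyTable in testMRCopy uses the two-part "quorum:znodeParent" form that TableMapReduceUtil validates. A minimal sketch of building and splitting the two-part form (the helper names are hypothetical, not part of the patch):

    // Hypothetical helper; illustrates the "--peer.adr" format only.
    public final class PeerAddress {
      private PeerAddress() {}

      // e.g. format("server1,server2,server3", "/2") -> "server1,server2,server3:/2"
      static String format(String quorum, String znodeParent) {
        return quorum + ":" + znodeParent;
      }

      // Mirrors the split(":").length == 2 check done in TableMapReduceUtil.
      static String[] parse(String address) {
        String[] parts = address.split(":");
        if (parts.length != 2) {
          throw new IllegalArgumentException(
              "Expected quorum:znodeParent, got: " + address);
        }
        return parts;  // [quorum, znodeParent]
      }
    }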
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
new file mode 100644
index 0000000..d76f936
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -0,0 +1,192 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/**
+ * Tool used to copy a table to another one, which can be on a different
+ * cluster setup. It is also configurable with a start and end time, as well
+ * as a specification of the region server implementation if different from the local cluster.
+ */
+public class CopyTable {
+
+ final static String NAME = "Copy Table";
+ static String rsClass = null;
+ static String rsImpl = null;
+ static long startTime = 0;
+ static long endTime = 0;
+ static String tableName = null;
+ static String newTableName = null;
+ static String peerAddress = null;
+
+ /**
+ * Sets up the actual job.
+ *
+ * @param conf The current configuration.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws IOException When setting up the job fails.
+ */
+ public static Job createSubmittableJob(Configuration conf, String[] args)
+ throws IOException {
+ if (!doCommandLine(args)) {
+ return null;
+ }
+ Cluster mrCluster = new Cluster(conf);
+ Job job = Job.getInstance(mrCluster, conf);
+ job.setJobName(NAME + "_" + tableName);
+ job.setJarByClass(CopyTable.class);
+ Scan scan = new Scan();
+ if (startTime != 0) {
+ scan.setTimeRange(startTime,
+ endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+ }
+ TableMapReduceUtil.initTableMapperJob(tableName, scan,
+ Import.Importer.class, null, null, job);
+ TableMapReduceUtil.initTableReducerJob(
+ newTableName == null ? tableName : newTableName, null, job,
+ null, peerAddress, rsClass, rsImpl);
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ /*
+ * @param errorMsg Error message. Can be null.
+ */
+ private static void printUsage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ System.err.println("Usage: CopyTable [--rs.class=CLASS] " +
+ "[--rs.impl=IMPL] [--starttime=X] [--endtime=Y] " +
+ "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
+ System.err.println();
+ System.err.println("Options:");
+ System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
+ System.err.println(" specify if different from current cluster");
+ System.err.println(" rs.impl hbase.regionserver.impl of the peer cluster");
+ System.err.println(" starttime beginning of the time range");
+ System.err.println(" without endtime means from starttime to forever");
+ System.err.println(" endtime end of the time range");
+ System.err.println(" new.name new table's name");
+ System.err.println(" peer.adr Address of the peer cluster given in the format");
+ System.err.println(" hbase.zookeeper.quorum:zookeeper.znode.parent");
+ System.err.println();
+ System.err.println("Args:");
+ System.err.println(" tablename Name of the table to copy");
+ System.err.println();
+ System.err.println("Examples:");
+ System.err.println(" To copy a one-hour window of 'TestTable' to a peer cluster that uses replication:");
+ System.err.println(" $ bin/hbase " +
+ "org.apache.hadoop.hbase.mapreduce.CopyTable --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface " +
+ "--rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer --starttime=1265875194289 --endtime=1265878794289 " +
+ "--peer.adr=server1,server2,server3:/hbase TestTable ");
+ }
+
+ private static boolean doCommandLine(final String[] args) {
+ // Process command-line args. TODO: Better cmd-line processing
+ // (but hopefully something not as painful as cli options).
+ if (args.length < 1) {
+ printUsage(null);
+ return false;
+ }
+ try {
+ for (int i = 0; i < args.length; i++) {
+ String cmd = args[i];
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ printUsage(null);
+ return false;
+ }
+
+ final String rsClassArgKey = "--rs.class=";
+ if (cmd.startsWith(rsClassArgKey)) {
+ rsClass = cmd.substring(rsClassArgKey.length());
+ continue;
+ }
+
+ final String rsImplArgKey = "--rs.impl=";
+ if (cmd.startsWith(rsImplArgKey)) {
+ rsImpl = cmd.substring(rsImplArgKey.length());
+ continue;
+ }
+
+ final String startTimeArgKey = "--starttime=";
+ if (cmd.startsWith(startTimeArgKey)) {
+ startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+ continue;
+ }
+
+ final String endTimeArgKey = "--endtime=";
+ if (cmd.startsWith(endTimeArgKey)) {
+ endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+ continue;
+ }
+
+ final String newNameArgKey = "--new.name=";
+ if (cmd.startsWith(newNameArgKey)) {
+ newTableName = cmd.substring(newNameArgKey.length());
+ continue;
+ }
+
+ final String peerAdrArgKey = "--peer.adr=";
+ if (cmd.startsWith(peerAdrArgKey)) {
+ peerAddress = cmd.substring(peerAdrArgKey.length());
+ continue;
+ }
+
+ if (i == args.length-1) {
+ tableName = cmd;
+ }
+ }
+ if (newTableName == null && peerAddress == null) {
+ printUsage("At least a new table name or a " +
+ "peer address must be specified");
+ return false;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ printUsage("Can't start because " + e.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Main entry point.
+ *
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ Job job = createSubmittableJob(conf, args);
+ if (job != null) {
+ System.exit(job.waitForCompletion(true) ? 0 : 1);
+ }
+ }
+}
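
The test above drives this class programmatically rather than through main(). A minimal sketch of that calling pattern, assuming a Configuration already pointing at the source cluster (the table name and peer address are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.CopyTable;
    import org.apache.hadoop.mapreduce.Job;

    public class CopyTableExample {
      public static void main(String[] ignored) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Same arguments the command line would take.
        String[] args = new String[] {
            "--peer.adr=server1,server2,server3:/hbase",
            "TestTable" };
        Job job = CopyTable.createSubmittableJob(conf, args);
        // createSubmittableJob returns null when the arguments are rejected.
        if (job != null && job.waitForCompletion(true)) {
          System.out.println("Copy completed");
        }
      }
    }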
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index e22e296..fd5b3b0 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -36,6 +36,8 @@ public class Driver {
"Count rows in HBase table");
pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
+ pgd.addClass(CopyTable.NAME, CopyTable.class,
+ "Copy a table to a peer cluster or to a new table on the same cluster");
pgd.driver(args);
}
}
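
Because NAME contains a space ("Copy Table"), launching the tool through the program driver requires quoting the program name; invoking the class directly, as in the usage example above, avoids that. A sketch of the driver route, with placeholder arguments:

    // Roughly equivalent to: hadoop jar hbase-*.jar "Copy Table" --peer.adr=... TestTable
    public class DriverExample {
      public static void main(String[] ignored) throws Throwable {
        org.apache.hadoop.hbase.mapreduce.Driver.main(new String[] {
            "Copy Table",
            "--peer.adr=server1,server2,server3:/hbase",
            "TestTable" });
      }
    }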
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 6a07de5..d38cc69 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.conf.Configuration;
/**
* Utility for {@link TableMapper} and {@link TableReducer}
@@ -112,26 +114,61 @@ public class TableMapReduceUtil {
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
+ *
+ * @param table The output table.
+ * @param reducer The reducer class to use.
+ * @param job The current job to adjust.
+ * @param partitioner Partitioner to use. Pass null to use
+ * default partitioner.
+ * @throws IOException When determining the region count fails.
+ */
+ public static void initTableReducerJob(String table,
+ Class<? extends TableReducer> reducer, Job job,
+ Class partitioner) throws IOException {
+ initTableReducerJob(table, reducer, job, partitioner, null, null, null);
+ }
+
+ /**
+ * Use this before submitting a TableReduce job. It will
+ * appropriately set up the JobConf.
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job to adjust.
* @param partitioner Partitioner to use. Pass null to use
* default partitioner.
+ * @param quorumAddress Remote cluster to write to, given as quorum:znodeParent
+ * @param serverClass redefined hbase.regionserver.class
+ * @param serverImpl redefined hbase.regionserver.impl
* @throws IOException When determining the region count fails.
*/
public static void initTableReducerJob(String table,
- Class<? extends TableReducer> reducer, Job job, Class partitioner)
- throws IOException {
+ Class<? extends TableReducer> reducer, Job job,
+ Class partitioner, String quorumAddress, String serverClass,
+ String serverImpl) throws IOException {
+
+ Configuration conf = job.getConfiguration();
job.setOutputFormatClass(TableOutputFormat.class);
if (reducer != null) job.setReducerClass(reducer);
- job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
+ conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+ if(quorumAddress != null) {
+ if(quorumAddress.split(":").length == 2) {
+ conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
+ } else {
+ throw new IOException("Please specify the peer cluster as " +
+ HConstants.ZOOKEEPER_QUORUM+":"+HConstants.ZOOKEEPER_ZNODE_PARENT);
+ }
+ }
+ if(serverClass != null && serverImpl != null) {
+ conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
+ conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
+ }
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);
if (partitioner == HRegionPartitioner.class) {
- HBaseConfiguration.addHbaseResources(job.getConfiguration());
+ HBaseConfiguration.addHbaseResources(conf);
job.setPartitionerClass(HRegionPartitioner.class);
- HTable outputTable = new HTable(job.getConfiguration(), table);
+ HTable outputTable = new HTable(conf, table);
int regions = outputTable.getRegionsInfo().size();
if (job.getNumReduceTasks() > regions) {
job.setNumReduceTasks(outputTable.getRegionsInfo().size());
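
For callers other than CopyTable, the widened initTableReducerJob overload is the entry point for routing reducer output to a remote cluster. A sketch of a direct call, assuming a job with an identity reducer (the table name and peer address are placeholders; passing null for the last three arguments keeps the old local-cluster behaviour):

    import java.io.IOException;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class PeerReducerSetup {
      // Configure `job` so its output lands in "TestTable" on the peer cluster.
      static void setUpPeerOutput(Job job) throws IOException {
        TableMapReduceUtil.initTableReducerJob(
            "TestTable", IdentityTableReducer.class, job,
            null,                              // default partitioner
            "server1,server2,server3:/hbase",  // quorum:znodeParent of the peer
            null, null);                       // rs.class / rs.impl overrides
      }
    }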
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 9f51ea7..41fe3f9 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -33,6 +34,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.Configuration;
/**
* Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -46,6 +48,14 @@ public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
/** Job parameter that specifies the output table. */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+ /** Optional job parameter to specify a peer cluster */
+ public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
+ /** Optional specification of the rs class name of the peer cluster */
+ public static final String
+ REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
+ /** Optional specification of the rs impl name of the peer cluster */
+ public static final String
+ REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
/**
* Writes the reducer output to an HBase table.
@@ -111,12 +121,25 @@ public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
TaskAttemptContext context)
throws IOException, InterruptedException {
// expecting exactly one path
- String tableName = context.getConfiguration().get(OUTPUT_TABLE);
+ Configuration conf = new Configuration(context.getConfiguration());
+ String tableName = conf.get(OUTPUT_TABLE);
+ String address = conf.get(QUORUM_ADDRESS);
+ String serverClass = conf.get(REGION_SERVER_CLASS);
+ String serverImpl = conf.get(REGION_SERVER_IMPL);
HTable table = null;
try {
- HBaseConfiguration.addHbaseResources(context.getConfiguration());
- table = new HTable(context.getConfiguration(),
- tableName);
+ HBaseConfiguration.addHbaseResources(conf);
+ if (address != null) {
+ // Format already validated in TableMapReduceUtil
+ String[] parts = address.split(":");
+ conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
+ }
+ if (serverClass != null) {
+ conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
+ conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
+ }
+ table = new HTable(conf, tableName);
} catch(IOException e) {
LOG.error(e);
throw e;
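
On the TableOutputFormat side, the three new job parameters are read back in getRecordWriter and applied to a private copy of the configuration before the HTable is opened. A condensed sketch of just that rewiring, assuming the two-part address already validated in TableMapReduceUtil (requiring both rs.class and rs.impl here mirrors that check):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;

    public class PeerConfOverride {
      // Copy `base` so the job's own configuration is left untouched,
      // then point the copy at the peer cluster, as getRecordWriter does.
      static Configuration withPeerOverrides(Configuration base) {
        Configuration conf = new Configuration(base);
        String address = conf.get(TableOutputFormat.QUORUM_ADDRESS);
        if (address != null) {
          String[] parts = address.split(":");   // [quorum, znodeParent]
          conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
          conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
        }
        String serverClass = conf.get(TableOutputFormat.REGION_SERVER_CLASS);
        String serverImpl = conf.get(TableOutputFormat.REGION_SERVER_IMPL);
        if (serverClass != null && serverImpl != null) {
          conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
          conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
        }
        return conf;
      }
    }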