Index: hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java (revision 0) @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Basic test for the CopyTable M/R tool + */ +@Category(LargeTests.class) +public class TestCopyTable { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static MiniHBaseCluster cluster; + + @BeforeClass + public static void beforeClass() throws Exception { + cluster = TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Simple end-to-end test + * @throws Exception + */ + @Test + public void testCopyTable() throws Exception { + final byte[] TABLENAME1 = Bytes.toBytes("testCopyTable1"); + final byte[] TABLENAME2 = Bytes.toBytes("testCopyTable2"); + final byte[] FAMILY = Bytes.toBytes("family"); + final byte[] COLUMN1 = Bytes.toBytes("c1"); + + HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY); + HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY); + + // put rows into the first table + for (int i = 0; i < 10; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.add(FAMILY, COLUMN1, COLUMN1); + t1.put(p); + } + + CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration()); + + assertEquals( + 0, + copy.run(new String[] { "--new.name=" + Bytes.toString(TABLENAME2), + Bytes.toString(TABLENAME1) })); + + // verify the data was copied into table 2 + for (int i = 0; i < 10; i++) { + Get g = new Get(Bytes.toBytes("row" + i)); + Result r = t2.get(g); + assertEquals(1, r.size()); + assertTrue(Bytes.equals(COLUMN1, r.raw()[0].getQualifier())); + } + + t1.close(); + t2.close(); + TEST_UTIL.deleteTable(TABLENAME1); + TEST_UTIL.deleteTable(TABLENAME2); + } + + @Test + public void testStartStopRow() throws Exception { + final byte[] TABLENAME1 = Bytes.toBytes("testStartStopRow1"); + final byte[] TABLENAME2 = Bytes.toBytes("testStartStopRow2"); + final byte[] FAMILY = Bytes.toBytes("family"); + final byte[] COLUMN1 = Bytes.toBytes("c1"); + final byte[] ROW0 = Bytes.toBytes("row0"); + final byte[] ROW1 = Bytes.toBytes("row1"); + final byte[] ROW2 = Bytes.toBytes("row2"); + + HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY); + HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY); + + // put rows into the first table + Put p = new Put(ROW0); + p.add(FAMILY, COLUMN1, COLUMN1); + t1.put(p); + p = new Put(ROW1); + p.add(FAMILY, COLUMN1, COLUMN1); + t1.put(p); + p = new Put(ROW2); + p.add(FAMILY, COLUMN1, COLUMN1); + t1.put(p); + + CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration()); + assertEquals( + 0, + copy.run(new String[] { "--new.name=" + Bytes.toString(TABLENAME2), "--startrow=row1", + "--stoprow=row2", Bytes.toString(TABLENAME1) })); + + // verify the data was copied into table 2 + // row1 exist, row0, row2 do not exist + Get g = new Get(ROW1); + Result r = t2.get(g); + assertEquals(1, r.size()); + assertTrue(Bytes.equals(COLUMN1, r.raw()[0].getQualifier())); + + g = new Get(ROW0); + r = t2.get(g); + assertEquals(0, r.size()); + + g = new Get(ROW2); + r = t2.get(g); + assertEquals(0, r.size()); + + t1.close(); + t2.close(); + TEST_UTIL.deleteTable(TABLENAME1); + TEST_UTIL.deleteTable(TABLENAME2); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java (revision 1487016) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java (working copy) @@ -21,12 +21,15 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.HashMap; @@ -39,18 +42,23 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public class CopyTable { +public class CopyTable extends Configured implements Tool { final static String NAME = "copytable"; static long startTime = 0; static long endTime = 0; static int versions = -1; static String tableName = null; + static String startRow = null; + static String stopRow = null; static String newTableName = null; static String peerAddress = null; static String families = null; static boolean allCells = false; - + + public CopyTable(Configuration conf) { + super(conf); + } /** * Sets up the actual job. * @@ -78,6 +86,15 @@ if (versions >= 0) { scan.setMaxVersions(versions); } + + if (startRow != null) { + scan.setStartRow(Bytes.toBytes(startRow)); + } + + if (stopRow != null) { + scan.setStopRow(Bytes.toBytes(stopRow)); + } + if(families != null) { String[] fams = families.split(","); Map cfRenameMap = new HashMap(); @@ -120,6 +137,8 @@ System.err.println(" rs.class hbase.regionserver.class of the peer cluster"); System.err.println(" specify if different from current cluster"); System.err.println(" rs.impl hbase.regionserver.impl of the peer cluster"); + System.err.println(" startrow the start row"); + System.err.println(" stoprow the stop row"); System.err.println(" starttime beginning of the time range (unixtime in millis)"); System.err.println(" without endtime means from starttime to forever"); System.err.println(" endtime end of the time range. Ignored if no starttime specified."); @@ -159,7 +178,19 @@ printUsage(null); return false; } - + + final String startRowArgKey = "--startrow="; + if (cmd.startsWith(startRowArgKey)) { + startRow = cmd.substring(startRowArgKey.length()); + continue; + } + + final String stopRowArgKey = "--stoprow="; + if (cmd.startsWith(stopRowArgKey)) { + stopRow = cmd.substring(stopRowArgKey.length()); + continue; + } + final String startTimeArgKey = "--starttime="; if (cmd.startsWith(startTimeArgKey)) { startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); @@ -232,12 +263,15 @@ * @throws Exception When running the job fails. */ public static void main(String[] args) throws Exception { - Configuration conf = HBaseConfiguration.create(); - String[] otherArgs = - new GenericOptionsParser(conf, args).getRemainingArgs(); - Job job = createSubmittableJob(conf, otherArgs); - if (job != null) { - System.exit(job.waitForCompletion(true) ? 0 : 1); - } + int ret = ToolRunner.run(new CopyTable(HBaseConfiguration.create()), args); + System.exit(ret); } + + @Override + public int run(String[] args) throws Exception { + String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + Job job = createSubmittableJob(getConf(), otherArgs); + if (job == null) return 1; + return job.waitForCompletion(true) ? 0 : 1; + } }