diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java index 46d8c71..8c195b6 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java @@ -32,13 +32,12 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExitUtil; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.io.Text; @@ -250,9 +249,9 @@ public class CellCounter { "string : used to separate the rowId/column family name and qualifier name."); System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " + "operation to a limited subset of rows from the table based on regex or prefix pattern."); - System.exit(-1); + ExitUtil.exit(-1); } Job job = createSubmittableJob(conf, otherArgs); - System.exit(job.waitForCompletion(true) ? 0 : 1); + ExitUtil.exit(job.waitForCompletion(true) ? 0 : 1); } } diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index 025473b..ef0b50a 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExitUtil; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; @@ -263,7 +264,7 @@ public class CopyTable extends Configured implements Tool { */ public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new CopyTable(HBaseConfiguration.create()), args); - System.exit(ret); + ExitUtil.exit(ret); } @Override diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java index 2c53f6d..ebdca83 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExitUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; @@ -183,9 +184,9 @@ public class Export { String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { usage("Wrong number of arguments: " + otherArgs.length); - System.exit(-1); + ExitUtil.exit(-1); } Job job = 
createSubmittableJob(conf, otherArgs); - System.exit(job.waitForCompletion(true)? 0 : 1); + ExitUtil.exit(job.waitForCompletion(true)? 0 : 1); } } \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index e5a797f..455644a 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExitUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -447,9 +448,9 @@ public class Import { String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { usage("Wrong number of arguments: " + otherArgs.length); - System.exit(-1); + ExitUtil.exit(-1); } Job job = createSubmittableJob(conf, otherArgs); - System.exit(job.waitForCompletion(true) ? 0 : 1); + ExitUtil.exit(job.waitForCompletion(true) ? 0 : 1); } } diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index 4a61da7..1e12dc7 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapreduce; import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.ExitUtil; import java.io.IOException; import java.util.ArrayList; @@ -372,7 +373,7 @@ public class ImportTsv { String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { usage("Wrong number of arguments: " + otherArgs.length); - System.exit(-1); + ExitUtil.exit(-1); } // Make sure columns are specified @@ -380,7 +381,7 @@ public class ImportTsv { if (columns == null) { usage("No columns specified. Please specify with -D" + COLUMNS_CONF_KEY+"=..."); - System.exit(-1); + ExitUtil.exit(-1); } // Make sure they specify exactly one column as the row key @@ -390,7 +391,7 @@ public class ImportTsv { } if (rowkeysFound != 1) { usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC); - System.exit(-1); + ExitUtil.exit(-1); } // Make sure we have at most one column as the timestamp key @@ -402,14 +403,14 @@ public class ImportTsv { if (tskeysFound > 1) { usage("Must specify at most one column as " + TsvParser.TIMESTAMPKEY_COLUMN_SPEC); - System.exit(-1); + ExitUtil.exit(-1); } // Make sure one or more columns are specified excluding rowkey and // timestamp key if (columns.length - (rowkeysFound + tskeysFound) < 1) { usage("One or more columns in addition to the row key and timestamp(optional) are required"); - System.exit(-1); + ExitUtil.exit(-1); } // If timestamp option is not specified, use current system time. @@ -422,7 +423,7 @@ public class ImportTsv { hbaseAdmin = new HBaseAdmin(conf); Job job = createSubmittableJob(conf, otherArgs); - System.exit(job.waitForCompletion(true) ? 0 : 1); + ExitUtil.exit(job.waitForCompletion(true) ? 
0 : 1); } } diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index 5ebe712..9f3c797 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExitUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; @@ -164,12 +165,12 @@ public class RowCounter { String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 1) { printUsage("Wrong number of parameters: " + args.length); - System.exit(-1); + ExitUtil.exit(-1); } Job job = createSubmittableJob(conf, otherArgs); if (job == null) { - System.exit(-1); + ExitUtil.exit(-1); } - System.exit(job.waitForCompletion(true) ? 0 : 1); + ExitUtil.exit(job.waitForCompletion(true) ? 0 : 1); } } diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index b0a7b69..6ed60c5 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExitUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -289,7 +290,7 @@ public class WALPlayer extends Configured implements Tool { */ public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args); - System.exit(ret); + ExitUtil.exit(ret); } @Override @@ -297,7 +298,7 @@ public class WALPlayer extends Configured implements Tool { String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); if (otherArgs.length < 2) { usage("Wrong number of arguments: " + otherArgs.length); - System.exit(-1); + ExitUtil.exit(-1); } Job job = createSubmittableJob(otherArgs); return job.waitForCompletion(true) ? 0 : 1; diff --git src/main/java/org/apache/hadoop/hbase/util/ExitException.java src/main/java/org/apache/hadoop/hbase/util/ExitException.java new file mode 100644 index 0000000..1a58045 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/util/ExitException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +public class ExitException extends RuntimeException { + + private int exitCode = 0; + public ExitException(int exitCode) { + this.exitCode = exitCode; + } + + public int getExitCode() { + return exitCode; + } +} diff --git src/main/java/org/apache/hadoop/hbase/util/ExitUtil.java src/main/java/org/apache/hadoop/hbase/util/ExitUtil.java new file mode 100644 index 0000000..ec12acb --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/util/ExitUtil.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +/** + * Utility class for masquerading System.exit() so that tests can intercept exit requests. + */ +public class ExitUtil { + + private static boolean test = false; + public static void exit(int exitCode) { + if (test) { + throw new ExitException(exitCode); + } else { + System.exit(exitCode); + } + } + public static void activeTest() { + test = true; + } +} diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java new file mode 100644 index 0000000..cfa11c8 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExitException; +import org.apache.hadoop.hbase.util.ExitUtil; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.GenericOptionsParser; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.*; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; + + +@Category(LargeTests.class) +public class TestCellCounter { + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final byte[] ROW1 = Bytes.toBytes("row1"); + private static final byte[] ROW2 = Bytes.toBytes("row2"); + private static final String FAMILY_A_STRING = "a"; + private static final String FAMILY_B_STRING = "b"; + private static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING); + private static final byte[] FAMILY_B = Bytes.toBytes(FAMILY_B_STRING); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + private static Path FQ_OUTPUT_DIR; + private static final String OUTPUT_DIR = "target" + File.separator + "test-data" + File.separator + + "output"; + private static long now = System.currentTimeMillis(); + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + UTIL.startMiniMapReduceCluster(); + FQ_OUTPUT_DIR = new Path(OUTPUT_DIR).makeQualified(new LocalFileSystem()); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniMapReduceCluster(); + UTIL.shutdownMiniCluster(); + } + + /** + * Test CellCounter all data should print to output + * + */ + @Test + public void testCellCounter() throws Exception { + String sourceTable = "sourceTable"; + + byte[][] families = { FAMILY_A, FAMILY_B }; + HTable t = UTIL.createTable(Bytes.toBytes(sourceTable), families); + try { + Put p = new Put(ROW1); + p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1" }; + runCount(args); + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total ROWS" + "\t" + "1")); + assertTrue(data.contains("b;q" + "\t" + "1")); + assertTrue(data.contains("a;q" + "\t" + "1")); + assertTrue(data.contains("row1;a;q_Versions" + "\t" + "2")); + } finally { + 
t.close(); + } + } + + private boolean runCount(String[] args) throws IOException, InterruptedException, + ClassNotFoundException { + // need to make a copy of the configuration to make sure + // different temp dirs are used. + GenericOptionsParser opts = new GenericOptionsParser( + new Configuration(UTIL.getConfiguration()), args); + Configuration configuration = opts.getConfiguration(); + args = opts.getRemainingArgs(); + Job job = CellCounter.createSubmittableJob(configuration, args); + job.waitForCompletion(false); + return job.isSuccessful(); + } + + /** + * Test main method of CellCounter + */ + @Test + public void testCellCounterMain() throws Exception { + + PrintStream oldPrintStream = System.err; + ByteArrayOutputStream data = new ByteArrayOutputStream(); + String[] args = {}; + System.setErr(new PrintStream(data)); + try { + System.setErr(new PrintStream(data)); + + try { + ExitUtil.activeTest(); + CellCounter.main(args); + fail("should be ExitException"); + } catch (ExitException e) { + assertEquals(-1, e.getExitCode()); + assertTrue(data.toString().contains("ERROR: Wrong number of parameters:")); + // the usage help should also be printed + assertTrue(data.toString().contains("Usage:")); + } + + } finally { + System.setErr(oldPrintStream); + } + + } +} diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java index e93d781..9f1852e 100644 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java @@ -19,7 +19,14 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; + + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.MiniHBaseCluster; @@ -28,6 +35,10 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExitException; +import org.apache.hadoop.hbase.util.ExitUtil; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.GenericOptionsParser; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -40,6 +51,14 @@ import org.junit.experimental.categories.Category; public class TestCopyTable { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static MiniHBaseCluster cluster; + private static final byte[] ROW1 = Bytes.toBytes("row1"); + private static final byte[] ROW2 = Bytes.toBytes("row2"); + private static final String FAMILY_A_STRING = "a"; + private static final String FAMILY_B_STRING = "b"; + private static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING); + private static final byte[] FAMILY_B = Bytes.toBytes(FAMILY_B_STRING); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + @BeforeClass public static void beforeClass() throws Exception { @@ -145,4 +164,84 @@ public class TestCopyTable { TEST_UTIL.deleteTable(TABLENAME1); TEST_UTIL.deleteTable(TABLENAME2); } + /** + * Test copying all rows from family a of sourceTable to targetTable, renaming the family via --families=a:b. + */ + @Test + public void testRenameFamily() throws Exception {
String sourceTable = "sourceTable"; + String targetTable = "targetTable"; + + byte[][] families = { FAMILY_A, FAMILY_B }; + + HTable t = TEST_UTIL.createTable(Bytes.toBytes(sourceTable), families); + HTable t2 = TEST_UTIL.createTable(Bytes.toBytes(targetTable), families); + Put p = new Put(ROW1); + p.add(FAMILY_A, QUALIFIER, Bytes.toBytes("Data11")); + p.add(FAMILY_B, QUALIFIER, Bytes.toBytes("Data12")); + p.add(FAMILY_A, QUALIFIER, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.add(FAMILY_B, QUALIFIER, Bytes.toBytes("Dat21")); + p.add(FAMILY_A, QUALIFIER, Bytes.toBytes("Data22")); + p.add(FAMILY_B, QUALIFIER, Bytes.toBytes("Data23")); + t.put(p); + + long currentTime = System.currentTimeMillis(); + String[] args = new String[] { "--new.name=" + targetTable, "--families=a:b", + "--starttime=" + (currentTime - 100000), "--endtime=" + (currentTime + 100000), + "--versions=1", sourceTable }; + + assertNull(t2.get(new Get(ROW1)).getRow()); + assertTrue(runCopy(args)); + + } + + /** + * Test main method of CopyTable. + */ + @Test + public void testMainMethod() throws Exception { + String[] emptyArgs = { "-h" }; + PrintStream oldWriter = System.err; + ByteArrayOutputStream data = new ByteArrayOutputStream(); + PrintStream writer = new PrintStream(data); + System.setErr(writer); + try { + clean(); + ExitUtil.activeTest(); + CopyTable.main(emptyArgs); + }catch(ExitException e){ + assertEquals(1,e.getExitCode()); + } finally { + System.setErr(oldWriter); + } + assertTrue(data.toString().contains("rs.class")); + assertTrue(data.toString().contains("Usage:")); + } + + private boolean runCopy(String[] args) throws IOException, InterruptedException, + ClassNotFoundException { + clean(); + GenericOptionsParser opts = new GenericOptionsParser( + new Configuration(TEST_UTIL.getConfiguration()), args); + Configuration configuration = opts.getConfiguration(); + args = opts.getRemainingArgs(); + Job job = CopyTable.createSubmittableJob(configuration, args); + job.waitForCompletion(false); + return job.isSuccessful(); + } + private void clean(){ + CopyTable.startTime = 0; + CopyTable.endTime = 0; + CopyTable.versions = -1; + CopyTable.tableName = null; + CopyTable.startRow = null; + CopyTable.stopRow = null; + CopyTable.newTableName = null; + CopyTable.peerAddress = null; + CopyTable.families = null; + CopyTable.allCells = false; + + } } diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestGroupingTableMapper.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestGroupingTableMapper.java new file mode 100644 index 0000000..be3ef47 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestGroupingTableMapper.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Mapper; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.*; + +@Category(SmallTests.class) +public class TestGroupingTableMapper { + + /** + * Test GroupingTableMapper class + */ + @Test + public void testGroupingTableMapper() throws Exception { + + GroupingTableMapper mapper = new GroupingTableMapper(); + Configuration configuration = new Configuration(); + configuration.set(GroupingTableMapper.GROUP_COLUMNS, "family1:clm family2:clm"); + mapper.setConf(configuration); + + Result result = mock(Result.class); + @SuppressWarnings("unchecked") + Mapper.Context context = + mock(Mapper.Context.class); + context.write(any(ImmutableBytesWritable.class), any(Result.class)); + List keyValue = new ArrayList(); + byte[] row = {}; + keyValue.add(new KeyValue(row, Bytes.toBytes("family2"), Bytes.toBytes("clm"), Bytes + .toBytes("value1"))); + keyValue.add(new KeyValue(row, Bytes.toBytes("family1"), Bytes.toBytes("clm"), Bytes + .toBytes("value2"))); + when(result.list()).thenReturn(keyValue); + mapper.map(null, result, context); + // template data + byte[][] data = { Bytes.toBytes("value1"), Bytes.toBytes("value2") }; + ImmutableBytesWritable ibw = mapper.createGroupKey(data); + verify(context).write(ibw, result); + } + +} diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestHRegionPartitioner.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestHRegionPartitioner.java new file mode 100644 index 0000000..f466e68 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestHRegionPartitioner.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; + +@Category(MediumTests.class) +public class TestHRegionPartitioner { + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniMapReduceCluster(); + UTIL.shutdownMiniCluster(); + } + + /** + * Test HRegionPartitioner + */ + @Test + public void testHRegionPartitioner() throws Exception { + + byte[][] families = { Bytes.toBytes("familyA"), Bytes.toBytes("familyB") }; + + UTIL.createTable(Bytes.toBytes("out_table"), families, 1, Bytes.toBytes("aa"), + Bytes.toBytes("cc"), 3); + + HRegionPartitioner partitioner = new HRegionPartitioner(); + Configuration configuration = UTIL.getConfiguration(); + configuration.set(TableOutputFormat.OUTPUT_TABLE, "out_table"); + partitioner.setConf(configuration); + ImmutableBytesWritable writable = new ImmutableBytesWritable(Bytes.toBytes("bb")); + + assertEquals(1, partitioner.getPartition(writable, 10L, 3)); + assertEquals(0, partitioner.getPartition(writable, 10L, 1)); + } +} diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java index c0cd2f1..f42d052 100644 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -20,8 +20,13 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -39,10 +44,17 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExitException; +import org.apache.hadoop.hbase.util.ExitUtil; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.util.GenericOptionsParser; import org.junit.After; import org.junit.AfterClass; @@ -51,7 +63,14 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import 
org.mockito.stubbing.Answer; +import static org.mockito.Mockito.*; + +/** + * Tests the table import and table export MR job functionality + */ @Category(MediumTests.class) public class TestImportExport { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); @@ -87,6 +106,48 @@ public class TestImportExport { } /** + * Runs an export job with the specified command line args + * + * @param args + * @return true if job completed successfully + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + boolean runExport(String[] args) throws IOException, InterruptedException, ClassNotFoundException { + // need to make a copy of the configuration because to make sure different + // temp dirs are used. + GenericOptionsParser opts = new GenericOptionsParser( + new Configuration(UTIL.getConfiguration()), args); + Configuration conf = opts.getConfiguration(); + args = opts.getRemainingArgs(); + Job job = Export.createSubmittableJob(conf, args); + job.waitForCompletion(false); + return job.isSuccessful(); + } + + /** + * Runs an import job with the specified command line args + * + * @param args + * @return true if job completed successfully + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + boolean runImport(String[] args) throws IOException, InterruptedException, ClassNotFoundException { + // need to make a copy of the configuration because to make sure different + // temp dirs are used. + GenericOptionsParser opts = new GenericOptionsParser( + new Configuration(UTIL.getConfiguration()), args); + Configuration conf = opts.getConfiguration(); + args = opts.getRemainingArgs(); + Job job = Import.createSubmittableJob(conf, args); + job.waitForCompletion(false); + return job.isSuccessful(); + } + + /** * Test simple replication case with column mapping * @throws Exception */ @@ -108,11 +169,15 @@ public class TestImportExport { String[] args = new String[] { EXPORT_TABLE, OUTPUT_DIR, - "1000" + "1000", + Long.toString(0), + Long.toString(System.currentTimeMillis()), + "^row." }; GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args); Configuration conf = opts.getConfiguration(); + conf.setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true); args = opts.getRemainingArgs(); Job job = Export.createSubmittableJob(conf, args); @@ -187,7 +252,7 @@ public class TestImportExport { p.add(FAMILYA, QUAL, now+4, QUAL); t.put(p); - Delete d = new Delete(ROW1, now+3, null); + Delete d = new Delete(ROW1, now+3); t.delete(d); d = new Delete(ROW1); d.deleteColumns(FAMILYA, QUAL, now+2); @@ -343,4 +408,117 @@ public class TestImportExport { results.close(); return count; } + + /** + * test main method. 
Import should print help and call System.exit + */ + @Test + public void testImportMain() throws Exception { + PrintStream oldPrintStream = System.err; + SecurityManager SECURITY_MANAGER = System.getSecurityManager(); + ByteArrayOutputStream data = new ByteArrayOutputStream(); + String[] args = {}; + System.setErr(new PrintStream(data)); + try { + System.setErr(new PrintStream(data)); + ExitUtil.activeTest(); + Import.main(args); + fail("should be ExitException"); + } catch (ExitException e) { + assertEquals(-1, e.getExitCode()); + assertTrue(data.toString().contains("Wrong number of arguments:")); + assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); + assertTrue(data.toString().contains("-Dimport.filter.class=")); + assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); + assertTrue(data.toString().contains("-Dmapred.reduce.tasks.speculative.execution=false")); + } finally { + System.setErr(oldPrintStream); + System.setSecurityManager(SECURITY_MANAGER); + } + } + + /** + * test main method. Export should print help and call System.exit + */ + @Test + public void testExportMain() throws Exception { + PrintStream oldPrintStream = System.err; + ByteArrayOutputStream data = new ByteArrayOutputStream(); + String[] args = {}; + System.setErr(new PrintStream(data)); + try { + System.setErr(new PrintStream(data)); + ExitUtil.activeTest(); + Export.main(args); + fail("should be ExitException"); + } catch (ExitException e) { + assertEquals(-1, e.getExitCode()); + assertTrue(data.toString().contains("Wrong number of arguments:")); + assertTrue(data + .toString() + .contains( + "Usage: Export [-D ]* [ " + + "[ []] [^[regex pattern] or [Prefix] to filter]]")); + assertTrue(data.toString().contains("-D hbase.mapreduce.scan.column.family=")); + assertTrue(data.toString().contains("-D hbase.mapreduce.include.deleted.rows=true")); + assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100")); + assertTrue(data.toString().contains("-Dmapred.map.tasks.speculative.execution=false")); + assertTrue(data.toString().contains("-Dmapred.reduce.tasks.speculative.execution=false")); + } finally { + System.setErr(oldPrintStream); + } + } + + /** + * Test map method of Importer + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testKeyValueImporter() throws Exception { + KeyValueImporter importer = new KeyValueImporter(); + Configuration configuration = new Configuration(); + Context ctx = mock(Context.class); + when(ctx.getConfiguration()).thenReturn(configuration); + + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0]; + KeyValue key = (KeyValue) invocation.getArguments()[1]; + assertEquals("Key", Bytes.toString(writer.get())); + assertEquals("row", Bytes.toString(key.getRow())); + return null; + } + }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class)); + + importer.setup(ctx); + Result value = mock(Result.class); + KeyValue[] keys = { + new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), + Bytes.toBytes("value")), + new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), + Bytes.toBytes("value1")) }; + when(value.raw()).thenReturn(keys); + importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx); + + } + + /** + * Test addFilterAndArguments method of Import This method set 
couple + * parameters into Configuration + */ + @Test + public void testAddFilterAndArguments() { + Configuration configuration = new Configuration(); + + List args = new ArrayList(); + args.add("param1"); + args.add("param2"); + + Import.addFilterAndArguments(configuration, FilterBase.class, args); + assertEquals("org.apache.hadoop.hbase.filter.FilterBase", + configuration.get(Import.FILTER_CLASS_CONF_KEY)); + assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY)); + } } diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index 77d044b..fb5371a 100644 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -19,37 +19,33 @@ */ package org.apache.hadoop.hbase.mapreduce; -import java.io.UnsupportedEncodingException; -import java.util.List; -import java.util.ArrayList; - +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.GenericOptionsParser; - +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser; import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Result; - +import org.apache.hadoop.hbase.util.ExitException; +import org.apache.hadoop.hbase.util.ExitUtil; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.GenericOptionsParser; import org.junit.Test; - -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; import org.junit.experimental.categories.Category; +import java.io.*; +import java.util.ArrayList; +import java.util.List; + import static org.junit.Assert.*; @Category(MediumTests.class) @@ -355,8 +351,83 @@ public class TestImportTsv { return new String(bytes); } + /** + * Test main method of ImportTsv + */ + + @Test + public void testMain() throws Exception { + PrintStream oldErrorStream = System.err; + ByteArrayOutputStream data = new ByteArrayOutputStream(); + System.setErr(new PrintStream(data)); + // test print help test + try { + String args[] = {}; + ExitUtil.activeTest(); + ImportTsv.main(args); + + } catch (ExitException e) { + assertEquals(-1, e.getExitCode()); + assertTrue(data.toString().contains("ERROR: Wrong number of arguments: 0")); + assertTrue(data.toString().contains( + "Usage: importtsv -Dimporttsv.columns=a,b,c ")); + assertEquals("org.apache.hadoop.hbase.util.ExitException", e.toString()); + + } + + HBaseTestingUtility htu1 = new HBaseTestingUtility(); + + 
htu1.startMiniCluster(); + htu1.startMiniMapReduceCluster(); + + Configuration conf = htu1.getConfiguration(); + + String inputFile = "InputFile2.esv"; + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream op = fs.create(new Path(inputFile), true); + + op.write(Bytes.toBytes("KEY\u001bVALUE1\u001bVALUE2\n")); + op.close(); + + final byte[] FAM = Bytes.toBytes("family"); + final byte[] TAB = Bytes.toBytes("myTableName"); + HTableDescriptor desc = new HTableDescriptor(TAB); + desc.addFamily(new HColumnDescriptor(FAM)); + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(desc); + admin.close(); + // save old configure file + FileUtils.moveFile(new File("target" + File.separator + "test-classes" + File.separator + + "hbase-site.xml"), new File("target" + File.separator + "test-classes" + File.separator + + "hbase-site.xml.old")); + File fconfig = new File("target" + File.separator + "test-classes" + File.separator + + "hbase-site.xml"); + // and write new configure file + OutputStream out = new FileOutputStream(fconfig); + conf.writeXml(out); + out.close(); + // try to execute ImportTsv + try { + String[] args = { "-Dimporttsv.columns=a,b,HBASE_ROW_KEY", "myTableName", inputFile }; + ImportTsv.main(args); + fail("should be exit!"); + } catch (ExitException e) { + assertEquals(0, e.getExitCode()); + } finally { + System.setErr(oldErrorStream); + fconfig.delete(); + FileUtils.moveFile(new File("target" + File.separator + "test-classes" + File.separator + + "hbase-site.xml.old"), new File("target" + File.separator + "test-classes" + + File.separator + "hbase-site.xml")); + + htu1.shutdownMiniMapReduceCluster(); + htu1.shutdownMiniCluster(); + + } + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} +} diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index 05abd3a..fd4d983 100644 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -142,17 +142,15 @@ public class TestLoadIncrementalHFiles { final byte[] TABLE = Bytes.toBytes("mytable_"+testName); - HBaseAdmin admin = new HBaseAdmin(util.getConfiguration()); HTableDescriptor htd = new HTableDescriptor(TABLE); HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY); familyDesc.setBloomFilterType(bloomType); htd.addFamily(familyDesc); - admin.createTable(htd, SPLIT_KEYS); - HTable table = new HTable(util.getConfiguration(), TABLE); - util.waitTableAvailable(TABLE, 30000); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration(), useSecure); - loader.doBulkLoad(dir, table); + String [] args= {dir.toString(),"mytable_"+testName}; + loader.run(args); + HTable table = new HTable(util.getConfiguration(), TABLE); assertEquals(expectedRows, util.countRows(table)); } diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceClasses.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceClasses.java new file mode 100644 index 0000000..fc4418e --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceClasses.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.Export.Exporter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.mockito.Mockito.*; +import static junit.framework.Assert.*; + +@Category(SmallTests.class) +public class TestMapReduceClasses { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testIdentityTableMapper() throws Exception { + IdentityTableMapper mapper = new IdentityTableMapper(); + Scan scan = new Scan(); + Job job = new Job(new Configuration()); + IdentityTableMapper.initJob("table", scan, Exporter.class, job); + assertEquals(TableInputFormat.class, job.getInputFormatClass()); + assertEquals(Result.class, job.getMapOutputValueClass()); + assertEquals(ImmutableBytesWritable.class, job.getMapOutputKeyClass()); + assertEquals(Exporter.class, job.getMapperClass()); + + Context context = mock(Context.class); + ImmutableBytesWritable key = new ImmutableBytesWritable(Bytes.toBytes("key")); + Result value = new Result(key); + mapper.map(key, value, context); + verify(context).write(key, value); + } + +} diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java new file mode 100644 index 0000000..1a35ea6 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExitException; +import org.apache.hadoop.hbase.util.ExitUtil; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.GenericOptionsParser; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; + +import static org.junit.Assert.*; + +/** + * Test the rowcounter map reduce job. + */ +@Category(LargeTests.class) +public class TestRowCounter { + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static String TABLE_NAME = "testRowCounter"; + private final static String COL_FAM = "col_fam"; + private final static String COL1 = "c1"; + private final static String COL2 = "c2"; + private final static int TOTAL_ROWS = 10; + private final static int ROWS_WITH_ONE_COL = 2; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(); + TEST_UTIL.startMiniMapReduceCluster(); + HTable table = TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME), Bytes.toBytes(COL_FAM)); + writeRows(table); + table.close(); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + TEST_UTIL.shutdownMiniMapReduceCluster(); + } + + /** + * Test a case when no column was specified in command line arguments. + * + * @throws Exception + */ + @Test + public void testRowCounterNoColumn() throws Exception { + String[] args = new String[] { TABLE_NAME }; + runRowCount(args, 10); + } + + /** + * Test a case when the column specified in command line arguments is + * exclusive for few rows. + * + * @throws Exception + */ + @Test + public void testRowCounterExclusiveColumn() throws Exception { + String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COL1 }; + runRowCount(args, 8); + } + + /** + * Test a case when the column specified in command line arguments is not part + * of first KV for a row. + * + * @throws Exception + */ + @Test + public void testRowCounterHiddenColumn() throws Exception { + String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COL2 }; + runRowCount(args, 2); + } + + /** + * Run the RowCounter map reduce job and verify the row count. + * + * @param args + * the command line arguments to be used for rowcounter job. + * @param expectedCount + * the expected row count (result of map reduce job). 
+ * @throws Exception + */ + private void runRowCount(String[] args, int expectedCount) throws Exception { + GenericOptionsParser opts = new GenericOptionsParser(TEST_UTIL.getConfiguration(), args); + Configuration conf = opts.getConfiguration(); + args = opts.getRemainingArgs(); + Job job = RowCounter.createSubmittableJob(conf, args); + job.waitForCompletion(true); + assertTrue(job.isSuccessful()); + Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS); + assertEquals(expectedCount, counter.getValue()); + } + + /** + * Writes TOTAL_ROWS distinct rows into the table. A few rows have + * two columns, and a few have only one. + * + * @param table + * @throws IOException + */ + private static void writeRows(HTable table) throws IOException { + final byte[] family = Bytes.toBytes(COL_FAM); + final byte[] value = Bytes.toBytes("abcd"); + final byte[] col1 = Bytes.toBytes(COL1); + final byte[] col2 = Bytes.toBytes(COL2); + ArrayList<Put> rowsUpdate = new ArrayList<Put>(); + // write a few rows with two columns + int i = 0; + for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) { + byte[] row = Bytes.toBytes("row" + i); + Put put = new Put(row); + put.add(family, col1, value); + put.add(family, col2, value); + rowsUpdate.add(put); + } + + // write a few rows with only one column + for (; i < TOTAL_ROWS; i++) { + byte[] row = Bytes.toBytes("row" + i); + Put put = new Put(row); + put.add(family, col2, value); + rowsUpdate.add(put); + } + table.put(rowsUpdate); + } + + /** + * test main method. RowCounter should print the usage help and exit + */ + @Test + public void testImportMain() throws Exception { + PrintStream oldPrintStream = System.err; + ByteArrayOutputStream data = new ByteArrayOutputStream(); + String[] args = {}; + System.setErr(new PrintStream(data)); + try { + System.setErr(new PrintStream(data)); + + try { + ExitUtil.activeTest(); + + RowCounter.main(args); + fail("should be ExitException"); + } catch (ExitException e) { + assertEquals(-1, e.getExitCode()); + assertTrue(data.toString().contains("Wrong number of parameters:")); + assertTrue(data.toString().contains("Usage:")); + } + data.reset(); + try { + args = new String[2]; + args[0] = "table"; + args[1] = "--range=1"; + RowCounter.main(args); + fail("should be ExitException"); + } catch (ExitException e) { + assertEquals(-1, e.getExitCode()); + assertTrue(data + .toString() + .contains( + "Please specify range in such format as \"--range=a,b\" or, " + + "with only one boundary, \"--range=,b\" or \"--range=a,\"")); + assertTrue(data + .toString() + .contains( + "Usage: RowCounter [options] <tablename> [--range=[startKey],[endKey]]" + + " [<column1> <column2>...]")); + } + + } finally { + System.setErr(oldPrintStream); + } + + } + +} diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java new file mode 100644 index 0000000..9a1ecfc --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedPartitioner; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import static org.junit.Assert.*; + +/** + * Test TableMapReduceUtil + */ + +@Category(SmallTests.class) +public class TestTableMapReduceUtil { + + /** + * Test initTableMapperJob method + */ + @Test + public void testInitTableMapperJob() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); + TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class, + Text.class, job, false, HLogInputFormat.class); + assertEquals(HLogInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); + assertEquals("org.apache.hadoop.io.serializer.WritableSerialization", job.getConfiguration() + .get("io.serializations")); + + configuration = new Configuration(); + job = new Job(configuration, "tableName"); + TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), + Import.Importer.class, Text.class, Text.class, job, false, HLogInputFormat.class); + assertEquals(HLogInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); + assertEquals("org.apache.hadoop.io.serializer.WritableSerialization", job.getConfiguration() + .get("io.serializations")); + + configuration = new Configuration(); + job = new Job(configuration, "tableName"); + TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), + Import.Importer.class, Text.class, Text.class, job); + assertEquals(TableInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); + assertEquals("org.apache.hadoop.io.serializer.WritableSerialization", job.getConfiguration() + .get("io.serializations")); + + configuration = new Configuration(); + job = new Job(configuration, "tableName"); + TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), + Import.Importer.class, 
+        Import.Importer.class, Text.class, Text.class, job, false);
+    assertEquals(TableInputFormat.class, job.getInputFormatClass());
+    assertEquals(Import.Importer.class, job.getMapperClass());
+    assertEquals(LongWritable.class, job.getOutputKeyClass());
+    assertEquals(Text.class, job.getOutputValueClass());
+    assertNull(job.getCombinerClass());
+    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+    assertEquals("org.apache.hadoop.io.serializer.WritableSerialization", job.getConfiguration()
+        .get("io.serializations"));
+  }
+
+  /**
+   * Test the initTableReducerJob method
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testInitCredentials() throws Exception {
+    Configuration configuration = new Configuration();
+    Job job = new Job(configuration);
+    TableMapReduceUtil
+        .initTableReducerJob("table", IdentityTableReducer.class, job,
+            KeyFieldBasedPartitioner.class, "quorum:12345:directory", "serverClass", "serverImpl",
+            true);
+    configuration = job.getConfiguration();
+    assertEquals("quorum:12345:directory", configuration.get(TableOutputFormat.QUORUM_ADDRESS));
+    assertEquals(TableOutputFormat.class, job.getOutputFormatClass());
+    assertEquals("serverClass", configuration.get(TableOutputFormat.REGION_SERVER_CLASS));
+    assertEquals("serverImpl", configuration.get(TableOutputFormat.REGION_SERVER_IMPL));
+    assertEquals(KeyFieldBasedPartitioner.class, job.getPartitionerClass());
+  }
+
+}
\ No newline at end of file
diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
index ded4dd6..1dd7441 100644
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
@@ -43,5 +43,24 @@ public class TestTableSplit {
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+
+
+  @Test
+  public void testCompareTo() {
+    TableSplit split1 = new TableSplit("table".getBytes(), "row-start".getBytes(),
+        "row-end".getBytes(), "location");
+    TableSplit split2 = new TableSplit("table".getBytes(), "row-start".getBytes(),
+        "row-end".getBytes(), "location");
+    TableSplit split3 = new TableSplit("table2".getBytes(), "row-start".getBytes(),
+        "row-end".getBytes(), "location");
+    TableSplit split4 = new TableSplit("table".getBytes(), "row-start1".getBytes(),
+        "row-end".getBytes(), "location");
+
+    assertEquals(0, split1.compareTo(split2));
+    assertEquals(-1, split1.compareTo(split3));
+    assertEquals(-1, split1.compareTo(split4));
+  }
+
 }
diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index 93653af..3b9e753 100644
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -17,27 +17,36 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNull;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.LargeTests;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer.HLogKeyValueMapper;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ExitException;
+import org.apache.hadoop.hbase.util.ExitUtil;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
 
 /**
  * Basic test for the WALPlayer M/R tool
@@ -90,14 +99,137 @@ public class TestWALPlayer {
     String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
         .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
 
-    WALPlayer player = new WALPlayer(TEST_UTIL.getConfiguration());
+    Configuration configuration = TEST_UTIL.getConfiguration();
+    WALPlayer player = new WALPlayer(configuration);
+    String optionName = "_test_.name";
+    configuration.set(optionName, "1000");
+    player.setupTime(configuration, optionName);
+    assertEquals(1000, configuration.getLong(optionName, 0));
     assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1),
         Bytes.toString(TABLENAME2) }));
 
+    // verify the WAL was replayed into table 2
     Get g = new Get(ROW);
     Result r = t2.get(g);
     assertEquals(1, r.size());
     assertTrue(Bytes.equals(COLUMN2, r.raw()[0].getQualifier()));
   }
 
+  /**
+   * Simple end-to-end test of the bulk-output (HFile) mode
+   * @throws Exception
+   */
+  @Test
+  public void testWALPlayerFileOut() throws Exception {
+    final byte[] TABLENAME1 = Bytes.toBytes("testWALPlayer11");
+    final byte[] TABLENAME2 = Bytes.toBytes("testWALPlayer12");
+    final byte[] FAMILY = Bytes.toBytes("family");
+    final byte[] COLUMN1 = Bytes.toBytes("c1");
+    final byte[] COLUMN2 = Bytes.toBytes("c2");
+    final byte[] ROW = Bytes.toBytes("row");
+    HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
+    HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
+
+    // put a row into the first table
+    Put p = new Put(ROW);
+    p.add(FAMILY, COLUMN1, COLUMN1);
+    p.add(FAMILY, COLUMN2, COLUMN2);
+    t1.put(p);
+    // delete one column
+    Delete d = new Delete(ROW);
+    d.deleteColumns(FAMILY, COLUMN1);
+    t1.delete(d);
+
+    // replay the WAL, map table 1 to table 2
+    HLog log = cluster.getRegionServer(0).getWAL();
+    log.rollWriter();
+    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
+        .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
+
+    Configuration configuration = TEST_UTIL.getConfiguration();
+    configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, new File("myout").getAbsolutePath());
+    WALPlayer player = new WALPlayer(configuration);
+    String optionName = "_test_.name";
+    configuration.set(optionName, "1000");
+    player.setupTime(configuration, optionName);
+    assertEquals(1000, configuration.getLong(optionName, 0));
+    assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1),
+        Bytes.toString(TABLENAME2) }));
+
+    // with bulk output the edits go to HFiles, so nothing is written to table 2
+    Result r = t2.get(new Get(ROW));
+    assertEquals(0, r.size());
+  }
+
+  /**
+   * Test HLogKeyValueMapper setup and map
+   */
+  @Test
+  public void testHLogKeyValueMapper() throws Exception {
+    Configuration configuration = new Configuration();
+    configuration.set(WALPlayer.TABLES_KEY, "table");
+    HLogKeyValueMapper mapper = new HLogKeyValueMapper();
+    HLogKey key = mock(HLogKey.class);
+    when(key.getTablename()).thenReturn(Bytes.toBytes("table"));
+    @SuppressWarnings("unchecked")
+    Mapper.Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(configuration);
+
+    WALEdit value = mock(WALEdit.class);
+    List<KeyValue> values = new ArrayList<KeyValue>();
+    KeyValue kv1 = mock(KeyValue.class);
+    when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
+    when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));
+    values.add(kv1);
+    when(value.getKeyValues()).thenReturn(values);
+    mapper.setup(context);
+
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
+        KeyValue key = (KeyValue) invocation.getArguments()[1];
+        assertEquals("row", Bytes.toString(writer.get()));
+        assertEquals("row", Bytes.toString(key.getRow()));
+        return null;
+      }
+    }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
+
+    mapper.map(key, value, context);
+  }
+
+  /**
+   * Test the main method. With no arguments WALPlayer should print usage and
+   * exit with -1, which ExitUtil surfaces as an ExitException.
+   */
+  @Test
+  public void testMainMethod() throws Exception {
+    PrintStream oldPrintStream = System.err;
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    String[] args = {};
+    System.setErr(new PrintStream(data));
+    try {
+      try {
+        ExitUtil.activeTest();
+        WALPlayer.main(args);
+        fail("should have thrown an ExitException");
+      } catch (ExitException e) {
+        assertEquals(-1, e.getExitCode());
+        assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
+        assertTrue(data.toString().contains(
+            "Usage: WALPlayer [options] <wal inputdir> <tables> [<tableMappings>]"));
+        assertTrue(data.toString().contains("-Dhlog.bulk.output=/path/for/output"));
+      }
+    } finally {
+      System.setErr(oldPrintStream);
+    }
+  }
+
 }
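
Note on the exit-handling pattern this patch relies on: the tools' main() methods now call ExitUtil.exit(...) instead of System.exit(...), and the tests call ExitUtil.activeTest() and then catch ExitException to assert on the exit code. The ExitUtil and ExitException sources are not included in these hunks, so the following is only a minimal sketch of what the tests appear to assume; the class and method names (activeTest, exit, getExitCode) come from the patch, while the flag-based implementation and the SecurityException superclass are assumptions, not the actual HBase code.

    // Sketch of org/apache/hadoop/hbase/util/ExitException.java (assumed, not from the patch)
    package org.apache.hadoop.hbase.util;

    public class ExitException extends SecurityException {
      private static final long serialVersionUID = 1L;
      private final int exitCode;

      public ExitException(int exitCode) {
        super("exit with code " + exitCode);
        this.exitCode = exitCode;
      }

      /** The status that would have been passed to System.exit. */
      public int getExitCode() {
        return exitCode;
      }
    }

    // Sketch of org/apache/hadoop/hbase/util/ExitUtil.java (assumed, not from the patch)
    package org.apache.hadoop.hbase.util;

    public final class ExitUtil {
      private static volatile boolean testMode = false;

      private ExitUtil() {
      }

      /** Called by tests so that exit(int) throws instead of terminating the JVM. */
      public static void activeTest() {
        testMode = true;
      }

      /** Drop-in replacement for System.exit(status). */
      public static void exit(int status) {
        if (testMode) {
          throw new ExitException(status);
        }
        System.exit(status);
      }
    }

With a helper along these lines, a test redirects System.err, calls ExitUtil.activeTest(), invokes the tool's main() with bad arguments, and asserts on ExitException.getExitCode() and the captured usage text, restoring System.err in a finally block exactly as the tests above do.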