diff --git src/main/java/org/apache/hadoop/hbase/mapred/Driver.java src/main/java/org/apache/hadoop/hbase/mapred/Driver.java index d38956c..0239375 100644 --- src/main/java/org/apache/hadoop/hbase/mapred/Driver.java +++ src/main/java/org/apache/hadoop/hbase/mapred/Driver.java @@ -21,21 +21,36 @@ package org.apache.hadoop.hbase.mapred; import org.apache.hadoop.util.ProgramDriver; +import com.google.common.annotations.VisibleForTesting; + /** - * Driver for hbase mapreduce jobs. Select which to run by passing - * name of job to this main. + * Driver for hbase mapreduce jobs. Select which to run by passing name of job + * to this main. */ @Deprecated public class Driver { + + static ProgramDriver pgdMock; + + @VisibleForTesting + static void setProgramDriver(ProgramDriver programDriverMock) { + pgdMock = programDriverMock; + } + /** * @param args * @throws Throwable */ public static void main(String[] args) throws Throwable { - ProgramDriver pgd = new ProgramDriver(); - pgd.addClass(RowCounter.NAME, RowCounter.class, - "Count rows in HBase table"); - ProgramDriver.class.getMethod("driver", new Class [] {String[].class}). - invoke(pgd, new Object[]{args}); + ProgramDriver pgd = null; + + if (pgdMock != null) + pgd = pgdMock; + else + pgd = new ProgramDriver(); + + pgd.addClass(RowCounter.NAME, RowCounter.class, "Count rows in HBase table"); + ProgramDriver.class.getMethod("driver", new Class[] { String[].class }) + .invoke(pgd, new Object[] { args }); } -} +} \ No newline at end of file diff --git src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java new file mode 100644 index 0000000..7c432c8 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java @@ -0,0 +1,40 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.util.ProgramDriver; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category(SmallTests.class) +public class TestDriver { + + @Test + @SuppressWarnings("deprecation") + public void driverMethodShouldBeCalled() throws Throwable { + ProgramDriver programDriverMock = Mockito.mock(ProgramDriver.class); + Driver.setProgramDriver(programDriverMock); + Driver.main(new String[] {}); + Mockito.verify(programDriverMock).driver(Mockito.any(String[].class)); + + Driver.main(new String[] {}); + } +} diff --git src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java new file mode 100644 index 0000000..5df4ba6 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java @@ -0,0 +1,174 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static com.google.common.collect.ImmutableList.of; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.ImmutableList; + +@Category(SmallTests.class) +public class TestGroupingTableMap { + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldNotCallCollectSinceFindUniqueKeyValueMoreThanOnce() + throws Exception { + GroupingTableMap gTableMap = null; + try { + Result result = mock(Result.class); + Reporter reporter = mock(Reporter.class); + gTableMap = new 
GroupingTableMap(); + Configuration cfg = new Configuration(); + cfg.set(GroupingTableMap.GROUP_COLUMNS, + "familyA:qualifierA familyB:qualifierB"); + JobConf jobConf = new JobConf(cfg); + gTableMap.configure(jobConf); + + byte[] row = {}; + ImmutableList keyValues = of( + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), + Bytes.toBytes("1111")), new KeyValue(row, "familyA".getBytes(), + "qualifierA".getBytes(), Bytes.toBytes("2222")), + new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), + Bytes.toBytes("3333"))); + when(result.list()).thenReturn(keyValues); + OutputCollector outputCollectorMock = mock(OutputCollector.class); + gTableMap.map(null, result, outputCollectorMock, reporter); + verify(outputCollectorMock, never()).collect( + any(ImmutableBytesWritable.class), any(Result.class)); + } finally { + if (gTableMap != null) + gTableMap.close(); + } + } + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldCreateNewKeyAlthoughExtraKey() throws Exception { + GroupingTableMap gTableMap = null; + try { + Result result = mock(Result.class); + Reporter reporter = mock(Reporter.class); + gTableMap = new GroupingTableMap(); + Configuration cfg = new Configuration(); + cfg.set(GroupingTableMap.GROUP_COLUMNS, + "familyA:qualifierA familyB:qualifierB"); + JobConf jobConf = new JobConf(cfg); + gTableMap.configure(jobConf); + + byte[] row = {}; + ImmutableList keyValues = of( + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), + Bytes.toBytes("1111")), new KeyValue(row, "familyB".getBytes(), + "qualifierB".getBytes(), Bytes.toBytes("2222")), + new KeyValue(row, "familyC".getBytes(), "qualifierC".getBytes(), + Bytes.toBytes("3333"))); + when(result.list()).thenReturn(keyValues); + OutputCollector outputCollectorMock = mock(OutputCollector.class); + gTableMap.map(null, result, outputCollectorMock, reporter); + verify(outputCollectorMock, times(1)).collect( + any(ImmutableBytesWritable.class), 
any(Result.class)); + } finally { + if (gTableMap != null) + gTableMap.close(); + } + } + + @Test + @SuppressWarnings({ "deprecation" }) + public void shouldCreateNewKey() throws Exception { + GroupingTableMap gTableMap = null; + try { + Result result = mock(Result.class); + Reporter reporter = mock(Reporter.class); + final byte[] bSeparator = Bytes.toBytes(" "); + gTableMap = new GroupingTableMap(); + Configuration cfg = new Configuration(); + cfg.set(GroupingTableMap.GROUP_COLUMNS, + "familyA:qualifierA familyB:qualifierB"); + JobConf jobConf = new JobConf(cfg); + gTableMap.configure(jobConf); + + final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945"); + final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437"); + byte[] row = {}; + ImmutableList keyValues = of( + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), + firstPartKeyValue), new KeyValue(row, "familyB".getBytes(), + "qualifierB".getBytes(), secondPartKeyValue)); + when(result.list()).thenReturn(keyValues); + + OutputCollector outputCollector = new OutputCollector() { + @Override + public void collect(ImmutableBytesWritable arg, Result result) + throws IOException { + assertArrayEquals(com.google.common.primitives.Bytes.concat( + firstPartKeyValue, bSeparator, secondPartKeyValue), + arg.copyBytes()); + } + }; + + gTableMap.map(null, result, outputCollector, reporter); + final byte[] firstPartValue = Bytes.toBytes("238947928"); + final byte[] secondPartValue = Bytes.toBytes("4678456942345"); + byte[][] data = { firstPartValue, secondPartValue }; + ImmutableBytesWritable byteWritable = gTableMap.createGroupKey(data); + assertArrayEquals(com.google.common.primitives.Bytes.concat( + firstPartValue, bSeparator, secondPartValue), byteWritable.get()); + } finally { + if (gTableMap != null) + gTableMap.close(); + } + } + + @Test + @SuppressWarnings({ "deprecation" }) + public void shouldReturnNullFromCreateGroupKey() throws Exception { + GroupingTableMap gTableMap = null; + try { 
+ gTableMap = new GroupingTableMap(); + assertNull(gTableMap.createGroupKey(null)); + } finally { + if (gTableMap != null) + gTableMap.close(); + } + } +} diff --git src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java new file mode 100644 index 0000000..4ff5265 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapred; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category(SmallTests.class) +public class TestIdentityTableMap { + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldCollectPredefinedTimes() throws IOException { + int recordNumber = 999; + Result resultMock = mock(Result.class); + IdentityTableMap identityTableMap = null; + try { + Reporter reporterMock = mock(Reporter.class); + identityTableMap = new IdentityTableMap(); + ImmutableBytesWritable bytesWritableMock = mock(ImmutableBytesWritable.class); + OutputCollector outputCollectorMock = + mock(OutputCollector.class); + + for (int i = 0; i < recordNumber; i++) + identityTableMap.map(bytesWritableMock, resultMock, + outputCollectorMock, reporterMock); + + verify(outputCollectorMock, times(recordNumber)).collect( + Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class)); + } finally { + if (identityTableMap != null) + identityTableMap.close(); + } + } +} \ No newline at end of file diff --git src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java new file mode 100644 index 0000000..d9bfee7 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java @@ -0,0 +1,162 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import com.google.common.base.Joiner; + +@Category(SmallTests.class) +public class TestRowCounter { + + @Test + @SuppressWarnings("deprecation") + public void shouldPrintUsage() throws Exception { + String expectedOutput = "rowcounter [...]"; + String result = new OutputReader(System.out) { + @Override + void doRead() { + 
assertEquals(-1, RowCounter.printUsage()); + } + }.read(); + + assertTrue(result.startsWith(expectedOutput)); + } + + @Test + @SuppressWarnings("deprecation") + public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree() + throws Exception { + final String[] args = new String[] { "one", "two" }; + String line = "ERROR: Wrong number of parameters: " + args.length; + String result = new OutputReader(System.err) { + @Override + void doRead() throws Exception { + assertEquals(-1, new RowCounter().run(args)); + } + }.read(); + + assertTrue(result.startsWith(line)); + } + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldRegInReportEveryIncomingRow() throws IOException { + int iterationNumber = 999; + RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper(); + Reporter reporter = mock(Reporter.class); + for (int i = 0; i < iterationNumber; i++) + mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class), + mock(OutputCollector.class), reporter); + + Mockito.verify(reporter, times(iterationNumber)).incrCounter( + any(Enum.class), anyInt()); + } + + @Test + @SuppressWarnings({ "deprecation" }) + public void shouldCreateAndRunSubmittableJob() throws Exception { + RowCounter rCounter = new RowCounter(); + rCounter.setConf(HBaseConfiguration.create()); + String[] args = new String[] { "\temp", "tableA", "column1", "column2", + "column3" }; + JobConf jobConfig = rCounter.createSubmittableJob(args); + + assertNotNull(jobConfig); + assertEquals(0, jobConfig.getNumReduceTasks()); + assertEquals("rowcounter", jobConfig.getJobName()); + assertEquals(jobConfig.getMapOutputValueClass(), Result.class); + assertEquals(jobConfig.getMapperClass(), RowCounterMapper.class); + assertEquals(jobConfig.get(TableInputFormat.COLUMN_LIST), Joiner.on(' ') + .join("column1", "column2", "column3")); + assertEquals(jobConfig.getMapOutputKeyClass(), ImmutableBytesWritable.class); + } + + enum Outs { + OUT, ERR + } + + private static 
abstract class OutputReader { + private final PrintStream ps; + private PrintStream oldPrintStream; + private Outs outs; + + protected OutputReader(PrintStream ps) { + this.ps = ps; + } + + protected String read() throws Exception { + ByteArrayOutputStream outBytes = new ByteArrayOutputStream(); + if (ps == System.out) { + oldPrintStream = System.out; + outs = Outs.OUT; + System.setOut(new PrintStream(outBytes)); + } else if (ps == System.err) { + oldPrintStream = System.err; + outs = Outs.ERR; + System.setErr(new PrintStream(outBytes)); + } else { + throw new IllegalStateException("OutputReader: unsupported PrintStream"); + } + + try { + doRead(); + return new String(outBytes.toByteArray()); + } finally { + switch (outs) { + case OUT: { + System.setOut(oldPrintStream); + break; + } + case ERR: { + System.setErr(oldPrintStream); + break; + } + default: + throw new IllegalStateException( + "OutputReader: unsupported PrintStream"); + } + } + } + + abstract void doRead() throws Exception; + } +} diff --git src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java new file mode 100644 index 0000000..cdd2462 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java @@ -0,0 +1,58 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSplitTable { + + @Test + @SuppressWarnings("deprecation") + public void testSplitTableCompareTo() { + TableSplit aTableSplit = new TableSplit(Bytes.toBytes("tableA"), + Bytes.toBytes("aaa"), Bytes.toBytes("ddd"), "locationA"); + + TableSplit bTableSplit = new TableSplit(Bytes.toBytes("tableA"), + Bytes.toBytes("iii"), Bytes.toBytes("kkk"), "locationA"); + + TableSplit cTableSplit = new TableSplit(Bytes.toBytes("tableA"), + Bytes.toBytes("lll"), Bytes.toBytes("zzz"), "locationA"); + + assertTrue(aTableSplit.compareTo(aTableSplit) == 0); + assertTrue(bTableSplit.compareTo(bTableSplit) == 0); + assertTrue(cTableSplit.compareTo(cTableSplit) == 0); + + assertTrue(aTableSplit.compareTo(bTableSplit) < 0); + assertTrue(bTableSplit.compareTo(aTableSplit) > 0); + + assertTrue(aTableSplit.compareTo(cTableSplit) < 0); + assertTrue(cTableSplit.compareTo(aTableSplit) > 0); + + assertTrue(bTableSplit.compareTo(cTableSplit) < 0); + assertTrue(cTableSplit.compareTo(bTableSplit) > 0); + + assertTrue(cTableSplit.compareTo(aTableSplit) > 0); + } +} \ No newline at end of file diff --git src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java index 38be7e4..ba1797d 100644 --- 
src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java +++ src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java @@ -35,12 +35,20 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.RegexStringComparator; +import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; @@ -100,6 +108,55 @@ public class TestTableInputFormat { return table; } + public static HTable createTableWithLogEnabledConfiguration(byte[] tableName, + int logFrame, int frameNumber) throws IOException { + + // override setting + org.apache.hadoop.conf.Configuration c1 = UTIL.getConfiguration(); + c1.setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true); + c1.setInt( + org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT, + logFrame); + + HTable table = UTIL.createTable(tableName, FAMILY); + int rowLimit = logFrame * frameNumber; + for (int i = 0; i < rowLimit; i++) { + Put p = new Put(Bytes.toBytes("aaa" + i)); + p.add(FAMILY, null, Bytes.toBytes("value" + i)); + table.put(p); + } + return table; + } + + static void runTestMapredWithLimit(HTable table, int logFrame, int frameNumber) + throws 
IOException { + org.apache.hadoop.hbase.mapred.TableRecordReader trr = new org.apache.hadoop.hbase.mapred.TableRecordReader(); + boolean more = false; + Result r = new Result(); + int rowLimit = logFrame * frameNumber; + ImmutableBytesWritable key = new ImmutableBytesWritable(); + + trr.setStartRow("aaa".getBytes()); + trr.setEndRow("zzz".getBytes()); + trr.setHTable(table); + trr.setInputColumns(columns); + + // define custom filter + Filter vFilter = new ValueFilter(CompareOp.EQUAL, + new RegexStringComparator(".*value.*")); + trr.setRowFilter(vFilter); + trr.init(); + + for (int i = 0; i < rowLimit; i++) { + more = trr.next(key, r); + assertTrue(more); + } + + // no more data + more = trr.next(key, r); + assertFalse(more); + } + /** * Verify that the result and key have expected values. * @@ -126,8 +183,7 @@ public class TestTableInputFormat { * @throws IOException */ static void runTestMapred(HTable table) throws IOException { - org.apache.hadoop.hbase.mapred.TableRecordReader trr = - new org.apache.hadoop.hbase.mapred.TableRecordReader(); + org.apache.hadoop.hbase.mapred.TableRecordReader trr = new org.apache.hadoop.hbase.mapred.TableRecordReader(); trr.setStartRow("aaa".getBytes()); trr.setEndRow("zzz".getBytes()); trr.setHTable(table); @@ -160,8 +216,7 @@ public class TestTableInputFormat { */ static void runTestMapreduce(HTable table) throws IOException, InterruptedException { - org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr = - new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl(); + org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr = new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl(); Scan s = new Scan(); s.setStartRow("aaa".getBytes()); s.setStopRow("zzz".getBytes()); @@ -262,6 +317,15 @@ public class TestTableInputFormat { return htable; } + @Test + public void testTableRecordReaderWithLogEnabled() throws IOException { + int logFrame = 10; + int frameNumber = 10; + HTable table = 
createTableWithLogEnabledConfiguration("table0".getBytes(), + logFrame, frameNumber); + runTestMapredWithLimit(table, logFrame, frameNumber); + } + /** * Run test assuming no errors using mapred api. * @@ -352,8 +416,8 @@ public class TestTableInputFormat { * @throws InterruptedException */ @Test(expected = IOException.class) - public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException, - InterruptedException { + public void testTableRecordReaderScannerFailMapreduceTwice() + throws IOException, InterruptedException { HTable htable = createIOEScannerTable("table3-mr".getBytes(), 2); runTestMapreduce(htable); } @@ -387,7 +451,5 @@ public class TestTableInputFormat { } @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); } - diff --git src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java new file mode 100644 index 0000000..b917cfc --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java @@ -0,0 +1,281 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +@Category(LargeTests.class) +public class TestTableMapReduceUtil { + + private static final Log LOG = LogFactory + .getLog(TestTableMapReduceUtil.class); + + private static HTable presidentsTable; + private static final String TABLE_NAME = "People"; + + private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info"); + private static final byte[] COLUMN_QUALIFIER = 
Bytes.toBytes("name"); + + private static ImmutableSet presidentsRowKeys = ImmutableSet.of( + "president1", "president2", "president3"); + private static Iterator presidentNames = ImmutableSet.of( + "John F. Kennedy", "George W. Bush", "Barack Obama").iterator(); + + private static ImmutableSet actorsRowKeys = ImmutableSet.of("actor1", + "actor2"); + private static Iterator actorNames = ImmutableSet.of( + "Jack Nicholson", "Martin Freeman").iterator(); + + private static String PRESIDENT_PATTERN = "president"; + private static String ACTOR_PATTERN = "actor"; + private static ImmutableMap> relation = ImmutableMap + .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + presidentsTable = createAndFillTable(Bytes.toBytes(TABLE_NAME)); + UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniMapReduceCluster(); + UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws IOException { + LOG.info("before"); + UTIL.ensureSomeRegionServersAvailable(1); + LOG.info("before done"); + } + + public static HTable createAndFillTable(byte[] tableName) throws IOException { + HTable table = UTIL.createTable(tableName, COLUMN_FAMILY); + createPutCommand(table); + return table; + } + + private static void createPutCommand(HTable table) throws IOException { + for (String president : presidentsRowKeys) { + if (presidentNames.hasNext()) { + Put p = new Put(Bytes.toBytes(president)); + p.add(COLUMN_FAMILY, COLUMN_QUALIFIER, + Bytes.toBytes(presidentNames.next())); + table.put(p); + } + } + + for (String actor : actorsRowKeys) { + if (actorNames.hasNext()) { + Put p = new Put(Bytes.toBytes(actor)); + p.add(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next())); + table.put(p); + } + } + } + + /** + 
* Check that the given number of reduce tasks for the given job configuration + * does not exceed the number of regions for the given table. + */ + @Test + @SuppressWarnings("deprecation") + public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable() + throws IOException { + assertNotNull(presidentsTable); + Configuration cfg = UTIL.getConfiguration(); + JobConf jobConf = new JobConf(cfg); + TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.setScannerCaching(jobConf, 100); + assertEquals(1, jobConf.getNumReduceTasks()); + assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0)); + + jobConf.setNumReduceTasks(10); + TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf); + assertEquals(1, jobConf.getNumReduceTasks()); + } + + @Test + @SuppressWarnings("deprecation") + public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable() + throws IOException { + Configuration cfg = UTIL.getConfiguration(); + JobConf jobConf = new JobConf(cfg); + TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf); + assertEquals(1, jobConf.getNumMapTasks()); + + jobConf.setNumMapTasks(10); + TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf); + assertEquals(1, jobConf.getNumMapTasks()); + } + + @Test + @SuppressWarnings("deprecation") + public void shouldBeValidMapReduceEvaluation() throws Exception { + String OUTPUT_DIR = "target" + File.separator + "test-data" + + File.separator + "shouldBeValidMapReduceEvaluation-output"; + Path FQ_OUTPUT_DIR = new Path(OUTPUT_DIR) + .makeQualified(new LocalFileSystem()); + JobConf jobConf = new JobConf(UTIL.getConfiguration()); + FileOutputFormat.setOutputPath(jobConf, FQ_OUTPUT_DIR); + 
assertNotNull(FileOutputFormat.getOutputPath(jobConf)); + jobConf.setJobName("MapReduce"); + jobConf.setNumReduceTasks(1); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY), + ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, + jobConf); + TableMapReduceUtil.initTableReduceJob(TABLE_NAME, + ClassificatorRowReduce.class, jobConf); + RunningJob job = JobClient.runJob(jobConf); + assertTrue(job.isSuccessful()); + } + + @Test + @SuppressWarnings("deprecation") + public void shouldBeValidMapReduceWithPartitionerEvaluation() + throws IOException { + String OUTPUT_DIR = "target" + File.separator + "test-data" + + File.separator + + "shouldBeValidMapReduceWithPartitionerEvaluation-output"; + Path FQ_OUTPUT_DIR = new Path(OUTPUT_DIR) + .makeQualified(new LocalFileSystem()); + + JobConf jobConf = new JobConf(UTIL.getConfiguration()); + FileOutputFormat.setOutputPath(jobConf, FQ_OUTPUT_DIR); + assertNotNull(FileOutputFormat.getOutputPath(jobConf)); + jobConf.setJobName("MapReduceWithPartitioner"); + jobConf.setNumReduceTasks(2); + TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY), + ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, + jobConf); + + TableMapReduceUtil.initTableReduceJob(TABLE_NAME, + ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class); + RunningJob job = JobClient.runJob(jobConf); + assertTrue(job.isSuccessful()); + } + + @SuppressWarnings("deprecation") + static class ClassificatorRowReduce extends MapReduceBase implements + TableReduce { + + @Override + public void reduce(ImmutableBytesWritable key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + String strKey = Bytes.toString(key.get()); + List result = new ArrayList(); + while (values.hasNext()) + result.add(values.next()); + + if (relation.keySet().contains(strKey)) { + Set set = relation.get(strKey); + if (set != null) { + 
assertEquals(set.size(), result.size()); + } else { + throwAssertionError("Test infrastructure error: set is null"); + } + } else { + throwAssertionError("Test infrastructure error: key not found in map"); + } + } + + private void throwAssertionError(String errorMessage) throws AssertionError { + throw new AssertionError(errorMessage); + } + } + + @SuppressWarnings("deprecation") + static class ClassificatorMapper extends MapReduceBase implements + TableMap { + + final ImmutableBytesWritable pKey = new ImmutableBytesWritable( + Bytes.toBytes(PRESIDENT_PATTERN)); + final ImmutableBytesWritable aKey = new ImmutableBytesWritable( + Bytes.toBytes(ACTOR_PATTERN)); + + @Override + public void map(ImmutableBytesWritable row, Result result, + OutputCollector outCollector, + Reporter reporter) throws IOException { + String rowKey = Bytes.toString(result.getRow()); + ImmutableBytesWritable outKey = null; + + if (rowKey.startsWith(PRESIDENT_PATTERN)) { + outKey = pKey; + } else if (rowKey.startsWith(ACTOR_PATTERN)) { + outKey = aKey; + } else { + throw new AssertionError("unexpected rowKey"); + } + + String name = Bytes.toString(result.getValue(COLUMN_FAMILY, + COLUMN_QUALIFIER)); + outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).add( + COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name))); + } + } +}