diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java index 3c501c5..d1b673a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java @@ -21,24 +21,31 @@ package org.apache.hadoop.hbase.mapred; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.util.ProgramDriver; +import com.google.common.annotations.VisibleForTesting; /** - * Driver for hbase mapreduce jobs. Select which to run by passing - * name of job to this main. + * Driver for hbase mapreduce jobs. Select which to run by passing name of job + * to this main. */ @Deprecated @InterfaceAudience.Public @InterfaceStability.Stable public class Driver { + + private static ProgramDriver pgd = new ProgramDriver(); + + @VisibleForTesting + static void setProgramDriver(ProgramDriver pgd0) { + pgd = pgd0; + } + /** * @param args * @throws Throwable */ public static void main(String[] args) throws Throwable { - ProgramDriver pgd = new ProgramDriver(); - pgd.addClass(RowCounter.NAME, RowCounter.class, - "Count rows in HBase table"); - ProgramDriver.class.getMethod("driver", new Class [] {String[].class}). - invoke(pgd, new Object[]{args}); + pgd.addClass(RowCounter.NAME, RowCounter.class, "Count rows in HBase table"); + ProgramDriver.class.getMethod("driver", new Class[] { String[].class }) + .invoke(pgd, new Object[] { args }); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java new file mode 100644 index 0000000..e89dbb0 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java @@ -0,0 +1,41 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.util.ProgramDriver; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@Category(SmallTests.class) +public class TestDriver { + + @Test + @SuppressWarnings("deprecation") + public void testDriverMainMethod() throws Throwable { + ProgramDriver programDriverMock = mock(ProgramDriver.class); + Driver.setProgramDriver(programDriverMock); + Driver.main(new String[]{}); + verify(programDriverMock).driver(Mockito.any(String[].class)); + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java new file mode 100644 index 0000000..d2becba --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java @@ -0,0 +1,167 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static com.google.common.collect.ImmutableList.of; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.ImmutableList; + +@Category(SmallTests.class) +public class TestGroupingTableMap { + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldNotCallCollectonSinceFindUniqueKeyValueMoreThanOnes() + throws Exception { + GroupingTableMap gTableMap = null; + try { + Result result = mock(Result.class); + Reporter reporter = mock(Reporter.class); + gTableMap = new GroupingTableMap(); + Configuration cfg = new Configuration(); + cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB"); + JobConf jobConf = new JobConf(cfg); + gTableMap.configure(jobConf); + + byte[] row = {}; + ImmutableList keyValues = of( + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")), + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("2222")), + new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), Bytes.toBytes("3333"))); + when(result.list()).thenReturn(keyValues); + OutputCollector outputCollectorMock = + mock(OutputCollector.class); + gTableMap.map(null, result, outputCollectorMock, reporter); + verify(outputCollectorMock, never()) + .collect(any(ImmutableBytesWritable.class), any(Result.class)); + } finally { + if (gTableMap != null) + gTableMap.close(); + } + } + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldCreateNewKeyAlthoughExtraKey() throws Exception { + GroupingTableMap gTableMap = null; + try { + Result result = mock(Result.class); + Reporter reporter = mock(Reporter.class); + gTableMap = new GroupingTableMap(); + Configuration cfg = new Configuration(); + cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB"); + JobConf jobConf = new JobConf(cfg); + gTableMap.configure(jobConf); + + byte[] row = {}; + ImmutableList keyValues = of( + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")), + new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), Bytes.toBytes("2222")), + new KeyValue(row, "familyC".getBytes(), "qualifierC".getBytes(), Bytes.toBytes("3333"))); + when(result.list()).thenReturn(keyValues); + OutputCollector outputCollectorMock = + mock(OutputCollector.class); + gTableMap.map(null, result, outputCollectorMock, reporter); + verify(outputCollectorMock, times(1)) + .collect(any(ImmutableBytesWritable.class), any(Result.class)); + } finally { + if (gTableMap != null) + gTableMap.close(); + } + } + + @Test + @SuppressWarnings({ "deprecation" }) + public void shouldCreateNewKey() throws Exception { + GroupingTableMap gTableMap = null; + try { + Result result = mock(Result.class); + Reporter reporter = mock(Reporter.class); + final byte[] bSeparator = Bytes.toBytes(" "); + gTableMap = new GroupingTableMap(); + Configuration cfg = new Configuration(); + cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB"); + JobConf jobConf = new JobConf(cfg); + gTableMap.configure(jobConf); + + final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945"); + final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437"); + byte[] row = {}; + ImmutableList keyValues = of( + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), firstPartKeyValue), + new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), secondPartKeyValue)); + when(result.list()).thenReturn(keyValues); + + OutputCollector outputCollector = + new OutputCollector() { + @Override + public void collect(ImmutableBytesWritable arg, Result result) throws IOException { + assertArrayEquals(com.google.common.primitives.Bytes.concat(firstPartKeyValue, bSeparator, + secondPartKeyValue), arg.copyBytes()); + } + }; + + gTableMap.map(null, result, outputCollector, reporter); + final byte[] firstPartValue = Bytes.toBytes("238947928"); + final byte[] secondPartValue = Bytes.toBytes("4678456942345"); + byte[][] data = { firstPartValue, secondPartValue }; + ImmutableBytesWritable byteWritable = gTableMap.createGroupKey(data); + assertArrayEquals(com.google.common.primitives.Bytes.concat(firstPartValue, + bSeparator, secondPartValue), byteWritable.get()); + } finally { + if (gTableMap != null) + gTableMap.close(); + } + } + + @Test + @SuppressWarnings({ "deprecation" }) + public void shouldReturnNullFromCreateGroupKey() throws Exception { + GroupingTableMap gTableMap = null; + try { + gTableMap = new GroupingTableMap(); + assertNull(gTableMap.createGroupKey(null)); + } finally { + if(gTableMap != null) + gTableMap.close(); + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java new file mode 100644 index 0000000..dc289fa --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java @@ -0,0 +1,63 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category(SmallTests.class) +public class TestIdentityTableMap { + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldCollectPredefinedTimes() throws IOException { + int recordNumber = 999; + Result resultMock = mock(Result.class); + IdentityTableMap identityTableMap = null; + try { + Reporter reporterMock = mock(Reporter.class); + identityTableMap = new IdentityTableMap(); + ImmutableBytesWritable bytesWritableMock = mock(ImmutableBytesWritable.class); + OutputCollector outputCollectorMock = + mock(OutputCollector.class); + + for (int i = 0; i < recordNumber; i++) + identityTableMap.map(bytesWritableMock, resultMock, outputCollectorMock, + reporterMock); + + verify(outputCollectorMock, times(recordNumber)).collect( + Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class)); + } finally { + if (identityTableMap != null) + identityTableMap.close(); + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java new file mode 100644 index 0000000..c872a26 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java @@ -0,0 +1,162 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import com.google.common.base.Joiner; + +@Category(SmallTests.class) +public class TestRowCounter { + + @Test + @SuppressWarnings("deprecation") + public void shouldPrintUsage() throws Exception { + String expectedOutput = "rowcounter [...]"; + String result = new OutputReader(System.out) { + @Override + void doRead() { + assertEquals(-1, RowCounter.printUsage()); + } + }.read(); + + assertTrue(result.startsWith(expectedOutput)); + } + + @Test + @SuppressWarnings("deprecation") + public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree() + throws Exception { + final String[] args = new String[] { "one", "two" }; + String line = "ERROR: Wrong number of parameters: " + args.length; + String result = new OutputReader(System.err) { + @Override + void doRead() throws Exception { + assertEquals(-1, new RowCounter().run(args)); + } + }.read(); + + assertTrue(result.startsWith(line)); + } + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldRegInReportEveryIncomingRow() throws IOException { + int iterationNumber = 999; + RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper(); + Reporter reporter = mock(Reporter.class); + for (int i = 0; i < iterationNumber; i++) + mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class), + mock(OutputCollector.class), reporter); + + Mockito.verify(reporter, times(iterationNumber)).incrCounter( + any(Enum.class), anyInt()); + } + + @Test + @SuppressWarnings({ "deprecation" }) + public void shouldCreateAndRunSubmittableJob() throws Exception { + RowCounter rCounter = new RowCounter(); + rCounter.setConf(HBaseConfiguration.create()); + String[] args = new String[] { "\temp", "tableA", "column1", "column2", + "column3" }; + JobConf jobConfig = rCounter.createSubmittableJob(args); + + assertNotNull(jobConfig); + assertEquals(0, jobConfig.getNumReduceTasks()); + assertEquals("rowcounter", jobConfig.getJobName()); + assertEquals(jobConfig.getMapOutputValueClass(), Result.class); + assertEquals(jobConfig.getMapperClass(), RowCounterMapper.class); + assertEquals(jobConfig.get(TableInputFormat.COLUMN_LIST), Joiner.on(' ') + .join("column1", "column2", "column3")); + assertEquals(jobConfig.getMapOutputKeyClass(), ImmutableBytesWritable.class); + } + + enum Outs { + OUT, ERR + } + + private static abstract class OutputReader { + private final PrintStream ps; + private PrintStream oldPrintStream; + private Outs outs; + + protected OutputReader(PrintStream ps) { + this.ps = ps; + } + + protected String read() throws Exception { + ByteArrayOutputStream outBytes = new ByteArrayOutputStream(); + if (ps == System.out) { + oldPrintStream = System.out; + outs = Outs.OUT; + System.setOut(new PrintStream(outBytes)); + } else if (ps == System.err) { + oldPrintStream = System.err; + outs = Outs.ERR; + System.setErr(new PrintStream(outBytes)); + } else { + throw new IllegalStateException("OutputReader: unsupported PrintStream"); + } + + try { + doRead(); + return new String(outBytes.toByteArray()); + } finally { + switch (outs) { + case OUT: { + System.setOut(oldPrintStream); + break; + } + case ERR: { + System.setErr(oldPrintStream); + break; + } + default: + throw new IllegalStateException( + "OutputReader: unsupported PrintStream"); + } + } + } + + abstract void doRead() throws Exception; + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java new file mode 100644 index 0000000..68d2fe0 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java @@ -0,0 +1,88 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSplitTable { + + @Test + @SuppressWarnings("deprecation") + public void testSplitTableCompareTo() { + TableSplit aTableSplit = new TableSplit(Bytes.toBytes("tableA"), + Bytes.toBytes("aaa"), Bytes.toBytes("ddd"), "locationA"); + + TableSplit bTableSplit = new TableSplit(Bytes.toBytes("tableA"), + Bytes.toBytes("iii"), Bytes.toBytes("kkk"), "locationA"); + + TableSplit cTableSplit = new TableSplit(Bytes.toBytes("tableA"), + Bytes.toBytes("lll"), Bytes.toBytes("zzz"), "locationA"); + + assertTrue(aTableSplit.compareTo(aTableSplit) == 0); + assertTrue(bTableSplit.compareTo(bTableSplit) == 0); + assertTrue(cTableSplit.compareTo(cTableSplit) == 0); + + assertTrue(aTableSplit.compareTo(bTableSplit) < 0); + assertTrue(bTableSplit.compareTo(aTableSplit) > 0); + + assertTrue(aTableSplit.compareTo(cTableSplit) < 0); + assertTrue(cTableSplit.compareTo(aTableSplit) > 0); + + assertTrue(bTableSplit.compareTo(cTableSplit) < 0); + assertTrue(cTableSplit.compareTo(bTableSplit) > 0); + + assertTrue(cTableSplit.compareTo(aTableSplit) > 0); + } + + @Test + @SuppressWarnings("deprecation") + public void testSplitTableEquals() { + assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"), + Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes + .toBytes("tableB"), Bytes.toBytes("aaa"), Bytes.toBytes("ddd"), + "locationA"))); + + assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"), + Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes + .toBytes("tableA"), Bytes.toBytes("bbb"), Bytes.toBytes("ddd"), + "locationA"))); + + assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"), + Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes + .toBytes("tableA"), Bytes.toBytes("aaa"), Bytes.toBytes("eee"), + "locationA"))); + + assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"), + Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes + .toBytes("tableA"), Bytes.toBytes("aaa"), Bytes.toBytes("ddd"), + "locationB"))); + + assertTrue(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"), + Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes + .toBytes("tableA"), Bytes.toBytes("aaa"), Bytes.toBytes("ddd"), + "locationA"))); + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java new file mode 100644 index 0000000..99fc48d --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java @@ -0,0 +1,274 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +@Category(LargeTests.class) +public class TestTableMapReduceUtil { + + private static final Log LOG = LogFactory + .getLog(TestTableMapReduceUtil.class); + + private static HTable presidentsTable; + private static final String TABLE_NAME = "People"; + + private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info"); + private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name"); + + private static ImmutableSet presidentsRowKeys = ImmutableSet.of( + "president1", "president2", "president3"); + private static Iterator presidentNames = ImmutableSet.of( + "John F. Kennedy", "George W. Bush", "Barack Obama").iterator(); + + private static ImmutableSet actorsRowKeys = ImmutableSet.of("actor1", + "actor2"); + private static Iterator actorNames = ImmutableSet.of( + "Jack Nicholson", "Martin Freeman").iterator(); + + private static String PRESIDENT_PATTERN = "president"; + private static String ACTOR_PATTERN = "actor"; + private static ImmutableMap> relation = ImmutableMap + .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + presidentsTable = createAndFillTable(Bytes.toBytes(TABLE_NAME)); + UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniMapReduceCluster(); + UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws IOException { + LOG.info("before"); + UTIL.ensureSomeRegionServersAvailable(1); + LOG.info("before done"); + } + + public static HTable createAndFillTable(byte[] tableName) throws IOException { + HTable table = UTIL.createTable(tableName, COLUMN_FAMILY); + createPutCommand(table); + return table; + } + + private static void createPutCommand(HTable table) throws IOException { + for (String president : presidentsRowKeys) { + if (presidentNames.hasNext()) { + Put p = new Put(Bytes.toBytes(president)); + p.add(COLUMN_FAMILY, COLUMN_QUALIFIER, + Bytes.toBytes(presidentNames.next())); + table.put(p); + } + } + + for (String actor : actorsRowKeys) { + if (actorNames.hasNext()) { + Put p = new Put(Bytes.toBytes(actor)); + p.add(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next())); + table.put(p); + } + } + } + + /** + * Check what the given number of reduce tasks for the given job configuration + * does not exceed the number of regions for the given table. + */ + @Test + @SuppressWarnings("deprecation") + public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable() + throws IOException { + Assert.assertNotNull(presidentsTable); + Configuration cfg = UTIL.getConfiguration(); + JobConf jobConf = new JobConf(cfg); + TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.setScannerCaching(jobConf, 100); + assertEquals(1, jobConf.getNumReduceTasks()); + assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0)); + + jobConf.setNumReduceTasks(10); + TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf); + assertEquals(1, jobConf.getNumReduceTasks()); + } + + @Test + @SuppressWarnings("deprecation") + public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable() + throws IOException { + Configuration cfg = UTIL.getConfiguration(); + JobConf jobConf = new JobConf(cfg); + TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf); + assertEquals(1, jobConf.getNumMapTasks()); + + jobConf.setNumMapTasks(10); + TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf); + assertEquals(1, jobConf.getNumMapTasks()); + } + + @Test + @SuppressWarnings("deprecation") + public void shoudBeValidMapReduceEvaluation() throws Exception { + Configuration cfg = UTIL.getConfiguration(); + JobConf jobConf = new JobConf(cfg); + try { + jobConf.setJobName("process row task"); + jobConf.setNumReduceTasks(1); + TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY), + ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, + jobConf); + TableMapReduceUtil.initTableReduceJob(TABLE_NAME, + ClassificatorRowReduce.class, jobConf); + RunningJob job = JobClient.runJob(jobConf); + assertTrue(job.isSuccessful()); + } finally { + if (jobConf != null) + FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); + } + } + + @Test + @SuppressWarnings("deprecation") + public void shoudBeValidMapReduceWithPartitionerEvaluation() + throws IOException { + Configuration cfg = UTIL.getConfiguration(); + JobConf jobConf = new JobConf(cfg); + try { + jobConf.setJobName("process row task"); + jobConf.setNumReduceTasks(2); + TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY), + ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, + jobConf); + + TableMapReduceUtil.initTableReduceJob(TABLE_NAME, + ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class); + RunningJob job = JobClient.runJob(jobConf); + assertTrue(job.isSuccessful()); + } finally { + if (jobConf != null) + FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); + } + } + + @SuppressWarnings("deprecation") + static class ClassificatorRowReduce extends MapReduceBase implements + TableReduce { + + @Override + public void reduce(ImmutableBytesWritable key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + String strKey = Bytes.toString(key.get()); + List result = new ArrayList(); + while (values.hasNext()) + result.add(values.next()); + + if (relation.keySet().contains(strKey)) { + Set set = relation.get(strKey); + if (set != null) { + assertEquals(set.size(), result.size()); + } else { + throwAccertionError("Test infrastructure error: set is null"); + } + } else { + throwAccertionError("Test infrastructure error: key not found in map"); + } + } + + private void throwAccertionError(String errorMessage) throws AssertionError { + throw new AssertionError(errorMessage); + } + } + + @SuppressWarnings("deprecation") + static class ClassificatorMapper extends MapReduceBase implements + TableMap { + + @Override + public void map(ImmutableBytesWritable row, Result result, + OutputCollector outCollector, + Reporter reporter) throws IOException { + String rowKey = Bytes.toString(result.getRow()); + final ImmutableBytesWritable pKey = new ImmutableBytesWritable( + Bytes.toBytes(PRESIDENT_PATTERN)); + final ImmutableBytesWritable aKey = new ImmutableBytesWritable( + Bytes.toBytes(ACTOR_PATTERN)); + ImmutableBytesWritable outKey = null; + + if (rowKey.startsWith(PRESIDENT_PATTERN)) { + outKey = pKey; + } else if (rowKey.startsWith(ACTOR_PATTERN)) { + outKey = aKey; + } else { + throw new AssertionError("unexpected rowKey"); + } + + String name = Bytes.toString(result.getValue(COLUMN_FAMILY, + COLUMN_QUALIFIER)); + outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).add( + COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name))); + } + } +}