diff --git src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java index 26a6f61..7fe273b 100644 --- src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java +++ src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapred; import java.io.File; import java.io.IOException; +import java.util.Iterator; import java.util.Map; import java.util.NavigableMap; @@ -28,7 +29,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -42,11 +42,15 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.fail; +import static org.junit.Assert.assertTrue; + /** * Test Map/Reduce job over HBase tables. The map/reduce process we're testing * on our tables is simple - take every row in the table, reverse the value of @@ -58,7 +62,7 @@ public class TestTableMapReduce { LogFactory.getLog(TestTableMapReduce.class.getName()); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - static final String MULTI_REGION_TABLE_NAME = "mrtest"; + static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest"); static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); @@ -69,12 +73,10 @@ public class TestTableMapReduce { @BeforeClass public static void beforeClass() throws Exception { - HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME); - desc.addFamily(new HColumnDescriptor(INPUT_FAMILY)); - desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY)); UTIL.startMiniCluster(); - HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration()); - admin.createTable(desc, HBaseTestingUtility.KEYS); + HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY}); + UTIL.createMultiRegions(table, INPUT_FAMILY); + UTIL.loadTable(table, INPUT_FAMILY); UTIL.startMiniMapReduceCluster(); } @@ -150,7 +152,8 @@ public class TestTableMapReduce { IdentityTableReduce.class, jobConf); LOG.info("Started " + Bytes.toString(table.getTableName())); - JobClient.runJob(jobConf); + RunningJob job = JobClient.runJob(jobConf); + assertTrue(job.isSuccessful()); LOG.info("After map/reduce completion"); // verify map-reduce results @@ -184,7 +187,7 @@ public class TestTableMapReduce { // continue } } - org.junit.Assert.assertTrue(verified); + assertTrue(verified); } /** @@ -199,7 +202,10 @@ public class TestTableMapReduce { TableInputFormat.addColumns(scan, columns); ResultScanner scanner = table.getScanner(scan); try { - for (Result r : scanner) { + Iterator itr = scanner.iterator(); + assertTrue(itr.hasNext()); + while(itr.hasNext()) { + Result r = itr.next(); if (LOG.isDebugEnabled()) { if (r.size() > 2 ) { throw new IOException("Too many results, expected 2 got " + @@ -247,7 +253,7 @@ public class TestTableMapReduce { r.getRow() + ", first value=" + first + ", second value=" + second); } - org.junit.Assert.fail(); + fail(); } } } finally { diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java deleted file mode 100644 index cc5b1df..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.File; -import java.io.IOException; -import java.util.Map; -import java.util.NavigableMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.fail; -import static org.junit.Assert.assertTrue; - -/** - * Test Map/Reduce job over HBase tables. The map/reduce process we're testing - * on our tables is simple - take every row in the table, reverse the value of - * a particular cell, and write it back to the table. - */ -@Category(LargeTests.class) -public class TestMulitthreadedTableMapper { - private static final Log LOG = LogFactory.getLog(TestMulitthreadedTableMapper.class); - private static final HBaseTestingUtility UTIL = - new HBaseTestingUtility(); - static final String MULTI_REGION_TABLE_NAME = "mrtest"; - static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); - static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); - static final int NUMBER_OF_THREADS = 10; - - @BeforeClass - public static void beforeClass() throws Exception { - HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME); - desc.addFamily(new HColumnDescriptor(INPUT_FAMILY)); - desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY)); - UTIL.startMiniCluster(); - HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration()); - admin.createTable(desc, HBaseTestingUtility.KEYS); - UTIL.startMiniMapReduceCluster(); - } - - @AfterClass - public static void afterClass() throws Exception { - UTIL.shutdownMiniMapReduceCluster(); - UTIL.shutdownMiniCluster(); - } - - /** - * Pass the given key and processed record reduce - */ - public static class ProcessContentsMapper - extends TableMapper { - - /** - * Pass the key, and reversed value to reduce - * - * @param key - * @param value - * @param context - * @throws IOException - */ - public void map(ImmutableBytesWritable key, Result value, - Context context) - throws IOException, InterruptedException { - if (value.size() != 1) { - throw new IOException("There should only be one input column"); - } - Map>> - cf = value.getMap(); - if(!cf.containsKey(INPUT_FAMILY)) { - throw new IOException("Wrong input columns. Missing: '" + - Bytes.toString(INPUT_FAMILY) + "'."); - } - // Get the original value and reverse it - String originalValue = new String(value.getValue(INPUT_FAMILY, null), - HConstants.UTF8_ENCODING); - StringBuilder newValue = new StringBuilder(originalValue); - newValue.reverse(); - // Now set the value to be collected - Put outval = new Put(key.get()); - outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); - context.write(key, outval); - } - } - - /** - * Test multithreadedTableMappper map/reduce against a multi-region table - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testMultithreadedTableMapper() - throws IOException, InterruptedException, ClassNotFoundException { - runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()), - MULTI_REGION_TABLE_NAME)); - } - - private void runTestOnTable(HTable table) - throws IOException, InterruptedException, ClassNotFoundException { - Job job = null; - try { - LOG.info("Before map/reduce startup"); - job = new Job(table.getConfiguration(), "process column contents"); - job.setNumReduceTasks(1); - Scan scan = new Scan(); - scan.addFamily(INPUT_FAMILY); - TableMapReduceUtil.initTableMapperJob( - Bytes.toString(table.getTableName()), scan, - MultithreadedTableMapper.class, ImmutableBytesWritable.class, - Put.class, job); - MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class); - MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS); - TableMapReduceUtil.initTableReducerJob( - Bytes.toString(table.getTableName()), - IdentityTableReducer.class, job); - FileOutputFormat.setOutputPath(job, new Path("test")); - LOG.info("Started " + Bytes.toString(table.getTableName())); - job.waitForCompletion(true); - LOG.info("After map/reduce completion"); - // verify map-reduce results - verify(Bytes.toString(table.getTableName())); - } finally { - table.close(); - if (job != null) { - FileUtil.fullyDelete( - new File(job.getConfiguration().get("hadoop.tmp.dir"))); - } - } - } - - private void verify(String tableName) throws IOException { - HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName); - boolean verified = false; - long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000); - int numRetries = UTIL.getConfiguration().getInt("hbase.client.retries.number", 5); - for (int i = 0; i < numRetries; i++) { - try { - LOG.info("Verification attempt #" + i); - verifyAttempt(table); - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - LOG.debug("Verification attempt failed: " + e.getMessage()); - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - assertTrue(verified); - table.close(); - } - - /** - * Looks at every value of the mapreduce output and verifies that indeed - * the values have been reversed. - * - * @param table Table to scan. - * @throws IOException - * @throws NullPointerException if we failed to find a cell value - */ - private void verifyAttempt(final HTable table) - throws IOException, NullPointerException { - Scan scan = new Scan(); - scan.addFamily(INPUT_FAMILY); - scan.addFamily(OUTPUT_FAMILY); - ResultScanner scanner = table.getScanner(scan); - try { - for (Result r : scanner) { - if (LOG.isDebugEnabled()) { - if (r.size() > 2 ) { - throw new IOException("Too many results, expected 2 got " + - r.size()); - } - } - byte[] firstValue = null; - byte[] secondValue = null; - int count = 0; - for(KeyValue kv : r.list()) { - if (count == 0) { - firstValue = kv.getValue(); - }else if (count == 1) { - secondValue = kv.getValue(); - }else if (count == 2) { - break; - } - count++; - } - String first = ""; - if (firstValue == null) { - throw new NullPointerException(Bytes.toString(r.getRow()) + - ": first value is null"); - } - first = new String(firstValue, HConstants.UTF8_ENCODING); - String second = ""; - if (secondValue == null) { - throw new NullPointerException(Bytes.toString(r.getRow()) + - ": second value is null"); - } - byte[] secondReversed = new byte[secondValue.length]; - for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { - secondReversed[i] = secondValue[j]; - } - second = new String(secondReversed, HConstants.UTF8_ENCODING); - if (first.compareTo(second) != 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("second key is not the reverse of first. row=" + - Bytes.toStringBinary(r.getRow()) + ", first value=" + first + - ", second value=" + second); - } - fail(); - } - } - } finally { - scanner.close(); - } - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java new file mode 100644 index 0000000..f77b7eb --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.fail; +import static org.junit.Assert.assertTrue; + +/** + * Test Map/Reduce job over HBase tables. The map/reduce process we're testing + * on our tables is simple - take every row in the table, reverse the value of + * a particular cell, and write it back to the table. + */ +@Category(LargeTests.class) +public class TestMultithreadedTableMapper { + private static final Log LOG = LogFactory.getLog(TestMultithreadedTableMapper.class); + private static final HBaseTestingUtility UTIL = + new HBaseTestingUtility(); + static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest"); + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); + static final int NUMBER_OF_THREADS = 10; + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY}); + UTIL.createMultiRegions(table, INPUT_FAMILY); + UTIL.loadTable(table, INPUT_FAMILY); + UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniMapReduceCluster(); + UTIL.shutdownMiniCluster(); + } + + /** + * Pass the given key and processed record reduce + */ + public static class ProcessContentsMapper + extends TableMapper { + + /** + * Pass the key, and reversed value to reduce + * + * @param key + * @param value + * @param context + * @throws IOException + */ + public void map(ImmutableBytesWritable key, Result value, + Context context) + throws IOException, InterruptedException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map>> + cf = value.getMap(); + if(!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + // Get the original value and reverse it + String originalValue = new String(value.getValue(INPUT_FAMILY, null), + HConstants.UTF8_ENCODING); + StringBuilder newValue = new StringBuilder(originalValue); + newValue.reverse(); + // Now set the value to be collected + Put outval = new Put(key.get()); + outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); + context.write(key, outval); + } + } + + /** + * Test multithreadedTableMappper map/reduce against a multi-region table + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testMultithreadedTableMapper() + throws IOException, InterruptedException, ClassNotFoundException { + runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()), + MULTI_REGION_TABLE_NAME)); + } + + private void runTestOnTable(HTable table) + throws IOException, InterruptedException, ClassNotFoundException { + Job job = null; + try { + LOG.info("Before map/reduce startup"); + job = new Job(table.getConfiguration(), "process column contents"); + job.setNumReduceTasks(1); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + TableMapReduceUtil.initTableMapperJob( + Bytes.toString(table.getTableName()), scan, + MultithreadedTableMapper.class, ImmutableBytesWritable.class, + Put.class, job); + MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class); + MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS); + TableMapReduceUtil.initTableReducerJob( + Bytes.toString(table.getTableName()), + IdentityTableReducer.class, job); + FileOutputFormat.setOutputPath(job, new Path("test")); + LOG.info("Started " + Bytes.toString(table.getTableName())); + assertTrue(job.waitForCompletion(true)); + LOG.info("After map/reduce completion"); + // verify map-reduce results + verify(Bytes.toString(table.getTableName())); + } finally { + table.close(); + if (job != null) { + FileUtil.fullyDelete( + new File(job.getConfiguration().get("hadoop.tmp.dir"))); + } + } + } + + private void verify(String tableName) throws IOException { + HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName); + boolean verified = false; + long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000); + int numRetries = UTIL.getConfiguration().getInt("hbase.client.retries.number", 5); + for (int i = 0; i < numRetries; i++) { + try { + LOG.info("Verification attempt #" + i); + verifyAttempt(table); + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. + LOG.debug("Verification attempt failed: " + e.getMessage()); + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + assertTrue(verified); + table.close(); + } + + /** + * Looks at every value of the mapreduce output and verifies that indeed + * the values have been reversed. + * + * @param table Table to scan. + * @throws IOException + * @throws NullPointerException if we failed to find a cell value + */ + private void verifyAttempt(final HTable table) + throws IOException, NullPointerException { + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + scan.addFamily(OUTPUT_FAMILY); + ResultScanner scanner = table.getScanner(scan); + try { + Iterator itr = scanner.iterator(); + assertTrue(itr.hasNext()); + while(itr.hasNext()) { + Result r = itr.next(); + if (LOG.isDebugEnabled()) { + if (r.size() > 2 ) { + throw new IOException("Too many results, expected 2 got " + + r.size()); + } + } + byte[] firstValue = null; + byte[] secondValue = null; + int count = 0; + for(KeyValue kv : r.list()) { + if (count == 0) { + firstValue = kv.getValue(); + }else if (count == 1) { + secondValue = kv.getValue(); + }else if (count == 2) { + break; + } + count++; + } + String first = ""; + if (firstValue == null) { + throw new NullPointerException(Bytes.toString(r.getRow()) + + ": first value is null"); + } + first = new String(firstValue, HConstants.UTF8_ENCODING); + String second = ""; + if (secondValue == null) { + throw new NullPointerException(Bytes.toString(r.getRow()) + + ": second value is null"); + } + byte[] secondReversed = new byte[secondValue.length]; + for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { + secondReversed[i] = secondValue[j]; + } + second = new String(secondReversed, HConstants.UTF8_ENCODING); + if (first.compareTo(second) != 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("second key is not the reverse of first. row=" + + Bytes.toStringBinary(r.getRow()) + ", first value=" + first + + ", second value=" + second); + } + fail(); + } + } + } finally { + scanner.close(); + } + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +} + diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java index 9268d6d..b351444 100644 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.File; import java.io.IOException; +import java.util.Iterator; import java.util.Map; import java.util.NavigableMap; @@ -30,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -59,18 +59,16 @@ public class TestTableMapReduce { private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - static final String MULTI_REGION_TABLE_NAME = "mrtest"; + static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest"); static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); @BeforeClass public static void beforeClass() throws Exception { - HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME); - desc.addFamily(new HColumnDescriptor(INPUT_FAMILY)); - desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY)); UTIL.startMiniCluster(); - HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration()); - admin.createTable(desc, HBaseTestingUtility.KEYS); + HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY}); + UTIL.createMultiRegions(table, INPUT_FAMILY); + UTIL.loadTable(table, INPUT_FAMILY); UTIL.startMiniMapReduceCluster(); } @@ -150,7 +148,7 @@ public class TestTableMapReduce { IdentityTableReducer.class, job); FileOutputFormat.setOutputPath(job, new Path("test")); LOG.info("Started " + Bytes.toString(table.getTableName())); - job.waitForCompletion(true); + assertTrue(job.waitForCompletion(true)); LOG.info("After map/reduce completion"); // verify map-reduce results @@ -204,7 +202,10 @@ public class TestTableMapReduce { scan.addFamily(OUTPUT_FAMILY); ResultScanner scanner = table.getScanner(scan); try { - for (Result r : scanner) { + Iterator itr = scanner.iterator(); + assertTrue(itr.hasNext()); + while(itr.hasNext()) { + Result r = itr.next(); if (LOG.isDebugEnabled()) { if (r.size() > 2 ) { throw new IOException("Too many results, expected 2 got " +