diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index 3dcbf74..27abad5 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -45,6 +45,10 @@ implements Configurable {
    * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
    */
   public static final String SCAN = "hbase.mapreduce.scan";
+  /** Scan start row; when unset, no start row is applied to the scan. */
+  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
+  /** Scan stop row; when unset, no stop row is applied to the scan. */
+  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
   /** Column Family to Scan */
   public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
   /** Space delimited list of columns to scan. */
@@ -106,6 +110,14 @@ implements Configurable {
       try {
         scan = new Scan();
 
+        if (conf.get(SCAN_ROW_START) != null) {
+          scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
+        }
+
+        if (conf.get(SCAN_ROW_STOP) != null) {
+          scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
+        }
+
         if (conf.get(SCAN_COLUMNS) != null) {
           addColumns(scan, conf.get(SCAN_COLUMNS));
         }
diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java
index 46e1bee..afead7d 100644
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java
@@ -308,6 +308,49 @@ public class TestTableInputFormatScan {
     testScan("yzy", null, "zzz");
   }
 
+  @Test
+  public void testScanFromConfiguration()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScanFromConfiguration("bba", "bbd", "bbc");
+  }
+
+  /**
+   * Tests an MR Scan initialized from properties set in the Configuration.
+   *
+   * @throws IOException if thrown while setting up or running the job
+   * @throws ClassNotFoundException if thrown while running the job
+   * @throws InterruptedException if thrown while running the job
+   */
+  private void testScanFromConfiguration(String start, String stop, String last)
+  throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") +
+      "To" + (stop != null ? stop.toUpperCase() : "Empty");
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME));
+    c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY));
+    c.set(KEY_STARTROW, start != null ? start : "");
+    c.set(KEY_LASTROW, last != null ? last : "");
+
+    if (start != null) {
+      c.set(TableInputFormat.SCAN_ROW_START, start);
+    }
+
+    if (stop != null) {
+      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
+    }
+
+    Job job = new Job(c, jobName);
+    job.setMapperClass(ScanMapper.class);
+    job.setReducerClass(ScanReducer.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(ImmutableBytesWritable.class);
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setNumReduceTasks(1);
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    // Assert success, not mere completion: isComplete() is also true for a failed job.
+    assertTrue(job.waitForCompletion(true));
+  }
+
   /**
    * Tests a MR scan using specific start and stop rows.
    *
@@ -318,7 +361,7 @@ public class TestTableInputFormatScan {
   private void testScan(String start, String stop, String last)
   throws IOException, InterruptedException, ClassNotFoundException {
     String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
-    "To" + (stop != null ? stop.toUpperCase() : "Empty");
+      "To" + (stop != null ? stop.toUpperCase() : "Empty");
     LOG.info("Before map/reduce startup - job " + jobName);
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     Scan scan = new Scan();