diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index 3dcbf74..27abad5 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -45,6 +45,10 @@ implements Configurable { * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details. */ public static final String SCAN = "hbase.mapreduce.scan"; + /** Scan start row */ + public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start"; + /** Scan stop row */ + public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop"; /** Column Family to Scan */ public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family"; /** Space delimited list of columns to scan. */ @@ -106,6 +110,14 @@ implements Configurable { try { scan = new Scan(); + if (conf.get(SCAN_ROW_START) != null) { + scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START))); + } + + if (conf.get(SCAN_ROW_STOP) != null) { + scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP))); + } + if (conf.get(SCAN_COLUMNS) != null) { addColumns(scan, conf.get(SCAN_COLUMNS)); } diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java index 46e1bee..f3725b9 100644 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java @@ -176,6 +176,7 @@ public class TestTableInputFormatScan { public void testScanEmptyToEmpty() throws IOException, InterruptedException, ClassNotFoundException { testScan(null, null, null); + testScanFromConfiguration(null, null, null); } /** @@ -189,6 +190,7 @@ public class TestTableInputFormatScan { public void testScanEmptyToAPP() throws IOException, InterruptedException, 
ClassNotFoundException { testScan(null, "app", "apo"); + testScanFromConfiguration(null, "app", "apo"); } /** @@ -202,6 +204,7 @@ public class TestTableInputFormatScan { public void testScanEmptyToBBA() throws IOException, InterruptedException, ClassNotFoundException { testScan(null, "bba", "baz"); + testScanFromConfiguration(null, "bba", "baz"); } /** @@ -215,6 +218,7 @@ public class TestTableInputFormatScan { public void testScanEmptyToBBB() throws IOException, InterruptedException, ClassNotFoundException { testScan(null, "bbb", "bba"); + testScanFromConfiguration(null, "bbb", "bba"); } /** @@ -228,6 +232,7 @@ public class TestTableInputFormatScan { public void testScanEmptyToOPP() throws IOException, InterruptedException, ClassNotFoundException { testScan(null, "opp", "opo"); + testScanFromConfiguration(null, "opp", "opo"); } /** @@ -241,6 +246,7 @@ public class TestTableInputFormatScan { public void testScanOBBToOPP() throws IOException, InterruptedException, ClassNotFoundException { testScan("obb", "opp", "opo"); + testScanFromConfiguration("obb", "opp", "opo"); } /** @@ -254,6 +260,7 @@ public class TestTableInputFormatScan { public void testScanOBBToQPP() throws IOException, InterruptedException, ClassNotFoundException { testScan("obb", "qpp", "qpo"); + testScanFromConfiguration("obb", "qpp", "qpo"); } /** @@ -267,6 +274,7 @@ public class TestTableInputFormatScan { public void testScanOPPToEmpty() throws IOException, InterruptedException, ClassNotFoundException { testScan("opp", null, "zzz"); + testScanFromConfiguration("opp", null, "zzz"); } /** @@ -280,6 +288,7 @@ public class TestTableInputFormatScan { public void testScanYYXToEmpty() throws IOException, InterruptedException, ClassNotFoundException { testScan("yyx", null, "zzz"); + testScanFromConfiguration("yyx", null, "zzz"); } /** @@ -293,6 +302,7 @@ public class TestTableInputFormatScan { public void testScanYYYToEmpty() throws IOException, InterruptedException, ClassNotFoundException { 
testScan("yyy", null, "zzz"); + testScanFromConfiguration("yyy", null, "zzz"); } /** @@ -306,6 +316,44 @@ public class TestTableInputFormatScan { public void testScanYZYToEmpty() throws IOException, InterruptedException, ClassNotFoundException { testScan("yzy", null, "zzz"); + testScanFromConfiguration("yzy", null, "zzz"); + } + + /** + * Tests an MR scan whose row range is supplied only through the TableInputFormat.SCAN_ROW_START and TableInputFormat.SCAN_ROW_STOP properties of the job Configuration; a null start or stop leaves the corresponding property unset. + * The start and last rows are also handed to the job as KEY_STARTROW and KEY_LASTROW (empty string when null), and the job is required to succeed, not merely to finish. + * @throws IOException if setting up or running the job raises it + * @throws ClassNotFoundException if running the job raises it + * @throws InterruptedException if waiting for the job is interrupted + */ + private void testScanFromConfiguration(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") + + "To" + (stop != null ? stop.toUpperCase() : "Empty"); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME)); + c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY)); + c.set(KEY_STARTROW, start != null ? start : ""); + c.set(KEY_LASTROW, last != null ? last : ""); + + if (start != null) { + c.set(TableInputFormat.SCAN_ROW_START, start); + } + + if (stop != null) { + c.set(TableInputFormat.SCAN_ROW_STOP, stop); + } + + Job job = new Job(c, jobName); + job.setMapperClass(ScanMapper.class); + job.setReducerClass(ScanReducer.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(ImmutableBytesWritable.class); + job.setInputFormatClass(TableInputFormat.class); + job.setNumReduceTasks(1); + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + boolean succeeded = job.waitForCompletion(true); + assertTrue(succeeded); } /** @@ -318,7 +366,7 @@ public class TestTableInputFormatScan { private void testScan(String start, String stop, String last) throws IOException, InterruptedException, ClassNotFoundException { String jobName = "Scan" + (start != null ? 
start.toUpperCase() : "Empty") + - "To" + (stop != null ? stop.toUpperCase() : "Empty"); + "To" + (stop != null ? stop.toUpperCase() : "Empty"); LOG.info("Before map/reduce startup - job " + jobName); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); Scan scan = new Scan();