Index: src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java =================================================================== --- src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (revision 894947) +++ src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (working copy) @@ -38,10 +38,24 @@ private final Log LOG = LogFactory.getLog(TableInputFormat.class); - /** Job parameter that specifies the output table. */ + /** Job parameter that specifies the input table. */ public static final String INPUT_TABLE = "hbase.mapreduce.inputtable"; - /** Space delimited list of columns. */ + /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified. */ public static final String SCAN = "hbase.mapreduce.scan"; + /** Space delimited list of columns to scan. */ + public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns"; + /** The timestamp used to filter columns with a specific timestamp. */ + public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp"; + /** The starting timestamp used to filter columns with a specific range of versions. */ + public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start"; + /** The ending timestamp used to filter columns with a specific range of versions. */ + public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end"; + /** The maximum number of version to return. */ + public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions"; + /** Set to false to disable server-side caching of blocks for this scan. */ + public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks"; + /** The number of rows for caching that will be passed to scanners. */ + public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows"; /** The configuration. */ private Configuration conf = null; @@ -74,12 +88,49 @@ } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); } + Scan scan = null; - try { - scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN)); - } catch (IOException e) { - LOG.error("An error occurred.", e); + + if (conf.get(SCAN) != null) { + try { + scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN)); + } catch (IOException e) { + LOG.error("An error occurred.", e); + } + } else { + try { + scan = new Scan(); + + if (conf.get(SCAN_COLUMNS) != null) { + scan.addColumns(conf.get(SCAN_COLUMNS)); + } + + if (conf.get(SCAN_TIMESTAMP) != null) { + scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP))); + } + + if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) { + scan.setTimeRange( + Long.parseLong(conf.get(SCAN_TIMERANGE_START)), + Long.parseLong(conf.get(SCAN_TIMERANGE_END))); + } + + if (conf.get(SCAN_MAXVERSIONS) != null) { + scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS))); + } + + if (conf.get(SCAN_CACHEBLOCKS) != null) { + scan.setCacheBlocks(Boolean.parseBoolean(conf.get(SCAN_CACHEBLOCKS))); + } + + if (conf.get(SCAN_CACHEDROWS) != null) { + scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS))); + } + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } } + setScan(scan); }