Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java (revision 1198524)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java (working copy)
@@ -129,10 +129,11 @@
   /**
    * Closes the split.
    *
-   *
+   * @throws IOException When closing the table fails.
    */
-  public void close() {
+  public void close() throws IOException {
     this.scanner.close();
+    this.htable.close();
   }
 
   /**
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java (revision 1198524)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java (working copy)
@@ -70,10 +70,11 @@
   /**
    * Closes the split.
    *
+   * @throws IOException When closing the table fails.
    * @see org.apache.hadoop.mapreduce.RecordReader#close()
    */
   @Override
-  public void close() {
+  public void close() throws IOException {
     this.recordReaderImpl.close();
   }
 
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (revision 1198524)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (working copy)
@@ -25,6 +25,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
@@ -47,10 +49,8 @@
  *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
  *
  *     public void configure(JobConf job) {
-  *       HTable exampleTable = new HTable(HBaseConfiguration.create(job),
-  *         Bytes.toBytes("exampleTable"));
  *       // mandatory
-  *       setHTable(exampleTable);
+  *       setTableName("exampleTable");
  *       Text[] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
  *         Bytes.toBytes("columnB") };
  *       // mandatory
@@ -73,7 +73,7 @@
   /** Holds the details for the internal scanner. */
   private Scan scan = null;
   /** The table to scan. */
-  private HTable table = null;
+  private String tableName = null;
   /** The reader scanning the table, can be a custom one. */
   private TableRecordReader tableRecordReader = null;
 
@@ -94,11 +94,16 @@
   public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
       InputSplit split, TaskAttemptContext context)
   throws IOException {
-    if (table == null) {
+    HTable table;
+    try {
+      table = new HTable(new Configuration(context.getConfiguration()), tableName);
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
       throw new IOException("Cannot create a record reader because of a" +
           " previous error. Please look at the previous logs lines from" +
           " the task's full log for more details.");
     }
+
     TableSplit tSplit = (TableSplit) split;
     TableRecordReader trr = this.tableRecordReader;
     // if no table record reader was provided use default
@@ -125,44 +130,49 @@
    */
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException {
-    if (table == null) {
-      throw new IOException("No table was provided.");
-    }
-    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
-    if (keys == null || keys.getFirst() == null ||
-        keys.getFirst().length == 0) {
-      throw new IOException("Expecting at least one region.");
-    }
-    int count = 0;
-    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
-    for (int i = 0; i < keys.getFirst().length; i++) {
-      if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
-        continue;
+    if (tableName == null) {
+      throw new IOException("No table name was provided.");
+    }
+    HTable table = new HTable(new Configuration(context.getConfiguration()), tableName);
+    try {
+      Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+      if (keys == null || keys.getFirst() == null ||
+          keys.getFirst().length == 0) {
+        throw new IOException("Expecting at least one region.");
       }
-      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
-        getHostname();
-      byte[] startRow = scan.getStartRow();
-      byte[] stopRow = scan.getStopRow();
-      // determine if the given start an stop key fall into the region
-      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
-           Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
-          (stopRow.length == 0 ||
-           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
-        byte[] splitStart = startRow.length == 0 ||
-          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
-            keys.getFirst()[i] : startRow;
-        byte[] splitStop = (stopRow.length == 0 ||
-          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
-          keys.getSecond()[i].length > 0 ?
-            keys.getSecond()[i] : stopRow;
-        InputSplit split = new TableSplit(table.getTableName(),
-          splitStart, splitStop, regionLocation);
-        splits.add(split);
-        if (LOG.isDebugEnabled())
-          LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+      int count = 0;
+      List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
+      for (int i = 0; i < keys.getFirst().length; i++) {
+        if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+          continue;
+        }
+        String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
+          getHostname();
+        byte[] startRow = scan.getStartRow();
+        byte[] stopRow = scan.getStopRow();
+        // determine if the given start an stop key fall into the region
+        if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+             Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+            (stopRow.length == 0 ||
+             Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+          byte[] splitStart = startRow.length == 0 ||
+            Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+              keys.getFirst()[i] : startRow;
+          byte[] splitStop = (stopRow.length == 0 ||
+            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+            keys.getSecond()[i].length > 0 ?
+              keys.getSecond()[i] : stopRow;
+          InputSplit split = new TableSplit(table.getTableName(),
+            splitStart, splitStop, regionLocation);
+          splits.add(split);
+          if (LOG.isDebugEnabled())
+            LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+        }
       }
+      return splits;
+    } finally {
+      table.close();
     }
-    return splits;
   }
 
   /**
@@ -192,19 +202,19 @@
   }
 
   /**
-   * Allows subclasses to get the {@link HTable}.
+   * Allows subclasses to get the table name.
    */
-  protected HTable getHTable() {
-    return this.table;
+  protected String getTableName() {
+    return this.tableName;
   }
 
   /**
-   * Allows subclasses to set the {@link HTable}.
+   * Allows subclasses to set the table name.
    *
-   * @param table The table to get the data from.
+   * @param tableName The table to get the data from.
    */
-  protected void setHTable(HTable table) {
-    this.table = table;
+  protected void setTableName(String tableName) {
+    this.tableName = tableName;
   }
 
   /**
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (revision 1198524)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (working copy)
@@ -86,12 +86,7 @@
   @Override
   public void setConf(Configuration configuration) {
     this.conf = configuration;
-    String tableName = conf.get(INPUT_TABLE);
-    try {
-      setHTable(new HTable(new Configuration(conf), tableName));
-    } catch (Exception e) {
-      LOG.error(StringUtils.stringifyException(e));
-    }
+    setTableName(conf.get(INPUT_TABLE));
 
     Scan scan = null;