Index: src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java =================================================================== --- src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java (revision 684642) +++ src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java (working copy) @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scanner; import org.apache.hadoop.hbase.filter.RowFilterInterface; @@ -41,6 +42,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.StringUtils; /** * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a @@ -85,17 +87,18 @@ implements RecordReader { private byte [] startRow; private byte [] endRow; + private byte [] lastRow; private RowFilterInterface trrRowFilter; private Scanner scanner; private HTable htable; private byte [][] trrInputColumns; /** - * Build the scanner. Not done in constructor to allow for extension. + * Restart from survivable exceptions by creating a new scanner. * * @throws IOException */ - public void init() throws IOException { + public void restart(byte[] firstRow) throws IOException { if ((endRow != null) && (endRow.length > 0)) { if (trrRowFilter != null) { final Set rowFiltersSet = @@ -107,15 +110,24 @@ rowFiltersSet)); } else { this.scanner = - this.htable.getScanner(trrInputColumns, startRow, endRow); + this.htable.getScanner(trrInputColumns, firstRow, endRow); } } else { this.scanner = - this.htable.getScanner(trrInputColumns, startRow, trrRowFilter); + this.htable.getScanner(trrInputColumns, firstRow, trrRowFilter); } } /** + * Build the scanner. Not done in constructor to allow for extension. + * + * @throws IOException + */ + public void init() throws IOException { + restart(startRow); + } + + /** * @param htable the {@link HTable} to scan. */ public void setHTable(HTable htable) { @@ -190,19 +202,24 @@ /** * @param key HStoreKey as input key. * @param value MapWritable as input value - * - * Converts Scanner.next() to Text, RowResult - * * @return true if there was more data * @throws IOException */ @SuppressWarnings("unchecked") public boolean next(ImmutableBytesWritable key, RowResult value) throws IOException { - RowResult result = this.scanner.next(); + RowResult result; + try { + result = this.scanner.next(); + } catch (UnknownScannerException e) { + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + restart(lastRow); + result = this.scanner.next(); + } boolean hasMore = result != null && result.size() > 0; if (hasMore) { key.set(result.getRow()); + lastRow = key.get(); Writables.copyWritable(result, value); } return hasMore;