Index: src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java (revision 1341041) +++ src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java (working copy) @@ -195,16 +195,16 @@ * * @throws IOException */ - static HTable createIOEScannerTable(byte[] name) throws IOException { + static HTable createIOEScannerTable(byte[] name, final int failCnt) + throws IOException { // build up a mock scanner stuff to fail the first time Answer a = new Answer() { - boolean first = true; + int cnt = 0; @Override public ResultScanner answer(InvocationOnMock invocation) throws Throwable { // first invocation return the busted mock scanner - if (first) { - first = false; + if (cnt++ < failCnt) { // create mock ResultScanner that always fails. Scan scan = mock(Scan.class); doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe @@ -230,16 +230,16 @@ * * @throws IOException */ - static HTable createDNRIOEScannerTable(byte[] name) throws IOException { + static HTable createDNRIOEScannerTable(byte[] name, final int failCnt) + throws IOException { // build up a mock scanner stuff to fail the first time Answer a = new Answer() { - boolean first = true; + int cnt = 0; @Override public ResultScanner answer(InvocationOnMock invocation) throws Throwable { // first invocation return the busted mock scanner - if (first) { - first = false; + if (cnt++ < failCnt) { // create mock ResultScanner that always fails. 
Scan scan = mock(Scan.class); doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe @@ -280,23 +280,46 @@ */ @Test public void testTableRecordReaderScannerFail() throws IOException { - HTable htable = createIOEScannerTable("table2".getBytes()); + HTable htable = createIOEScannerTable("table2".getBytes(), 1); runTestMapred(htable); } /** + * Run test assuming Scanner IOException failure using mapred api. + * + * @throws IOException + */ + @Test(expected = IOException.class) + public void testTableRecordReaderScannerFailTwice() throws IOException { + HTable htable = createIOEScannerTable("table3".getBytes(), 2); + runTestMapred(htable); + } + + /** * Run test assuming UnknownScannerException (which is a type of * DoNotRetryIOException) using mapred api. * * @throws DoNotRetryIOException */ - @Test(expected = DoNotRetryIOException.class) + @Test public void testTableRecordReaderScannerTimeout() throws IOException { - HTable htable = createDNRIOEScannerTable("table3".getBytes()); + HTable htable = createDNRIOEScannerTable("table4".getBytes(), 1); runTestMapred(htable); } /** + * Run test assuming UnknownScannerException (which is a type of * DoNotRetryIOException) using mapred api. 
+ * + * @throws DoNotRetryIOException + */ + @Test(expected = DoNotRetryIOException.class) + public void testTableRecordReaderScannerTimeoutTwice() throws IOException { + HTable htable = createDNRIOEScannerTable("table5".getBytes(), 2); + runTestMapred(htable); + } + + /** * Run test assuming no errors using newer mapreduce api * * @throws IOException @@ -318,24 +341,51 @@ @Test public void testTableRecordReaderScannerFailMapreduce() throws IOException, InterruptedException { - HTable htable = createIOEScannerTable("table2-mr".getBytes()); + HTable htable = createIOEScannerTable("table2-mr".getBytes(), 1); runTestMapreduce(htable); } /** + * Run test assuming Scanner IOException failure using newer mapreduce api + * + * @throws IOException + * @throws InterruptedException + */ + @Test(expected = IOException.class) + public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException, + InterruptedException { + HTable htable = createIOEScannerTable("table3-mr".getBytes(), 2); + runTestMapreduce(htable); + } + + /** * Run test assuming UnknownScannerException (which is a type of * DoNotRetryIOException) using newer mapreduce api * * @throws InterruptedException * @throws DoNotRetryIOException */ - @Test(expected = DoNotRetryIOException.class) + @Test public void testTableRecordReaderScannerTimeoutMapreduce() throws IOException, InterruptedException { - HTable htable = createDNRIOEScannerTable("table3-mr".getBytes()); + HTable htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1); runTestMapreduce(htable); } + /** + * Run test assuming UnknownScannerException (which is a type of + * DoNotRetryIOException) using newer mapreduce api + * + * @throws InterruptedException + * @throws DoNotRetryIOException + */ + @Test(expected = DoNotRetryIOException.class) + public void testTableRecordReaderScannerTimeoutMapreduceTwice() + throws IOException, InterruptedException { + HTable htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2); + 
runTestMapreduce(htable); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Index: src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java (revision 1341041) +++ src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java (working copy) @@ -24,7 +24,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -206,9 +205,9 @@ rowcount = 0; } } - } catch (DoNotRetryIOException e) { - throw e; } catch (IOException e) { + // try to handle all IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown LOG.debug("recovered from " + StringUtils.stringifyException(e)); if (lastSuccessfulRow == null) { LOG.warn("We are restarting the first next() invocation," + Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java (revision 1341041) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java (working copy) @@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -65,6 +64,7 @@ private Result value = 
null; private TaskAttemptContext context = null; private Method getCounter = null; + private long numRestarts = 0; private long timestamp; private int rowcount; private boolean logScannerActivity = false; @@ -202,9 +202,9 @@ rowcount = 0; } } - } catch (DoNotRetryIOException e) { - throw e; } catch (IOException e) { + // try to handle all IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown LOG.info("recovered from " + StringUtils.stringifyException(e)); if (lastSuccessfulRow == null) { LOG.warn("We are restarting the first next() invocation," + @@ -219,6 +219,7 @@ scanner.next(); // skip presumed already mapped row } value = scanner.next(); + numRestarts++; } if (value != null && value.size() > 0) { key.set(value.getRow()); @@ -274,6 +275,8 @@ HBASE_COUNTER_GROUP_NAME, mlv.getName()); ct.increment(mlv.getCurrentIntervalValue()); } + ((Counter) this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, + "NUM_SCANNER_RESTARTS")).increment(numRestarts); } catch (Exception e) { LOG.debug("can't update counter." + StringUtils.stringifyException(e)); }