Index: src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (revision 1470693) +++ src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (working copy) @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -71,27 +72,42 @@ // Trying recovery boolean recovered = false; long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 300000); + // interval at which we call isFileClosed() to see if lease recovery is done + int interval = conf.getInt("hbase.lease.recovery.interval", 1000); + // retrying lease recovery may preempt pending lease recovery; default to waiting for 4 seconds + // after calling recoverLease + int waitingPeriod = conf.getInt("hbase.lease.recovery.waiting.period", 4000); // conf parameter passed from unit test, indicating whether fs.append() should be triggered boolean triggerAppend = conf.getBoolean(TEST_TRIGGER_DFS_APPEND, false); Exception ex = null; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + if (triggerAppend) throw new IOException(); + try { + recovered = (Boolean) DistributedFileSystem.class.getMethod( + "recoverLease", new Class[] { Path.class }).invoke(dfs, p); + if (!recovered) LOG.debug("recoverLease returned false"); + } catch (InvocationTargetException ite) { + // function was properly called, but threw it's own exception + throw (IOException) ite.getCause(); + } + } catch (Exception e) { + LOG.debug("Failed fs.recoverLease invocation, " + e.toString() + + ", trying fs.append instead"); + ex = e; + } + Method m = null; + try { + m = DistributedFileSystem.class.getMethod("isFileClosed", new Class []{Path.class}); + } catch (Exception e) { + // ignore - the underlying hdfs doesn't have HDFS-4525 + LOG.debug("hdfs doesn't support isFileClosed"); + } + boolean fileClosed = false; + int nbAttempt = 0; while (!recovered) { + nbAttempt++; try { - try { - DistributedFileSystem dfs = (DistributedFileSystem) fs; - if (triggerAppend) throw new IOException(); - try { - recovered = (Boolean) DistributedFileSystem.class.getMethod( - "recoverLease", new Class[] { Path.class }).invoke(dfs, p); - if (!recovered) LOG.debug("recoverLease returned false"); - } catch (InvocationTargetException ite) { - // function was properly called, but threw it's own exception - throw (IOException) ite.getCause(); - } - } catch (Exception e) { - LOG.debug("Failed fs.recoverLease invocation, " + e.toString() + - ", trying fs.append instead"); - ex = e; - } if (ex != null || System.currentTimeMillis() - startWaiting > recoveryTimeout) { LOG.debug("trying fs.append for " + p + " with " + ex); ex = null; // assume the following append() call would succeed @@ -123,13 +139,21 @@ } } try { - Thread.sleep(1000); + Thread.sleep(nbAttempt == 1 && m == null ? waitingPeriod : interval); } catch (InterruptedException ie) { InterruptedIOException iioe = new InterruptedIOException(); iioe.initCause(ie); throw iioe; } + if (m == null) return; + try { + fileClosed = (Boolean) m.invoke(dfs, p); + // if lease recovery is done, we can return. + if (fileClosed) break; + } catch (Exception e) { + LOG.warn("Got Exception checking whether file is closed for " + p, e); + } } - LOG.info("Finished lease recover attempt for " + p); + LOG.info("Finished lease recovery attempt for " + p); } }