Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (revision 1470659) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (working copy) @@ -31,8 +31,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; +import java.lang.reflect.Method; - /** * Implementation for hdfs */ @@ -59,45 +59,67 @@ // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves // beyond that limit 'to be safe'. long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting; + // interval at which we call isFileClosed() to see if lease recovery is done + int interval = conf.getInt("hbase.lease.recovery.interval", 1000); + // retrying lease recovery may preempt pending lease recovery; default to waiting for 4 seconds + // after calling recoverLease + int waitingPeriod = conf.getInt("hbase.lease.recovery.waiting.period", 4000); boolean recovered = false; + try { + // recoverLease is asynchronous. We expect it to return true at the first call if the + // file is closed. So, it returns: + // - false when it starts the lease recovery (i.e. lease recovery not *yet* done) + // - true when the lease recovery has succeeded or the file is closed. + recovered = dfs.recoverLease(p); + LOG.info("Attempt to recoverLease on file " + p + + " returned " + recovered + ", trying for " + + (EnvironmentEdgeManager.currentTimeMillis() - startWaiting) + "ms"); + } catch (IOException e) { + if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) { + // This exception comes out instead of FNFE, fix it + throw new FileNotFoundException("The given HLog wasn't found at " + p); + } + LOG.warn("Got IOException on attempt to recover lease for file " + p, e); + return; + } + Method m = null; + try { + m = DistributedFileSystem.class.getMethod("isFileClosed", new Class []{Path.class}); + } catch (Exception e) { + // ignore - the underlying hdfs doesn't have HDFS-4525 + LOG.debug("hdfs doesn't support isFileClosed"); + } + boolean fileClosed = false; int nbAttempt = 0; while (!recovered) { nbAttempt++; - try { - // recoverLease is asynchronous. We expect it to return true at the first call if the - // file is closed. So, it returns: - // - false when it starts the lease recovery (i.e. lease recovery not *yet* done - // - true when the lease recovery has succeeded or the file is closed. - recovered = dfs.recoverLease(p); - LOG.info("Attempt " + nbAttempt + " to recoverLease on file " + p + - " returned " + recovered + ", trying for " + - (EnvironmentEdgeManager.currentTimeMillis() - startWaiting) + "ms"); - } catch (IOException e) { - if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) { - // This exception comes out instead of FNFE, fix it - throw new FileNotFoundException("The given HLog wasn't found at " + p); - } - LOG.warn("Got IOException on attempt " + nbAttempt + " to recover lease for file " + p + - ", retrying.", e); - } if (!recovered) { if (reporter != null && !reporter.progress()) { throw new InterruptedIOException("Operation is cancelled"); } - // try at least twice. - if (nbAttempt > 2 && recoveryTimeout < EnvironmentEdgeManager.currentTimeMillis()) { + if (recoveryTimeout < EnvironmentEdgeManager.currentTimeMillis()) { LOG.error("Can't recoverLease after " + nbAttempt + " attempts and " + (EnvironmentEdgeManager.currentTimeMillis() - startWaiting) + "ms " + " for " + p + " - continuing without the lease, but we could have a data loss."); } else { try { - Thread.sleep(nbAttempt < 3 ? 500 : 1000); + // if underlying hdfs doesn't have HDFS-4525, we wait for + // hbase.lease.recovery.waiting.period and return. + Thread.sleep(nbAttempt == 1 && m == null ? waitingPeriod : interval); } catch (InterruptedException ie) { InterruptedIOException iioe = new InterruptedIOException(); iioe.initCause(ie); throw iioe; } } + if (m == null) return; + try { + fileClosed = (Boolean) m.invoke(dfs, p); + // if lease recovery is done, we can return. + if (fileClosed) return; + } catch (Exception e) { + LOG.warn("Got Exception checking whether file is closed for " + p, e); + } } } }