Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1146987) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -39,6 +39,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -199,7 +200,11 @@ // This lock prevents starting a log roll during a cache flush. // synchronized is insufficient because a cache flush spans two method calls. private final Lock cacheFlushLock = new ReentrantLock(); - + + // The maximum time, in milliseconds, that the log roller waits to acquire cacheFlushLock. + // If the lock cannot be acquired within this time, the current log roll is skipped. 
+ private final long tryLockWaitingTime; + // We synchronize on updateLock to prevent updates and to prevent a log roll // during an update // locked during appends @@ -345,6 +350,8 @@ this.logrollsize = (long)(this.blocksize * multi); this.optionalFlushInterval = conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000); + this.tryLockWaitingTime = + conf.getLong("hbase.regionserver.logroll.trylock.waitingtime", 5000); if (failIfLogDirExists && fs.exists(dir)) { throw new IOException("Target HLog directory already exists: " + dir); } @@ -469,66 +476,87 @@ return null; } byte [][] regionsToFlush = null; - this.cacheFlushLock.lock(); try { - if (closed) { - return regionsToFlush; - } - // Do all the preparation outside of the updateLock to block - // as less as possible the incoming writes - long currentFilenum = this.filenum; - this.filenum = System.currentTimeMillis(); - Path newPath = computeFilename(); - HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf); - int nextInitialReplication = fs.getFileStatus(newPath).getReplication(); - // Can we get at the dfsclient outputstream? If an instance of - // SFLW, it'll have done the necessary reflection to get at the - // protected field name. - OutputStream nextHdfsOut = null; - if (nextWriter instanceof SequenceFileLogWriter) { - nextHdfsOut = - ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream(); - } - // Tell our listeners that a new log was created - if (!this.listeners.isEmpty()) { - for (WALObserver i : this.listeners) { - i.logRolled(newPath); - } - } - - synchronized (updateLock) { - // Clean up current writer. - Path oldFile = cleanupCurrentWriter(currentFilenum); - this.writer = nextWriter; - this.initialReplication = nextInitialReplication; - this.hdfs_out = nextHdfsOut; - - LOG.info((oldFile != null? - "Roll " + FSUtils.getPath(oldFile) + ", entries=" + - this.numEntries.get() + - ", filesize=" + - this.fs.getFileStatus(oldFile).getLen() + ". 
": "") + - "New hlog " + FSUtils.getPath(newPath)); - this.numEntries.set(0); - this.logRollRequested = false; - } - // Can we delete any of the old log files? - if (this.outputfiles.size() > 0) { - if (this.lastSeqWritten.isEmpty()) { - LOG.debug("Last sequenceid written is empty. Deleting all old hlogs"); - // If so, then no new writes have come in since all regions were - // flushed (and removed from the lastSeqWritten map). Means can - // remove all but currently open log file. - for (Map.Entry e : this.outputfiles.entrySet()) { - archiveLogFile(e.getValue(), e.getKey()); + if (this.cacheFlushLock.tryLock(this.tryLockWaitingTime, + TimeUnit.MILLISECONDS)) { + this.logRollRequested = true; + try { + if (closed) { + return regionsToFlush; } - this.outputfiles.clear(); - } else { - regionsToFlush = cleanOldLogs(); + // Do all the preparation outside of the updateLock to block + // the incoming writes as little as possible + long currentFilenum = this.filenum; + this.filenum = System.currentTimeMillis(); + Path newPath = computeFilename(); + HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf); + + // This returns the expected, not the actual, number of replicas of the HLog file + int nextExpectReplicas = fs.getFileStatus(newPath).getReplication(); + + // Get the current number of replicas of the HLog file + int nextActualReplicas = -1; + try + { + nextActualReplicas = getLogReplication(); + } catch (Exception e) { + LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e + + " still proceeding ahead..."); + } + // Can we get at the dfsclient outputstream? If an instance of + // SFLW, it'll have done the necessary reflection to get at the + // protected field name. 
+ OutputStream nextHdfsOut = null; + if (nextWriter instanceof SequenceFileLogWriter) { + nextHdfsOut = + ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream(); + } + // Tell our listeners that a new log was created + if (!this.listeners.isEmpty()) { + for (WALObserver i : this.listeners) { + i.logRolled(newPath); + } + } + + synchronized (updateLock) { + // Clean up current writer. + Path oldFile = cleanupCurrentWriter(currentFilenum); + this.writer = nextWriter; + this.initialReplication = nextActualReplicas == -1 ? + nextExpectReplicas : nextActualReplicas; + this.hdfs_out = nextHdfsOut; + + LOG.info((oldFile != null? + "Roll " + FSUtils.getPath(oldFile) + ", entries=" + + this.numEntries.get() + + ", filesize=" + + this.fs.getFileStatus(oldFile).getLen() + ". ": "") + + "New hlog " + FSUtils.getPath(newPath)); + this.numEntries.set(0); + this.logRollRequested = false; + } + // Can we delete any of the old log files? + if (this.outputfiles.size() > 0) { + if (this.lastSeqWritten.isEmpty()) { + LOG.debug("Last sequenceid written is empty. Deleting all old hlogs"); + // If so, then no new writes have come in since all regions were + // flushed (and removed from the lastSeqWritten map). Means can + // remove all but currently open log file. + for (Map.Entry e : this.outputfiles.entrySet()) { + archiveLogFile(e.getValue(), e.getKey()); + } + this.outputfiles.clear(); + } else { + regionsToFlush = cleanOldLogs(); + } + } + } finally { + this.cacheFlushLock.unlock(); } } - } finally { - this.cacheFlushLock.unlock(); + } catch (InterruptedException e) { + LOG.warn("Interrupted rollWriter", e); + Thread.currentThread().interrupt(); } return regionsToFlush; } @@ -1009,7 +1037,6 @@ this.initialReplication + " replicas. " + " Requesting close of hlog."); requestLogRoll(); - logRollRequested = true; } } catch (Exception e) { LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +