Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java	(revision 1552845)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java	(working copy)
@@ -396,7 +396,7 @@
     // the configured value 2.
     assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
 
-    batchWriteAndWait(table, 3, false, 10000);
+    batchWriteAndWait(table, 3, false, 14000);
     assertTrue("LowReplication Roller should've been disabled, current replication="
         + ((FSHLog) log).getLogReplication(), !log.isLowReplicationRollEnabled());
 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java	(revision 1552845)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java	(working copy)
@@ -39,6 +39,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -123,7 +124,6 @@
   private volatile long syncedTillHere = 0;
   private long lastDeferredTxid;
   private final Path oldLogDir;
-  private volatile boolean logRollRunning;
 
   private WALCoprocessorHost coprocessorHost;
 
@@ -160,7 +160,7 @@
    * This lock makes sure only one log roll runs at the same time. Should not be taken while
    * any other lock is held. We don't just use synchronized because that results in bogus and
    * tedious findbugs warning when it thinks synchronized controls writer thread safety */
-  private final Object rollWriterLock = new Object();
+  private final ReentrantLock rollWriterLock = new ReentrantLock(true);
 
   /**
   * Map of encoded region names to their most recent sequence/edit id in their memstore.
@@ -479,7 +479,8 @@
   @Override
   public byte [][] rollWriter(boolean force)
       throws FailedLogCloseException, IOException {
-    synchronized (rollWriterLock) {
+    rollWriterLock.lock();
+    try {
       // Return if nothing to flush.
       if (!force && this.writer != null && this.numEntries.get() <= 0) {
         return null;
@@ -490,7 +491,6 @@
         return null;
       }
       try {
-        this.logRollRunning = true;
         if (!closeBarrier.beginOp()) {
           LOG.debug("HLog closing. Skipping rolling of writer");
           return regionsToFlush;
@@ -563,10 +563,11 @@
           regionsToFlush = getRegionsToForceFlush();
         }
       } finally {
-        this.logRollRunning = false;
         closeBarrier.endOp();
       }
       return regionsToFlush;
+    } finally {
+      rollWriterLock.unlock();
     }
   }
 
@@ -1112,17 +1113,19 @@
           this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
 
           this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
-          // TODO: preserving the old behavior for now, but this check is strange. It's not
-          //       protected by any locks here, so for all we know rolling locks might start
-          //       as soon as we enter the "if". Is this best-effort optimization check?
-          if (!this.logRollRunning) {
-            checkLowReplication();
+          boolean logRollNeeded = false;
+          if (rollWriterLock.tryLock()) {
             try {
+              logRollNeeded = checkLowReplication();
+            } finally {
+              rollWriterLock.unlock();
+            }
+            try {
               curLogSize = tempWriter.getLength();
-              if (curLogSize > this.logrollsize) {
+              if (logRollNeeded || curLogSize > this.logrollsize) {
                 requestLogRoll();
               }
             } catch (IOException x) {
               LOG.debug("Log roll failed and will be retried. (This is not an error)");
             }
           }
@@ -1139,7 +1142,11 @@
   @Override
   public void postAppend(List entries) {}
 
-  private void checkLowReplication() {
+  /**
+   * @return whether a log roll should be requested
+   */
+  private boolean checkLowReplication() {
+    boolean logRollNeeded = false;
     // if the number of replicas in HDFS has fallen below the configured
     // value, then roll logs.
     try {
@@ -1152,7 +1159,7 @@
             + numCurrentReplicas + " replicas but expecting no less than "
             + this.minTolerableReplication + " replicas. "
             + " Requesting close of hlog.");
-        requestLogRoll();
+        logRollNeeded = true;
         // If rollWriter is requested, increase consecutiveLogRolls. Once it
         // is larger than lowReplicationRollLimit, disable the
         // LowReplication-Roller
@@ -1171,7 +1178,7 @@
         // So we should not enable LowReplication-Roller. If numEntries
         // is lower than or equals 1, we consider it as a new writer.
         if (this.numEntries.get() <= 1) {
-          return;
+          return logRollNeeded;
         }
         // Once the live datanode number and the replicas return to normal,
         // enable the LowReplication-Roller.
@@ -1183,6 +1190,7 @@
       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
           " still proceeding ahead...");
     }
+    return logRollNeeded;
   }
 
   /**
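
The gist of the change, in isolation: the racy volatile logRollRunning flag is replaced by a fair ReentrantLock, and the sync path probes it with tryLock() so a sync never blocks behind an in-progress roll. Below is a minimal, self-contained sketch of that pattern; WalRollerSketch and its members are hypothetical stand-ins for illustration, not the actual FSHLog code.

import java.util.concurrent.locks.ReentrantLock;

/**
 * Sketch of the tryLock pattern adopted by the patch. All names here are
 * hypothetical; only the locking structure mirrors the change.
 */
public class WalRollerSketch {

  // Fair lock, as in the patch's new ReentrantLock(true): threads blocked in
  // lock() acquire it in FIFO order, so a pending roll cannot be starved.
  private final ReentrantLock rollWriterLock = new ReentrantLock(true);

  private volatile int replicationCount = 3;
  private final int minTolerableReplication = 2;

  /** The roll path: waits until any concurrent roll has finished. */
  public void roll() {
    rollWriterLock.lock();
    try {
      // ... close the current writer and open a new one ...
      System.out.println("log rolled");
    } finally {
      rollWriterLock.unlock();
    }
  }

  /**
   * The sync path. tryLock() replaces the old volatile flag: if a roll
   * currently holds the lock, the check is skipped this time around instead
   * of racing with the roll; otherwise the check runs while no roll can
   * start, and the roll itself is requested only after unlock().
   */
  public void maybeRoll() {
    boolean rollNeeded = false;
    if (rollWriterLock.tryLock()) { // non-blocking: the sync never stalls here
      try {
        rollNeeded = replicationCount < minTolerableReplication;
      } finally {
        rollWriterLock.unlock();
      }
    }
    if (rollNeeded) {
      roll(); // FSHLog instead calls requestLogRoll() to signal the roller
    }
  }

  public static void main(String[] args) {
    WalRollerSketch wal = new WalRollerSketch();
    wal.replicationCount = 1; // simulate losing a datanode from the pipeline
    wal.maybeRoll();          // prints "log rolled"
  }
}

Deferring the roll until after unlock() matches the patched sync path, which calls requestLogRoll() outside the tryLock/unlock window; that keeps the critical section down to the replication check itself and closes the race the removed TODO comment complained about, since the check can no longer interleave with a running rollWriter().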