Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1154812) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -129,9 +129,8 @@ private final long blocksize; private final String prefix; private final Path oldLogDir; - private boolean logRollRequested; + private boolean logRollRunning; - private static Class logWriterClass; private static Class logReaderClass; @@ -140,7 +139,10 @@ } private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer - private int initialReplication; // initial replication factor of SequenceFile.writer + + //Minimum tolerable replicas, if the actual value is less than it, rollWriter would be triggered + private int minTolerableReplication; + private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas final static Object [] NO_ARGS = new Object []{}; @@ -189,9 +191,21 @@ // The timestamp (in ms) when the log file was created. private volatile long filenum = -1; - //number of transactions in the current Hlog. + // number of transactions in the current Hlog. private final AtomicInteger numEntries = new AtomicInteger(0); + // If live datanode count was less than the default replicas value, + // RollWriter will be triggered in each sync(So the RollWriter could be + // triggered one by one in a short time). Using it as a workaround to slow + // down the roll frequency triggered by checkLowReplication(). + private volatile int consecutiveLogRolls = 0; + private final int fewerReplicasRollLimit; + + // If consecutiveLogRolls was larger than fewerReplicasRollLimit, + // then disabled the rolling from checkLowReplication(). + // Enable it if the replications recovered. + private volatile boolean fewerReplicasRollEnabled = true; + // If > than this size, roll the log. 
This is typically 0.95 times the size // of the default Hdfs block size. private final long logrollsize; @@ -358,6 +372,12 @@ } } this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); + + this.minTolerableReplication = + conf.getInt("hbase.regionserver.hlog.mintolerablereplicas", + this.fs.getDefaultReplication()); + this.fewerReplicasRollLimit = + conf.getInt("hbase.regionserver.hlog.fewerreplicasrollLimit", 5); this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true); LOG.info("HLog configuration: blocksize=" + StringUtils.byteDesc(this.blocksize) + @@ -468,19 +488,20 @@ if (this.writer != null && this.numEntries.get() <= 0) { return null; } + byte [][] regionsToFlush = null; this.cacheFlushLock.lock(); try { if (closed) { return regionsToFlush; } + this.logRollRunning = true; // Do all the preparation outside of the updateLock to block // as less as possible the incoming writes long currentFilenum = this.filenum; this.filenum = System.currentTimeMillis(); Path newPath = computeFilename(); HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf); - int nextInitialReplication = fs.getFileStatus(newPath).getReplication(); // Can we get at the dfsclient outputstream? If an instance of // SFLW, it'll have done the necessary reflection to get at the // protected field name. @@ -500,7 +521,6 @@ // Clean up current writer. Path oldFile = cleanupCurrentWriter(currentFilenum); this.writer = nextWriter; - this.initialReplication = nextInitialReplication; this.hdfs_out = nextHdfsOut; LOG.info((oldFile != null? @@ -510,7 +530,7 @@ this.fs.getFileStatus(oldFile).getLen() + ". ": "") + "New hlog " + FSUtils.getPath(newPath)); this.numEntries.set(0); - this.logRollRequested = false; + this.logRollRunning = false; } // Can we delete any of the old log files? 
if (this.outputfiles.size() > 0) { @@ -533,6 +553,7 @@ return regionsToFlush; } + /** * This method allows subclasses to inject different writers without having to * extend other methods like rollWriter(). @@ -982,7 +1003,8 @@ synchronized (this.updateLock) { syncTime += System.currentTimeMillis() - now; syncOps++; - if (!logRollRequested) { + + if (!logRollRunning) { checkLowReplication(); if (this.writer.getLength() > this.logrollsize) { requestLogRoll(); @@ -998,18 +1020,44 @@ } private void checkLowReplication() { - // if the number of replicas in HDFS has fallen below the initial + // if the number of replicas in HDFS has fallen below the configured // value, then roll logs. try { int numCurrentReplicas = getLogReplication(); + if (numCurrentReplicas != 0 && - numCurrentReplicas < this.initialReplication) { - LOG.warn("HDFS pipeline error detected. " + - "Found " + numCurrentReplicas + " replicas but expecting " + - this.initialReplication + " replicas. " + - " Requesting close of hlog."); - requestLogRoll(); - logRollRequested = true; + numCurrentReplicas < this.minTolerableReplication) { + if (this.fewerReplicasRollEnabled) { + if (this.consecutiveLogRolls < this.fewerReplicasRollLimit) { + LOG.warn("HDFS pipeline error detected. " + + "Found " + numCurrentReplicas + " replicas but expecting no less than " + + this.minTolerableReplication + " replicas. " + + " Requesting close of hlog."); + requestLogRoll(); + // If rollWriter was requested, increase consecutiveLogRolls. 
Once it's + // larger than fewerReplicasRollLimit, disable FewerReplicas-Roller + this.consecutiveLogRolls++; + } else { + LOG.warn("Too many consecutive RollWriter requests, it's a sign of " + + "the live datanodes count is less than the minimum tolerable replicas."); + this.consecutiveLogRolls = 0; + this.fewerReplicasRollEnabled = false; + } + } + } else if (numCurrentReplicas >= this.minTolerableReplication) { + + if (!this.fewerReplicasRollEnabled) { + // The new writer's log replica count is always the default value. + // So we should not enable FewerReplicas-Roller. If numEntries + // is less than or equal to 1, we consider it a new writer. + if (this.numEntries.get() <= 1) { + return; + } + // Once the live datanode number and the replicas return to normal, + // enable FewerReplicas-Roller. + this.fewerReplicasRollEnabled = true; + LOG.info("FewerReplicas-Roller was enabled."); + } } } catch (Exception e) { LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +