diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 07924e6..f4e9ae4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; import java.util.NavigableSet; +import java.util.Random; import java.util.RandomAccess; import java.util.Set; import java.util.TreeMap; @@ -569,8 +570,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final RegionServerServices rsServices; private RegionServerAccounting rsAccounting; private long flushCheckInterval; + private long defFlushCheckInterval; + private float flushJitter; // flushPerChanges is to prevent too many changes in memstore private long flushPerChanges; + private long defFlushPerChanges; private long blockingMemStoreSize; final long threadWakeFrequency; // Used to guard closes @@ -660,13 +664,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi .add(confParam) .addStringMap(htd.getConfiguration()) .addBytesMap(htd.getValues()); - this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL, + this.flushJitter = conf.getFloat(MEMSTORE_PERIODIC_FLUSH_JITTER, DEFAULT_FLUSH_JITTER); + this.defFlushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL, DEFAULT_CACHE_FLUSH_INTERVAL); - this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES); - if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) { + this.flushCheckInterval = getNextFlushCheckInterval(); + this.defFlushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES); + if (this.defFlushPerChanges > MAX_FLUSH_PER_CHANGES) { throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed " + MAX_FLUSH_PER_CHANGES); } + this.flushPerChanges = getNextFlushPerChanges(); this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); @@ -1305,8 +1312,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** Conf key for the periodic flush interval */ public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = "hbase.regionserver.optionalcacheflushinterval"; + public static final String MEMSTORE_PERIODIC_FLUSH_JITTER = + "hbase.regionserver.flush.jitter"; /** Default interval for the memstore flush */ public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000; + public final static float DEFAULT_FLUSH_JITTER = 0.25f; public static final int META_CACHE_FLUSH_INTERVAL = 300000; // 5 minutes /** Conf key to force a flush if there are already enough changes for one region in memstore */ @@ -1966,6 +1976,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest + " is > " + this.flushPerChanges + " from current=" + mvcc.getReadPoint()); } + // Set next random flush per changes + this.flushPerChanges = getNextFlushPerChanges(); return true; } if (this.flushCheckInterval <= 0) { @@ -1978,6 +1990,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getRegionInfo().getEncodedName() + " because time of oldest edit=" + store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now); } + // Set next random flush check interval + this.flushCheckInterval = getNextFlushCheckInterval(); return true; } return false; @@ -1992,6 +2006,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (this.maxFlushedSeqId > 0 && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) { whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush"); + // Set next random flush per changes + this.flushPerChanges = getNextFlushPerChanges(); return true; } long modifiedFlushCheckInterval = flushCheckInterval; @@ -2013,12 +2029,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) { // we have an old enough edit in the memstore, flush whyFlush.append(s.toString() + " has an old edit so flush to free WALs"); + // Set next random flush check interval + this.flushCheckInterval = getNextFlushCheckInterval(); return true; } } return false; } + private long getNextFlushCheckInterval() + { + Random r = new Random(); + float ff = r.nextFloat(); + return defFlushCheckInterval + + Math.round((ff - 0.5f) * flushJitter * defFlushCheckInterval); + } + + + private long getNextFlushPerChanges() + { + Random r = new Random(); + float ff = r.nextFloat(); + return defFlushPerChanges + + Math.round((ff - 0.5f) * flushJitter * defFlushPerChanges); + } + /** * Flushing all stores. * diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 928f4b6..b7a6ef3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -23,6 +23,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; import java.lang.reflect.InvocationTargetException; import java.net.URLEncoder; import java.util.ArrayList; @@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -506,8 +509,15 @@ public class FSHLog implements WAL { FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir)); this.logrollsize = (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); - - this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); + + float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, + conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, + HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE)); + boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; + if(maxLogsDefined){ + LOG.warn("'hbase.regionserver.maxlogs' was deprecated."); + } + this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", calculateMaxLogFiles(memstoreRatio, logrollsize)); this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", FSUtils.getDefaultReplication(fs, this.fullPathLogDir)); this.lowReplicationRollLimit = @@ -557,6 +567,13 @@ public class FSHLog implements WAL { this.disruptor.start(); } + private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) + { + MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + int maxLogs = Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize); + return maxLogs; + } + /** * Get the backing files associated with this WAL. * @return may be null if there are no files.