diff --git src/main/java/org/apache/hadoop/hbase/HConstants.java src/main/java/org/apache/hadoop/hbase/HConstants.java index 763fe89..da9df10 100644 --- src/main/java/org/apache/hadoop/hbase/HConstants.java +++ src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -595,6 +595,19 @@ public final class HConstants { /** Host name of the local machine */ public static final String LOCALHOST = "localhost"; + /** By default, don't let the client buffer get any bigger */ + public static final long DEFAULT_HBASE_CLIENT_BUFFER_MULTIPLIER = 1; + /** + * Configuration key for the multiple that can be applied to write buffer, for + * extreme situations + */ + public static final String CLIENT_WRITE_BUFFER_MULTIPLIER_KEY = "hbase.client.write.buffer.multiplier"; + /** + * Conf key for the class name of the backoff policy if there is server + * pressure + */ + public static final String CLIENT_BACKOFF_POLICY_KEY = "hbase.client.write.backoff.policy"; + private HConstants() { // Can't be instantiated with this ctor. 
} diff --git src/main/java/org/apache/hadoop/hbase/client/BackoffPolicy.java src/main/java/org/apache/hadoop/hbase/client/BackoffPolicy.java new file mode 100644 index 0000000..0d43241 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/client/BackoffPolicy.java @@ -0,0 +1,12 @@ +package org.apache.hadoop.hbase.client; + +/** + * Policy to use when evaluating whether or not the client should wait to flush + * commits if the server has back pressure + */ +public interface BackoffPolicy { + + public long calculateWaitTime(float serverPressure, + long currentWriteBufferSize, long writeBufferSize, long maxWriteBufferSize); + +} diff --git src/main/java/org/apache/hadoop/hbase/client/HTable.java src/main/java/org/apache/hadoop/hbase/client/HTable.java index 57605e6..86478a1 100644 --- src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -115,7 +115,14 @@ public class HTable implements HTableInterface { private boolean closed; private int operationTimeout; private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts. + private static final Class DEFAULT_BACKOFF_POLICY = NoServerBackoffPolicy.class; private final boolean cleanupOnClose; // close the connection in close() + // ok since pressure is just a recommendation, so missing an update is OK + private volatile float serverPressure = 0; + // max size that we let a write buffer grow, in pressure situations + private long maxWriteBufferSize; + private BackoffPolicy backoffPolicy; + private volatile Thread flusher; /** * Creates an object to access a HBase table. 
@@ -212,6 +219,19 @@ public class HTable implements HTableInterface { HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.writeBufferSize = this.configuration.getLong( "hbase.client.write.buffer", 2097152); + this.maxWriteBufferSize = this.writeBufferSize + * this.configuration.getLong( + HConstants.CLIENT_WRITE_BUFFER_MULTIPLIER_KEY, + HConstants.DEFAULT_HBASE_CLIENT_BUFFER_MULTIPLIER); + try { + this.backoffPolicy = this.configuration.getClass( + HConstants.CLIENT_BACKOFF_POLICY_KEY, DEFAULT_BACKOFF_POLICY, + BackoffPolicy.class).newInstance(); + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } this.clearBufferOnFail = true; this.autoFlush = true; this.currentWriteBufferSize = 0; @@ -740,12 +760,62 @@ public class HTable implements HTableInterface { // we need to periodically see if the writebuffer is full instead of waiting until the end of the List n++; if (n % DOPUT_WB_CHECK == 0 && currentWriteBufferSize > writeBufferSize) { - flushCommits(); + checkAndFlushCommits(); } } - if (autoFlush || currentWriteBufferSize > writeBufferSize) { + checkAndFlushCommits(); + } + + private void checkAndFlushCommits() throws IOException { + // if we autoflush, then never block on the client for the server + if (autoFlush) { flushCommits(); + return; } + + // if we are outside the bounds of the default writebuffer + if (currentWriteBufferSize >= writeBufferSize) { + LOG.debug("Write buffer exceeds standard allowed, checking expansion"); + // if we aren't doing any buffering, then max = writeBufferSize and we + // just flush right way + if (currentWriteBufferSize >= maxWriteBufferSize) { + flushCommits(); + return; + } + // if the current buffer is less than the max, we can wait a little bit + // and allow writes to accumulate before flushing + LOG.debug("Write buffer in expansion bounds, attempting backoff"); + // double checked locking/singleton here to optimize for lots of puts + if 
(flusher == null) { + synchronized (this) { + if (flusher == null) { + long sleepTime = backoffPolicy.calculateWaitTime(serverPressure, + currentWriteBufferSize, writeBufferSize, maxWriteBufferSize); + // setup the flusher thread + // maybe should use the daemonThreadFactory here? + flusher = new Thread(new WriteFlusher(sleepTime, this)); + flusher.setDaemon(true); + flusher.setPriority(Thread.NORM_PRIORITY); + flusher.setName("WriteFlushDaemonThread"); + // and then start the flushing async + flusher.start(); + } + } + } + // at this point we have a flusher thread that will wait a little while + // and then flush all the current and new writes to the server + } + LOG.debug("Write buffer doesn't exceed acceptable size, not forcing flush"); + + } + + /** + * FOR TESTING ONLY! + * + * @return the current backoff policy + */ + BackoffPolicy getBackOffPolicy() { + return this.backoffPolicy; } /** @@ -885,6 +955,10 @@ public class HTable implements HTableInterface { */ @Override public void flushCommits() throws IOException { + // if we don't have anything to write, then we are done and bail out + if (this.currentWriteBufferSize == 0) + return; + try { Object[] results = new Object[writeBuffer.size()]; try { @@ -892,6 +966,7 @@ public class HTable implements HTableInterface { } catch (InterruptedException e) { throw new IOException(e); } finally { + float currentPressure = 0; // mutate list so that it is empty for complete success, or contains // only failed records results are returned in the same order as the // requests in list walk the list backwards, so we can remove from list @@ -900,8 +975,14 @@ public class HTable implements HTableInterface { if (results[i] instanceof Result) { // successful Puts are removed from the list here. writeBuffer.remove(i); + // get the worst pressure from the results + if (results[i] instanceof MonitoredResult) + currentPressure = ((MonitoredResult) results[i]).getPressure() > currentPressure ? 
((MonitoredResult) results[i]) + .getPressure() : currentPressure; } } + // update the current pressure to the server pressure + this.serverPressure = currentPressure; } } finally { if (clearBufferOnFail) { @@ -1102,6 +1183,42 @@ public class HTable implements HTableInterface { } } + static class WriteFlusher implements Runnable { + private static final Log LOG = LogFactory.getLog(WriteFlusher.class); + private final long sleepTime; + private final HTable parent; + + public WriteFlusher(long sleepTime, HTable parent) { + this.sleepTime = sleepTime; + this.parent = parent; + } + + @SuppressWarnings("synthetic-access") + @Override + public void run() { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.warn( + "Write flusher woken before completing sleep, attempting to flush writes", + e); + } + try { + // then remove this from the htable, since we will run + synchronized (parent) { + parent.flusher = null; + } + // and then attempt to flush all the current writes + parent.flushCommits(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + + } + + } + /** * Enable or disable region cache prefetch for the table. 
It will be * applied for the given table's all HTable instances who share the same diff --git src/main/java/org/apache/hadoop/hbase/client/MonitoredResult.java src/main/java/org/apache/hadoop/hbase/client/MonitoredResult.java new file mode 100644 index 0000000..3273dd6 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/client/MonitoredResult.java @@ -0,0 +1,41 @@ +package org.apache.hadoop.hbase.client; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Result that also has the pressure of the Memstore from the action that caused + * the result + */ +public class MonitoredResult extends Result { + + private float pressure = 0; + + /** + * Nullary constructor for Writable + */ + public MonitoredResult() { + } + + public MonitoredResult(float pressure) { + this.pressure = pressure; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.pressure = in.readFloat(); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeFloat(this.pressure); + } + + public float getPressure() { + return pressure; + } + +} diff --git src/main/java/org/apache/hadoop/hbase/client/NoServerBackoffPolicy.java src/main/java/org/apache/hadoop/hbase/client/NoServerBackoffPolicy.java new file mode 100644 index 0000000..8f0896a --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/client/NoServerBackoffPolicy.java @@ -0,0 +1,15 @@ +package org.apache.hadoop.hbase.client; + +/** + * {@link BackoffPolicy} that completely ignores the current server pressure and + * doesn't block on the client. 
+ */ +public class NoServerBackoffPolicy implements BackoffPolicy { + + @Override + public long calculateWaitTime(float serverPressure, + long currentWriteBufferSize, long writeBufferSize, long maxWriteBufferSize) { + return 0; + } + +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 25cb31d..a7cb11f 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.RowMutation; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -87,6 +86,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowLock; +import org.apache.hadoop.hbase.client.RowMutation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.client.coprocessor.ExecResult; @@ -297,7 +297,7 @@ public class HRegion implements HeapSize { // , Writable{ private volatile long lastFlushTime; final RegionServerServices rsServices; private List> recentFlushes = new ArrayList>(); - private long blockingMemStoreSize; + long blockingMemStoreSize; final long threadWakeFrequency; // Used to guard closes final ReentrantReadWriteLock lock = @@ -329,6 +329,8 @@ public class HRegion implements HeapSize { // , Writable{ // These ones are not reset to zero when queried, unlike the previous. 
public static final ConcurrentMap numericPersistentMetrics = new ConcurrentHashMap(); + private MemstorePressureMonitor pressureMonitor; + /** * Used for metrics where we want track a metrics (such as latency) over a * number of operations. @@ -633,6 +635,15 @@ public class HRegion implements HeapSize { // , Writable{ coprocessorHost.postOpen(); } + // enable the pressure monitor, if it should be turned on + this.pressureMonitor = this.conf.getBoolean( + MemstorePressureMonitor.MEMSTORE_PRESSURE_MONITOR_ENABLED_KEY, false) ? new MemstorePressureMonitor() + : null; + if (this.pressureMonitor != null) { + // then fully configure the monitor + this.pressureMonitor.init(this, this.getConf()); + } + status.markComplete("Region opened successfully"); return nextSeqid; } @@ -1943,6 +1954,7 @@ public class HRegion implements HeapSize { // , Writable{ BatchOperationInProgress> batchOp = new BatchOperationInProgress>(putsAndLocks); + float pressure = 0; while (!batchOp.isDone()) { checkReadOnly(); checkResources(); @@ -1950,7 +1962,14 @@ public class HRegion implements HeapSize { // , Writable{ long newSize; startRegionOperation(); this.writeRequestsCount.increment(); + try { + // if we are monitoring pressure, then check on that + // TODO pass this back to the client at some point... 
+ if (this.pressureMonitor != null) { + pressure = pressureMonitor.monitor(); + } + long addedSize = doMiniBatchPut(batchOp); newSize = this.addAndGetGlobalMemstoreSize(addedSize); } finally { @@ -1960,7 +1979,12 @@ public class HRegion implements HeapSize { // , Writable{ requestFlush(); } } - return batchOp.retCodeDetails; + OperationStatus[] stats = batchOp.retCodeDetails; + // update the current pressure for that statuses + for (OperationStatus stat : stats) { + stat.setPressure(pressure); + } + return stats; } @SuppressWarnings("unchecked") @@ -2450,7 +2474,6 @@ public class HRegion implements HeapSize { // , Writable{ this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit, clusterId, now, this.htableDescriptor); } - long addedSize = applyFamilyMapToMemstore(familyMap, null); flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); } finally { @@ -2518,6 +2541,13 @@ public class HRegion implements HeapSize { // , Writable{ } /** + * FOR TESTING ONLY! Get the pressure monitor loaded into the region + */ + public MemstorePressureMonitor getPressureMonitor() { + return this.pressureMonitor; + } + + /** * Remove all the keys listed in the map from the memstore. This method is * called when a Put has updated memstore but subequently fails to update * the wal. This method is then invoked to rollback the memstore. 
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7d7be3c..8d803b9 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -83,16 +83,17 @@ import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.catalog.RootLocationEditor; import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.RowMutation; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.MonitoredResult; import org.apache.hadoop.hbase.client.MultiAction; import org.apache.hadoop.hbase.client.MultiResponse; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.client.coprocessor.ExecResult; @@ -341,6 +342,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, private static final String OPEN = "OPEN"; private static final String CLOSE = "CLOSE"; + /** If the MemStore pressure is configured to be monitored */ + private boolean memstorePressureMonitorEnabled = false; + /** * Starts a HRegionServer at the default location * @@ -411,6 +415,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, "hbase.regionserver.kerberos.principal", this.isa.getHostName()); regionServerAccounting = new RegionServerAccounting(); cacheConfig = new CacheConfig(conf); + + //check to see if we need to monitor memstore pressure + 
this.memstorePressureMonitorEnabled = this.conf.getBoolean( + MemstorePressureMonitor.MEMSTORE_PRESSURE_MONITOR_ENABLED_KEY, false); } /** @@ -3373,7 +3381,13 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, Object result = null; if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) { - result = new Result(); + // checking if we need to send along the pressure + if (memstorePressureMonitorEnabled) { + result = new MonitoredResult(code.getPressure()); + } + else { + result = new Result(); + } } else if (code.getOperationStatusCode() == OperationStatusCode.SANITY_CHECK_FAILURE) { result = new DoNotRetryIOException(code.getExceptionMsg()); diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MemstorePressureMonitor.java src/main/java/org/apache/hadoop/hbase/regionserver/MemstorePressureMonitor.java new file mode 100644 index 0000000..ed61adf --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemstorePressureMonitor.java @@ -0,0 +1,141 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; + +/** + * Monitor the current 'pressure' on the memstore so the client can be notified + * if the memstore is starting to become overwhelmed. + *

+ * This is the main class to help do graceful degradation of memstore + * performance rather than the complete blocking when there is a compaction + * and the MemStore attempts to flush. + */ +public class MemstorePressureMonitor extends Configured { + + private static final Log LOG = LogFactory + .getLog(MemstorePressureMonitor.class); + + public static final Class DEFAULT_MONITOR = PressureMonitor.NonBlockingPressureMontior.class; + + /** + * true if the pressure monitor should be enabled. + */ + public static final String MEMSTORE_PRESSURE_MONITOR_ENABLED_KEY = "hbase.memstore.monitor.enabled"; + + /** + * Specify the class to use for the pressure monitor. If none is specified, + * then no pressure monitoring should be used + */ + public static final String MEMSTORE_PRESSURE_MONITOR_CLASS_KEY = "hbase.memstore.monitor.class"; + + /** + * If the RegionServer should block the clients for a limited time to help + * relieve pressure. Should be true if enabled + */ + public static final String MEMSTORE_PRESSURE_MONITOR_ENABLE_SERVER_PAUSE_KEY = "hbase.memstore.monitor.serverpause.enabled"; + + /** + * Long indicating number of milliseconds to pause on the server (when + * {@value #MEMSTORE_PRESSURE_MONITOR_ENABLE_SERVER_PAUSE_KEY} is enabled) + * when receiving excessive pressure + */ + public static final String MEMSTORE_PRESSURE_MONITOR_SERVER_PAUSE_TIME_KEY = "hbase.memstore.monitor.serverpause.time"; + + /** + * Amount of pressure at which to start blocking client writes on the server, + * if {@value #MEMSTORE_PRESSURE_MONITOR_ENABLE_SERVER_PAUSE_KEY} is enabled. 
+ */ + public static final String MEMSTORE_PRESSURE_MONITOR_SERVER_PAUSE_THRESHOLD_KEY = "hbase.memstore.monitor.threshold"; + /** Pause on the server for 10ms */ + private static final long DEFAULT_SERVER_PAUSE_TIME = 10; + + /** Only start pausing when we have a 50% threshold */ + private static final float DEFAULT_SERVER_PAUSE_THRESHOLD = 0.5F; + + private PressureMonitor calc; + + private boolean blockOnServer; + + private long serverPauseTime; + + private float serverPauseThreshold; + + public void init(HRegion parent, Configuration conf) throws IOException { + this.setConf(conf); + try { + LOG.info("Initializing the memstore pressure monitor"); + // load the pressure calculator + calc = conf + .getClass(MEMSTORE_PRESSURE_MONITOR_CLASS_KEY, DEFAULT_MONITOR) + .asSubclass(PressureMonitor.class).newInstance(); + LOG.debug("Using a " + calc.getClass() + " to monitor pressure."); + // and then prepare it for use + calc.init(parent); + + // then figure out if we want to block on server + blockOnServer = conf.getBoolean( + MEMSTORE_PRESSURE_MONITOR_ENABLE_SERVER_PAUSE_KEY, false); + // if we are blocking on the server, then read of the necessary info + if (blockOnServer) { + LOG.debug("Memstore pressure on server is enabled"); + serverPauseTime = conf.getLong( + MEMSTORE_PRESSURE_MONITOR_SERVER_PAUSE_TIME_KEY, + DEFAULT_SERVER_PAUSE_TIME); + serverPauseThreshold = conf.getFloat( + MEMSTORE_PRESSURE_MONITOR_SERVER_PAUSE_THRESHOLD_KEY, + DEFAULT_SERVER_PAUSE_THRESHOLD); + } + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + } + + /** + * FOR TESTING ONLY! + * + * @return the calculator used to determine the memstore pressure + */ + public PressureMonitor getCalculator() { + return this.calc; + } + + /** + * Monitor the state of the memstore and block the write if enabled. + * + * @return the current memstore pressure. 
+ */ + public float monitor() { + float pressure = calc + .calculatePressure(calc.getParent().getStores().size()); + + // TODO switched over to trace + LOG.debug("Current memstore pressure: " + pressure); + + if (blockOnServer) { + if (pressure > serverPauseThreshold) { + try { + Thread.sleep(serverPauseTime); + } catch (InterruptedException e) { + LOG.warn("Interrupted while providing server-side back pressure", e); + } + } + } + + return pressure; + } + + /* TODO ----TO BE MOVED TO CLIENT ----- */ + public static final String CLIENT_MUTATION_BUFFER_MULTIPLER = "hbase.client.buffer.multiplier"; + + // NOTES: + // Region flushes when memstoreSize > blockingMemstoreStore, HRegion, line + // 2361 + +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java index 1b94ab5..6b60b1f 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java @@ -45,16 +45,26 @@ public class OperationStatus { private final OperationStatusCode code; private final String exceptionMsg; + + private float pressure; public OperationStatus(OperationStatusCode code) { - this(code, ""); + this(code, "", (long) 0.0); } public OperationStatus(OperationStatusCode code, String exceptionMsg) { + this(code, exceptionMsg, (long) 0.0); + } + + public OperationStatus(OperationStatusCode code, String exceptionMsg, float pressure) { this.code = code; this.exceptionMsg = exceptionMsg; + this.pressure = pressure; } + public void setPressure(float pressure) { + this.pressure = pressure; + } /** * @return OperationStatusCode @@ -70,4 +80,11 @@ public class OperationStatus { return exceptionMsg; } + /** + * @return Memstore back pressure on this operation + */ + public float getPressure() { + return pressure; + } + } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/PressureMonitor.java 
src/main/java/org/apache/hadoop/hbase/regionserver/PressureMonitor.java new file mode 100644 index 0000000..acfa408 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/PressureMonitor.java @@ -0,0 +1,84 @@ +package org.apache.hadoop.hbase.regionserver; + +public abstract class PressureMonitor { + + /** + * Calculate the current pressure on the MemStore. + *

+ * Increased pressure will attempt to be communicated back to the client while + * the store attempts to finish compactions + * + * @param currentNumStoreFiles + * @return number between 0.0 and 1.0. 0.0 indicates 'no pressure' and that + * memstore is completely able to take writes; 1.0 indicates that the + * memstore is 'full' and any more writes will be blocked until the + * memstore is ready. + */ + public abstract float calculatePressure(int currentNumStoreFiles); + + private HRegion parent; + + /** + * Initialized the calculator and bind it to a given region + * + * @param parent + * Region that the calculator will be applied to. + */ + public void init(HRegion parent) { + this.parent = parent; + } + + /** + * Get the max amount a memstore can grow to, in bytes, before it is flushed. + *

+ * Equal to the flush size * block multiplier + * + * @return the total number of bytes a memstore can grow to until it must + * block writes from the client + */ + protected long getBlockingMemStoreSize() { + return parent.blockingMemStoreSize; + } + + /** + * The number of bytes stored in the current memstore (across all stores). + * + * @return size of the overall memstore in bytes. + */ + protected long currentMemstoreSize() { + return parent.memstoreSize.get(); + } + + /** + * @return the number of files that, when reached, the region will block + * writes until a minor compaction has taken place. + */ + protected int getNumBlockingStoreFiles() { + return StoreUtils.getNumBlockingStoreFiles(parent.getConf()); + } + + /** + * Advanced calculations only! Allows full access and can cause significant + * runtime issues if misused. + * + * @return the region to which this calculator is being applied. + */ + protected HRegion getParent() { + return this.parent; + } + + /** + * Calculator that never considers there to be any pressure on the memstore. + *

+ * This monitor will always lead to the traditional saw-tooth pattern when + * writes overload the memstore. + */ + public static class NonBlockingPressureMontior extends + PressureMonitor { + + @Override + public float calculatePressure(int currentNumStoreFiles) { + return 0; + } + } +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/SimpleMemStorePressureMonitor.java src/main/java/org/apache/hadoop/hbase/regionserver/SimpleMemStorePressureMonitor.java new file mode 100644 index 0000000..44e492a --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/SimpleMemStorePressureMonitor.java @@ -0,0 +1,17 @@ +package org.apache.hadoop.hbase.regionserver; + +public class SimpleMemStorePressureMonitor extends PressureMonitor { + + @Override + public float calculatePressure(int currentNumStoreFiles) { + // if we have enough files to block, then we generate pressure + if (currentNumStoreFiles >= this.getNumBlockingStoreFiles()) { + // then we just do simple math do determine the % of pressure + return this.currentMemstoreSize() + / (float) this.getBlockingMemStoreSize(); + } + // no pressure if we can still flush the memstore w/o any issue + return 0; + } + +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java new file mode 100644 index 0000000..de8733b --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java @@ -0,0 +1,25 @@ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; + +public final class StoreUtils { + + private static final int DEFAULT_NUM_BLOCKING_STORE_FILES = 7; + + /** + * Get the configured number of Store files when the region server should + * block client writes to allow compaction to catch up with writes. 
+ * + * @param conf + * {@link Configuration} to read from + * @return the set, or default, number of store files when to block + */ + public static int getNumBlockingStoreFiles(Configuration conf) { + return conf.getInt("hbase.hstore.blockingStoreFiles", + DEFAULT_NUM_BLOCKING_STORE_FILES); + } + + private StoreUtils() { + } + +} diff --git src/test/java/org/apache/hadoop/hbase/client/TestClientBackOff.java src/test/java/org/apache/hadoop/hbase/client/TestClientBackOff.java new file mode 100644 index 0000000..d6b99fb --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/client/TestClientBackOff.java @@ -0,0 +1,168 @@ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MemstorePressureMonitor; +import org.apache.hadoop.hbase.regionserver.PressureMonitor; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test that the client is correctly attempting to backoff + */ +@Category(MediumTests.class) +public class TestClientBackOff { + private static HBaseTestingUtility UTIL; + private static MiniHBaseCluster CLUSTER; + private static Configuration CONF; + + private static final byte[] TEST_TABLE = Bytes.toBytes("testTable"); + private static final byte[] TEST_FAM = Bytes.toBytes("a"); + 
private static final byte[] TEST_QUAL = new byte[] { 2 }; + + @BeforeClass + public static void setupClass() throws Exception { + UTIL = new HBaseTestingUtility(); + CONF = UTIL.getConfiguration(); + + // disable the ui, since we don't care + CONF.setInt("hbase.regionsever.info.port", -1); + + // and then use our own backoff policy that we can monitor + CONF.setClass(HConstants.CLIENT_BACKOFF_POLICY_KEY, + MonitorableClientBackoff.class, BackoffPolicy.class); + + // and then set the bounds on the size of the client buffer + CONF.setInt(HConstants.CLIENT_WRITE_BUFFER_MULTIPLIER_KEY, 2); + // set to client buffer to 20 bytes + CONF.setInt("hbase.client.write.buffer", 2000); + + // and then use our own server-side pressure monitor so we can artificially + // create pressure + CONF.setBoolean( + MemstorePressureMonitor.MEMSTORE_PRESSURE_MONITOR_ENABLED_KEY, true); + CONF.setClass(MemstorePressureMonitor.MEMSTORE_PRESSURE_MONITOR_CLASS_KEY, + ArtificialPressureMontior.class, PressureMonitor.class); + + Configuration.dumpConfiguration(CONF, new PrintWriter(System.out)); + // and then create the cluster + CLUSTER = UTIL.startMiniCluster(1); + + } + + public static void teardown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testServerSideRunning() throws Exception { + // create and start the test table + HBaseAdmin admin = new HBaseAdmin(CONF); + HTableDescriptor desc = new HTableDescriptor(TEST_TABLE); + desc.addFamily(new HColumnDescriptor(TEST_FAM)); + admin.createTable(desc); + + // make sure our pressure monitor got loaded + List regions = CLUSTER.getRegions(TEST_TABLE); + for (HRegion region : regions) { + assertEquals(ArtificialPressureMontior.class, region.getPressureMonitor() + .getCalculator().getClass()); + } + + // and then connect to it + HTable actual = new HTable(CONF, "testTable"); + // verfiy that the htable was properly created + MonitorableClientBackoff policy = (MonitorableClientBackoff) actual + .getBackOffPolicy(); + + 
// now start spying on the real table + HTable spy = spy(actual); + + // buffer writes + spy.setAutoFlush(false); + + int totalPuts = 5; + + // buffer = 2000 bytes w/ 2x expansion multiplier + // this means we will see a policy check every 5->8 puts + List puts = new ArrayList(totalPuts); + for (int i = 0; i < totalPuts; i++) { + // this put is 498 bytes big + Put p = new Put(new byte[] { (byte) i }); + p.add(TEST_FAM, TEST_QUAL, new byte[] { 4 }); + puts.add(p); + + } + // which will blow past the official buffer, but use the expansion buffer + // and the backoff policy once + spy.put(puts); + + // and that we actually checked the policy + assertEquals(1, policy.count); + // and all the pressure-related info got updated + assertEquals(0.0, policy.lastPressure, 0); + assertEquals(puts.get(0).heapSize() * totalPuts, policy.currentWriteSize); + + // and then we do the puts again to make sure the policy gets updated + spy.put(puts); + + // once for the initial put, once for the second + verify(spy, times(2)).flushCommits(); + + // and that we actually checked the policy + assertEquals(2, policy.count); + // and all the pressure-related info got updated + assertEquals(0.6, policy.lastPressure, 0.0001);// diff < 0.0001 for writable + assertEquals(puts.get(0).heapSize() * totalPuts, policy.currentWriteSize); + } + + /** + * Backoff monitor on the client that we can query for current state. + */ + public static class MonitorableClientBackoff implements BackoffPolicy { + int count = 0; + float lastPressure = 0; + long currentWriteSize = 0; + + @Override + public long calculateWaitTime(float serverPressure, + long currentWriteBufferSize, long writeBufferSize, + long maxWriteBufferSize) { + count++; + lastPressure = serverPressure; + currentWriteSize = currentWriteBufferSize; + return 0; + } + + } + + /** + * Pressure monitor that will always (Artificially) report pressure on the + * memstore. 
A "fake" that will always report 60% pressure + */ + public static class ArtificialPressureMontior extends PressureMonitor { + + @Override + public float calculatePressure(int currentNumStoreFiles) { + return (float) .6; + } + + } +} diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStorePressureMonitor.java src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStorePressureMonitor.java new file mode 100644 index 0000000..1afdf0f --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStorePressureMonitor.java @@ -0,0 +1,113 @@ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestMemStorePressureMonitor { + + private static HBaseTestingUtility UTIL; + private static MiniHBaseCluster CLUSTER; + private static Configuration CONF; + + private static final byte[] TEST_TABLE = Bytes.toBytes("testTable"); + private static final byte[] TEST_FAM = Bytes.toBytes("fam1"); + private static final byte[] TEST_QUAL = Bytes.toBytes("qual1"); + + @BeforeClass + public static void setupClass() throws Exception { + UTIL = new HBaseTestingUtility(); + CONF = UTIL.getConfiguration(); + + // set the config options we want on our cluster + // enable the monitor + CONF.setBoolean( + 
MemstorePressureMonitor.MEMSTORE_PRESSURE_MONITOR_ENABLED_KEY, true); + // set our own special monitor so we can count its pressure requests + CONF.setClass(MemstorePressureMonitor.MEMSTORE_PRESSURE_MONITOR_CLASS_KEY, + CountingPressureMontior.class, PressureMonitor.class); + // enable blocking on the servers + CONF.setBoolean( + MemstorePressureMonitor.MEMSTORE_PRESSURE_MONITOR_ENABLE_SERVER_PAUSE_KEY, + true); + // ensure that we have no pressure on the memstore to cause blocking + CONF.setFloat( + MemstorePressureMonitor.MEMSTORE_PRESSURE_MONITOR_SERVER_PAUSE_THRESHOLD_KEY, + 0.0F); + // and disable the ui, since we don't care + CONF.setInt("hbase.regionsever.info.port", -1); // FIXME(review): key typo - should be "hbase.regionserver.info.port", so the UI is not actually disabled + + // and then create the cluster + CLUSTER = UTIL.startMiniCluster(1); + + } + + @AfterClass + public static void teardown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testServerSideRunning() throws Exception { + // create and start the test table + HBaseAdmin admin = new HBaseAdmin(CONF); + HTableDescriptor desc = new HTableDescriptor(TEST_TABLE); + desc.addFamily(new HColumnDescriptor(TEST_FAM)); + admin.createTable(desc); + + // and then connect to it + HTable client = new HTable(CONF, "testTable"); + + int totalPuts = 3; + Put p = new Put(Bytes.toBytes("row")); + p.add(TEST_FAM, TEST_QUAL, new byte[] { 1, 2, 3, 4 }); + for (int i = 0; i < totalPuts; i++) { + client.put(p); + client.flushCommits(); + } + + // ensure that we got a pressure request for every put + int sentRequests = 0; + List regions = CLUSTER.getRegions(TEST_TABLE); + for (HRegion region : regions) { + CountingPressureMontior pm = (CountingPressureMontior) region + .getPressureMonitor().getCalculator(); + sentRequests += pm.getNumPressureRequests(); + } + + assertEquals("Sent requests != expected memstore calculation requests", + totalPuts, sentRequests); + } + + public static class CountingPressureMontior extends PressureMonitor { + + private int pressureRequestCounter = 0; + + @Override + public float 
calculatePressure(int currentNumStoreFiles) { + pressureRequestCounter++; + return 0; + } + + public int getNumPressureRequests() { + return pressureRequestCounter; + } + + } +}