commit 33b8f0aac0b3edf2f2c0157e2dec174048f0d6fc Author: Yu Li Date: Sun Dec 13 14:52:40 2015 +0800 HBASE-14969 Add throughput controller for flush diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index b5f412d..8ac9dd1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; /** @@ -151,7 +151,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { @Override protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, - CompactionThroughputController throughputController, boolean major) throws IOException { + ThroughputController throughputController, boolean major) throws IOException { if (!(scanner instanceof MobCompactionStoreScanner)) { throw new IllegalArgumentException( "The scanner should be an instance of MobCompactionStoreScanner"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 999d25c..46d35b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; @@ -96,7 +97,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { */ @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status) throws IOException { + MonitoredTask status, ThroughputController throughputController) throws IOException { ArrayList result = new ArrayList(); int cellsCount = snapshot.getCellsCount(); if (cellsCount == 0) return result; // don't flush if there are no entries diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index f54f008..2f9d914 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -40,8 +40,8 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -89,7 +89,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi private final ThreadPoolExecutor splits; private final ThreadPoolExecutor mergePool; - private volatile CompactionThroughputController compactionThroughputController; + private volatile ThroughputController compactionThroughputController; /** * Splitting should not take place if the total number of regions exceed this. @@ -672,7 +672,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } } - CompactionThroughputController old = this.compactionThroughputController; + ThroughputController old = this.compactionThroughputController; if (old != null) { old.stop("configuration change"); } @@ -717,7 +717,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } @VisibleForTesting - public CompactionThroughputController getCompactionThroughputController() { + public ThroughputController getCompactionThroughputController() { return compactionThroughputController; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index da326e3..1a8b4bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -21,16 +21,16 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; -import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -120,13 +120,13 @@ public class DefaultStoreEngine extends StoreEngine< } @Override - public List compact(CompactionThroughputController throughputController) + public List compact(ThroughputController throughputController) throws IOException { return compact(throughputController, null); } @Override - public List compact(CompactionThroughputController throughputController, User user) + public List compact(ThroughputController throughputController, User user) throws IOException { return compactor.compact(request, throughputController, 
user); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 2f51277..4656605 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.util.StringUtils; /** @@ -44,7 +45,7 @@ public class DefaultStoreFlusher extends StoreFlusher { @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status) throws IOException { + MonitoredTask status, ThroughputController throughputController) throws IOException { ArrayList result = new ArrayList(); int cellsCount = snapshot.getCellsCount(); if (cellsCount == 0) return result; // don't flush if there are no entries @@ -71,7 +72,7 @@ public class DefaultStoreFlusher extends StoreFlusher { writer.setTimeRangeTracker(snapshot.getTimeRangeTracker()); IOException e = null; try { - performFlush(scanner, writer, smallestReadPoint); + performFlush(scanner, writer, smallestReadPoint, throughputController); } catch (IOException ioe) { e = ioe; // throw the exception out diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 496c7e2..d2fbf29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.mob.MobFileName; import org.apache.hadoop.hbase.mob.MobStoreEngine; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HFileArchiveUtil; @@ -458,7 +458,7 @@ public class HMobStore extends HStore { */ @Override public List compact(CompactionContext compaction, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { // If it's major compaction, try to find whether there's a sweeper is running // If yes, mark the major compaction as retainDeleteMarkers if (compaction.getRequest().isAllFiles()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d059cd0..85e0488 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -151,9 +151,9 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -1725,7 +1725,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi for (Store s : getStores()) { CompactionContext compaction = s.requestCompaction(); if (compaction != null) { - CompactionThroughputController controller = null; + ThroughputController controller = null; if (rsServices != null) { controller = CompactionThroughputControllerFactory.create(rsServices, conf); } @@ -1759,7 +1759,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws IOException e */ @VisibleForTesting - void compactStore(byte[] family, CompactionThroughputController throughputController) + void compactStore(byte[] family, ThroughputController throughputController) throws IOException { Store s = getStore(family); CompactionContext compaction = s.requestCompaction(); @@ -1784,12 +1784,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return whether the compaction completed */ public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { return compact(compaction, store, throughputController, null); } public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { assert compaction != null && compaction.hasSelection(); assert !compaction.getRequest().getFiles().isEmpty(); if (this.closing.get() || this.closed.get()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b2cc78a..4e1cc77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; @@ -136,6 +137,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.controller.FlushThroughputControllerFactory; 
+import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; @@ -189,6 +192,7 @@ import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; + import sun.misc.Signal; import sun.misc.SignalHandler; @@ -198,7 +202,8 @@ import sun.misc.SignalHandler; */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) @SuppressWarnings("deprecation") -public class HRegionServer extends HasThread implements RegionServerServices, LastSequenceId { +public class HRegionServer extends HasThread implements + RegionServerServices, LastSequenceId, ConfigurationObserver { private static final Log LOG = LogFactory.getLog(HRegionServer.class); @@ -487,6 +492,8 @@ public class HRegionServer extends HasThread implements RegionServerServices, La private CompactedHFilesDischarger compactedFileDischarger; + private volatile ThroughputController flushThroughputController; + /** * Starts a HRegionServer at the default location. * @param conf @@ -609,6 +616,7 @@ public class HRegionServer extends HasThread implements RegionServerServices, La putUpWebUI(); this.walRoller = new LogRoller(this, this); this.choreService = new ChoreService(getServerName().toString(), true); + this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); if (!SystemUtils.IS_OS_WINDOWS) { Signal.handle(new Signal("HUP"), new SignalHandler() { @@ -899,6 +907,7 @@ public class HRegionServer extends HasThread implements RegionServerServices, La // Registering the compactSplitThread object with the ConfigurationManager. 
configurationManager.registerObserver(this.compactSplitThread); configurationManager.registerObserver(this.rpcServices); + configurationManager.registerObserver(this); } /** @@ -3380,4 +3389,28 @@ public class HRegionServer extends HasThread implements RegionServerServices, La public boolean walRollRequestFinished() { return this.walRoller.walRollFinished(); } + + @Override + public ThroughputController getFlushThroughputController() { + return flushThroughputController; + } + + @Override + public double getFlushPressure() { + if (getRegionServerAccounting() == null || cacheFlusher == null) { + // return 0 during RS initialization + return 0.0; + } + return getRegionServerAccounting().getGlobalMemstoreSize() * 1.0 + / cacheFlusher.globalMemStoreLimitLowMark; + } + + @Override + public void onConfigurationChange(Configuration newConf) { + ThroughputController old = this.flushThroughputController; + if (old != null) { + old.stop("configuration change"); + } + this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 9ebdaee..17350ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -77,9 +77,9 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; @@ -845,7 +845,7 @@ public class HStore implements Store { /** * Snapshot this stores memstore. Call before running - * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)} + * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)} * so it has some work to do. */ void snapshot() { @@ -858,15 +858,16 @@ public class HStore implements Store { } /** - * Write out current snapshot. Presumes {@link #snapshot()} has been called previously. + * Write out current snapshot. Presumes {@link #snapshot()} has been called previously. * @param logCacheFlushId flush sequence number * @param snapshot * @param status + * @param throughputController * @return The path name of the tmp file to which the store was flushed - * @throws IOException + * @throws IOException if exception occurs during process */ protected List flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, - MonitoredTask status) throws IOException { + MonitoredTask status, ThroughputController throughputController) throws IOException { // If an exception happens flushing, we let it out without clearing // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. 
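A self-contained sketch of the getFlushPressure() arithmetic added to HRegionServer above, which is simply the ratio of the current global memstore size to the low watermark of the global memstore limit. The class and method names below are illustrative only, not HBase API, and the 1 GB watermark is a hypothetical number:

import static java.lang.System.out;

// Illustrative sketch only: mirrors the division in HRegionServer.getFlushPressure().
public final class FlushPressureExample {

  // pressure < 1.0: memstores are below the low watermark, throttling may apply.
  // pressure >= 1.0: emergency state, flushes should proceed unthrottled.
  static double flushPressure(long globalMemstoreSize, long globalMemStoreLimitLowMark) {
    return globalMemstoreSize * 1.0 / globalMemStoreLimitLowMark;
  }

  public static void main(String[] args) {
    long lowMark = 1024L * 1024 * 1024; // assume a 1 GB low watermark
    out.println(flushPressure(lowMark / 2, lowMark)); // 0.5: halfway to the watermark
    out.println(flushPressure(lowMark * 2, lowMark)); // 2.0: emergency, above the limit
  }
}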
@@ -876,7 +877,8 @@ public class HStore implements Store { IOException lastException = null; for (int i = 0; i < flushRetriesNumber; i++) { try { - List pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status); + List pathNames = + flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController); Path lastPathName = null; try { for (Path pathName : pathNames) { @@ -1175,13 +1177,13 @@ public class HStore implements Store { */ @Override public List compact(CompactionContext compaction, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { return compact(compaction, throughputController, null); } @Override public List compact(CompactionContext compaction, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { assert compaction != null; List sfs = null; CompactionRequest cr = compaction.getRequest(); @@ -2058,7 +2060,10 @@ public class HStore implements Store { @Override public void flushCache(MonitoredTask status) throws IOException { - tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status); + RegionServerServices rsService = region.getRegionServerServices(); + ThroughputController throughputController = + rsService == null ? null : rsService.getFlushThroughputController(); + tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index cd4816c..7bf968a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.wal.WAL; import org.apache.zookeeper.KeeperException; @@ -231,4 +232,16 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure() */ double getCompactionPressure(); + + /** + * @return the controller to avoid flush too fast + */ + public ThroughputController getFlushThroughputController(); + + /** + * @return the flush pressure of all stores on this regionserver. The value should be greater than + * or equal to 0.0, and any value greater than 1.0 means we enter the emergency state that + * global memstore size already exceeds lower limit. 
+ */ + double getFlushPressure(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 8bb10f0..b22bd27 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -22,8 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.NavigableSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -32,6 +30,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.io.HeapSize; @@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; /** @@ -225,14 +225,14 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf void cancelRequestedCompaction(CompactionContext compaction); /** - * @deprecated see compact(CompactionContext, CompactionThroughputController, User) + * @deprecated see compact(CompactionContext, ThroughputController, User) */ @Deprecated List compact(CompactionContext compaction, - CompactionThroughputController throughputController) throws IOException; + ThroughputController throughputController) throws IOException; List compact(CompactionContext compaction, - CompactionThroughputController throughputController, User user) throws IOException; + ThroughputController throughputController, User user) throws IOException; /** * @return true if we should run a major compaction. 
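The StoreFlusher diff that follows threads a ThroughputController through performFlush. For orientation, here is a minimal sketch of a controller that accounts bytes but never throttles. It assumes the renamed ThroughputController keeps exactly the methods of the CompactionThroughputController interface deleted later in this patch (setup, start, control, finish, plus Stoppable's stop/isStopped); it is not part of the patch:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.controller.ThroughputController;

// Illustrative sketch only: counts bytes per named operation, never sleeps.
public class CountingThroughputController implements ThroughputController {

  private final ConcurrentMap<String, AtomicLong> sizes =
      new ConcurrentHashMap<String, AtomicLong>();

  private volatile boolean stopped;

  @Override
  public void setup(RegionServerServices server) {
    // nothing to do; a real controller could schedule a tuning chore here
  }

  @Override
  public void start(String opName) {
    sizes.put(opName, new AtomicLong());
  }

  @Override
  public long control(String opName, long size) throws InterruptedException {
    sizes.get(opName).addAndGet(size); // account bytes, never throttle
    return 0; // sleep time: never slept
  }

  @Override
  public void finish(String opName) {
    sizes.remove(opName);
  }

  @Override
  public void stop(String why) {
    stopped = true;
  }

  @Override
  public boolean isStopped() {
    return stopped;
  }
}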
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index bcc0a90..958d4ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -19,18 +19,22 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; /** * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one). @@ -51,10 +55,11 @@ abstract class StoreFlusher { * @param snapshot Memstore snapshot. * @param cacheFlushSeqNum Log cache flush sequence number. * @param status Task that represents the flush operation and may be updated with status. + * @param throughputController A controller to avoid flush too fast * @return List of files written. Can be empty; must not be null. */ public abstract List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, - MonitoredTask status) throws IOException; + MonitoredTask status, ThroughputController throughputController) throws IOException; protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum, MonitoredTask status) throws IOException { @@ -104,9 +109,10 @@ abstract class StoreFlusher { * @param scanner Scanner to get data from. * @param sink Sink to write data to. Could be StoreFile.Writer. * @param smallestReadPoint Smallest read point used for the flush. + * @param throughputController A controller to avoid flush too fast */ - protected void performFlush(InternalScanner scanner, - Compactor.CellSink sink, long smallestReadPoint) throws IOException { + protected void performFlush(InternalScanner scanner, Compactor.CellSink sink, + long smallestReadPoint, ThroughputController throughputController) throws IOException { int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); @@ -115,17 +121,54 @@ abstract class StoreFlusher { List kvs = new ArrayList(); boolean hasMore; - do { - hasMore = scanner.next(kvs, scannerContext); - if (!kvs.isEmpty()) { - for (Cell c : kvs) { - // If we know that this KV is going to be included always, then let us - // set its memstoreTS to 0. This will help us save space when writing to - // disk. - sink.append(c); + String flushName = generateFlushName(); + // no control on system table (such as meta, namespace, etc) flush + boolean control = throughputController != null && !store.getRegionInfo().isSystemTable(); + if (control) { + throughputController.start(flushName); + } + try { + do { + hasMore = scanner.next(kvs, scannerContext); + if (!kvs.isEmpty()) { + for (Cell c : kvs) { + // If we know that this KV is going to be included always, then let us + // set its memstoreTS to 0. 
This will help us save space when writing to + // disk. + sink.append(c); + int len = KeyValueUtil.length(c); + if (control) { + throughputController.control(flushName, len); + } + } + kvs.clear(); } - kvs.clear(); + } while (hasMore); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted while control throughput of flushing " + + flushName); + } finally { + if (control) { + throughputController.finish(flushName); } - } while (hasMore); + } + } + + /** + * Used to prevent flush name conflict when multiple flushes running parallel on the same store. + */ + private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0); + + private String generateFlushName() { + int counter; + for (;;) { + counter = NAME_COUNTER.get(); + int next = counter == Integer.MAX_VALUE ? 0 : counter + 1; + if (NAME_COUNTER.compareAndSet(counter, next)) { + break; + } + } + return store.getRegionInfo().getRegionNameAsString() + "#" + + store.getFamily().getNameAsString() + "#flush#" + counter; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index 3707290..cfa063e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -23,16 +23,16 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; import com.google.common.base.Preconditions; @@ -100,14 +100,14 @@ public class StripeStoreEngine extends StoreEngine compact(CompactionThroughputController throughputController) + public List compact(ThroughputController throughputController) throws IOException { Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); return this.stripeRequest.execute(compactor, throughputController, null); } @Override - public List compact(CompactionThroughputController throughputController, User user) + public List compact(ThroughputController throughputController, User user) throws IOException { Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); return this.stripeRequest.execute(compactor, throughputController, user); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 9deea7a..0a1b70c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import com.google.common.annotations.VisibleForTesting; @@ -56,7 +57,7 @@ public class StripeStoreFlusher extends StoreFlusher { @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, - MonitoredTask status) throws IOException { + MonitoredTask status, ThroughputController throughputController) throws IOException { List result = new ArrayList(); int cellsCount = snapshot.getCellsCount(); if (cellsCount == 0) return result; // don't flush if there are no entries @@ -80,7 +81,7 @@ public class StripeStoreFlusher extends StoreFlusher { mw.init(storeScanner, factory, store.getComparator()); synchronized (flushLock) { - performFlush(scanner, mw, smallestReadPoint); + performFlush(scanner, mw, smallestReadPoint, throughputController); result = mw.commitWriters(cacheFlushSeqNum, false); success = true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java index cb16966..fedaafd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; @@ -69,10 +70,10 @@ public abstract class CompactionContext { * Runs the compaction based on current selection. select/forceSelect must have been called. * @return The new file paths resulting from compaction. */ - public abstract List compact(CompactionThroughputController throughputController) + public abstract List compact(ThroughputController throughputController) throws IOException; - public abstract List compact(CompactionThroughputController throughputController, User user) + public abstract List compact(ThroughputController throughputController, User user) throws IOException; public CompactionRequest getRequest() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java deleted file mode 100644 index 657ecb4..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.compactions; - -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; - -/** - * A utility that constrains the total throughput of one or more simultaneous flows (compactions) by - * sleeping when necessary. - */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public interface CompactionThroughputController extends Stoppable { - - /** - * Setup controller for the given region server. - */ - void setup(RegionServerServices server); - - /** - * Start a compaction. - */ - void start(String compactionName); - - /** - * Control the compaction throughput. Will sleep if too fast. - * @return the actual sleep time. - */ - long control(String compactionName, long size) throws InterruptedException; - - /** - * Finish a compaction. Should call this method in a finally block. - */ - void finish(String compactionName); -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java index 132c27f..812408c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java @@ -23,33 +23,37 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.util.ReflectionUtils; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class CompactionThroughputControllerFactory { +public final class CompactionThroughputControllerFactory { private static final Log LOG = LogFactory.getLog(CompactionThroughputControllerFactory.class); public static final String HBASE_THROUGHPUT_CONTROLLER_KEY = "hbase.regionserver.throughput.controller"; - private static final Class + private CompactionThroughputControllerFactory() { + } + + private static final Class DEFAULT_THROUGHPUT_CONTROLLER_CLASS = PressureAwareCompactionThroughputController.class; - public static CompactionThroughputController create(RegionServerServices server, + public static ThroughputController create(RegionServerServices server, Configuration conf) { - Class clazz = getThroughputControllerClass(conf); - CompactionThroughputController controller = ReflectionUtils.newInstance(clazz, conf); + Class clazz = getThroughputControllerClass(conf); + ThroughputController controller = ReflectionUtils.newInstance(clazz, conf); controller.setup(server); return controller; } - public static Class getThroughputControllerClass( + public static Class 
getThroughputControllerClass( Configuration conf) { String className = conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName()); try { - return Class.forName(className).asSubclass(CompactionThroughputController.class); + return Class.forName(className).asSubclass(ThroughputController.class); } catch (Exception e) { LOG.warn( "Unable to load configured throughput controller '" + className diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 660ea91..2a67317 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; @@ -297,7 +298,7 @@ public abstract class Compactor { } } return store.getRegionInfo().getRegionNameAsString() + "#" - + store.getFamily().getNameAsString() + "#" + counter; + + store.getFamily().getNameAsString() + "#compaction#" + counter; } /** @@ -312,7 +313,7 @@ public abstract class Compactor { */ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, - CompactionThroughputController throughputController, boolean major) throws IOException { + ThroughputController throughputController, boolean major) throws IOException { long bytesWrittenProgressForCloseCheck = 0; long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 069d221..b4e02ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -34,11 +34,12 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; /** * Compact passed set of files. Create an instance and then call - * {@link #compact(CompactionRequest, CompactionThroughputController, User)} + * {@link #compact(CompactionRequest, ThroughputController, User)} */ @InterfaceAudience.Private public class DefaultCompactor extends Compactor { @@ -52,7 +53,7 @@ public class DefaultCompactor extends Compactor { * Do a minor/major compaction on an explicit set of storefiles from a Store. 
*/ public List compact(final CompactionRequest request, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); this.progress = new CompactionProgress(fd.maxKeyCount); @@ -173,7 +174,7 @@ public class DefaultCompactor extends Compactor { /** * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to - * {@link #compact(CompactionRequest, CompactionThroughputController, User)}; + * {@link #compact(CompactionRequest, ThroughputController, User)}; * @param filesToCompact the files to compact. These are used as the compactionSelection for * the generated {@link CompactionRequest}. * @param isMajor true to major compact (prune all deletes, max versions, etc) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java index 766b2cb..cd3e449 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java @@ -19,47 +19,18 @@ package org.apache.hadoop.hbase.regionserver.compactions; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.controller.NoLimitThroughputController; /** - * A dummy CompactionThroughputController that does nothing. + * A dummy ThroughputController that does nothing. 
*/ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class NoLimitCompactionThroughputController implements CompactionThroughputController { +public class NoLimitCompactionThroughputController extends NoLimitThroughputController { public static final NoLimitCompactionThroughputController INSTANCE = new NoLimitCompactionThroughputController(); @Override - public void setup(RegionServerServices server) { - } - - @Override - public void start(String compactionName) { - } - - @Override - public long control(String compactionName, long size) throws InterruptedException { - return 0; - } - - @Override - public void finish(String compactionName) { - } - - private volatile boolean stopped; - - @Override - public void stop(String why) { - stopped = true; - } - - @Override - public boolean isStopped() { - return stopped; - } - - @Override public String toString() { return "NoLimitCompactionThroughputController"; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java index 11ab568..532ca19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java @@ -17,19 +17,14 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.regionserver.controller.PressureAwareThroughputController; /** * A throughput controller which uses the follow schema to limit throughput @@ -37,7 +32,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; *
<ul>
 * <li>If compaction pressure is greater than 1.0, no limitation.</li>
 * <li>In off peak hours, use a fixed throughput limitation
 * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li>
- * <li>In normal hours, the max throughput is tune between
+ * <li>In normal hours, the max throughput is tuned between
 * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and
 * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula "lower +
 * (higer - lower) * compactionPressure", where compactionPressure is in range [0.0, 1.0]</li>
 * </ul>
@@ -45,8 +40,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure() */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class PressureAwareCompactionThroughputController extends Configured implements - CompactionThroughputController, Stoppable { +public class PressureAwareCompactionThroughputController extends PressureAwareThroughputController { private final static Log LOG = LogFactory .getLog(PressureAwareCompactionThroughputController.class); @@ -73,51 +67,12 @@ public class PressureAwareCompactionThroughputController extends Configured impl private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000; - /** - * Stores the information of one controlled compaction. - */ - private static final class ActiveCompaction { - - private final long startTime; - - private long lastControlTime; - - private long lastControlSize; - - private long totalSize; - - private long numberOfSleeps; - - private long totalSleepTime; - - // prevent too many debug log - private long lastLogTime; - - ActiveCompaction() { - long currentTime = EnvironmentEdgeManager.currentTime(); - this.startTime = currentTime; - this.lastControlTime = currentTime; - this.lastLogTime = currentTime; - } - } - - private long maxThroughputHigherBound; - - private long maxThroughputLowerBound; + // check compaction throughput every this size + private static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL = + "hbase.hstore.compaction.throughput.control.check.interval"; private long maxThroughputOffpeak; - private OffPeakHours offPeakHours; - - private long controlPerSize; - - private int tuningPeriod; - - volatile double maxThroughput; - - private final ConcurrentMap activeCompactions = - new ConcurrentHashMap(); - @Override public void setup(final RegionServerServices server) { server.getChoreService().scheduleChore( @@ -141,14 +96,14 @@ public class PressureAwareCompactionThroughputController extends Configured impl // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to // calculate the throughput limitation.
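// Worked example of the linear tuning formula in the assignment below (numbers
// illustrative, not from the patch): with maxThroughputLowerBound = 10 MB/s,
// maxThroughputUpperBound = 20 MB/s and compactionPressure = 0.5, the tuned
// limit is 10 + (20 - 10) * 0.5 = 15 MB/s.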
maxThroughputToSet = - maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound) + maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound) * compactionPressure; } if (LOG.isDebugEnabled()) { LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to " + throughputDesc(maxThroughputToSet)); } - this.maxThroughput = maxThroughputToSet; + this.setMaxThroughput(maxThroughputToSet); } @Override @@ -157,7 +112,7 @@ public class PressureAwareCompactionThroughputController extends Configured impl if (conf == null) { return; } - this.maxThroughputHigherBound = + this.maxThroughputUpperBound = conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND); this.maxThroughputLowerBound = @@ -167,97 +122,32 @@ public class PressureAwareCompactionThroughputController extends Configured impl conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK, DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK); this.offPeakHours = OffPeakHours.getInstance(conf); - this.controlPerSize = this.maxThroughputLowerBound; - this.maxThroughput = this.maxThroughputLowerBound; + this.controlPerSize = + conf.getLong(HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL, + this.maxThroughputLowerBound); + this.setMaxThroughput(this.maxThroughputLowerBound); this.tuningPeriod = getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD); LOG.info("Compaction throughput configurations, higher bound: " - + throughputDesc(maxThroughputHigherBound) + ", lower bound " + + throughputDesc(maxThroughputUpperBound) + ", lower bound " + throughputDesc(maxThroughputLowerBound) + ", off peak: " + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms"); } - private String throughputDesc(long deltaSize, long elapsedTime) { - return throughputDesc((double) deltaSize / elapsedTime * 1000); - } - - private String throughputDesc(double speed) { - if (speed >= 1E15) { // large enough to say it is unlimited - return "unlimited"; - } else { - return String.format("%.2f MB/sec", speed / 1024 / 1024); - } - } - @Override - public void start(String compactionName) { - activeCompactions.put(compactionName, new ActiveCompaction()); + public String toString() { + return "DefaultCompactionThroughputController [maxThroughput=" + + throughputDesc(getMaxThroughput()) + ", activeCompactions=" + activeOperations.size() + + "]"; } @Override - public long control(String compactionName, long size) throws InterruptedException { - ActiveCompaction compaction = activeCompactions.get(compactionName); - compaction.totalSize += size; - long deltaSize = compaction.totalSize - compaction.lastControlSize; - if (deltaSize < controlPerSize) { - return 0; - } - long now = EnvironmentEdgeManager.currentTime(); - double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size(); - long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms - long elapsedTime = now - compaction.lastControlTime; - compaction.lastControlSize = compaction.totalSize; - if (elapsedTime >= minTimeAllowed) { - compaction.lastControlTime = EnvironmentEdgeManager.currentTime(); - return 0; - } - // too fast - long sleepTime = minTimeAllowed - elapsedTime; - if (LOG.isDebugEnabled()) { - // do not log too much - if (now - compaction.lastLogTime > 60L * 1000) { - LOG.debug(compactionName + " sleep " + sleepTime + " ms because current 
throughput is " - + throughputDesc(deltaSize, elapsedTime) + ", max allowed is " - + throughputDesc(maxThroughputPerCompaction) + ", already slept " - + compaction.numberOfSleeps + " time(s) and total slept time is " - + compaction.totalSleepTime + " ms till now."); - compaction.lastLogTime = now; - } + protected boolean skipControl(long deltaSize, long controlSize) { + if (deltaSize < controlSize) { + return true; + } else { + return false; } - Thread.sleep(sleepTime); - compaction.numberOfSleeps++; - compaction.totalSleepTime += sleepTime; - compaction.lastControlTime = EnvironmentEdgeManager.currentTime(); - return sleepTime; - } - - @Override - public void finish(String compactionName) { - ActiveCompaction compaction = activeCompactions.remove(compactionName); - long elapsedTime = Math.max(1, EnvironmentEdgeManager.currentTime() - compaction.startTime); - LOG.info(compactionName + " average throughput is " - + throughputDesc(compaction.totalSize, elapsedTime) + ", slept " - + compaction.numberOfSleeps + " time(s) and total slept time is " - + compaction.totalSleepTime + " ms. " + activeCompactions.size() - + " active compactions remaining, total limit is " + throughputDesc(maxThroughput)); - } - - private volatile boolean stopped = false; - - @Override - public void stop(String why) { - stopped = true; - } - - @Override - public boolean isStopped() { - return stopped; - } - - @Override - public String toString() { - return "DefaultCompactionThroughputController [maxThroughput=" + throughputDesc(maxThroughput) - + ", activeCompactions=" + activeCompactions.size() + "]"; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 5f024b8..6900b86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConcatenatedLists; @@ -392,7 +393,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { protected byte[] majorRangeFromRow = null, majorRangeToRow = null; public List execute(StripeCompactor compactor, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { return execute(compactor, throughputController, null); } /** @@ -402,7 +403,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { * @return result of compact(...) 
*/ public abstract List execute(StripeCompactor compactor, - CompactionThroughputController throughputController, User user) throws IOException; + ThroughputController throughputController, User user) throws IOException; public StripeCompactionRequest(CompactionRequest request) { this.request = request; @@ -454,7 +455,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { @Override public List execute(StripeCompactor compactor, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user); } @@ -505,7 +506,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { @Override public List execute(StripeCompactor compactor, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 021965c..af1f29f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -53,14 +54,14 @@ public class StripeCompactor extends Compactor { public List compact(CompactionRequest request, List targetBoundaries, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow, throughputController, null); } public List compact(CompactionRequest request, List targetBoundaries, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { if (LOG.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:"); @@ -77,14 +78,14 @@ public class StripeCompactor extends Compactor { public List compact(CompactionRequest request, int targetCount, long targetSize, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { return compact(request, targetCount, targetSize, left, right, majorRangeFromRow, majorRangeToRow, throughputController, null); } public List compact(CompactionRequest request, int targetCount, long 
targetSize, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Executing compaction with " + targetSize + " target file size, no more than " + targetCount + " files, in [" @@ -98,7 +99,7 @@ public class StripeCompactor extends Compactor { private List compactInternal(StripeMultiFileWriter mw, final CompactionRequest request, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { final Collection filesToCompact = request.getFiles(); final FileDetails fd = getFileDetails(filesToCompact, request.isMajor()); this.progress = new CompactionProgress(fd.maxKeyCount); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/FlushThroughputControllerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/FlushThroughputControllerFactory.java new file mode 100644 index 0000000..5534794 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/FlushThroughputControllerFactory.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.controller; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.util.ReflectionUtils; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public final class FlushThroughputControllerFactory { + + private static final Log LOG = LogFactory.getLog(FlushThroughputControllerFactory.class); + + public static final String HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY = + "hbase.regionserver.flush.throughput.controller"; + + private static final Class + DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS = NoLimitFlushThroughputController.class; + + private FlushThroughputControllerFactory() { + } + + public static ThroughputController create(RegionServerServices server, + Configuration conf) { + Class clazz = getThroughputControllerClass(conf); + ThroughputController controller = ReflectionUtils.newInstance(clazz, conf); + controller.setup(server); + return controller; + } + + public static Class getThroughputControllerClass( + Configuration conf) { + String className = + conf.get(HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, + DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS.getName()); + try { + return Class.forName(className).asSubclass(ThroughputController.class); + } catch (Exception e) { + LOG.warn( + "Unable to load configured flush throughput controller '" + className + + "', load default throughput controller " + + DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e); + return DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/NoLimitFlushThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/NoLimitFlushThroughputController.java new file mode 100644 index 0000000..d16c67d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/NoLimitFlushThroughputController.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.controller; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A dummy ThroughputController that does nothing. 
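+ * All of its control(...) calls return immediately, so flushes stay unthrottled; the factory above falls back to this class by default.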
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class NoLimitFlushThroughputController extends NoLimitThroughputController { + + public static final NoLimitFlushThroughputController INSTANCE = + new NoLimitFlushThroughputController(); + + @Override + public String toString() { + return "NoLimitFlushThroughputController"; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/NoLimitThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/NoLimitThroughputController.java new file mode 100644 index 0000000..422a1f4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/NoLimitThroughputController.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.controller; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public abstract class NoLimitThroughputController implements ThroughputController { + + @Override + public void setup(RegionServerServices server) { + } + + @Override + public void start(String compactionName) { + } + + @Override + public long control(String compactionName, long size) throws InterruptedException { + return 0; + } + + @Override + public void finish(String compactionName) { + } + + private boolean stopped; + + @Override + public void stop(String why) { + stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/PressureAwareFlushThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/PressureAwareFlushThroughputController.java new file mode 100644 index 0000000..67a3ea3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/PressureAwareFlushThroughputController.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.controller; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; + +/** + * A throughput controller which uses the following schema to limit throughput + * <ul> + * <li>If flush pressure is greater than or equal to 1.0, no limitation.</li> + * <li>In normal case, the max throughput is tuned between + * {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND} and + * {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND}, using the formula "lower + + * (upper - lower) * flushPressure", where flushPressure is in range [0.0, 1.0)</li> + * </ul> + * @see org.apache.hadoop.hbase.regionserver.HRegionServer#getFlushPressure() + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class PressureAwareFlushThroughputController extends PressureAwareThroughputController { + + private static final Log LOG = LogFactory.getLog(PressureAwareFlushThroughputController.class); + + public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND = + "hbase.hstore.flush.throughput.upper.bound"; + + private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND = + 200L * 1024 * 1024; + + public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND = + "hbase.hstore.flush.throughput.lower.bound"; + + private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND = + 100L * 1024 * 1024; + + public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD = + "hbase.hstore.flush.throughput.tune.period"; + + private static final int DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD = 20 * 1000; + + // check flush throughput once this many bytes have been written + public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL = + "hbase.hstore.flush.throughput.control.check.interval"; + + private static final long DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL = + 10L * 1024 * 1024; // 10MB + + @Override + public void setup(final RegionServerServices server) { + server.getChoreService().scheduleChore( + new ScheduledChore("FlushThroughputTuner", this, tuningPeriod, this.tuningPeriod) { + + @Override + protected void chore() { + tune(server.getFlushPressure()); + } + }); + } + + private void tune(double flushPressure) { + double maxThroughputToSet; + if (flushPressure >= 1.0) { + // set to unlimited if global memstore size already exceeds lower limit + maxThroughputToSet = Double.MAX_VALUE; + } else { + // flushPressure is in range [0.0, 1.0); use a simple linear formula to + // calculate the throughput limitation.
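+ // e.g. with the default bounds (lower = 100 MB/s, upper = 200 MB/s), flushPressure = 0.5
+ // tunes the limit to 100 + (200 - 100) * 0.5 = 150 MB/s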
+ maxThroughputToSet = + maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound) + * flushPressure; + } + if (LOG.isDebugEnabled()) { + LOG.debug("flushPressure is " + flushPressure + ", tune flush throughput to " + + throughputDesc(maxThroughputToSet)); + } + this.setMaxThroughput(maxThroughputToSet); + } + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + if (conf == null) { + return; + } + this.maxThroughputUpperBound = + conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, + DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND); + this.maxThroughputLowerBound = + conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, + DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND); + this.offPeakHours = OffPeakHours.getInstance(conf); + this.controlPerSize = + conf.getLong(HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL, + DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL); + this.setMaxThroughput(this.maxThroughputLowerBound); + this.tuningPeriod = + getConf().getInt(HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD, + DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD); + LOG.info("Flush throughput configurations, upper bound: " + + throughputDesc(maxThroughputUpperBound) + ", lower bound " + + throughputDesc(maxThroughputLowerBound) + ", tuning period: " + tuningPeriod + " ms"); + } + + @Override + public String toString() { + return "DefaultFlushController [maxThroughput=" + throughputDesc(getMaxThroughput()) + + ", activeFlushNumber=" + activeOperations.size() + "]"; + } + + @Override + protected boolean skipControl(long deltaSize, long controlSize) { + // for flush, we control the flow no matter whether the flush size is small + return false; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/PressureAwareThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/PressureAwareThroughputController.java new file mode 100644 index 0000000..59ecb70 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/PressureAwareThroughputController.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.controller; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public abstract class PressureAwareThroughputController extends Configured implements + ThroughputController, Stoppable { + private static final Log LOG = LogFactory.getLog(PressureAwareThroughputController.class); + + /** + * Stores the information of one controlled operation (a flush or a compaction). + */ + private static final class ActiveOperation { + + private final long startTime; + + private long lastControlTime; + + private long lastControlSize; + + private long totalSize; + + private long numberOfSleeps; + + private long totalSleepTime; + + // prevent too many debug logs + private long lastLogTime; + + ActiveOperation() { + long currentTime = EnvironmentEdgeManager.currentTime(); + this.startTime = currentTime; + this.lastControlTime = currentTime; + this.lastLogTime = currentTime; + } + } + + protected long maxThroughputUpperBound; + + protected long maxThroughputLowerBound; + + protected OffPeakHours offPeakHours; + + protected long controlPerSize; + + protected int tuningPeriod; + + private volatile double maxThroughput; + + protected final ConcurrentMap<String, ActiveOperation> activeOperations = + new ConcurrentHashMap<String, ActiveOperation>(); + + @Override + public abstract void setup(final RegionServerServices server); + + protected String throughputDesc(long deltaSize, long elapsedTime) { + return throughputDesc((double) deltaSize / elapsedTime * 1000); + } + + protected String throughputDesc(double speed) { + if (speed >= 1E15) { // large enough to say it is unlimited + return "unlimited"; + } else { + return String.format("%.2f MB/sec", speed / 1024 / 1024); + } + } + + @Override + public void start(String opName) { + activeOperations.put(opName, new ActiveOperation()); + } + + @Override + public long control(String opName, long size) throws InterruptedException { + ActiveOperation operation = activeOperations.get(opName); + operation.totalSize += size; + long deltaSize = operation.totalSize - operation.lastControlSize; + if (skipControl(deltaSize, controlPerSize)) { + return 0; + } + long now = EnvironmentEdgeManager.currentTime(); + double maxThroughputPerCompaction = this.getMaxThroughput() / activeOperations.size(); + long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms + long elapsedTime = now - operation.lastControlTime; + operation.lastControlSize = operation.totalSize; + if (elapsedTime >= minTimeAllowed) { + operation.lastControlTime = EnvironmentEdgeManager.currentTime(); + return 0; + } + // too fast + long sleepTime = minTimeAllowed - elapsedTime; + if (LOG.isDebugEnabled()) { + // do not log too much + if (now - operation.lastLogTime > 5L * 1000) { + LOG.debug("deltaSize: " + deltaSize + " bytes; elapsedTime: " + elapsedTime + " ms"); + LOG.debug(opName + " sleep " + sleepTime + " ms because current throughput is " + + throughputDesc(deltaSize, elapsedTime) + ", max allowed is " + +
throughputDesc(maxThroughputPerCompaction) + ", already slept " + + operation.numberOfSleeps + " time(s) and total slept time is " + + operation.totalSleepTime + " ms till now."); + operation.lastLogTime = now; + } + } + Thread.sleep(sleepTime); + operation.numberOfSleeps++; + operation.totalSleepTime += sleepTime; + operation.lastControlTime = EnvironmentEdgeManager.currentTime(); + return sleepTime; + } + + /** + * Check whether to skip control given delta size and control size + * @param deltaSize Delta size since last control + * @param controlSize Size limit to perform control + * @return a boolean indicates whether to skip this control + */ + abstract protected boolean skipControl(long deltaSize, long controlSize); + + @Override + public void finish(String opName) { + ActiveOperation operation = activeOperations.remove(opName); + long elapsedTime = EnvironmentEdgeManager.currentTime() - operation.startTime; + LOG.info(opName + " average throughput is " + + throughputDesc(operation.totalSize, elapsedTime) + ", slept " + + operation.numberOfSleeps + " time(s) and total slept time is " + + operation.totalSleepTime + " ms. " + activeOperations.size() + + " active operations remaining, total limit is " + throughputDesc(getMaxThroughput())); + } + + private volatile boolean stopped = false; + + @Override + public void stop(String why) { + stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } + + public double getMaxThroughput() { + return maxThroughput; + } + + public void setMaxThroughput(double maxThroughput) { + this.maxThroughput = maxThroughput; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/ThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/ThroughputController.java new file mode 100644 index 0000000..b736e62 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/controller/ThroughputController.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.controller; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * A utility that constrains the total throughput of one or more simultaneous flows by + * sleeping when necessary. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public interface ThroughputController extends Stoppable { + + /** + * Setup controller for the given region server. + */ + void setup(RegionServerServices server); + + /** + * Start the throughput controller. 
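+ * Register the named operation first; {@link #control(String, long)} assumes start has already been called for that name.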
+ */ + void start(String name); + + /** + * Control the throughput. Will sleep if too fast. + * @return the actual sleep time. + */ + long control(String name, long size) throws InterruptedException; + + /** + * Finish the controller. Should call this method in a finally block. + */ + void finish(String name); +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 0986ad7..c8c1901 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -317,4 +318,13 @@ public class MockRegionServerServices implements RegionServerServices { // TODO Auto-generated method stub return null; } + + public ThroughputController getFlushThroughputController() { + return null; + } + + @Override + public double getFlushPressure() { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 94a63d8..5815494 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -44,7 +43,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -122,7 +121,7 @@ public class TestIOFencing { @Override public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { try { return super.compact(compaction, store, throughputController); } finally { @@ -132,7 +131,7 @@ public class TestIOFencing { @Override public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { try { return super.compact(compaction, store, throughputController, user); } finally { 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index e20c4ad..9ac9e79 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -60,12 +60,12 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -235,7 +235,7 @@ public class TestRegionObserverScannerOpenHook { @Override public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { boolean ret = super.compact(compaction, store, throughputController); if (ret) compactionStateChangeLatch.countDown(); return ret; @@ -243,7 +243,7 @@ public class TestRegionObserverScannerOpenHook { @Override public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { boolean ret = super.compact(compaction, store, throughputController, user); if (ret) compactionStateChangeLatch.countDown(); return ret; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 32f644b..f515ce5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -662,4 +663,13 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { // TODO Auto-generated method stub return null; } + + public ThroughputController getFlushThroughputController() { + return null; + } + + @Override + public double getFlushPressure() { + return 0; + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index b374bdc..9b3c3dc 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -58,10 +58,10 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -363,13 +363,13 @@ public class TestCompaction { } @Override - public List compact(CompactionThroughputController throughputController) + public List compact(ThroughputController throughputController) throws IOException { return compact(throughputController, null); } @Override - public List compact(CompactionThroughputController throughputController, User user) + public List compact(ThroughputController throughputController, User user) throws IOException { finishCompaction(this.selectedFiles); return new ArrayList(); @@ -421,13 +421,13 @@ public class TestCompaction { } @Override - public List compact(CompactionThroughputController throughputController) + public List compact(ThroughputController throughputController) throws IOException { return compact(throughputController, null); } @Override - public List compact(CompactionThroughputController throughputController, User user) + public List compact(ThroughputController throughputController, User user) throws IOException { try { isInCompact = true; @@ -510,10 +510,10 @@ public class TestCompaction { HRegion r = mock(HRegion.class); when( r.compact(any(CompactionContext.class), any(Store.class), - any(CompactionThroughputController.class), any(User.class))).then(new Answer() { + any(ThroughputController.class), any(User.class))).then(new Answer() { public Boolean answer(InvocationOnMock invocation) throws Throwable { invocation.getArgumentAt(0, CompactionContext.class).compact( - invocation.getArgumentAt(2, CompactionThroughputController.class)); + invocation.getArgumentAt(2, ThroughputController.class)); return true; } }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java index 1454aa8..ff2143e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; 
+import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -77,7 +77,7 @@ public class TestStripeStoreEngine { when( mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class), - any(CompactionThroughputController.class), any(User.class))) + any(ThroughputController.class), any(User.class))) .thenReturn(new ArrayList()); // Produce 3 L0 files. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java index 4456ef2..bed67c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java @@ -204,7 +204,7 @@ public class TestCompactionWithThroughputController { PressureAwareCompactionThroughputController throughputController = (PressureAwareCompactionThroughputController) regionServer.compactSplitThread .getCompactionThroughputController(); - assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON); + assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); Table table = conn.getTable(tableName); for (int i = 0; i < 5; i++) { byte[] value = new byte[0]; @@ -212,19 +212,19 @@ public class TestCompactionWithThroughputController { TEST_UTIL.flush(tableName); } Thread.sleep(2000); - assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON); + assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); byte[] value1 = new byte[0]; table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1)); TEST_UTIL.flush(tableName); Thread.sleep(2000); - assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON); + assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); byte[] value = new byte[0]; table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value)); TEST_UTIL.flush(tableName); Thread.sleep(2000); - assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON); + assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON); conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, NoLimitCompactionThroughputController.class.getName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/controller/TestFlushWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/controller/TestFlushWithThroughputController.java new file mode 100644 index 0000000..27eefbe --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/controller/TestFlushWithThroughputController.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.controller; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreEngine; +import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestFlushWithThroughputController { + + private static final Log LOG = LogFactory.getLog(TestFlushWithThroughputController.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final double EPSILON = 1E-6; + + private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); + + private final byte[] family = Bytes.toBytes("f"); + + private final byte[] qualifier = Bytes.toBytes("q"); + + private Store getStoreWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (Region region : hrs.getOnlineRegions(tableName)) { + return region.getStores().iterator().next(); + } + } + return null; + } + + private Store generateAndFlushData() throws IOException { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + HTable table = TEST_UTIL.createTable(tableName, family); + Random rand = new Random(); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[256 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); + } + admin.flush(tableName); + } + 
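+ // each of the 10 flushes above carries roughly 2.5 MB (10 values of 256 KB), giving the controller enough bytes to meter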
return getStoreWithName(tableName); + } + + private long testFlushWithThroughputLimit() throws Exception { + long throughputLimit = 1L * 1024 * 1024; + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, + PressureAwareFlushThroughputController.class.getName()); + conf.setLong( + PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, + throughputLimit); + conf.setLong( + PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, + throughputLimit); + conf.setLong( + PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL, + throughputLimit); + TEST_UTIL.startMiniCluster(1); + try { + long startTime = System.nanoTime(); + Store store = generateAndFlushData(); + assertEquals(10, store.getStorefilesCount()); + long duration = System.nanoTime() - startTime; + double throughput = (double) store.getStorefilesSize() / duration * 1000 * 1000 * 1000; + LOG.debug("Throughput is: " + (throughput / 1024 / 1024) + " MB/s"); + // confirm that the speed limit works properly (not too fast, and also not too slow) + // 20% is the max acceptable error rate. + assertTrue(throughput < throughputLimit * 1.2); + assertTrue(throughput > throughputLimit * 0.8); + return duration; + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + + private long testFlushWithoutThroughputLimit() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, + NoLimitFlushThroughputController.class.getName()); + TEST_UTIL.startMiniCluster(1); + try { + long startTime = System.nanoTime(); + Store store = generateAndFlushData(); + assertEquals(10, store.getStorefilesCount()); + long duration = System.nanoTime() - startTime; + double throughput = (double) store.getStorefilesSize() / duration * 1000 * 1000 * 1000; + LOG.debug("Throughput w/o limit is: " + (throughput / 1024 / 1024) + " MB/s"); + return duration; + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + + @Test + public void testFlushControl() throws Exception { + long limitTime = testFlushWithThroughputLimit(); + long noLimitTime = testFlushWithoutThroughputLimit(); + LOG.info("With a 1 MB/s limit, flush took " + (limitTime / 1000000) + + " ms; without a limit, flush took " + (noLimitTime / 1000000) + " ms"); + // Commonly, if multiple regions flush at the same time the throughput could be very high, + // but the flushes in this test run serially, so we use a weak assumption.
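+ // writing the same ~25 MB of data under the 1 MB/s cap should take well over twice the unthrottled time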
+ assertTrue(limitTime > 2 * noLimitTime); + } + + /** + * Test the tuning task of {@link PressureAwareFlushThroughputController} + */ + @Test + public void testFlushThroughputTuning() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setLong( + PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, + 20L * 1024 * 1024); + conf.setLong( + PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, + 10L * 1024 * 1024); + conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, + PressureAwareFlushThroughputController.class.getName()); + conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD, + 3000); + TEST_UTIL.startMiniCluster(1); + assertEquals(10L * 1024 * 1024, + ((PressureAwareThroughputController) TEST_UTIL.getMiniHBaseCluster().getRegionServer(0) + .getFlushThroughputController()).getMaxThroughput(), EPSILON); + Connection conn = ConnectionFactory.createConnection(conf); + try { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + htd.setCompactionEnabled(false); + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitTableAvailable(tableName); + HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); + PressureAwareFlushThroughputController throughputController = + (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController(); + Table table = conn.getTable(tableName); + Random rand = new Random(); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[256 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); + } + } + Thread.sleep(5000); + double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure()); + assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON); + + conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, + NoLimitFlushThroughputController.class.getName()); + regionServer.onConfigurationChange(conf); + assertTrue(throughputController.isStopped()); + assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitFlushThroughputController); + } finally { + conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } + + /** + * Test the logic for striped store. 
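+ * The stripe flusher is handed the same ThroughputController through flushSnapshot, so the timing assertion from testFlushControl should hold unchanged.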
+ */ + @Test + public void testFlushControlForStripedStore() throws Exception { + TEST_UTIL.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY, + StripeStoreEngine.class.getName()); + testFlushControl(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 549a018..98599cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -64,7 +64,19 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.*; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; +import org.apache.hadoop.hbase.regionserver.FlushRequestListener; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.controller.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -644,11 +656,11 @@ public class TestWALReplay { } @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status) throws IOException { + MonitoredTask status, ThroughputController throughputController) throws IOException { if (throwExceptionWhenFlushing.get()) { throw new IOException("Simulated exception by tests"); } - return super.flushSnapshot(snapshot, cacheFlushId, status); + return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController); } };
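Since the patch only shows the call sites piecemeal, here is a small self-contained driver sketch of the start/control/finish contract the new ThroughputController interface expects. It is not part of the patch: the class name FlushThrottleDemo and the "demo-flush" operation name are illustrative only, and it assumes the classes added above are on the classpath (the real wiring lives in the store flushers and HRegionServer).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.controller.PressureAwareFlushThroughputController;

public class FlushThrottleDemo {
  public static void main(String[] args) throws InterruptedException {
    Configuration conf = new Configuration();
    // Pin both bounds to 1 MB/s so the limit stays fixed (no tuning chore is running here).
    conf.setLong(
        PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
        1024L * 1024);
    conf.setLong(
        PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
        1024L * 1024);
    PressureAwareFlushThroughputController controller =
        new PressureAwareFlushThroughputController();
    controller.setConf(conf); // reads the bounds and starts at the lower bound (1 MB/s)
    controller.start("demo-flush"); // register the operation before the first control(...)
    try {
      long slept = 0;
      for (int i = 0; i < 5; i++) {
        // pretend we just wrote one 1 MB block of flush output; the flush controller
        // never skips control, so each call may sleep to hold the 1 MB/s cap
        slept += controller.control("demo-flush", 1024 * 1024);
      }
      System.out.println("total sleep: " + slept + " ms"); // roughly one second per extra MB
    } finally {
      controller.finish("demo-flush"); // the interface javadoc asks for this in a finally block
    }
  }
}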