Index: hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1092,6 +1092,18 @@ public static final String SWIFT_CLIENT_SOCKS_PROXY_HOST_AND_PORT = "hbase.client.swift.socks.proxy.hostAndPort"; + /** + * Key of the number of store-files for blocking flush + */ + public static final String HSTORE_BLOCKING_STORE_FILES_KEY = + "hbase.hstore.blockingStoreFiles"; + + /** + * Key to the maximum wait time in milliseconds for blocking flush + */ + public static final String HSTORE_BLOCKING_WAIT_TIME_KEY = + "hbase.hstore.blockingWaitTime"; + public static final long DEFAULT_HSTORE_BLOCKING_WAIT_TIME = 90000; private HConstants() { // Can't be instantiated with this constructor. } Index: hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java =================================================================== --- hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java +++ hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java @@ -37,5 +37,5 @@ * would be flushed. 
* */ - void request(HRegion region, boolean selectiveFlushRequest); + void request(HRegionIf region, boolean selectiveFlushRequest); } Index: hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -146,7 +146,7 @@ * regionName is a unique identifier for this HRegion. (startKey, endKey] * defines the keyspace for this HRegion. */ -public class HRegion implements HeapSize, ConfigurationObserver { +public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf { public static final Log LOG = LogFactory.getLog(HRegion.class); static final String SPLITDIR = "splits"; static final String MERGEDIR = "merges"; @@ -718,10 +718,8 @@ } } - /** - * @return True if this region has references. - */ - boolean hasReferences() { + @Override + public boolean hasReferences() { for (Store store : this.stores.values()) { for (StoreFile sf : store.getStorefiles()) { // Found a reference, return. @@ -789,6 +787,7 @@ } /** @return a HRegionInfo object for this region */ + @Override public HRegionInfo getRegionInfo() { return this.regionInfo; } @@ -1052,7 +1051,7 @@ return this.lastStoreFlushTimeMap.get(store); } - /** @return how info about the last flushes */ + @Override public List> getRecentFlushInfo() { // only MemStoreFlusher thread should be calling this, so read lock is okay this.splitsAndClosesLock.readLock().lock(); @@ -1344,32 +1343,11 @@ * @throws IOException */ public boolean flushcache() throws IOException { - return flushcache(false); + return flushMemstoreShapshot(false); } - /** - * Flush the cache. - * - * When this method is called the cache will be flushed unless: - *
<ol> - * <li>the cache is empty</li> - * <li>the region is closed.</li> - * <li>a flush is already in progress</li> - * <li>writes are disabled</li> - * </ol> - * - * <p>

This method may block for some time, so it should not be called from a - * time-sensitive thread. - * - * @param selectiveFlushRequest If true, selectively flush column families - * which dominate the memstore size, provided it - * is enabled in the configuration. - * - * @return true if cache was flushed - * - * @throws IOException general io exceptions - */ - public boolean flushcache(boolean selectiveFlushRequest) throws IOException { + @Override + public boolean flushMemstoreShapshot(boolean selectiveFlushRequest) throws IOException { // If a selective flush was requested, but the per-column family switch is // off, we cannot do a selective flush. if (selectiveFlushRequest && !perColumnFamilyFlushEnabled) { @@ -2337,6 +2315,7 @@ * @throws IOException * @return true if the new put was execute, false otherwise */ + @SuppressWarnings("deprecation") public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, byte [] expectedValue, Writable w, Integer lockId, boolean writeToWAL) throws IOException{ @@ -2914,6 +2893,9 @@ return this.stores.get(column); } + /** + * @return a map from column family to Store. 
+ */ public Map getStores() { return this.stores; } @@ -3675,6 +3657,7 @@ * @return result * @throws IOException read exceptions */ + @SuppressWarnings("deprecation") public Result get(final Get get, final Integer lockid) throws IOException { // Verify families are all valid if (get.hasFamilies()) { @@ -4157,4 +4140,13 @@ s.updateConfiguration(); } } + + @Override + public int maxStoreFilesCount() { + int res = 0; + for (Store hstore : this.stores.values()) { + res = Math.max(res, hstore.getStorefilesCount()); + } + return res; + } } Index: hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java =================================================================== --- hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java +++ hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java @@ -0,0 +1,77 @@ +/** + * Copyright 2014 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.util.Pair; + +/** + * The interface of a storage instance of a whole region. + * + * The only online server is using HRegion as the implementation. + */ +public interface HRegionIf { + /** + * @return the HRegionInfo of this region + */ + public HRegionInfo getRegionInfo(); + + /** + * Flushes the cache. + * + * When this method is called the cache will be flushed unless: + *

<ol> + * <li>the cache is empty</li> + * <li>the region is closed.</li> + * <li>a flush is already in progress</li> + * <li>writes are disabled</li> + * </ol> + * + * <p>

+ * This method may block for some time, so it should not be called from a + * time-sensitive thread. + * + * @param selectiveFlushRequest If true, selectively flush column families + * which dominate the memstore size, provided it + * is enabled in the configuration. + * + * @return true if cache was flushed + */ + public boolean flushMemstoreShapshot(boolean selectiveFlushRequest) + throws IOException; + + /** + * @return how info about the last flushes + */ + public List> getRecentFlushInfo(); + + /** + * @return True if this region has references. + */ + public boolean hasReferences(); + + /** + * @return the maximum number of files among all stores. + */ + public int maxStoreFilesCount(); +} Index: hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -98,7 +98,6 @@ import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.MultiAction; import org.apache.hadoop.hbase.client.MultiPut; import org.apache.hadoop.hbase.client.MultiPutResponse; @@ -120,7 +119,6 @@ import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool; import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram; import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket; -import org.apache.hadoop.hbase.io.hfile.histogram.HistogramUtils; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRPCOptions; @@ -176,7 +174,7 @@ * HRegionServer makes a set 
of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. */ -public class HRegionServer implements HRegionInterface, +public class HRegionServer implements HRegionInterface, HRegionServerIf, HBaseRPCErrorHandler, Runnable, Watcher, ConfigurationObserver { public static final Log LOG = LogFactory.getLog(HRegionServer.class); private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING); @@ -1322,6 +1320,7 @@ } } + @Override public AtomicLong getGlobalMemstoreSize() { return globalMemstoreSize; } @@ -1459,12 +1458,7 @@ return stop; } - /** - * Checks to see if the file system is still accessible. - * If not, sets abortRequested and stopRequested - * - * @return false if file system is not available - */ + @Override public void checkFileSystem() { long curtime = EnvironmentEdgeManager.currentTimeMillis(); synchronized (lastCheckFSAt){ @@ -1808,9 +1802,7 @@ } } - /** - * @return Region server metrics instance. - */ + @Override public RegionServerMetrics getMetrics() { return this.metrics; } @@ -3468,10 +3460,7 @@ Bytes.mapKey(hr.getRegionInfo().getRegionName()), hr); } - /** - * @return A new Map of online regions sorted by region size with the first - * entry being the biggest. 
- */ + @Override public SortedMap getCopyOfOnlineRegionsSortedBySize() { // we'll sort the regions in reverse SortedMap sortedRegions = new TreeMap( @@ -4004,7 +3993,7 @@ return null; } - /** @return what the regionserver thread name should be */ + @Override public String getRSThreadName() { return "RS-" + serverInfo.getServerName(); } @@ -4118,10 +4107,20 @@ @Override public HRegionLocation getLocation(byte[] table, byte[] row, boolean reload) - throws IOException { + throws IOException { if (reload) { return regionServerConnection.relocateRegion(new StringBytes(table), row); } return regionServerConnection.locateRegion(new StringBytes(table), row); } + + @Override + public boolean requestSplit(HRegionIf r) { + return this.compactSplitThread.requestSplit((HRegion) r); + } + + @Override + public void requestCompaction(HRegionIf r, String why) { + this.compactSplitThread.requestCompaction((HRegion) r, why); + } } Index: hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java =================================================================== --- hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java +++ hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java @@ -0,0 +1,75 @@ +/** + * Copyright 2014 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.SortedMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics; + +/** + * The interface representing a region server. + * A regions server serves a set of HRegions available to clients. + * + * The only online server is using HRegionServer as the implementation. + */ +public interface HRegionServerIf { + /** + * @return what the regionserver thread name should be + */ + public String getRSThreadName(); + + /** + * Checks to see if the file system is still accessible. + * If not, sets abortRequested and stopRequested + * + * @return false if file system is not available + */ + public void checkFileSystem(); + + /** + * Requests the region server to make a split on a specific region-store. + */ + public boolean requestSplit(HRegionIf r); + + /** + * Requests the region server to make a compaction on a specific region-store. + * + * @param r the region-store. + * @param why Why compaction requested -- used in debug messages + */ + public void requestCompaction(HRegionIf r, String why); + + /** + * @return Region server metrics instance. + */ + public RegionServerMetrics getMetrics(); + + /** + * @return the size of global mem-store in bytes as an AtomicLong. + */ + public AtomicLong getGlobalMemstoreSize(); + + /** + * @return A new SortedMap of online regions sorted by region size with the + * first entry being the biggest. 
+ */ + public SortedMap getCopyOfOnlineRegionsSortedBySize(); +} Index: hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -19,30 +19,29 @@ */ package org.apache.hadoop.hbase.regionserver; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.DaemonThreadFactory; -import org.apache.hadoop.hbase.util.HasThread; -import org.apache.hadoop.util.StringUtils; - import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.SortedMap; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.Future; -import java.util.concurrent.Delayed; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import 
org.apache.hadoop.hbase.conf.ConfigurationObserver; +import org.apache.hadoop.hbase.util.DaemonThreadFactory; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.StringUtils; /** * Thread that flushes cache on request @@ -55,14 +54,10 @@ */ class MemStoreFlusher implements FlushRequester, ConfigurationObserver { static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); - // These two data members go together. Any entry in the one must have - // a corresponding entry in the other. - private final Map regionsInQueue = - new HashMap(); - - private final boolean perColumnFamilyFlushEnabled; - private final HRegionServer server; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Map>> + regionsInQueue = new HashMap<>(); + + private final HRegionServerIf server; protected final long globalMemStoreLimit; protected final long globalMemStoreLimitLowMark; @@ -73,20 +68,18 @@ "hbase.regionserver.global.memstore.upperLimit"; private static final String LOWER_KEY = "hbase.regionserver.global.memstore.lowerLimit"; - private long blockingStoreFilesNumber; + + private int blockingStoreFilesNumber; private long blockingWaitTime; private int handlerCount; - private final ThreadPoolExecutor flushes; - Map futures = new HashMap(); + private final ScheduledThreadPoolExecutor threadPool; /** * @param conf * @param server */ - public MemStoreFlusher(final Configuration conf, - final HRegionServer server) { - super(); + public MemStoreFlusher(final Configuration conf, HRegionServerIf server) { this.server = server; long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER, @@ -99,28 +92,26 @@ } this.globalMemStoreLimitLowMark = lower; this.blockingStoreFilesNumber = - conf.getInt("hbase.hstore.blockingStoreFiles", -1); + conf.getInt(HConstants.HSTORE_BLOCKING_STORE_FILES_KEY, -1); if (this.blockingStoreFilesNumber == -1) { 
this.blockingStoreFilesNumber = 1 + conf.getInt("hbase.hstore.compactionThreshold", 3); } - this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", - 90000); + this.blockingWaitTime = + conf.getLong(HConstants.HSTORE_BLOCKING_WAIT_TIME_KEY, + HConstants.DEFAULT_HSTORE_BLOCKING_WAIT_TIME); - this.perColumnFamilyFlushEnabled = conf.getBoolean( - HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, - HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH); // number of "memstore flusher" threads per region server this.handlerCount = conf.getInt(HConstants.FLUSH_THREADS, HConstants.DEFAULT_FLUSH_THREADS); LOG.info("globalMemStoreLimit=" + StringUtils.humanReadableInt(this.globalMemStoreLimit) + ", globalMemStoreLimitLowMark=" + StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) + ", maxHeap=" + StringUtils.humanReadableInt(max)); - this.flushes = new ThreadPoolExecutor(handlerCount, handlerCount, - 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), + this.threadPool = new ScheduledThreadPoolExecutor(handlerCount, new DaemonThreadFactory("flush-thread-")); + this.threadPool.setMaximumPoolSize(handlerCount); } /** @@ -147,58 +138,70 @@ return (long)(max * limit); } - public void request(HRegion r, boolean isSelective) { + @Override + public void request(HRegionIf r, boolean isSelective) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush // queue. It'll come out near immediately. 
FlushQueueEntry fqe = new FlushQueueEntry(r, isSelective); - this.regionsInQueue.put(r, fqe); - executeFlushQueueEntry(fqe); + regionsInQueue.put(r, Pair.newPair(fqe, (Future) null)); + executeFlushQueueEntry(fqe, 0); + } else { + LOG.info("Flush for " + r + " already scheduled."); } } } - protected void executeFlushQueueEntry(final FlushQueueEntry fqe) { - Runnable runnable = new Runnable() { + /** + * Called synchronized with regionsInQueue + */ + protected void executeFlushQueueEntry(final FlushQueueEntry fqe, long msDelay) { + Callable callable = new Callable() { @Override - public void run() { - try { - String name = String.format("%s.cacheFlusher.%d", MemStoreFlusher.this.server.getRSThreadName(), - MemStoreFlusher.this.flushes.getCorePoolSize() + 1); - if (!flushRegion(fqe, name)) { - LOG.warn("Failed to flush " + fqe.region); - } - } catch (Exception ex) { - LOG.error("Cache flush failed" + - (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""), - ex - ); - server.checkFileSystem(); + public Boolean call() throws Exception { + String name = + String.format("%s.cacheFlusher.%d", + MemStoreFlusher.this.server.getRSThreadName(), + Thread.currentThread().getId()); + if (!flushRegion(fqe, name)) { + LOG.warn("Failed to flush " + fqe.region); + return false; } + return true; } }; - futures.put(fqe, this.flushes.submit(runnable)); + + LOG.debug("Schedule a flush request " + fqe + " with delay " + msDelay + + "ms"); + + Future future = + this.threadPool.schedule(callable, msDelay, TimeUnit.MILLISECONDS); + Pair> pair = + regionsInQueue.get(fqe.region); + if (pair != null) { + pair.setSecond(future); + } } /** * Only interrupt once it's done with a run through the work loop. 
*/ void interruptIfNecessary() { - flushes.shutdown(); + threadPool.shutdown(); } boolean isAlive() { - return !flushes.isShutdown(); + return !threadPool.isShutdown(); } void join() { boolean done = false; while (!done) { try { - done = flushes.awaitTermination(60, TimeUnit.SECONDS); LOG.debug("Waiting for flush thread to finish..."); + done = threadPool.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException ie) { LOG.error("Interrupted waiting for flush thread to finish..."); } @@ -209,25 +212,26 @@ * A flushRegion that checks store file count. If too many, puts the flush * on delay queue to retry later. * @param fqe - * @return true if the region was successfully flushed, false otherwise. If + * @return true if the region was successfully flushed, false otherwise. If * false, there will be accompanying log messages explaining why the log was * not flushed. */ private boolean flushRegion(final FlushQueueEntry fqe, String why) { - HRegion region = fqe.region; + HRegionIf region = fqe.region; if (!fqe.region.getRegionInfo().isMetaRegion() && - isTooManyStoreFiles(region)) { + region.maxStoreFilesCount() > this.blockingStoreFilesNumber) { if (fqe.isMaximumWait(this.blockingWaitTime)) { - LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) + - "ms on a compaction to clean up 'too many store files'; waited " + - "long enough... proceeding with flush of " + - region.getRegionNameAsString()); + LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) + + "ms on a compaction to clean up 'too many store files'; waited " + + "long enough... proceeding with flush of " + + region.getRegionInfo().getRegionNameAsString()); } else { // If this is first time we've been put off, then emit a log message. 
if (fqe.getRequeueCount() <= 0) { // Note: We don't impose blockingStoreFiles constraint on meta regions - LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + - "store files; delaying flush up to " + this.blockingWaitTime + "ms"); + LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString() + + " has too many store files; delaying flush up to " + + this.blockingWaitTime + "ms"); } /* If a split has been requested, we avoid scheduling a compaction @@ -237,80 +241,58 @@ * references to parent regions are removed, and we can split this * region further. */ - if (!this.server.compactSplitThread.requestSplit(region) - || region.hasReferences()) { - this.server.compactSplitThread.requestCompaction(region, why); + if (!this.server.requestSplit(region) || region.hasReferences()) { + this.server.requestCompaction(region, why); } - // Put back on the queue. Have it come back out of the queue - // after a delay of this.blockingWaitTime / 100 ms. - executeFlushQueueEntry(fqe.requeue(this.blockingWaitTime / 100)); - // Tell a lie, it's not flushed but it's ok + + synchronized (this.regionsInQueue) { + // Put back on the queue. Have it come back out of the queue + // after a delay of this.blockingWaitTime / 100 ms. + executeFlushQueueEntry(fqe.requeue(), this.blockingWaitTime / 100); + } + // Tell a lie, it's not flushed but it's OK return true; } } - return flushRegion(region, why, false, fqe.isSelectiveFlushRequest()); + try { + return flushRegionNow(region, why, fqe.selective()); + } finally { + // the task is executed, remove from regionsInQueue + synchronized (this.regionsInQueue) { + this.regionsInQueue.remove(region); + } + } } /** * Flush a region. * @param region Region to flush. - * @param emergencyFlush Set if we are being force flushed. If true the region - * needs to be removed from the flush queue. 
If false, when we were called - * from the main flusher run loop and we got the entry to flush by calling - * poll on the flush queue (which removed it). * @param selectiveFlushRequest Do we want to selectively flush only the * column families that dominate the memstore size? * * @return true if the region was successfully flushed, false otherwise. If * false, there will be accompanying log messages explaining why the log was * not flushed. */ - private boolean flushRegion(final HRegion region, String why, - final boolean emergencyFlush, boolean selectiveFlushRequest) { - - synchronized (this.regionsInQueue) { - FlushQueueEntry fqe = this.regionsInQueue.remove(region); - if (fqe != null && emergencyFlush) { - Future future = futures.get(fqe); - if (future != null) { - try { - future.get(); - if (region.flushcache(selectiveFlushRequest)) { - server.compactSplitThread.requestCompaction(region, why); - } - server.getMetrics().addFlush(region.getRecentFlushInfo()); - } catch (IOException ex) { - LOG.warn("Cache flush failed" + - (region != null ? (" for region " + - Bytes.toString(region.getRegionName())) : ""), - RemoteExceptionHandler.checkIOException(ex) - ); - server.checkFileSystem(); - return false; - } catch (InterruptedException e) { - LOG.warn("Flush failed" + - (region != null ? (" for region " + - Bytes.toString(region.getRegionName())) : "")); - } catch (ExecutionException e) { - LOG.warn("Flush failed" + - (region != null ? (" for region " + - Bytes.toString(region.getRegionName())) : "")); - } finally { - lock.readLock().unlock(); - } - } + private boolean flushRegionNow(HRegionIf region, String why, + boolean selectiveFlushRequest) { + try { + boolean res = region.flushMemstoreShapshot(selectiveFlushRequest); + if (res) { + server.requestCompaction(region, why); } + server.getMetrics().addFlush(region.getRecentFlushInfo()); + return res; + } catch (IOException ex) { + LOG.warn( + "Cache flush failed" + + (region != null + ? 
(" for region " + region.getRegionInfo().getRegionNameAsString()) + : ""), + RemoteExceptionHandler.checkIOException(ex)); + server.checkFileSystem(); + return false; } - return true; - } - - private boolean isTooManyStoreFiles(HRegion region) { - for (Store hstore: region.stores.values()) { - if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) { - return true; - } - } - return false; } /** @@ -325,21 +307,57 @@ } } + /** + * Makes an emergency flush. + * + * If a flush request is found in the queue, this method will wait for that + * flush to be finished. Otherwise a flush will be performed in the current + * thread by calling {@link #flushRegionNow(HRegionIf, String, boolean)}. + */ + private boolean doEmergencyFlush(HRegionIf region, String why, + boolean selectiveFlushRequest) { + Pair> pair; + synchronized (regionsInQueue) { + pair = regionsInQueue.get(region); + } + if (pair != null) { + // Already has flush request, wait for its finish. + try { + return pair.getSecond().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info("Interrupted waiting for flushing of " + region, e); + return false; + } catch (ExecutionException e) { + // This should not happen actually, all exception should have been + // caught in Callable. + LOG.info("ExecutionException caught for flushing of " + region, e); + return false; + } + } + + // Perform a flush in current thread + return flushRegionNow(region, why, selectiveFlushRequest); + } + /* * Emergency! Need to flush memory. */ private synchronized void flushSomeRegions() { if (this.server.getGlobalMemstoreSize().get() < globalMemStoreLimit) { return; // double check the global memstore size inside of the synchronized block.
} - + // keep flushing until we hit the low water mark long globalMemStoreSize = -1; ArrayList regionsToCompact = new ArrayList(); - for (SortedMap m = + SortedMap m = this.server.getCopyOfOnlineRegionsSortedBySize(); - (globalMemStoreSize = this.server.getGlobalMemstoreSize().get()) >= - this.globalMemStoreLimitLowMark;) { + while (true) { + globalMemStoreSize = this.server.getGlobalMemstoreSize().get(); + if (globalMemStoreSize < this.globalMemStoreLimitLowMark) { + break; + } // flush the region with the biggest memstore if (m.size() <= 0) { LOG.info("No online regions to flush though we've been asked flush " + @@ -356,52 +374,50 @@ " exceeded; currently " + StringUtils.humanReadableInt(globalMemStoreSize) + " and flushing till " + StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark)); - if (!flushRegion(biggestMemStoreRegion, "emergencyFlush", true, false)) { + if (!doEmergencyFlush(biggestMemStoreRegion, "emergencyFlush", false)) { LOG.warn("Flush failed"); break; } regionsToCompact.add(biggestMemStoreRegion); } for (HRegion region : regionsToCompact) { - server.compactSplitThread.requestCompaction(region, "emergencyFlush"); + server.requestCompaction(region, "emergencyFlush"); } } /** - * Datastructure used in the flush queue. Holds region and retry count. - * Keeps tabs on how old this object is. Implements {@link Delayed}. On + * Data structure used in the flush queue. Holds region and retry count. + * Keeps tabs on how old this object is. On * construction, the delay is zero. When added to a delay queue, we'll come - * out near immediately. Call {@link #requeue(long)} passing delay in - * milliseconds before readding to delay queue if you want it to stay there + * out near immediately. Call {@link #requeue()} to bump the requeue count + * before re-adding to delay queue if you want it to stay there * a while.
*/ - static class FlushQueueEntry implements Delayed { - private final HRegion region; + static class FlushQueueEntry { + private final HRegionIf region; private final long createTime; - private long whenToExpire; private int requeueCount = 0; - private boolean selectiveFlushRequest; + private boolean selective; /** * @param r The region to flush - * @param selectiveFlushRequest Do we want to flush only the column + * @param selective Do we want to flush only the column * families that dominate the memstore size, * i.e., do a selective flush? If we are * doing log rolling, then we should not do a * selective flush. */ - FlushQueueEntry(final HRegion r, boolean selectiveFlushRequest) { + FlushQueueEntry(final HRegionIf r, boolean selective) { this.region = r; this.createTime = System.currentTimeMillis(); - this.whenToExpire = this.createTime; - this.selectiveFlushRequest = selectiveFlushRequest; + this.selective = selective; } /** * @return Is this a request for a selective flush? */ - public boolean isSelectiveFlushRequest() { - return selectiveFlushRequest; + public boolean selective() { + return selective; } /** @@ -419,29 +435,21 @@ public int getRequeueCount() { return this.requeueCount; } - + /** - * @param when When to expire, when to come up out of the queue. - * Specify in milliseconds. This method adds System.currentTimeMillis() - * to whatever you pass. - * @return This. + * Increases the requeue count. + * + * @return this. 
*/ - public FlushQueueEntry requeue(final long when) { - this.whenToExpire = System.currentTimeMillis() + when; + public FlushQueueEntry requeue() { this.requeueCount++; return this; } @Override - public long getDelay(TimeUnit unit) { - return unit.convert(this.whenToExpire - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); - } - - @Override - public int compareTo(Delayed other) { - return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - - other.getDelay(TimeUnit.MILLISECONDS)).intValue(); + public String toString() { + return "{regin: " + region + ", created: " + createTime + ", requeue: " + + requeueCount + ", selective: " + selective + "}"; } } @@ -451,12 +459,11 @@ // number of "memstore flusher" threads per region server int handlerCount = newConf.getInt(HConstants.FLUSH_THREADS, HConstants.DEFAULT_FLUSH_THREADS); if(this.handlerCount != handlerCount){ - LOG.info("Changing the value of " + HConstants.FLUSH_THREADS + - " from " + this.handlerCount + " to " + - handlerCount); + LOG.info("Changing the value of " + HConstants.FLUSH_THREADS + " from " + + this.handlerCount + " to " + handlerCount); } - this.flushes.setMaximumPoolSize(handlerCount); - this.flushes.setCorePoolSize(handlerCount); + this.threadPool.setMaximumPoolSize(handlerCount); + this.threadPool.setCorePoolSize(handlerCount); this.handlerCount = handlerCount; } @@ -467,7 +474,35 @@ * @return */ protected int getFlushThreadNum() { - return this.flushes.getCorePoolSize(); + return this.threadPool.getCorePoolSize(); } + /** + * Waits for all current requests to be done. + * Used only in test cases. + */ + void waitAllRequestDone() throws ExecutionException, InterruptedException { + while (true) { + // Fetch futures. 
+ List> futures = new ArrayList<>(); + synchronized (this.regionsInQueue) { + for (Pair> pair : + regionsInQueue.values()) { + futures.add(pair.getSecond()); + } + } + + if (futures.size() == 0) { + // No more requests, quit + return; + } + + // Wait for futures + for (Future future : futures) { + future.get(); + } + // This is a loop because some new requests may be generated during + // executing current requests. + } + } } Index: hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java =================================================================== --- hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java +++ hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java @@ -164,7 +164,7 @@ (cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize)); // Flush! - region.flushcache(true); + region.flushMemstoreShapshot(true); // Will use these to check if anything changed. long oldCF2MemstoreSize = cf2MemstoreSize; @@ -205,7 +205,7 @@ oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); // Flush again - region.flushcache(true); + region.flushMemstoreShapshot(true); // Recalculate everything cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); @@ -228,7 +228,7 @@ // In that case, we should flush all the CFs. // Clearing the existing memstores. - region.flushcache(false); + region.flushMemstoreShapshot(false); // The memstore limit is 200*1024 and the column family flush threshold is // around 50*1024. We try to just hit the memstore limit with each CF's @@ -241,7 +241,7 @@ region.put(createPut(5, i)); } - region.flushcache(true); + region.flushMemstoreShapshot(true); // Since we won't find any CF above the threshold, and hence no specific // store to flush, we should flush all the memstores. 
Assert.assertEquals(0, region.getMemstoreSize().get()); @@ -254,7 +254,7 @@ * FM1 FM2 FM3 */ // flush all - region.flushcache(false); + region.flushMemstoreShapshot(false); region.put(createPut(2, 1)); region.put(createPut(3, 1)); @@ -279,7 +279,7 @@ * FM1 FM2 FM3 */ // flush, should only flush family 1 - region.flushcache(true); + region.flushMemstoreShapshot(true); final long sizeFamily1_2 = region.getStore(FAMILY1).getMemStoreSize(); final long sizeFamily2_2 = region.getStore(FAMILY2).getMemStoreSize(); @@ -295,7 +295,7 @@ * FM1 FM2 FM3 */ // flush all - region.flushcache(false); + region.flushMemstoreShapshot(false); region.put(createPut(2, 1)); region.put(createPut(3, 1)); @@ -318,7 +318,7 @@ * FM1 FM2 FM3 */ // flush, should flush all - region.flushcache(true); + region.flushMemstoreShapshot(true); final long sizeFamily1_4 = region.getStore(FAMILY1).getMemStoreSize(); final long sizeFamily2_4 = region.getStore(FAMILY2).getMemStoreSize(); @@ -417,7 +417,7 @@ (cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize)); // Flush! - region.flushcache(true); + region.flushMemstoreShapshot(true); cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); @@ -501,7 +501,7 @@ desiredRegion != null); // Flush the region selectively. 
- desiredRegion.flushcache(true); + desiredRegion.flushMemstoreShapshot(true); long totalMemstoreSize; long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize; Index: hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java =================================================================== --- hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java +++ hadoop/branches/titan/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java @@ -0,0 +1,176 @@ +/** + * Copyright 2014 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.SortedMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.StringBytes; +import org.junit.Test; + +/** + * Testcases for MemStoreFlusher + */ +public class TestMemStoreFlusher { + private static class HRegionServerMock implements HRegionServerIf { + RegionServerMetrics metrics; + public HRegionServerMock(Configuration conf) { + metrics = new RegionServerMetrics(conf); + } + + AtomicLong globalMemstoreSize = new AtomicLong(0); + @Override + public String getRSThreadName() { + return "RSThread"; + } + + @Override + public void checkFileSystem() { + } + + @Override + public boolean requestSplit(HRegionIf r) { + return false; + } + + @Override + public RegionServerMetrics getMetrics() { + return metrics; + } + + @Override + public AtomicLong getGlobalMemstoreSize() { + return globalMemstoreSize; + } + + @Override + public SortedMap getCopyOfOnlineRegionsSortedBySize() { + return null; + } + + @Override + public void requestCompaction(HRegionIf r, String why) { + } + + } + + private static class HRegionMock implements HRegionIf { + private final HRegionInfo info; + final AtomicInteger flushedCount = new AtomicInteger(); + /** + * The number of checking maxStoreFilesCount times + */ + final AtomicInteger checkingCount = new AtomicInteger(); + final int maxStoreFilesCount; + + public HRegionMock(StringBytes tableName, int maxStoreFilesCount) { + info = 
new HRegionInfo(new HTableDescriptor(tableName.getBytes()), + null, null); + this.maxStoreFilesCount = maxStoreFilesCount; + } + + @Override + public boolean flushMemstoreShapshot(boolean selectiveFlushRequest) + throws IOException { + flushedCount.addAndGet(1); + return true; + } + + @Override + public List> getRecentFlushInfo() { + return new ArrayList<>(); + } + + @Override + public HRegionInfo getRegionInfo() { + return info; + } + + @Override + public boolean hasReferences() { + return false; + } + + @Override + public int maxStoreFilesCount() { + checkingCount.addAndGet(1); + return maxStoreFilesCount; + } + } + + @Test + public void testFlush() throws Exception { + final Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.HSTORE_BLOCKING_STORE_FILES_KEY, "1"); + + HRegionServerMock server = new HRegionServerMock(conf); + MemStoreFlusher flusher = new MemStoreFlusher(conf, server); + + HRegionMock region = new HRegionMock(new StringBytes("testFlush"), 1); + + // Make a flush request + flusher.request(region, false); + + flusher.waitAllRequestDone(); + Assert.assertEquals("Number of flushing", 1, region.flushedCount.get()); + } + + /** + * In MemStoreFlusher, if there are too many store files, a delay should be + * performed waiting for some files to be merged. This case ensures this delay + * is performed. 
+ */ + @Test + public void testDelay() throws Exception { + final long blockingWaitTime = 2000; + + final Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.HSTORE_BLOCKING_STORE_FILES_KEY, "1"); + conf.set(HConstants.HSTORE_BLOCKING_WAIT_TIME_KEY, "" + blockingWaitTime); + + HRegionServerMock server = new HRegionServerMock(conf); + MemStoreFlusher flusher = new MemStoreFlusher(conf, server); + + // 3 is larger than 1 + HRegionMock region = new HRegionMock(new StringBytes("testDelay"), 3); + + // Make a flush request + flusher.request(region, false); + + flusher.waitAllRequestDone(); + + Assert.assertEquals("Number of flushing", 1, region.flushedCount.get()); + Assert.assertTrue("Number of checking should <= " + 100 + ", but got " + + region.checkingCount.get(), region.checkingCount.get() <= 100); + } +}