From 01bd995334c15e58c72ec4f940fc8a0a51cc2aee Mon Sep 17 00:00:00 2001 From: nihed Date: Fri, 28 Jul 2017 11:27:20 +0200 Subject: [PATCH] HBASE-18451 PeriodicMemstoreFlusher should inspect the queue before adding a delayed flush request --- .../org/apache/hadoop/hbase/regionserver/FlushRequester.java | 6 ++++-- .../org/apache/hadoop/hbase/regionserver/HRegionServer.java | 11 ++++++----- .../org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java | 9 +++++++-- .../hadoop/hbase/regionserver/TestHeapMemoryManager.java | 7 ++++--- .../hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java | 8 ++++---- 5 files changed, 25 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java index c7e155a3bb..ce5423f3b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java @@ -32,8 +32,9 @@ public interface FlushRequester { * @param region the Region requesting the cache flush * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log * rolling. + * @return true if our region is added on the queue, false if not */ - void requestFlush(Region region, boolean forceFlushAllStores); + boolean requestFlush(Region region, boolean forceFlushAllStores); /** * Tell the listener the cache needs to be flushed after a delay @@ -42,8 +43,9 @@ public interface FlushRequester { * @param delay after how much time should the flush happen * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log * rolling. + * @return true if our region is added on the queue, false if not */ - void requestDelayedFlush(Region region, long delay, boolean forceFlushAllStores); + boolean requestDelayedFlush(Region region, long delay, boolean forceFlushAllStores); /** * Register a FlushRequestListener diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 184f070399..c8273b4f07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1819,14 +1819,15 @@ public class HRegionServer extends HasThread implements FlushRequester requester = server.getFlushRequester(); if (requester != null) { long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME; - LOG.info(getName() + " requesting flush of " + - r.getRegionInfo().getRegionNameAsString() + " because " + - whyFlush.toString() + - " after random delay " + randomDelay + "ms"); //Throttle the flushes by putting a delay. If we don't throttle, and there //is a balanced write-load on the regions in a table, we might end up //overwhelming the filesystem with too many flushes at once. - requester.requestDelayedFlush(r, randomDelay, false); + if (requester.requestDelayedFlush(r, randomDelay, false)) { + LOG.info(getName() + " requesting flush of " + + r.getRegionInfo().getRegionNameAsString() + " because " + + whyFlush.toString() + + " after random delay " + randomDelay + "ms"); + } } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 72dfdb6440..1f6efc1220 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -351,7 +351,7 @@ class MemStoreFlusher implements FlushRequester { } @Override - public void requestFlush(Region r, boolean forceFlushAllStores) { + public boolean requestFlush(Region r, boolean forceFlushAllStores) { ((HRegion)r).incrementFlushesQueuedCount(); synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { @@ -360,12 +360,15 @@ class MemStoreFlusher implements FlushRequester { FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); + return true; } + return false; } } @Override - public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) { + + public boolean requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) { ((HRegion)r).incrementFlushesQueuedCount(); synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { @@ -374,7 +377,9 @@ class MemStoreFlusher implements FlushRequester { fqe.requeue(delay); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); + return true; } + return false; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index e1a8d5fb64..40cc734c22 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -789,13 +789,14 @@ public class TestHeapMemoryManager { } @Override - public void requestFlush(Region region, boolean forceFlushAllStores) { + public boolean requestFlush(Region region, boolean forceFlushAllStores) { this.listener.flushRequested(flushType, region); + return true; } @Override - public void requestDelayedFlush(Region region, long delay, boolean forceFlushAllStores) { - + public boolean requestDelayedFlush(Region region, long delay, boolean forceFlushAllStores) { + return true; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index be725fe669..6b25bc2fca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -1106,18 +1106,18 @@ public abstract class AbstractTestWALReplay { private HRegion r; @Override - public void requestFlush(Region region, boolean force) { + public boolean requestFlush(Region region, boolean force) { try { r.flush(force); + return true; } catch (IOException e) { throw new RuntimeException("Exception flushing", e); } } @Override - public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) { - // TODO Auto-generated method stub - + public boolean requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) { + return true; } @Override -- 2.12.2