diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a4a7537..f58729d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1757,6 +1757,40 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  @Override
+  public void waitForFlushes() {
+    synchronized (writestate) {
+      if (this.writestate.readOnly) {
+        // we should not wait for replayed flushes if we are read only (for example in case the
+        // region is a secondary replica).
+        return;
+      }
+      if (!writestate.flushing) return;
+      long start = System.currentTimeMillis();
+      boolean interrupted = false;
+      // log once up front rather than on every wake-up of writestate.wait()
+      LOG.debug("waiting for cache flush to complete for region " + this);
+      try {
+        while (writestate.flushing) {
+          try {
+            writestate.wait();
+          } catch (InterruptedException iex) {
+            // stop waiting; the interrupt status is restored in the finally block below
+            LOG.warn("Interrupted while waiting");
+            interrupted = true;
+            break;
+          }
+        }
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      long duration = System.currentTimeMillis() - start;
+      LOG.debug("Waited " + duration + " ms for flush to complete");
+    }
+  }
+
   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
       final String threadNamePrefix) {
     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 295b825..5ff5e52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -745,4 +745,7 @@ public interface Region extends ConfigurationObserver {
 
   /** Wait for all current flushes and compactions of the region to complete */
   void waitForFlushesAndCompactions();
+
+  /** Wait for all current flushes of the region to complete */
+  void waitForFlushes();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
index 9c42e4d..22df895 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember;
 import org.apache.hadoop.hbase.procedure.Subprocedure;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -95,7 +96,12 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
           LOG.debug("take snapshot without flush memstore first");
         } else {
           LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
-          region.flush(true);
+          FlushResult res = region.flush(true);
+          if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
+            // CANNOT_FLUSH may mean that a flush is already on-going
+            // we need to wait for that flush to complete
+            region.waitForFlushes();
+          }
         }
         ((HRegion)region).addRegionToSnapshot(snapshot, monitor);
         if (snapshotSkipFlush) {