diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a4a7537..e9ac00b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1757,6 +1757,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + @Override + public long waitForFlushes() { + synchronized (writestate) { + if (this.writestate.readOnly) { + // we should not wait for replayed flushes if we are read only (for example in case the + // region is a secondary replica). + return -1; + } + if (!writestate.flushing) return 0; + long start = System.currentTimeMillis(); + boolean interrupted = false; + try { + while (writestate.flushing) { + LOG.debug("waiting for cache flush to complete for region " + this); + try { + writestate.wait(); + } catch (InterruptedException iex) { + // keep waiting; the interrupt status is restored in the finally block below + LOG.warn("Interrupted while waiting"); + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + return System.currentTimeMillis() - start; + } + } protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool( final String threadNamePrefix) { int numStores = Math.max(1, this.htableDescriptor.getFamilies().size()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 295b825..7298558 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -745,4 +745,8 @@ public interface Region extends ConfigurationObserver { /** Wait for all current flushes and compactions of the region to complete */ void 
waitForFlushesAndCompactions(); + /** Wait for all current flushes of the region to complete + * @return wait time in ms; 0 if no flush was in progress, -1 if the region is read only + */ + long waitForFlushes(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java index 9c42e4d..18db09a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; @@ -95,7 +96,17 @@ public class FlushSnapshotSubprocedure extends Subprocedure { LOG.debug("take snapshot without flush memstore first"); } else { LOG.debug("Flush Snapshotting region " + region.toString() + " started..."); - region.flush(true); + FlushResult res = region.flush(true); + if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) { + long duration = region.waitForFlushes(); + if (duration == 0) { /* NOTE(review): HRegion also returns 0 after a sub-ms wait */ + String msg = "Region " + region.toString() + " couldn't flush"; + LOG.debug(msg); + throw new ForeignException(getMemberName(), msg); + } else if (duration > 0) { + LOG.debug("Waited " + duration + " ms for flush to complete"); + } + } } ((HRegion)region).addRegionToSnapshot(snapshot, monitor); if (snapshotSkipFlush) {