diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 61d532b..279b80e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1586,6 +1586,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + public void waitForFlushes() { + synchronized (writestate) { + if (this.writestate.readOnly) { + // we should not wait for replayed flushed if we are read only (for example in case the + // region is a secondary replica). + return; + } + if (!writestate.flushing) return; + long start = System.currentTimeMillis(); + boolean interrupted = false; + try { + while (writestate.flushing) { + LOG.debug("waiting for cache flush to complete for region " + this); + try { + writestate.wait(); + } catch (InterruptedException iex) { + // essentially ignore and propagate the interrupt back up + LOG.warn("Interrupted while waiting"); + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + long duration = System.currentTimeMillis() - start; + LOG.debug("Waited " + duration + " ms for flush to complete"); + } + } protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool( final String threadNamePrefix) { int numStores = Math.max(1, this.htableDescriptor.getFamilies().size()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java index f083601..5669452 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver.snapshot; +import java.io.IOException; import java.util.List; import java.util.concurrent.Callable; @@ -24,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.procedure.ProcedureMember; @@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; @@ -52,6 +55,9 @@ public class FlushSnapshotSubprocedure extends Subprocedure { private final SnapshotSubprocedurePool taskManager; private boolean snapshotSkipFlush = false; + // the maximum number of attempts we flush + final static int MAX_RETRIES = 3; + public FlushSnapshotSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, List regions, SnapshotDescription snapshot, @@ -96,7 +102,30 @@ public class FlushSnapshotSubprocedure extends Subprocedure { LOG.debug("take snapshot without flush memstore first"); } else { LOG.debug("Flush Snapshotting region " + region.toString() + " started..."); - region.flush(true); + boolean succeeded = false; + long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED); + for (int i = 0; i < MAX_RETRIES; i++) { + FlushResult res = region.flush(true); + if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) { + if (region instanceof HRegion) { + HRegion hreg = (HRegion) region; + // CANNOT_FLUSH may mean that a flush is already on-going + // we need to wait for that flush to complete + hreg.waitForFlushes(); + } + if (region.getMaxFlushedSeqId() >= readPt) { + // writes at the start of the snapshot have been persisted + succeeded = true; + break; + } + } else { + succeeded = true; + break; + } + } + if (!succeeded) { + throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts"); + } } ((HRegion)region).addRegionToSnapshot(snapshot, monitor); if (snapshotSkipFlush) {