diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
index 22df895..ae310e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver.snapshot;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Callable;
 
@@ -24,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.procedure.ProcedureMember;
@@ -96,11 +98,32 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
           LOG.debug("take snapshot without flush memstore first");
         } else {
           LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
-          FlushResult res = region.flush(true);
-          if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
-            // CANNOT_FLUSH may mean that a flush is already on-going
-            // we need to wait for that flush to complete
-            region.waitForFlushes();
+          final int maxRetries = 3;
+          boolean succeeded = false;
+          // Capture the read point once, before any flush attempt: every write visible at the
+          // start of the snapshot must be persisted. Re-reading it on each attempt would let
+          // concurrent writes keep raising the bar and fail the snapshot spuriously.
+          long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
+          for (int i = 0; i < maxRetries; i++) {
+            FlushResult res = region.flush(true);
+            if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
+              // CANNOT_FLUSH may mean that a flush is already on-going
+              // we need to wait for that flush to complete
+              region.waitForFlushes();
+              if (region.getMaxFlushedSeqId() >= readPt) {
+                // writes at the start of the snapshot have been persisted
+                succeeded = true;
+                break;
+              }
+              // Flushed data does not yet cover readPt; flush again if attempts remain.
+            } else {
+              // Any result other than CANNOT_FLUSH is treated as success.
+              succeeded = true;
+              break;
+            }
+          }
+          if (!succeeded) {
+            throw new IOException("Unable to complete flush after " + maxRetries + " attempts");
           }
         }
         ((HRegion)region).addRegionToSnapshot(snapshot, monitor);