diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 43bc5d4..38dda11 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; @@ -86,7 +85,6 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy; import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RegionSplitter; @@ -672,6 +670,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { */ public static class WALMapperSearcher extends WALMapper { private SortedSet keysToFind; + private AtomicInteger rows = new AtomicInteger(0); @Override public void setup(Mapper.Context context) @@ -693,8 +692,16 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { boolean b = this.keysToFind.contains(row); if (b) { String keyStr = Bytes.toStringBinary(row); - LOG.info("Found cell=" + cell); - context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1); + try { + LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey()); + } catch (IOException|InterruptedException e) { + LOG.warn(e); + } + if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { + context.getCounter(FOUND_GROUP_KEY, keyStr + "_in_" + + context.getInputSplit().toString()).increment(1); + } + context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1); } return b; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index 8d64f2a..5723919 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Region.Operation; /** * This flush region implementation uses the distributed procedure framework to flush diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index 7664dee..a441a6b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -33,7 +33,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.errorhandling.ForeignException; @@ -157,7 +159,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur FLUSH_REQUEST_WAKE_MILLIS_DEFAULT); FlushTableSubprocedurePool taskManager = - new FlushTableSubprocedurePool(rss.getServerName().toString(), conf); + new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, involvedRegions, table, taskManager); } @@ -195,13 +197,15 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur * failures. */ static class FlushTableSubprocedurePool { + private final Abortable abortable; private final ExecutorCompletionService taskPool; private final ThreadPoolExecutor executor; private volatile boolean stopped; private final List> futures = new ArrayList>(); private final String name; - FlushTableSubprocedurePool(String name, Configuration conf) { + FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) { + this.abortable = abortable; // configure the executor service long keepAlive = conf.getLong( RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY, @@ -259,9 +263,13 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur } // we are stopped so we can just exit. } catch (ExecutionException e) { - if (e.getCause() instanceof ForeignException) { + Throwable cause = e.getCause(); + if (cause instanceof ForeignException) { LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e); throw (ForeignException)e.getCause(); + } else if (cause instanceof DroppedSnapshotException) { + // we have to abort the region server according to contract of flush + abortable.abort("Received DroppedSnapshotException, aborting", cause); } LOG.warn("Got Exception in FlushSubprocedurePool", e); throw new ForeignException(name, e.getCause()); @@ -272,7 +280,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur } /** - * This attempts to cancel out all pending and in progress tasks (interruptions issues) + * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running + * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877). * @throws InterruptedException */ void cancelTasks() throws InterruptedException { @@ -289,13 +298,14 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur } /** - * Abruptly shutdown the thread pool. Call when exiting a region server. + * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be + * interrupted (see HBASE-13877) */ void stop() { if (this.stopped) return; this.stopped = true; - this.executor.shutdownNow(); + this.executor.shutdown(); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0e83dd3..c49e87b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -478,6 +478,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED. * @return true if the memstores were flushed, else false. */ + @Override public boolean isFlushSucceeded() { return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result .FLUSHED_COMPACTION_NEEDED; @@ -487,6 +488,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED. * @return True if the flush requested a compaction, else false (doesn't even mean it flushed). */ + @Override public boolean isCompactionNeeded() { return result == Result.FLUSHED_COMPACTION_NEEDED; } @@ -1110,7 +1112,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public long getNumMutationsWithoutWAL() { return numMutationsWithoutWAL.get(); } - + @Override public long getDataInMemoryWithoutWAL() { return dataInMemoryWithoutWAL.get(); @@ -1277,6 +1279,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * vector if already closed and null if judged that it should not close. * * @throws IOException e + * @throws DroppedSnapshotException Thrown when replay of wal is required + * because a Snapshot was not properly persisted. The region is put in closing mode, and the + * caller MUST abort after this. */ public Map> close() throws IOException { return close(false); @@ -1314,6 +1319,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * we are not to close at this time or we are already closed. * * @throws IOException e + * @throws DroppedSnapshotException Thrown when replay of wal is required + * because a Snapshot was not properly persisted. The region is put in closing mode, and the + * caller MUST abort after this. */ public Map> close(final boolean abort) throws IOException { // Only allow one thread to close at a time. Serialize them so dual @@ -1832,7 +1840,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * * @throws IOException general io exceptions * @throws DroppedSnapshotException Thrown when replay of wal is required - * because a Snapshot was not properly persisted. + * because a Snapshot was not properly persisted. The region is put in closing mode, and the + * caller MUST abort after this. */ public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker) throws IOException { @@ -2330,6 +2339,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Bytes.toStringBinary(getRegionInfo().getRegionName())); dse.initCause(t); status.abort("Flush failed: " + StringUtils.stringifyException(t)); + + // Callers for flushcache() should catch DroppedSnapshotException and abort the region server. + // However, since we may have the region read lock, we cannot call close(true) here since + // we cannot promote to a write lock. Instead we are setting closing so that all other region + // operations except for close will be rejected. + this.closing.set(true); + + if (rsServices != null) { + // This is a safeguard against the case where the caller fails to explicitly handle aborting + rsServices.abort("Replay of WAL required. Forcing server shutdown", dse); + } + throw dse; } @@ -2367,7 +2388,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi LOG.info(msg); status.setStatus(msg); - return new FlushResultImpl(compactionRequested ? + return new FlushResultImpl(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED : FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId); @@ -5354,7 +5375,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi moreValues = nextInternal(tmpList, scannerContext); outResults.addAll(tmpList); } - + // If the size limit was reached it means a partial Result is being returned. Returning a // partial Result means that we should not reset the filters; filters should only be reset in // between rows @@ -6475,6 +6496,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return results; } + @Override public void mutateRow(RowMutations rm) throws IOException { // Don't need nonces here - RowMutations only supports puts and deletes mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); @@ -6501,6 +6523,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * rowsToLock is sorted in order to avoid deadlocks. * @throws IOException */ + @Override public void mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException { MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); @@ -7499,6 +7522,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** @return the coprocessor host */ + @Override public RegionCoprocessorHost getCoprocessorHost() { return coprocessorHost; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 504ad03..615efb2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -956,7 +956,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Configuration getConfiguration() { return regionServer.getConfiguration(); } - + private RegionServerQuotaManager getQuotaManager() { return regionServer.getRegionServerQuotaManager(); } @@ -1330,6 +1330,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible); return MergeRegionsResponse.newBuilder().build(); + } catch (DroppedSnapshotException ex) { + regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); + throw new ServiceException(ex); } catch (IOException ie) { throw new ServiceException(ie); } @@ -1765,6 +1768,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ((HRegion)region).forceSplit(splitPoint); regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit()); return SplitRegionResponse.newBuilder().build(); + } catch (DroppedSnapshotException ex) { + regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); + throw new ServiceException(ex); } catch (IOException ie) { throw new ServiceException(ie); } @@ -2438,7 +2444,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); } } - + quota.addScanResult(results); // If the scanner's filter - if any - is done with the scan diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 8910042..e61a186 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -136,7 +136,7 @@ public interface Region extends ConfigurationObserver { */ long getOldestHfileTs(boolean majorCompactioOnly) throws IOException; - /** + /** * @return map of column family names to max sequence id that was read from storage when this * region was opened */ @@ -157,7 +157,7 @@ public interface Region extends ConfigurationObserver { /////////////////////////////////////////////////////////////////////////// // Metrics - + /** @return read requests count for this region */ long getReadRequestsCount(); @@ -181,7 +181,7 @@ public interface Region extends ConfigurationObserver { /** @return the number of mutations processed bypassing the WAL */ long getNumMutationsWithoutWAL(); - + /** @return the size of data processed bypassing the WAL, in bytes */ long getDataInMemoryWithoutWAL(); @@ -216,7 +216,7 @@ public interface Region extends ConfigurationObserver { /** * This method needs to be called before any public call that reads or - * modifies data. + * modifies data. * Acquires a read lock and checks if the region is closing or closed. *

{@link #closeRegionOperation} MUST then always be called after * the operation has completed, whether it succeeded or failed. @@ -226,7 +226,7 @@ public interface Region extends ConfigurationObserver { /** * This method needs to be called before any public call that reads or - * modifies data. + * modifies data. * Acquires a read lock and checks if the region is closing or closed. *

{@link #closeRegionOperation} MUST then always be called after * the operation has completed, whether it succeeded or failed. @@ -413,7 +413,7 @@ public interface Region extends ConfigurationObserver { /** * Perform atomic mutations within the region. - * + * * @param mutations The list of mutations to perform. * mutations can contain operations for multiple rows. * Caller has to ensure that all rows are contained in this region. @@ -609,13 +609,13 @@ public interface Region extends ConfigurationObserver { CANNOT_FLUSH_MEMSTORE_EMPTY, CANNOT_FLUSH } - + /** @return the detailed result code */ Result getResult(); /** @return true if the memstores were flushed, else false */ boolean isFlushSucceeded(); - + /** @return True if the flush requested a compaction, else false */ boolean isCompactionNeeded(); } @@ -638,7 +638,8 @@ public interface Region extends ConfigurationObserver { * the region needs compacting * * @throws IOException general io exceptions - * @throws DroppedSnapshotException Thrown when abort is required + * @throws DroppedSnapshotException Thrown when abort is required. The caller MUST catch this + * exception and MUST abort. Any further operation to the region may cause data loss. * because a snapshot was not properly persisted. */ FlushResult flush(boolean force) throws IOException; @@ -647,7 +648,7 @@ public interface Region extends ConfigurationObserver { * Synchronously compact all stores in the region. *

This operation could block for a long time, so don't call it from a * time-sensitive thread. - *

Note that no locks are taken to prevent possible conflicts between + *

Note that no locks are taken to prevent possible conflicts between * compaction and splitting activities. The regionserver does not normally compact * and split in parallel. However by calling this method you may introduce * unexpected and unhandled concurrency. Don't do this unless you know what diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java index a9d5863..62990b0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -93,6 +94,10 @@ class RegionMergeRequest implements Runnable { + (this.server.isStopping() ? " stopping" : " stopped"), e); return; } + if (e instanceof DroppedSnapshotException) { + server.abort("Replay of WAL required. Forcing server shutdown", e); + return; + } try { LOG.warn("Running rollback/cleanup of failed merge of " + region_a +" and "+ region_b + "; " + e.getMessage(), e); @@ -132,7 +137,7 @@ class RegionMergeRequest implements Runnable { try { this.tableLock.release(); } catch (IOException ex) { - LOG.error("Could not release the table lock (something is really wrong). " + LOG.error("Could not release the table lock (something is really wrong). " + "Aborting this server to avoid holding the lock forever."); this.server.abort("Abort; we got an error when releasing the table lock " + "on " + region_a.getRegionInfo().getRegionNameAsString()); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java index 8c73a1b..4003f1b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.Bytes; @@ -91,6 +92,10 @@ class SplitRequest implements Runnable { + (this.server.isStopping() ? " stopping" : " stopped"), e); return; } + if (e instanceof DroppedSnapshotException) { + server.abort("Replay of WAL required. Forcing server shutdown", e); + return; + } try { LOG.info("Running rollback/cleanup of failed split of " + parent.getRegionInfo().getRegionNameAsString() + "; " + e.getMessage(), e); @@ -147,7 +152,7 @@ class SplitRequest implements Runnable { try { this.tableLock.release(); } catch (IOException ex) { - LOG.error("Could not release the table lock (something is really wrong). " + LOG.error("Could not release the table lock (something is really wrong). " + "Aborting this server to avoid holding the lock forever."); this.server.abort("Abort; we got an error when releasing the table lock " + "on " + parent.getRegionInfo().getRegionNameAsString()); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java index 021c16f..f04feb1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java @@ -35,7 +35,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionReplicaUtil; @@ -184,7 +186,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { switch (snapshot.getType()) { case FLUSH: SnapshotSubprocedurePool taskManager = - new SnapshotSubprocedurePool(rss.getServerName().toString(), conf); + new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss); return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, involvedRegions, snapshot, taskManager); case SKIPFLUSH: @@ -196,7 +198,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { * To minimized the code change, class name is not changed. */ SnapshotSubprocedurePool taskManager2 = - new SnapshotSubprocedurePool(rss.getServerName().toString(), conf); + new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss); return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, involvedRegions, snapshot, taskManager2); @@ -265,13 +267,15 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { * operations such as compactions and replication sinks. */ static class SnapshotSubprocedurePool { + private final Abortable abortable; private final ExecutorCompletionService taskPool; private final ThreadPoolExecutor executor; private volatile boolean stopped; private final List> futures = new ArrayList>(); private final String name; - SnapshotSubprocedurePool(String name, Configuration conf) { + SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) { + this.abortable = abortable; // configure the executor service long keepAlive = conf.getLong( RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY, @@ -331,9 +335,13 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { } // we are stopped so we can just exit. } catch (ExecutionException e) { - if (e.getCause() instanceof ForeignException) { + Throwable cause = e.getCause(); + if (cause instanceof ForeignException) { LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e); throw (ForeignException)e.getCause(); + } else if (cause instanceof DroppedSnapshotException) { + // we have to abort the region server according to contract of flush + abortable.abort("Received DroppedSnapshotException, aborting", cause); } LOG.warn("Got Exception in SnapshotSubprocedurePool", e); throw new ForeignException(name, e.getCause()); @@ -371,7 +379,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { if (this.stopped) return; this.stopped = true; - this.executor.shutdownNow(); + this.executor.shutdown(); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 680e57f..3b023f0 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -363,6 +363,7 @@ public class TestHRegion { Assert.fail("Didn't bubble up IOE!"); } catch (DroppedSnapshotException dse) { // What we are expecting + region.closing.set(false); // this is needed for the rest of the test to work } // Make it so all writes succeed from here on out ffs.fault.set(false);