diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 38dda11..41a38ef 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -220,7 +220,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster - private static final int MISSING_ROWS_TO_LOG = 2; // YARN complains when too many counters + private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters private static final int WIDTH_DEFAULT = 1000000; private static final int WRAP_DEFAULT = 25; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index a78d6ee..a441a6b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -33,7 +33,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.errorhandling.ForeignException; @@ -157,7 +159,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur FLUSH_REQUEST_WAKE_MILLIS_DEFAULT); 
FlushTableSubprocedurePool taskManager = - new FlushTableSubprocedurePool(rss.getServerName().toString(), conf); + new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, involvedRegions, table, taskManager); } @@ -195,13 +197,15 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur * failures. */ static class FlushTableSubprocedurePool { + private final Abortable abortable; private final ExecutorCompletionService taskPool; private final ThreadPoolExecutor executor; private volatile boolean stopped; private final List> futures = new ArrayList>(); private final String name; - FlushTableSubprocedurePool(String name, Configuration conf) { + FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) { + this.abortable = abortable; // configure the executor service long keepAlive = conf.getLong( RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY, @@ -259,9 +263,13 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur } // we are stopped so we can just exit. 
} catch (ExecutionException e) { - if (e.getCause() instanceof ForeignException) { + Throwable cause = e.getCause(); + if (cause instanceof ForeignException) { LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e); throw (ForeignException)e.getCause(); + } else if (cause instanceof DroppedSnapshotException) { + // we have to abort the region server according to contract of flush + abortable.abort("Received DroppedSnapshotException, aborting", cause); } LOG.warn("Got Exception in FlushSubprocedurePool", e); throw new ForeignException(name, e.getCause()); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9ded3b5..4fe9741 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1279,6 +1279,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * vector if already closed and null if judged that it should not close. * * @throws IOException e + * @throws DroppedSnapshotException Thrown when replay of wal is required + * because a Snapshot was not properly persisted. The region is put in closing mode, and the + * caller MUST abort after this. */ public Map> close() throws IOException { return close(false); @@ -1316,6 +1319,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * we are not to close at this time or we are already closed. * * @throws IOException e + * @throws DroppedSnapshotException Thrown when replay of wal is required + * because a Snapshot was not properly persisted. The region is put in closing mode, and the + * caller MUST abort after this. */ public Map> close(final boolean abort) throws IOException { // Only allow one thread to close at a time. 
Serialize them so dual @@ -1334,6 +1340,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + /** + * Exposed for some very specific unit tests. + */ + @VisibleForTesting + public void setClosing(boolean closing) { + this.closing.set(closing); + } + private Map> doClose(final boolean abort, MonitoredTask status) throws IOException { if (isClosed()) { @@ -1834,7 +1848,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * * @throws IOException general io exceptions * @throws DroppedSnapshotException Thrown when replay of wal is required - * because a Snapshot was not properly persisted. + * because a Snapshot was not properly persisted. The region is put in closing mode, and the + * caller MUST abort after this. */ public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker) throws IOException { @@ -2333,10 +2348,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi dse.initCause(t); status.abort("Flush failed: " + StringUtils.stringifyException(t)); + // Callers for flushcache() should catch DroppedSnapshotException and abort the region server. + // However, since we may have the region read lock, we cannot call close(true) here since + // we cannot promote to a write lock. Instead we are setting closing so that all other region + // operations except for close will be rejected. + this.closing.set(true); + if (rsServices != null) { - // MemstoreFlusher already causes abort, but in case flush is called from another thread - // we should still cause abort otherwise it will be dataloss since memstore snapshots are - // not cleared + // This is a safeguard against the case where the caller fails to explicitly handle aborting rsServices.abort("Replay of WAL required. 
Forcing server shutdown", dse); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 504ad03..615efb2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -956,7 +956,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Configuration getConfiguration() { return regionServer.getConfiguration(); } - + private RegionServerQuotaManager getQuotaManager() { return regionServer.getRegionServerQuotaManager(); } @@ -1330,6 +1330,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible); return MergeRegionsResponse.newBuilder().build(); + } catch (DroppedSnapshotException ex) { + regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); + throw new ServiceException(ex); } catch (IOException ie) { throw new ServiceException(ie); } @@ -1765,6 +1768,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ((HRegion)region).forceSplit(splitPoint); regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit()); return SplitRegionResponse.newBuilder().build(); + } catch (DroppedSnapshotException ex) { + regionServer.abort("Replay of WAL required. 
Forcing server shutdown", ex); + throw new ServiceException(ex); } catch (IOException ie) { throw new ServiceException(ie); } @@ -2438,7 +2444,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); } } - + quota.addScanResult(results); // If the scanner's filter - if any - is done with the scan diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 8910042..e61a186 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -136,7 +136,7 @@ public interface Region extends ConfigurationObserver { */ long getOldestHfileTs(boolean majorCompactioOnly) throws IOException; - /** + /** * @return map of column family names to max sequence id that was read from storage when this * region was opened */ @@ -157,7 +157,7 @@ public interface Region extends ConfigurationObserver { /////////////////////////////////////////////////////////////////////////// // Metrics - + /** @return read requests count for this region */ long getReadRequestsCount(); @@ -181,7 +181,7 @@ public interface Region extends ConfigurationObserver { /** @return the number of mutations processed bypassing the WAL */ long getNumMutationsWithoutWAL(); - + /** @return the size of data processed bypassing the WAL, in bytes */ long getDataInMemoryWithoutWAL(); @@ -216,7 +216,7 @@ public interface Region extends ConfigurationObserver { /** * This method needs to be called before any public call that reads or - * modifies data. + * modifies data. * Acquires a read lock and checks if the region is closing or closed. *

{@link #closeRegionOperation} MUST then always be called after * the operation has completed, whether it succeeded or failed. @@ -226,7 +226,7 @@ public interface Region extends ConfigurationObserver { /** * This method needs to be called before any public call that reads or - * modifies data. + * modifies data. * Acquires a read lock and checks if the region is closing or closed. *

{@link #closeRegionOperation} MUST then always be called after * the operation has completed, whether it succeeded or failed. @@ -413,7 +413,7 @@ public interface Region extends ConfigurationObserver { /** * Perform atomic mutations within the region. - * + * * @param mutations The list of mutations to perform. * mutations can contain operations for multiple rows. * Caller has to ensure that all rows are contained in this region. @@ -609,13 +609,13 @@ public interface Region extends ConfigurationObserver { CANNOT_FLUSH_MEMSTORE_EMPTY, CANNOT_FLUSH } - + /** @return the detailed result code */ Result getResult(); /** @return true if the memstores were flushed, else false */ boolean isFlushSucceeded(); - + /** @return True if the flush requested a compaction, else false */ boolean isCompactionNeeded(); } @@ -638,7 +638,8 @@ public interface Region extends ConfigurationObserver { * the region needs compacting * * @throws IOException general io exceptions - * @throws DroppedSnapshotException Thrown when abort is required + * @throws DroppedSnapshotException Thrown when abort is required. The caller MUST catch this + * exception and MUST abort. Any further operation to the region may cause data loss * because a snapshot was not properly persisted. */ FlushResult flush(boolean force) throws IOException; @@ -647,7 +648,7 @@ public interface Region extends ConfigurationObserver { * Synchronously compact all stores in the region. *

This operation could block for a long time, so don't call it from a * time-sensitive thread. - *

Note that no locks are taken to prevent possible conflicts between + *

Note that no locks are taken to prevent possible conflicts between * compaction and splitting activities. The regionserver does not normally compact * and split in parallel. However by calling this method you may introduce * unexpected and unhandled concurrency. Don't do this unless you know what diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java index a9d5863..62990b0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -93,6 +94,10 @@ class RegionMergeRequest implements Runnable { + (this.server.isStopping() ? " stopping" : " stopped"), e); return; } + if (e instanceof DroppedSnapshotException) { + server.abort("Replay of WAL required. Forcing server shutdown", e); + return; + } try { LOG.warn("Running rollback/cleanup of failed merge of " + region_a +" and "+ region_b + "; " + e.getMessage(), e); @@ -132,7 +137,7 @@ class RegionMergeRequest implements Runnable { try { this.tableLock.release(); } catch (IOException ex) { - LOG.error("Could not release the table lock (something is really wrong). " + LOG.error("Could not release the table lock (something is really wrong). 
" + "Aborting this server to avoid holding the lock forever."); this.server.abort("Abort; we got an error when releasing the table lock " + "on " + region_a.getRegionInfo().getRegionNameAsString()); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java index 8c73a1b..4003f1b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.Bytes; @@ -91,6 +92,10 @@ class SplitRequest implements Runnable { + (this.server.isStopping() ? " stopping" : " stopped"), e); return; } + if (e instanceof DroppedSnapshotException) { + server.abort("Replay of WAL required. Forcing server shutdown", e); + return; + } try { LOG.info("Running rollback/cleanup of failed split of " + parent.getRegionInfo().getRegionNameAsString() + "; " + e.getMessage(), e); @@ -147,7 +152,7 @@ class SplitRequest implements Runnable { try { this.tableLock.release(); } catch (IOException ex) { - LOG.error("Could not release the table lock (something is really wrong). " + LOG.error("Could not release the table lock (something is really wrong). 
" + "Aborting this server to avoid holding the lock forever."); this.server.abort("Abort; we got an error when releasing the table lock " + "on " + parent.getRegionInfo().getRegionNameAsString()); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java index 021c16f..f04feb1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java @@ -35,7 +35,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionReplicaUtil; @@ -184,7 +186,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { switch (snapshot.getType()) { case FLUSH: SnapshotSubprocedurePool taskManager = - new SnapshotSubprocedurePool(rss.getServerName().toString(), conf); + new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss); return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, involvedRegions, snapshot, taskManager); case SKIPFLUSH: @@ -196,7 +198,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { * To minimized the code change, class name is not changed. 
*/ SnapshotSubprocedurePool taskManager2 = - new SnapshotSubprocedurePool(rss.getServerName().toString(), conf); + new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss); return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, involvedRegions, snapshot, taskManager2); @@ -265,13 +267,15 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { * operations such as compactions and replication sinks. */ static class SnapshotSubprocedurePool { + private final Abortable abortable; private final ExecutorCompletionService taskPool; private final ThreadPoolExecutor executor; private volatile boolean stopped; private final List> futures = new ArrayList>(); private final String name; - SnapshotSubprocedurePool(String name, Configuration conf) { + SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) { + this.abortable = abortable; // configure the executor service long keepAlive = conf.getLong( RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY, @@ -331,9 +335,13 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { } // we are stopped so we can just exit. 
} catch (ExecutionException e) { - if (e.getCause() instanceof ForeignException) { + Throwable cause = e.getCause(); + if (cause instanceof ForeignException) { LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e); throw (ForeignException)e.getCause(); + } else if (cause instanceof DroppedSnapshotException) { + // we have to abort the region server according to contract of flush + abortable.abort("Received DroppedSnapshotException, aborting", cause); } LOG.warn("Got Exception in SnapshotSubprocedurePool", e); throw new ForeignException(name, e.getCause()); @@ -371,7 +379,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { if (this.stopped) return; this.stopped = true; - this.executor.shutdownNow(); + this.executor.shutdown(); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 9d529a7..5717163 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -367,6 +367,7 @@ public class TestHRegion { Assert.fail("Didn't bubble up IOE!"); } catch (DroppedSnapshotException dse) { // What we are expecting + region.closing.set(false); // this is needed for the rest of the test to work } // Make it so all writes succeed from here on out ffs.fault.set(false); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 3535ba2..a8aa4d3 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -718,6 +718,8 @@ public class TestWALReplay { + t.getMessage()); // simulated to abort server 
Mockito.doReturn(true).when(rsServices).isAborted(); + region.setClosing(false); // region normally does not accept writes after + // DroppedSnapshotException. We mock around it for this test. } // writing more data int moreRow = 10;