diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 43bc5d4..38dda11 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; @@ -86,7 +85,6 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy; import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RegionSplitter; @@ -672,6 +670,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { */ public static class WALMapperSearcher extends WALMapper { private SortedSet keysToFind; + private AtomicInteger rows = new AtomicInteger(0); @Override public void setup(Mapper.Context context) @@ -693,8 +692,16 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { boolean b = this.keysToFind.contains(row); if (b) { String keyStr = Bytes.toStringBinary(row); - LOG.info("Found cell=" + cell); - context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1); + try { + LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey()); + } catch (IOException|InterruptedException e) { + LOG.warn(e); + } + if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { + context.getCounter(FOUND_GROUP_KEY, keyStr + "_in_" + + context.getInputSplit().toString()).increment(1); + } + context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1); } return b; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index 8d64f2a..5723919 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Region.Operation; /** * This flush region implementation uses the distributed procedure framework to flush diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index 7664dee..a78d6ee 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -272,7 +272,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur } /** - * This attempts to cancel out all pending and in progress tasks (interruptions issues) + * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running + * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877). * @throws InterruptedException */ void cancelTasks() throws InterruptedException { @@ -289,13 +290,14 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur } /** - * Abruptly shutdown the thread pool. Call when exiting a region server. + * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be + * interrupted (see HBASE-13877) */ void stop() { if (this.stopped) return; this.stopped = true; - this.executor.shutdownNow(); + this.executor.shutdown(); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0e83dd3..9ded3b5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -478,6 +478,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED. * @return true if the memstores were flushed, else false. */ + @Override public boolean isFlushSucceeded() { return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result .FLUSHED_COMPACTION_NEEDED; @@ -487,6 +488,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED. * @return True if the flush requested a compaction, else false (doesn't even mean it flushed). */ + @Override public boolean isCompactionNeeded() { return result == Result.FLUSHED_COMPACTION_NEEDED; } @@ -1110,7 +1112,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public long getNumMutationsWithoutWAL() { return numMutationsWithoutWAL.get(); } - + @Override public long getDataInMemoryWithoutWAL() { return dataInMemoryWithoutWAL.get(); @@ -2330,6 +2332,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Bytes.toStringBinary(getRegionInfo().getRegionName())); dse.initCause(t); status.abort("Flush failed: " + StringUtils.stringifyException(t)); + + if (rsServices != null) { + // MemstoreFlusher already causes abort, but in case flush is called from another thread + // we should still cause abort otherwise it will be dataloss since memstore snapshots are + // not cleared + rsServices.abort("Replay of WAL required. Forcing server shutdown", dse); + } + throw dse; } @@ -2367,7 +2377,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi LOG.info(msg); status.setStatus(msg); - return new FlushResultImpl(compactionRequested ? + return new FlushResultImpl(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED : FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId); @@ -5354,7 +5364,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi moreValues = nextInternal(tmpList, scannerContext); outResults.addAll(tmpList); } - + // If the size limit was reached it means a partial Result is being returned. Returning a // partial Result means that we should not reset the filters; filters should only be reset in // between rows @@ -6475,6 +6485,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return results; } + @Override public void mutateRow(RowMutations rm) throws IOException { // Don't need nonces here - RowMutations only supports puts and deletes mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); @@ -6501,6 +6512,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * rowsToLock is sorted in order to avoid deadlocks. * @throws IOException */ + @Override public void mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException { MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); @@ -7499,6 +7511,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** @return the coprocessor host */ + @Override public RegionCoprocessorHost getCoprocessorHost() { return coprocessorHost; }