diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 104c5da..e685ce9 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; @@ -86,7 +85,6 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy; import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RegionSplitter; @@ -184,7 +182,29 @@ import com.google.common.collect.Sets; * * Delete - A standalone program that deletes a single node * - * This class can be run as a unit test, as an integration test, or from the command line + * Search - A standalone program that searches the missing keys as reported from Verify in WALs + * Configure the following to retain all WALs, HFiles and recovered.edits files: + *
+ * 
+      hbase.master.hfilecleaner.ttl
+      604800000
+      
+    
+
+    
+      hbase.master.logcleaner.ttl
+      604800000
+      
+    
+
+    
+      hbase.region.archive.recovered.edits
+      true
+  
+ * 
+ * + * This class can be run as a unit test, as an integration test, or from the command line. + * */ @Category(IntegrationTests.class) public class IntegrationTestBigLinkedList extends IntegrationTestBase { @@ -222,7 +242,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster - private static final int MISSING_ROWS_TO_LOG = 50; + private static final int MISSING_ROWS_TO_LOG = 2; // YARN complains when too many counters private static final int WIDTH_DEFAULT = 1000000; private static final int WRAP_DEFAULT = 25; @@ -672,6 +692,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { */ public static class WALMapperSearcher extends WALMapper { private SortedSet keysToFind; + private AtomicInteger rows = new AtomicInteger(0); @Override public void setup(Mapper.Context context) @@ -693,8 +714,16 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { boolean b = this.keysToFind.contains(row); if (b) { String keyStr = Bytes.toStringBinary(row); - LOG.info("Found cell=" + cell); - context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1); + try { + LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey()); + } catch (IOException|InterruptedException e) { + e.printStackTrace(); + } + if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { + context.getCounter(FOUND_GROUP_KEY, keyStr + "_in_" + + context.getInputSplit().toString()).increment(1); + } + } return b; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java index 7b92df9..f083f8d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java @@ -167,7 +167,7 @@ public class HFilePrettyPrinter extends Configured implements Tool { if (cmd.hasOption("w")) { String key = cmd.getOptionValue("w"); if (key != null && key.length() != 0) { - row = key.getBytes(); + row = Bytes.toBytesBinary(key); isSeekToRow = true; } else { System.err.println("Invalid row is specified."); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0d5306e..e063e4b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -476,6 +476,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED. * @return true if the memstores were flushed, else false. */ + @Override public boolean isFlushSucceeded() { return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result .FLUSHED_COMPACTION_NEEDED; @@ -485,6 +486,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED. * @return True if the flush requested a compaction, else false (doesn't even mean it flushed). */ + @Override public boolean isCompactionNeeded() { return result == Result.FLUSHED_COMPACTION_NEEDED; } @@ -1108,7 +1110,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public long getNumMutationsWithoutWAL() { return numMutationsWithoutWAL.get(); } - + @Override public long getDataInMemoryWithoutWAL() { return dataInMemoryWithoutWAL.get(); @@ -1610,7 +1612,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes(); regionLoadBldr.clearStoreCompleteSequenceId(); for (byte[] familyName : this.stores.keySet()) { - long oldestUnflushedSeqId = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName); + long oldestUnflushedSeqId + = this.wal.getEarliestNotFlushedSeqNum(encodedRegionName, familyName); // no oldestUnflushedSeqId means no data has written to the store after last flush, so we use // lastFlushOpSeqId as complete sequence id for the store. regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId @@ -1913,8 +1916,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ boolean shouldFlushStore(Store store) { long maxFlushedSeqId = - this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store - .getFamily().getName()) - 1; + this.wal.getEarliestNotFlushedOrFlushingSeqNum(getRegionInfo().getEncodedNameAsBytes(), + store.getFamily().getName()) - 1; if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) { if (LOG.isDebugEnabled()) { LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this @@ -2142,7 +2145,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi myseqid); } flushOpSeqId = getNextSequenceId(wal); - long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName); + long oldestUnflushedSeqId = wal.getEarliestNotFlushedOrFlushingSeqNum(encodedRegionName); // no oldestUnflushedSeqId means we flushed all stores. // or the unflushed stores are all empty. flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId @@ -2365,7 +2368,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi LOG.info(msg); status.setStatus(msg); - return new FlushResultImpl(compactionRequested ? + return new FlushResultImpl(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED : FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId); @@ -5352,7 +5355,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi moreValues = nextInternal(tmpList, scannerContext); outResults.addAll(tmpList); } - + // If the size limit was reached it means a partial Result is being returned. Returning a // partial Result means that we should not reset the filters; filters should only be reset in // between rows @@ -6473,6 +6476,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return results; } + @Override public void mutateRow(RowMutations rm) throws IOException { // Don't need nonces here - RowMutations only supports puts and deletes mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); @@ -6499,6 +6503,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * rowsToLock is sorted in order to avoid deadlocks. * @throws IOException */ + @Override public void mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException { MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); @@ -7497,6 +7502,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** @return the coprocessor host */ + @Override public RegionCoprocessorHost getCoprocessorHost() { return coprocessorHost; } @@ -7780,7 +7786,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public long getOldestSeqIdOfStore(byte[] familyName) { - return wal.getEarliestMemstoreSeqNum(getRegionInfo() + return wal.getEarliestNotFlushedOrFlushingSeqNum(getRegionInfo() .getEncodedNameAsBytes(), familyName); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8f4059e..79bb4db 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2837,7 +2837,7 @@ public class HRegionServer extends HasThread implements if (destination != null) { try { WAL wal = getWAL(r.getRegionInfo()); - long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes()); + long closeSeqNum = wal.getEarliestNotFlushedOrFlushingSeqNum(r.getRegionInfo().getEncodedNameAsBytes()); if (closeSeqNum == HConstants.NO_SEQNUM) { // No edits in WAL for this region; get the sequence number when the region was opened. closeSeqNum = r.getOpenSeqNum(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 7c170b0..ba3fe88 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; @@ -136,7 +135,7 @@ public class FSHLog implements WAL { // Calls to append now also wait until the append has been done on the consumer side of the // disruptor. We used to not wait but it makes the implemenation easier to grok if we have // the region edit/sequence id after the append returns. - // + // // TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend // once only? Probably hard given syncs take way longer than an append. // @@ -156,7 +155,7 @@ public class FSHLog implements WAL { static final Log LOG = LogFactory.getLog(FSHLog.class); private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms - + /** * The nexus at which all incoming handlers meet. Does appends and sync with an ordering. * Appends and syncs are each put on the ring which means handlers need to @@ -226,7 +225,7 @@ public class FSHLog implements WAL { private final String logFilePrefix; /** - * Suffix included on generated wal file names + * Suffix included on generated wal file names */ private final String logFileSuffix; @@ -248,7 +247,7 @@ public class FSHLog implements WAL { public void registerWALActionsListener(final WALActionsListener listener) { this.listeners.add(listener); } - + @Override public boolean unregisterWALActionsListener(final WALActionsListener listener) { return this.listeners.remove(listener); @@ -647,7 +646,7 @@ public class FSHLog implements WAL { /** * Tell listeners about pre log roll. - * @throws IOException + * @throws IOException */ private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) throws IOException { @@ -660,7 +659,7 @@ public class FSHLog implements WAL { /** * Tell listeners about post log roll. - * @throws IOException + * @throws IOException */ private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) throws IOException { @@ -850,7 +849,7 @@ public class FSHLog implements WAL { // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. synchronized (regionSequenceIdLock) { for (Map.Entry e: regionsSequenceNums.entrySet()) { - long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey()); + long unFlushedVal = getEarliestNotFlushedOrFlushingSeqNum(e.getKey()); if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) { if (regionsToFlush == null) regionsToFlush = new ArrayList(); @@ -1169,12 +1168,12 @@ public class FSHLog implements WAL { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce); } - + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION", justification="Will never be null") @Override public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, - final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, + final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, final List memstoreCells) throws IOException { if (this.closed) throw new IOException("Cannot append; log is closed"); // Make a trace scope for the append. It is closed on other side of the ring buffer by the @@ -1217,9 +1216,9 @@ public class FSHLog implements WAL { private class SyncRunner extends HasThread { private volatile long sequence; private final BlockingQueue syncFutures; - + /** - * UPDATE! + * UPDATE! * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, * we will put the result of the actual hdfs sync call as the result. * @param sequence The sequence number on the ring buffer when this thread was set running. @@ -1267,7 +1266,7 @@ public class FSHLog implements WAL { // This function releases one sync future only. return 1; } - + /** * Release all SyncFutures whose sequence is <= currentSequence. * @param currentSequence @@ -1306,6 +1305,7 @@ public class FSHLog implements WAL { return sequence; } + @Override public void run() { long currentSequence; while (!isInterrupted()) { @@ -1760,18 +1760,25 @@ public class FSHLog implements WAL { } + /** + * Returns the earliest memstore seqNum of the edits of the given region that are currently + * not flushed or flushing. NOTE that edits currently flushing are excluded. + */ @Override - public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { + public long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName) { ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); return oldestUnflushedStoreSequenceIdsOfRegion != null ? getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM; } + /** + * Returns the earliest memstore seqNum of the edits of the given region that are currently + * not flushed or flushing. NOTE that edits currently flushing are excluded. + */ @Override - public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, - byte[] familyName) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = + public long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName, byte[] familyName) { + Map oldestUnflushedStoreSequenceIdsOfRegion = this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); if (oldestUnflushedStoreSequenceIdsOfRegion != null) { Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName); @@ -1782,11 +1789,42 @@ public class FSHLog implements WAL { } /** + * Returns the earliest memstore seqNum of the edits of the given region that are currently + * not flushed. NOTE that edits currently flushing are included. + */ + @Override + public long getEarliestNotFlushedSeqNum(byte[] encodedRegionName, byte[] familyName) { + synchronized (regionSequenceIdLock) { + // in startCacheFlush, we move the earliest seqIds between the + // oldestUnflushedStoreSequenceIdsOfRegion and lowestFlushingStoreSequenceIds maps. + Map oldestUnflushedStoreSequenceIdsOfRegion + = lowestFlushingStoreSequenceIds.get(encodedRegionName); + + // check the flushing map first. It is guaranteed to have the lower seqId + if (oldestUnflushedStoreSequenceIdsOfRegion != null) { + Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName); + if (result != null) { + return result.longValue(); + } + } + + oldestUnflushedStoreSequenceIdsOfRegion = + this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); + if (oldestUnflushedStoreSequenceIdsOfRegion != null) { + Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName); + return result != null ? result.longValue() : HConstants.NO_SEQNUM; + } else { + return HConstants.NO_SEQNUM; + } + } + } + + /** * This class is used coordinating two threads holding one thread at a * 'safe point' while the orchestrating thread does some work that requires the first thread * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another * thread. - * + * *

Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A. * Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused, @@ -1794,7 +1832,7 @@ public class FSHLog implements WAL { * it flags B and then Thread A and Thread B continue along on their merry way. Pause and * signalling 'zigzags' between the two participating threads. We use two latches -- one the * inverse of the other -- pausing and signaling when states are achieved. - * + * *

To start up the drama, Thread A creates an instance of this class each time it would do * this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot * only). Thread B notices the new instance (via reading a volatile reference or how ever) and it @@ -1816,7 +1854,7 @@ public class FSHLog implements WAL { * Latch to wait on. Will be released when we can proceed. */ private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); - + /** * For Thread A to call when it is ready to wait on the 'safe point' to be attained. * Thread A will be held in here until Thread B calls {@link #safePointAttained()} @@ -1825,7 +1863,7 @@ public class FSHLog implements WAL { * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with * an exception, then something is up w/ our syncing. * @return The passed syncFuture - * @throws FailedSyncBeforeLogCloseException + * @throws FailedSyncBeforeLogCloseException */ SyncFuture waitSafePoint(final SyncFuture syncFuture) throws InterruptedException, FailedSyncBeforeLogCloseException { @@ -1837,7 +1875,7 @@ public class FSHLog implements WAL { } return syncFuture; } - + /** * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} @@ -2040,14 +2078,14 @@ public class FSHLog implements WAL { // here inside this single appending/writing thread. Events are ordered on the ringbuffer // so region sequenceids will also be in order. regionSequenceId = entry.stampRegionSequenceId(); - - // Edits are empty, there is nothing to append. Maybe empty when we are looking for a - // region sequence id only, a region edit/sequence id that is not associated with an actual + + // Edits are empty, there is nothing to append. Maybe empty when we are looking for a + // region sequence id only, a region edit/sequence id that is not associated with an actual // edit. It has to go through all the rigmarole to be sure we have the right ordering. if (entry.getEdit().isEmpty()) { return; } - + // Coprocessor hook. if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit())) { @@ -2156,7 +2194,7 @@ public class FSHLog implements WAL { System.exit(-1); } } - + /** * Find the 'getPipeline' on the passed os stream. * @return Method or null. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 7254ad1..b7d3e55 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -104,7 +103,7 @@ class DisabledWALProvider implements WALProvider { public void registerWALActionsListener(final WALActionsListener listener) { listeners.add(listener); } - + @Override public boolean unregisterWALActionsListener(final WALActionsListener listener) { return listeners.remove(listener); @@ -205,12 +204,17 @@ class DisabledWALProvider implements WALProvider { } @Override - public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { + public long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName) { + return HConstants.NO_SEQNUM; + } + + @Override + public long getEarliestNotFlushedSeqNum(byte[] encodedRegionName, byte[] familyName) { return HConstants.NO_SEQNUM; } @Override - public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { + public long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName, byte[] familyName) { return HConstants.NO_SEQNUM; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 5a2b08d..f470caa 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -175,20 +175,34 @@ public interface WAL { WALCoprocessorHost getCoprocessorHost(); - /** Gets the earliest sequence number in the memstore for this particular region. - * This can serve as best-effort "recent" WAL number for this region. + /** + * Returns the earliest memstore seqNum of the edits of the given region that are currently + * not flushed or flushing. NOTE that edits currently flushing are excluded. * @param encodedRegionName The region to get the number for. * @return The number if present, HConstants.NO_SEQNUM if absent. + * @see #getEarliestNotFlushedSeqNum(byte[], byte[]) + */ + long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName); + + /** + * Returns the earliest memstore seqNum of the edits of the given region that are currently + * not flushed or flushing. NOTE that edits currently flushing are excluded. + * @param encodedRegionName The region to get the number for. + * @param familyName The family to get the number for. + * @return The number if present, HConstants.NO_SEQNUM if absent. + * @see #getEarliestNotFlushedSeqNum(byte[], byte[]) */ - long getEarliestMemstoreSeqNum(byte[] encodedRegionName); + long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName, byte[] familyName); /** - * Gets the earliest sequence number in the memstore for this particular region and store. + * Returns the earliest memstore seqNum of the edits of the given region and store that are + * currently not flushed. NOTE that edits currently flushing are included. * @param encodedRegionName The region to get the number for. * @param familyName The family to get the number for. * @return The number if present, HConstants.NO_SEQNUM if absent. + * @see #getEarliestNotFlushedOrFlushingSeqNum(byte[], byte[]) */ - long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName); + long getEarliestNotFlushedSeqNum(byte[] encodedRegionName, byte[] familyName); /** * Human readable identifying information about the state of this WAL. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 75e1dea..0e48e66 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -1322,7 +1322,9 @@ public class WALSplitter { @Override public Void call() throws Exception { WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); - LOG.debug("Closing " + wap.p); + if (LOG.isTraceEnabled()) { + LOG.trace("Closing " + wap.p); + } try { wap.w.close(); } catch (IOException ioe) { @@ -1330,8 +1332,11 @@ public class WALSplitter { thrown.add(ioe); return null; } - LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in " + if (LOG.isDebugEnabled()) { + LOG.debug("Closed wap " + wap.p + " (wrote " + (wap.editsWritten-wap.editsSkipped) + + " edits, skipped " + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)"); + } if (wap.editsWritten == 0) { // just remove the empty recovered.edits file @@ -1544,6 +1549,8 @@ public class WALSplitter { filterCellByStore(logEntry); if (!logEntry.getEdit().isEmpty()) { wap.w.append(logEntry); + } else { + wap.incrementSkippedEdits(1); } this.updateRegionMaximumEditLogSeqNum(logEntry); editsCount++; @@ -1584,6 +1591,8 @@ public class WALSplitter { public abstract static class SinkWriter { /* Count of edits written to this path */ long editsWritten = 0; + /* Count of edits skipped to this path */ + long editsSkipped = 0; /* Number of nanos spent writing to this log */ long nanosSpent = 0; @@ -1591,6 +1600,10 @@ public class WALSplitter { editsWritten += edits; } + void incrementSkippedEdits(int skipped) { + editsSkipped += skipped; + } + void incrementNanoTime(long nanos) { nanosSpent += nanos; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 1ccb392..c3fb23f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -158,7 +158,7 @@ public class TestPerColumnFamilyFlush { // Get the overall smallest LSN in the region's memstores. long smallestSeqInRegionCurrentMemstore = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + .getEarliestNotFlushedOrFlushingSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); // The overall smallest LSN in the region's memstores should be the same as // the LSN of the smallest edit in CF1 @@ -189,7 +189,7 @@ public class TestPerColumnFamilyFlush { cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); totalMemstoreSize = region.getMemstoreSize(); smallestSeqInRegionCurrentMemstore = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + .getEarliestNotFlushedOrFlushingSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); // We should have cleared out only CF1, since we chose the flush thresholds // and number of puts accordingly. @@ -227,7 +227,7 @@ public class TestPerColumnFamilyFlush { cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); totalMemstoreSize = region.getMemstoreSize(); smallestSeqInRegionCurrentMemstore = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + .getEarliestNotFlushedOrFlushingSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); // CF1 and CF2, both should be absent. assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); @@ -307,7 +307,7 @@ public class TestPerColumnFamilyFlush { cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); totalMemstoreSize = region.getMemstoreSize(); long smallestSeqInRegionCurrentMemstore = ((HRegion)region).getWAL() - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + .getEarliestNotFlushedOrFlushingSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); // Everything should have been cleared assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);