diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 104c5da..e685ce9 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; @@ -86,7 +85,6 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy; import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RegionSplitter; @@ -184,7 +182,29 @@ import com.google.common.collect.Sets; * * Delete - A standalone program that deletes a single node * - * This class can be run as a unit test, as an integration test, or from the command line + * Search - A standalone program that searches the missing keys as reported from Verify in WALs + * Configure the following to retain all WALs, HFiles and recovered.edits files: + *
+ *+ * + * This class can be run as a unit test, as an integration test, or from the command line. + * */ @Category(IntegrationTests.class) public class IntegrationTestBigLinkedList extends IntegrationTestBase { @@ -222,7 +242,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster - private static final int MISSING_ROWS_TO_LOG = 50; + private static final int MISSING_ROWS_TO_LOG = 2; // YARN complains when too many counters private static final int WIDTH_DEFAULT = 1000000; private static final int WRAP_DEFAULT = 25; @@ -672,6 +692,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { */ public static class WALMapperSearcher extends WALMapper { private SortedSet+ + +hbase.master.hfilecleaner.ttl +604800000 + ++ + +hbase.master.logcleaner.ttl +604800000 + ++ + *hbase.region.archive.recovered.edits +true +
rowsToLock is sorted in order to avoid deadlocks.
* @throws IOException
*/
+ @Override
public void mutateRowsWithLocks(CollectioncurrentSequence.
* @param currentSequence
@@ -1306,6 +1305,7 @@ public class FSHLog implements WAL {
return sequence;
}
+ @Override
public void run() {
long currentSequence;
while (!isInterrupted()) {
@@ -1760,18 +1760,25 @@ public class FSHLog implements WAL {
}
+ /**
+ * Returns the earliest memstore seqNum of the edits of the given region that are currently
+ * not flushed or flushing. NOTE that edits currently flushing are excluded.
+ */
@Override
- public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
+ public long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName) {
ConcurrentMapThread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A. * Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused, @@ -1794,7 +1832,7 @@ public class FSHLog implements WAL { * it flags B and then Thread A and Thread B continue along on their merry way. Pause and * signalling 'zigzags' between the two participating threads. We use two latches -- one the * inverse of the other -- pausing and signaling when states are achieved. - * + * *
To start up the drama, Thread A creates an instance of this class each time it would do
* this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot
* only). Thread B notices the new instance (via reading a volatile reference or how ever) and it
@@ -1816,7 +1854,7 @@ public class FSHLog implements WAL {
* Latch to wait on. Will be released when we can proceed.
*/
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
-
+
/**
* For Thread A to call when it is ready to wait on the 'safe point' to be attained.
* Thread A will be held in here until Thread B calls {@link #safePointAttained()}
@@ -1825,7 +1863,7 @@ public class FSHLog implements WAL {
* @param syncFuture We need this as barometer on outstanding syncs. If it comes home with
* an exception, then something is up w/ our syncing.
* @return The passed syncFuture
- * @throws FailedSyncBeforeLogCloseException
+ * @throws FailedSyncBeforeLogCloseException
*/
SyncFuture waitSafePoint(final SyncFuture syncFuture)
throws InterruptedException, FailedSyncBeforeLogCloseException {
@@ -1837,7 +1875,7 @@ public class FSHLog implements WAL {
}
return syncFuture;
}
-
+
/**
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals
* Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
@@ -2040,14 +2078,14 @@ public class FSHLog implements WAL {
// here inside this single appending/writing thread. Events are ordered on the ringbuffer
// so region sequenceids will also be in order.
regionSequenceId = entry.stampRegionSequenceId();
-
- // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
- // region sequence id only, a region edit/sequence id that is not associated with an actual
+
+ // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
+ // region sequence id only, a region edit/sequence id that is not associated with an actual
// edit. It has to go through all the rigmarole to be sure we have the right ordering.
if (entry.getEdit().isEmpty()) {
return;
}
-
+
// Coprocessor hook.
if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
entry.getEdit())) {
@@ -2156,7 +2194,7 @@ public class FSHLog implements WAL {
System.exit(-1);
}
}
-
+
/**
* Find the 'getPipeline' on the passed os stream.
* @return Method or null.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 7254ad1..b7d3e55 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -104,7 +103,7 @@ class DisabledWALProvider implements WALProvider {
public void registerWALActionsListener(final WALActionsListener listener) {
listeners.add(listener);
}
-
+
@Override
public boolean unregisterWALActionsListener(final WALActionsListener listener) {
return listeners.remove(listener);
@@ -205,12 +204,17 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
+ public long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName) {
+ return HConstants.NO_SEQNUM;
+ }
+
+ @Override
+ public long getEarliestNotFlushedSeqNum(byte[] encodedRegionName, byte[] familyName) {
return HConstants.NO_SEQNUM;
}
@Override
- public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
+ public long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName, byte[] familyName) {
return HConstants.NO_SEQNUM;
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 5a2b08d..f470caa 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -175,20 +175,34 @@ public interface WAL {
WALCoprocessorHost getCoprocessorHost();
- /** Gets the earliest sequence number in the memstore for this particular region.
- * This can serve as best-effort "recent" WAL number for this region.
+ /**
+ * Returns the earliest memstore seqNum of the edits of the given region that are currently
+ * not flushed or flushing. NOTE that edits currently flushing are excluded.
* @param encodedRegionName The region to get the number for.
* @return The number if present, HConstants.NO_SEQNUM if absent.
+ * @see #getEarliestNotFlushedSeqNum(byte[], byte[])
+ */
+ long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName);
+
+ /**
+ * Returns the earliest memstore seqNum of the edits of the given region that are currently
+ * not flushed or flushing. NOTE that edits currently flushing are excluded.
+ * @param encodedRegionName The region to get the number for.
+ * @param familyName The family to get the number for.
+ * @return The number if present, HConstants.NO_SEQNUM if absent.
+ * @see #getEarliestNotFlushedSeqNum(byte[], byte[])
*/
- long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
+ long getEarliestNotFlushedOrFlushingSeqNum(byte[] encodedRegionName, byte[] familyName);
/**
- * Gets the earliest sequence number in the memstore for this particular region and store.
+ * Returns the earliest memstore seqNum of the edits of the given region and store that are
+ * currently not flushed. NOTE that edits currently flushing are included.
* @param encodedRegionName The region to get the number for.
* @param familyName The family to get the number for.
* @return The number if present, HConstants.NO_SEQNUM if absent.
+ * @see #getEarliestNotFlushedOrFlushingSeqNum(byte[], byte[])
*/
- long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName);
+ long getEarliestNotFlushedSeqNum(byte[] encodedRegionName, byte[] familyName);
/**
* Human readable identifying information about the state of this WAL.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 75e1dea..0e48e66 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -1322,7 +1322,9 @@ public class WALSplitter {
@Override
public Void call() throws Exception {
WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
- LOG.debug("Closing " + wap.p);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closing " + wap.p);
+ }
try {
wap.w.close();
} catch (IOException ioe) {
@@ -1330,8 +1332,11 @@ public class WALSplitter {
thrown.add(ioe);
return null;
}
- LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closed wap " + wap.p + " (wrote " + (wap.editsWritten-wap.editsSkipped)
+ + " edits, skipped " + wap.editsSkipped + " edits in "
+ (wap.nanosSpent / 1000 / 1000) + "ms)");
+ }
if (wap.editsWritten == 0) {
// just remove the empty recovered.edits file
@@ -1544,6 +1549,8 @@ public class WALSplitter {
filterCellByStore(logEntry);
if (!logEntry.getEdit().isEmpty()) {
wap.w.append(logEntry);
+ } else {
+ wap.incrementSkippedEdits(1);
}
this.updateRegionMaximumEditLogSeqNum(logEntry);
editsCount++;
@@ -1584,6 +1591,8 @@ public class WALSplitter {
public abstract static class SinkWriter {
/* Count of edits written to this path */
long editsWritten = 0;
+ /* Count of edits skipped to this path */
+ long editsSkipped = 0;
/* Number of nanos spent writing to this log */
long nanosSpent = 0;
@@ -1591,6 +1600,10 @@ public class WALSplitter {
editsWritten += edits;
}
+ void incrementSkippedEdits(int skipped) {
+ editsSkipped += skipped;
+ }
+
void incrementNanoTime(long nanos) {
nanosSpent += nanos;
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index 1ccb392..c3fb23f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -158,7 +158,7 @@ public class TestPerColumnFamilyFlush {
// Get the overall smallest LSN in the region's memstores.
long smallestSeqInRegionCurrentMemstore = getWAL(region)
- .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+ .getEarliestNotFlushedOrFlushingSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// The overall smallest LSN in the region's memstores should be the same as
// the LSN of the smallest edit in CF1
@@ -189,7 +189,7 @@ public class TestPerColumnFamilyFlush {
cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
totalMemstoreSize = region.getMemstoreSize();
smallestSeqInRegionCurrentMemstore = getWAL(region)
- .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+ .getEarliestNotFlushedOrFlushingSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// We should have cleared out only CF1, since we chose the flush thresholds
// and number of puts accordingly.
@@ -227,7 +227,7 @@ public class TestPerColumnFamilyFlush {
cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
totalMemstoreSize = region.getMemstoreSize();
smallestSeqInRegionCurrentMemstore = getWAL(region)
- .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+ .getEarliestNotFlushedOrFlushingSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// CF1 and CF2, both should be absent.
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
@@ -307,7 +307,7 @@ public class TestPerColumnFamilyFlush {
cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
totalMemstoreSize = region.getMemstoreSize();
long smallestSeqInRegionCurrentMemstore = ((HRegion)region).getWAL()
- .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+ .getEarliestNotFlushedOrFlushingSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// Everything should have been cleared
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);