commit 8ce02c10b605b4bff6bb507642c021ace8a09b9c Author: John Leach Date: Wed Aug 24 15:10:48 2016 -0500 HBASE-16496 Fixing Hotspot on SequenceIDAccounting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index e33ae33..3b85716 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -56,10 +56,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.DrainBarrier; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.*; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; @@ -226,8 +223,8 @@ public abstract class AbstractFSWAL implements WAL { * Map of WAL log file to the latest sequence ids of all regions it has entries of. The map is * sorted by the log file creation timestamp (contained in the log file name). */ - protected ConcurrentNavigableMap> byWalRegionSequenceIds = - new ConcurrentSkipListMap>(LOG_NAME_COMPARATOR); + protected ConcurrentNavigableMap> byWalRegionSequenceIds = + new ConcurrentSkipListMap>(LOG_NAME_COMPARATOR); /** * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. @@ -523,7 +520,7 @@ public abstract class AbstractFSWAL implements WAL { byte[][] regions = null; int logCount = getNumRolledLogFiles(); if (logCount > this.maxLogs && logCount > 0) { - Map.Entry> firstWALEntry = this.byWalRegionSequenceIds.firstEntry(); + Map.Entry> firstWALEntry = this.byWalRegionSequenceIds.firstEntry(); regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue()); } if (regions != null) { @@ -547,9 +544,9 @@ public abstract class AbstractFSWAL implements WAL { List logsToArchive = null; // For each log file, look at its Map of regions to highest sequence id; if all sequence ids // are older than what is currently in memory, the WAL can be GC'd. - for (Map.Entry> e : this.byWalRegionSequenceIds.entrySet()) { + for (Map.Entry> e : this.byWalRegionSequenceIds.entrySet()) { Path log = e.getKey(); - Map sequenceNums = e.getValue(); + Map sequenceNums = e.getValue(); if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { if (logsToArchive == null) { logsToArchive = new ArrayList(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java index 62dea53..d6f4417 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.ImmutableByteArray; /** @@ -99,7 +100,7 @@ class SequenceIdAccounting { * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns * the same array. */ - private Map highestSequenceIds = new HashMap<>(); + private Map highestSequenceIds = new HashMap<>(); /** * Returns the lowest unflushed sequence id for the region. @@ -150,9 +151,9 @@ class SequenceIdAccounting { * @return Return the previous accounting Map of regions to the last sequence id written into * each. */ - Map resetHighest() { - Map old = this.highestSequenceIds; - this.highestSequenceIds = new HashMap(); + Map resetHighest() { + Map old = this.highestSequenceIds; + this.highestSequenceIds = new HashMap(); return old; } @@ -168,7 +169,7 @@ class SequenceIdAccounting { void update(byte[] encodedRegionName, Set families, long sequenceid, final boolean lowest) { Long l = Long.valueOf(sequenceid); - this.highestSequenceIds.put(encodedRegionName, l); + this.highestSequenceIds.put(new HashedBytes(encodedRegionName), l); if (lowest) { ConcurrentMap m = getOrCreateLowestSequenceIds(encodedRegionName); for (byte[] familyName : families) { @@ -187,7 +188,7 @@ class SequenceIdAccounting { } Long highest = this.highestSequenceIds.get(encodedRegionName); if (highest == null || sequenceId > highest) { - this.highestSequenceIds.put(encodedRegionName, sequenceId); + this.highestSequenceIds.put(new HashedBytes(encodedRegionName), sequenceId); } ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName); synchronized (this.tieLock) { @@ -368,7 +369,7 @@ class SequenceIdAccounting { * be null). * @return true if all sequenceids are lower, older than, the old sequenceids in this instance. */ - boolean areAllLower(Map sequenceids) { + boolean areAllLower(Map sequenceids) { Map flushing = null; Map unflushed = null; synchronized (this.tieLock) { @@ -377,7 +378,7 @@ class SequenceIdAccounting { flushing = flattenToLowestSequenceId(this.flushingSequenceIds); unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds); } - for (Map.Entry e : sequenceids.entrySet()) { + for (Map.Entry e : sequenceids.entrySet()) { long oldestFlushing = Long.MAX_VALUE; long oldestUnflushed = Long.MAX_VALUE; if (flushing != null && flushing.containsKey(e.getKey())) { @@ -402,11 +403,11 @@ class SequenceIdAccounting { * @param sequenceids Sequenceids keyed by encoded region name. * @return regions found in this instance with sequence ids less than those passed in. */ - byte[][] findLower(Map sequenceids) { + byte[][] findLower(Map sequenceids) { List toFlush = null; // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. synchronized (tieLock) { - for (Map.Entry e : sequenceids.entrySet()) { + for (Map.Entry e : sequenceids.entrySet()) { Map m = this.lowestUnflushedSequenceIds.get(e.getKey()); if (m == null) { continue; @@ -417,7 +418,7 @@ class SequenceIdAccounting { if (toFlush == null) { toFlush = new ArrayList(); } - toFlush.add(e.getKey()); + toFlush.add(e.getKey().getBytes()); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java index 9fd0cb1..8a5f88e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java @@ -29,6 +29,7 @@ import java.util.Set; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HashedBytes; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -69,35 +70,35 @@ public class TestSequenceIdAccounting { public void testAreAllLower() { SequenceIdAccounting sida = new SequenceIdAccounting(); sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); - Map m = new HashMap(); - m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + Map m = new HashMap(); + m.put(new HashedBytes(ENCODED_REGION_NAME), HConstants.NO_SEQNUM); assertTrue(sida.areAllLower(m)); long sequenceid = 1; sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); assertTrue(sida.areAllLower(m)); - m.put(ENCODED_REGION_NAME, sequenceid); + m.put(new HashedBytes(ENCODED_REGION_NAME), sequenceid); assertFalse(sida.areAllLower(m)); long lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME); assertEquals("Lowest should be first sequence id inserted", 1, lowest); - m.put(ENCODED_REGION_NAME, lowest); + m.put(new HashedBytes(ENCODED_REGION_NAME), lowest); assertFalse(sida.areAllLower(m)); // Now make sure above works when flushing. sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES); assertFalse(sida.areAllLower(m)); - m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + m.put(new HashedBytes(ENCODED_REGION_NAME), HConstants.NO_SEQNUM); assertTrue(sida.areAllLower(m)); // Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits sida.completeCacheFlush(ENCODED_REGION_NAME); - m.put(ENCODED_REGION_NAME, sequenceid); + m.put(new HashedBytes(ENCODED_REGION_NAME), sequenceid); assertTrue(sida.areAllLower(m)); // Flush again but add sequenceids while we are flushing. sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME); - m.put(ENCODED_REGION_NAME, lowest); + m.put(new HashedBytes(ENCODED_REGION_NAME), lowest); assertFalse(sida.areAllLower(m)); sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES); // The cache flush will clear out all sequenceid accounting by region. @@ -106,7 +107,7 @@ public class TestSequenceIdAccounting { // No new edits have gone in so no sequenceid to work with. assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME)); // Make an edit behind all we'll put now into sida. - m.put(ENCODED_REGION_NAME, sequenceid); + m.put(new HashedBytes(ENCODED_REGION_NAME), sequenceid); sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); @@ -117,16 +118,16 @@ public class TestSequenceIdAccounting { public void testFindLower() { SequenceIdAccounting sida = new SequenceIdAccounting(); sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); - Map m = new HashMap(); - m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + Map m = new HashMap(); + m.put(new HashedBytes(ENCODED_REGION_NAME), HConstants.NO_SEQNUM); long sequenceid = 1; sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); assertTrue(sida.findLower(m) == null); - m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME)); + m.put(new HashedBytes(ENCODED_REGION_NAME), sida.getLowestSequenceId(ENCODED_REGION_NAME)); assertTrue(sida.findLower(m).length == 1); - m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1); + m.put(new HashedBytes(ENCODED_REGION_NAME), sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1); assertTrue(sida.findLower(m) == null); } } \ No newline at end of file