From 5dcc49ca354dcb5b06bb8ddc7e82ce5cf258f52a Mon Sep 17 00:00:00 2001
From: eshcar
Date: Tue, 27 Dec 2016 22:17:40 +0200
Subject: [PATCH] HBASE-17373: Fixing bug in moving segments from compaction
 pipeline to snapshot

---
 .../hbase/regionserver/CompactingMemStore.java     | 24 ++++---
 .../hbase/regionserver/CompactionPipeline.java     | 75 +++++++++-------------
 .../hbase/regionserver/VersionedSegmentsList.java  |  5 +-
 .../client/TestAsyncTableGetMultiThreaded.java     | 22 +++++--
 ...ncTableGetMultiThreadedWithBasicCompaction.java | 18 ++++++
 ...ncTableGetMultiThreadedWithEagerCompaction.java | 18 ++++++
 6 files changed, 101 insertions(+), 61 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 1cd30dd..5c31122 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -399,18 +399,26 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   private void pushTailToSnapshot() {
-    ImmutableSegment tail = pipeline.pullTail();
-    if (!tail.isEmpty()) {
-      this.snapshot = tail;
-    }
+    VersionedSegmentsList segments = pipeline.getVersionedTail();
+    pushToSnapshot(segments.getStoreSegments());
+    pipeline.swap(segments, null, false); // do not close segments as they are in snapshot now
   }
 
   private void pushPipelineToSnapshot() {
-    List<ImmutableSegment> segments = pipeline.drain();
-    if (!segments.isEmpty()) {
-      this.snapshot =
-          SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
+    VersionedSegmentsList segments = pipeline.getVersionedList();
+    pushToSnapshot(segments.getStoreSegments());
+    pipeline.swap(segments, null, false); // do not close segments as they are in snapshot now
+  }
+
+  private void pushToSnapshot(List<ImmutableSegment> segments) {
+    if (segments.isEmpty()) return;
+    if (segments.size() == 1 && !segments.get(0).isEmpty()) {
+      this.snapshot = segments.get(0);
+      return;
     }
+    // else create composite snapshot
+    this.snapshot =
+        SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
   }
 
   private RegionServicesForStores getRegionServices() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 2fd2a14..a8afef8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -52,9 +52,6 @@ public class CompactionPipeline {
   private LinkedList<ImmutableSegment> pipeline;
   private long version;
 
-  private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
-      .createImmutableSegment((CellComparator) null);
-
   public CompactionPipeline(RegionServicesForStores region) {
     this.region = region;
     this.pipeline = new LinkedList<ImmutableSegment>();
@@ -69,44 +66,33 @@ public class CompactionPipeline {
     }
   }
 
-  public ImmutableSegment pullTail() {
+  public VersionedSegmentsList getVersionedList() {
     synchronized (pipeline){
-      if(pipeline.isEmpty()) {
-        return EMPTY_MEM_STORE_SEGMENT;
-      }
-      return removeLast();
+      List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
+      return new VersionedSegmentsList(segmentList, version);
     }
   }
 
-  public List<ImmutableSegment> drain() {
-    int drainSize = pipeline.size();
-    List<ImmutableSegment> result = new ArrayList<ImmutableSegment>(drainSize);
+  public VersionedSegmentsList getVersionedTail() {
     synchronized (pipeline){
-      version++;
-      for(int i=0; i<drainSize; i++) {
-        ImmutableSegment segment = this.pipeline.removeFirst();
-        result.add(segment);
+      List<ImmutableSegment> segmentList = new ArrayList<>();
+      if (!pipeline.isEmpty()) {
+        segmentList.add(0, pipeline.getLast());
       }
-      return result;
-    }
-  }
-
-  public VersionedSegmentsList getVersionedList() {
-    synchronized (pipeline){
-      LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline);
-      VersionedSegmentsList res = new VersionedSegmentsList(segmentList, version);
-      return res;
+      return new VersionedSegmentsList(segmentList, version);
     }
   }
 
   /**
-   * Swaps the versioned list at the tail of the pipeline with the new compacted segment.
-   * Swapping only if there were no changes to the suffix of the list while it was compacted.
-   * @param versionedList tail of the pipeline that was compacted
-   * @param segment new compacted segment
+   * Swaps the versioned list at the tail of the pipeline with a new segment.
+   * Swapping happens only if there were no changes to the suffix of the list since the versioned
+   * list was created.
+   * @param versionedList suffix of the pipeline to be replaced; can be the tail or the whole pipeline
+   * @param segment new segment to replace the suffix. Can be null if the suffix just needs to be
+   *          removed.
    * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
    *          During index merge op this will be false and for compaction it will be true.
-   * @return true iff swapped tail with new compacted segment
+   * @return true iff swapped tail with new segment
    */
   public boolean swap(
       VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
@@ -120,26 +106,32 @@
       }
       suffix = versionedList.getStoreSegments();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Swapping pipeline suffix with compacted item. "
+        int count = 0;
+        if (segment != null) {
+          count = segment.getCellsCount();
+        }
+        LOG.debug("Swapping pipeline suffix. "
            + "Just before the swap the number of segments in pipeline is:"
            + versionedList.getStoreSegments().size()
-           + ", and the number of cells in new segment is:" + segment.getCellsCount());
+           + ", and the number of cells in new segment is:" + count);
       }
-      swapSuffix(suffix,segment, closeSuffix);
+      swapSuffix(suffix, segment, closeSuffix);
     }
-    if (region != null) {
+    if (closeSuffix && region != null) {
       // update the global memstore size counter
       long suffixDataSize = getSegmentsKeySize(suffix);
-      long newDataSize = segment.keySize();
+      long newDataSize = 0;
+      if (segment != null) newDataSize = segment.keySize();
       long dataSizeDelta = suffixDataSize - newDataSize;
       long suffixHeapOverhead = getSegmentsHeapOverhead(suffix);
-      long newHeapOverhead = segment.heapOverhead();
+      long newHeapOverhead = 0;
+      if (segment != null) newHeapOverhead = segment.heapOverhead();
       long heapOverheadDelta = suffixHeapOverhead - newHeapOverhead;
       region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta));
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Suffix data size: " + suffixDataSize + " compacted item data size: "
+        LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: "
            + newDataSize + ". Suffix heap overhead: " + suffixHeapOverhead
-           + " compacted item heap overhead: " + newHeapOverhead);
+           + " new segment heap overhead: " + newHeapOverhead);
       }
     }
     return true;
@@ -207,7 +199,7 @@
 
   public List<? extends Segment> getSegments() {
     synchronized (pipeline){
-      return new LinkedList<Segment>(pipeline);
+      return new LinkedList<>(pipeline);
     }
   }
@@ -260,12 +252,7 @@
       }
     }
     pipeline.removeAll(suffix);
-    pipeline.addLast(segment);
-  }
-
-  private ImmutableSegment removeLast() {
-    version++;
-    return pipeline.removeLast();
+    if (segment != null) pipeline.addLast(segment);
   }
 
   private boolean addFirst(ImmutableSegment segment) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
index 01160bf..ab751f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -36,10 +35,10 @@
 @InterfaceAudience.Private
 public class VersionedSegmentsList {
 
-  private final LinkedList<ImmutableSegment> storeSegments;
+  private final List<ImmutableSegment> storeSegments;
   private final long version;
 
-  public VersionedSegmentsList(LinkedList<ImmutableSegment> storeSegments, long version) {
+  public VersionedSegmentsList(List<ImmutableSegment> storeSegments, long version) {
     this.storeSegments = storeSegments;
     this.version = version;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index da8141b..82fe3cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -33,17 +33,18 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -75,11 +76,18 @@ public class TestAsyncTableGetMultiThreaded {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    setUp(HColumnDescriptor.MemoryCompaction.NONE);
+  }
+
+  protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception {
     TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
     TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
     TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
     TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
     TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(memoryCompaction));
+
     TEST_UTIL.startMiniCluster(5);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
@@ -103,11 +111,13 @@
 
   private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
     while (!stop.get()) {
-      int i = ThreadLocalRandom.current().nextInt(COUNT);
-      assertEquals(i,
-        Bytes.toInt(
-          CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
-            .getValue(FAMILY, QUALIFIER)));
+      for (int i = 0; i < COUNT; i++) {
+        assertEquals(i,
+          Bytes.toInt(
+            CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
+              .get()
+              .getValue(FAMILY, QUALIFIER)));
+      }
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
new file mode 100644
index 0000000..882e04f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
@@ -0,0 +1,18 @@
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreadedWithBasicCompaction extends
+    TestAsyncTableGetMultiThreaded {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    setUp(HColumnDescriptor.MemoryCompaction.BASIC);
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java
new file mode 100644
index 0000000..3e91652
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java
@@ -0,0 +1,18 @@
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreadedWithEagerCompaction extends
+    TestAsyncTableGetMultiThreaded {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    setUp(HColumnDescriptor.MemoryCompaction.EAGER);
+  }
+
+}
-- 
2.10.1 (Apple Git-78)
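
The gist of the fix, as a minimal standalone sketch (an illustration only, not HBase code: plain Strings stand in for ImmutableSegment, and the class and method names below are invented for the example). The old pullTail()/drain() removed segments from the pipeline with no version check, so a flush racing with an in-memory compaction could lose segments. The patched flow instead clones the pipeline into a VersionedSegmentsList, points the snapshot at the clone, and only then calls swap(clone, null, false): the suffix is removed only while the version is unchanged, and it is not closed because the snapshot still references those segments.

// Standalone sketch of the versioned-swap protocol the patch relies on.
// Simplified model under assumed semantics; not the actual HBase classes.
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class VersionedPipelineSketch {
  private final LinkedList<String> pipeline = new LinkedList<>();
  private long version = 0;

  // Like getVersionedList(): clone the current segments together with the version.
  synchronized Versioned cloneAll() {
    return new Versioned(new ArrayList<>(pipeline), version);
  }

  // Like pushHead(): new segments enter at the head; the suffix is untouched,
  // so the version is not bumped and outstanding clones stay valid.
  synchronized void addFirst(String segment) {
    pipeline.addFirst(segment);
  }

  // Like swap(versionedList, segment, closeSuffix): replace the cloned suffix
  // only if it was not changed since the clone was taken. A null replacement
  // (as in pushToSnapshot) just removes the suffix, because the segments now
  // live on in the snapshot and must not be closed.
  synchronized boolean swap(Versioned cloned, String replacement) {
    if (cloned.version != version) {
      return false; // suffix changed concurrently; caller must re-clone and retry
    }
    pipeline.removeAll(cloned.segments);
    if (replacement != null) {
      pipeline.addLast(replacement);
    }
    version++; // the suffix changed; invalidate other outstanding clones
    return true;
  }

  static final class Versioned {
    final List<String> segments;
    final long version;
    Versioned(List<String> segments, long version) {
      this.segments = segments;
      this.version = version;
    }
  }

  public static void main(String[] args) {
    VersionedPipelineSketch p = new VersionedPipelineSketch();
    p.addFirst("seg-1");
    Versioned forFlush = p.cloneAll();      // flush starts moving segments to snapshot
    Versioned forCompaction = p.cloneAll(); // in-memory compaction races with it
    System.out.println(p.swap(forFlush, null));          // true: suffix handed to snapshot
    System.out.println(p.swap(forCompaction, "merged")); // false: stale version
  }
}

Running the sketch prints true then false: the flush's swap succeeds, and the racing compaction's swap is rejected instead of silently discarding segments, which is the invariant the version field protects.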