From a52b9d1a9e8a8bb046948d66ded6b48c0775fa02 Mon Sep 17 00:00:00 2001
From: eshcar
Date: Tue, 3 Jan 2017 15:28:10 +0200
Subject: [PATCH] HBASE-17373: Fixing bug in moving segments from compaction
 pipeline to snapshot

---
 .../hbase/regionserver/CompactingMemStore.java     | 19 ++++--
 .../hbase/regionserver/CompactionPipeline.java     | 69 +++++++++++-----------
 .../hbase/regionserver/VersionedSegmentsList.java  |  5 +-
 .../client/TestAsyncTableGetMultiThreaded.java     | 22 +++++--
 ...ncTableGetMultiThreadedWithBasicCompaction.java | 35 +++++++++++
 ...ncTableGetMultiThreadedWithEagerCompaction.java | 35 +++++++++++
 6 files changed, 137 insertions(+), 48 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index f8192a2..e1289f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -213,8 +213,10 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
+  // the getSegments() method is used for tests only
+  @VisibleForTesting
   @Override
-  public List<Segment> getSegments() {
+  protected List<Segment> getSegments() {
     List<Segment> pipelineList = pipeline.getSegments();
     List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
     list.add(this.active);
@@ -266,6 +268,7 @@ public class CompactingMemStore extends AbstractMemStore {
     long order = pipelineList.size();
     // The list of elements in pipeline + the active element + the snapshot segment
     // TODO : This will change when the snapshot is made of more than one element
+    // The order is the Segment ordinal
     List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
     list.add(this.active.getScanner(readPt, order + 1));
     for (Segment item : pipelineList) {
@@ -374,10 +377,18 @@
   }
 
   private void pushTailToSnapshot() {
-    ImmutableSegment tail = pipeline.pullTail();
-    if (!tail.isEmpty()) {
-      this.snapshot = tail;
+    VersionedSegmentsList segments = pipeline.getVersionedTail();
+    pushToSnapshot(segments.getStoreSegments());
+    pipeline.swap(segments, null, false); // do not close segments as they are in snapshot now
+  }
+
+  private void pushToSnapshot(List<ImmutableSegment> segments) {
+    if(segments.isEmpty()) return;
+    if(segments.size() == 1 && !segments.get(0).isEmpty()) {
+      this.snapshot = segments.get(0);
+      return;
     }
+    // TODO else create composite snapshot
   }
 
   private RegionServicesForStores getRegionServices() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 6676170..9d5df77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -45,18 +46,14 @@ public class CompactionPipeline {
   public final static long FIXED_OVERHEAD = ClassSize
       .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
   public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
-  public final static long ENTRY_OVERHEAD = ClassSize.LINKEDLIST_ENTRY;
 
   private final RegionServicesForStores region;
   private LinkedList<ImmutableSegment> pipeline;
   private long version;
 
-  private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
-      .createImmutableSegment((CellComparator) null);
-
   public CompactionPipeline(RegionServicesForStores region) {
     this.region = region;
-    this.pipeline = new LinkedList<ImmutableSegment>();
+    this.pipeline = new LinkedList<>();
     this.version = 0;
   }
@@ -68,31 +65,33 @@ public class CompactionPipeline {
     }
   }
 
-  public ImmutableSegment pullTail() {
+  public VersionedSegmentsList getVersionedList() {
     synchronized (pipeline){
-      if(pipeline.isEmpty()) {
-        return EMPTY_MEM_STORE_SEGMENT;
-      }
-      return removeLast();
+      List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
+      return new VersionedSegmentsList(segmentList, version);
     }
   }
 
-  public VersionedSegmentsList getVersionedList() {
+  public VersionedSegmentsList getVersionedTail() {
     synchronized (pipeline){
-      LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline);
-      VersionedSegmentsList res = new VersionedSegmentsList(segmentList, version);
-      return res;
+      List<ImmutableSegment> segmentList = new ArrayList<>();
+      if(!pipeline.isEmpty()) {
+        segmentList.add(0, pipeline.getLast());
+      }
+      return new VersionedSegmentsList(segmentList, version);
     }
   }
 
   /**
-   * Swaps the versioned list at the tail of the pipeline with the new compacted segment.
-   * Swapping only if there were no changes to the suffix of the list while it was compacted.
-   * @param versionedList tail of the pipeline that was compacted
-   * @param segment new compacted segment
+   * Swaps the versioned list at the tail of the pipeline with a new segment.
+   * The swap succeeds only if there were no changes to the suffix of the list since the
+   * versioned list was created.
+   * @param versionedList suffix of the pipeline to be replaced; can be the tail or the whole
+   *          pipeline
+   * @param segment new segment to replace the suffix; can be null if the suffix just needs to
+   *          be removed
    * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
    *        During index merge op this will be false and for compaction it will be true.
-   * @return true iff swapped tail with new compacted segment
+   * @return true iff swapped tail with new segment
    */
   public boolean swap(
       VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
@@ -106,26 +105,32 @@
       }
       suffix = versionedList.getStoreSegments();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Swapping pipeline suffix with compacted item. "
+        int count = 0;
+        if(segment != null) {
+          count = segment.getCellsCount();
+        }
+        LOG.debug("Swapping pipeline suffix. "
" + "Just before the swap the number of segments in pipeline is:" + versionedList.getStoreSegments().size() - + ", and the number of cells in new segment is:" + segment.getCellsCount()); + + ", and the number of cells in new segment is:" + count); } - swapSuffix(suffix,segment, closeSuffix); + swapSuffix(suffix, segment, closeSuffix); } - if (region != null) { + if (closeSuffix && region != null) { // update the global memstore size counter long suffixDataSize = getSegmentsKeySize(suffix); - long newDataSize = segment.keySize(); + long newDataSize = 0; + if(segment != null) newDataSize = segment.keySize(); long dataSizeDelta = suffixDataSize - newDataSize; long suffixHeapOverhead = getSegmentsHeapOverhead(suffix); - long newHeapOverhead = segment.heapOverhead(); + long newHeapOverhead = 0; + if(segment != null) newHeapOverhead = segment.heapOverhead(); long heapOverheadDelta = suffixHeapOverhead - newHeapOverhead; region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta)); if (LOG.isDebugEnabled()) { - LOG.debug("Suffix data size: " + suffixDataSize + " compacted item data size: " + LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: " + newDataSize + ". Suffix heap overhead: " + suffixHeapOverhead - + " compacted item heap overhead: " + newHeapOverhead); + + " new segment heap overhead: " + newHeapOverhead); } } return true; @@ -193,8 +198,7 @@ public class CompactionPipeline { public List getSegments() { synchronized (pipeline){ - List res = new LinkedList(pipeline); - return res; + return new LinkedList<>(pipeline); } } @@ -230,12 +234,7 @@ public class CompactionPipeline { } } pipeline.removeAll(suffix); - pipeline.addLast(segment); - } - - private ImmutableSegment removeLast() { - version++; - return pipeline.removeLast(); + if(segment != null) pipeline.addLast(segment); } private boolean addFirst(ImmutableSegment segment) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java index 01160bf..ab751f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.LinkedList; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -36,10 +35,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public class VersionedSegmentsList { - private final LinkedList storeSegments; + private final List storeSegments; private final long version; - public VersionedSegmentsList(LinkedList storeSegments, long version) { + public VersionedSegmentsList(List storeSegments, long version) { this.storeSegments = storeSegments; this.version = version; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java index da8141b..82fe3cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java @@ -33,17 +33,18 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; 
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -75,11 +76,18 @@
 
   @BeforeClass
   public static void setUp() throws Exception {
+    setUp(HColumnDescriptor.MemoryCompaction.NONE);
+  }
+
+  protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception {
     TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
     TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
     TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
     TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
     TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(memoryCompaction));
+
     TEST_UTIL.startMiniCluster(5);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
@@ -103,11 +111,13 @@
 
   private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
     while (!stop.get()) {
-      int i = ThreadLocalRandom.current().nextInt(COUNT);
-      assertEquals(i,
-        Bytes.toInt(
-          CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
-            .getValue(FAMILY, QUALIFIER)));
+      for (int i = 0; i < COUNT; i++) {
+        assertEquals(i,
+          Bytes.toInt(
+            CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
+              .get()
+              .getValue(FAMILY, QUALIFIER)));
+      }
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
new file mode 100644
index 0000000..eb07875
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreadedWithBasicCompaction extends
+    TestAsyncTableGetMultiThreaded {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    setUp(HColumnDescriptor.MemoryCompaction.BASIC);
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java
new file mode 100644
index 0000000..6fe8045
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreadedWithEagerCompaction extends
+    TestAsyncTableGetMultiThreaded {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    setUp(HColumnDescriptor.MemoryCompaction.EAGER);
+  }
+
+}
-- 
2.10.1 (Apple Git-78)
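
Note on the fix, for readers of the patch: the old flush path called pullTail(), which removed the pipeline's last segment unconditionally, while the in-memory compactor replaced pipeline suffixes through the version-checked swap(). The patch funnels the flush path through the same protocol: take a versioned copy of the tail, point the snapshot at its segments, then swap them out of the pipeline with segment = null and closeSuffix = false, since the segments live on in the snapshot. The Java sketch below is a minimal, self-contained model of that versioned-swap idiom; the names (Pipeline, VersionedList) and the exact placement of the version bump are illustrative assumptions, not the HBase classes, which additionally do memstore size accounting and segment closing.

// A minimal model of the version-checked suffix swap used by the patch.
// All names here are hypothetical; only the idiom mirrors the patch.
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

final class VersionedList<S> {
  final List<S> suffix;  // copy of a pipeline suffix at snapshot time
  final long version;    // pipeline version at snapshot time

  VersionedList(List<S> suffix, long version) {
    this.suffix = suffix;
    this.version = version;
  }
}

final class Pipeline<S> {
  private final LinkedList<S> segments = new LinkedList<>();
  private long version = 0;

  // In this sketch only swaps bump the version: inserting at the head
  // leaves every outstanding suffix copy a valid suffix of the pipeline.
  synchronized void pushHead(S segment) {
    segments.addFirst(segment);
  }

  // Copy of the whole pipeline, tagged with the current version.
  synchronized VersionedList<S> getVersionedList() {
    return new VersionedList<>(new ArrayList<>(segments), version);
  }

  // Copy holding just the last (oldest) segment, tagged with the version.
  synchronized VersionedList<S> getVersionedTail() {
    List<S> tail = new ArrayList<>();
    if (!segments.isEmpty()) {
      tail.add(segments.getLast());
    }
    return new VersionedList<>(tail, version);
  }

  // Optimistic swap: replace the copied suffix with an optional new
  // segment, but only if no other swap ran since the copy was taken.
  synchronized boolean swap(VersionedList<S> copy, S replacement) {
    if (copy.version != version) {
      return false;  // suffix changed underneath us; the copy is stale
    }
    segments.removeAll(copy.suffix);
    if (replacement != null) {
      segments.addLast(replacement);  // compaction: add the compacted segment
    }                                 // flush: null, segments move to snapshot
    version++;  // invalidate every other outstanding copy
    return true;
  }
}

With both the flusher and the compactor going through swap(), whichever thread loses the race finds its copy stale at the version check and its swap is refused, which is exactly the interleaving the unconditional pullTail() could not defend against.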