diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 1cd30dd..c061b62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -287,7 +287,8 @@ public class CompactingMemStore extends AbstractMemStore {
 
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
     int order = 1; // for active segment
-    order += pipeline.size(); // for all segments in the pipeline
+    List<KeyValueScanner> pipelineScanners = pipeline.getScanners(readPt, -1);
+    order += pipelineScanners.size(); // for all segments in the pipeline
     order += snapshot.getNumOfSegments(); // for all segments in the snapshot
     // TODO: check alternatives to using this order
     // The list of elements in pipeline + the active element + the snapshot segments
@@ -295,8 +296,12 @@
     List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(order);
     list.add(this.active.getScanner(readPt, order));
     order--;
-    list.addAll(pipeline.getScanners(readPt,order));
-    order -= pipeline.size();
+    for (KeyValueScanner s : pipelineScanners) {
+      SegmentScanner ss = (SegmentScanner) s;
+      ss.setScannerOrder(order--);
+    }
+    LOG.debug("added " + pipelineScanners.size() + " scanners from pipeline");
+    list.addAll(pipelineScanners);
     list.addAll(snapshot.getScanners(readPt,order));
     return Collections.singletonList(new MemStoreScanner(getComparator(), list));
   }
@@ -399,18 +404,26 @@
   }
 
   private void pushTailToSnapshot() {
-    ImmutableSegment tail = pipeline.pullTail();
-    if (!tail.isEmpty()) {
-      this.snapshot = tail;
-    }
+    VersionedSegmentsList segments = pipeline.getVersionedTail();
+    pushToSnapshot(segments.getStoreSegments());
+    pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
   }
 
   private void pushPipelineToSnapshot() {
-    List<ImmutableSegment> segments = pipeline.drain();
-    if (!segments.isEmpty()) {
-      this.snapshot =
-          SegmentFactory.instance().createCompositeImmutableSegment(getComparator(),segments);
+    VersionedSegmentsList segments = pipeline.getVersionedList();
+    pushToSnapshot(segments.getStoreSegments());
+    pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
+  }
+
+  private void pushToSnapshot(List<ImmutableSegment> segments) {
+    if(segments.isEmpty()) return;
+    if(segments.size() == 1 && !segments.get(0).isEmpty()) {
+      this.snapshot = segments.get(0);
+      return;
     }
+    // else create composite snapshot
+    this.snapshot =
+        SegmentFactory.instance().createCompositeImmutableSegment(getComparator(),segments);
   }
 
   private RegionServicesForStores getRegionServices() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 2fd2a14..0d425d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -19,16 +19,17 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
@@ -49,15 +50,12 @@ public class CompactionPipeline {
   public final static long ENTRY_OVERHEAD = ClassSize.LINKEDLIST_ENTRY;
 
   private final RegionServicesForStores region;
-  private LinkedList<ImmutableSegment> pipeline;
+  private CopyOnWriteArrayList<ImmutableSegment> pipeline;
   private long version;
 
-  private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
-      .createImmutableSegment((CellComparator) null);
-
   public CompactionPipeline(RegionServicesForStores region) {
     this.region = region;
-    this.pipeline = new LinkedList<ImmutableSegment>();
+    this.pipeline = new CopyOnWriteArrayList<ImmutableSegment>();
     this.version = 0;
   }
 
@@ -69,44 +67,37 @@ public class CompactionPipeline {
     }
   }
 
-  public ImmutableSegment pullTail() {
+  public VersionedSegmentsList getVersionedList() {
     synchronized (pipeline){
-      if(pipeline.isEmpty()) {
-        return EMPTY_MEM_STORE_SEGMENT;
-      }
-      return removeLast();
+      List<ImmutableSegment> segmentList = new ArrayList<ImmutableSegment>(pipeline);
+      return new VersionedSegmentsList(segmentList, version);
     }
   }
-
-  public List<ImmutableSegment> drain() {
-    int drainSize = pipeline.size();
-    List<ImmutableSegment> result = new ArrayList<ImmutableSegment>(drainSize);
+  public VersionedSegmentsList getVersionedTail() {
     synchronized (pipeline){
-      version++;
-      for(int i=0; i<drainSize; i++) {
-        ImmutableSegment segment = pipeline.removeLast();
-        result.add(i, segment);
+      List<ImmutableSegment> segmentList = new ArrayList<>();
+      ListIterator<ImmutableSegment> iter = pipeline.listIterator();
+      ImmutableSegment last = null;
+      while (iter.hasNext()) {
+        last = iter.next();
       }
-      return result;
-    }
-  }
-
-  public VersionedSegmentsList getVersionedList() {
-    synchronized (pipeline){
-      LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline);
-      VersionedSegmentsList res = new VersionedSegmentsList(segmentList, version);
-      return res;
+      if (last != null) {
+        segmentList.add(0, last);
+      }
+      return new VersionedSegmentsList(segmentList, version);
     }
   }
 
   /**
-   * Swaps the versioned list at the tail of the pipeline with the new compacted segment.
-   * Swapping only if there were no changes to the suffix of the list while it was compacted.
-   * @param versionedList tail of the pipeline that was compacted
-   * @param segment new compacted segment
+   * Swaps the versioned list at the tail of the pipeline with a new segment.
+   * Swapping happens only if there were no changes to the suffix of the list since the versioned
+   * list was created.
+   * @param versionedList suffix of the pipeline to be replaced; can be the tail or the whole pipeline
+   * @param segment new segment to replace the suffix. Can be null if the suffix just needs to be
+   *                removed.
    * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
    *        During index merge op this will be false and for compaction it will be true.
-   * @return true iff swapped tail with new compacted segment
+   * @return true iff swapped tail with new segment
    */
   public boolean swap(
       VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
@@ -120,31 +111,49 @@ public class CompactionPipeline {
       }
       suffix = versionedList.getStoreSegments();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Swapping pipeline suffix with compacted item. "
+        int count = 0;
+        if(segment != null) {
+          count = segment.getCellsCount();
+        }
+        LOG.debug("Swapping pipeline suffix. "
            + "Just before the swap the number of segments in pipeline is:"
            + versionedList.getStoreSegments().size()
-           + ", and the number of cells in new segment is:" + segment.getCellsCount());
+           + ", and the number of cells in new segment is:" + count);
       }
-      swapSuffix(suffix,segment, closeSuffix);
+      swapSuffix(suffix, segment, closeSuffix);
     }
-    if (region != null) {
+    if (closeSuffix && region != null) {
       // update the global memstore size counter
       long suffixDataSize = getSegmentsKeySize(suffix);
-      long newDataSize = segment.keySize();
+      long newDataSize = 0;
+      if(segment != null) newDataSize = segment.keySize();
       long dataSizeDelta = suffixDataSize - newDataSize;
       long suffixHeapOverhead = getSegmentsHeapOverhead(suffix);
-      long newHeapOverhead = segment.heapOverhead();
+      long newHeapOverhead = 0;
+      if(segment != null) newHeapOverhead = segment.heapOverhead();
       long heapOverheadDelta = suffixHeapOverhead - newHeapOverhead;
       region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta));
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Suffix data size: " + suffixDataSize + " compacted item data size: "
+        LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: "
            + newDataSize + ". Suffix heap overhead: " + suffixHeapOverhead
-           + " compacted item heap overhead: " + newHeapOverhead);
+           + " new segment heap overhead: " + newHeapOverhead);
       }
     }
    return true;
   }
 
+  private static Pair<Long, Long> getSegmentsHeapOverheadAndKeySize(
+      ListIterator<? extends Segment> iter) {
+    long overHead = 0, size = 0;
+    Segment segment;
+    while (iter.hasNext()) {
+      segment = iter.next();
+      overHead += segment.heapOverhead();
+      size += segment.keySize();
+    }
+    return new Pair<>(overHead, size);
+  }
+
   private static long getSegmentsHeapOverhead(List<? extends Segment> list) {
     long res = 0;
     for (Segment segment : list) {
@@ -183,7 +192,9 @@
       return false;
     }
 
-    for (ImmutableSegment s : pipeline) {
+    ListIterator<ImmutableSegment> iter = pipeline.listIterator();
+    while (iter.hasNext()) {
+      ImmutableSegment s = (ImmutableSegment) iter.next();
       // remember the old size in case this segment is going to be flatten
       MemstoreSize memstoreSize = new MemstoreSize();
       if (s.flatten(memstoreSize)) {
@@ -207,7 +218,7 @@
 
   public List<? extends Segment> getSegments() {
     synchronized (pipeline){
-      return new LinkedList(pipeline);
+      return new LinkedList<>(pipeline);
     }
   }
 
@@ -217,11 +228,12 @@
 
   public List<KeyValueScanner> getScanners(long readPoint, long order) {
     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(this.pipeline.size());
-    for (Segment segment : this.pipeline) {
+    ListIterator<ImmutableSegment> iter = pipeline.listIterator();
+    while (iter.hasNext()) {
+      Segment segment = iter.next();
       scanners.add(segment.getScanner(readPoint, order));
       // The order is the Segment ordinal
       order--;
-      assert order>=0; // order should never be negative so this is just a sanity check
     }
     return scanners;
   }
@@ -229,20 +241,32 @@
 
   public long getMinSequenceId() {
     long minSequenceId = Long.MAX_VALUE;
-    if (!isEmpty()) {
-      minSequenceId = pipeline.getLast().getMinSequenceId();
+    ListIterator<ImmutableSegment> iter = pipeline.listIterator();
+    ImmutableSegment last = null;
+    while (iter.hasNext()) {
+      last = iter.next();
+    }
+    if (last != null) {
+      minSequenceId = last.getMinSequenceId();
     }
     return minSequenceId;
   }
 
   public MemstoreSize getTailSize() {
-    if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
-    return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
+    ListIterator<ImmutableSegment> iter = pipeline.listIterator();
+    ImmutableSegment last = null;
+    while (iter.hasNext()) {
+      last = iter.next();
+    }
+    if (last == null) return MemstoreSize.EMPTY_SIZE;
+    MemstoreSize sz = new MemstoreSize(last.keySize(), last.heapOverhead());
+    return sz;
   }
 
   public MemstoreSize getPipelineSize() {
     if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
-    return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline));
+    Pair<Long, Long> pair = getSegmentsHeapOverheadAndKeySize(pipeline.listIterator());
+    return new MemstoreSize(pair.getSecond(), pair.getFirst());
   }
 
   private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
@@ -260,43 +284,11 @@
       }
     }
     pipeline.removeAll(suffix);
-    pipeline.addLast(segment);
-  }
-
-  private ImmutableSegment removeLast() {
-    version++;
-    return pipeline.removeLast();
+    if (segment != null) pipeline.add(segment);
   }
 
   private boolean addFirst(ImmutableSegment segment) {
-    pipeline.addFirst(segment);
+    pipeline.add(0, segment);
     return true;
   }
-
-  // debug method
-  private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) {
-    if(suffix.isEmpty()) {
-      // empty suffix is always valid
-      return true;
-    }
-    Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator();
-    Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator();
-    ImmutableSegment suffixCurrent;
-    ImmutableSegment pipelineCurrent;
-    for( ; suffixBackwardIterator.hasNext(); ) {
-      if(!pipelineBackwardIterator.hasNext()) {
-        // a suffix longer than pipeline is invalid
-        return false;
-      }
-      suffixCurrent = suffixBackwardIterator.next();
-      pipelineCurrent = pipelineBackwardIterator.next();
-      if(suffixCurrent != pipelineCurrent) {
-        // non-matching suffix
-        return false;
-      }
-    }
-    // suffix matches pipeline suffix
-    return true;
-  }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 7803f7d..bda2100 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -82,6 +82,10 @@ public class SegmentScanner implements KeyValueScanner {
     }
   }
 
+  void setScannerOrder(long scannerOrder) {
+    this.scannerOrder = scannerOrder;
+  }
+
   /**
    * Look at the next Cell in this scanner, but do not iterate the scanner
    * @return the currently observed Cell
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
index 01160bf..ab751f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -36,10 +35,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public class VersionedSegmentsList {
 
-  private final LinkedList<ImmutableSegment> storeSegments;
+  private final List<ImmutableSegment> storeSegments;
   private final long version;
 
-  public VersionedSegmentsList(LinkedList<ImmutableSegment> storeSegments, long version) {
+  public VersionedSegmentsList(List<ImmutableSegment> storeSegments, long version) {
     this.storeSegments = storeSegments;
     this.version = version;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index da8141b..82fe3cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -33,17 +33,18 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -75,11 +76,18 @@ public class TestAsyncTableGetMultiThreaded {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    setUp(HColumnDescriptor.MemoryCompaction.NONE);
+  }
+
+  protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception {
     TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
     TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
     TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
     TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
     TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(memoryCompaction));
+
     TEST_UTIL.startMiniCluster(5);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
@@ -103,11 +111,13 @@
 
   private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
     while (!stop.get()) {
-      int i = ThreadLocalRandom.current().nextInt(COUNT);
-      assertEquals(i,
-        Bytes.toInt(
-          CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
-            .getValue(FAMILY, QUALIFIER)));
+      for (int i = 0; i < COUNT; i++) {
+        assertEquals(i,
+          Bytes.toInt(
+            CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
+              .get()
+              .getValue(FAMILY, QUALIFIER)));
+      }
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index bd1ec5c..383b816 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -3957,6 +3957,10 @@ public class TestHRegion {
       while (scanner.next(res))
         ;
       if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
+        if (expectedCount != res.size()) {
+          LOG.debug("wrong row count. compactInterval " + compactInterval + " flushInterval "
+              + flushInterval);
+        }
         assertEquals("i=" + i, expectedCount, res.size());
         long timestamp = res.get(0).getTimestamp();
         assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
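
A note on the synchronization scheme that ties the CompactionPipeline changes together, which is easy to miss when reading hunk by hunk: readers now walk the pipeline through a CopyOnWriteArrayList iterator without taking the monitor, while mutators still synchronize and only replace a suffix if the pipeline's version has not moved since that suffix was copied out as a VersionedSegmentsList. The sketch below is a minimal, stand-alone analogue of that pattern, not HBase code; ToyPipeline, ToyVersionedList and the String "segments" are invented stand-ins.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative only: a toy analogue of CompactionPipeline's versioned-swap scheme.
// ToyPipeline / ToyVersionedList are made-up names, not HBase classes.
public class ToyPipeline {

  // Snapshot of the pipeline contents plus the version at which it was taken.
  static final class ToyVersionedList {
    final List<String> segments;
    final long version;

    ToyVersionedList(List<String> segments, long version) {
      this.segments = segments;
      this.version = version;
    }
  }

  // Readers may iterate this list with no lock; each iterator sees a consistent snapshot.
  private final CopyOnWriteArrayList<String> pipeline = new CopyOnWriteArrayList<>();
  private long version = 0;

  // Newest segment goes to the head, mirroring CompactionPipeline.pushHead()/addFirst().
  public synchronized void pushHead(String segment) {
    pipeline.add(0, segment);
  }

  // Copy the current contents together with the current version
  // (rough analogue of getVersionedList()).
  public synchronized ToyVersionedList getVersionedList() {
    return new ToyVersionedList(new ArrayList<>(pipeline), version);
  }

  // Replace the previously copied segments with one new segment, but only if no other
  // swap happened in between (rough analogue of swap(versionedList, segment, closeSuffix)).
  public synchronized boolean swap(ToyVersionedList copy, String newSegment) {
    if (copy.version != version) {
      return false;                      // the pipeline changed under us; caller gives up
    }
    version++;
    pipeline.removeAll(copy.segments);   // drop the old suffix
    if (newSegment != null) {
      pipeline.add(newSegment);          // null means "just remove", as in the patch
    }
    return true;
  }

  public static void main(String[] args) {
    ToyPipeline p = new ToyPipeline();
    p.pushHead("seg-1");
    p.pushHead("seg-2");
    ToyVersionedList copy = p.getVersionedList();
    // A competing swap in another thread would bump the version and make this call return false.
    System.out.println("swap succeeded: " + p.swap(copy, "merged(seg-1,seg-2)"));
  }
}

If another swap bumps the version first, the slower caller's swap() returns false and its result is simply dropped, which appears to be the behaviour the patch relies on when a flush races an in-memory compaction over the same suffix.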
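
A smaller point worth calling out is the new pushToSnapshot() helper in CompactingMemStore: a lone non-empty segment becomes the snapshot directly, anything else is wrapped in a composite immutable segment. The stand-alone illustration below mirrors only the shape of that decision; Seg, SingleSeg and CompositeSeg are invented stand-ins for ImmutableSegment and the composite built by SegmentFactory, not HBase classes.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: stand-ins for ImmutableSegment and the composite snapshot.
public class SnapshotChoice {

  interface Seg {
    boolean isEmpty();
  }

  static final class SingleSeg implements Seg {
    private final int cells;
    SingleSeg(int cells) { this.cells = cells; }
    @Override public boolean isEmpty() { return cells == 0; }
    @Override public String toString() { return "single(" + cells + " cells)"; }
  }

  static final class CompositeSeg implements Seg {
    private final List<Seg> parts;
    CompositeSeg(List<Seg> parts) { this.parts = parts; }
    @Override public boolean isEmpty() { return parts.stream().allMatch(Seg::isEmpty); }
    @Override public String toString() { return "composite" + parts; }
  }

  // Mirrors the shape of pushToSnapshot(): null means "leave the current snapshot alone",
  // a lone non-empty segment is reused as-is, everything else is wrapped in a composite
  // (a lone empty segment also takes the composite path, as in the patch).
  static Seg chooseSnapshot(List<Seg> segments) {
    if (segments.isEmpty()) {
      return null;
    }
    if (segments.size() == 1 && !segments.get(0).isEmpty()) {
      return segments.get(0);
    }
    return new CompositeSeg(segments);
  }

  public static void main(String[] args) {
    System.out.println(chooseSnapshot(Collections.singletonList(new SingleSeg(3))));
    System.out.println(chooseSnapshot(Arrays.asList(new SingleSeg(3), new SingleSeg(5))));
    System.out.println(chooseSnapshot(Collections.emptyList()));
  }
}

Returning null here corresponds to the early return in the patch, which leaves the existing snapshot untouched when the copied segment list is empty.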