diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 5c31122..40f0c6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -287,7 +287,8 @@ public class CompactingMemStore extends AbstractMemStore { public List getScanners(long readPt) throws IOException { int order = 1; // for active segment - order += pipeline.size(); // for all segments in the pipeline + List pipelineScanners = pipeline.getScanners(readPt); + order += pipelineScanners.size(); // for all segments in the pipeline order += snapshot.getNumOfSegments(); // for all segments in the snapshot // TODO: check alternatives to using this order // The list of elements in pipeline + the active element + the snapshot segments @@ -295,8 +296,11 @@ public class CompactingMemStore extends AbstractMemStore { List list = new ArrayList(order); list.add(this.active.getScanner(readPt, order)); order--; - list.addAll(pipeline.getScanners(readPt,order)); - order -= pipeline.size(); + for (KeyValueScanner s : pipelineScanners) { + SegmentScanner ss = (SegmentScanner) s; + ss.setScannerOrder(order--); + } + list.addAll(pipelineScanners); list.addAll(snapshot.getScanners(readPt,order)); return Collections.singletonList(new MemStoreScanner(getComparator(), list)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index a8afef8..f0b45f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.regionserver; import 
java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -45,41 +45,58 @@ public class CompactionPipeline { - public final static long FIXED_OVERHEAD = ClassSize .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); + public final static long FIXED_OVERHEAD = ClassSize .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); - public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST; - public final static long ENTRY_OVERHEAD = ClassSize.LINKEDLIST_ENTRY; + public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ARRAY; private final RegionServicesForStores region; - private LinkedList pipeline; + private volatile ImmutableSegment[] pipeline; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private long version; public CompactionPipeline(RegionServicesForStores region) { this.region = region; - this.pipeline = new LinkedList(); + this.pipeline = new ImmutableSegment[0]; this.version = 0; } public boolean pushHead(MutableSegment segment) { ImmutableSegment immutableSegment = SegmentFactory.instance().
createImmutableSegment(segment); - synchronized (pipeline){ + lock.writeLock().lock(); + try { return addFirst(immutableSegment); + } finally { + lock.writeLock().unlock(); } } public VersionedSegmentsList getVersionedList() { - synchronized (pipeline){ - List segmentList = new ArrayList<>(pipeline); + lock.readLock().lock(); + try { + List segmentList = Arrays.asList(pipeline); return new VersionedSegmentsList(segmentList, version); + } finally { + lock.readLock().unlock(); } } + ImmutableSegment getPipelineTail() { + ImmutableSegment[] a = pipeline; + if (a.length == 0) return null; + return a[a.length-1]; + } + public VersionedSegmentsList getVersionedTail() { - synchronized (pipeline){ + lock.readLock().lock(); + try { List segmentList = new ArrayList<>(); - if(!pipeline.isEmpty()) { - segmentList.add(0, pipeline.getLast()); + ImmutableSegment last = getPipelineTail(); + if (last != null) { + segmentList.add(0, last); } return new VersionedSegmentsList(segmentList, version); + } finally { + lock.readLock().unlock(); } } @@ -100,7 +116,8 @@ public class CompactionPipeline { return false; } List suffix; - synchronized (pipeline){ + lock.writeLock().lock(); + try { if(versionedList.getVersion() != version) { return false; } @@ -116,6 +133,8 @@ public class CompactionPipeline { + ", and the number of cells in new segment is:" + count); } swapSuffix(suffix, segment, closeSuffix); + } finally { + lock.writeLock().unlock(); } if (closeSuffix && region != null) { // update the global memstore size counter @@ -169,7 +188,8 @@ public class CompactionPipeline { return false; } - synchronized (pipeline){ + lock.readLock().lock(); + try { if(requesterVersion != version) { LOG.warn("Segment flattening failed, because versions do not match"); return false; @@ -186,7 +206,8 @@ public class CompactionPipeline { return true; } } - + } finally { + lock.readLock().unlock(); } // do not update the global memstore size counter and do not increase the version, // because all the 
cells remain in place @@ -194,26 +215,23 @@ public class CompactionPipeline { } public boolean isEmpty() { - return pipeline.isEmpty(); + return pipeline.length == 0; } public List getSegments() { - synchronized (pipeline){ - return new LinkedList<>(pipeline); - } + return Arrays.asList(pipeline); } public long size() { - return pipeline.size(); + return pipeline.length; } - public List getScanners(long readPoint, long order) { - List scanners = new ArrayList(this.pipeline.size()); - for (Segment segment : this.pipeline) { - scanners.add(segment.getScanner(readPoint, order)); - // The order is the Segment ordinal - order--; - assert order>=0; // order should never be negative so this is just a sanity check + public List getScanners(long readPoint) { + ImmutableSegment[] snapshot = pipeline; + List scanners = new ArrayList(snapshot.length); + for (Segment segment : snapshot) { + // The order is the Segment ordinal which would be set later + scanners.add(segment.getScanner(readPoint, -1)); } return scanners; } @@ -221,20 +239,91 @@ public class CompactionPipeline { public long getMinSequenceId() { long minSequenceId = Long.MAX_VALUE; - if (!isEmpty()) { - minSequenceId = pipeline.getLast().getMinSequenceId(); + ImmutableSegment last = getPipelineTail(); + if (last != null) { + minSequenceId = last.getMinSequenceId(); } return minSequenceId; } public MemstoreSize getTailSize() { - if (isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); + ImmutableSegment last = getPipelineTail(); + if (last == null) return MemstoreSize.EMPTY_SIZE; + MemstoreSize sz = new MemstoreSize(last.keySize(), last.heapOverhead()); + return sz; } public MemstoreSize getPipelineSize() { if (isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline)); + ImmutableSegment[] snapshot = pipeline; + long overHead = 0, size = 0; + for (Segment 
segment : snapshot) { + overHead += segment.heapOverhead(); + size += segment.keySize(); + } + return new MemstoreSize(size, overHead); + } + + /* + * Not thread-safe + * @param c collection of ImmutableSegment + */ + boolean removeAll(Collection c) { + ImmutableSegment[] elements = pipeline; + int len = elements.length; + if (len != 0) { + // temp array holds those elements to be kept + int len2 = 0; + ImmutableSegment[] temp = new ImmutableSegment[len]; + for (int i = 0; i < len; ++i) { + ImmutableSegment element = elements[i]; + if (!c.contains(element)) { + temp[len2++] = element; + } + } + if (len2 != len) { + pipeline = Arrays.copyOf(temp, len2); + return true; + } + } + return false; + } + + /* + * Not thread-safe + * Adds ImmutableSegment to the tail + */ + void add(ImmutableSegment segment) { + ImmutableSegment[] elements = pipeline; + int len = elements.length; + ImmutableSegment[] newElements = Arrays.copyOf(elements, len + 1); + newElements[len] = segment; + pipeline = newElements; + } + + /* + * Not thread-safe + * @param index index at which segment is to be added + * @param segment + */ + void add(int index, ImmutableSegment segment) { + ImmutableSegment[] elements = pipeline; + int len = elements.length; + if (index > len || index < 0) { + throw new IndexOutOfBoundsException("Index: " + index + ", Size: "+len); + } + ImmutableSegment[] newElements; + int numToMove = len - index; + if (numToMove == 0) { + newElements = Arrays.copyOf(elements, len + 1); + } + else { + newElements = new ImmutableSegment[len + 1]; + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index, newElements, index + 1, numToMove); + } + newElements[index] = segment; + pipeline = newElements; } private void swapSuffix(List suffix, ImmutableSegment segment, @@ -251,39 +340,12 @@ public class CompactionPipeline { itemInSuffix.close(); } } - pipeline.removeAll(suffix); - if(segment != null) pipeline.addLast(segment); + removeAll(suffix); + 
if(segment != null) add(segment); } private boolean addFirst(ImmutableSegment segment) { - pipeline.addFirst(segment); - return true; - } - - // debug method - private boolean validateSuffixList(LinkedList suffix) { - if(suffix.isEmpty()) { - // empty suffix is always valid - return true; - } - Iterator pipelineBackwardIterator = pipeline.descendingIterator(); - Iterator suffixBackwardIterator = suffix.descendingIterator(); - ImmutableSegment suffixCurrent; - ImmutableSegment pipelineCurrent; - for( ; suffixBackwardIterator.hasNext(); ) { - if(!pipelineBackwardIterator.hasNext()) { - // a suffix longer than pipeline is invalid - return false; - } - suffixCurrent = suffixBackwardIterator.next(); - pipelineCurrent = pipelineBackwardIterator.next(); - if(suffixCurrent != pipelineCurrent) { - // non-matching suffix - return false; - } - } - // suffix matches pipeline suffix + add(0, segment); return true; } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 7803f7d..bda2100 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -82,6 +82,10 @@ public class SegmentScanner implements KeyValueScanner { } } + void setScannerOrder(long scannerOrder) { + this.scannerOrder = scannerOrder; + } + /** * Look at the next Cell in this scanner, but do not iterate the scanner * @return the currently observed Cell diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 6e8f831..762ca44 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -324,7 +324,6 @@ public class TestHeapSize { expected += 
ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(CompactionPipeline.class, false); - expected += ClassSize.estimateBase(LinkedList.class, false); expected += ClassSize.estimateBase(MemStoreCompactor.class, false); expected += ClassSize.estimateBase(AtomicBoolean.class, false); if (expected != actual) { @@ -332,7 +331,6 @@ public class TestHeapSize { ClassSize.estimateBase(AtomicBoolean.class, true); ClassSize.estimateBase(AtomicBoolean.class, true); ClassSize.estimateBase(CompactionPipeline.class, true); - ClassSize.estimateBase(LinkedList.class, true); ClassSize.estimateBase(MemStoreCompactor.class, true); ClassSize.estimateBase(AtomicBoolean.class, true); assertEquals(expected, actual);