diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 2fd2a14..fb112c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -65,7 +65,8 @@ public class CompactionPipeline { ImmutableSegment immutableSegment = SegmentFactory.instance(). createImmutableSegment(segment); synchronized (pipeline){ - return addFirst(immutableSegment); + pipeline.addFirst(immutableSegment); + return true; } } @@ -79,9 +80,9 @@ public class CompactionPipeline { } public List drain() { - int drainSize = pipeline.size(); - List result = new ArrayList(drainSize); synchronized (pipeline){ + int drainSize = pipeline.size(); + List result = new ArrayList(drainSize); version++; for(int i=0; i getSegments() { @@ -212,37 +215,48 @@ public class CompactionPipeline { } public long size() { - return pipeline.size(); + synchronized (pipeline){ + return pipeline.size(); + } } public List getScanners(long readPoint, long order) { - List scanners = new ArrayList(this.pipeline.size()); - for (Segment segment : this.pipeline) { - scanners.add(segment.getScanner(readPoint, order)); - // The order is the Segment ordinal - order--; - assert order>=0; // order should never be negative so this is just a sanity check + synchronized (this.pipeline) { + List scanners = new ArrayList(this.pipeline.size()); + // order should never be negative so this is just a sanity check + assert order >= this.pipeline.size(); + for (Segment segment : this.pipeline) { + scanners.add(segment.getScanner(readPoint, order)); + // The order is the Segment ordinal + order--; + } + return scanners; } - return scanners; } public long getMinSequenceId() { long minSequenceId = Long.MAX_VALUE; - if (!isEmpty()) { - minSequenceId = pipeline.getLast().getMinSequenceId(); + synchronized (pipeline){ + if (!isEmpty()) { + minSequenceId = pipeline.getLast().getMinSequenceId(); + } } return minSequenceId; } public MemstoreSize getTailSize() { - if (isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); + synchronized (pipeline){ + if (isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); + } } public MemstoreSize getPipelineSize() { - if (isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline)); + synchronized (pipeline){ + if (isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline)); + } } private void swapSuffix(List suffix, ImmutableSegment segment, @@ -268,35 +282,4 @@ public class CompactionPipeline { return pipeline.removeLast(); } - private boolean addFirst(ImmutableSegment segment) { - pipeline.addFirst(segment); - return true; - } - - // debug method - private boolean validateSuffixList(LinkedList suffix) { - if(suffix.isEmpty()) { - // empty suffix is always valid - return true; - } - Iterator pipelineBackwardIterator = pipeline.descendingIterator(); - Iterator suffixBackwardIterator = suffix.descendingIterator(); - ImmutableSegment suffixCurrent; - ImmutableSegment pipelineCurrent; - for( ; suffixBackwardIterator.hasNext(); ) { - if(!pipelineBackwardIterator.hasNext()) { - // a suffix longer than pipeline is invalid - return false; - } - suffixCurrent = suffixBackwardIterator.next(); - pipelineCurrent = pipelineBackwardIterator.next(); - if(suffixCurrent != pipelineCurrent) { - // non-matching suffix - return false; - } - } - // suffix matches pipeline suffix - return true; - } - }