diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 2fd2a14..07429a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -79,9 +79,9 @@ public class CompactionPipeline { } public List drain() { - int drainSize = pipeline.size(); - List result = new ArrayList(drainSize); synchronized (pipeline){ + int drainSize = pipeline.size(); + List result = new ArrayList(drainSize); version++; for(int i=0; i getSegments() { @@ -212,37 +214,48 @@ public class CompactionPipeline { } public long size() { - return pipeline.size(); + synchronized (pipeline){ + return pipeline.size(); + } } public List getScanners(long readPoint, long order) { - List scanners = new ArrayList(this.pipeline.size()); - for (Segment segment : this.pipeline) { - scanners.add(segment.getScanner(readPoint, order)); - // The order is the Segment ordinal - order--; - assert order>=0; // order should never be negative so this is just a sanity check + synchronized (this.pipeline) { + List scanners = new ArrayList(this.pipeline.size()); + // order should never be negative so this is just a sanity check + assert order >= this.pipeline.size(); + for (Segment segment : this.pipeline) { + scanners.add(segment.getScanner(readPoint, order)); + // The order is the Segment ordinal + order--; + } + return scanners; } - return scanners; } public long getMinSequenceId() { long minSequenceId = Long.MAX_VALUE; - if (!isEmpty()) { - minSequenceId = pipeline.getLast().getMinSequenceId(); + synchronized (pipeline){ + if (!isEmpty()) { + minSequenceId = pipeline.getLast().getMinSequenceId(); + } } return minSequenceId; } public MemstoreSize getTailSize() { - if (isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); + synchronized (pipeline){ + if (isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); + } } public MemstoreSize getPipelineSize() { - if (isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline)); + synchronized (pipeline){ + if (isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline)); + } } private void swapSuffix(List suffix, ImmutableSegment segment, @@ -259,17 +272,23 @@ public class CompactionPipeline { itemInSuffix.close(); } } - pipeline.removeAll(suffix); - pipeline.addLast(segment); + synchronized (pipeline){ + pipeline.removeAll(suffix); + pipeline.addLast(segment); + } } private ImmutableSegment removeLast() { version++; - return pipeline.removeLast(); + synchronized (pipeline){ + return pipeline.removeLast(); + } } private boolean addFirst(ImmutableSegment segment) { - pipeline.addFirst(segment); + synchronized (pipeline){ + pipeline.addFirst(segment); + } return true; } @@ -279,20 +298,22 @@ public class CompactionPipeline { // empty suffix is always valid return true; } - Iterator pipelineBackwardIterator = pipeline.descendingIterator(); - Iterator suffixBackwardIterator = suffix.descendingIterator(); - ImmutableSegment suffixCurrent; - ImmutableSegment pipelineCurrent; - for( ; suffixBackwardIterator.hasNext(); ) { - if(!pipelineBackwardIterator.hasNext()) { - // a suffix longer than pipeline is invalid - return false; - } - suffixCurrent = suffixBackwardIterator.next(); - pipelineCurrent = pipelineBackwardIterator.next(); - if(suffixCurrent != pipelineCurrent) { - // non-matching suffix - return false; + synchronized (pipeline){ + Iterator pipelineBackwardIterator = pipeline.descendingIterator(); + Iterator suffixBackwardIterator = suffix.descendingIterator(); + ImmutableSegment suffixCurrent; + ImmutableSegment pipelineCurrent; + for( ; suffixBackwardIterator.hasNext(); ) { + if(!pipelineBackwardIterator.hasNext()) { + // a suffix longer than pipeline is invalid + return false; + } + suffixCurrent = suffixBackwardIterator.next(); + pipelineCurrent = pipelineBackwardIterator.next(); + if(suffixCurrent != pipelineCurrent) { + // non-matching suffix + return false; + } } } // suffix matches pipeline suffix