diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 2fd2a14..c8d1c9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,6 +55,7 @@ public class CompactionPipeline { private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance() .createImmutableSegment((CellComparator) null); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public CompactionPipeline(RegionServicesForStores region) { this.region = region; @@ -64,38 +66,51 @@ public class CompactionPipeline { public boolean pushHead(MutableSegment segment) { ImmutableSegment immutableSegment = SegmentFactory.instance(). 
createImmutableSegment(segment); - synchronized (pipeline){ - return addFirst(immutableSegment); + lock.writeLock().lock(); + try { + pipeline.addFirst(immutableSegment); + return true; + } finally { + lock.writeLock().unlock(); } } public ImmutableSegment pullTail() { - synchronized (pipeline){ - if(pipeline.isEmpty()) { + lock.writeLock().lock(); /* removeLast() mutates the pipeline, so take the exclusive lock */ + try { + if (pipeline.isEmpty()) { return EMPTY_MEM_STORE_SEGMENT; } return removeLast(); + } finally { + lock.writeLock().unlock(); } } public List drain() { - int drainSize = pipeline.size(); - List result = new ArrayList(drainSize); - synchronized (pipeline){ + lock.writeLock().lock(); + try { + int drainSize = pipeline.size(); + List result = new ArrayList(drainSize); version++; for(int i=0; i segmentList = new LinkedList(pipeline); VersionedSegmentsList res = new VersionedSegmentsList(segmentList, version); return res; + } finally { + lock.readLock().unlock(); } } @@ -114,7 +129,8 @@ public class CompactionPipeline { return false; } List suffix; - synchronized (pipeline){ + lock.writeLock().lock(); + try { if(versionedList.getVersion() != version) { return false; } @@ -126,6 +142,8 @@ public class CompactionPipeline { + ", and the number of cells in new segment is:" + segment.getCellsCount()); } swapSuffix(suffix,segment, closeSuffix); + } finally { + lock.writeLock().unlock(); } if (region != null) { // update the global memstore size counter @@ -177,7 +195,8 @@ public class CompactionPipeline { return false; } - synchronized (pipeline){ + lock.readLock().lock(); + try { if(requesterVersion != version) { LOG.warn("Segment flattening failed, because versions do not match"); return false; @@ -195,6 +214,8 @@ public class CompactionPipeline { } } + } finally { + lock.readLock().unlock(); } // do not update the global memstore size counter and do not increase the version, // because all the cells remain in place @@ -202,47 +223,81 @@ public class CompactionPipeline { } public boolean isEmpty() { - return 
pipeline.isEmpty(); + lock.readLock().lock(); + try { + return pipeline.isEmpty(); + } finally { + lock.readLock().unlock(); + } } public List getSegments() { - synchronized (pipeline){ + lock.readLock().lock(); + try { return new LinkedList(pipeline); + } finally { + lock.readLock().unlock(); } } public long size() { - return pipeline.size(); + lock.readLock().lock(); + try { + return pipeline.size(); + } finally { + lock.readLock().unlock(); + } } public List getScanners(long readPoint, long order) { - List scanners = new ArrayList(this.pipeline.size()); - for (Segment segment : this.pipeline) { - scanners.add(segment.getScanner(readPoint, order)); - // The order is the Segment ordinal - order--; - assert order>=0; // order should never be negative so this is just a sanity check + lock.readLock().lock(); + try { + List scanners = new ArrayList(this.pipeline.size()); + // order should never be negative so this is just a sanity check + assert order >= this.pipeline.size(); + for (Segment segment : this.pipeline) { + scanners.add(segment.getScanner(readPoint, order)); + // The order is the Segment ordinal + order--; + } + return scanners; + } finally { + lock.readLock().unlock(); } - return scanners; } public long getMinSequenceId() { long minSequenceId = Long.MAX_VALUE; - if (!isEmpty()) { - minSequenceId = pipeline.getLast().getMinSequenceId(); + lock.readLock().lock(); + try { + if (!isEmpty()) { + minSequenceId = pipeline.getLast().getMinSequenceId(); + } + } finally { + lock.readLock().unlock(); } return minSequenceId; } public MemstoreSize getTailSize() { - if (isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); + lock.readLock().lock(); + try { + if (isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); + } finally { + lock.readLock().unlock(); + } } public MemstoreSize getPipelineSize() { - if 
(isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline)); + lock.readLock().lock(); + try { + if (isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline)); + } finally { + lock.readLock().unlock(); + } } private void swapSuffix(List suffix, ImmutableSegment segment, @@ -268,35 +323,4 @@ public class CompactionPipeline { return pipeline.removeLast(); } - private boolean addFirst(ImmutableSegment segment) { - pipeline.addFirst(segment); - return true; - } - - // debug method - private boolean validateSuffixList(LinkedList suffix) { - if(suffix.isEmpty()) { - // empty suffix is always valid - return true; - } - Iterator pipelineBackwardIterator = pipeline.descendingIterator(); - Iterator suffixBackwardIterator = suffix.descendingIterator(); - ImmutableSegment suffixCurrent; - ImmutableSegment pipelineCurrent; - for( ; suffixBackwardIterator.hasNext(); ) { - if(!pipelineBackwardIterator.hasNext()) { - // a suffix longer than pipeline is invalid - return false; - } - suffixCurrent = suffixBackwardIterator.next(); - pipelineCurrent = pipelineBackwardIterator.next(); - if(suffixCurrent != pipelineCurrent) { - // non-matching suffix - return false; - } - } - // suffix matches pipeline suffix - return true; - } - }