From 7a509565d9b6e1b90e2585c8648aed6ce376ce20 Mon Sep 17 00:00:00 2001 From: eshcar Date: Wed, 4 Jan 2017 16:02:27 +0200 Subject: [PATCH] New synchronization scheme for compaction pipeline --- .../hbase/regionserver/CompactingMemStore.java | 6 ++--- .../hbase/regionserver/CompactionPipeline.java | 27 ++++++++++++---------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e1289f8..99c1685 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore { @VisibleForTesting @Override protected List getSegments() { - List pipelineList = pipeline.getSegments(); - List list = new ArrayList(pipelineList.size() + 2); + List pipelineList = pipeline.getSegments(); + List list = new ArrayList<>(pipelineList.size() + 2); list.add(this.active); list.addAll(pipelineList); list.add(this.snapshot); @@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore { * Scanners are ordered from 0 (oldest) to newest in increasing order. */ public List getScanners(long readPt) throws IOException { - List pipelineList = pipeline.getSegments(); + List pipelineList = pipeline.getSegments(); long order = pipelineList.size(); // The list of elements in pipeline + the active element + the snapshot segment // TODO : This will change when the snapshot is made of more than one element diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 9d5df77..2040859 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -49,11 +49,13 @@ public class CompactionPipeline { private final RegionServicesForStores region; private LinkedList pipeline; + private volatile LinkedList readOnlyCopy; private long version; public CompactionPipeline(RegionServicesForStores region) { this.region = region; this.pipeline = new LinkedList<>(); + this.readOnlyCopy = new LinkedList<>(); this.version = 0; } @@ -61,14 +63,15 @@ public class CompactionPipeline { ImmutableSegment immutableSegment = SegmentFactory.instance(). createImmutableSegment(segment); synchronized (pipeline){ - return addFirst(immutableSegment); + boolean res = addFirst(immutableSegment); + readOnlyCopy = new LinkedList<>(pipeline); + return res; } } public VersionedSegmentsList getVersionedList() { synchronized (pipeline){ - List segmentList = new ArrayList<>(pipeline); - return new VersionedSegmentsList(segmentList, version); + return new VersionedSegmentsList(readOnlyCopy, version); } } @@ -115,6 +118,7 @@ public class CompactionPipeline { + ", and the number of cells in new segment is:" + count); } swapSuffix(suffix, segment, closeSuffix); + readOnlyCopy = new LinkedList<>(pipeline); } if (closeSuffix && region != null) { // update the global memstore size counter @@ -193,33 +197,32 @@ public class CompactionPipeline { } public boolean isEmpty() { - return pipeline.isEmpty(); + return readOnlyCopy.isEmpty(); } - public List getSegments() { - synchronized (pipeline){ - return new LinkedList<>(pipeline); - } + public List getSegments() { + return readOnlyCopy; } public long size() { - return pipeline.size(); + return readOnlyCopy.size(); } public long getMinSequenceId() { long minSequenceId = Long.MAX_VALUE; if (!isEmpty()) { - minSequenceId = pipeline.getLast().getMinSequenceId(); + minSequenceId = readOnlyCopy.getLast().getMinSequenceId(); } return minSequenceId; } public MemstoreSize getTailSize() { + LinkedList localCopy = readOnlyCopy; if (isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); + return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead()); } - private void swapSuffix(List suffix, ImmutableSegment segment, + private void swapSuffix(List suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { version++; // During index merge we won't be closing the segments undergoing the merge. Segment#close() -- 2.10.1 (Apple Git-78)