.../java/org/apache/hadoop/hbase/io/TimeRange.java | 13 ++ .../hbase/regionserver/AbstractMemStore.java | 7 +- .../CollectionBackedImmutableSegment.java | 158 +++++++++++++++++++++ .../hbase/regionserver/CompactingMemStore.java | 51 +++++-- .../hbase/regionserver/CompactionPipeline.java | 13 ++ .../hadoop/hbase/regionserver/DefaultMemStore.java | 4 +- .../hbase/regionserver/ImmutableSegment.java | 12 +- .../hbase/regionserver/MemStoreCompactor.java | 14 +- .../apache/hadoop/hbase/regionserver/Segment.java | 4 +- .../hadoop/hbase/regionserver/SegmentFactory.java | 19 +++ .../hbase/regionserver/SnapshotSegments.java | 52 +++++++ 11 files changed, 319 insertions(+), 28 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java index 2efcde1..a518d9e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; +import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -216,6 +217,18 @@ public class TimeRange { return timestamp >= maxStamp? 
1: 0; } + public static TimeRange union(List ranges) { + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + for (TimeRange range : ranges) { + if (range != null) { + if (range.minStamp < min) min = range.minStamp; + if (range.maxStamp > max) max = range.maxStamp; + } + } + return new TimeRange(min, max); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index a4ea3ee..c26ae77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -49,7 +49,7 @@ public abstract class AbstractMemStore implements MemStore { // active segment absorbs write operations protected volatile MutableSegment active; // Snapshot of memstore. Made for flusher. - protected volatile ImmutableSegment snapshot; + protected volatile SnapshotSegments snapshot; protected volatile long snapshotId; // Used to track when to flush private volatile long timeOfOldestEdit; @@ -63,7 +63,7 @@ public abstract class AbstractMemStore implements MemStore { this.conf = conf; this.comparator = c; resetActive(); - this.snapshot = SegmentFactory.instance().createImmutableSegment(c); + this.snapshot = SegmentFactory.instance().createSnapshotSegments(c); this.snapshotId = NO_SNAPSHOT_ID; } @@ -147,7 +147,7 @@ public abstract class AbstractMemStore implements MemStore { // create a new snapshot and let the old one go. 
Segment oldSnapshot = this.snapshot; if (!this.snapshot.isEmpty()) { - this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator); + this.snapshot = SegmentFactory.instance().createSnapshotSegments(this.comparator); } this.snapshotId = NO_SNAPSHOT_ID; oldSnapshot.close(); @@ -310,6 +310,7 @@ public abstract class AbstractMemStore implements MemStore { /** * @return an ordered list of segments from most recent to oldest in memstore */ + @VisibleForTesting protected abstract List getSegments() throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CollectionBackedImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CollectionBackedImmutableSegment.java new file mode 100644 index 0000000..7043907 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CollectionBackedImmutableSegment.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.SortedSet; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.TimeRange; + +/** + * A type of segment that works on a list of segments. Works with snapshots that needs to be taken + * over the pipeline that consists of more than one segment + */ +@InterfaceAudience.Private +public class CollectionBackedImmutableSegment extends SnapshotSegments { + + // list of segments that form the snapshot + private List segments; + + CollectionBackedImmutableSegment(CellComparator comp, List snapshot, TimeRange timeRange) { + super(comp, null, null, timeRange); + this.segments = snapshot; + for (ImmutableSegment segment : snapshot) { + incSize(segment.dataSize.get(), segment.heapOverhead.get()); + } + } + + @Override + void dump(Log log) { + for (ImmutableSegment segment : this.segments) { + segment.dump(log); + } + } + + @Override + public Cell getFirstAfter(Cell firstCell) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public boolean isEmpty() { + for (ImmutableSegment segment : this.segments) { + if (!segment.isEmpty()) { + return false; + } + } + return true; + } + + @Override + public void close() { + for (ImmutableSegment segment : this.segments) { + segment.close(); + } + } + + @Override + public int getCellsCount() { + int cellCount = 0; + for (ImmutableSegment segment : this.segments) { + cellCount += segment.getCellsCount(); + } + return cellCount; + } + + @Override + public boolean isTagsPresent() { + for (ImmutableSegment segment : this.segments) { + if (segment.isTagsPresent()) { + return true; + } + } + return false; + } + + @Override + public TimeRangeTracker getTimeRangeTracker() { + // 
returning the first time range tracker here. + for (ImmutableSegment segment : segments) { + return segment.getTimeRangeTracker(); + } + return null; + } + + @Override + public KeyValueScanner getKeyValueScanner() { + List scanners = new ArrayList(this.segments.size()); + for (ImmutableSegment segment : segments) { + scanners.add(segment.getKeyValueScanner()); + } + try { + return new MemStoreScanner(getComparator(), scanners); + } catch (Exception e) { + // No exception possible here + throw new IllegalArgumentException("Unknow exception thrown", e); + } + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public SegmentScanner getScanner(long readPoint) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public SegmentScanner getScanner(long readPoint, long order) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public Cell last() { + throw new UnsupportedOperationException("Operation is not supported"); + } + + @Override + public SortedSet headSet(Cell firstKeyOnRow) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + protected SortedSet tailSet(Cell firstCell) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public List getScanners(long readPoint, long order) { + List scanners = new ArrayList(this.segments.size()); + for (Segment segment : this.segments) { + scanners.add(segment.getScanner(readPoint, order)); + } + return scanners; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index a7eb19e..8351839 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -67,10 +67,14 @@ public class CompactingMemStore extends AbstractMemStore { @VisibleForTesting private final AtomicBoolean allowCompaction = new AtomicBoolean(true); + // TODO : remove this - this is for ease of testing + private boolean pipelineMemstore = false; // if the entire pipeline has to be flushed + public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction + Bytes.SIZEOF_LONG // inmemoryFlushSize + + Bytes.SIZEOF_BOOLEAN // pipelinememstore (this is junk)?? + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD; @@ -92,6 +96,7 @@ public class CompactingMemStore extends AbstractMemStore { numStores = 1; } inmemoryFlushSize = memstoreFlushSize / numStores; + this.pipelineMemstore = conf.getBoolean("hbase.memstore.inmemoryflush.pipeline", false); // multiply by a factor double factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT); @@ -99,6 +104,9 @@ public class CompactingMemStore extends AbstractMemStore { LOG.info("Setting in-memory flush size threshold to " + inmemoryFlushSize); } + boolean isPipelineMemstore() { + return pipelineMemstore; + } /** * @return Total memory occupied by this MemStore. This won't include any size occupied by the * snapshot. We assume the snapshot will get cleared soon. 
This is not thread safe and @@ -150,10 +158,19 @@ public class CompactingMemStore extends AbstractMemStore { + getRegionServices().getRegionInfo().getRegionNameAsString() + "store: " + getFamilyName()); } - stopCompaction(); - pushActiveToPipeline(this.active); - snapshotId = EnvironmentEdgeManager.currentTime(); - pushTailToSnapshot(); + if (this.pipelineMemstore) { + stopCompaction(); + pushActiveToPipeline(active); + // get the existing segments in the pipeline + List segmentsInPipeLine = pipeline.removeCurrentPipeline(); + snapshotId = EnvironmentEdgeManager.currentTime(); + this.snapshot = SegmentFactory.instance().createSnapshotSegments(getComparator(), segmentsInPipeLine); + } else { + stopCompaction(); + pushActiveToPipeline(this.active); + snapshotId = EnvironmentEdgeManager.currentTime(); + pushTailToSnapshot(); + } } return new MemStoreSnapshot(snapshotId, this.snapshot); } @@ -164,13 +181,22 @@ public class CompactingMemStore extends AbstractMemStore { */ @Override public MemstoreSize getFlushableSize() { - MemstoreSize snapshotSize = getSnapshotSize(); - if (snapshotSize.getDataSize() == 0) { - // if snapshot is empty the tail of the pipeline is flushed - snapshotSize = pipeline.getTailSize(); + MemstoreSize snapshotSize; + if (this.pipelineMemstore) { + snapshotSize = getSnapshotSize(); + if (snapshotSize.getDataSize() == 0) { + snapshotSize = new MemstoreSize(keySize(), heapOverhead()); + } + return snapshotSize; + } else { + snapshotSize = getSnapshotSize(); + if (snapshotSize.getDataSize() == 0) { + // if snapshot is empty the tail of the pipeline is flushed + snapshotSize = pipeline.getTailSize(); + } + return snapshotSize.getDataSize() > 0 ? snapshotSize + : new MemstoreSize(this.active.keySize(), this.active.heapOverhead()); } - return snapshotSize.getDataSize() > 0 ? 
snapshotSize - : new MemstoreSize(this.active.keySize(), this.active.heapOverhead()); } @Override @@ -258,14 +284,13 @@ public class CompactingMemStore extends AbstractMemStore { List pipelineList = pipeline.getSegments(); long order = pipelineList.size(); // The list of elements in pipeline + the active element + the snapshot segment - // TODO : This will change when the snapshot is made of more than one element List list = new ArrayList(pipelineList.size() + 2); list.add(this.active.getScanner(readPt, order + 1)); for (Segment item : pipelineList) { list.add(item.getScanner(readPt, order)); order--; } - list.add(this.snapshot.getScanner(readPt, order)); + list.addAll(snapshot.getScanners(readPt, order)); return Collections. singletonList(new MemStoreScanner(getComparator(), list)); } @@ -369,7 +394,7 @@ public class CompactingMemStore extends AbstractMemStore { private void pushTailToSnapshot() { ImmutableSegment tail = pipeline.pullTail(); if (!tail.isEmpty()) { - this.snapshot = tail; + this.snapshot = SegmentFactory.instance().createSnapshotSegments(tail); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 6676170..1e408ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -50,6 +50,7 @@ public class CompactionPipeline { private final RegionServicesForStores region; private LinkedList pipeline; private long version; + private List EMPTY_LIST = new LinkedList(); private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance() .createImmutableSegment((CellComparator) null); @@ -85,6 +86,18 @@ public class CompactionPipeline { } } + public List removeCurrentPipeline() { + synchronized (pipeline) { + if (pipeline.isEmpty()) { + return EMPTY_LIST; + 
} + List res = new LinkedList(pipeline); + pipeline.clear(); + version++; + return res; + } + } + /** * Swaps the versioned list at the tail of the pipeline with the new compacted segment. * Swapping only if there were no changes to the suffix of the list while it was compacted. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index d4e6e12..eb78b64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -89,9 +89,7 @@ public class DefaultMemStore extends AbstractMemStore { } else { this.snapshotId = EnvironmentEdgeManager.currentTime(); if (!this.active.isEmpty()) { - ImmutableSegment immutableSegment = SegmentFactory.instance(). - createImmutableSegment(this.active); - this.snapshot = immutableSegment; + this.snapshot = SegmentFactory.instance().createSnapshotSegments(this.active); resetActive(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 4cdb29d..6ed8e52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -58,7 +57,7 @@ public class ImmutableSegment extends Segment { * This is an immutable segment so use the read-only TimeRange rather than the heavy-weight * TimeRangeTracker with all its synchronization when doing time range stuff. 
*/ - private final TimeRange timeRange; + protected final TimeRange timeRange; private Type type = Type.SKIPLIST_MAP_BASED; @@ -99,6 +98,15 @@ public class ImmutableSegment extends Segment { this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange(); } + protected ImmutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, + TimeRange timeRange) { + super(cellSet, comparator, null); + if (timeRange == null) { + this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange(); + } else { + this.timeRange = timeRange; + } + } /**------------------------------------------------------------------------ * C-tor to be used when new SKIP-LIST BASED ImmutableSegment is a result of compaction of a * list of older ImmutableSegments. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 0df3674..abc19b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -167,11 +167,15 @@ public class MemStoreCompactor { // compaction shouldn't happen or doesn't worth it // limit the number of the segments in the pipeline - int numOfSegments = versionedList.getNumOfSegments(); - if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) { - LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() - + " is going to be merged, as there are " + numOfSegments + " segments"); - return Action.MERGE; // to avoid too many segments, merge now + // TODO : this is likely to change. 
If by default we allow more than 1 segment in the pipeline + // then we flatten all the semgnets and once the threshold reaches we do the merge + if (!compactingMemStore.isPipelineMemstore()) { + int numOfSegments = versionedList.getNumOfSegments(); + if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) { + LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + + " is going to be merged, as there are " + numOfSegments + " segments"); + return Action.MERGE; // to avoid too many segments, merge now + } } // if nothing of the above, then just flatten the newly joined segment diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index afdfe6f..13ef69e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -54,7 +54,7 @@ public abstract class Segment { + ClassSize.CELL_SET + ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER; private AtomicReference cellSet= new AtomicReference(); - private final CellComparator comparator; + protected final CellComparator comparator; protected long minSequenceId; private MemStoreLAB memStoreLAB; // Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. 
This is not @@ -106,7 +106,7 @@ public abstract class Segment { /** * @return whether the segment has any cells */ - public boolean isEmpty() { + boolean isEmpty() { return getCellSet().isEmpty(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index fa8860a..c9150ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.ReflectionUtils; import java.io.IOException; @@ -75,6 +76,24 @@ public final class SegmentFactory { return new ImmutableSegment(segment); } + public SnapshotSegments createSnapshotSegments(CellComparator comparator) { + MutableSegment segment = generateMutableSegment(null, comparator, null); + return createSnapshotSegments(segment); + } + + public SnapshotSegments createSnapshotSegments(Segment segment) { + return new SnapshotSegments(segment); + } + + public SnapshotSegments createSnapshotSegments(CellComparator comp, + List segments) { + List timeRanges = new ArrayList(); + for (ImmutableSegment segment : segments) { + timeRanges.add(segment.timeRange); + } + return new CollectionBackedImmutableSegment(comp, segments, TimeRange.union(timeRanges)); + } + // create mutable segment public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) { MemStoreLAB memStoreLAB = getMemStoreLAB(conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotSegments.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotSegments.java new file mode 100644 index 0000000..2306175 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotSegments.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.TimeRange; + +/** + * Represents a snapshot segment + */ +@InterfaceAudience.Private +public class SnapshotSegments extends ImmutableSegment { + + // add a method in factory to create this instance. 
It just marks the segment as snapshot segment +  protected SnapshotSegments(CellComparator comparator, MemStoreSegmentsIterator iterator, +      MemStoreLAB memStoreLAB, TimeRange timeRange) { +    super(null, comparator, null, timeRange); +  } + +  protected SnapshotSegments(Segment segment) { +    super(segment); +  } + +  /** +   * Creates a list of scanners on the snapshot segment +   * @param readPoint +   * @param order +   * @return +   */ +  public List getScanners(long readPoint, long order) { +    return Collections.emptyList(); +  } +}