From cf7a1f8fadc11b65aaab47dfa148e20ad6bbcde7 Mon Sep 17 00:00:00 2001
From: anastas
Date: Sun, 15 Jan 2017 11:58:06 +0200
Subject: [PATCH] My squashed commits

---
 .../hbase/regionserver/CompactingMemStore.java     |  53 +++-
 .../hbase/regionserver/CompactionPipeline.java     |  12 +
 .../regionserver/CompositeImmutableSegment.java    | 353 +++++++++++++++++++++
 .../hbase/regionserver/ImmutableSegment.java       |  22 +-
 .../hbase/regionserver/MemStoreCompactor.java      |   2 +-
 .../hadoop/hbase/regionserver/MemstoreSize.java    |  25 +-
 .../apache/hadoop/hbase/regionserver/Segment.java  |  21 +-
 .../hadoop/hbase/regionserver/SegmentFactory.java  |  10 +
 8 files changed, 481 insertions(+), 17 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 99c1685..3ddab4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -72,6 +72,7 @@ public class CompactingMemStore extends AbstractMemStore {
   private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
   @VisibleForTesting
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
+  private boolean compositeSnapshot = true;
 
   public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
       + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
@@ -160,7 +161,12 @@ public class CompactingMemStore extends AbstractMemStore {
       stopCompaction();
       pushActiveToPipeline(this.active);
       snapshotId = EnvironmentEdgeManager.currentTime();
-      pushTailToSnapshot();
+      // in both cases whatever is pushed to snapshot is cleared from the pipeline
+      if (compositeSnapshot) {
+        pushPipelineToSnapshot();
+      } else {
+        pushTailToSnapshot();
+      }
     }
     return new MemStoreSnapshot(snapshotId, this.snapshot);
   }
@@ -173,8 +179,13 @@ public class CompactingMemStore extends AbstractMemStore {
   public MemstoreSize getFlushableSize() {
     MemstoreSize snapshotSize = getSnapshotSize();
     if (snapshotSize.getDataSize() == 0) {
-      // if snapshot is empty the tail of the pipeline is flushed
-      snapshotSize = pipeline.getTailSize();
+      // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
+      if (compositeSnapshot) {
+        snapshotSize = pipeline.getPipelineSize();
+        snapshotSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead());
+      } else {
+        snapshotSize = pipeline.getTailSize();
+      }
     }
     return snapshotSize.getDataSize() > 0 ?
        snapshotSize : new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
@@ -221,10 +232,20 @@ public class CompactingMemStore extends AbstractMemStore {
     List list = new ArrayList<>(pipelineList.size() + 2);
     list.add(this.active);
     list.addAll(pipelineList);
-    list.add(this.snapshot);
+    list.addAll(this.snapshot.getAllSegments());
+
     return list;
   }
 
+  // the following two methods allow manipulating the composite snapshot setting
+  public void setCompositeSnapshot(boolean useCompositeSnapshot) {
+    this.compositeSnapshot = useCompositeSnapshot;
+  }
+
+  public boolean isCompositeSnapshot() {
+    return this.compositeSnapshot;
+  }
+
   public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
       boolean merge) {
     return pipeline.swap(versionedList, result, !merge);
@@ -265,17 +286,20 @@
    */
   public List getScanners(long readPt) throws IOException {
     List pipelineList = pipeline.getSegments();
-    long order = pipelineList.size();
+    int order = pipelineList.size() + snapshot.getNumOfSegments();
     // The list of elements in pipeline + the active element + the snapshot segment
     // TODO : This will change when the snapshot is made of more than one element
     // The order is the Segment ordinal
-    List list = new ArrayList(pipelineList.size() + 2);
+    List list = new ArrayList(order + 1);
     list.add(this.active.getScanner(readPt, order + 1));
     for (Segment item : pipelineList) {
       list.add(item.getScanner(readPt, order));
       order--;
     }
-    list.add(this.snapshot.getScanner(readPt, order));
+    for (Segment item : snapshot.getAllSegments()) {
+      list.add(item.getScanner(readPt, order));
+      order--;
+    }
     return Collections.singletonList(new MemStoreScanner(getComparator(), list));
   }
@@ -382,13 +406,26 @@
     pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
   }
 
+  private void pushPipelineToSnapshot() {
+    boolean done = false;
+    while (!done) {
+      VersionedSegmentsList segments = pipeline.getVersionedList();
+      pushToSnapshot(segments.getStoreSegments());
+      // swap can return false in case the pipeline was updated by an ongoing compaction
+      // and the version increased; the chance of this happening is very low
+      done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now
+    }
+  }
+
   private void pushToSnapshot(List segments) {
     if(segments.isEmpty()) return;
     if(segments.size() == 1 && !segments.get(0).isEmpty()) {
       this.snapshot = segments.get(0);
       return;
+    } else { // create composite snapshot
+      this.snapshot =
+          SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
     }
-    // TODO else craete composite snapshot
   }
 
   private RegionServicesForStores getRegionServices() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index fafdbee..e533bd0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -238,6 +238,18 @@ public class CompactionPipeline {
     return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead());
   }
 
+  public MemstoreSize getPipelineSize() {
+    long keySize = 0;
+    long heapOverhead = 0;
+    LinkedList localCopy = readOnlyCopy;
+    if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE;
+    for (Segment segment : localCopy) {
+      keySize += segment.keySize();
+      heapOverhead += segment.heapOverhead();
+    }
+    return new MemstoreSize(keySize, heapOverhead);
+  }
+
   private void swapSuffix(List suffix, ImmutableSegment segment,
       boolean closeSegmentsInSuffix) {
     // During index merge we won't be closing the segments undergoing the merge. Segment#close()
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
new file mode 100644
index 0000000..cf6255a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
@@ -0,0 +1,353 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedSet;
+
+/**
+ * The CompositeImmutableSegment is created as a collection of ImmutableSegments and supports
+ * the interface of a single ImmutableSegment.
+ * The CompositeImmutableSegment is planned to be used only as a snapshot,
+ * thus only the relevant interfaces are supported.
+ */
+@InterfaceAudience.Private
+public class CompositeImmutableSegment extends ImmutableSegment {
+
+  private final List segments;
+  private final CellComparator comparator;
+  // CompositeImmutableSegment is used for snapshots, and the snapshot should
+  // support the getTimeRangeTracker() interface.
+  // Thus we hold a constant TRT built at construction time from the TRTs of the given segments.
+  private final TimeRangeTracker timeRangeTracker;
+
+  private long keySize = 0;
+
+  // This scanner needs to be remembered in order to close it when the snapshot is cleared.
+  // Initially CollectionBackedScanner didn't raise the scanner counters thus there was no
+  // need to close it. Now when MemStoreScanner is used instead we need to decrease the
+  // scanner counters.
+  // private KeyValueScanner flushingScanner = null;
+
+  public CompositeImmutableSegment(CellComparator comparator, List segments) {
+    super(comparator);
+    this.comparator = comparator;
+    this.segments = segments;
+    this.timeRangeTracker = new TimeRangeTracker();
+    for (ImmutableSegment s : segments) {
+      this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
+      this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
+      this.keySize += s.keySize();
+    }
+  }
+
+  @VisibleForTesting
+  public List getAllSegments() {
+    return new LinkedList(segments);
+  }
+
+  public int getNumOfSegments() {
+    return segments.size();
+  }
+
+  /**
+   * Builds a special scanner for the MemStoreSnapshot object that is different than the
+   * general segment scanner.
+   * @return a special scanner for the MemStoreSnapshot object
+   */
+  public KeyValueScanner getSnapshotScanner() {
+    KeyValueScanner scanner;
+    List list = new ArrayList(segments.size());
+    for (ImmutableSegment s : segments) {
+      list.add(s.getScanner(Long.MAX_VALUE));
+    }
+
+    try {
+      scanner = new MemStoreScanner(getComparator(), list);
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    }
+
+    // flushingScanner = scanner;
+    return scanner;
+  }
+
+  @Override
+  public List getScanners(long readPoint, long order) {
+    List scanners = new ArrayList(this.segments.size());
+    for (Segment segment : this.segments) {
+      scanners.add(segment.getScanner(readPoint, order));
+      // The order is the Segment ordinal
+      order--;
+      // order should never be negative so this is just a sanity check
+      order = (order < 0) ? 0 : order;
+    }
+    return scanners;
+  }
+
+  /**
+   * @return whether the segment has any cells
+   */
+  public boolean isEmpty() {
+    for (ImmutableSegment s : segments) {
+      if (!s.isEmpty()) return false;
+    }
+    return true;
+  }
+
+  /**
+   * @return number of cells in segment
+   */
+  public int getCellsCount() {
+    int result = 0;
+    for (ImmutableSegment s : segments) {
+      result += s.getCellsCount();
+    }
+    return result;
+  }
+
+  /**
+   * @return the first cell in the segment that has equal or greater key than the given cell
+   */
+  public Cell getFirstAfter(Cell cell) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  /**
+   * Closing a segment before it is being discarded
+   */
+  public void close() {
+//    if (flushingScanner != null) {
+//      flushingScanner.close();
+//      flushingScanner = null;
+//    }
+    for (ImmutableSegment s : segments) {
+      s.close();
+    }
+  }
+
+  /**
+   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
+   * otherwise the given cell is returned
+   * @return either the given cell or its clone
+   */
+  public Cell maybeCloneWithAllocator(Cell cell) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  public long getMinTimestamp() {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  /**
+   * Creates the scanner for the given read point
+   * @return a scanner for the given read point
+   */
+  public KeyValueScanner getScanner(long readPoint) {
+    KeyValueScanner resultScanner;
+    List list = new ArrayList(segments.size());
+    for (ImmutableSegment s : segments) {
+      list.add(s.getScanner(readPoint));
+    }
+
+    try {
+      resultScanner = new MemStoreScanner(getComparator(), list);
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    }
+
+    return resultScanner;
+  }
+
+  /**
+   * Creates the scanner for the given read point, and a specific order in a list
+   * @return a scanner for the given read point
+   */
+  public KeyValueScanner getScanner(long readPoint, long order) {
+    KeyValueScanner resultScanner;
+    List list = new ArrayList(segments.size());
+    for (ImmutableSegment s : segments) {
+      list.add(s.getScanner(readPoint, order));
+    }
+
+    try {
+      resultScanner = new MemStoreScanner(getComparator(), list);
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    }
+
+    return resultScanner;
+  }
+
+  public boolean isTagsPresent() {
+    for (ImmutableSegment s : segments) {
+      if (s.isTagsPresent()) return true;
+    }
+    return false;
+  }
+
+  public void incScannerCount() {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  public void decScannerCount() {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  /**
+   * Setting the CellSet of the segment - used only for flat immutable segment for setting
+   * immutable CellSet after its creation in immutable segment constructor
+   * @return this object
+   */
+
+  protected CompositeImmutableSegment setCellSet(CellSet cellSetOld, CellSet cellSetNew) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  /**
+   * @return Sum of all cell's size.
+   */
+  public long keySize() {
+    return this.keySize;
+  }
+
+  /**
+   * @return The heap overhead of this segment.
+   */
+  public long heapOverhead() {
+    long result = 0;
+    for (ImmutableSegment s : segments) {
+      result += s.heapOverhead();
+    }
+    return result;
+  }
+
+  /**
+   * Updates the heap size counter of the segment by the given delta
+   */
+  protected void incSize(long delta, long heapOverhead) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  protected void incHeapOverheadSize(long delta) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  public long getMinSequenceId() {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  public TimeRangeTracker getTimeRangeTracker() {
+    return this.timeRangeTracker;
+  }
+
+  //*** Methods for SegmentsScanner
+  public Cell last() {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  public Iterator iterator() {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  public SortedSet headSet(Cell firstKeyOnRow) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  public int compare(Cell left, Cell right) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  public int compareRows(Cell left, Cell right) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  /**
+   * @return a set of all cells in the segment
+   */
+  protected CellSet getCellSet() {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  /**
+   * Returns the Cell comparator used by this segment
+   * @return the Cell comparator used by this segment
+   */
+  protected CellComparator getComparator() {
+    return comparator;
+  }
+
+  protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
+      MemstoreSize memstoreSize) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  protected long heapOverheadChange(Cell cell, boolean succ) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  /**
+   * Returns a subset of the segment cell set, which starts with the given cell
+   * @param firstCell a cell in the segment
+   * @return a subset of the segment cell set, which starts with the given cell
+   */
+  protected SortedSet tailSet(Cell firstCell) {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  // Debug methods
+  /**
+   * Dumps all cells of the segment into the given log
+   */
+  void dump(Log log) {
+    for (ImmutableSegment s : segments) {
+      s.dump(log);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb =
+        new StringBuilder("This is CompositeImmutableSegment and those are its segments:: ");
+    for (ImmutableSegment s : segments) {
+      sb.append(s.toString());
+    }
+    return sb.toString();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 0fae6c3..faa9b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.TimeRange;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 /**
  * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
@@ -68,6 +71,14 @@
   ///////////////////// CONSTRUCTORS /////////////////////
   /**------------------------------------------------------------------------
+   * Empty C-tor to be used only for CompositeImmutableSegment
+   */
+  protected ImmutableSegment(CellComparator comparator) {
+    super(comparator);
+    this.timeRange = null;
+  }
+
+  /**------------------------------------------------------------------------
    * Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one.
    * This C-tor should be used when active MutableSegment is pushed into the compaction
    * pipeline and becomes an ImmutableSegment.
@@ -141,6 +152,15 @@
     return this.timeRange.getMin();
   }
 
+  public int getNumOfSegments() {
+    return 1;
+  }
+
+  public List getAllSegments() {
+    List res = new ArrayList(Arrays.asList(this));
+    return res;
+  }
+
   /**------------------------------------------------------------------------
    * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
    * based on CellArrayMap.
@@ -231,7 +251,7 @@
     Cell curCell;
     int idx = 0;
     // create this segment scanner with maximal possible read point, to go over all Cells
-    SegmentScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
+    KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
 
     try {
       while ((curCell = segmentScanner.next()) != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 84f88f0..2174d89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -56,7 +56,7 @@ public class MemStoreCompactor {
 
   // The upper bound for the number of segments we store in the pipeline prior to merging.
   // This constant is subject to further experimentation.
-  private static final int THRESHOLD_PIPELINE_SEGMENTS = 1;
+  private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity
 
   private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
index 77cea51..fa7c342 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
@@ -25,19 +25,32 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public class MemstoreSize {
 
-  static final MemstoreSize EMPTY_SIZE = new MemstoreSize();
-
   private long dataSize;
   private long heapOverhead;
+  final private boolean isEmpty;
+
+  static final MemstoreSize EMPTY_SIZE = new MemstoreSize(true);
 
   public MemstoreSize() {
     dataSize = 0;
     heapOverhead = 0;
+    isEmpty = false;
+  }
+
+  public MemstoreSize(boolean isEmpty) {
+    dataSize = 0;
+    heapOverhead = 0;
+    this.isEmpty = isEmpty;
+  }
+
+  public boolean isEmpty() {
+    return isEmpty;
   }
 
   public MemstoreSize(long dataSize, long heapOverhead) {
     this.dataSize = dataSize;
     this.heapOverhead = heapOverhead;
+    this.isEmpty = false;
   }
 
   public void incMemstoreSize(long dataSize, long heapOverhead) {
@@ -61,11 +74,13 @@
   }
 
   public long getDataSize() {
-    return dataSize;
+
+    return isEmpty ? 0 : dataSize;
   }
 
   public long getHeapOverhead() {
-    return heapOverhead;
+
+    return isEmpty ? 0 : heapOverhead;
   }
 
   @Override
@@ -74,7 +89,7 @@
       return false;
     }
     MemstoreSize other = (MemstoreSize) obj;
-    return this.dataSize == other.dataSize && this.heapOverhead == other.heapOverhead;
+    return getDataSize() == other.dataSize && getHeapOverhead() == other.heapOverhead;
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index afdfe6f..8581517 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -18,7 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -64,6 +66,15 @@ public abstract class Segment {
   protected final TimeRangeTracker timeRangeTracker;
   protected volatile boolean tagsPresent;
 
+  // Empty constructor to be used when a Segment is used as an interface,
+  // and there is no need for a true Segment state
+  protected Segment(CellComparator comparator) {
+    this.comparator = comparator;
+    this.dataSize = new AtomicLong(0);
+    this.heapOverhead = new AtomicLong(0);
+    this.timeRangeTracker = new TimeRangeTracker();
+  }
+
   // This constructor is used to create empty Segments.
   protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
     this.cellSet.set(cellSet);
@@ -91,7 +102,7 @@
    * Creates the scanner for the given read point
    * @return a scanner for the given read point
    */
-  public SegmentScanner getScanner(long readPoint) {
+  public KeyValueScanner getScanner(long readPoint) {
     return new SegmentScanner(this, readPoint);
   }
 
@@ -99,10 +110,16 @@
    * Creates the scanner for the given read point, and a specific order in a list
    * @return a scanner for the given read point
   */
-  public SegmentScanner getScanner(long readPoint, long order) {
+  public KeyValueScanner getScanner(long readPoint, long order) {
     return new SegmentScanner(this, readPoint, order);
   }
 
+  public List getScanners(long readPoint, long order) {
+    List scanners = new ArrayList(1);
+    scanners.add(getScanner(readPoint, order));
+    return scanners;
+  }
+
   /**
    * @return whether the segment has any cells
   */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 01e07ef..7e53026 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -47,6 +47,13 @@ public final class SegmentFactory {
     return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
   }
 
+  // create composite immutable segment from a list of segments
+  public CompositeImmutableSegment createCompositeImmutableSegment(
+      final CellComparator comparator, List segments) {
+    return new CompositeImmutableSegment(comparator, segments);
+
+  }
+
   // create new flat immutable segment from compacting old immutable segments
   public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
@@ -102,6 +109,9 @@ public final class SegmentFactory {
   private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List segments) {
     List mslabs = new ArrayList();
+    if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
+      return null;
+    }
     for (ImmutableSegment segment : segments) {
       mslabs.add(segment.getMemStoreLAB());
     }
-- 
1.8.5.2 (Apple Git-48)