From f60967ce5e506d3f93956b75641b29d8c22fab61 Mon Sep 17 00:00:00 2001 From: anastas Date: Mon, 14 Nov 2016 17:28:00 +0200 Subject: [PATCH] My squashed commits --- .../hbase/regionserver/AbstractMemStore.java | 19 +- .../hbase/regionserver/CompactingMemStore.java | 86 ++++- .../hbase/regionserver/CompactionPipeline.java | 14 + .../regionserver/CompositeImmutableSegment.java | 349 +++++++++++++++++++++ .../hadoop/hbase/regionserver/DefaultMemStore.java | 23 +- .../apache/hadoop/hbase/regionserver/HStore.java | 2 +- .../hbase/regionserver/ImmutableSegment.java | 10 +- .../hbase/regionserver/MemStoreCompactor.java | 2 + .../apache/hadoop/hbase/regionserver/Segment.java | 13 +- .../hadoop/hbase/regionserver/SegmentFactory.java | 3 + .../hbase/regionserver/TestCompactingMemStore.java | 10 +- .../TestCompactingToCellArrayMapMemStore.java | 2 +- .../hbase/regionserver/TestDefaultMemStore.java | 3 +- .../TestWalAndCompactingMemStoreFlush.java | 65 +++- 14 files changed, 555 insertions(+), 46 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index a4ea3ee..1e4553d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; -import java.util.List; -import java.util.NavigableSet; -import java.util.SortedSet; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; @@ -49,7 +47,7 @@ public abstract class AbstractMemStore implements MemStore { // active segment absorbs write operations protected volatile MutableSegment active; // Snapshot of memstore. Made for flusher. - protected volatile ImmutableSegment snapshot; + protected volatile CompositeImmutableSegment snapshot; protected volatile long snapshotId; // Used to track when to flush private volatile long timeOfOldestEdit; @@ -63,7 +61,7 @@ public abstract class AbstractMemStore implements MemStore { this.conf = conf; this.comparator = c; resetActive(); - this.snapshot = SegmentFactory.instance().createImmutableSegment(c); + this.snapshot = initEmptyCompositeSnapshot(); this.snapshotId = NO_SNAPSHOT_ID; } @@ -147,7 +145,7 @@ public abstract class AbstractMemStore implements MemStore { // create a new snapshot and let the old one go. Segment oldSnapshot = this.snapshot; if (!this.snapshot.isEmpty()) { - this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator); + this.snapshot = initEmptyCompositeSnapshot(); } this.snapshotId = NO_SNAPSHOT_ID; oldSnapshot.close(); @@ -277,6 +275,15 @@ public abstract class AbstractMemStore implements MemStore { } } + protected CompositeImmutableSegment initEmptyCompositeSnapshot() { + ImmutableSegment emptySegment = + SegmentFactory.instance().createImmutableSegment(getComparator()); + List emptySegments = + new ArrayList(Arrays.asList(emptySegment)); + return // maximal possible read point to create the scanner + new CompositeImmutableSegment(getComparator(), Long.MAX_VALUE, emptySegments); + } + /** * @return The total size of cells in this memstore. We will not consider cells in the snapshot */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index a7eb19e..92ee337 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -66,6 +65,7 @@ public class CompactingMemStore extends AbstractMemStore { private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); @VisibleForTesting private final AtomicBoolean allowCompaction = new AtomicBoolean(true); + private boolean compositeSnapshot = false; public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, @@ -74,8 +74,8 @@ public class CompactingMemStore extends AbstractMemStore { + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD; - public CompactingMemStore(Configuration conf, CellComparator c, - HStore store, RegionServicesForStores regionServices) throws IOException { + public CompactingMemStore(Configuration conf, CellComparator c, HStore store, + RegionServicesForStores regionServices, boolean compositeSnapshot) throws IOException { super(conf, c); this.store = store; this.regionServices = regionServices; @@ -153,7 +153,11 @@ public class CompactingMemStore extends AbstractMemStore { stopCompaction(); pushActiveToPipeline(this.active); snapshotId = EnvironmentEdgeManager.currentTime(); - pushTailToSnapshot(); + if (compositeSnapshot) { + pushPipelineToComposSnapshot(); + } else { + pushTailToSnapshot(); + } } return new MemStoreSnapshot(snapshotId, this.snapshot); } @@ -166,8 +170,13 @@ public class CompactingMemStore extends AbstractMemStore { public MemstoreSize getFlushableSize() { MemstoreSize snapshotSize = getSnapshotSize(); if (snapshotSize.getDataSize() == 0) { - // if snapshot is empty the tail of the pipeline is flushed - snapshotSize = pipeline.getTailSize(); + // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed + if (compositeSnapshot) { + snapshotSize = pipeline.getPipelineSize(); + snapshotSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead()); + } else { + snapshotSize = pipeline.getTailSize(); + } } return snapshotSize.getDataSize() > 0 ? snapshotSize : new MemstoreSize(this.active.keySize(), this.active.heapOverhead()); @@ -212,10 +221,47 @@ public class CompactingMemStore extends AbstractMemStore { List list = new ArrayList(pipelineList.size() + 2); list.add(this.active); list.addAll(pipelineList); - list.add(this.snapshot); + list.addAll(this.snapshot.getAllSegments()); + return list; } + // the following three methods allow to manipulate the settings of composite snapshot + public void useCompositeSnapshot() { + this.compositeSnapshot = true; + } + + public void useTailSnapshot() { + this.compositeSnapshot = false; + } + + public boolean isCompositeSnapshot() { + return this.compositeSnapshot; + } + + /** + * Override because CompositeImmutableSegment snapshot requires a different treatment + * The passed snapshot was successfully persisted; it can be let go. + * @param id Id of the snapshot to clean out. + * @see MemStore#snapshot() + */ + @Override + public void clearSnapshot(long id) throws UnexpectedStateException { + if (this.snapshotId == -1) return; // already cleared + if (this.snapshotId != id) { + throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " + + id); + } + // OK. Passed in snapshot is same as current snapshot. If not-empty, + // create a new snapshot and let the old one go. + ImmutableSegment oldSnapshot = this.snapshot; + if (!this.snapshot.isEmpty()) { + this.snapshot = initEmptyCompositeSnapshot(); + } + this.snapshotId = -1; + oldSnapshot.close(); + } + public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, boolean merge) { return pipeline.swap(versionedList, result, !merge); @@ -265,8 +311,13 @@ public class CompactingMemStore extends AbstractMemStore { list.add(item.getScanner(readPt, order)); order--; } - list.add(this.snapshot.getScanner(readPt, order)); - return Collections. singletonList(new MemStoreScanner(getComparator(), list)); + + for (Segment item : snapshot.getAllSegments()) { + list.add(item.getScanner(readPt, order)); + order--; + } + + return Collections.singletonList(new MemStoreScanner(getComparator(), list)); } /** @@ -369,7 +420,18 @@ public class CompactingMemStore extends AbstractMemStore { private void pushTailToSnapshot() { ImmutableSegment tail = pipeline.pullTail(); if (!tail.isEmpty()) { - this.snapshot = tail; + List tailSegments = + new ArrayList(Arrays.asList(tail)); + this.snapshot = // maximal possible read point to create the scanner + new CompositeImmutableSegment(getComparator(), Long.MAX_VALUE, tailSegments); + } + } + + private void pushPipelineToComposSnapshot() { + List segments = pipeline.drain(); + if (!segments.isEmpty()) { + this.snapshot = // maximal possible read point to create the scanner + new CompositeImmutableSegment(getComparator(),Long.MAX_VALUE,segments); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 6676170..53b1e2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -77,6 +77,15 @@ public class CompactionPipeline { } } + public List drain() { + synchronized (pipeline){ + version++; + List result = this.pipeline; + this.pipeline = new LinkedList(); + return result; + } + } + public VersionedSegmentsList getVersionedList() { synchronized (pipeline){ LinkedList segmentList = new LinkedList(pipeline); @@ -215,6 +224,11 @@ public class CompactionPipeline { return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); } + public MemstoreSize getPipelineSize() { + if (isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline)); + } + private void swapSuffix(List suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { version++; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java new file mode 100644 index 0000000..9f2b5b2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -0,0 +1,349 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * The CompositeImmutableSegments is created as a collection of ImmutableSegments and supports + * the interface of a single ImmutableSegments. + * The CompositeImmutableSegments is planned to be used only as a snapshot, + * thus only relevant interfaces are supported + */ +@InterfaceAudience.Private +public class CompositeImmutableSegment extends ImmutableSegment { + + private final List segments; + private final CellComparator comparator; + private final TimeRangeTracker timeRangeTracker; + private final KeyValueScanner scaner; + + public CompositeImmutableSegment(CellComparator comparator, long readPt, List segments) { + super(comparator); + this.comparator = comparator; + this.segments = segments; + this.timeRangeTracker = new TimeRangeTracker(); + List list = new ArrayList(segments.size()); + for (ImmutableSegment s : segments) { + this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax()); + this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin()); + list.add(s.getScanner(readPt)); + } + + try { + scaner = new MemStoreScanner(getComparator(), list); + } catch (IOException ie) { + throw new IllegalStateException(ie); + } + + + } + + public List getAllSegments() { + List res = new LinkedList(segments); + return res; + } + /** + * Builds a special scanner for the MemStoreSnapshot object that is different than the + * general segment scanner. + * @return a special scanner for the MemStoreSnapshot object + */ + public KeyValueScanner getKeyValueScanner() { + return scaner; + } + + /** + * @return whether the segment has any cells + */ + public boolean isEmpty() { + boolean result = true; + for (ImmutableSegment s : segments) { + result = result & s.isEmpty(); + } + return result; + } + + /** + * @return number of cells in segment + */ + public int getCellsCount() { + int result = 0; + for (ImmutableSegment s : segments) { + result = result + s.getCellsCount(); + } + return result; + } + + /** + * @return the first cell in the segment that has equal or greater key than the given cell + */ + public Cell getFirstAfter(Cell cell) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Closing a segment before it is being discarded + */ + public void close() { + scaner.close(); + for (ImmutableSegment s : segments) { + s.close(); + } + } + + /** + * If the segment has a memory allocator the cell is being cloned to this space, and returned; + * otherwise the given cell is returned + * @return either the given cell or its clone + */ + public Cell maybeCloneWithAllocator(Cell cell) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Get cell length after serialized in {@link KeyValue} + */ + @VisibleForTesting + static int getCellLength(Cell cell) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public boolean shouldSeek(Scan scan, long oldestUnexpiredTS){ + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public long getMinTimestamp(){ + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Creates the scanner for the given read point + * @return a scanner for the given read point + */ + public KeyValueScanner getScanner(long readPoint) { + KeyValueScanner resultScanner; + List list = new ArrayList(segments.size()); + for (ImmutableSegment s : segments) { + list.add(s.getScanner(readPoint)); + } + + try { + resultScanner = new MemStoreScanner(getComparator(), list); + } catch (IOException ie) { + throw new IllegalStateException(ie); + } + + return resultScanner; + } + + /** + * Creates the scanner for the given read point, and a specific order in a list + * @return a scanner for the given read point + */ + public KeyValueScanner getScanner(long readPoint, long order) { + KeyValueScanner resultScanner; + List list = new ArrayList(segments.size()); + for (ImmutableSegment s : segments) { + list.add(s.getScanner(readPoint,order)); + } + + try { + resultScanner = new MemStoreScanner(getComparator(), list); + } catch (IOException ie) { + throw new IllegalStateException(ie); + } + + return resultScanner; + } + + public boolean isTagsPresent() { + boolean result = false; + for (ImmutableSegment s : segments) { + result = result | s.isTagsPresent(); + } + return result; + } + + public void incScannerCount() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public void decScannerCount() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Setting the CellSet of the segment - used only for flat immutable segment for setting + * immutable CellSet after its creation in immutable segment constructor + * @return this object + */ + + protected CompositeImmutableSegment setCellSet(CellSet cellSetOld, CellSet cellSetNew) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * @return Sum of all cell's size. + */ + public long keySize() { + long result = 0; + for (ImmutableSegment s : segments) { + result = result + s.keySize(); + } + return result; + } + + /** + * @return The heap overhead of this segment. + */ + public long heapOverhead() { + long result = 0; + for (ImmutableSegment s : segments) { + result = result + s.heapOverhead(); + } + return result; + } + + /** + * Updates the heap size counter of the segment by the given delta + */ + protected void incSize(long delta, long heapOverhead) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + protected void incHeapOverheadSize(long delta) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public long getMinSequenceId() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public TimeRangeTracker getTimeRangeTracker() { + return this.timeRangeTracker; + } + + //*** Methods for SegmentsScanner + public Cell last() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public Iterator iterator() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public SortedSet headSet(Cell firstKeyOnRow) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public int compare(Cell left, Cell right) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public int compareRows(Cell left, Cell right) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * @return a set of all cells in the segment + */ + protected CellSet getCellSet() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Returns the Cell comparator used by this segment + * @return the Cell comparator used by this segment + */ + protected CellComparator getComparator() { + return comparator; + } + + protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed, + MemstoreSize memstoreSize) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + protected long heapOverheadChange(Cell cell, boolean succ) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Returns a subset of the segment cell set, which starts with the given cell + * @param firstCell a cell in the segment + * @return a subset of the segment cell set, which starts with the given cell + */ + protected SortedSet tailSet(Cell firstCell) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + @VisibleForTesting + MemStoreLAB getMemStoreLAB() { + return segments.get(0).getMemStoreLAB(); + } + + // Debug methods + /** + * Dumps all cells of the segment into the given log + */ + void dump(Log log) { + for (ImmutableSegment s : segments) { + s.dump(log); + } + } + + @Override + public String toString() { + String res = "This is CompositeImmutableSegment and those are its segments:: "; + for (ImmutableSegment s : segments) { + res = res + s.toString(); + } + return res; + } + + /* + * @param a + * @param b + * @return Return lowest of a or b or null if both a and b are null + */ + private Cell getLowest(final Cell a, final Cell b) { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + return comparator.compareRows(a, b) <= 0? a: b; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index d4e6e12..c9f8478 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -91,7 +92,10 @@ public class DefaultMemStore extends AbstractMemStore { if (!this.active.isEmpty()) { ImmutableSegment immutableSegment = SegmentFactory.instance(). createImmutableSegment(this.active); - this.snapshot = immutableSegment; + List emptySegments = + new ArrayList(Arrays.asList(immutableSegment)); + this.snapshot = // maximal possible read point to create the scanner + new CompositeImmutableSegment(getComparator(), Long.MAX_VALUE, emptySegments); resetActive(); } } @@ -136,7 +140,7 @@ public class DefaultMemStore extends AbstractMemStore { protected List getSegments() throws IOException { List list = new ArrayList(2); list.add(this.active); - list.add(this.snapshot); + list.addAll(this.snapshot.getAllSegments()); return list; } @@ -146,9 +150,18 @@ public class DefaultMemStore extends AbstractMemStore { * @return Next row or null if none found. */ Cell getNextRow(final Cell cell) { - return getLowest( - getNextRow(cell, this.active.getCellSet()), - getNextRow(cell, this.snapshot.getCellSet())); + Cell lowest = null; + List segments = new ArrayList(1 + snapshot.getAllSegments().size()); + segments.add(this.active); + segments.addAll(snapshot.getAllSegments()); + for (Segment segment : segments) { + if (lowest == null) { + lowest = getNextRow(cell, segment.getCellSet()); + } else { + lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet())); + } + } + return lowest; } @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 1dcf060..08f8169 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -242,7 +242,7 @@ public class HStore implements Store { if (family.isInMemoryCompaction()) { className = CompactingMemStore.class.getName(); this.memstore = new CompactingMemStore(conf, this.comparator, this, - this.getHRegion().getRegionServicesForStores()); + this.getHRegion().getRegionServicesForStores(), false); } else { this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator }); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 4cdb29d..20148a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -69,6 +69,14 @@ public class ImmutableSegment extends Segment { ///////////////////// CONSTRUCTORS ///////////////////// /**------------------------------------------------------------------------ + * Empty C-tor to be used only for CompositeImmutableSegment + */ + protected ImmutableSegment(CellComparator comparator) { + super(comparator); + this.timeRange = null; + } + + /**------------------------------------------------------------------------ * Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one. * This C-tor should be used when active MutableSegment is pushed into the compaction * pipeline and becomes an ImmutableSegment. @@ -232,7 +240,7 @@ public class ImmutableSegment extends Segment { Cell curCell; int idx = 0; // create this segment scanner with maximal possible read point, to go over all Cells - SegmentScanner segmentScanner = this.getScanner(Long.MAX_VALUE); + KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE); try { while ((curCell = segmentScanner.next()) != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 0df3674..9faa57e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -196,6 +196,8 @@ public class MemStoreCompactor { return; } if (nextStep == Action.FLATTEN) { + // if multiple segments appear in the pipeline flush them to the disk later together + compactingMemStore.useCompositeSnapshot(); // Youngest Segment in the pipeline is with SkipList index, make it flat compactingMemStore.flattenOneSegment(versionedList.getVersion()); return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index afdfe6f..f93aefd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -64,6 +64,15 @@ public abstract class Segment { protected final TimeRangeTracker timeRangeTracker; protected volatile boolean tagsPresent; + // Empty constructor to be used when Segment is used as interface, + // and there is no need in true Segments state + protected Segment(CellComparator comparator) { + this.comparator = comparator; + this.dataSize = new AtomicLong(0); + this.heapOverhead = new AtomicLong(0); + this.timeRangeTracker = new TimeRangeTracker(); + } + // This constructor is used to create empty Segments. protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { this.cellSet.set(cellSet); @@ -91,7 +100,7 @@ public abstract class Segment { * Creates the scanner for the given read point * @return a scanner for the given read point */ - public SegmentScanner getScanner(long readPoint) { + public KeyValueScanner getScanner(long readPoint) { return new SegmentScanner(this, readPoint); } @@ -99,7 +108,7 @@ public abstract class Segment { * Creates the scanner for the given read point, and a specific order in a list * @return a scanner for the given read point */ - public SegmentScanner getScanner(long readPoint, long order) { + public KeyValueScanner getScanner(long readPoint, long order) { return new SegmentScanner(this, readPoint, order); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 4f60976..34ad72a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -117,6 +117,9 @@ public final class SegmentFactory { private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List segments) { List mslabs = new ArrayList(); + if (!conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { + return null; + } for (ImmutableSegment segment : segments) { mslabs.add(segment.getMemStoreLAB()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 7dd9479..fb8e290 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -80,7 +80,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { public void setUp() throws Exception { compactingSetUp(); this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparator.COMPARATOR, - store, regionServicesForStores); + store, regionServicesForStores, true); } protected void compactingSetUp() throws Exception { @@ -130,7 +130,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // use case 3: first in snapshot second in kvset this.memstore = new CompactingMemStore(HBaseConfiguration.create(), - CellComparator.COMPARATOR, store, regionServicesForStores); + CellComparator.COMPARATOR, store, regionServicesForStores, true); this.memstore.add(kv1.clone(), null); // As compaction is starting in the background the repetition // of the k1 might be removed BUT the scanners created earlier @@ -171,12 +171,14 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // Add more versions to make it a little more interesting. Thread.sleep(1); addRows(this.memstore); + ((CompactingMemStore)this.memstore).useCompositeSnapshot(); + + Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY); assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty, new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0); for (int i = 0; i < ROW_COUNT; i++) { - Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i), - System.currentTimeMillis())); + Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i), System.currentTimeMillis())); if (i + 1 == ROW_COUNT) { assertEquals(nr, null); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index c72cae3..aa4f274 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -67,7 +67,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore this.memstore = new CompactingMemStore(conf, CellComparator.COMPARATOR, store, - regionServicesForStores); + regionServicesForStores, true); } ////////////////////////////////////////////////////////////////////////////// diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 433388d..9c9c922 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -227,7 +227,8 @@ public class TestDefaultMemStore { } finally { s.close(); } - assertEquals(rowCount, count); + assertEquals("\n<<< The row count is " + rowCount + " and the iteration count is " + count, + rowCount, count); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index 35159b6..74fca75 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -119,7 +119,7 @@ public class TestWalAndCompactingMemStoreFlush { assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family).get(qf)); assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum), - Arrays.equals(r.getFamilyMap(family).get(qf), val)); + Arrays.equals(r.getFamilyMap(family).get(qf), val)); } @Test(timeout = 180000) @@ -262,7 +262,12 @@ public class TestWalAndCompactingMemStoreFlush { long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII - + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ; + + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" + + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize() + + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + + ", cf3: " + region.getStore(FAMILY3).getFlushedCellsSize() + + ", cf4: " + region.getStore(FAMILIES[4]).getFlushedCellsSize() + + "; the entire region size is: " + region.getMemstoreSize() + "\n";; // Flush!!!!!!!!!!!!!!!!!!!!!! @@ -284,13 +289,16 @@ public class TestWalAndCompactingMemStoreFlush { s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV - + "\n"; + + "\n" + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize() + + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + + ", cf3: " + region.getStore(FAMILY3).getFlushedCellsSize() + + ", cf4: " + region.getStore(FAMILIES[4]).getFlushedCellsSize() + "\n"; s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV - + "\n"; + + "\n" + "the entire region size is: " + region.getMemstoreSize() + "\n"; // CF1's pipeline component (inserted before first flush) should be flushed to disk // CF2 should be flushed to disk @@ -322,6 +330,16 @@ public class TestWalAndCompactingMemStoreFlush { assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseV); assertEquals(MemstoreSize.EMPTY_SIZE, cf3MemstoreSizePhaseV); + s = s + "----AFTER THIRD FLUSH, the entire region size is:" + region.getMemstoreSize() + + " (empty memstore size is " + MemstoreSize.EMPTY_SIZE + + "), while the sizes of each memstore are as following \ncf1: " + cf1MemstoreSizePhaseV + + ", cf2: " + cf2MemstoreSizePhaseV + ", cf3: " + cf3MemstoreSizePhaseV + + ", cf4: " + region.getStore(FAMILIES[4]).getSizeOfMemStore() + "\n" + + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize() + + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + + ", cf3: " + region.getStore(FAMILY3).getFlushedCellsSize() + + ", cf4: " + region.getStore(FAMILIES[4]).getFlushedCellsSize() + "\n"; + // What happens when we hit the memstore limit, but we are not able to find // any Column Family above the threshold? // In that case, we should flush all the CFs. @@ -339,7 +357,7 @@ public class TestWalAndCompactingMemStoreFlush { region.flush(false); - s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: " + s = s + "----AFTER FORTH FLUSH, The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseV + ". After additional inserts and last flush, the entire region size is:" + region .getMemstoreSize() @@ -348,7 +366,7 @@ public class TestWalAndCompactingMemStoreFlush { // Since we won't find any CF above the threshold, and hence no specific // store to flush, we should flush all the memstores // Also compacted memstores are flushed to disk. - assertEquals(0, region.getMemstoreSize()); + assertEquals(s, 0, region.getMemstoreSize()); System.out.println(s); HBaseTestingUtility.closeRegionAndWAL(region); } @@ -612,19 +630,25 @@ public class TestWalAndCompactingMemStoreFlush { @Test(timeout = 180000) public void testSelectiveFlushAndWALinDataCompaction() throws IOException { + + MemstoreSize checkSize = MemstoreSize.EMPTY_SIZE; + // Set up the configuration Configuration conf = HBaseConfiguration.create(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); - conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class - .getName()); - conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * - 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushNonSloppyStoresFirstPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); // set memstore to do data compaction and not to use the speculative scan conf.set("hbase.hregion.compacting.memstore.type", "data-compaction"); // Intialize the HRegion HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); + + MemstoreSize cf2MemstoreSizePhase0 = region.getStore(FAMILY2).getSizeOfMemStore(); + MemstoreSize cf1MemstoreSizePhase0 = region.getStore(FAMILY1).getSizeOfMemStore(); + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); @@ -644,6 +668,7 @@ public class TestWalAndCompactingMemStoreFlush { // Find the sizes of the memstores of each CF. MemstoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getSizeOfMemStore(); + //boolean oldCF2 = region.getStore(FAMILY2).isSloppyMemstore(); MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore(); MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore(); @@ -654,16 +679,19 @@ public class TestWalAndCompactingMemStoreFlush { // The total memstore size should be the same as the sum of the sizes of // memstores of CF1, CF2 and CF3. - String msg = "totalMemstoreSize="+totalMemstoreSize + + String msg = "\n<<< totalMemstoreSize="+totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + - " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; + " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI; assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); // Flush! CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; + MemStore cms2 = ((HStore) region.getStore(FAMILY2)).memstore; + MemstoreSize memstrsize2 = cms2.getSnapshotSize(); + MemstoreSize flshsize2 = cms2.getFlushableSize(); CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; cms1.flushInMemory(); cms3.flushInMemory(); @@ -676,9 +704,20 @@ public class TestWalAndCompactingMemStoreFlush { long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); + MemstoreSize newSize = new MemstoreSize(); // CF2 should have been cleared - assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII); + assertEquals(msg + "\n<<< CF2 is compacting " + + ((HStore)region.getStore(FAMILY2)).memstore.isSloppy() + + ", snapshot and flushable size BEFORE flush " + memstrsize2 + "; " + flshsize2 + + ", snapshot and flushable size AFTER flush " + + cms2.getSnapshotSize() + "; " + cms2.getFlushableSize() + "\n<<< cf2 size " + + cms2.size() + "; the checked size " + cf2MemstoreSizePhaseII + + "; memstore empty size " + MemstoreSize.EMPTY_SIZE + "; check size " + checkSize + + "\n<<< first first first CF2 size " + cf2MemstoreSizePhase0 + + "; first first first CF1 size "+ cf1MemstoreSizePhase0 + "; new new new size " + + newSize + "\n", + MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII); String s = "\n\n----------------------------------\n" + "Upon initial insert and flush, LSN of CF1 is:" -- 1.8.5.2 (Apple Git-48)