From 15b571360c22b50f6a0e068302bd4e2da78ac890 Mon Sep 17 00:00:00 2001 From: anastas Date: Thu, 11 Aug 2016 11:45:01 +0300 Subject: [PATCH] My squashed commits --- .../org/apache/hadoop/hbase/util/ClassSize.java | 26 +- .../hbase/regionserver/AbstractMemStore.java | 10 +- .../hadoop/hbase/regionserver/CellArrayMap.java | 54 +++ .../hadoop/hbase/regionserver/CellFlatMap.java | 494 +++++++++++++++++++++ .../apache/hadoop/hbase/regionserver/CellSet.java | 6 +- .../hbase/regionserver/CompactingMemStore.java | 73 ++- .../hbase/regionserver/CompactionPipeline.java | 47 +- .../hadoop/hbase/regionserver/DefaultMemStore.java | 10 +- .../hbase/regionserver/ImmutableSegment.java | 181 +++++++- .../hbase/regionserver/MemStoreCompactor.java | 246 ++++++---- .../regionserver/MemStoreCompactorIterator.java | 160 +++++++ .../hadoop/hbase/regionserver/MemStoreScanner.java | 68 ++- .../hadoop/hbase/regionserver/MutableSegment.java | 30 +- .../apache/hadoop/hbase/regionserver/Segment.java | 85 +++- .../hadoop/hbase/regionserver/SegmentFactory.java | 28 +- .../hadoop/hbase/regionserver/SegmentScanner.java | 8 +- .../hbase/regionserver/VersionedSegmentsList.java | 15 +- .../org/apache/hadoop/hbase/io/TestHeapSize.java | 17 +- .../hadoop/hbase/regionserver/TestCellFlatSet.java | 143 ++++++ .../hbase/regionserver/TestCompactingMemStore.java | 43 +- .../TestCompactingToCellArrayMapMemStore.java | 363 +++++++++++++++ .../hadoop/hbase/regionserver/TestHRegion.java | 4 +- .../regionserver/TestHRegionWithInMemoryFlush.java | 94 ++++ .../TestWalAndCompactingMemStoreFlush.java | 276 +++++++++++- hbase-shell/src/main/ruby/hbase/admin.rb | 2 +- 25 files changed, 2210 insertions(+), 273 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayMap.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellFlatMap.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java index 41c93ea..ff9dbcb 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; + /** * Class for determining the "size" of a class, an attempt to calculate the * actual bytes that an object of this class will occupy in memory @@ -81,6 +82,12 @@ public class ClassSize { /** Overhead for ConcurrentSkipListMap Entry */ public static final int CONCURRENT_SKIPLISTMAP_ENTRY; + /** Overhead for CellArrayMap */ + public static final int CELL_ARRAY_MAP; + + /** Overhead for Cell Array Entry */ + public static final int CELL_ARRAY_MAP_ENTRY; + /** Overhead for ReentrantReadWriteLock */ public static final int REENTRANT_LOCK; @@ -106,7 +113,7 @@ public class ClassSize { public static final int TIMERANGE_TRACKER; /** Overhead for CellSkipListSet */ - public static final int CELL_SKIPLIST_SET; + public static final int CELL_SET; public static final int STORE_SERVICES; @@ -262,9 
+269,20 @@ public class ClassSize { // The size changes from jdk7 to jdk8, estimate the size rather than use a conditional CONCURRENT_SKIPLISTMAP = (int) estimateBase(ConcurrentSkipListMap.class, false); - CONCURRENT_SKIPLISTMAP_ENTRY = + // CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends + // CellFlatMap class. CellArrayMap object containing a ref to an Array, so + // OBJECT + REFERENCE + ARRAY + // CellFlatMap object contains two integers, one boolean and one reference to object, so + // 2*INT + BOOLEAN + REFERENCE + CELL_ARRAY_MAP = align(OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN + + ARRAY + 2*REFERENCE); + + CONCURRENT_SKIPLISTMAP_ENTRY = align( align(OBJECT + (3 * REFERENCE)) + /* one node per entry */ - align((OBJECT + (3 * REFERENCE))/2); /* one index per two entries */ + align((OBJECT + (3 * REFERENCE))/2)); /* one index per two entries */ + + // REFERENCE in the CellArrayMap all the rest is counted in KeyValue.heapSize() + CELL_ARRAY_MAP_ENTRY = align(REFERENCE); REENTRANT_LOCK = align(OBJECT + (3 * REFERENCE)); @@ -282,7 +300,7 @@ public class ClassSize { TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2); - CELL_SKIPLIST_SET = align(OBJECT + REFERENCE); + CELL_SET = align(OBJECT + REFERENCE); STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 4716eee..df73a1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -65,18 +65,18 @@ public abstract class AbstractMemStore implements MemStore { public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER + - ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP)); + ClassSize.CELL_SET + ClassSize.CONCURRENT_SKIPLISTMAP)); protected AbstractMemStore(final Configuration conf, final CellComparator c) { this.conf = conf; this.comparator = c; - resetCellSet(); - this.snapshot = SegmentFactory.instance().createImmutableSegment(conf, c, 0); + resetActive(); + this.snapshot = SegmentFactory.instance().createImmutableSegment(c, 0); this.snapshotId = NO_SNAPSHOT_ID; } - protected void resetCellSet() { + protected void resetActive() { // Reset heap to not include any keys this.active = SegmentFactory.instance().createMutableSegment(conf, comparator, DEEP_OVERHEAD); this.timeOfOldestEdit = Long.MAX_VALUE; @@ -301,7 +301,7 @@ public abstract class AbstractMemStore implements MemStore { // false means there was a change, so give us the size. long delta = heapSizeChange(cur, true); addedSize -= delta; - active.incSize(-delta); + active.updateSize(-delta); it.remove(); setOldestEditTimeToNow(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayMap.java new file mode 100644 index 0000000..605fea2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayMap.java @@ -0,0 +1,54 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Cellersion 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY CellIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.util.Comparator; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * CellArrayMap is a simple array of Cells and cannot be allocated off-heap. + * As all java arrays CellArrayMap's array of references pointing to Cell objects. + */ +@InterfaceAudience.Private +public class CellArrayMap extends CellFlatMap { + + private final Cell[] block; + + /* The Cells Array is created only when CellArrayMap is created, all sub-CellBlocks use + * boundary indexes. The given Cell array must be ordered. */ + public CellArrayMap( + Comparator comparator, Cell[] b, int min, int max, boolean descending) { + super(comparator,min,max,descending); + this.block = b; + } + + /* To be used by base class only to create a sub-CellFlatMap */ + @Override + protected CellFlatMap createSubCellFlatMap(int min, int max, boolean descending) { + return new CellArrayMap(comparator(), this.block, min, max, descending); + } + + @Override + protected Cell getCell(int i) { + if( (i < minCellIdx) && (i >= maxCellIdx) ) return null; + return block[i]; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellFlatMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellFlatMap.java new file mode 100644 index 0000000..dd4515c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellFlatMap.java @@ -0,0 +1,494 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Cellersion 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY CellIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableSet; +import java.util.NavigableMap; +import java.util.Set; + + +/** + * CellFlatMap stores a constant number of elements and is immutable after creation stage. + * Being immutable, the CellFlatMap can be implemented as array. + * The actual array can be on- or off-heap and is implemented in concrete class derived from CellFlatMap. 
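To make the flat layout concrete, here is a minimal standalone sketch (plain JDK only; the FlatLongMapSketch name and the use of long keys instead of Cells are illustrative, not part of this patch). It shows the essence of a flat map: one sorted array, lookups by binary search, and the same return convention that CellFlatMap.find() documents below, where a negative result encodes the insertion point as -(insertionPoint) - 1.

import java.util.Arrays;

public final class FlatLongMapSketch {
  private final long[] keys;            // sorted, immutable after construction

  FlatLongMapSketch(long[] sortedKeys) {
    this.keys = sortedKeys;
  }

  // Same convention as Arrays.binarySearch: >= 0 means found at that index,
  // negative means not found and encodes the insertion point as -(insertionPoint) - 1.
  int find(long needle) {
    return Arrays.binarySearch(keys, needle);
  }

  boolean contains(long needle) {
    return find(needle) >= 0;
  }

  public static void main(String[] args) {
    FlatLongMapSketch m = new FlatLongMapSketch(new long[] {10, 20, 30});
    System.out.println(m.find(20));     // 1  -> found at index 1
    System.out.println(m.find(25));     // -3 -> not found, insertion point 2
  }
}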
+ * The CellFlatMap uses no synchronization primitives, it is assumed to be created by a + * single thread and then it can be read-only by multiple threads. + * + * The "flat" in the name, means that the memory layout of the Map is sequential array and thus + * requires less memory than ConcurrentSkipListMap. + */ +@InterfaceAudience.Private +public abstract class CellFlatMap implements NavigableMap { + + private final Comparator comparator; + protected int minCellIdx = 0; // the index of the minimal cell (for sub-sets) + protected int maxCellIdx = 0; // the index of the cell after the maximal cell (for sub-sets) + private boolean descending = false; + + /* C-tor */ + public CellFlatMap(Comparator comparator, int min, int max, boolean d){ + this.comparator = comparator; + this.minCellIdx = min; + this.maxCellIdx = max; + this.descending = d; + } + + /* Used for abstract CellFlatMap creation, implemented by derived class */ + protected abstract CellFlatMap createSubCellFlatMap(int min, int max, boolean descending); + + /* Returns the i-th cell in the cell block */ + protected abstract Cell getCell(int i); + + /** + * Binary search for a given key in between given boundaries of the array. + * Positive returned numbers mean the index. + * Negative returned numbers means the key not found. + * + * The absolute value of the output is the + * possible insert index for the searched key + * + * In twos-complement, (-1 * insertion point)-1 is the bitwise not of the insert point. + * + * + * @param needle The key to look for in all of the entries + * @return Same return value as Arrays.binarySearch. + */ + private int find(Cell needle) { + int begin = minCellIdx; + int end = maxCellIdx - 1; + + while (begin <= end) { + int mid = (begin + end) >>> 1; + Cell midCell = getCell(mid); + int compareRes = comparator.compare(midCell, needle); + + if (compareRes == 0) { + return mid; // 0 means equals. We found the key + } + + if (compareRes < 0) { + // midCell is less than needle so we need to look at farther up + begin = mid + 1; + } else { + // midCell is greater than needle so we need to look down + end = mid - 1; + } + } + + return (-1 * begin)-1; + } + + /* Get the index of the given anchor key for creating subsequent set. + ** It doesn't matter whether the given key exists in the set or not. + ** + ** taking into consideration whether + ** the key should be inclusive or exclusive */ + private int getValidIndex(Cell key, boolean inclusive, boolean tail) { + int index = find(key); + int result = -1; + + // if the key is found and to be included, for all possibilities, the answer is the found index + if (index >= 0 && inclusive) result = index; + + // The compliment Operator (~) converts the returned insertion point to the real one + if (index<0) result = ~index; + + if (tail && result==-1) { + if (index >= 0 && !inclusive) + result = (descending) ? index - 1 : index + 1; + } else if (result==-1) { + if (index >= 0 && !inclusive) + result = (descending) ? index + 1 : index - 1; + } + + if (result < minCellIdx || result > maxCellIdx) { + throw new IllegalArgumentException("Index " + result + " (initial index " + index + ") " + + " out of boundary, when looking for key " + key + ". The minCellIdx is " + minCellIdx + + " and the maxCellIdx is " + maxCellIdx + ". Finally, descending? " + descending + + " and was the key requested inclusively? 
" + inclusive); + } + return result; + } + + @Override + public Comparator comparator() { + return comparator; + } + + @Override + public int size() { + return maxCellIdx-minCellIdx; + } + + @Override + public boolean isEmpty() { + return ( size() == 0 ); + } + + + // ---------------- Sub-Maps ---------------- + @Override + public NavigableMap subMap( Cell fromKey, + boolean fromInclusive, + Cell toKey, + boolean toInclusive) { + int toIndex = getValidIndex(toKey, toInclusive, false); + int fromIndex = (getValidIndex(fromKey, fromInclusive, true)); + + if (fromIndex > toIndex) { + throw new IllegalArgumentException("Inconsistent range, when looking from " + + fromKey + " to " + toKey); + } + return createSubCellFlatMap(fromIndex, toIndex+1, descending); + } + + @Override + public NavigableMap headMap(Cell toKey, boolean inclusive) { + int index = getValidIndex(toKey, inclusive, false); + // "+1" because the max index is one after the true index + return createSubCellFlatMap(minCellIdx, index+1, descending); + } + + @Override + public NavigableMap tailMap(Cell fromKey, boolean inclusive) { + int index = (getValidIndex(fromKey, inclusive, true)); + return createSubCellFlatMap(index, maxCellIdx, descending); + } + + @Override + public NavigableMap descendingMap() { + return createSubCellFlatMap(minCellIdx, maxCellIdx, true); + } + + @Override + public NavigableMap subMap(Cell k1, Cell k2) { + return this.subMap(k1, true, k2, true); + } + + @Override + public NavigableMap headMap(Cell k) { + return this.headMap(k, true); + } + + @Override + public NavigableMap tailMap(Cell k) { + return this.tailMap(k, true); + } + + + // -------------------------------- Key's getters -------------------------------- + @Override + public Cell firstKey() { + if (isEmpty()) { + return null; + } + return descending ? getCell(maxCellIdx - 1) : getCell(minCellIdx); + } + + @Override + public Cell lastKey() { + if (isEmpty()) { + return null; + } + return descending ? getCell(minCellIdx) : getCell(maxCellIdx - 1); + } + + @Override + public Cell lowerKey(Cell k) { + if (isEmpty()) { + return null; + } + int index = find(k); + // If index>=0 there's a key exactly equal + index = (index>=0) ? index-1 : -(index); + return (index < minCellIdx || index >= maxCellIdx) ? null : getCell(index); + } + + @Override + public Cell floorKey(Cell k) { + if (isEmpty()) { + return null; + } + int index = find(k); + index = (index>=0) ? index : -(index); + return (index < minCellIdx || index >= maxCellIdx) ? null : getCell(index); + } + + @Override + public Cell ceilingKey(Cell k) { + if (isEmpty()) { + return null; + } + int index = find(k); + index = (index>=0) ? index : -(index)+1; + return (index < minCellIdx || index >= maxCellIdx) ? null : getCell(index); + } + + @Override + public Cell higherKey(Cell k) { + if (isEmpty()) { + return null; + } + int index = find(k); + index = (index>=0) ? index+1 : -(index)+1; + return (index < minCellIdx || index >= maxCellIdx) ? null : getCell(index); + } + + @Override + public boolean containsKey(Object o) { + int index = find((Cell) o); + return (index >= 0); + } + + @Override + public boolean containsValue(Object o) { // use containsKey(Object o) instead + throw new UnsupportedOperationException("Use containsKey(Object o) instead"); + } + + @Override + public Cell get(Object o) { + int index = find((Cell) o); + return (index >= 0) ? 
getCell(index) : null; + } + + // -------------------------------- Entry's getters -------------------------------- + // all interfaces returning Entries are unsupported because we are dealing only with the keys + @Override + public Entry lowerEntry(Cell k) { + throw new UnsupportedOperationException(); + } + + @Override + public Entry higherEntry(Cell k) { + throw new UnsupportedOperationException(); + } + + @Override + public Entry ceilingEntry(Cell k) { + throw new UnsupportedOperationException(); + } + + @Override + public Entry floorEntry(Cell k) { + throw new UnsupportedOperationException(); + } + + @Override + public Entry firstEntry() { + throw new UnsupportedOperationException(); + } + + @Override + public Entry lastEntry() { + throw new UnsupportedOperationException(); + } + + @Override + public Entry pollFirstEntry() { + throw new UnsupportedOperationException(); + } + + @Override + public Entry pollLastEntry() { + throw new UnsupportedOperationException(); + } + + + // -------------------------------- Updates -------------------------------- + // All updating methods below are unsupported. + // Assuming an array of Cells will be allocated externally, + // fill up with Cells and provided in construction time. + // Later the structure is immutable. + @Override + public Cell put(Cell k, Cell v) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public Cell remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean replace(Cell k, Cell v, Cell v1) { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(Map map) { + throw new UnsupportedOperationException(); + } + + @Override + public Cell putIfAbsent(Cell k, Cell v) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o, Object o1) { + throw new UnsupportedOperationException(); + } + + @Override + public Cell replace(Cell k, Cell v) { + throw new UnsupportedOperationException(); + } + + + // -------------------------------- Sub-Sets -------------------------------- + @Override + public NavigableSet navigableKeySet() { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableSet descendingKeySet() { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableSet keySet() { + throw new UnsupportedOperationException(); + } + + @Override + public Collection values() { + return new CellFlatMapCollection(); + } + + @Override + public Set> entrySet() { + throw new UnsupportedOperationException(); + } + + + // -------------------------------- Iterator K -------------------------------- + private final class CellFlatMapIterator implements Iterator { + int index; + + private CellFlatMapIterator() { + index = descending ? maxCellIdx-1 : minCellIdx; + } + + @Override + public boolean hasNext() { + return descending ? 
(index >= minCellIdx) : (index < maxCellIdx); + } + + @Override + public Cell next() { + Cell result = getCell(index); + if (descending) { + index--; + } else { + index++; + } + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + // -------------------------------- Collection -------------------------------- + private final class CellFlatMapCollection implements Collection { + + @Override + public int size() { + return CellFlatMap.this.size(); + } + + @Override + public boolean isEmpty() { + return CellFlatMap.this.isEmpty(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean contains(Object o) { + return containsKey(o); + } + + @Override + public Iterator iterator() { + return new CellFlatMapIterator(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public T[] toArray(T[] ts) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(Cell k) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection collection) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection collection) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection collection) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection collection) { + throw new UnsupportedOperationException(); + } + + + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java index 4433302..6f70bac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java @@ -22,8 +22,8 @@ import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.NavigableSet; +import java.util.NavigableMap; import java.util.SortedSet; -import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.hbase.Cell; @@ -44,13 +44,13 @@ public class CellSet implements NavigableSet { // is not already present.", this implementation "Adds the specified element to this set EVEN // if it is already present overwriting what was there previous". 
// Otherwise, has same attributes as ConcurrentSkipListSet - private final ConcurrentNavigableMap delegatee; + private final NavigableMap delegatee; /// CellSet(final CellComparator c) { this.delegatee = new ConcurrentSkipListMap(c); } - CellSet(final ConcurrentNavigableMap m) { + CellSet(final NavigableMap m) { this.delegatee = m; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e27acce..4e1f997 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -51,9 +51,14 @@ import org.apache.hadoop.hbase.wal.WAL; */ @InterfaceAudience.Private public class CompactingMemStore extends AbstractMemStore { - public final static long DEEP_OVERHEAD_PER_PIPELINE_ITEM = ClassSize.align( + + public final static long DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM = ClassSize.align( + ClassSize.TIMERANGE_TRACKER + ClassSize.CELL_SET + ClassSize.CONCURRENT_SKIPLISTMAP); + + public final static long DEEP_OVERHEAD_PER_PIPELINE_CELL_ARRAY_ITEM = ClassSize.align( ClassSize.TIMERANGE_TRACKER + ClassSize.TIMERANGE + - ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP); + ClassSize.CELL_SET + ClassSize.CELL_ARRAY_MAP); + // Default fraction of in-memory-flush size w.r.t. flush-to-disk size public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = "hbase.memstore.inmemoryflush.threshold.factor"; @@ -64,9 +69,10 @@ public class CompactingMemStore extends AbstractMemStore { private RegionServicesForStores regionServices; private CompactionPipeline pipeline; private MemStoreCompactor compactor; - // the threshold on active size for in-memory flush - private long inmemoryFlushSize; + + private long inmemoryFlushSize; // the threshold on active size for in-memory flush private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); + @VisibleForTesting private final AtomicBoolean allowCompaction = new AtomicBoolean(true); public CompactingMemStore(Configuration conf, CellComparator c, @@ -95,7 +101,7 @@ public class CompactingMemStore extends AbstractMemStore { } public static long getSegmentSize(Segment segment) { - return segment.getSize() - DEEP_OVERHEAD_PER_PIPELINE_ITEM; + return segment.keySize(); } public static long getSegmentsSize(List list) { @@ -203,11 +209,20 @@ public class CompactingMemStore extends AbstractMemStore { return pipeline.swap(versionedList, result); } - public boolean hasCompactibleSegments() { + /** + * @param requesterVersion The caller must hold the VersionedList of the pipeline + * with version taken earlier. This version must be passed as a parameter here. + * The flattening happens only if versions match. 
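The contract above is an optimistic one: the caller first takes a snapshot of the pipeline together with its version (getImmutableSegments() below returns a VersionedSegmentsList), works on that snapshot, and the flattening is applied only if the version is still current. A standalone sketch of that pattern with hypothetical names (VersionedPipelineSketch, applyIfCurrent) and plain JDK types, not the actual pipeline code:

import java.util.ArrayList;
import java.util.List;

final class VersionedPipelineSketch {
  private final List<String> segments = new ArrayList<>();
  private long version = 0;

  synchronized long snapshotVersion() { return version; }

  synchronized void push(String segment) { segments.add(segment); version++; }

  // Flatten-like operation: applied only when the caller's version is still current.
  synchronized boolean applyIfCurrent(long requesterVersion, Runnable work) {
    if (requesterVersion != version) {
      return false;            // the pipeline changed under the caller; give up quietly
    }
    work.run();                // e.g. swap a segment's index in place
    return true;               // in-place flattening itself does not bump the version
  }
}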
+ */ + public void flattenOneSegment(long requesterVersion) { + pipeline.flattenYoungestSegment(requesterVersion); + } + + public boolean hasImmutableSegments() { return !pipeline.isEmpty(); } - public VersionedSegmentsList getCompactibleSegments() { + public VersionedSegmentsList getImmutableSegments() { return pipeline.getVersionedList(); } @@ -233,14 +248,13 @@ public class CompactingMemStore extends AbstractMemStore { // The list of elements in pipeline + the active element + the snapshot segment // TODO : This will change when the snapshot is made of more than one element List list = new ArrayList(pipelineList.size() + 2); - list.add(getActive().getSegmentScanner(readPt, order + 1)); + list.add(getActive().getScanner(readPt, order + 1)); for (Segment item : pipelineList) { - list.add(item.getSegmentScanner(readPt, order)); + list.add(item.getScanner(readPt, order)); order--; } - list.add(getSnapshot().getSegmentScanner(readPt, order)); - return Collections. singletonList( - new MemStoreScanner((AbstractMemStore) this, list, readPt)); + list.add(getSnapshot().getScanner(readPt, order)); + return Collections. singletonList(new MemStoreScanner(getComparator(), list)); } /** @@ -272,8 +286,6 @@ public class CompactingMemStore extends AbstractMemStore { void flushInMemory() throws IOException { // setting the inMemoryFlushInProgress flag again for the case this method is invoked // directly (only in tests) in the common path setting from true to true is idempotent - // Speculative compaction execution, may be interrupted if flush is forced while - // compaction is in progress inMemoryFlushInProgress.set(true); try { // Phase I: Update the pipeline @@ -281,20 +293,22 @@ public class CompactingMemStore extends AbstractMemStore { try { MutableSegment active = getActive(); if (LOG.isDebugEnabled()) { - LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, " - + "and initiating compaction."); + LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline"); } pushActiveToPipeline(active); } finally { getRegionServices().unblockUpdates(); } + // Used by tests if (!allowCompaction.get()) { return; } // Phase II: Compact the pipeline try { - compactor.startCompaction(); + // Speculative compaction execution, may be interrupted if flush is forced while + // compaction is in progress + compactor.start(); } catch (IOException e) { LOG.warn("Unable to run memstore compaction. 
region " + getRegionServices().getRegionInfo().getRegionNameAsString() + "store: " @@ -314,9 +328,10 @@ public class CompactingMemStore extends AbstractMemStore { } private boolean shouldFlushInMemory() { - if (getActive().getSize() > inmemoryFlushSize) { - // size above flush threshold - return inMemoryFlushInProgress.compareAndSet(false, true); + if(getActive().getSize() > inmemoryFlushSize) { // size above flush threshold + // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude + // the insert of the active into the compaction pipeline + return (inMemoryFlushInProgress.compareAndSet(false,true)); } return false; } @@ -328,17 +343,17 @@ public class CompactingMemStore extends AbstractMemStore { */ private void stopCompaction() { if (inMemoryFlushInProgress.get()) { - compactor.stopCompact(); + compactor.stop(); inMemoryFlushInProgress.set(false); } } private void pushActiveToPipeline(MutableSegment active) { if (!active.isEmpty()) { - long delta = DEEP_OVERHEAD_PER_PIPELINE_ITEM - DEEP_OVERHEAD; - active.setSize(active.getSize() + delta); + long delta = DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM - DEEP_OVERHEAD; + active.updateSize(delta); pipeline.pushHead(active); - resetCellSet(); + resetActive(); } } @@ -410,4 +425,14 @@ public class CompactingMemStore extends AbstractMemStore { } return lowest; } + + // debug method + public void debug() { + String msg = "active size="+getActive().getSize(); + msg += " threshold="+IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT* inmemoryFlushSize; + msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false"); + msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false"); + LOG.debug(msg); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 3f3bf8d..e0ba8c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -45,7 +45,7 @@ public class CompactionPipeline { private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance() .createImmutableSegment(null, - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM); + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM); public CompactionPipeline(RegionServicesForStores region) { this.region = region; @@ -117,6 +117,50 @@ public class CompactionPipeline { return true; } + /** + * If the caller holds the current version, go over the the pipeline and try to flatten each + * segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based. + * Flattening of the segment that initially is not based on ConcurrentSkipListMap has no effect. + * Return after one segment is successfully flatten. + * + * @return true iff a segment was successfully flattened + */ + public boolean flattenYoungestSegment(long requesterVersion) { + + if(requesterVersion != version) { + LOG.warn("Segment flattening failed, because versions do not match. 
Requester version: " + + requesterVersion + ", actual version: " + version); + return false; + } + + synchronized (pipeline){ + if(requesterVersion != version) { + LOG.warn("Segment flattening failed, because versions do not match"); + return false; + } + + for (ImmutableSegment s : pipeline) { + // remember the old size in case this segment is going to be flatten + long sizeBeforeFlat = s.keySize(); + long globalMemstoreSize = 0; + if (s.flatten()) { + if(region != null) { + long sizeAfterFlat = s.keySize(); + long delta = sizeBeforeFlat - sizeAfterFlat; + globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta); + } + LOG.debug("Compaction pipeline segment " + s + " was flattened; globalMemstoreSize: " + + globalMemstoreSize); + return true; + } + } + + } + // do not update the global memstore size counter and do not increase the version, + // because all the cells remain in place + return false; + } + public boolean isEmpty() { return pipeline.isEmpty(); } @@ -170,7 +214,6 @@ public class CompactionPipeline { // empty suffix is always valid return true; } - Iterator pipelineBackwardIterator = pipeline.descendingIterator(); Iterator suffixBackwardIterator = suffix.descendingIterator(); ImmutableSegment suffixCurrent; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index c21dbb5..50ab06b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -93,7 +93,7 @@ public class DefaultMemStore extends AbstractMemStore { createImmutableSegment(getActive()); setSnapshot(immutableSegment); setSnapshotSize(keySize()); - resetCellSet(); + resetActive(); } } return new MemStoreSnapshot(this.snapshotId, getSnapshot()); @@ -116,10 +116,10 @@ public class DefaultMemStore extends AbstractMemStore { */ public List getScanners(long readPt) throws IOException { List list = new ArrayList(2); - list.add(getActive().getSegmentScanner(readPt, 1)); - list.add(getSnapshot().getSegmentScanner(readPt, 0)); + list.add(getActive().getScanner(readPt, 1)); + list.add(getSnapshot().getScanner(readPt, 0)); return Collections. 
singletonList( - new MemStoreScanner((AbstractMemStore) this, list, readPt)); + new MemStoreScanner(getComparator(), list)); } @Override @@ -207,4 +207,4 @@ public class DefaultMemStore extends AbstractMemStore { LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); LOG.info("Exiting."); } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 13d9fbf..4682bca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -18,12 +18,18 @@ */ package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.CollectionBackedScanner; +import java.io.IOException; + /** * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}, * and is not needed for a {@link MutableSegment}. Specifically, the method @@ -38,12 +44,87 @@ public class ImmutableSegment extends Segment { */ private final TimeRange timeRange; + /** + * Types of ImmutableSegment + */ + public enum Type { + SKIPLIST_MAP_BASED, + ARRAY_MAP_BASED, + } + + private Type type = Type.SKIPLIST_MAP_BASED; + + // whether it is based on CellFlatMap or ConcurrentSkipListMap + private boolean isFlat(){ + return (type != Type.SKIPLIST_MAP_BASED); + } + + ///////////////////// CONSTRUCTORS ///////////////////// + /**------------------------------------------------------------------------ + * Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one. + * This C-tor should be used when active MutableSegment is pushed into the compaction + * pipeline and becomes an ImmutableSegment. + */ protected ImmutableSegment(Segment segment) { super(segment); + type = Type.SKIPLIST_MAP_BASED; + TimeRangeTracker trt = getTimeRangeTracker(); + this.timeRange = trt == null? null: trt.toTimeRange(); + } + + /**------------------------------------------------------------------------ + * C-tor to be used when new CELL_ARRAY BASED ImmutableSegment is a result of compaction of a + * list of older ImmutableSegments. + * The given iterator returns the Cells that "survived" the compaction. + * The input parameter "type" exists for future use when more types of flat ImmutableSegments + * are going to be introduced. + */ + protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator, + MemStoreLAB memStoreLAB, int numOfCells, Type type) { + + super(null, // initiailize the CellSet with NULL + comparator, memStoreLAB, + // initial size of segment metadata (the data per cell is added in createCellArrayMapSet) + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_CELL_ARRAY_ITEM, + ClassSize.CELL_ARRAY_MAP_ENTRY); + + // build the true CellSet based on CellArrayMap + CellSet cs = createCellArrayMapSet(numOfCells, iterator); + + this.setCellSet(null, cs); // update the CellSet of the new Segment + this.type = type; TimeRangeTracker trt = getTimeRangeTracker(); this.timeRange = trt == null? 
null: trt.toTimeRange(); } + /**------------------------------------------------------------------------ + * C-tor to be used when new SKIP-LIST BASED ImmutableSegment is a result of compaction of a + * list of older ImmutableSegments. + * The given iterator returns the Cells that "survived" the compaction. + */ + protected ImmutableSegment( + CellComparator comparator, MemStoreCompactorIterator iterator, MemStoreLAB memStoreLAB) { + + super(new CellSet(comparator), // initiailize the CellSet with empty CellSet + comparator, memStoreLAB, + // initial size of segment metadata (the data per cell is added in internalAdd) + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM, + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); + + while (iterator.hasNext()) { + Cell c = iterator.next(); + // The scanner is doing all the elimination logic + // now we just copy it to the new segment + Cell newKV = maybeCloneWithAllocator(c); + boolean usedMSLAB = (newKV != c); + internalAdd(newKV, usedMSLAB); // + } + type = Type.SKIPLIST_MAP_BASED; + TimeRangeTracker trt = getTimeRangeTracker(); + this.timeRange = trt == null? null: trt.toTimeRange(); + } + + ///////////////////// PUBLIC METHODS ///////////////////// /** * Builds a special scanner for the MemStoreSnapshot object that is different than the * general segment scanner. @@ -64,4 +145,102 @@ public class ImmutableSegment extends Segment { return this.timeRange.getMin(); } -} \ No newline at end of file + @Override + public long keySize() { + switch (type){ + case SKIPLIST_MAP_BASED: + return size.get() - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM; + case ARRAY_MAP_BASED: + return size.get() - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_CELL_ARRAY_ITEM; + default: throw new IllegalStateException(); + } + } + + /**------------------------------------------------------------------------ + * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one + * based on CellArrayMap. + * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOP + * For now the change from ConcurrentSkipListMap to CellChunkMap is not supported, because + * this requires the Cell to know on which Chunk it is placed. + * + * Synchronization of the CellSet replacement: + * The reference to the CellSet is AtomicReference and is updated only when ImmutableSegment + * is constructed (single thread) or flattened. The flattening happens as part of a single + * thread of compaction, but to be on the safe side the initial CellSet is locally saved + * before the flattening and then replaced using CAS instruction. 
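A rough standalone illustration of that save-then-CAS swap, with plain JDK maps standing in for the CellSet and a hypothetical IndexSwapSketch class (not part of this patch): the current index reference is read once, a read-only replacement is built from it, and compareAndSet publishes the replacement only if no other thread swapped the reference in the meantime.

import java.util.Collections;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;

final class IndexSwapSketch {
  private final AtomicReference<NavigableMap<Long, Long>> index =
      new AtomicReference<NavigableMap<Long, Long>>(new ConcurrentSkipListMap<Long, Long>());

  void put(long key, long value) {
    index.get().put(key, value);                  // writes go to the mutable skip list
  }

  // Swap the skip-list index for a cheaper read-only copy, unless someone swapped it first.
  boolean flatten() {
    NavigableMap<Long, Long> old = index.get();   // save the current index locally
    NavigableMap<Long, Long> flat =
        Collections.unmodifiableNavigableMap(new TreeMap<>(old));
    return index.compareAndSet(old, flat);        // CAS fails if the reference already moved
  }
}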
+ */ + public boolean flatten() { + if (isFlat()) return false; + CellSet oldCellSet = getCellSet(); + int numOfCells = getCellsCount(); + + // each Cell is now represented in CellArrayMap + constantCellMetaDataSize = ClassSize.CELL_ARRAY_MAP_ENTRY; + + // build the new (CellSet CellArrayMap based) + CellSet newCellSet = recreateCellArrayMapSet(numOfCells); + type = Type.ARRAY_MAP_BASED; + setCellSet(oldCellSet,newCellSet); + + // arrange the meta-data size, decrease all meta-data sizes related to SkipList + // (recreateCellArrayMapSet doesn't take the care for the sizes) + long newSegmentSizeDelta = -(ClassSize.CONCURRENT_SKIPLISTMAP + + numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); + // add size of CellArrayMap and meta-data overhead per Cell + newSegmentSizeDelta = newSegmentSizeDelta + ClassSize.CELL_ARRAY_MAP + + numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY; + updateSize(newSegmentSizeDelta); + + return true; + } + + ///////////////////// PRIVATE METHODS ///////////////////// + /*------------------------------------------------------------------------*/ + // Create CellSet based on CellArrayMap from compacting iterator + private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator) { + + Cell[] cells = new Cell[numOfCells]; // build the Cell Array + int i = 0; + while (iterator.hasNext()) { + Cell c = iterator.next(); + // The scanner behind the iterator is doing all the elimination logic + // now we just copy it to the new segment (also MSLAB copy) + cells[i] = maybeCloneWithAllocator(c); + boolean usedMSLAB = (cells[i] != c); + // second parameter true, because in compaction addition of the cell to new segment + // is always successful + updateMetaInfo(c, true, usedMSLAB); // updates the size per cell + i++; + } + // build the immutable CellSet + CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false); + return new CellSet(cam); + } + + /*------------------------------------------------------------------------*/ + // Create CellSet based on CellArrayMap from current ConcurrentSkipListMap based CellSet + // (without compacting iterator) + private CellSet recreateCellArrayMapSet(int numOfCells) { + + Cell[] cells = new Cell[numOfCells]; // build the Cell Array + Cell curCell; + int idx = 0; + // create this segment scanner with maximal possible read point, to go over all Cells + SegmentScanner segmentScanner = this.getScanner(Long.MAX_VALUE); + + try { + while ((curCell = segmentScanner.next()) != null) { + cells[idx++] = curCell; + } + } catch (IOException ie) { + throw new IllegalStateException(ie); + } finally { + segmentScanner.close(); + } + + // build the immutable CellSet + CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false); + return new CellSet(cam); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 042de0a..57febac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -32,8 +32,11 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; /** - * The ongoing MemStore Compaction manager, dispatches a solo running compaction - * and interrupts the compaction if requested. 
+ * The ongoing MemStore Compaction manager, dispatches a solo running compaction and interrupts + * the compaction if requested. The compaction is interrupted and stopped by CompactingMemStore, + * for example when another compaction needs to be started. + * Prior to compaction the MemStoreCompactor evaluates + * the compacting ratio and aborts the compaction if it is not worthy. * The MemStoreScanner is used to traverse the compaction pipeline. The MemStoreScanner * is included in internal store scanner, where all compaction logic is implemented. * Threads safety: It is assumed that the compaction pipeline is immutable, @@ -42,50 +45,75 @@ import java.util.concurrent.atomic.AtomicBoolean; @InterfaceAudience.Private class MemStoreCompactor { + // Possibility for external guidance whether to flatten the segments without compaction + static final String MEMSTORE_COMPACTOR_FLATTENING = "hbase.hregion.compacting.memstore.flatten"; + static final boolean MEMSTORE_COMPACTOR_FLATTENING_DEFAULT = true; + + // Possibility for external setting of the compacted structure (SkipList, CellArray, etc.) + static final String COMPACTING_MEMSTORE_TYPE_KEY = "hbase.hregion.compacting.memstore.type"; + static final int COMPACTING_MEMSTORE_TYPE_DEFAULT = 2; // COMPACT_TO_ARRAY_MAP as default + + static final String COMPACTION_THRESHOLD_REMAIN_FRACTION + = "hbase.hregion.compacting.memstore.comactPercent"; + static final double COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.2; + private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); private CompactingMemStore compactingMemStore; - private MemStoreScanner scanner; // scanner for pipeline only - // scanner on top of MemStoreScanner that uses ScanQueryMatcher - private StoreScanner compactingScanner; - - // smallest read point for any ongoing MemStore scan - private long smallestReadPoint; // a static version of the segment list from the pipeline private VersionedSegmentsList versionedList; + + // a flag raised when compaction is requested to stop private final AtomicBoolean isInterrupted = new AtomicBoolean(false); + // the limit to the size of the groups to be later provided to MemStoreCompactorIterator + private final int compactionKVMax; + + double fraction = 0.8; + /** + * Types of Compaction + */ + private enum Type { + COMPACT_TO_SKIPLIST_MAP, + COMPACT_TO_ARRAY_MAP + } + + private Type type = Type.COMPACT_TO_ARRAY_MAP; + public MemStoreCompactor(CompactingMemStore compactingMemStore) { this.compactingMemStore = compactingMemStore; + this.compactionKVMax = compactingMemStore.getConfiguration().getInt( + HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); + this.fraction = 1 - compactingMemStore.getConfiguration().getDouble( + COMPACTION_THRESHOLD_REMAIN_FRACTION, + COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT); } - /** + /**---------------------------------------------------------------------- * The request to dispatch the compaction asynchronous task. * The method returns true if compaction was successfully dispatched, or false if there - * is already an ongoing compaction or nothing to compact. + * is already an ongoing compaction or no segments to compact. 
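The "already an ongoing compaction" case is guarded in CompactingMemStore by a single AtomicBoolean (inMemoryFlushInProgress) that shouldFlushInMemory() CASes from false to true, so only one in-memory flush/compaction can be in flight per memstore. A standalone sketch of that mutual-exclusion pattern, with a hypothetical SingleFlightGuardSketch name:

import java.util.concurrent.atomic.AtomicBoolean;

final class SingleFlightGuardSketch {
  private final AtomicBoolean inProgress = new AtomicBoolean(false);

  // Returns true for exactly one caller until finish() is called; concurrent callers see false.
  boolean tryStart() {
    return inProgress.compareAndSet(false, true);
  }

  void finish() {
    inProgress.set(false);
  }
}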
*/ - public boolean startCompaction() throws IOException { - if (!compactingMemStore.hasCompactibleSegments()) return false; // no compaction on empty - - List scanners = new ArrayList(); - // get the list of segments from the pipeline - versionedList = compactingMemStore.getCompactibleSegments(); - // the list is marked with specific version - - // create the list of scanners with maximally possible read point, meaning that - // all KVs are going to be returned by the pipeline traversing - for (Segment segment : versionedList.getStoreSegments()) { - scanners.add(segment.getSegmentScanner(Long.MAX_VALUE)); + public boolean start() throws IOException { + if (!compactingMemStore.hasImmutableSegments()) return false; // no compaction on empty + + int t = compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_TYPE_KEY, + COMPACTING_MEMSTORE_TYPE_DEFAULT); + + switch (t) { + case 1: type = Type.COMPACT_TO_SKIPLIST_MAP; + break; + case 2: type = Type.COMPACT_TO_ARRAY_MAP; + break; + default: throw new RuntimeException("Unknown type " + type); // sanity check } - scanner = - new MemStoreScanner(compactingMemStore, scanners, Long.MAX_VALUE, - MemStoreScanner.Type.COMPACT_FORWARD); - smallestReadPoint = compactingMemStore.getSmallestReadPoint(); - compactingScanner = createScanner(compactingMemStore.getStore()); + // get a snapshot of the list of the segments from the pipeline, + // this local copy of the list is marked with specific version + versionedList = compactingMemStore.getImmutableSegments(); if (LOG.isDebugEnabled()) { - LOG.debug("Starting the MemStore in-memory compaction for store " + LOG.debug("Starting the MemStore In-Memory Shrink of type " + type + " for store " + compactingMemStore.getStore().getColumnFamilyName()); } @@ -93,116 +121,144 @@ class MemStoreCompactor { return true; } - /** + /**---------------------------------------------------------------------- * The request to cancel the compaction asynchronous task * The compaction may still happen if the request was sent too late * Non-blocking request */ - public void stopCompact() { - isInterrupted.compareAndSet(false, true); + public void stop() { + isInterrupted.compareAndSet(false, true); } - - /** + /**---------------------------------------------------------------------- * Close the scanners and clear the pointers in order to allow good * garbage collection */ private void releaseResources() { isInterrupted.set(false); - scanner.close(); - scanner = null; - compactingScanner.close(); - compactingScanner = null; versionedList = null; } - /** + /**---------------------------------------------------------------------- + * Check whether there are some signs to definitely not to flatten, + * returns false if we must compact. If this method returns true we + * still need to evaluate the compaction. + */ + private boolean toFlatten() { + boolean userToFlatten = // the user configurable option to flatten or not to flatten + compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_FLATTENING, + MEMSTORE_COMPACTOR_FLATTENING_DEFAULT); + int numOfSegments = versionedList.getNumOfSegments(); + if (numOfSegments > 3) // hard-coded for now as it is going to move to policy + return false; + else return userToFlatten; + } + + /**---------------------------------------------------------------------- * The worker thread performs the compaction asynchronously. * The solo (per compactor) thread only reads the compaction pipeline. * There is at most one thread per memstore instance. 
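The decision inside doCompaction() compares the estimated number of cells that would survive compaction against fraction * (current cell count), where fraction = 1 - COMPACTION_THRESHOLD_REMAIN_FRACTION (0.8 with the 0.2 default); if too many cells survive, the pipeline is only flattened. A small standalone illustration with made-up numbers and hypothetical names:

final class CompactionWorthinessSketch {
  // Compact only if the estimated surviving cells are at most fraction * current cells.
  static boolean worthCompacting(int survivingCells, int currentCells, double fraction) {
    return survivingCells <= fraction * currentCells;
  }

  public static void main(String[] args) {
    double fraction = 1 - 0.2;                                  // default remain fraction
    System.out.println(worthCompacting(500, 1000, fraction));   // true: 50% survive, compact
    System.out.println(worthCompacting(900, 1000, fraction));   // false: 90% survive, flatten only
  }
}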
*/ private void doCompaction() { + ImmutableSegment result = null; + boolean resultSwapped = false; + int immutCellsNum = versionedList.getNumOfCells(); // number of immutable cells - ImmutableSegment result = SegmentFactory.instance() // create the scanner - .createImmutableSegment( - compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM); - - // the compaction processing try { - // Phase I: create the compacted MutableCellSetSegment - compactSegments(result); + // PHASE I: estimate the compaction expedience - EVALUATE COMPACTION + if (toFlatten()) { + immutCellsNum = countCellsForCompaction(); + + if ( !isInterrupted.get() && + (immutCellsNum + > fraction * versionedList.getNumOfCells())) { + // too much cells "survive" the possible compaction, we do not want to compact! + LOG.debug("In-Memory compaction does not pay off - storing the flattened segment" + + " for store: " + compactingMemStore.getFamilyName()); + // Looking for Segment in the pipeline with SkipList index, to make it flat + compactingMemStore.flattenOneSegment(versionedList.getVersion()); + return; + } + } - // Phase II: swap the old compaction pipeline + // PHASE II: create the new compacted ImmutableSegment - START COPY-COMPACTION if (!isInterrupted.get()) { - if (compactingMemStore.swapCompactedSegments(versionedList, result)) { + result = compact(immutCellsNum); + } + + // Phase III: swap the old compaction pipeline - END COPY-COMPACTION + if (!isInterrupted.get()) { + if (resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result)) { // update the wal so it can be truncated and not get too long compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater - } else { - // We just ignored the Segment 'result' and swap did not happen. - result.close(); } - } else { - // We just ignore the Segment 'result'. - result.close(); } } catch (Exception e) { - LOG.debug("Interrupting the MemStore in-memory compaction for store " + compactingMemStore - .getFamilyName()); + LOG.debug("Interrupting the MemStore in-memory compaction for store " + + compactingMemStore.getFamilyName()); Thread.currentThread().interrupt(); - return; } finally { + if ((result != null) && (!resultSwapped)) result.close(); releaseResources(); } } - /** - * Creates the scanner for compacting the pipeline. - * - * @return the scanner + /**---------------------------------------------------------------------- + * The copy-compaction is the creation of the ImmutableSegment (from the relevant type) + * based on the Compactor Iterator. The new ImmutableSegment is returned. */ - private StoreScanner createScanner(Store store) throws IOException { + private ImmutableSegment compact(int numOfCells) throws IOException { - Scan scan = new Scan(); - scan.setMaxVersions(); //Get all available versions + LOG.debug("In-Memory compaction does pay off - The estimated number of cells " + + "after compaction is " + numOfCells + + ", while number of cells before is " + versionedList.getNumOfCells() + + ". 
The fraction of remaining cells should be: " + fraction); - StoreScanner internalScanner = - new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner), - ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); + ImmutableSegment result = null; + MemStoreCompactorIterator iterator = + new MemStoreCompactorIterator(versionedList.getStoreSegments(), + compactingMemStore.getComparator(), + compactionKVMax, compactingMemStore.getStore()); + try { + switch (type) { + case COMPACT_TO_SKIPLIST_MAP: + result = SegmentFactory.instance().createImmutableSegment( + compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator); + break; + case COMPACT_TO_ARRAY_MAP: + result = SegmentFactory.instance().createImmutableSegment( + compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, + numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED); + break; + default: throw new RuntimeException("Unknown type " + type); // sanity check + } + } finally { + iterator.close(); + } - return internalScanner; + return result; } - /** - * Updates the given single Segment using the internal store scanner, - * who in turn uses ScanQueryMatcher + /**---------------------------------------------------------------------- + * Count cells to estimate the efficiency of the future compaction */ - private void compactSegments(Segment result) throws IOException { - - List kvs = new ArrayList(); - // get the limit to the size of the groups to be returned by compactingScanner - int compactionKVMax = compactingMemStore.getConfiguration().getInt( - HConstants.COMPACTION_KV_MAX, - HConstants.COMPACTION_KV_MAX_DEFAULT); - - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - - boolean hasMore; - do { - hasMore = compactingScanner.next(kvs, scannerContext); - if (!kvs.isEmpty()) { - for (Cell c : kvs) { - // The scanner is doing all the elimination logic - // now we just copy it to the new segment - Cell newKV = result.maybeCloneWithAllocator(c); - boolean mslabUsed = (newKV != c); - result.internalAdd(newKV, mslabUsed); + private int countCellsForCompaction() throws IOException { - } - kvs.clear(); + int cnt = 0; + MemStoreCompactorIterator iterator = + new MemStoreCompactorIterator( + versionedList.getStoreSegments(), compactingMemStore.getComparator(), + compactionKVMax, compactingMemStore.getStore()); + + try { + while (iterator.next() != null) { + cnt++; } - } while (hasMore && (!isInterrupted.get())); + } finally { + iterator.close(); + } + + return cnt; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java new file mode 100644 index 0000000..2eafb42 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java @@ -0,0 +1,160 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; + +import java.io.IOException; +import java.util.*; + +/** + * The MemStoreCompactorIterator is designed to perform one iteration over given list of segments + * For another iteration new instance of MemStoreCompactorIterator needs to be created + * The iterator is not thread-safe and must have only one instance in each period of time + */ +@InterfaceAudience.Private +public class MemStoreCompactorIterator implements Iterator { + + private List kvs = new ArrayList(); + + // scanner for full or partial pipeline (heap of segment scanners) + // we need to keep those scanners in order to close them at the end + private KeyValueScanner scanner; + + // scanner on top of pipeline scanner that uses ScanQueryMatcher + private StoreScanner compactingScanner; + + private final ScannerContext scannerContext; + + private boolean hasMore; + private Iterator kvsIterator; + + // C-tor + public MemStoreCompactorIterator(List segments, + CellComparator comparator, int compactionKVMax, Store store) throws IOException { + + this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + + // list of Scanners of segments in the pipeline, when compaction starts + List scanners = new ArrayList(); + + // create the list of scanners with maximally possible read point, meaning that + // all KVs are going to be returned by the pipeline traversing + for (Segment segment : segments) { + scanners.add(segment.getScanner(store.getSmallestReadPoint())); + } + + scanner = new MemStoreScanner(comparator, scanners, MemStoreScanner.Type.COMPACT_FORWARD); + + // reinitialize the compacting scanner for each instance of iterator + compactingScanner = createScanner(store, scanner); + + hasMore = compactingScanner.next(kvs, scannerContext); + + if (!kvs.isEmpty()) { + kvsIterator = kvs.iterator(); + } + + } + + @Override + public boolean hasNext() { + if (!kvsIterator.hasNext()) { + // refillKVS() method should be invoked only if !kvsIterator.hasNext() + if (!refillKVS()) { + return false; + } + } + return (kvsIterator.hasNext() || hasMore); + } + + @Override + public Cell next() { + if (!kvsIterator.hasNext()) { + // refillKVS() method should be invoked only if !kvsIterator.hasNext() + if (!refillKVS()) return null; + } + return (!hasMore) ? null : kvsIterator.next(); + } + + public void close() { + compactingScanner.close(); + compactingScanner = null; + scanner.close(); + scanner = null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Creates the scanner for compacting the pipeline. 
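The hasNext()/next() pair above drains one batch of at most compactionKVMax cells and then asks the wrapping StoreScanner for the next batch (refillKVS() below). The same refill pattern reduced to plain JDK types, with hypothetical names (BatchingIteratorSketch, BatchSource), as an illustration rather than the actual iterator:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class BatchingIteratorSketch<T> implements Iterator<T> {
  interface BatchSource<E> {
    // Fills 'out' with up to 'limit' items; returns true while more batches may follow.
    boolean next(List<E> out, int limit);
  }

  private final BatchSource<T> source;
  private final int batchLimit;
  private final List<T> batch = new ArrayList<>();
  private Iterator<T> batchIt = batch.iterator();
  private boolean hasMore = true;

  BatchingIteratorSketch(BatchSource<T> source, int batchLimit) {
    this.source = source;
    this.batchLimit = batchLimit;
  }

  @Override public boolean hasNext() {
    while (!batchIt.hasNext() && hasMore) {   // refill only when the current batch is drained
      batch.clear();
      hasMore = source.next(batch, batchLimit);
      batchIt = batch.iterator();
    }
    return batchIt.hasNext();
  }

  @Override public T next() {
    if (!hasNext()) throw new NoSuchElementException();
    return batchIt.next();
  }
}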
+ * + * @return the scanner + */ + private StoreScanner createScanner(Store store, KeyValueScanner scanner) + throws IOException { + + Scan scan = new Scan(); + scan.setMaxVersions(); //Get all available versions + StoreScanner internalScanner = + new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner), + ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), + HConstants.OLDEST_TIMESTAMP); + + return internalScanner; + } + + + + private boolean refillKVS() { + kvs.clear(); // clear previous KVS, first initiated in the constructor + if (!hasMore) { // if there is nothing expected next in compactingScanner + return false; + } + + try { // try to get next KVS + hasMore = compactingScanner.next(kvs, scannerContext); + } catch (IOException ie) { + throw new IllegalStateException(ie); + } + + if (!kvs.isEmpty() ) {// is the new KVS empty ? + kvsIterator = kvs.iterator(); + return true; + } else { + // KVS is empty, but hasMore still true? + if (hasMore) { // try to move to next row + return refillKVS(); + } + + } + return hasMore; + } + + +} + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index 3d31d2a..74d061c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; @@ -30,7 +31,7 @@ import org.apache.htrace.Trace; /** * This is the scanner for any MemStore implementation, derived from MemStore. - * The MemStoreScanner combines SegmentScanner from different Segments and + * The MemStoreScanner combines KeyValueScanner from different Segments and * uses the key-value heap and the reversed key-value heap for the aggregated key-values set. * It is assumed that only traversing forward or backward is used (without zigzagging in between) */ @@ -55,61 +56,50 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { // or according to the first usage private Type type = Type.UNDEFINED; - private long readPoint; // remember the initial version of the scanners list List scanners; - // pointer back to the relevant MemStore - // is needed for shouldSeek() method - private AbstractMemStore backwardReferenceToMemStore; - /** - * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default! - * After constructor only one heap is going to be initialized for entire lifespan - * of the MemStoreScanner. A specific scanner can only be one directional! - * - * @param ms Pointer back to the MemStore - * @param scanners List of scanners over the segments - * @param readPt Read point below which we can safely remove duplicate KVs - */ - public MemStoreScanner(AbstractMemStore ms, List scanners, long readPt) - throws IOException { - this(ms, scanners, readPt, Type.UNDEFINED); - } + private final CellComparator comparator; /** * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default! * After constructor only one heap is going to be initialized for entire lifespan * of the MemStoreScanner. A specific scanner can only be one directional! 
* - * @param ms Pointer back to the MemStore - * @param scanners List of scanners over the segments - * @param readPt Read point below which we can safely remove duplicate KVs - * @param type The scan type COMPACT_FORWARD should be used for compaction + * @param comparator Cell Comparator + * @param scanners List of scanners, from which the heap will be built + * @param type The scan type COMPACT_FORWARD should be used for compaction */ - public MemStoreScanner(AbstractMemStore ms, List scanners, long readPt, - Type type) throws IOException { + public MemStoreScanner(CellComparator comparator, List scanners, Type type) + throws IOException { super(); - this.readPoint = readPt; this.type = type; switch (type) { - case UNDEFINED: - case USER_SCAN_FORWARD: - case COMPACT_FORWARD: - this.forwardHeap = new KeyValueHeap(scanners, ms.getComparator()); - break; - case USER_SCAN_BACKWARD: - this.backwardHeap = new ReversedKeyValueHeap(scanners, ms.getComparator()); - break; - default: - throw new IllegalArgumentException("Unknown scanner type in MemStoreScanner"); + case UNDEFINED: + case USER_SCAN_FORWARD: + case COMPACT_FORWARD: + this.forwardHeap = new KeyValueHeap(scanners, comparator); + break; + case USER_SCAN_BACKWARD: + this.backwardHeap = new ReversedKeyValueHeap(scanners, comparator); + break; + default: + throw new IllegalArgumentException("Unknown scanner type in MemStoreScanner"); } - this.backwardReferenceToMemStore = ms; + this.comparator = comparator; this.scanners = scanners; if (Trace.isTracing() && Trace.currentSpan() != null) { Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); } } + /* Constructor used only when the scan usage is unknown + and need to be defined according to the first move */ + public MemStoreScanner(CellComparator comparator, List scanners) + throws IOException { + this(comparator, scanners, Type.UNDEFINED); + } + /** * Returns the cell from the top-most scanner without advancing the iterator. 
* The backward traversal is assumed, only if specified explicitly @@ -135,7 +125,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { currentCell != null; currentCell = heap.next()) { - // all the logic of presenting cells is inside the internal SegmentScanners + // all the logic of presenting cells is inside the internal KeyValueScanners // located inside the heap return currentCell; @@ -297,7 +287,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { res |= scan.seekToPreviousRow(cell); } this.backwardHeap = - new ReversedKeyValueHeap(scanners, backwardReferenceToMemStore.getComparator()); + new ReversedKeyValueHeap(scanners, comparator); return res; } @@ -327,7 +317,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { } } this.backwardHeap = - new ReversedKeyValueHeap(scanners, backwardReferenceToMemStore.getComparator()); + new ReversedKeyValueHeap(scanners, comparator); type = Type.USER_SCAN_BACKWARD; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java index e62249a..1cac7fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.client.Scan; /** @@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.client.Scan; public class MutableSegment extends Segment { protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, long size) { - super(cellSet, comparator, memStoreLAB, size); + super(cellSet, comparator, memStoreLAB, size, ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); } /** @@ -43,21 +44,6 @@ public class MutableSegment extends Segment { return internalAdd(cell, mslabUsed); } - /** - * Removes the given cell from the segment - * @return the change in the heap size - */ - public long rollback(Cell cell) { - Cell found = getCellSet().get(cell); - if (found != null && found.getSequenceId() == cell.getSequenceId()) { - long sz = AbstractMemStore.heapSizeChange(cell, true); - getCellSet().remove(cell); - incSize(-sz); - return sz; - } - return 0; - } - //methods for test /** @@ -80,15 +66,7 @@ public class MutableSegment extends Segment { } @Override - protected void updateMetaInfo(Cell toAdd, long s) { - getTimeRangeTracker().includeTimestamp(toAdd); - size.addAndGet(s); - // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. - // When we use ACL CP or Visibility CP which deals with Tags during - // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not - // parse the byte[] to identify the tags length. 
- if(toAdd.getTagsLength() > 0) { - tagsPresent = true; - } + public long keySize() { + return size.get() - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 33c3bfb..5187a5b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -21,15 +21,19 @@ package org.apache.hadoop.hbase.regionserver; import java.util.Iterator; import java.util.SortedSet; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.ByteRange; +import org.apache.hadoop.hbase.util.ClassSize; import com.google.common.annotations.VisibleForTesting; @@ -43,32 +47,39 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private public abstract class Segment { - private volatile CellSet cellSet; + private static final Log LOG = LogFactory.getLog(Segment.class); + private AtomicReference cellSet= new AtomicReference(); private final CellComparator comparator; private long minSequenceId; private volatile MemStoreLAB memStoreLAB; + /* The size includes everything allocated for this segment, + * use keySize() to get only size of the cells */ protected final AtomicLong size; protected volatile boolean tagsPresent; private final TimeRangeTracker timeRangeTracker; + protected long constantCellMetaDataSize; - protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, - long size) { - this.cellSet = cellSet; + protected Segment( + CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, long size, + long constantCellSize) { + this.cellSet.set(cellSet); this.comparator = comparator; this.minSequenceId = Long.MAX_VALUE; this.memStoreLAB = memStoreLAB; this.size = new AtomicLong(size); this.tagsPresent = false; + this.constantCellMetaDataSize = constantCellSize; this.timeRangeTracker = new TimeRangeTracker(); } protected Segment(Segment segment) { - this.cellSet = segment.getCellSet(); + this.cellSet.set(segment.getCellSet()); this.comparator = segment.getComparator(); this.minSequenceId = segment.getMinSequenceId(); this.memStoreLAB = segment.getMemStoreLAB(); this.size = new AtomicLong(segment.getSize()); this.tagsPresent = segment.isTagsPresent(); + this.constantCellMetaDataSize = segment.getConstantCellMetaDataSize(); this.timeRangeTracker = segment.getTimeRangeTracker(); } @@ -76,7 +87,7 @@ public abstract class Segment { * Creates the scanner for the given read point * @return a scanner for the given read point */ - public SegmentScanner getSegmentScanner(long readPoint) { + public SegmentScanner getScanner(long readPoint) { return new SegmentScanner(this, readPoint); } @@ -84,7 +95,7 @@ public abstract class Segment { * Creates the scanner for the given read point, and a specific order in a list * @return a scanner for the given read point */ - public SegmentScanner 
getSegmentScanner(long readPoint, long order) { + public SegmentScanner getScanner(long readPoint, long order) { return new SegmentScanner(this, readPoint, order); } @@ -191,6 +202,20 @@ public abstract class Segment { } /** + * Setting the CellSet of the segment - used only for flat immutable segment for setting + * immutable CellSet after its creation in immutable segment constructor + * @return this object + */ + + protected Segment setCellSet(CellSet cellSetOld, CellSet cellSetNew) { + this.cellSet.compareAndSet(cellSetOld, cellSetNew); + return this; + } + + /* return only cell's heap size */ + public abstract long keySize(); + + /** * Returns the heap size of the segment * @return the heap size of the segment */ @@ -199,9 +224,9 @@ public abstract class Segment { } /** - * Increases the heap size counter of the segment by the given delta + * Updates the heap size counter of the segment by the given delta */ - public void incSize(long delta) { + public void updateSize(long delta) { size.addAndGet(delta); } @@ -239,7 +264,7 @@ public abstract class Segment { * @return a set of all cells in the segment */ protected CellSet getCellSet() { - return cellSet; + return cellSet.get(); } /** @@ -252,28 +277,29 @@ public abstract class Segment { protected long internalAdd(Cell cell, boolean mslabUsed) { boolean succ = getCellSet().add(cell); - long s = AbstractMemStore.heapSizeChange(cell, succ); + long s = updateMetaInfo(cell, succ, mslabUsed); + return s; + } + + protected long updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed) { + long s = heapSizeChange(cellToAdd, succ); // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger // than the counted number) if (!succ && mslabUsed) { - s += getCellLength(cell); + s += getCellLength(cellToAdd); } - updateMetaInfo(cell, s); - return s; - } - - protected void updateMetaInfo(Cell toAdd, long s) { - getTimeRangeTracker().includeTimestamp(toAdd); - size.addAndGet(s); - minSequenceId = Math.min(minSequenceId, toAdd.getSequenceId()); + getTimeRangeTracker().includeTimestamp(cellToAdd); + updateSize(s); + minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId()); // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. // When we use ACL CP or Visibility CP which deals with Tags during // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not // parse the byte[] to identify the tags length. - if(toAdd.getTagsLength() > 0) { + if( cellToAdd.getTagsLength() > 0) { tagsPresent = true; } + return s; } /** @@ -300,6 +326,23 @@ public abstract class Segment { } } + /* + * Calculate how the MemStore size has changed. Includes overhead of the + * backing Map. + * @param cell + * @param notPresent True if the cell was NOT present in the set. + * @return change in size + */ + protected long heapSizeChange(final Cell cell, final boolean notPresent){ + return + notPresent ? 
+ ClassSize.align(constantCellMetaDataSize + CellUtil.estimatedHeapSizeOf(cell)) : 0; + } + + public long getConstantCellMetaDataSize() { + return this.constantCellMetaDataSize; + } + @Override public String toString() { String res = "Store segment of type "+this.getClass().getName()+"; "; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 7ac80ae..6351f13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -18,11 +18,14 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; +import java.io.IOException; + /** * A singleton store segment factory. * Generate concrete store segments. @@ -40,28 +43,43 @@ public final class SegmentFactory { return instance; } + // create skip-list-based (non-flat) immutable segment from compacting old immutable segments public ImmutableSegment createImmutableSegment(final Configuration conf, - final CellComparator comparator, long size) { + final CellComparator comparator, MemStoreCompactorIterator iterator) { MemStoreLAB memStoreLAB = getMemStoreLAB(conf); - MutableSegment segment = generateMutableSegment(conf, comparator, memStoreLAB, size); - return createImmutableSegment(segment); + return + new ImmutableSegment(comparator, iterator, memStoreLAB); } - public ImmutableSegment createImmutableSegment(CellComparator comparator, - long size) { + // create empty immutable segment + public ImmutableSegment createImmutableSegment(CellComparator comparator, long size) { MutableSegment segment = generateMutableSegment(null, comparator, null, size); return createImmutableSegment(segment); } + // create immutable segment from mutable public ImmutableSegment createImmutableSegment(MutableSegment segment) { return new ImmutableSegment(segment); } + + // create mutable segment public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator, long size) { MemStoreLAB memStoreLAB = getMemStoreLAB(conf); return generateMutableSegment(conf, comparator, memStoreLAB, size); } + // create new flat immutable segment from compacting old immutable segment + public ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator, + MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType) + throws IOException { + Preconditions.checkArgument( + segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, "wrong immutable segment type"); + MemStoreLAB memStoreLAB = getMemStoreLAB(conf); + return + new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType); + } + //****** private methods to instantiate concrete store segments **********// private MutableSegment generateMutableSegment( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 1191f30..8cf0a7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -305,10 +305,6 @@ public class SegmentScanner implements KeyValueScanner { // do nothing } - protected Segment getSegment(){ - return segment; - } - //debug method @Override public String toString() { @@ -320,6 +316,10 @@ public class SegmentScanner implements KeyValueScanner { /********************* Private Methods **********************/ + private Segment getSegment(){ + return segment; + } + /** * Private internal method for iterating over the segment, * skipping the cells with irrelevant MVCC diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java index 9d7a723..2e8bead 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java @@ -38,8 +38,7 @@ public class VersionedSegmentsList { private final LinkedList storeSegments; private final long version; - public VersionedSegmentsList( - LinkedList storeSegments, long version) { + public VersionedSegmentsList(LinkedList storeSegments, long version) { this.storeSegments = storeSegments; this.version = version; } @@ -51,4 +50,16 @@ public class VersionedSegmentsList { public long getVersion() { return version; } + + public int getNumOfCells() { + int totalCells = 0; + for (ImmutableSegment s : storeSegments) { + totalCells += s.getCellsCount(); + } + return totalCells; + } + + public int getNumOfSegments() { + return storeSegments.size(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 09e2271..e7d6661 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -28,11 +28,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.LruBlockCache; import org.apache.hadoop.hbase.io.hfile.LruCachedBlock; -import org.apache.hadoop.hbase.regionserver.CellSet; -import org.apache.hadoop.hbase.regionserver.DefaultMemStore; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.regionserver.*; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ClassSize; @@ -174,6 +170,15 @@ public class TestHeapSize { assertEquals(expected, actual); } + // CellArrayMap + cl = CellArrayMap.class; + expected = ClassSize.estimateBase(cl, false); + actual = ClassSize.CELL_ARRAY_MAP; + if(expected != actual) { + ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + // ReentrantReadWriteLock cl = ReentrantReadWriteLock.class; expected = ClassSize.estimateBase(cl, false); @@ -240,7 +245,7 @@ public class TestHeapSize { // CellSet cl = CellSet.class; expected = ClassSize.estimateBase(cl, false); - actual = ClassSize.CELL_SKIPLIST_SET; + actual = ClassSize.CELL_SET; if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual); diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java new file mode 100644 index 0000000..cd5788e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java @@ -0,0 +1,143 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.experimental.categories.Category; + +import java.util.Iterator; +import java.util.NavigableMap; +import java.util.SortedSet; +import static org.junit.Assert.assertTrue; + +@Category({RegionServerTests.class, SmallTests.class}) +public class TestCellFlatSet extends TestCase { + + private static final int NUM_OF_CELLS = 4; + + private Cell cells[]; + private CellArrayMap cbOnHeap; + + private final static Configuration conf = new Configuration(); + private HeapMemStoreLAB mslab; + + + protected void setUp() throws Exception { + super.setUp(); + + // create array of Cells to bass to the CellFlatMap under CellSet + final byte[] one = Bytes.toBytes(15); + final byte[] two = Bytes.toBytes(25); + final byte[] three = Bytes.toBytes(35); + final byte[] four = Bytes.toBytes(45); + + final byte[] f = Bytes.toBytes("f"); + final byte[] q = Bytes.toBytes("q"); + final byte[] v = Bytes.toBytes(4); + + final KeyValue kv1 = new KeyValue(one, f, q, 10, v); + final KeyValue kv2 = new KeyValue(two, f, q, 20, v); + final KeyValue kv3 = new KeyValue(three, f, q, 30, v); + final KeyValue kv4 = new KeyValue(four, f, q, 40, v); + + cells = new Cell[] {kv1,kv2,kv3,kv4}; + cbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,cells,0,NUM_OF_CELLS,false); + + conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true); + conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); + MemStoreChunkPool.chunkPoolDisabled = false; + mslab = new HeapMemStoreLAB(conf); + } + + /* Create and test CellSet based on CellArrayMap */ + public void testCellBlocksOnHeap() throws Exception { + CellSet cs = new CellSet(cbOnHeap); + testCellBlocks(cs); + testIterators(cs); + } + + /* Generic basic test for immutable CellSet */ + private void testCellBlocks(CellSet cs) throws Exception { + final byte[] oneAndHalf = Bytes.toBytes(20); + final byte[] f = Bytes.toBytes("f"); + final byte[] q = Bytes.toBytes("q"); + final byte[] v = Bytes.toBytes(4); + final KeyValue outerCell = new KeyValue(oneAndHalf, f, q, 10, v); + + assertEquals(NUM_OF_CELLS, 
cs.size()); // check size + assertFalse(cs.contains(outerCell)); // check outer cell + + assertTrue(cs.contains(cells[0])); // check existence of the first + Cell first = cs.first(); + assertTrue(cells[0].equals(first)); + + assertTrue(cs.contains(cells[NUM_OF_CELLS - 1])); // check last + Cell last = cs.last(); + assertTrue(cells[NUM_OF_CELLS - 1].equals(last)); + + SortedSet tail = cs.tailSet(cells[1]); // check tail abd head sizes + assertEquals(NUM_OF_CELLS - 1, tail.size()); + SortedSet head = cs.headSet(cells[1]); + assertEquals(1, head.size()); + + SortedSet tailOuter = cs.tailSet(outerCell); // check tail starting from outer cell + assertEquals(NUM_OF_CELLS - 1, tailOuter.size()); + + Cell tailFirst = tail.first(); + assertTrue(cells[1].equals(tailFirst)); + Cell tailLast = tail.last(); + assertTrue(cells[NUM_OF_CELLS - 1].equals(tailLast)); + + Cell headFirst = head.first(); + assertTrue(cells[0].equals(headFirst)); + Cell headLast = head.last(); + assertTrue(cells[0].equals(headLast)); + } + + /* Generic iterators test for immutable CellSet */ + private void testIterators(CellSet cs) throws Exception { + + // Assert that we have NUM_OF_CELLS values and that they are in order + int count = 0; + for (Cell kv: cs) { + assertEquals("\n\n-------------------------------------------------------------------\n" + + "Comparing iteration number " + (count + 1) + " the returned cell: " + kv + + ", the first Cell in the CellBlocksMap: " + cells[count] + + ", and the same transformed to String: " + cells[count].toString() + + "\n-------------------------------------------------------------------\n", + cells[count], kv); + count++; + } + assertEquals(NUM_OF_CELLS, count); + + // Test descending iterator + count = 0; + for (Iterator i = cs.descendingIterator(); i.hasNext();) { + Cell kv = i.next(); + assertEquals(cells[NUM_OF_CELLS - (count + 1)], kv); + count++; + } + assertEquals(NUM_OF_CELLS, count); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index c5aae00..db0205e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -58,15 +58,15 @@ import static org.junit.Assert.assertTrue; public class TestCompactingMemStore extends TestDefaultMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class); - private static MemStoreChunkPool chunkPool; - private HRegion region; - private RegionServicesForStores regionServicesForStores; - private HStore store; + protected static MemStoreChunkPool chunkPool; + protected HRegion region; + protected RegionServicesForStores regionServicesForStores; + protected HStore store; ////////////////////////////////////////////////////////////////////////////// // Helpers ////////////////////////////////////////////////////////////////////////////// - private static byte[] makeQualifier(final int i1, final int i2) { + protected static byte[] makeQualifier(final int i1, final int i2) { return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2)); } @@ -79,6 +79,12 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Override @Before public void setUp() throws Exception { + compactingSetUp(); + this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparator.COMPARATOR, + store, 
regionServicesForStores); + } + + protected void compactingSetUp() throws Exception { super.internalSetUp(); Configuration conf = new Configuration(); conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true); @@ -89,13 +95,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore { this.region = hbaseUtility.createTestRegion("foobar", hcd); this.regionServicesForStores = region.getRegionServicesForStores(); this.store = new HStore(region, hcd, conf); - this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparator.COMPARATOR, - store, regionServicesForStores); + chunkPool = MemStoreChunkPool.getPool(conf); assertTrue(chunkPool != null); } - /** * A simple test which verifies the 3 possible states when scanning across snapshot. * @@ -597,7 +601,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { Threads.sleep(10); } assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot @@ -624,11 +628,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { Threads.sleep(1000); } + int counter = 0; + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(3, counter); assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); addRowsByKeys(memstore, keys2); - assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact @@ -636,7 +645,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { Threads.sleep(10); } assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot @@ -667,7 +676,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { Threads.sleep(10); } assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); addRowsByKeys(memstore, keys2); @@ -675,16 +684,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { region.getMemstoreSize() + ", Memstore Total Size: " + regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n"; - assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); ((CompactingMemStore)memstore).disableCompaction(); size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); addRowsByKeys(memstore, keys3); 
- assertEquals(1128, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(1016, regionServicesForStores.getGlobalMemstoreTotalSize()); ((CompactingMemStore)memstore).enableCompaction(); size = memstore.getFlushableSize(); @@ -693,7 +702,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { Threads.sleep(10); } assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize()); + assertEquals(384, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java new file mode 100644 index 0000000..1fe4b87 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -0,0 +1,363 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.util.ArrayList; +import java.util.List; + + + +/** + * compacted memstore test case + */ +@Category({RegionServerTests.class, MediumTests.class}) +public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore { + + private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class); + //private static MemStoreChunkPool chunkPool; + //private HRegion region; + //private RegionServicesForStores regionServicesForStores; + //private HStore store; + + ////////////////////////////////////////////////////////////////////////////// + // Helpers + ////////////////////////////////////////////////////////////////////////////// + + @Override public void tearDown() throws Exception { + chunkPool.clearChunks(); + } + + @Override public void setUp() throws Exception { + compactingSetUp(); + Configuration conf = HBaseConfiguration.create(); + + conf.setLong("hbase.hregion.compacting.memstore.type", 2); // compact to CellArrayMap + + this.memstore = + new CompactingMemStore(conf, CellComparator.COMPARATOR, store, + regionServicesForStores); + } + + ////////////////////////////////////////////////////////////////////////////// + // Compaction tests + ////////////////////////////////////////////////////////////////////////////// + public void testCompaction1Bucket() throws IOException { + int counter = 0; + String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 + + // test 1 bucket + addRowsByKeys(memstore, keys1); + assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize()); + + assertEquals(4, memstore.getActive().getCellsCount()); + long size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(3, counter); + size = memstore.getFlushableSize(); + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + region.addAndGetGlobalMemstoreSize(-size); // simulate flusher + ImmutableSegment s = memstore.getSnapshot(); + assertEquals(3, s.getCellsCount()); + assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize()); + + memstore.clearSnapshot(snapshot.getId()); + } + + public void testCompaction2Buckets() throws IOException { + + String[] keys1 = { "A", "A", "B", "C" }; + String[] keys2 = { "A", "B", "D" }; + + addRowsByKeys(memstore, keys1); + 
assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize()); + long size = memstore.getFlushableSize(); + +// assertTrue( +// "\n\n<<< This is the active size with 4 keys - " + memstore.getActive().getSize() +// + ". This is the memstore flushable size - " + size + "\n",false); + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(1000); + } + int counter = 0; + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(3,counter); + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); + + addRowsByKeys(memstore, keys2); + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); + + size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + int i = 0; + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + if (i > 10000000) { + ((CompactingMemStore) memstore).debug(); + assertTrue("\n\n<<< Infinite loop! :( \n", false); + } + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + counter = 0; + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(4,counter); + assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize()); + + size = memstore.getFlushableSize(); + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + region.addAndGetGlobalMemstoreSize(-size); // simulate flusher + ImmutableSegment s = memstore.getSnapshot(); + assertEquals(4, s.getCellsCount()); + assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize()); + + memstore.clearSnapshot(snapshot.getId()); + } + + public void testCompaction3Buckets() throws IOException { + + String[] keys1 = { "A", "A", "B", "C" }; + String[] keys2 = { "A", "B", "D" }; + String[] keys3 = { "D", "B", "B" }; + + addRowsByKeys(memstore, keys1); + assertEquals(496, region.getMemstoreSize()); + + long size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + + String tstStr = "\n\nFlushable size after first flush in memory:" + size + ". Is MemmStore in compaction?:" + + ((CompactingMemStore) memstore).isMemStoreFlushingInMemory(); + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); + + addRowsByKeys(memstore, keys2); + + tstStr += " After adding second part of the keys. 
Memstore size: " + + region.getMemstoreSize() + ", Memstore Total Size: " + + regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n"; + + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); + + ((CompactingMemStore) memstore).disableCompaction(); + size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize()); + + addRowsByKeys(memstore, keys3); + assertEquals(1016, regionServicesForStores.getGlobalMemstoreTotalSize()); + + ((CompactingMemStore) memstore).enableCompaction(); + size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(384, regionServicesForStores.getGlobalMemstoreTotalSize()); + + size = memstore.getFlushableSize(); + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + region.addAndGetGlobalMemstoreSize(-size); // simulate flusher + ImmutableSegment s = memstore.getSnapshot(); + assertEquals(4, s.getCellsCount()); + assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize()); + + memstore.clearSnapshot(snapshot.getId()); + + } + + ////////////////////////////////////////////////////////////////////////////// + // Flattening tests + ////////////////////////////////////////////////////////////////////////////// + @Test + public void testFlattening() throws IOException { + + String[] keys1 = { "A", "A", "B", "C", "F", "H"}; + String[] keys2 = { "A", "B", "D", "G", "I", "J"}; + String[] keys3 = { "D", "B", "B", "E" }; + + // set flattening to true + memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", true); + + addRowsByKeys(memstore, keys1); + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact + + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + + addRowsByKeys(memstore, keys2); // also should only flatten + + ((CompactingMemStore) memstore).disableCompaction(); + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening + assertEquals(0, memstore.getSnapshot().getCellsCount()); + + addRowsByKeys(memstore, keys3); + + ((CompactingMemStore) memstore).enableCompaction(); + + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + + int counter = 0; + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(10,counter); + + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + ImmutableSegment s = memstore.getSnapshot(); + memstore.clearSnapshot(snapshot.getId()); + } + + @Test + public void testCountOfCellsAfterFlatteningByScan() throws IOException { + String[] keys1 = { "A", "B", "C" }; // A, B, C + addRowsByKeysWith50Cols(memstore, keys1); + // this should only flatten as there are no duplicates + ((CompactingMemStore) memstore).flushInMemory(); + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + 
Threads.sleep(10); + } + List scanners = memstore.getScanners(Long.MAX_VALUE); + MemStoreScanner scanner = new MemStoreScanner(CellComparator.COMPARATOR, scanners); + int count = 0; + while (scanner.next() != null) { + count++; + } + assertEquals("the count should be ", count, 150); + scanner.close(); + } + + @Test + public void testCountOfCellsAfterFlatteningByIterator() throws IOException { + String[] keys1 = { "A", "B", "C" }; // A, B, C + addRowsByKeysWith50Cols(memstore, keys1); + // this should only flatten as there are no duplicates + ((CompactingMemStore) memstore).flushInMemory(); + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + // Just doing the cnt operation here + MemStoreCompactorIterator itr = new MemStoreCompactorIterator( + ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), + CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore()); + int cnt = 0; + try { + while (itr.next() != null) { + cnt++; + } + } finally { + itr.close(); + } + assertEquals("the count should be ", cnt, 150); + } + + + private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) { + byte[] fam = Bytes.toBytes("testfamily"); + for (int i = 0; i < keys.length; i++) { + long timestamp = System.currentTimeMillis(); + Threads.sleep(1); // to make sure each kv gets a different ts + byte[] row = Bytes.toBytes(keys[i]); + for(int j =0 ;j < 50; j++) { + byte[] qf = Bytes.toBytes("testqualifier"+j); + byte[] val = Bytes.toBytes(keys[i] + j); + KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); + hmc.add(kv); + } + } + } + + private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) { + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf = Bytes.toBytes("testqualifier"); + long size = hmc.getActive().getSize();// + for (int i = 0; i < keys.length; i++) { + long timestamp = System.currentTimeMillis(); + Threads.sleep(1); // to make sure each kv gets a different ts + byte[] row = Bytes.toBytes(keys[i]); + byte[] val = Bytes.toBytes(keys[i] + i); + KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); + hmc.add(kv); + LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); + //long size = AbstractMemStore.heapSizeChange(kv, true); + //regionServicesForStores.addAndGetGlobalMemstoreSize(size); + } + regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().getSize() - size);// + } + + private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { + long t = 1234; + + @Override public long currentTime() { + return t; + } + + public void setCurrentTimeMillis(long t) { + this.t = t; + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 9aa3a9b..c44f022 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -5109,7 +5109,7 @@ public class TestHRegion { * * @throws IOException */ - private void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue) + protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue) throws IOException { byte[][] families = { fs }; Scan scan = new Scan(); @@ -5172,7 +5172,7 @@ public class TestHRegion { } } - private Configuration initSplit() { + protected Configuration initSplit() { // Always compact if there is more 
than one store file. CONF.setInt("hbase.hstore.compactionThreshold", 2); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java index be604af..d66899b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java @@ -19,10 +19,13 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.TreeMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; @@ -30,11 +33,19 @@ import org.apache.hadoop.hbase.client.TestMobSnapshotCloneIndependence; import org.apache.hadoop.hbase.master.procedure.TestMasterFailoverWithProcedures; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.junit.ClassRule; +import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; +import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; +import static org.junit.Assert.assertNotNull; + /** * A test similar to TestHRegion, but with in-memory flush families. * Also checks wal truncation after in-memory compaction. @@ -65,5 +76,88 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion{ isReadOnly, durability, wal, inMemory, families); } + /** + * Splits twice and verifies getting from each of the split regions. + * + * @throws Exception + */ + @Override + public void testBasicSplit() throws Exception { + byte[][] families = { fam1, fam2, fam3 }; + + Configuration hc = initSplit(); + // Setting up region + String method = this.getName(); + this.region = initHRegion(tableName, method, hc, families); + + try { + LOG.info("" + HBaseTestCase.addContent(region, fam3)); + region.flush(true); + region.compactStores(); + byte[] splitRow = region.checkSplit(); + assertNotNull(splitRow); + LOG.info("SplitRow: " + Bytes.toString(splitRow)); + HRegion[] regions = splitRegion(region, splitRow); + try { + // Need to open the regions. + // TODO: Add an 'open' to HRegion... don't do open by constructing + // instance. + for (int i = 0; i < regions.length; i++) { + regions[i] = HRegion.openHRegion(regions[i], null); + } + // Assert can get rows out of new regions. Should be able to get first + // row from first region and the midkey from second region. + assertGet(regions[0], fam3, Bytes.toBytes(START_KEY)); + assertGet(regions[1], fam3, splitRow); + // Test I can get scanner and that it starts at right place. + assertScan(regions[0], fam3, Bytes.toBytes(START_KEY)); + assertScan(regions[1], fam3, splitRow); + // Now prove can't split regions that have references. 
+ for (int i = 0; i < regions.length; i++) { + // Add so much data to this region that we create a store file that is > + // than one of our unsplittable references. + for (int j = 0; j < 2; j++) { + HBaseTestCase.addContent(regions[i], fam3); + } + HBaseTestCase.addContent(regions[i], fam2); + HBaseTestCase.addContent(regions[i], fam1); + regions[i].flush(true); + } + + byte[][] midkeys = new byte[regions.length][]; + // To make regions splittable, force compaction. + for (int i = 0; i < regions.length; i++) { + regions[i].compactStores(); + midkeys[i] = regions[i].checkSplit(); + } + + TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>(); + // Split these two daughter regions so that we then have 4 regions. They will + // split because of the data added above. + for (int i = 0; i < regions.length; i++) { + HRegion[] rs = null; + if (midkeys[i] != null) { + rs = splitRegion(regions[i], midkeys[i]); + for (int j = 0; j < rs.length; j++) { + sortedMap.put(Bytes.toString(rs[j].getRegionInfo().getRegionName()), + HRegion.openHRegion(rs[j], null)); + } + } + } + LOG.info("Made 4 regions"); + } finally { + for (int i = 0; i < regions.length; i++) { + try { + regions[i].close(); + } catch (IOException e) { + // Ignore. + } + } + } + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index 2acfd12..a6c7912 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -135,12 +135,264 @@ public class TestWalAndCompactingMemStoreFlush { // Set up the configuration Configuration conf = HBaseConfiguration.create(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); - conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class - .getName()); - conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * - 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushNonSloppyStoresFirstPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); + conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); + + // Initialize the region + Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); + + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 + for (int i = 1; i <= 1200; i++) { + region.put(createPut(1, i)); // compacted memstore, all the keys are unique + + if (i <= 100) { + region.put(createPut(2, i)); + if (i <= 50) { + // compacted memstore, subject for compaction due to duplications + region.put(createDoublePut(3, i)); + } + } + } + + // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk + for (int i = 100; i < 2000; i++) { + region.put(createPut(2, i)); + } + + long totalMemstoreSize = region.getMemstoreSize(); + + // Find the smallest LSNs for edits wrt each CF. + long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); + + // Find the sizes of the memstores of each CF.
+ long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); + + // Get the overall smallest LSN in the region's memstores. + long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region) + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + + String s = "\n\n----------------------------------\n" + + "Upon initial insert and before any flush, size of CF1 is:" + + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:" + + region.getStore(FAMILY1).getMemStore().isSloppy() + ". Size of CF2 is:" + + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" + + region.getStore(FAMILY2).getMemStore().isSloppy() + ". Size of CF3 is:" + + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:" + + region.getStore(FAMILY3).getMemStore().isSloppy() + "\n"; + + // The overall smallest LSN in the region's memstores should be the same as + // the LSN of the smallest edit in CF1 + assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); + + // Some other sanity checks. + assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); + assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); + assertTrue(cf1MemstoreSizePhaseI > 0); + assertTrue(cf2MemstoreSizePhaseI > 0); + assertTrue(cf3MemstoreSizePhaseI > 0); + + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. + String msg = "totalMemstoreSize="+totalMemstoreSize + + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + + " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM + + + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + + " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; + assertEquals(msg,totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, + cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); + + // Flush!!!!!!!!!!!!!!!!!!!!!! + // We have big compacting memstore CF1 and two small memstores: + // CF2 (not compacted) and CF3 (compacting) + // All together they are above the flush size lower bound. + // Since CF1 and CF3 should be flushed to memory (not to disk), + // CF2 is going to be flushed to disk. + // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted + ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory(); + ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); + region.flush(false); + + // CF3 should be compacted so wait here to be sure the compaction is done + while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore()) + .isMemStoreFlushingInMemory()) + Threads.sleep(10); + + // Recalculate everything + long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); + + long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region) + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + // Find the smallest LSNs for edits wrt to each CF. 
+ long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
+ long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
+ long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
+
+ s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
+ + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD
+ + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM is:" + CompactingMemStore
+ .DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+ + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
+ + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
+ + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
+
+ // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened
+ assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI);
+
+ // CF2 should become empty
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII);
+
+ // verify that CF3 was flushed to memory and was compacted (this is an approximate check)
+ assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD +
+ CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM >
+ cf3MemstoreSizePhaseII);
+
+ // CF3 was compacted and flattened!
+ assertTrue("\n<<< Size of CF3 in phase I - " + cf3MemstoreSizePhaseI
+ + ", size of CF3 in phase II - " + cf3MemstoreSizePhaseII + "\n",
+ cf3MemstoreSizePhaseI / 2 > cf3MemstoreSizePhaseII);
+
+
+ // Now the smallest LSN in the region should be the same as the smallest
+ // LSN in the memstore of CF1.
+ assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
+
+ // Now add more puts for CF1, so that we also flush CF1 to disk instead of
+ // memory in the next flush
+ for (int i = 1200; i < 3000; i++) {
+ region.put(createPut(1, i));
+ }
+
+ s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII
+ + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " +
+ "the smallest sequence in CF2:"
+ + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n";
+
+ // How much does the CF1 memstore occupy? This will be used later.
+ long cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
+ long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
+
+ s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
+ + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ;
+
+
+ // Flush!!!!!!!!!!!!!!!!!!!!!!
+ // Flush again, CF1 is flushed to disk
+ // CF2 is flushed to disk, because it is not an in-memory compacted memstore
+ // CF3 is flushed empty to memory (actually nothing happens to CF3)
+ region.flush(false);
+
+ // Recalculate everything
+ long cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
+ long cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
+ long cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
+
+ long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
+ .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+ long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
+ long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
+ long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
+
+ s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:"
+ + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV
+ + "\n";
+
+ s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
+ + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
+ "the smallest sequence in CF2:"
+ + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV
+ + "\n";
+
+ // CF1's pipeline component (inserted before the first flush) should be flushed to disk
+ // CF2 should be flushed to disk
+ assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV);
+
+ // CF3 shouldn't have been touched.
+ assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
+
+ // the smallest LSN of CF3 shouldn't change
+ assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
+
+ // CF3 should be the bottleneck for the WAL
+ assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
+
+ // Flush!!!!!!!!!!!!!!!!!!!!!!
+ // Trying to clean the existing memstores, CF2 all flushed to disk. The single
+ // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk.
+ // Note that the active set of CF3 is empty,
+ // but the active set of CF1 is not yet empty
+ region.flush(true);
+
+ // Recalculate everything
+ long cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
+ long cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
+ long cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
+ long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
+ .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+ assertTrue(DefaultMemStore.DEEP_OVERHEAD < cf1MemstoreSizePhaseV);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseV);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSizePhaseV);
+
+ region.flush(true); // flush once again in order to be sure that everything is empty
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, region.getStore(FAMILY1).getMemStoreSize());
+
+ // What happens when we hit the memstore limit, but we are not able to find
+ // any Column Family above the threshold?
+ // In that case, we should flush all the CFs.
+
+ // The memstore limit is 200*1024 and the column family flush threshold is
+ // around 50*1024. We try to just hit the memstore limit with each CF's
+ // memstore being below the CF flush threshold.
+ for (int i = 1; i <= 300; i++) {
+ region.put(createPut(1, i));
+ region.put(createPut(2, i));
+ region.put(createPut(3, i));
+ region.put(createPut(4, i));
+ region.put(createPut(5, i));
+ }
+
+ region.flush(false);
+
+ s = s + "----AFTER THIRD AND FOURTH FLUSH, the smallest sequence in region WAL is: "
+ + smallestSeqInRegionCurrentMemstorePhaseV
+ + ". After additional inserts and last flush, the entire region size is:" + region
+ .getMemstoreSize()
+ + "\n----------------------------------\n";
+
+ // Since we won't find any CF above the threshold, and hence no specific
+ // store to flush, we should flush all the memstores.
+ // Also compacted memstores are flushed to disk.
+ assertEquals(0, region.getMemstoreSize());
+ System.out.println(s);
+ HBaseTestingUtility.closeRegionAndWAL(region);
+ }
+
+ @Test(timeout = 180000)
+ public void testSelectiveFlushWhenEnabledNoFlattening() throws IOException {
+
+ // Set up the configuration
+ Configuration conf = HBaseConfiguration.create();
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
+ conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+ FlushNonSloppyStoresFirstPolicy.class.getName());
+ conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024);
 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
+ // Set memstore segment flattening to false and compact to a skip-list
+ conf.setBoolean("hbase.hregion.compacting.memstore.flatten", false);
+ conf.setInt("hbase.hregion.compacting.memstore.type",1);
+
 // Intialize the region
 Region region = initHRegion("testSelectiveFlushWhenEnabled", conf);
@@ -201,7 +453,8 @@ public class TestWalAndCompactingMemStoreFlush {
 // memstores of CF1, CF2 and CF3.
 String msg = "totalMemstoreSize="+totalMemstoreSize +
 " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
- " DEEP_OVERHEAD_PER_PIPELINE_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM +
+ " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM +
+
 " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
 " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
 " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
@@ -238,8 +491,8 @@ public class TestWalAndCompactingMemStoreFlush {
 s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
 + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD
- + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_ITEM is:" + CompactingMemStore
- .DEEP_OVERHEAD_PER_PIPELINE_ITEM
+ + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM is:" + CompactingMemStore
+ .DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
 + "\n----After first flush!
CF1 should be flushed to memory, but not compacted.---\n" + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; @@ -247,14 +500,14 @@ public class TestWalAndCompactingMemStoreFlush { // CF1 was flushed to memory, but there is nothing to compact, should // remain the same size plus renewed empty skip-list assertEquals(s, cf1MemstoreSizePhaseII, - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM); + cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM); // CF2 should become empty assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); // verify that CF3 was flushed to memory and was compacted (this is approximation check) assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD + - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM > + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM > cf3MemstoreSizePhaseII); assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII); @@ -322,7 +575,7 @@ public class TestWalAndCompactingMemStoreFlush { assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); // CF3 should be bottleneck for WAL - assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); + assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); // Flush!!!!!!!!!!!!!!!!!!!!!! // Clearing the existing memstores, CF2 all flushed to disk. The single @@ -419,7 +672,8 @@ public class TestWalAndCompactingMemStoreFlush { // memstores of CF1, CF2 and CF3. String msg = "totalMemstoreSize="+totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + - " DEEP_OVERHEAD_PER_PIPELINE_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM + + " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM + + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index e4f52e9..d737c0e 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -815,7 +815,7 @@ module Hbase family.setScope(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE) family.setCacheDataOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE) family.setInMemory(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY) - family.setCompacted(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION) + family.setInMemoryCompaction(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION) family.setTimeToLive(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::TTL)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL) 
family.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING) family.setBlocksize(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE) -- 1.8.5.2 (Apple Git-48)