From 30779985e06bd618b12d8affb0766f1393665a41 Mon Sep 17 00:00:00 2001 From: anastas Date: Thu, 22 Jun 2017 11:25:53 +0300 Subject: [PATCH] HBASE-18010: Integration of CellChunkMap index into CompactingMemStore, including tests --- .../org/apache/hadoop/hbase/util/ClassSize.java | 29 +- .../hbase/regionserver/CSLMImmutableSegment.java | 53 ++ .../regionserver/CellArrayImmutableSegment.java | 139 +++++ .../regionserver/CellChunkImmutableSegment.java | 220 ++++++++ .../hadoop/hbase/regionserver/CellChunkMap.java | 20 +- .../hadoop/hbase/regionserver/ChunkCreator.java | 79 ++- .../hbase/regionserver/CompactingMemStore.java | 42 +- .../hbase/regionserver/CompactionPipeline.java | 28 +- .../regionserver/CompositeImmutableSegment.java | 10 + .../hbase/regionserver/ImmutableMemStoreLAB.java | 9 + .../hbase/regionserver/ImmutableSegment.java | 187 +------ .../hbase/regionserver/MemStoreCompactor.java | 8 +- .../hadoop/hbase/regionserver/MemStoreLAB.java | 5 + .../hadoop/hbase/regionserver/MemStoreLABImpl.java | 9 + .../hadoop/hbase/regionserver/MutableSegment.java | 1 + .../apache/hadoop/hbase/regionserver/Segment.java | 6 +- .../hadoop/hbase/regionserver/SegmentFactory.java | 76 ++- .../org/apache/hadoop/hbase/io/TestHeapSize.java | 29 +- .../hadoop/hbase/regionserver/TestCellFlatSet.java | 15 +- .../hbase/regionserver/TestCompactingMemStore.java | 134 +++-- .../TestCompactingToCellArrayMapMemStore.java | 492 ------------------ .../TestCompactingToCellFlatMapMemStore.java | 562 +++++++++++++++++++++ .../hbase/regionserver/TestDefaultMemStore.java | 7 +- 23 files changed, 1379 insertions(+), 781 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java index e064cc0..4990ac0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java @@ -89,6 +89,15 @@ public class ClassSize { /** Overhead for ConcurrentSkipListMap Entry */ public static final int CONCURRENT_SKIPLISTMAP_ENTRY; + /** Overhead for CellFlatMap */ + public static final int CELL_FLAT_MAP; + + /** Overhead for CellChunkMap */ + public static final int CELL_CHUNK_MAP; + + /** Overhead for Cell Chunk Map Entry */ + public static final int CELL_CHUNK_MAP_ENTRY; + /** Overhead for CellArrayMap */ public static final int CELL_ARRAY_MAP; @@ -275,13 +284,17 @@ public class ClassSize { // The size changes from jdk7 to jdk8, estimate the size rather than use a conditional CONCURRENT_SKIPLISTMAP = (int) estimateBase(ConcurrentSkipListMap.class, false); - // CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends - // CellFlatMap class. 
CellArrayMap object containing a ref to an Array, so
-    // OBJECT + REFERENCE + ARRAY
     // CellFlatMap object contains two integers, one boolean and one reference to object, so
     // 2*INT + BOOLEAN + REFERENCE
-    CELL_ARRAY_MAP = align(OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN
-        + ARRAY + 2*REFERENCE);
+    CELL_FLAT_MAP = OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN + REFERENCE;
+
+    // CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends
+    // CellFlatMap class. A CellArrayMap object adds a reference to an Array of Cells, so
+    CELL_ARRAY_MAP = align(CELL_FLAT_MAP + REFERENCE + ARRAY);
+
+    // CELL_CHUNK_MAP is the size of an instance of CellChunkMap class, which extends
+    // CellFlatMap class. A CellChunkMap object adds a reference to an Array of Chunks, so
+    CELL_CHUNK_MAP = align(CELL_FLAT_MAP + REFERENCE + ARRAY);

     CONCURRENT_SKIPLISTMAP_ENTRY = align(
         align(OBJECT + (3 * REFERENCE)) + /* one node per entry */
@@ -290,6 +303,12 @@
     // REFERENCE in the CellArrayMap all the rest is counted in KeyValue.heapSize()
     CELL_ARRAY_MAP_ENTRY = align(REFERENCE);

+    // The Cell representation in the CellChunkMap; the Cell object size shouldn't be counted
+    // in KeyValue.heapSize().
+    // Each cell-representation requires three integers for chunkID (reference to the ByteBuffer),
+    // offset and length, and one long for seqID.
+    CELL_CHUNK_MAP_ENTRY = 3*Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG;
+
     REENTRANT_LOCK = align(OBJECT + (3 * REFERENCE));

     ATOMIC_LONG = align(OBJECT + Bytes.SIZEOF_LONG);
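
The new constants are easiest to verify with concrete numbers: a CellChunkMap index entry is a fixed 3 ints + 1 long = 20 bytes, and CELL_ARRAY_MAP / CELL_CHUNK_MAP differ from CELL_FLAT_MAP only by one reference plus one array header. A minimal standalone sketch of the same arithmetic; the OBJECT/REFERENCE/ARRAY values below are illustrative placeholders, since ClassSize estimates the real ones at runtime:

    public class ClassSizeArithmetic {
      // illustrative 64-bit, compressed-oops header sizes; ClassSize computes the real values
      static final int OBJECT = 16, REFERENCE = 4, ARRAY = 16;
      static final int SIZEOF_INT = 4, SIZEOF_LONG = 8, SIZEOF_BOOLEAN = 1;

      static long align(long num) { return (num + 7) & ~7L; } // round up to an 8-byte boundary

      public static void main(String[] args) {
        int cellFlatMap = OBJECT + 2 * SIZEOF_INT + SIZEOF_BOOLEAN + REFERENCE;  // 29
        long cellArrayMap = align(cellFlatMap + REFERENCE + ARRAY);              // align(49) = 56
        long cellChunkMap = align(cellFlatMap + REFERENCE + ARRAY);              // same layout
        int cellChunkMapEntry = 3 * SIZEOF_INT + SIZEOF_LONG;                    // 20 bytes per cell
        System.out.println(cellArrayMap + ", " + cellChunkMap + ", " + cellChunkMapEntry);
      }
    }
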
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
new file mode 100644
index 0000000..861b040
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+/**
+ * CSLMImmutableSegment extends the API supported by a {@link Segment} and
+ * {@link ImmutableSegment}. This immutable segment works with a CellSet backed by a
+ * ConcurrentSkipListMap (CSLM) delegatee.
+ */
+@InterfaceAudience.Private
+public class CSLMImmutableSegment extends ImmutableSegment {
+  public static final long DEEP_OVERHEAD_CSLM =
+      ImmutableSegment.DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
+
+  /**------------------------------------------------------------------------
+   * Copy C-tor to be used when a new CSLMImmutableSegment is being built from a Mutable one.
+   * This C-tor should be used when the active MutableSegment is pushed into the compaction
+   * pipeline and becomes an ImmutableSegment.
+   */
+  protected CSLMImmutableSegment(Segment segment) {
+    super(segment);
+    // update the segment metadata heap size
+    incSize(0, -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM);
+  }
+
+  @Override
+  protected long indexEntrySize() {
+    return ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
+  }
+
+  @Override protected boolean isFlat() {
+    return false;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
new file mode 100644
index 0000000..aa97c85
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+import java.io.IOException;
+
+/**
+ * CellArrayImmutableSegment extends the API supported by a {@link Segment} and
+ * {@link ImmutableSegment}. This immutable segment works with a CellSet backed by a
+ * CellArrayMap delegatee.
+ */
+@InterfaceAudience.Private
+public class CellArrayImmutableSegment extends ImmutableSegment {
+
+  public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP;
+
+  ///////////////////// CONSTRUCTORS /////////////////////
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellArrayImmutableSegment is a result of compaction of a
+   * list of older ImmutableSegments.
+   * The given iterator returns the Cells that "survived" the compaction.
+   */
+  protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
+      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+    super(null, comparator, memStoreLAB); // initialize the CellSet with NULL
+    incSize(0, DEEP_OVERHEAD_CAM);
+    // build the new CellSet based on CellArrayMap
+    CellSet cs = createCellFlatMapSet(numOfCells, iterator, action);
+    this.setCellSet(null, cs); // update the CellSet of the new Segment
+  }
+
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellArrayImmutableSegment is built as a result of flattening
+   * of a CSLMImmutableSegment.
+   */
+  protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemstoreSize memstoreSize) {
+    super(segment); // initialize the upper class
+    int numOfCells = segment.getCellsCount();
+    // build the new CellSet based on CellArrayMap
+    CellSet cs = recreateCellFlatMapSet(numOfCells, segment.getScanner(Long.MAX_VALUE));
+    this.setCellSet(segment.getCellSet(), cs); // update the CellSet of the new Segment
+
+    // arrange the meta-data size: decrease all meta-data sizes related to the SkipList
+    // (recreateCellFlatMapSet doesn't take care of the sizes)
+    long newSegmentSizeDelta = -(numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    // add the size of a CellArrayMap entry per cell
+    newSegmentSizeDelta = newSegmentSizeDelta + (numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY);
+    incSize(0, newSegmentSizeDelta);
+    memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
+  }
+
+  @Override
+  protected long indexEntrySize() {
+    return ClassSize.CELL_ARRAY_MAP_ENTRY;
+  }
+
+  @Override protected boolean isFlat() {
+    return true;
+  }
+
+  ///////////////////// PRIVATE METHODS /////////////////////
+  /*------------------------------------------------------------------------*/
+  // Create CellSet based on CellArrayMap from compacting iterator
+  protected CellSet createCellFlatMapSet(int numOfCells, MemStoreSegmentsIterator iterator,
+      MemStoreCompactor.Action action) {
+
+    Cell[] cells = new Cell[numOfCells];   // build the Cell Array
+    int i = 0;
+    while (iterator.hasNext()) {
+      Cell c = iterator.next();
+      // The scanner behind the iterator is doing all the elimination logic
+      if (action == MemStoreCompactor.Action.MERGE) {
+        // if this is a merge, we just move the Cell object without copying MSLAB;
+        // the sizes still need to be updated in the new segment
+        cells[i] = c;
+      } else {
+        // now we just copy it to the new segment (also an MSLAB copy)
+        cells[i] = maybeCloneWithAllocator(c);
+      }
+      boolean useMSLAB = (getMemStoreLAB()!=null);
+      // second parameter true, because in compaction/merge the addition of the cell to the new
+      // segment is always successful
+      updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
+      i++;
+    }
+    // build the immutable CellSet
+    CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
+    return new CellSet(cam);
+  }
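
The size-delta bookkeeping in the flattening C-tor above can be checked by hand: every cell trades one skip-list entry for one slot in the Cell array, while the cell data itself stays where it is. A rough standalone sketch with illustrative entry sizes (the real numbers are the ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY and ClassSize.CELL_ARRAY_MAP_ENTRY constants computed at runtime):

    public class FlatteningDelta {
      // illustrative sizes, standing in for the ClassSize constants
      static final long CSLM_ENTRY = 40;      // skip-list node + index overhead, aligned
      static final long ARRAY_MAP_ENTRY = 8;  // one aligned reference in the Cell array

      public static void main(String[] args) {
        int numOfCells = 100_000;
        long delta = -(numOfCells * CSLM_ENTRY)      // drop the skip-list entries
                   + numOfCells * ARRAY_MAP_ENTRY;   // add one array slot per cell
        // negative: flattening releases heap, which incSize()/incMemstoreSize() record
        System.out.println("heap delta after flattening: " + delta + " bytes");
      }
    }
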
+
+  /*------------------------------------------------------------------------*/
+  // Create CellSet based on CellArrayMap from the current ConcurrentSkipListMap based CellSet
+  // (without compacting iterator)
+  // We do not consider cells bigger than chunks!
+  static protected CellSet recreateCellFlatMapSet(int numOfCells, KeyValueScanner segmentScanner) {
+    Cell[] cells = new Cell[numOfCells];   // build the Cell Array
+    Cell curCell;
+    int idx = 0;
+
+    try {
+      while ((curCell = segmentScanner.next()) != null) {
+        cells[idx++] = curCell;
+      }
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    } finally {
+      segmentScanner.close();
+    }
+
+    // build the immutable CellSet
+    CellArrayMap cam = new CellArrayMap(CellComparator.COMPARATOR, cells, 0, idx, false);
+    return new CellSet(cam);
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
new file mode 100644
index 0000000..1638598
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
@@ -0,0 +1,220 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * CellChunkImmutableSegment extends the API supported by a {@link Segment} and
+ * {@link ImmutableSegment}. This immutable segment works with a CellSet backed by a
+ * CellChunkMap delegatee.
+ */
+@InterfaceAudience.Private
+public class CellChunkImmutableSegment extends ImmutableSegment {
+
+  public static final long DEEP_OVERHEAD_CCM =
+      ImmutableSegment.DEEP_OVERHEAD + ClassSize.CELL_CHUNK_MAP;
+
+  ///////////////////// CONSTRUCTORS /////////////////////
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellChunkImmutableSegment is built as a result of
+   * compaction/merge of a list of older ImmutableSegments.
+   * The given iterator returns the Cells that "survived" the compaction.
+   */
+  protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
+      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+    super(null, comparator, memStoreLAB); // initialize the CellSet with NULL
+    incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata
+    // build the new CellSet based on CellChunkMap
+    CellSet cs = createCellChunkMapSet(numOfCells, iterator, action);
+    this.setCellSet(null, cs); // update the CellSet of the new Segment
+  }
+
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellChunkImmutableSegment is built as a result of flattening
+   * of a CSLMImmutableSegment.
+   */
+  protected CellChunkImmutableSegment(CSLMImmutableSegment segment, MemstoreSize memstoreSize) {
+    super(segment); // initialize the upper class
+    incSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM
+        + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
+    int numOfCells = segment.getCellsCount();
+    // build the new CellSet based on CellChunkMap
+    CellSet cs = recreateCellFlatMapSet(numOfCells, segment.getScanner(Long.MAX_VALUE));
+    this.setCellSet(segment.getCellSet(), cs); // update the CellSet of the new Segment
+
+    // arrange the meta-data size: decrease all meta-data sizes related to the SkipList and,
+    // since the flattening is to CellChunkMap, decrease also the Cell object sizes
+    // (recreateCellFlatMapSet doesn't take care of the sizes)
+    long newSegmentSizeDelta =
+        -(numOfCells * (ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD));
+    // add the size of a CellChunkMap entry per cell
+    newSegmentSizeDelta =
+        newSegmentSizeDelta + (numOfCells * ClassSize.CELL_CHUNK_MAP_ENTRY);
+    incSize(0, newSegmentSizeDelta);
+    memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
+  }
+
+  @Override
+  protected long heapSizeChange(Cell cell, boolean succ) {
+    if (succ) {
+      return ClassSize.align( // no Cell object is created, so subtract KeyValue.FIXED_OVERHEAD
+          ClassSize.CELL_CHUNK_MAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell)
+              - KeyValue.FIXED_OVERHEAD);
+    }
+    return 0;
+  }
+
+  @Override
+  protected long indexEntrySize() {
+    return (ClassSize.CELL_CHUNK_MAP_ENTRY - KeyValue.FIXED_OVERHEAD);
+  }
+
+  @Override protected boolean isFlat() {
+    return true;
+  }
+
+  ///////////////////// PRIVATE METHODS /////////////////////
+  /*------------------------------------------------------------------------*/
+  // Create CellSet based on CellChunkMap from compacting iterator
+  private CellSet createCellChunkMapSet(int numOfCells, MemStoreSegmentsIterator iterator,
+      MemStoreCompactor.Action action) {
+
+    // calculate how many chunks we will need for the index
+    int chunkSize = ChunkCreator.getInstance().getChunkSize();
+    int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
+    int numberOfChunks = numOfCells/numOfCellsInChunk + 1;
+    int numOfCellsAfterCompaction = 0;
+    int currentChunkIdx = 0;
+    int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+
+    // all index Chunks are allocated from ChunkCreator
+    Chunk[] chunks = new Chunk[numberOfChunks];
+    for (int i=0; i < numberOfChunks; i++) {
+      chunks[i] = this.getMemStoreLAB().getNewChunk();
+    }
+
+    while (iterator.hasNext()) {
+      Cell c = iterator.next();
+      numOfCellsAfterCompaction++;
+      if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkSize) {
+        currentChunkIdx++;          // continue to the next index chunk
+        offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+      }
+
+      if (action == MemStoreCompactor.Action.COMPACT) {
+        c = maybeCloneWithAllocator(c); // for compaction copy the cell to the new segment (MSLAB copy)
+      }
+
+      offsetInCurentChunk = // add the Cell reference to the index chunk
+          createCellReference((ByteBufferKeyValue)c, chunks[currentChunkIdx].getData(),
+              offsetInCurentChunk);
+
+      boolean useMSLAB = (getMemStoreLAB()!=null);
+
+      // the sizes still need to be updated in the new segment
+      // second parameter true, because in compaction/merge the addition of the cell to the new
+      // segment is always successful
+      updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
+    }
+    // build the immutable CellSet
+    CellChunkMap ccm =
+        new CellChunkMap(CellComparator.COMPARATOR, chunks, 0, numOfCellsAfterCompaction, false);
+    return new CellSet(ccm);
+  }
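
Each index entry written by createCellReference() below is a fixed 20-byte record: the data-chunk id, the cell's offset and length as ints, then the seqId as a long. The same layout can be reproduced against a plain ByteBuffer; all values here are hypothetical and no HBase classes are involved:

    import java.nio.ByteBuffer;

    public class CellRefLayout {
      static final int SIZEOF_CHUNK_HEADER = Integer.BYTES; // the chunk's own id comes first

      public static void main(String[] args) {
        ByteBuffer idx = ByteBuffer.allocate(64);
        int off = SIZEOF_CHUNK_HEADER;                  // skip the index chunk's own id
        idx.putInt(off, 7);    off += Integer.BYTES;    // data chunk id (hypothetical)
        idx.putInt(off, 128);  off += Integer.BYTES;    // cell offset inside the data chunk
        idx.putInt(off, 53);   off += Integer.BYTES;    // serialized cell length
        idx.putLong(off, 42L); off += Long.BYTES;       // sequence id
        // 4 + 4 + 4 + 8 = 20 bytes consumed past the header, i.e. CELL_CHUNK_MAP_ENTRY
        System.out.println("next free offset: " + off);
      }
    }
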
+
+  /*------------------------------------------------------------------------*/
+  // Create CellSet based on CellChunkMap from the current ConcurrentSkipListMap based CellSet
+  // (without compacting iterator)
+  // This is a service for not-flat immutable segments
+  // Assumption: cells do not exceed chunk size!
+  private CellSet recreateCellFlatMapSet(int numOfCells, KeyValueScanner segmentScanner) {
+    Cell curCell;
+    // calculate how many chunks we will need for metadata
+    int chunkSize = ChunkCreator.getInstance().getChunkSize();
+    int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
+    int numberOfChunks = numOfCells/numOfCellsInChunk + 1;
+    if (numOfCells%numOfCellsInChunk == 0) {  // if cells can be divided evenly between chunks,
+      numberOfChunks--;                       // the addition of one in the calculation above is
+    }                                         // not needed
+
+    // all index Chunks are allocated from ChunkCreator
+    Chunk[] chunks = new Chunk[numberOfChunks];
+    for (int i=0; i < numberOfChunks; i++) {
+      chunks[i] = this.getMemStoreLAB().getNewChunk();
+    }
+
+    int currentChunkIdx = 0;
+    int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+
+    try {
+      while ((curCell = segmentScanner.next()) != null) {
+        if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkSize) {
+          // continue to the next metadata chunk
+          currentChunkIdx++;
+          offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+        }
+        offsetInCurentChunk =
+            createCellReference((ByteBufferKeyValue) curCell, chunks[currentChunkIdx].getData(),
+                offsetInCurentChunk);
+      }
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    } finally {
+      segmentScanner.close();
+    }
+
+    CellChunkMap ccm = new CellChunkMap(CellComparator.COMPARATOR, chunks, 0, numOfCells, false);
+    return new CellSet(ccm);
+  }
+
+  /*------------------------------------------------------------------------*/
+  // for a given cell, write the cell representation on the index chunk
+  private int createCellReference(ByteBufferKeyValue cell, ByteBuffer idxBuffer, int idxOffset) {
+    int offset = idxOffset;
+    int dataChunkID = cell.getChunkId();
+    // ensure a strong pointer to the data chunk, as the index no longer points to it directly
+    Chunk c = ChunkCreator.getInstance().saveChunkFromGC(dataChunkID);
+    // if c is null it means this cell's chunk was already released, which shouldn't happen
+    assert (c!=null);
+    offset = ByteBufferUtils.putInt(idxBuffer, offset, dataChunkID);               // data chunk id
+    offset = ByteBufferUtils.putInt(idxBuffer, offset, cell.getOffset());          // offset
+    offset = ByteBufferUtils.putInt(idxBuffer, offset, KeyValueUtil.length(cell)); // length
+    offset = ByteBufferUtils.putLong(idxBuffer, offset, cell.getSequenceId());     // seqId
+
+    return offset;
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
index a965ade..a5c73fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.ClassSize;

 import java.util.Comparator;

@@ -56,11 +57,12 @@
 public class CellChunkMap extends CellFlatMap {

   private final Chunk[] chunks; // the array of chunks, on which the index is based

-  private final int numOfCellsInsideChunk; // constant number of cell-representations in a chunk

-  // each cell-representation requires three integers for chunkID (reference to the ByteBuffer),
-  // offset and length, and one long for seqID
-  public static final int SIZEOF_CELL_REP = 3*Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG ;
+  // constant number of cell-representations in a chunk;
+  // each chunk starts with its own ID, followed by the cells data
+  public static final int NUM_OF_CELL_REPS_IN_CHUNK =
+      (ChunkCreator.getInstance().getChunkSize() - ChunkCreator.SIZEOF_CHUNK_HEADER) /
+          ClassSize.CELL_CHUNK_MAP_ENTRY;

   /**
    * C-tor for creating CellChunkMap from existing Chunk array, which must be ordered
@@ -75,9 +77,6 @@
       Chunk[] chunks, int min, int max, boolean descending) {
     super(comparator, min, max, descending);
     this.chunks = chunks;
-    this.numOfCellsInsideChunk = // each chunk starts with its own ID following the cells data
-        (ChunkCreator.getInstance().getChunkSize() - Bytes.SIZEOF_INT) / SIZEOF_CELL_REP;
-
   }

   /* To be used by base (CellFlatMap) class only to create a sub-CellFlatMap
@@ -91,13 +90,12 @@
   @Override
   protected Cell getCell(int i) {
     // get the index of the relevant chunk inside chunk array
-    int chunkIndex = (i / numOfCellsInsideChunk);
+    int chunkIndex = (i / NUM_OF_CELL_REPS_IN_CHUNK);
     ByteBuffer block = chunks[chunkIndex].getData();// get the ByteBuffer of the relevant chunk
-    int j = i - chunkIndex * numOfCellsInsideChunk; // get the index of the cell-representation
+    int j = i - chunkIndex * NUM_OF_CELL_REPS_IN_CHUNK; // get the index of the cell-representation

     // find the offset inside the chunk holding the index, skip bytes for chunk id
-    int offsetInBytes = Bytes.SIZEOF_INT + j* SIZEOF_CELL_REP;
-
+    int offsetInBytes = ChunkCreator.SIZEOF_CHUNK_HEADER + j* ClassSize.CELL_CHUNK_MAP_ENTRY;

     // find the chunk holding the data of the cell, the chunkID is stored first
     int chunkId = ByteBufferUtils.toInt(block, offsetInBytes);
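
The getCell() arithmetic above divides a global cell index into a chunk index plus an in-chunk record number, then turns the record number into a byte offset past the chunk-id header. With the default 2 MB chunks (an assumption; the chunk size is configurable) the numbers look like this:

    public class CellChunkMapIndexMath {
      public static void main(String[] args) {
        int chunkSize = 2 * 1024 * 1024;  // assumed default 2MB chunk
        int header = 4;                   // SIZEOF_CHUNK_HEADER: one int for the chunk's own id
        int entry = 20;                   // CELL_CHUNK_MAP_ENTRY: 3 ints + 1 long
        int cellRepsPerChunk = (chunkSize - header) / entry;  // 104857

        int i = 250_000;                             // global index of the wanted cell
        int chunkIndex = i / cellRepsPerChunk;       // which index chunk holds the record
        int j = i - chunkIndex * cellRepsPerChunk;   // record number inside that chunk
        int offsetInBytes = header + j * entry;      // byte offset of the record
        System.out.println(chunkIndex + " / " + j + " @ " + offsetInBytes);
      }
    }
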
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
index d550148..baf9a7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;

-import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +35,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;

 import com.google.common.annotations.VisibleForTesting;
@@ -51,9 +52,28 @@
 public class ChunkCreator {
   private AtomicInteger chunkID = new AtomicInteger(1);
   // maps the chunk against the monotonically increasing chunk id. We need to preserve the
   // natural ordering of the key
-  // CellChunkMap creation should convert the soft ref to hard reference
-  private Map<Integer, SoftReference<Chunk>> chunkIdMap =
-      new ConcurrentHashMap<Integer, SoftReference<Chunk>>();
+  // CellChunkMap creation should convert the weak ref to a hard reference
+
+  // the chunk id is the first integer written on each chunk;
+  // the header size needs to be changed in case the chunk id size is changed
+  public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;
+
+  // An object pointed to by a weak reference can be garbage collected, as opposed to an object
+  // referenced by a strong (regular) reference. Every chunk created via ChunkCreator is referenced
+  // from either weakChunkIdMap or strongChunkIdMap.
+  // Upon chunk C's creation, C's ID is mapped to a weak reference to C, in order not to disturb
+  // C's GC in case all other references to C are removed.
+  // While chunk C is referenced only from a CellChunkMap (via C's ID) it is possible to GC the
+  // chunk C. To avoid that, upon inserting C into a CellChunkMap, C's ID is mapped to a strong
+  // (regular) reference to C.
+
+  // map that doesn't influence GC
+  private Map<Integer, WeakReference<Chunk>> weakChunkIdMap =
+      new ConcurrentHashMap<Integer, WeakReference<Chunk>>();
+
+  // map that keeps chunks from garbage collection
+  private Map<Integer, Chunk> strongChunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
+
   private final int chunkSize;
   private final boolean offheap;
   @VisibleForTesting
@@ -119,8 +139,8 @@
     if (chunk == null) {
       chunk = createChunk();
     }
-    // put this chunk into the chunkIdMap
-    this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk));
+    // put this chunk initially into the weakChunkIdMap
+    this.weakChunkIdMap.put(chunk.getId(), new WeakReference<>(chunk));
     // now we need to actually do the expensive memory allocation step in case of a new chunk,
     // else only the offset is set to the beginning of the chunk to accept allocations
     chunk.init();
@@ -148,12 +168,36 @@
   }

   @VisibleForTesting
-  // TODO : To be used by CellChunkMap
+  // Used to translate the ChunkID into a chunk ref
   Chunk getChunk(int id) {
-    SoftReference<Chunk> ref = chunkIdMap.get(id);
+    WeakReference<Chunk> ref = weakChunkIdMap.get(id);
     if (ref != null) {
       return ref.get();
     }
+    // check also the strong mapping
+    return strongChunkIdMap.get(id);
+  }
+
+  // transfer the weak pointer to be a strong chunk pointer
+  Chunk saveChunkFromGC(int chunkID) {
+    Chunk c = strongChunkIdMap.get(chunkID); // check whether the chunk is already protected
+    if (c != null)                           // with a strong pointer
+      return c;
+    WeakReference<Chunk> ref = weakChunkIdMap.get(chunkID);
+    if (ref != null) {
+      c = ref.get();
+    }
+    if (c != null) {
+      // put this strong reference to the chunk into the strongChunkIdMap;
+      // the read of the weakMap always happens before the read of the strongMap,
+      // so no synchronization issues here
+      this.strongChunkIdMap.put(chunkID, c);
+      this.weakChunkIdMap.remove(chunkID);
+      return c;
+    }
+    // we should actually never return null, as no one should ask to save from GC a chunk
+    // which is already released. However, we are not asserting it here; we let the caller
+    // deal with the return value and assert if needed
     return null;
   }
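
The weak-to-strong promotion in saveChunkFromGC() relies only on standard java.lang.ref semantics. A self-contained demonstration of why the promotion matters, using plain JDK types in place of Chunk (a byte[] stands in for a chunk):

    import java.lang.ref.WeakReference;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class WeakVsStrong {
      public static void main(String[] args) {
        Map<Integer, WeakReference<byte[]>> weak = new ConcurrentHashMap<>();
        Map<Integer, byte[]> strong = new ConcurrentHashMap<>();

        weak.put(1, new WeakReference<>(new byte[1024])); // only weakly reachable
        byte[] chunk = weak.get(1).get();
        strong.put(1, chunk);  // "saveChunkFromGC": now strongly reachable via the map
        weak.remove(1);
        chunk = null;

        System.gc();           // only a hint; the GC may clear weak refs at any point
        // the promoted chunk survives because the strong map still references it
        System.out.println(strong.get(1) != null);  // true
      }
    }
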
@@ -166,25 +210,30 @@
   }

   private void removeChunks(Set<Integer> chunkIDs) {
-    this.chunkIdMap.keySet().removeAll(chunkIDs);
+    this.weakChunkIdMap.keySet().removeAll(chunkIDs);
+    this.strongChunkIdMap.keySet().removeAll(chunkIDs);
   }

   Chunk removeChunk(int chunkId) {
-    SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId);
-    if (ref != null) {
-      return ref.get();
+    WeakReference<Chunk> weak = this.weakChunkIdMap.remove(chunkId);
+    Chunk strong = this.strongChunkIdMap.remove(chunkId);
+    if (weak != null) {
+      return weak.get();
     }
-    return null;
+    return strong;
   }

   @VisibleForTesting
+  // the chunks in the weakChunkIdMap may already be released, so we shouldn't rely
+  // on this count for strong correctness. This method is used only in testing.
   int size() {
-    return this.chunkIdMap.size();
+    return this.weakChunkIdMap.size() + this.strongChunkIdMap.size();
   }

   @VisibleForTesting
   void clearChunkIds() {
-    this.chunkIdMap.clear();
+    this.strongChunkIdMap.clear();
+    this.weakChunkIdMap.clear();
   }

   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 5b9372a..88659e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -57,6 +57,11 @@
       "hbase.hregion.compacting.memstore.type";
   public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
       String.valueOf(MemoryCompactionPolicy.BASIC);
+  // The external setting of the compacting MemStore behaviour
+  public static final String COMPACTING_MEMSTORE_CHUNK_MAP_KEY =
+      "hbase.hregion.compacting.memstore.ccm";
+  // usage of CellChunkMap is set to false by default; later it will be decided how to use it
+  public static final boolean COMPACTING_MEMSTORE_CHUNK_MAP_DEFAULT = false;
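
Together with the existing memstore-type key, the new constant makes the CellChunkMap index a pure configuration toggle. A hedged usage sketch (the key names are taken from this patch; in-memory compaction must be enabled for any flattening to take place):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EnableCellChunkMap {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // in-memory compaction must be on for segments to be flattened at all
        conf.set("hbase.hregion.compacting.memstore.type", "BASIC");
        // the new switch introduced by this patch; false by default
        conf.setBoolean("hbase.hregion.compacting.memstore.ccm", true);
        System.out.println(conf.getBoolean("hbase.hregion.compacting.memstore.ccm", false));
      }
    }
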
   // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
   public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
       "hbase.memstore.inmemoryflush.threshold.factor";
@@ -78,10 +83,22 @@
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
   private boolean compositeSnapshot = true;

+  /**
+   * Types of indexes (part of immutable segments) to be used after flattening,
+   * compaction, or merge are applied.
+   */
+  public enum IndexType {
+    CSLM_MAP,   // ConcurrentSkipListMap
+    ARRAY_MAP,  // CellArrayMap
+    CHUNK_MAP   // CellChunkMap
+  }
+
+  private IndexType indexType = IndexType.ARRAY_MAP;  // default implementation
+
   public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
-      + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
-      // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+      + 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
+      // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
+      // indexType
       + Bytes.SIZEOF_LONG // inmemoryFlushSize
       + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay
       + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
@@ -96,6 +113,10 @@
     this.pipeline = new CompactionPipeline(getRegionServices());
     this.compactor = createMemStoreCompactor(compactionPolicy);
     initInmemoryFlushSize(conf);
+    if (getConfiguration().getBoolean(COMPACTING_MEMSTORE_CHUNK_MAP_KEY,
+        COMPACTING_MEMSTORE_CHUNK_MAP_DEFAULT)) {
+      indexType = IndexType.CHUNK_MAP;
+    }
   }

   @VisibleForTesting
@@ -294,7 +315,22 @@
    * The flattening happens only if versions match.
    */
   public void flattenOneSegment(long requesterVersion) {
-    pipeline.flattenYoungestSegment(requesterVersion);
+    pipeline.flattenOneSegment(requesterVersion, indexType);
   }
+
+  // setter is used only for testability
+  @VisibleForTesting
+  public void setIndexType() {
+    if (getConfiguration().getBoolean(COMPACTING_MEMSTORE_CHUNK_MAP_KEY,
+        COMPACTING_MEMSTORE_CHUNK_MAP_DEFAULT)) {
+      indexType = IndexType.CHUNK_MAP;
+    } else {
+      indexType = IndexType.ARRAY_MAP;
+    }
+  }
+
+  public IndexType getIndexType() {
+    return indexType;
+  }

   public boolean hasImmutableSegments() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 06e83a3..83beaaa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -48,7 +48,7 @@
  * method accesses the read-only copy more than once it makes a local copy of it
  * to ensure it accesses the same copy.
  *
- * The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also
+ * The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
  * protected by a lock since they need to have a consistent (atomic) view of the pipeline list
  * and version number.
  */
@@ -183,7 +183,7 @@
    *
    * @return true iff a segment was successfully flattened
    */
-  public boolean flattenYoungestSegment(long requesterVersion) {
+  public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType) {

     if(requesterVersion != version) {
       LOG.warn("Segment flattening failed, because versions do not match.
Requester version: " @@ -196,17 +196,22 @@ public class CompactionPipeline { LOG.warn("Segment flattening failed, because versions do not match"); return false; } - + int i = 0; for (ImmutableSegment s : pipeline) { - // remember the old size in case this segment is going to be flatten - MemstoreSize memstoreSize = new MemstoreSize(); - if (s.flatten(memstoreSize)) { + if ( !s.isFlat() ) { + MemstoreSize newMemstoreSize = new MemstoreSize(); // the size to be updated + ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( + (CSLMImmutableSegment)s,idxType,newMemstoreSize); + replaceAtIndex(i,newS); if(region != null) { - region.addMemstoreSize(memstoreSize); + // update the global memstore size counter + // upon flattening there is no change in the data size + region.addMemstoreSize(new MemstoreSize(0, newMemstoreSize.getHeapSize())); } LOG.debug("Compaction pipeline segment " + s + " was flattened"); return true; } + i++; } } @@ -271,12 +276,19 @@ public class CompactionPipeline { if(segment != null) pipeline.addLast(segment); } + // replacing one segment in the pipeline with a new one exactly at the same index + // need to be called only within synchronized block + private void replaceAtIndex(int idx, ImmutableSegment newSegment) { + ImmutableSegment oldSegment = pipeline.set(idx, newSegment); + readOnlyCopy = new LinkedList<>(pipeline); + } + public Segment getTail() { List localCopy = getSegments(); if(localCopy.isEmpty()) { return null; } - return localCopy.get(localCopy.size()-1); + return localCopy.get(localCopy.size() - 1); } private boolean addFirst(ImmutableSegment segment) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java index 2f89ec7..c856ecd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -184,6 +184,16 @@ public class CompositeImmutableSegment extends ImmutableSegment { throw new IllegalStateException("Not supported by CompositeImmutableScanner"); } + + @Override + protected long indexEntrySize() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + @Override protected boolean isFlat() { + return false; + } + /** * @return Sum of all cell sizes. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java index 430b642..98abbcd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java @@ -46,6 +46,15 @@ public class ImmutableMemStoreLAB implements MemStoreLAB { } @Override + // returning a new chunk, without replacing current chunk, + // the space on this chunk will be allocated externally + // use the first MemStoreLABImpl in the list + public Chunk getNewChunk() { + MemStoreLAB mslab = this.mslabs.get(0); + return mslab.getNewChunk(); + } + + @Override public void close() { // 'openScannerCount' here tracks the scanners opened on segments which directly refer to this // MSLAB. The individual MSLABs this refers also having its own 'openScannerCount'. 
The usage of diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 19b66b4..d674fad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -19,15 +19,12 @@ package org.apache.hadoop.hbase.regionserver; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -37,21 +34,11 @@ import java.util.List; * and is not needed for a {@link MutableSegment}. */ @InterfaceAudience.Private -public class ImmutableSegment extends Segment { +public abstract class ImmutableSegment extends Segment { - private static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD - + (2 * ClassSize.REFERENCE) // Refs to timeRange and type - + ClassSize.TIMERANGE; - public static final long DEEP_OVERHEAD_CSLM = DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP; - public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP; - - /** - * Types of ImmutableSegment - */ - public enum Type { - SKIPLIST_MAP_BASED, - ARRAY_MAP_BASED, - } + public static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD + + ClassSize.align(ClassSize.REFERENCE // Referent to timeRange + + ClassSize.TIMERANGE); /** * This is an immutable segment so use the read-only TimeRange rather than the heavy-weight @@ -59,12 +46,13 @@ public class ImmutableSegment extends Segment { */ private final TimeRange timeRange; - private Type type = Type.SKIPLIST_MAP_BASED; - // whether it is based on CellFlatMap or ConcurrentSkipListMap - private boolean isFlat(){ - return (type != Type.SKIPLIST_MAP_BASED); - } + // the way how heap size is updated depends on specific segment + @Override + protected abstract long indexEntrySize(); + + // each sub-type of immutable segment knows whether it is flat or not + protected abstract boolean isFlat(); ///////////////////// CONSTRUCTORS ///////////////////// /**------------------------------------------------------------------------ @@ -76,59 +64,25 @@ public class ImmutableSegment extends Segment { } /**------------------------------------------------------------------------ - * Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one. - * This C-tor should be used when active MutableSegment is pushed into the compaction - * pipeline and becomes an ImmutableSegment. + * C-tor to be used to build the derived classes */ - protected ImmutableSegment(Segment segment) { - super(segment); - this.type = Type.SKIPLIST_MAP_BASED; + protected ImmutableSegment(CellSet cs, CellComparator comparator, MemStoreLAB memStoreLAB) { + super(cs, comparator, memStoreLAB); this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange(); } /**------------------------------------------------------------------------ - * C-tor to be used when new CELL_ARRAY BASED ImmutableSegment is a result of compaction of a - * list of older ImmutableSegments. - * The given iterator returns the Cells that "survived" the compaction. 
- * The input parameter "type" exists for future use when more types of flat ImmutableSegments - * are going to be introduced. + * Copy C-tor to be used when new CSLMImmutableSegment (derived) is being built from a Mutable one. + * This C-tor should be used when active MutableSegment is pushed into the compaction + * pipeline and becomes an ImmutableSegment. */ - protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, - MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) { - - super(null, // initiailize the CellSet with NULL - comparator, memStoreLAB); - this.type = type; - // build the new CellSet based on CellArrayMap - CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge); - - this.setCellSet(null, cs); // update the CellSet of the new Segment + protected ImmutableSegment(Segment segment) { + super(segment); this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange(); } - /**------------------------------------------------------------------------ - * C-tor to be used when new SKIP-LIST BASED ImmutableSegment is a result of compaction of a - * list of older ImmutableSegments. - * The given iterator returns the Cells that "survived" the compaction. - */ - protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, - MemStoreLAB memStoreLAB) { - super(new CellSet(comparator), // initiailize the CellSet with empty CellSet - comparator, memStoreLAB); - type = Type.SKIPLIST_MAP_BASED; - while (iterator.hasNext()) { - Cell c = iterator.next(); - // The scanner is doing all the elimination logic - // now we just copy it to the new segment - Cell newKV = maybeCloneWithAllocator(c); - boolean usedMSLAB = (newKV != c); - internalAdd(newKV, usedMSLAB, null); - } - this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange(); - } ///////////////////// PUBLIC METHODS ///////////////////// - @Override public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { return this.timeRange.includesTimeRange(scan.getTimeRange()) && @@ -148,107 +102,4 @@ public class ImmutableSegment extends Segment { List res = new ArrayList<>(Arrays.asList(this)); return res; } - - /**------------------------------------------------------------------------ - * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one - * based on CellArrayMap. - * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOOP - * - * Synchronization of the CellSet replacement: - * The reference to the CellSet is AtomicReference and is updated only when ImmutableSegment - * is constructed (single thread) or flattened. The flattening happens as part of a single - * thread of compaction, but to be on the safe side the initial CellSet is locally saved - * before the flattening and then replaced using CAS instruction. 
- */ - public boolean flatten(MemstoreSize memstoreSize) { - if (isFlat()) return false; - CellSet oldCellSet = getCellSet(); - int numOfCells = getCellsCount(); - - // build the new (CellSet CellArrayMap based) - CellSet newCellSet = recreateCellArrayMapSet(numOfCells); - type = Type.ARRAY_MAP_BASED; - setCellSet(oldCellSet,newCellSet); - - // arrange the meta-data size, decrease all meta-data sizes related to SkipList - // (recreateCellArrayMapSet doesn't take the care for the sizes) - long newSegmentSizeDelta = -(numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); - // add size of CellArrayMap and meta-data overhead per Cell - newSegmentSizeDelta = newSegmentSizeDelta + numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY; - incSize(0, newSegmentSizeDelta); - if (memstoreSize != null) { - memstoreSize.incMemstoreSize(0, newSegmentSizeDelta); - } - - return true; - } - - ///////////////////// PRIVATE METHODS ///////////////////// - /*------------------------------------------------------------------------*/ - // Create CellSet based on CellArrayMap from compacting iterator - private CellSet createCellArrayMapSet(int numOfCells, MemStoreSegmentsIterator iterator, - boolean merge) { - - Cell[] cells = new Cell[numOfCells]; // build the Cell Array - int i = 0; - while (iterator.hasNext()) { - Cell c = iterator.next(); - // The scanner behind the iterator is doing all the elimination logic - if (merge) { - // if this is merge we just move the Cell object without copying MSLAB - // the sizes still need to be updated in the new segment - cells[i] = c; - } else { - // now we just copy it to the new segment (also MSLAB copy) - cells[i] = maybeCloneWithAllocator(c); - } - boolean useMSLAB = (getMemStoreLAB()!=null); - // second parameter true, because in compaction/merge the addition of the cell to new segment - // is always successful - updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell - i++; - } - // build the immutable CellSet - CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false); - return new CellSet(cam); - } - - @Override - protected long heapSizeChange(Cell cell, boolean succ) { - if (succ) { - switch (this.type) { - case SKIPLIST_MAP_BASED: - return super.heapSizeChange(cell, succ); - case ARRAY_MAP_BASED: - return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell)); - } - } - return 0; - } - - /*------------------------------------------------------------------------*/ - // Create CellSet based on CellArrayMap from current ConcurrentSkipListMap based CellSet - // (without compacting iterator) - private CellSet recreateCellArrayMapSet(int numOfCells) { - - Cell[] cells = new Cell[numOfCells]; // build the Cell Array - Cell curCell; - int idx = 0; - // create this segment scanner with maximal possible read point, to go over all Cells - KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE); - - try { - while ((curCell = segmentScanner.next()) != null) { - cells[idx++] = curCell; - } - } catch (IOException ie) { - throw new IllegalStateException(ie); - } finally { - segmentScanner.close(); - } - - // build the immutable CellSet - CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false); - return new CellSet(cam); - } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 08af7fe..315b33c 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -80,7 +80,7 @@ public class MemStoreCompactor { * Note that every value covers the previous ones, i.e. if MERGE is the action it implies * that the youngest segment is going to be flatten anyway. */ - private enum Action { + public enum Action { NOOP, FLATTEN, // flatten the youngest segment in the pipeline MERGE, // merge all the segments in the pipeline into one @@ -252,7 +252,7 @@ public class MemStoreCompactor { result = SegmentFactory.instance().createImmutableSegmentByCompaction( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, - versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED); + versionedList.getNumOfCells(), compactingMemStore.getIndexType()); iterator.close(); break; case MERGE: @@ -263,8 +263,8 @@ public class MemStoreCompactor { result = SegmentFactory.instance().createImmutableSegmentByMerge( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, - versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED, - versionedList.getStoreSegments()); + versionedList.getNumOfCells(), versionedList.getStoreSegments(), + compactingMemStore.getIndexType()); iterator.close(); break; default: throw new RuntimeException("Unknown action " + action); // sanity check diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index 72e937c..0c164c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -83,6 +83,11 @@ public interface MemStoreLAB { */ void decScannerCount(); + /** + * Return a new empty chunk without considering this chunk as current + */ + Chunk getNewChunk(); + public static MemStoreLAB newInstance(Configuration conf) { MemStoreLAB memStoreLAB = null; if (isEnabled(conf)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 4fba82d..40794e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -253,6 +253,15 @@ public class MemStoreLABImpl implements MemStoreLAB { return null; } + // returning a new chunk, without replacing current chunk, + // the space on this chunk will be allocated externally + @Override + public Chunk getNewChunk() { + Chunk c = this.chunkCreator.getChunk(); + chunks.add(c.getId()); + return c; + } + @VisibleForTesting Chunk getCurrentChunk() { return this.curChunk.get(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java index 7361750..a0616e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java @@ -42,6 +42,7 @@ public class MutableSegment extends Segment { protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { super(cellSet, comparator, memStoreLAB); + 
incSize(0,DEEP_OVERHEAD); // update the mutable segment metadata } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 8f43fa8..3868931 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -312,11 +312,15 @@ public abstract class Segment { protected long heapSizeChange(Cell cell, boolean succ) { if (succ) { return ClassSize - .align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell)); + .align(indexEntrySize() + CellUtil.estimatedHeapSizeOf(cell)); } return 0; } + protected long indexEntrySize() { + return ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY; + } + /** * Returns a subset of the segment cell set, which starts with the given cell * @param firstCell a cell in the segment diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 1a8b89d..d719a55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -41,42 +40,36 @@ public final class SegmentFactory { return instance; } - // create skip-list-based (non-flat) immutable segment from compacting old immutable segments - public ImmutableSegment createImmutableSegment(final Configuration conf, - final CellComparator comparator, MemStoreSegmentsIterator iterator) { - return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf)); - } - // create composite immutable segment from a list of segments + // for snapshot consisting of multiple segments public CompositeImmutableSegment createCompositeImmutableSegment( final CellComparator comparator, List segments) { return new CompositeImmutableSegment(comparator, segments); - } // create new flat immutable segment from compacting old immutable segments + // for compaction public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf, final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells, - ImmutableSegment.Type segmentType) + CompactingMemStore.IndexType idxType) throws IOException { - Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED, - "wrong immutable segment type"); + MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf); return - // the last parameter "false" means not to merge, but to compact the pipeline - // in order to create the new segment - new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false); + createImmutableSegment( + conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.COMPACT,idxType); } // create empty immutable segment + // for initializations public ImmutableSegment createImmutableSegment(CellComparator comparator) { MutableSegment segment = generateMutableSegment(null, comparator, null); return createImmutableSegment(segment); } - // create immutable segment from mutable segment + // create not-flat immutable segment from mutable 
segment public ImmutableSegment createImmutableSegment(MutableSegment segment) { - return new ImmutableSegment(segment); + return new CSLMImmutableSegment(segment); } // create mutable segment @@ -86,19 +79,58 @@ public final class SegmentFactory { } // create new flat immutable segment from merging old immutable segments + // for merge public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf, final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells, - ImmutableSegment.Type segmentType, List segments) + List segments, CompactingMemStore.IndexType idxType) throws IOException { - Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED, - "wrong immutable segment type"); + MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments); return - // the last parameter "true" means to merge the compaction pipeline - // in order to create the new segment - new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true); + createImmutableSegment( + conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.MERGE,idxType); + + } + + // create flat immutable segment from non-flat immutable segment + // for flattening + public ImmutableSegment createImmutableSegmentByFlattening( + CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType, MemstoreSize memstoreSize) { + ImmutableSegment res = null; + switch (idxType) { + case CHUNK_MAP: + res = new CellChunkImmutableSegment(segment, memstoreSize); + break; + case CSLM_MAP: + assert false; // non-flat segment can not be the result of flattening + break; + case ARRAY_MAP: + res = new CellArrayImmutableSegment(segment, memstoreSize); + break; + } + return res; } + + //****** private methods to instantiate concrete store segments **********// + private ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator, + MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells, + MemStoreCompactor.Action action, CompactingMemStore.IndexType idxType) { + + ImmutableSegment res = null; + switch (idxType) { + case CHUNK_MAP: + res = new CellChunkImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action); + break; + case CSLM_MAP: + assert false; // non-flat segment can not be created here + break; + case ARRAY_MAP: + res = new CellArrayImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action); + break; + } + return res; + } private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator, MemStoreLAB memStoreLAB) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index bf74a9e..1af0d88 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -369,6 +369,7 @@ public class TestHeapSize { if (expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(AtomicLong.class, true); + ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(CellSet.class, true); ClassSize.estimateBase(TimeRangeTracker.class, true); @@ -376,9 +377,28 @@ public class TestHeapSize { assertEquals(expected, actual); } - // ImmutableSegment Deep overhead + // ImmutableSegments Deep overhead cl = ImmutableSegment.class; - actual = ImmutableSegment.DEEP_OVERHEAD_CSLM; + 
actual = ImmutableSegment.DEEP_OVERHEAD; + expected = ClassSize.estimateBase(cl, false); + expected += 2 * ClassSize.estimateBase(AtomicLong.class, false); + expected += ClassSize.estimateBase(AtomicReference.class, false); + expected += ClassSize.estimateBase(CellSet.class, false); + expected += ClassSize.estimateBase(TimeRangeTracker.class, false); + expected += ClassSize.estimateBase(TimeRange.class, false); + if (expected != actual) { + ClassSize.estimateBase(cl, true); + ClassSize.estimateBase(AtomicLong.class, true); + ClassSize.estimateBase(AtomicLong.class, true); + ClassSize.estimateBase(AtomicReference.class, true); + ClassSize.estimateBase(CellSet.class, true); + ClassSize.estimateBase(TimeRangeTracker.class, true); + ClassSize.estimateBase(TimeRange.class, true); + assertEquals(expected, actual); + } + + cl = CSLMImmutableSegment.class; + actual = CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; expected = ClassSize.estimateBase(cl, false); expected += 2 * ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(AtomicReference.class, false); @@ -389,6 +409,7 @@ public class TestHeapSize { if (expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(AtomicLong.class, true); + ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(CellSet.class, true); ClassSize.estimateBase(TimeRangeTracker.class, true); @@ -396,7 +417,8 @@ public class TestHeapSize { ClassSize.estimateBase(ConcurrentSkipListMap.class, true); assertEquals(expected, actual); } - actual = ImmutableSegment.DEEP_OVERHEAD_CAM; + cl = CellArrayImmutableSegment.class; + actual = CellArrayImmutableSegment.DEEP_OVERHEAD_CAM; expected = ClassSize.estimateBase(cl, false); expected += 2 * ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(AtomicReference.class, false); @@ -407,6 +429,7 @@ public class TestHeapSize { if (expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(AtomicLong.class, true); + ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(CellSet.class, true); ClassSize.estimateBase(TimeRangeTracker.class, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java index 5872d69..6307d32 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -286,8 +287,8 @@ public class TestCellFlatSet extends TestCase { ByteBuffer idxBuffer = idxChunk.getData(); // the buffers of the chunks ByteBuffer dataBuffer = dataChunk.getData(); - int dataOffset = Bytes.SIZEOF_INT; // offset inside data buffer - int idxOffset = Bytes.SIZEOF_INT; // skip the space for chunk ID + int dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; // offset inside data buffer + int idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; // skip the space for chunk ID Cell[] 
cellArray = asc ? ascCells : descCells; @@ -296,16 +297,16 @@ public class TestCellFlatSet extends TestCase { if (dataOffset + KeyValueUtil.length(kv) > chunkCreator.getChunkSize()) { dataChunk = chunkCreator.getChunk(); // allocate more data chunks if needed dataBuffer = dataChunk.getData(); - dataOffset = Bytes.SIZEOF_INT; + dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; } int dataStartOfset = dataOffset; dataOffset = KeyValueUtil.appendTo(kv, dataBuffer, dataOffset, false); // write deep cell data // do we have enough space to write the cell-representation on the index chunk? - if (idxOffset + CellChunkMap.SIZEOF_CELL_REP > chunkCreator.getChunkSize()) { + if (idxOffset + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkCreator.getChunkSize()) { idxChunk = chunkCreator.getChunk(); // allocate more index chunks if needed idxBuffer = idxChunk.getData(); - idxOffset = Bytes.SIZEOF_INT; + idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; chunkArray[chunkArrayIdx++] = idxChunk; } idxOffset = ByteBufferUtils.putInt(idxBuffer, idxOffset, dataChunk.getId()); // write data chunk id @@ -314,8 +315,6 @@ public class TestCellFlatSet extends TestCase { idxOffset = ByteBufferUtils.putLong(idxBuffer, idxOffset, kv.getSequenceId()); // seqId } - return asc ? - new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,false) : - new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,true); + return new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,!asc); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 04435db..cccfd73 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; @@ -98,7 +99,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, - globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); assertTrue(chunkCreator != null); } @@ -563,16 +564,71 @@ public class TestCompactingMemStore extends TestDefaultMemStore { assertTrue(chunkCreator.getPoolSize() > 0); } + @Test + public void testFlatteningToCellChunkMap() throws IOException { + + // set memstore to flat into CellChunkMap + MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; + memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, + String.valueOf(compactionType)); + ((CompactingMemStore)memstore).initiateType(compactionType); + memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_CHUNK_MAP_KEY, + String.valueOf(true)); + 
((CompactingMemStore)memstore).setIndexType(); + int numOfCells = 8; + String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8 + + // make one cell + byte[] row = Bytes.toBytes(keys1[0]); + byte[] val = Bytes.toBytes(keys1[0] + 0); + KeyValue kv = + new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), + System.currentTimeMillis(), val); + + // test 1 bucket + int totalCellsLen = addRowsByKeys(memstore, keys1); + long oneCellOnCSLMHeapSize = + ClassSize.align( + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil + .length(kv)); + + long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD; + assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize()); + assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); + + ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten + assertEquals(0, memstore.getSnapshot().getCellsCount()); + // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode. + // totalCellsLen should remain the same + long oneCellOnCCMHeapSize = + ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv)); + totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + + numOfCells * oneCellOnCCMHeapSize; + + assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize()); + assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); + + MemstoreSize size = memstore.getFlushableSize(); + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + region.decrMemstoreSize(size); // simulate flusher + ImmutableSegment s = memstore.getSnapshot(); + assertEquals(numOfCells, s.getCellsCount()); + assertEquals(0, regionServicesForStores.getMemstoreSize()); + + memstore.clearSnapshot(snapshot.getId()); + } + ////////////////////////////////////////////////////////////////////////////// // Compaction tests ////////////////////////////////////////////////////////////////////////////// @Test public void testCompaction1Bucket() throws IOException { - // set memstore to do data compaction and not to use the speculative scan - MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; - memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, - String.valueOf(compactionType)); + // set memstore to do basic structure flattening, the "eager" option is tested in + // TestCompactingToCellFlatMapMemStore + MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; + memstore.getConfiguration() + .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType)); ((CompactingMemStore)memstore).initiateType(compactionType); String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 @@ -581,25 +637,24 @@ public class TestCompactingMemStore extends TestDefaultMemStore { int totalCellsLen = addRowsByKeys(memstore, keys1); int oneCellOnCSLMHeapSize = 120; int oneCellOnCAHeapSize = 88; - long totalHeapSize = 4 * oneCellOnCSLMHeapSize; + long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); - MemstoreSize size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact assertEquals(0, memstore.getSnapshot().getCellsCount()); - // One cell is 
duplicated and the compaction will remove it. All cells of same size so adjusting - // totalCellsLen - totalCellsLen = (totalCellsLen * 3) / 4; - totalHeapSize = 3 * oneCellOnCAHeapSize; + // There is no compaction, as the compacting memstore type is basic. + // totalCellsLen remains the same + totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + + 4 * oneCellOnCAHeapSize; assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); - size = memstore.getFlushableSize(); + MemstoreSize size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot region.decrMemstoreSize(size); // simulate flusher ImmutableSegment s = memstore.getSnapshot(); - assertEquals(3, s.getCellsCount()); + assertEquals(4, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemstoreSize()); memstore.clearSnapshot(snapshot.getId()); @@ -608,8 +663,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testCompaction2Buckets() throws IOException { - // set memstore to do data compaction and not to use the speculative scan - MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; + // set memstore to do basic structure flattening, the "eager" option is tested in + // TestCompactingToCellFlatMapMemStore + MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType)); ((CompactingMemStore)memstore).initiateType(compactionType); @@ -619,44 +675,43 @@ public class TestCompactingMemStore extends TestDefaultMemStore { int totalCellsLen1 = addRowsByKeys(memstore, keys1); int oneCellOnCSLMHeapSize = 120; int oneCellOnCAHeapSize = 88; - long totalHeapSize = 4 * oneCellOnCSLMHeapSize; + long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); - MemstoreSize size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact int counter = 0; for ( Segment s : memstore.getSegments()) { counter += s.getCellsCount(); } - assertEquals(3, counter); + assertEquals(4, counter); assertEquals(0, memstore.getSnapshot().getCellsCount()); - // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting - // totalCellsLen - totalCellsLen1 = (totalCellsLen1 * 3) / 4; + // There is no compaction, as the compacting memstore type is basic. 
+ // totalCellsLen remains the same assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); - totalHeapSize = 3 * oneCellOnCAHeapSize; + totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + + 4 * oneCellOnCAHeapSize; assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); int totalCellsLen2 = addRowsByKeys(memstore, keys2); totalHeapSize += 3 * oneCellOnCSLMHeapSize; assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); + assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); - size = memstore.getFlushableSize(); + MemstoreSize size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact assertEquals(0, memstore.getSnapshot().getCellsCount()); - totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); - totalHeapSize = 4 * oneCellOnCAHeapSize; + totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + + 7 * oneCellOnCAHeapSize; assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot region.decrMemstoreSize(size); // simulate flusher ImmutableSegment s = memstore.getSnapshot(); - assertEquals(4, s.getCellsCount()); + assertEquals(7, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemstoreSize()); memstore.clearSnapshot(snapshot.getId()); @@ -678,10 +733,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { int oneCellOnCSLMHeapSize = 120; int oneCellOnCAHeapSize = 88; assertEquals(totalCellsLen1, region.getMemstoreSize()); - long totalHeapSize = 4 * oneCellOnCSLMHeapSize; + long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); - - MemstoreSize size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact assertEquals(0, memstore.getSnapshot().getCellsCount()); @@ -690,29 +743,31 @@ public class TestCompactingMemStore extends TestDefaultMemStore { totalCellsLen1 = (totalCellsLen1 * 3) / 4; assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff. - totalHeapSize = 3 * oneCellOnCAHeapSize; + totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + + 3 * oneCellOnCAHeapSize; assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells. 
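Aside, not part of the patch: the heap expectations in these compaction tests all reduce to two per-cell constants, 120 bytes for a cell indexed through the ConcurrentSkipListMap of the active segment and 88 bytes once the same cell sits behind a flattened CellArrayMap, plus the segments' DEEP_OVERHEAD terms. A minimal sketch of that accounting, assuming the byte values hard-coded in the tests (they depend on the JVM's ClassSize estimates) and zero stand-ins for the DEEP_OVERHEAD constants:

  public class HeapAccountingSketch {
    // Constants hard-coded in the tests above; they are JVM-dependent estimates.
    static final int ONE_CELL_ON_CSLM = 120; // CSLM entry + KeyValue overhead + cell data
    static final int ONE_CELL_ON_CAM = 88;   // CellArrayMap reference + KeyValue overhead + cell data

    public static void main(String[] args) {
      long mutableSegmentOverhead = 0L; // stand-in for MutableSegment.DEEP_OVERHEAD
      long cellArrayMapOverhead = 0L;   // stand-in for CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
      int cells = 4;
      // Before the in-memory flush: all four cells live in the active CSLM segment.
      long before = mutableSegmentOverhead + cells * ONE_CELL_ON_CSLM;
      // After flattening under the BASIC policy: same four cells, CellArrayMap index, no dedup.
      long after = mutableSegmentOverhead + cellArrayMapOverhead + cells * ONE_CELL_ON_CAM;
      System.out.println("heap before flatten: " + before + ", after: " + after);
    }
  }

Under BASIC the cell count stays at 4, which is why only the per-cell constant and the index overhead change between the two assertions.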
- long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize; + long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize; assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); + assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); ((CompactingMemStore) memstore).disableCompaction(); - size = memstore.getFlushableSize(); + MemstoreSize size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction assertEquals(0, memstore.getSnapshot().getCellsCount()); // No change in the cells data size. ie. memstore size. as there is no compaction. assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); + assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, + ((CompactingMemStore) memstore).heapSize()); int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, regionServicesForStores.getMemstoreSize()); - long totalHeapSize3 = 3 * oneCellOnCSLMHeapSize; - assertEquals(totalHeapSize + totalHeapSize2 + totalHeapSize3, - ((CompactingMemStore) memstore).heapSize()); + long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + + 3 * oneCellOnCSLMHeapSize; + assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); ((CompactingMemStore)memstore).enableCompaction(); size = memstore.getFlushableSize(); @@ -725,7 +780,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, regionServicesForStores.getMemstoreSize()); // Only 4 unique cells left - assertEquals(4 * oneCellOnCAHeapSize, ((CompactingMemStore) memstore).heapSize()); + assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD + + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java deleted file mode 100644 index 66e107a..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ /dev/null @@ -1,492 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.regionserver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.util.List; - - - -/** - * compacted memstore test case - */ -@Category({RegionServerTests.class, MediumTests.class}) -public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore { - - private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class); - - ////////////////////////////////////////////////////////////////////////////// - // Helpers - ////////////////////////////////////////////////////////////////////////////// - - @Override public void tearDown() throws Exception { - chunkCreator.clearChunksInPool(); - } - - @Override public void setUp() throws Exception { - compactingSetUp(); - Configuration conf = HBaseConfiguration.create(); - - // set memstore to do data compaction - conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, - String.valueOf(MemoryCompactionPolicy.EAGER)); - - this.memstore = - new CompactingMemStore(conf, CellComparator.COMPARATOR, store, - regionServicesForStores, MemoryCompactionPolicy.EAGER); - } - - ////////////////////////////////////////////////////////////////////////////// - // Compaction tests - ////////////////////////////////////////////////////////////////////////////// - public void testCompaction1Bucket() throws IOException { - int counter = 0; - String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 - - // test 1 bucket - long totalCellsLen = addRowsByKeys(memstore, keys1); - int oneCellOnCSLMHeapSize = 120; - int oneCellOnCAHeapSize = 88; - long totalHeapSize = 4 * oneCellOnCSLMHeapSize; - assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); - - assertEquals(4, memstore.getActive().getCellsCount()); - MemstoreSize size = memstore.getFlushableSize(); - ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact - assertEquals(0, memstore.getSnapshot().getCellsCount()); - // One cell is duplicated and the compaction will remove it. 
All cells of same size so adjusting - // totalCellsLen - totalCellsLen = (totalCellsLen * 3) / 4; - assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize()); - totalHeapSize = 3 * oneCellOnCAHeapSize; - assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); - for ( Segment s : memstore.getSegments()) { - counter += s.getCellsCount(); - } - assertEquals(3, counter); - size = memstore.getFlushableSize(); - MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemstoreSize(size); // simulate flusher - ImmutableSegment s = memstore.getSnapshot(); - assertEquals(3, s.getCellsCount()); - assertEquals(0, regionServicesForStores.getMemstoreSize()); - - memstore.clearSnapshot(snapshot.getId()); - } - - public void testCompaction2Buckets() throws IOException { - - String[] keys1 = { "A", "A", "B", "C" }; - String[] keys2 = { "A", "B", "D" }; - - long totalCellsLen1 = addRowsByKeys(memstore, keys1); - int oneCellOnCSLMHeapSize = 120; - int oneCellOnCAHeapSize = 88; - long totalHeapSize1 = 4 * oneCellOnCSLMHeapSize; - assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); - MemstoreSize size = memstore.getFlushableSize(); - - ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact - int counter = 0; - for ( Segment s : memstore.getSegments()) { - counter += s.getCellsCount(); - } - assertEquals(3,counter); - assertEquals(0, memstore.getSnapshot().getCellsCount()); - // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting - // totalCellsLen - totalCellsLen1 = (totalCellsLen1 * 3) / 4; - totalHeapSize1 = 3 * oneCellOnCAHeapSize; - assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); - - long totalCellsLen2 = addRowsByKeys(memstore, keys2); - long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize; - assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); - - size = memstore.getFlushableSize(); - ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact - assertEquals(0, memstore.getSnapshot().getCellsCount()); - counter = 0; - for ( Segment s : memstore.getSegments()) { - counter += s.getCellsCount(); - } - assertEquals(4,counter); - totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2 - assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); - totalHeapSize2 = 1 * oneCellOnCAHeapSize; - assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); - - size = memstore.getFlushableSize(); - MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemstoreSize(size); // simulate flusher - ImmutableSegment s = memstore.getSnapshot(); - assertEquals(4, s.getCellsCount()); - assertEquals(0, regionServicesForStores.getMemstoreSize()); - - memstore.clearSnapshot(snapshot.getId()); - } - - public void testCompaction3Buckets() throws IOException { - - String[] keys1 = { "A", "A", "B", "C" }; - String[] keys2 = { "A", "B", "D" }; - String[] keys3 = { "D", "B", "B" }; - - long totalCellsLen1 = addRowsByKeys(memstore, keys1); - int oneCellOnCSLMHeapSize = 120; - int oneCellOnCAHeapSize = 88; - long totalHeapSize1 = 4 * oneCellOnCSLMHeapSize; - 
assertEquals(totalCellsLen1, region.getMemstoreSize()); - assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); - - MemstoreSize size = memstore.getFlushableSize(); - ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact - - assertEquals(0, memstore.getSnapshot().getCellsCount()); - // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting - // totalCellsLen - totalCellsLen1 = (totalCellsLen1 * 3) / 4; - totalHeapSize1 = 3 * oneCellOnCAHeapSize; - assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); - - long totalCellsLen2 = addRowsByKeys(memstore, keys2); - long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize; - - assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); - - ((CompactingMemStore) memstore).disableCompaction(); - size = memstore.getFlushableSize(); - ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction - assertEquals(0, memstore.getSnapshot().getCellsCount()); - assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); - - long totalCellsLen3 = addRowsByKeys(memstore, keys3); - long totalHeapSize3 = 3 * oneCellOnCSLMHeapSize; - assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, - regionServicesForStores.getMemstoreSize()); - assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3, - ((CompactingMemStore) memstore).heapSize()); - - ((CompactingMemStore) memstore).enableCompaction(); - size = memstore.getFlushableSize(); - ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact - while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(10); - } - assertEquals(0, memstore.getSnapshot().getCellsCount()); - // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. - // Out of total 10, only 4 cells are unique - totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated - totalCellsLen3 = 0;// All duplicated cells. 
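Aside, not part of the patch: the cell counts asserted in this deleted test (and in its replacement later in the patch) follow from one piece of arithmetic. Under the single-version setup these tests use, eager compaction keeps one cell per row key, so the ten cells inserted from keys1, keys2 and keys3 collapse to four unique ones. A small standalone check of that count:

  import java.util.Arrays;
  import java.util.LinkedHashSet;
  import java.util.Set;

  public class UniqueCellCountSketch {
    public static void main(String[] args) {
      String[] keys1 = { "A", "A", "B", "C" }; // one internal duplicate
      String[] keys2 = { "A", "B", "D" };      // A and B duplicate keys1
      String[] keys3 = { "D", "B", "B" };      // only duplicates by now
      Set<String> unique = new LinkedHashSet<>();
      for (String[] batch : new String[][] { keys1, keys2, keys3 }) {
        unique.addAll(Arrays.asList(batch));
      }
      System.out.println(unique.size()); // 4, matching assertEquals(4, s.getCellsCount())
    }
  }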
- assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, - regionServicesForStores.getMemstoreSize()); - // Only 4 unique cells left - assertEquals(4 * oneCellOnCAHeapSize, ((CompactingMemStore) memstore).heapSize()); - - size = memstore.getFlushableSize(); - MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemstoreSize(size); // simulate flusher - ImmutableSegment s = memstore.getSnapshot(); - assertEquals(4, s.getCellsCount()); - assertEquals(0, regionServicesForStores.getMemstoreSize()); - - memstore.clearSnapshot(snapshot.getId()); - - } - - ////////////////////////////////////////////////////////////////////////////// - // Merging tests - ////////////////////////////////////////////////////////////////////////////// - @Test - public void testMerging() throws IOException { - - String[] keys1 = { "A", "A", "B", "C", "F", "H"}; - String[] keys2 = { "A", "B", "D", "G", "I", "J"}; - String[] keys3 = { "D", "B", "B", "E" }; - - MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; - memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, - String.valueOf(compactionType)); - ((CompactingMemStore)memstore).initiateType(compactionType); - addRowsByKeys(memstore, keys1); - - ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact - - while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(10); - } - assertEquals(0, memstore.getSnapshot().getCellsCount()); - - addRowsByKeys(memstore, keys2); // also should only flatten - - int counter2 = 0; - for ( Segment s : memstore.getSegments()) { - counter2 += s.getCellsCount(); - } - assertEquals(12, counter2); - - ((CompactingMemStore) memstore).disableCompaction(); - - ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening - assertEquals(0, memstore.getSnapshot().getCellsCount()); - - int counter3 = 0; - for ( Segment s : memstore.getSegments()) { - counter3 += s.getCellsCount(); - } - assertEquals(12, counter3); - - addRowsByKeys(memstore, keys3); - - int counter4 = 0; - for ( Segment s : memstore.getSegments()) { - counter4 += s.getCellsCount(); - } - assertEquals(16, counter4); - - ((CompactingMemStore) memstore).enableCompaction(); - - - ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact - while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(10); - } - assertEquals(0, memstore.getSnapshot().getCellsCount()); - - int counter = 0; - for ( Segment s : memstore.getSegments()) { - counter += s.getCellsCount(); - } - assertEquals(16,counter); - - MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - ImmutableSegment s = memstore.getSnapshot(); - memstore.clearSnapshot(snapshot.getId()); - } - - @Test - public void testCountOfCellsAfterFlatteningByScan() throws IOException { - String[] keys1 = { "A", "B", "C" }; // A, B, C - addRowsByKeysWith50Cols(memstore, keys1); - // this should only flatten as there are no duplicates - ((CompactingMemStore) memstore).flushInMemory(); - while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(10); - } - List scanners = memstore.getScanners(Long.MAX_VALUE); - // seek - int count = 0; - for(int i = 0; i < scanners.size(); i++) { - scanners.get(i).seek(KeyValue.LOWESTKEY); - while (scanners.get(i).next() != null) { - count++; - } - } - assertEquals("the count should be ", count, 150); - for(int i = 0; i < 
scanners.size(); i++) { - scanners.get(i).close(); - } - } - - @Test - public void testCountOfCellsAfterFlatteningByIterator() throws IOException { - String[] keys1 = { "A", "B", "C" }; // A, B, C - addRowsByKeysWith50Cols(memstore, keys1); - // this should only flatten as there are no duplicates - ((CompactingMemStore) memstore).flushInMemory(); - while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(10); - } - // Just doing the cnt operation here - MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator( - ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), - CellComparator.COMPARATOR, 10); - int cnt = 0; - try { - while (itr.next() != null) { - cnt++; - } - } finally { - itr.close(); - } - assertEquals("the count should be ", cnt, 150); - } - - - private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) { - byte[] fam = Bytes.toBytes("testfamily"); - for (int i = 0; i < keys.length; i++) { - long timestamp = System.currentTimeMillis(); - Threads.sleep(1); // to make sure each kv gets a different ts - byte[] row = Bytes.toBytes(keys[i]); - for(int j =0 ;j < 50; j++) { - byte[] qf = Bytes.toBytes("testqualifier"+j); - byte[] val = Bytes.toBytes(keys[i] + j); - KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); - hmc.add(kv, null); - } - } - } - - @Override - @Test - public void testPuttingBackChunksWithOpeningScanner() throws IOException { - byte[] row = Bytes.toBytes("testrow"); - byte[] fam = Bytes.toBytes("testfamily"); - byte[] qf1 = Bytes.toBytes("testqualifier1"); - byte[] qf2 = Bytes.toBytes("testqualifier2"); - byte[] qf3 = Bytes.toBytes("testqualifier3"); - byte[] qf4 = Bytes.toBytes("testqualifier4"); - byte[] qf5 = Bytes.toBytes("testqualifier5"); - byte[] qf6 = Bytes.toBytes("testqualifier6"); - byte[] qf7 = Bytes.toBytes("testqualifier7"); - byte[] val = Bytes.toBytes("testval"); - - // Setting up memstore - memstore.add(new KeyValue(row, fam, qf1, val), null); - memstore.add(new KeyValue(row, fam, qf2, val), null); - memstore.add(new KeyValue(row, fam, qf3, val), null); - - // Creating a snapshot - MemStoreSnapshot snapshot = memstore.snapshot(); - assertEquals(3, memstore.getSnapshot().getCellsCount()); - - // Adding value to "new" memstore - assertEquals(0, memstore.getActive().getCellsCount()); - memstore.add(new KeyValue(row, fam, qf4, val), null); - memstore.add(new KeyValue(row, fam, qf5, val), null); - assertEquals(2, memstore.getActive().getCellsCount()); - - // opening scanner before clear the snapshot - List scanners = memstore.getScanners(0); - // Shouldn't putting back the chunks to pool,since some scanners are opening - // based on their data - // close the scanners - for(KeyValueScanner scanner : snapshot.getScanners()) { - scanner.close(); - } - memstore.clearSnapshot(snapshot.getId()); - - assertTrue(chunkCreator.getPoolSize() == 0); - - // Chunks will be put back to pool after close scanners; - for (KeyValueScanner scanner : scanners) { - scanner.close(); - } - assertTrue(chunkCreator.getPoolSize() > 0); - - // clear chunks - chunkCreator.clearChunksInPool(); - - // Creating another snapshot - - snapshot = memstore.snapshot(); - // Adding more value - memstore.add(new KeyValue(row, fam, qf6, val), null); - memstore.add(new KeyValue(row, fam, qf7, val), null); - // opening scanners - scanners = memstore.getScanners(0); - // close scanners before clear the snapshot - for (KeyValueScanner scanner : scanners) { - scanner.close(); - } - // Since no opening scanner, the chunks 
of snapshot should be put back to - // pool - // close the scanners - for(KeyValueScanner scanner : snapshot.getScanners()) { - scanner.close(); - } - memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkCreator.getPoolSize() > 0); - } - - @Test - public void testPuttingBackChunksAfterFlushing() throws IOException { - byte[] row = Bytes.toBytes("testrow"); - byte[] fam = Bytes.toBytes("testfamily"); - byte[] qf1 = Bytes.toBytes("testqualifier1"); - byte[] qf2 = Bytes.toBytes("testqualifier2"); - byte[] qf3 = Bytes.toBytes("testqualifier3"); - byte[] qf4 = Bytes.toBytes("testqualifier4"); - byte[] qf5 = Bytes.toBytes("testqualifier5"); - byte[] val = Bytes.toBytes("testval"); - - // Setting up memstore - memstore.add(new KeyValue(row, fam, qf1, val), null); - memstore.add(new KeyValue(row, fam, qf2, val), null); - memstore.add(new KeyValue(row, fam, qf3, val), null); - - // Creating a snapshot - MemStoreSnapshot snapshot = memstore.snapshot(); - assertEquals(3, memstore.getSnapshot().getCellsCount()); - - // Adding value to "new" memstore - assertEquals(0, memstore.getActive().getCellsCount()); - memstore.add(new KeyValue(row, fam, qf4, val), null); - memstore.add(new KeyValue(row, fam, qf5, val), null); - assertEquals(2, memstore.getActive().getCellsCount()); - // close the scanners - for(KeyValueScanner scanner : snapshot.getScanners()) { - scanner.close(); - } - memstore.clearSnapshot(snapshot.getId()); - - int chunkCount = chunkCreator.getPoolSize(); - assertTrue(chunkCount > 0); - } - - - private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) { - byte[] fam = Bytes.toBytes("testfamily"); - byte[] qf = Bytes.toBytes("testqualifier"); - MemstoreSize memstoreSize = new MemstoreSize(); - for (int i = 0; i < keys.length; i++) { - long timestamp = System.currentTimeMillis(); - Threads.sleep(1); // to make sure each kv gets a different ts - byte[] row = Bytes.toBytes(keys[i]); - byte[] val = Bytes.toBytes(keys[i] + i); - KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); - hmc.add(kv, memstoreSize); - LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); - } - regionServicesForStores.addMemstoreSize(memstoreSize); - return memstoreSize.getDataSize(); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java new file mode 100644 index 0000000..b6a3e51 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java @@ -0,0 +1,562 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.Threads; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.List; + +/** + * compacted memstore test case + */ +@Category({RegionServerTests.class, MediumTests.class}) +@RunWith(Parameterized.class) +public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore { + @Parameterized.Parameters + public static Object[] data() { + return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes + } + private static final Log LOG = LogFactory.getLog(TestCompactingToCellFlatMapMemStore.class); + public final boolean toCellChunkMap; + Configuration conf; + ////////////////////////////////////////////////////////////////////////////// + // Helpers + ////////////////////////////////////////////////////////////////////////////// + public TestCompactingToCellFlatMapMemStore(String type) { + // compare the runner-supplied parameter by value, not by reference + toCellChunkMap = "CHUNK_MAP".equals(type); + } + + @Override public void tearDown() throws Exception { + chunkCreator.clearChunksInPool(); + } + + @Override public void setUp() throws Exception { + + compactingSetUp(); + this.conf = HBaseConfiguration.create(); + + // set memstore to do data compaction + conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, + String.valueOf(MemoryCompactionPolicy.EAGER)); + + this.memstore = + new CompactingMemStore(conf, CellComparator.COMPARATOR, store, + regionServicesForStores, MemoryCompactionPolicy.EAGER); + } + + ////////////////////////////////////////////////////////////////////////////// + // Compaction tests + ////////////////////////////////////////////////////////////////////////////// + public void testCompaction1Bucket() throws IOException { + int counter = 0; + String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 + if (toCellChunkMap) { + // set memstore to flat into CellChunkMap + conf.set(CompactingMemStore.COMPACTING_MEMSTORE_CHUNK_MAP_KEY, String.valueOf(true)); + ((CompactingMemStore)memstore).setIndexType(); + } + + // test 1 bucket + long totalCellsLen = addRowsByKeys(memstore, keys1); + long cellBeforeFlushSize = cellBeforeFlushSize(); + long cellAfterFlushSize = cellAfterFlushSize(); + long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize; + + assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize()); + assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); + + assertEquals(4, memstore.getActive().getCellsCount()); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + assertEquals(0, memstore.getSnapshot().getCellsCount()); + // One cell is duplicated and the compaction will remove it.
All cells of same size so adjusting + // totalCellsLen + totalCellsLen = (totalCellsLen * 3) / 4; + assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize()); + + totalHeapSize = + 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD + + (toCellChunkMap ? + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); + assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(3, counter); + MemstoreSize size = memstore.getFlushableSize(); + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + region.decrMemstoreSize(size); // simulate flusher + ImmutableSegment s = memstore.getSnapshot(); + assertEquals(3, s.getCellsCount()); + assertEquals(0, regionServicesForStores.getMemstoreSize()); + + memstore.clearSnapshot(snapshot.getId()); + } + + public void testCompaction2Buckets() throws IOException { + if (toCellChunkMap) { + // set memstore to flat into CellChunkMap + conf.set(CompactingMemStore.COMPACTING_MEMSTORE_CHUNK_MAP_KEY, String.valueOf(true)); + ((CompactingMemStore)memstore).setIndexType(); + } + String[] keys1 = { "A", "A", "B", "C" }; + String[] keys2 = { "A", "B", "D" }; + + long totalCellsLen1 = addRowsByKeys(memstore, keys1); // INSERT 4 + long cellBeforeFlushSize = cellBeforeFlushSize(); + long cellAfterFlushSize = cellAfterFlushSize(); + long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize; + assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); + assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + int counter = 0; // COMPACT 4->3 + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(3,counter); + assertEquals(0, memstore.getSnapshot().getCellsCount()); + // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting + // totalCellsLen + totalCellsLen1 = (totalCellsLen1 * 3) / 4; + totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD + + (toCellChunkMap ? 
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); + assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); + assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); + + long totalCellsLen2 = addRowsByKeys(memstore, keys2); // INSERT 3 (3+3=6) + long totalHeapSize2 = 3 * cellBeforeFlushSize; + assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); + assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + assertEquals(0, memstore.getSnapshot().getCellsCount());// COMPACT 6->4 + counter = 0; + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(4,counter); + totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2 + assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); + totalHeapSize2 = 1 * cellAfterFlushSize; + assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); + + MemstoreSize size = memstore.getFlushableSize(); + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + region.decrMemstoreSize(size); // simulate flusher + ImmutableSegment s = memstore.getSnapshot(); + assertEquals(4, s.getCellsCount()); + assertEquals(0, regionServicesForStores.getMemstoreSize()); + + memstore.clearSnapshot(snapshot.getId()); + } + + public void testCompaction3Buckets() throws IOException { + if (toCellChunkMap) { + // set memstore to flat into CellChunkMap + conf.set(CompactingMemStore.COMPACTING_MEMSTORE_CHUNK_MAP_KEY, String.valueOf(true)); + ((CompactingMemStore)memstore).setIndexType(); + } + String[] keys1 = { "A", "A", "B", "C" }; + String[] keys2 = { "A", "B", "D" }; + String[] keys3 = { "D", "B", "B" }; + + long totalCellsLen1 = addRowsByKeys(memstore, keys1); + long cellBeforeFlushSize = cellBeforeFlushSize(); + long cellAfterFlushSize = cellAfterFlushSize(); + long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize; + assertEquals(totalCellsLen1, region.getMemstoreSize()); + assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); + + MemstoreSize size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + + assertEquals(0, memstore.getSnapshot().getCellsCount()); + // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting + // totalCellsLen + totalCellsLen1 = (totalCellsLen1 * 3) / 4; + totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD + + (toCellChunkMap ? 
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); + assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); + assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); + + long totalCellsLen2 = addRowsByKeys(memstore, keys2); + long totalHeapSize2 = 3 * cellBeforeFlushSize; + + assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); + assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); + + ((CompactingMemStore) memstore).disableCompaction(); + size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction + totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; + assertEquals(0, memstore.getSnapshot().getCellsCount()); + assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); + assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); + + long totalCellsLen3 = addRowsByKeys(memstore, keys3); + long totalHeapSize3 = 3 * cellBeforeFlushSize; + assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, + regionServicesForStores.getMemstoreSize()); + assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3, + ((CompactingMemStore) memstore).heapSize()); + + ((CompactingMemStore) memstore).enableCompaction(); + size = memstore.getFlushableSize(); + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. + // Out of total 10, only 4 cells are unique + totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated + totalCellsLen3 = 0;// All duplicated cells. + assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, + regionServicesForStores.getMemstoreSize()); + // Only 4 unique cells left + long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD + + (toCellChunkMap ? 
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); + assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize()); + + size = memstore.getFlushableSize(); + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + region.decrMemstoreSize(size); // simulate flusher + ImmutableSegment s = memstore.getSnapshot(); + assertEquals(4, s.getCellsCount()); + assertEquals(0, regionServicesForStores.getMemstoreSize()); + + memstore.clearSnapshot(snapshot.getId()); + + } + + ////////////////////////////////////////////////////////////////////////////// + // Merging tests + ////////////////////////////////////////////////////////////////////////////// + @Test + public void testMerging() throws IOException { + if (toCellChunkMap) { + // set memstore to flat into CellChunkMap + conf.set(CompactingMemStore.COMPACTING_MEMSTORE_CHUNK_MAP_KEY, String.valueOf(true)); + ((CompactingMemStore)memstore).setIndexType(); + } + String[] keys1 = { "A", "A", "B", "C", "F", "H"}; + String[] keys2 = { "A", "B", "D", "G", "I", "J"}; + String[] keys3 = { "D", "B", "B", "E" }; + + MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; + memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, + String.valueOf(compactionType)); + ((CompactingMemStore)memstore).initiateType(compactionType); + addRowsByKeys(memstore, keys1); + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact + + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + + addRowsByKeys(memstore, keys2); // also should only flatten + + int counter2 = 0; + for ( Segment s : memstore.getSegments()) { + counter2 += s.getCellsCount(); + } + assertEquals(12, counter2); + + ((CompactingMemStore) memstore).disableCompaction(); + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening + assertEquals(0, memstore.getSnapshot().getCellsCount()); + + int counter3 = 0; + for ( Segment s : memstore.getSegments()) { + counter3 += s.getCellsCount(); + } + assertEquals(12, counter3); + + addRowsByKeys(memstore, keys3); + + int counter4 = 0; + for ( Segment s : memstore.getSegments()) { + counter4 += s.getCellsCount(); + } + assertEquals(16, counter4); + + ((CompactingMemStore) memstore).enableCompaction(); + + + ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + assertEquals(0, memstore.getSnapshot().getCellsCount()); + + int counter = 0; + for ( Segment s : memstore.getSegments()) { + counter += s.getCellsCount(); + } + assertEquals(16,counter); + + MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot + ImmutableSegment s = memstore.getSnapshot(); + memstore.clearSnapshot(snapshot.getId()); + } + + @Test + public void testCountOfCellsAfterFlatteningByScan() throws IOException { + String[] keys1 = { "A", "B", "C" }; // A, B, C + addRowsByKeysWith50Cols(memstore, keys1); + // this should only flatten as there are no duplicates + ((CompactingMemStore) memstore).flushInMemory(); + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + List scanners = memstore.getScanners(Long.MAX_VALUE); + // seek + int count = 0; + for(int i = 0; i < scanners.size(); i++) { + 
scanners.get(i).seek(KeyValue.LOWESTKEY); + while (scanners.get(i).next() != null) { + count++; + } + } + assertEquals("the count should be ", count, 150); + for(int i = 0; i < scanners.size(); i++) { + scanners.get(i).close(); + } + } + + @Test + public void testCountOfCellsAfterFlatteningByIterator() throws IOException { + String[] keys1 = { "A", "B", "C" }; // A, B, C + addRowsByKeysWith50Cols(memstore, keys1); + // this should only flatten as there are no duplicates + ((CompactingMemStore) memstore).flushInMemory(); + while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { + Threads.sleep(10); + } + // Just doing the cnt operation here + MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator( + ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), + CellComparator.COMPARATOR, 10); + int cnt = 0; + try { + while (itr.next() != null) { + cnt++; + } + } finally { + itr.close(); + } + assertEquals("the count should be ", cnt, 150); + } + + private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) { + byte[] fam = Bytes.toBytes("testfamily"); + for (int i = 0; i < keys.length; i++) { + long timestamp = System.currentTimeMillis(); + Threads.sleep(1); // to make sure each kv gets a different ts + byte[] row = Bytes.toBytes(keys[i]); + for(int j =0 ;j < 50; j++) { + byte[] qf = Bytes.toBytes("testqualifier"+j); + byte[] val = Bytes.toBytes(keys[i] + j); + KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); + hmc.add(kv, null); + } + } + } + + @Override + @Test + public void testPuttingBackChunksWithOpeningScanner() throws IOException { + byte[] row = Bytes.toBytes("testrow"); + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf1 = Bytes.toBytes("testqualifier1"); + byte[] qf2 = Bytes.toBytes("testqualifier2"); + byte[] qf3 = Bytes.toBytes("testqualifier3"); + byte[] qf4 = Bytes.toBytes("testqualifier4"); + byte[] qf5 = Bytes.toBytes("testqualifier5"); + byte[] qf6 = Bytes.toBytes("testqualifier6"); + byte[] qf7 = Bytes.toBytes("testqualifier7"); + byte[] val = Bytes.toBytes("testval"); + + // Setting up memstore + memstore.add(new KeyValue(row, fam, qf1, val), null); + memstore.add(new KeyValue(row, fam, qf2, val), null); + memstore.add(new KeyValue(row, fam, qf3, val), null); + + // Creating a snapshot + MemStoreSnapshot snapshot = memstore.snapshot(); + assertEquals(3, memstore.getSnapshot().getCellsCount()); + + // Adding value to "new" memstore + assertEquals(0, memstore.getActive().getCellsCount()); + memstore.add(new KeyValue(row, fam, qf4, val), null); + memstore.add(new KeyValue(row, fam, qf5, val), null); + assertEquals(2, memstore.getActive().getCellsCount()); + + // opening scanner before clear the snapshot + List scanners = memstore.getScanners(0); + // Shouldn't putting back the chunks to pool,since some scanners are opening + // based on their data + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } + memstore.clearSnapshot(snapshot.getId()); + + assertTrue(chunkCreator.getPoolSize() == 0); + + // Chunks will be put back to pool after close scanners; + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + assertTrue(chunkCreator.getPoolSize() > 0); + + // clear chunks + chunkCreator.clearChunksInPool(); + + // Creating another snapshot + + snapshot = memstore.snapshot(); + // Adding more value + memstore.add(new KeyValue(row, fam, qf6, val), null); + memstore.add(new KeyValue(row, fam, qf7, val), null); + // opening scanners + scanners = 
memstore.getScanners(0); + // close scanners before clear the snapshot + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + // Since no opening scanner, the chunks of snapshot should be put back to + // pool + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } + memstore.clearSnapshot(snapshot.getId()); + assertTrue(chunkCreator.getPoolSize() > 0); + } + + @Test + public void testPuttingBackChunksAfterFlushing() throws IOException { + byte[] row = Bytes.toBytes("testrow"); + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf1 = Bytes.toBytes("testqualifier1"); + byte[] qf2 = Bytes.toBytes("testqualifier2"); + byte[] qf3 = Bytes.toBytes("testqualifier3"); + byte[] qf4 = Bytes.toBytes("testqualifier4"); + byte[] qf5 = Bytes.toBytes("testqualifier5"); + byte[] val = Bytes.toBytes("testval"); + + // Setting up memstore + memstore.add(new KeyValue(row, fam, qf1, val), null); + memstore.add(new KeyValue(row, fam, qf2, val), null); + memstore.add(new KeyValue(row, fam, qf3, val), null); + + // Creating a snapshot + MemStoreSnapshot snapshot = memstore.snapshot(); + assertEquals(3, memstore.getSnapshot().getCellsCount()); + + // Adding value to "new" memstore + assertEquals(0, memstore.getActive().getCellsCount()); + memstore.add(new KeyValue(row, fam, qf4, val), null); + memstore.add(new KeyValue(row, fam, qf5, val), null); + assertEquals(2, memstore.getActive().getCellsCount()); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } + memstore.clearSnapshot(snapshot.getId()); + + int chunkCount = chunkCreator.getPoolSize(); + assertTrue(chunkCount > 0); + } + + + private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) { + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf = Bytes.toBytes("testqualifier"); + MemstoreSize memstoreSize = new MemstoreSize(); + for (int i = 0; i < keys.length; i++) { + long timestamp = System.currentTimeMillis(); + Threads.sleep(1); // to make sure each kv gets a different ts + byte[] row = Bytes.toBytes(keys[i]); + byte[] val = Bytes.toBytes(keys[i] + i); + KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); + hmc.add(kv, memstoreSize); + LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); + } + regionServicesForStores.addMemstoreSize(memstoreSize); + return memstoreSize.getDataSize(); + } + + private long cellBeforeFlushSize() { + // make one cell + byte[] row = Bytes.toBytes("A"); + byte[] val = Bytes.toBytes("A" + 0); + KeyValue kv = + new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), + System.currentTimeMillis(), val); + return ClassSize.align( + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv)); + } + + private long cellAfterFlushSize() { + // make one cell + byte[] row = Bytes.toBytes("A"); + byte[] val = Bytes.toBytes("A" + 0); + KeyValue kv = + new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), + System.currentTimeMillis(), val); + + return toCellChunkMap ? 
+ ClassSize.align( + ClassSize.CELL_CHUNK_MAP_ENTRY + KeyValueUtil.length(kv)) : + ClassSize.align( + ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 3b15ff3..439f3d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -965,9 +965,12 @@ public class TestDefaultMemStore { conf, FSTableDescriptors.createMetaTableDescriptor(conf), wFactory.getMetaWAL(HRegionInfo.FIRST_META_REGIONINFO. getEncodedNameAsBytes())); - HRegionInfo hri = new HRegionInfo(TableName.valueOf(name.getMethodName()), + // parameterized tests add a [#] suffix; get rid of '[' and ']' + HRegionInfo hri = + new HRegionInfo(TableName.valueOf(name.getMethodName().replaceAll("[\\[\\]]", "_")), Bytes.toBytes("row_0200"), Bytes.toBytes("row_0300")); - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf( + name.getMethodName().replaceAll("[\\[\\]]", "_"))); desc.addFamily(new HColumnDescriptor("foo".getBytes())); HRegion r = HRegion.createHRegion(hri, testDir, conf, desc, -- 1.8.5.2 (Apple Git-48)
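Aside, not part of the patch: the TestDefaultMemStore hunk above sanitizes the JUnit method name because the Parameterized runner appends an index in square brackets, and '[' and ']' are not legal in HBase table names. A standalone check of the replaceAll call, using a hypothetical test name:

  public class TestNameSanitizerSketch {
    public static void main(String[] args) {
      String name = "testScanAcrossSnapshot[0]"; // hypothetical parameterized test name
      // Same regex as the hunk: the character class matches either bracket.
      System.out.println(name.replaceAll("[\\[\\]]", "_")); // prints testScanAcrossSnapshot_0_
    }
  }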
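Aside, not part of the patch: for readers tracing the TestCellFlatSet changes earlier in this patch, each CellChunkMap entry is a fixed 20-byte record in an index chunk, three ints (the id of the data chunk holding the cell's bytes, the cell's offset, its length) plus a long seqId, which is what ClassSize.CELL_CHUNK_MAP_ENTRY accounts for. A self-contained sketch using plain java.nio puts in place of HBase's ByteBufferUtils; the header constant is an assumption standing in for ChunkCreator.SIZEOF_CHUNK_HEADER:

  import java.nio.ByteBuffer;

  public class CellChunkMapEntrySketch {
    static final int SIZEOF_CELL_REP = 3 * Integer.BYTES + Long.BYTES; // 20 bytes per entry
    static final int CHUNK_HEADER = Integer.BYTES; // assumed: space reserved for the chunk id

    // Writes one cell representation and returns the next free offset.
    static int writeCellRep(ByteBuffer idx, int off, int dataChunkId, int dataOffset,
        int cellLength, long seqId) {
      idx.putInt(off, dataChunkId);    // which data chunk holds the serialized cell
      idx.putInt(off + 4, dataOffset); // where the cell starts inside that chunk
      idx.putInt(off + 8, cellLength); // serialized cell length
      idx.putLong(off + 12, seqId);    // sequence id, kept next to the reference
      return off + SIZEOF_CELL_REP;
    }

    public static void main(String[] args) {
      ByteBuffer idx = ByteBuffer.allocate(64);
      int off = writeCellRep(idx, CHUNK_HEADER, 7, CHUNK_HEADER, 42, 1L);
      System.out.println("next entry starts at offset " + off); // 24
    }
  }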