diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellBlock.java new file mode 100644 index 0000000..69ddfc9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellBlock.java @@ -0,0 +1,510 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.SortedSet; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.ByteRange; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.PositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedByteRange; + +@InterfaceAudience.Private +public class CellBlock { + private static final Log LOG = LogFactory.getLog(CellBlock.class); + // TODO we can use HFile block size for this value also (?) 
+ public static final String CELL_BLOCK_FRAGMENT_SIZE_KEY = + "hbase.regionserver.cellblock.fragment.size"; + private static final int DEFAULT_CELL_BLOCK_FRAGMENT_SIZE = 64 * 1024;// 64 KB + + private Configuration conf; + private final int fragmentSize; + private PositionedByteRange buffers[]; + private final TimeRangeTracker timeRangeTracker; + private final KVComparator c; + private final MemStoreLAB allocator; + private final NavigableMap> indexMeta; + private int cellsCount; + private byte[] lastRK = null; + + CellBlock(TimeRangeTracker timeRangeTracker, KVComparator c, MemStoreLAB allocator, + Configuration conf) { + this.timeRangeTracker = timeRangeTracker; + this.c = c; + this.allocator = allocator; + this.conf = conf; + this.fragmentSize = conf.getInt(CELL_BLOCK_FRAGMENT_SIZE_KEY, + DEFAULT_CELL_BLOCK_FRAGMENT_SIZE); + this.indexMeta = new TreeMap>(c); + } + + public TimeRangeTracker getTimeRangeTracker() { + return this.timeRangeTracker; + } + + public KeyValueScanner getScanner(long readPoint) { + return new CellBlockScanner(readPoint, this); + } + + public int getCellsCount() { + return this.cellsCount; + } + + public boolean flush(CellSkipListSet src, int flushSize) { + assert flushSize > 0; + int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY, HeapMemStoreLAB.CHUNK_SIZE_DEFAULT); + List bufferLst = allocateBuffer(flushSize, chunkSize); + if (bufferLst.isEmpty()) return false; + int offset = 0; + int lastFragmentOffset = 0; + int curKvFlushSize = -1; + Cell prevCell = null; + Cell curCell = null; + Iterator itr = src.iterator(); + int bufIndex = 0; + PositionedByteRange curBuffer = bufferLst.get(bufIndex); + int remainingFlushSize = flushSize; + while (itr.hasNext()) { + curCell = itr.next(); + curKvFlushSize = KeyValueUtil.keyLength(curCell) + curCell.getValueLength() + + curCell.getTagsLength() + KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + + Bytes.SIZEOF_LONG; + if (curKvFlushSize > chunkSize) { + // TODO need to release all the allocated chunks here. + // We can not store any when kv its flushSize > chunkSize. We need to fail the flush then. + LOG.debug(curCell + " size is bigger than chunkSize : " + chunkSize + + ". It can never get flushed to any chunks. Failing the entire flush"); + return false; + } + // Bytes.SIZEOF_LONG is for storing mvcc# + // Checking whether this kv can go into the curBuffer. If no enough space in that, then we + // will have to use the next buffer in the bufferLst + if (curBuffer.getLength() - curBuffer.getPosition() < curKvFlushSize) { + // Re adjust the curBuffer length + curBuffer.setLength(curBuffer.getPosition()); + bufIndex++; + if (bufIndex < bufferLst.size()) { + curBuffer = bufferLst.get(bufIndex); + } else { + LOG.info("The preallocated buffers are over. Going for a new allocation"); + ByteRange allocation = this.allocator.allocateBytes(Math.min(remainingFlushSize, + chunkSize)); + if (allocation == null) return false;// TODO need to release all the allocated chunks here + PositionedByteRange buffer = new SimplePositionedByteRange(allocation.getBytes(), + allocation.getOffset(), flushSize); + bufferLst.add(buffer); + curBuffer = buffer; + } + } + if (prevCell == null + || ((offset - lastFragmentOffset) >= this.fragmentSize && (!(dupKey(prevCell, curCell))))) { + // The size of kvs added so far reached the fragment size.. Create a new fragment and add + // the remaining kvs (Including this one) there. We store the fragments startkey vs offset + // in a Map. During seeks this table can be used to skip lower fragments. 
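For reference, the flush size computed above is the size of the flat record that append() writes for each cell: a 4-byte key length, a 4-byte value length, the key and value bytes, a 2-byte tags length, the tag bytes, and finally the 8-byte mvcc/sequence id. A minimal, self-contained sketch of that record shape using plain JDK types (the class name CellRecordSketch and all values are illustrative, not part of the patch):

import java.nio.ByteBuffer;

// Illustrative sketch of the per-cell record CellBlock.append() produces and
// CellBlockScanner.readNextKV() later walks: klen(4) + vlen(4) + key + value +
// tagsLen(2) + tags + mvcc(8). Names and values are made up.
public class CellRecordSketch {
  public static void main(String[] args) {
    byte[] key = "r11/cf:q1/1418000000000/Put".getBytes();
    byte[] value = "v1".getBytes();
    byte[] tags = new byte[0];
    long mvcc = 42L;

    int recordSize = 4 + 4 + key.length + value.length + 2 + tags.length + 8;
    ByteBuffer buf = ByteBuffer.allocate(recordSize);
    buf.putInt(key.length).putInt(value.length).put(key).put(value);
    buf.putShort((short) tags.length).put(tags).putLong(mvcc);

    // Read it back the way readNextKV() does: skip key + value, skip tags,
    // then pick up the mvcc/sequence id at the end of the record.
    buf.flip();
    int klen = buf.getInt();
    int vlen = buf.getInt();
    buf.position(buf.position() + klen + vlen);
    int tlen = buf.getShort() & 0xFFFF;
    buf.position(buf.position() + tlen);
    System.out.println("klen=" + klen + ", vlen=" + vlen + ", tags=" + tlen
        + ", mvcc=" + buf.getLong() + ", recordSize=" + recordSize);
  }
}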
+ newFragment(curCell, bufIndex, offset); + lastFragmentOffset = offset; + } + offset += append(curBuffer, curCell); + prevCell = curCell; + this.cellsCount++; + remainingFlushSize -= curKvFlushSize; + } + // Re adjust the last curBuffer length. This was nit adjusted in the loop. + curBuffer.setLength(curBuffer.getPosition()); + assert curCell != null; + this.lastRK = CellUtil.cloneRow(curCell); + this.buffers = new SimplePositionedByteRange[bufferLst.size()]; + bufferLst.toArray(this.buffers); + return true; + } + + private List allocateBuffer(int flushSize, int chunkSize) { + LOG.debug("Trying to allocate total [" + flushSize + "] bytes for cell block with chunkSize : " + + chunkSize); + List bufferLst = new ArrayList(); + // Trying to allocate 'flushSize' in 1 or more chunks. Try to get those many chunks as needed + int remainingAllocSize = flushSize; + while (remainingAllocSize > 0) { + int allocSize = Math.min(remainingAllocSize, chunkSize); + ByteRange allocation = this.allocator.allocateBytes(allocSize); + if (allocation == null) break; + PositionedByteRange buffer = new SimplePositionedByteRange(allocation.getBytes(), + allocation.getOffset(), allocation.getLength()); + bufferLst.add(buffer); + remainingAllocSize -= allocSize; + } + if (remainingAllocSize > 0) { + // we were not able to get enough chunks to allocate 'flushSize'. Just return an empty + // bufferLst so that the callee will fail the flush. + bufferLst.clear(); + // TODO we need to put back the allocated bytes. But how? No API support as of now. This + // allocated bytes also get releases when the allocate is closed. That will happen at end of + // snapshot flush + } + return bufferLst; + } + + private boolean dupKey(Cell prev, Cell cur) { + return this.c.compareOnlyKeyPortion(prev, cur) == 0; + } + + private void newFragment(Cell curKv, int bufIndex, int offset) { + Cell beignKey = new KeyValue(curKv.getRowArray(), curKv.getRowOffset(), curKv.getRowLength(), + curKv.getFamilyArray(), curKv.getFamilyOffset(), curKv.getFamilyLength(), + curKv.getQualifierArray(), curKv.getQualifierOffset(), curKv.getQualifierLength(), + curKv.getTimestamp(), KeyValue.Type.codeToType(curKv.getTypeByte()), null, -1, 0); + this.indexMeta.put(beignKey, new Pair(bufIndex, offset)); + } + + private int append(final PositionedByteRange buffer, final Cell cell) { + int beginPos = buffer.getPosition(); + int klen = KeyValueUtil.keyLength(cell); + int vlen = cell.getValueLength(); + buffer.putInt(klen); + buffer.putInt(vlen); + KeyValueUtil.appendKeyTo(cell, buffer.getBytes(), buffer.getOffset() + buffer.getPosition()); + buffer.put(cell.getValueArray(), cell.getValueOffset(), vlen); + // Write the tagsLen with 2 bytes + int tl = cell.getTagsLength(); + byte tlh = (byte) tl; + tl >>= 8; + byte tll = (byte) tl; + buffer.put(tll); + buffer.put(tlh); + if (tl > 0) { + buffer.put(cell.getTagsArray(), cell.getTagsOffset(), tl); + } + buffer.putLong(cell.getSequenceId()); + return buffer.getPosition() - beginPos; + } + + static class CellBlockScanner extends NonLazyKeyValueScanner { + private long readPoint; + private KeyValue theNext; + private PositionedByteRange curBuf; + private int curBufIndex; + private final PositionedByteRange buffers[]; + private final CellBlock cellBlock; + + CellBlockScanner(long readPoint, CellBlock cellBlock) { + this.readPoint = readPoint; + this.cellBlock = cellBlock; + this.buffers = new SimplePositionedByteRange[cellBlock.buffers.length]; + int i = 0; + for (PositionedByteRange pbr : cellBlock.buffers) { + 
this.buffers[i++] = new SimplePositionedByteRange(pbr.getBytes(), pbr.getOffset(), + pbr.getLength()); + } + this.curBufIndex = 0; + this.curBuf = this.buffers[this.curBufIndex]; + this.cellBlock.allocator.incScannerCount(); + } + + @Override + public synchronized KeyValue peek() { + return this.theNext; + } + + @Override + public synchronized KeyValue next() throws IOException { + return next(true); + } + + private KeyValue next(boolean mvccCheck) throws IOException{ + if (theNext == null) { + return null; + } + final KeyValue ret = theNext; + readNextKV(mvccCheck); + return ret; + } + + @Override + public synchronized boolean seek(Cell k) throws IOException { + // When the seek kv is less than the current kv of the scanner, we need a rewind + boolean rewind = false; + if (this.theNext != null && this.cellBlock.c.compare(this.theNext, k) > 0) { + rewind = true; + } + return blockSeek(k, rewind); + } + + private boolean readNextKV(boolean mvccCheck) throws IOException { + int klen = 0, vlen = 0, tlen = 0; + long memstoreTS = 0; + boolean result = false; + int pos = 0; + while (this.curBufIndex < this.buffers.length) { + while (curBuf.getRemaining() > 0) { + pos = curBuf.getPosition(); // Mark + klen = curBuf.getInt(); + vlen = curBuf.getInt(); + curBuf.setPosition(curBuf.getPosition() + (klen + vlen)); // Skip + tlen = readTagsLen(); + curBuf.setPosition(curBuf.getPosition() + tlen); // Skip + memstoreTS = curBuf.getLong(); + if (!mvccCheck || memstoreTS <= this.readPoint) { + result = true; + break; + } + } + if (result) break; + // Go to the next buffer + this.curBufIndex++; + if(this.curBufIndex < this.buffers.length){ + this.curBuf = this.buffers[this.curBufIndex]; + this.curBuf.setPosition(0); + } + } + + if (result) { + int kvLen = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + klen + vlen + tlen; + // PositionedByteRange#getBytes() to deal with on heap or offheap bytes storage. + this.theNext = new KeyValue(curBuf.getBytes(), curBuf.getOffset() + pos, kvLen); + this.theNext.setSequenceId(memstoreTS); + } else { + this.theNext = null; + } + return result; + } + + // TODO add API in PBR? + private int readTagsLen() { + int tlen = 0; + tlen ^= curBuf.get() & 0xFF; + tlen <<= 8; + tlen ^= curBuf.get() & 0xFF; + return tlen; + } + + protected boolean blockSeek(Cell seekKey, boolean rewind) throws IOException { + Entry> floorEntry = this.cellBlock.indexMeta.floorEntry(seekKey); + if (floorEntry == null) { + // We have to seek within the 1st fragment itself. 
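The seek path here relies on the fragment index built during flush(): a sorted map from the first key of each fragment to the buffer index and offset where that fragment starts, so a seek can jump to the floor entry instead of scanning the block from the front. A tiny standalone illustration of that lookup pattern (JDK TreeMap, hypothetical class FragmentIndexSketch, made-up keys and offsets; not part of the patch):

import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of the floorEntry() jump that blockSeek() performs
// against indexMeta.
public class FragmentIndexSketch {
  public static void main(String[] args) {
    // first cell key of each fragment -> { buffer index, offset within that buffer }
    TreeMap<String, int[]> indexMeta = new TreeMap<>();
    indexMeta.put("r00", new int[] { 0, 0 });
    indexMeta.put("r40", new int[] { 0, 65536 });
    indexMeta.put("r80", new int[] { 1, 12288 });

    // A seek to "r55" starts from the fragment that begins at "r40" and skips
    // everything before it; a seek below "r00" finds no floor entry and falls
    // back to the very first buffer, as the branch below does.
    Map.Entry<String, int[]> floor = indexMeta.floorEntry("r55");
    System.out.println(floor.getKey() + " -> buffer " + floor.getValue()[0]
        + " @ offset " + floor.getValue()[1]);
  }
}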
+ if (rewind) { + this.curBufIndex = 0; + this.curBuf = this.buffers[this.curBufIndex]; + this.curBuf.setPosition(0); + } + } else { + if (rewind || (this.curBufIndex < floorEntry.getValue().getFirst() + || curBuf.getPosition() < floorEntry.getValue().getSecond())) { + this.curBufIndex = floorEntry.getValue().getFirst(); + this.curBuf = this.buffers[this.curBufIndex]; + this.curBuf.setPosition(floorEntry.getValue().getSecond()); + } + } + return seekInFragment(seekKey, false); + } + + private boolean seekInFragment(Cell seekKey, boolean seekBefore) throws IOException { + int klen, vlen, tlen; + int pos = 0; + long memstoreTS = -1; + boolean result = false; + KeyValue lastKv = null; + KeyOnlyKeyValue curKey = new KeyOnlyKeyValue(); + while (this.curBufIndex < this.buffers.length) { + while (curBuf.getRemaining() > 0) { + pos = curBuf.getPosition(); // Mark + klen = curBuf.getInt(); + vlen = curBuf.getInt(); + curKey.setKey(curBuf.getBytes(), curBuf.getOffset() + curBuf.getPosition(), klen); + int comp = compare(seekKey, curKey); + if (comp <= 0) { + result = true; + curBuf.setPosition(pos);// Reset + if (seekBefore) { + break; + } else { + return readNextKV(true); + } + } + curBuf.setPosition(pos + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + (klen + vlen));// Skip + // This will contain tags + tlen = readTagsLen(); + curBuf.setPosition(curBuf.getPosition() + tlen); + // Skip the memstore TS + memstoreTS = curBuf.getLong(); + lastKv = new KeyValue(curBuf.getBytes(), curBuf.getOffset() + pos, (klen + vlen + + KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + tlen)); + } + if (result) break; + // Go to the next buffer + this.curBufIndex++; + if(this.curBufIndex < this.buffers.length){ + this.curBuf = this.buffers[this.curBufIndex]; + this.curBuf.setPosition(0); + } + } + if (seekBefore && lastKv != null) { + // pos is the begin pos of the last KV. + curBuf.setPosition(pos); + assert memstoreTS >= 0; + if (memstoreTS <= this.readPoint) { + this.theNext = lastKv; + this.theNext.setSequenceId(memstoreTS); + return true; + } + return seekBefore(lastKv); + } + this.theNext = null; + return false; // didn't exactly find it. + } + + public synchronized boolean seekBefore(Cell kv) throws IOException { + NavigableMap> headMap = this.cellBlock.indexMeta.headMap(kv, + false); + if (headMap.isEmpty()) return false; + this.curBufIndex = headMap.lastEntry().getValue().getFirst(); + this.curBuf = this.buffers[this.curBufIndex]; + this.curBuf.setPosition(headMap.lastEntry().getValue().getSecond()); + return seekInFragment(kv, true); + } + + // TODO we need new comparator class which can compare BRs. + // In case of offheap BR we can not call this getBytes(). + private int compare(Cell c, Cell key) { + return this.cellBlock.c.compareOnlyKeyPortion(c, key); + } + + @Override + public synchronized boolean reseek(Cell key) throws IOException { + return blockSeek(getHighest(key, this.theNext), false); + } + + private Cell getHighest(Cell first, Cell second) { + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + int compare = this.cellBlock.c.compare(first, second); + return (compare > 0 ? first : second); + } + return (first != null ? 
first : second); + } + + @Override + public long getSequenceID() { + return Long.MAX_VALUE; + } + + @Override + public synchronized void close() { + this.theNext = null; + this.cellBlock.allocator.decScannerCount(); + } + + @Override + public boolean shouldUseScanner(Scan scan, SortedSet columns, long oldestUnexpiredTS) { + return (this.cellBlock.timeRangeTracker.includesTimeRange(scan.getTimeRange())) + && (this.cellBlock.timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS); + } + + @Override + public synchronized boolean backwardSeek(Cell key) throws IOException { + seek(key); + if (theNext == null + || Bytes.compareTo(theNext.getRowArray(), theNext.getRowOffset(), + theNext.getRowLength(), key.getRowArray(), key.getRowOffset(), + key.getRowLength()) > 0) { + return seekToPreviousRow(key); + } + return true; + } + + @Override + public synchronized boolean seekToPreviousRow(Cell key) throws IOException { + KeyValue seekKey = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(), + key.getRowLength()); + if (!seekBefore(seekKey)) { + close(); + return false; + } + KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(this.theNext.getRowArray(), + this.theNext.getRowOffset(), this.theNext.getRowLength()); + + if (!seek(firstKeyOfPreviousRow)) { + close(); + return false; + } + + boolean resultOfSkipKVs; + resultOfSkipKVs = skipKVsNewerThanReadpoint(); + if (!resultOfSkipKVs + || Bytes.compareTo(theNext.getBuffer(), theNext.getRowOffset(), theNext.getRowLength(), + firstKeyOfPreviousRow.getBuffer(), firstKeyOfPreviousRow.getRowOffset(), + firstKeyOfPreviousRow.getRowLength()) > 0) { + return seekToPreviousRow(firstKeyOfPreviousRow); + } + + return true; + } + + protected boolean skipKVsNewerThanReadpoint() throws IOException { + // We want to ignore all key-values that are newer than our current readPoint + KeyValue startKV = this.theNext; + // enforceMVCC + while (theNext != null && (theNext.getMvccVersion() > readPoint)) { + theNext = next(false); + if (Bytes.compareTo(theNext.getRowArray(), theNext.getRowOffset(), theNext.getRowLength(), + startKV.getRowArray(), startKV.getRowOffset(), startKV.getRowLength()) > 0) { + return false; + } + } + + if (theNext == null) { + close(); + return false; + } + + return true; + } + + @Override + public synchronized boolean seekToLastRow() throws IOException { + assert this.cellBlock.lastRK != null; + KeyValue seekKey = KeyValueUtil.createFirstOnRow(this.cellBlock.lastRK); + if (seek(seekKey)) { + return true; + } else { + return seekToPreviousRow(seekKey); + } + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellBlockManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellBlockManager.java new file mode 100644 index 0000000..3e8930b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellBlockManager.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Threads; + +public class CellBlockManager { + + private static final String CELLBLOCK_FLUSHER_THREADS_COUNT = + "hbase.regionserver.cellblock.flusher.threads.count"; + private static final int DEFAULT_CELLBLOCK_FLUSHER_THREADS_COUNT = 10; + + private static final Log LOG = LogFactory.getLog(CellBlockManager.class); + + private static CellBlockManager instance = null; + + private ThreadPoolExecutor executor; + private final MultiVersionConsistencyControl mvcc; + + private CellBlockManager(Configuration conf, MultiVersionConsistencyControl mvcc) { + int threadsCount = conf.getInt(CELLBLOCK_FLUSHER_THREADS_COUNT, + DEFAULT_CELLBLOCK_FLUSHER_THREADS_COUNT); + this.mvcc = mvcc; + this.executor = Threads.getBoundedCachedThreadPool(threadsCount, 30L, TimeUnit.SECONDS, + new ThreadFactory() { + private int count = 1; + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "CellBlockFlusher-" + count++); + } + }); + } + + public static CellBlockManager getInstance(Configuration conf, MultiVersionConsistencyControl mvcc) { + if (instance == null) { + synchronized (CellBlockManager.class) { + if (instance == null) { + instance = new CellBlockManager(conf, mvcc); + } + } + } + return instance; + } + + public void flushTo(final CellSkipListSet src, final int flushSize, + final CellBlock destination, final CellBlocksBackedMemStore memStore) { + if (src.isEmpty()) return; + this.executor.submit(new Runnable() { + @Override + public void run() { + // Wait for previous mvcc ops to finish so that the late binding of the seqId to all the + // cells will be over. This might wait some extra time waiting for the seqId for the latest + // cells added which are not going to get flushed now. But that is ok. + mvcc.waitForPreviousTransactionsComplete(); + if(destination.flush(src, flushSize)){ + memStore.finishFlushToCellBlock(destination, src); + LOG.info("Finished on memory flush for the MemStore."); + } else { + LOG.warn("Can not do a CellBlock flush. Not able to allocate enough space!"); + } + } + }); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellBlocksBackedMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellBlocksBackedMemStore.java new file mode 100644 index 0000000..324881d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellBlocksBackedMemStore.java @@ -0,0 +1,869 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.CollectionBackedScanner; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; + +@InterfaceAudience.Private +public class CellBlocksBackedMemStore implements MemStore { + + private static final Log LOG = LogFactory.getLog(CellBlocksBackedMemStore.class); + public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + + (17 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + (2 * Bytes.SIZEOF_INT) + + Bytes.SIZEOF_BOOLEAN); + public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + + (4 * ClassSize.CELL_SKIPLIST_SET) + (4 * ClassSize.CONCURRENT_SKIPLISTMAP) + + (2 * ClassSize.ATOMIC_LONG) + (3 * ClassSize.TIMERANGE_TRACKER) + ClassSize.OBJECT + + (2 * ClassSize.ARRAYLIST)); + + public static final String CELLBLOCK_MIN_SIZE_KEY = "hbase.hregion.memstore.cellblock.min.size"; + private static final int DEFAULT_CELLBLOCK_SIZE = 128 * 1024 * 1024; // Defaults to 128MB. 
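For orientation, these are the knobs this patch reads and roughly how the test at the end of the patch wires them up; the key names come from the patch itself, while the values below are examples only (hypothetical class CellBlockConfigSketch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Illustrative wiring of the configuration keys introduced by this patch.
// Defaults are 128 MB for the min size and 64 KB for the fragment size.
public class CellBlockConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Ask HStore to instantiate this memstore implementation (see the HStore
    // change and the test setUp() further down in the patch).
    conf.set("hbase.regionserver.memstore.class",
        "org.apache.hadoop.hbase.regionserver.CellBlocksBackedMemStore");
    // Active set size at which an in-memory flush into a CellBlock is attempted.
    conf.setInt("hbase.hregion.memstore.cellblock.min.size", 128 * 1024 * 1024);
    // How much data goes into one CellBlock fragment before a new index entry is cut.
    conf.setInt("hbase.regionserver.cellblock.fragment.size", 64 * 1024);
  }
}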
+ + private final int cellBlockMinSize; + private final Configuration conf; + private final KeyValue.KVComparator comparator; + private final boolean isMetaRegion; + private final CellBlockManager manager; + + // current "in-service" kvset which allows update + private volatile CellSkipListSet cellSet; + // read-only kvset reference for in-memory flush to form cell block + private volatile CellSkipListSet cellSetForCellBlock; + // list to store cell blocks generated by in-memory flush + private volatile List cellBlocks; + private volatile TimeRangeTracker trt; + private volatile TimeRangeTracker trtForCellBlock; + + // snapshot of the current "in-service" kvset + private volatile CellSkipListSet snapshot; + // snapshot of the "read-only" kvset for in-memory flush + private volatile CellSkipListSet snapshotForCellBlock; + // snapshot of the list which stores cell blocks generated by in-memory flush + private volatile List snapshotCellBlocks; + private volatile TimeRangeTracker snapshotTrt; + + private volatile long timeOfOldestEdit = Long.MAX_VALUE; + + private AtomicLong totalSize; + // Estimate the heap size of kv after it's in-memory flushed + private AtomicInteger size; + private Object lock = new Object(); + private volatile MemStoreLAB allocator; + private volatile MemStoreLAB snapshotAllocator; + + private volatile long snapshotId; + private volatile long snapshotSize; + private volatile int cellsCount; + + public CellBlocksBackedMemStore(final Configuration conf, final KeyValue.KVComparator c, + final HRegion region) { + this.conf = conf; + this.manager = CellBlockManager.getInstance(conf, region.getMVCC()); + this.comparator = c; + this.isMetaRegion = region.getRegionInfo().isMetaRegion(); + this.cellBlockMinSize = conf.getInt(CELLBLOCK_MIN_SIZE_KEY, DEFAULT_CELLBLOCK_SIZE); + + this.cellSet = new CellSkipListSet(c); + this.cellSetForCellBlock = new CellSkipListSet(c); + this.cellBlocks = new ArrayList(); + this.trt = new TimeRangeTracker(); + this.trtForCellBlock = new TimeRangeTracker(); + + initializeSnapshotStructures(); + + this.totalSize = new AtomicLong(DEEP_OVERHEAD); + this.size = new AtomicInteger(0); + this.snapshotSize = 0; + this.allocator = new HeapMemStoreLAB(getMSLABConf(conf)); + } + + private static Configuration getMSLABConf(Configuration conf) { + Configuration mslabConf = new Configuration(conf); + int chunkSize = mslabConf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY, + HeapMemStoreLAB.CHUNK_SIZE_DEFAULT); + mslabConf.setInt(HeapMemStoreLAB.MAX_ALLOC_KEY, chunkSize); + return mslabConf; + } + + @Override + public MemStoreSnapshot snapshot() { + synchronized (lock) { + if (!(this.snapshot.isEmpty()) || !(this.snapshotForCellBlock.isEmpty()) + || !(this.snapshotCellBlocks.isEmpty())) { + // Any of these DS is non empty means we are in the middle of a previous flush. + LOG.warn("Snapshot called again without clearing previous. " + + "Doing nothing. 
Another ongoing flush or did we fail last attempt?"); + } else { + this.snapshotId = EnvironmentEdgeManager.currentTime(); + this.snapshotSize = keySize(); + this.snapshot = this.cellSet; + this.cellSet = new CellSkipListSet(this.comparator); + cellsCount += this.snapshot.size(); + // the "lock" makes sure snapshot in a consistent state and not disturbed by ongoing + // in-memory flush + this.snapshotForCellBlock = cellSetForCellBlock; + this.cellSetForCellBlock = new CellSkipListSet(this.comparator); + cellsCount += this.snapshotForCellBlock.size(); + this.snapshotCellBlocks = this.cellBlocks; + this.cellBlocks = new ArrayList(); + this.snapshotTrt = this.trt; + for (CellBlock cb : this.snapshotCellBlocks) { + TimeRangeTracker trt = cb.getTimeRangeTracker(); + if (this.snapshotTrt.minimumTimestamp > trt.minimumTimestamp) { + this.snapshotTrt.minimumTimestamp = trt.minimumTimestamp; + } + if (this.snapshotTrt.maximumTimestamp < trt.maximumTimestamp) { + this.snapshotTrt.maximumTimestamp = trt.maximumTimestamp; + } + cellsCount += cb.getCellsCount(); + } + if (this.snapshotTrt.minimumTimestamp > trtForCellBlock.minimumTimestamp) { + this.snapshotTrt.minimumTimestamp = trtForCellBlock.minimumTimestamp; + } + if (this.snapshotTrt.maximumTimestamp < trtForCellBlock.maximumTimestamp) { + this.snapshotTrt.maximumTimestamp = trtForCellBlock.maximumTimestamp; + } + trt = new TimeRangeTracker(); + this.snapshotAllocator = allocator; + this.allocator = new HeapMemStoreLAB(getMSLABConf(this.conf)); + // Reset heap to not include any keys + this.totalSize.set(DEEP_OVERHEAD); + this.size.set(0); + } + } + KeyValueScanner scanner = null; + try { + scanner = getSnapshotScanner(); + } catch (IOException e) { + // This scanner is on in memory structures. So IOE will never come ideally. If it comes throw + // RTE. Any type of Exception is handled in HRegion#internalFlushCache() and will fail + // snapshot operation and do server abort. + LOG.error("IOE while creating snapshot scanner. This is really strange!", e); + throw new RuntimeException(e); + } + return new MemStoreSnapshot(this.snapshotId, cellsCount, this.snapshotSize, + this.snapshotTrt, scanner); + } + + private KeyValueScanner getSnapshotScanner() throws IOException { + List scanners = new ArrayList(); + if (!this.snapshot.isEmpty()) { + scanners.add(new CollectionBackedScanner(this.snapshot, this.comparator)); + } + if (!this.snapshotForCellBlock.isEmpty()) { + scanners.add(new CollectionBackedScanner(this.snapshotForCellBlock, this.comparator)); + } + for (CellBlock cb : this.snapshotCellBlocks) { + KeyValueScanner scanner = cb.getScanner(Long.MAX_VALUE); + // Seek to the begin + scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); + scanners.add(scanner); + } + return new KeyValueHeap(scanners, this.comparator); + } + + @Override + public long getFlushableSize() { + return this.snapshotSize > 0 ? 
this.snapshotSize : keySize(); + } + + @Override + public void clearSnapshot(long id) throws UnexpectedStateException { + if (this.snapshotId != id) { + throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " + + id); + } + this.snapshotSize = 0; + initializeSnapshotStructures(); + MemStoreLAB tmpAllocator = null; + if (this.snapshotAllocator != null) { + tmpAllocator = this.snapshotAllocator; + this.snapshotAllocator = null; + } + if (tmpAllocator != null) { + tmpAllocator.close(); + } + } + + private void initializeSnapshotStructures() { + this.snapshot = new CellSkipListSet(this.comparator); + this.snapshotForCellBlock = new CellSkipListSet(this.comparator); + this.snapshotCellBlocks = new ArrayList(); + this.snapshotTrt = new TimeRangeTracker(); + } + + @Override + public Pair add(Cell cell) { + // Notice: here flush got triggered *after* the size exceeds cellBlockMinSize + // We are not doing the in memory flushes for meta region. Any way META region is not supposed to + // hold more and more edits. It is supposed to flush memstore more frequently than user regions. + int memstoreSize = this.size.get(); + if (memstoreSize >= this.cellBlockMinSize && !isMetaRegion) { + LOG.debug("Memstore size [" + memstoreSize + "] has exceeded cellblock min size [" + + cellBlockMinSize + "], try to in-memory flush"); + synchronized (lock) { + flushCellBlock(); + } + } + return new Pair(internalAdd(cell, true), cell); + } + + private void flushCellBlock() { + int flushSize = this.size.get(); + // Check condition again with lock. + if (flushSize >= this.cellBlockMinSize) { + if (this.cellSetForCellBlock.isEmpty()) { // make sure only one flush ongoing + this.trtForCellBlock = this.trt; + this.trt = new TimeRangeTracker(); + CellBlock cellBlock = new CellBlock(trtForCellBlock, comparator, + this.allocator, this.conf); + this.cellSetForCellBlock = this.cellSet; + this.cellSet = new CellSkipListSet(this.comparator); + this.size.set(0); + manager.flushTo(this.cellSetForCellBlock, flushSize, cellBlock, this); + } else { + // TODO check the flush effect under high pressure write workload, might be lots of flush + // skipped and cause much bigger in-memory flush + LOG.warn("A previous flush to CellBlock is still pending! " + + "Taking more updates to the set"); + } + } else { + LOG.debug("Current memstore size [" + flushSize + "] seems under cellblock min size [" + + cellBlockMinSize + "]. Another parallel in memory flush would have happened."); + } + } + + private static int sizeInCellBlock(Cell c) { + return KeyValueUtil.keyLength(c) + c.getValueLength() + c.getTagsLength() + + KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + Bytes.SIZEOF_LONG;// This long is the + // memstoreTS part + } + + public void finishFlushToCellBlock(CellBlock cellBlock, CellSkipListSet cellSet) { + synchronized (lock) { + if (cellSet == this.cellSetForCellBlock) { + this.cellBlocks.add(cellBlock); + this.cellSetForCellBlock = new CellSkipListSet(this.comparator); + this.trtForCellBlock = new TimeRangeTracker(); + } else { + LOG.info("Skipping finish cell block flush." + + "This cell block might already have been included in a snapshot."); + // LAB chunk(s) allocated for this will get freed with the clearSnapshot. 
+ } + } + } + + private long internalAdd(final Cell toAdd, boolean sizeChange) { + boolean notpresent = addToCellSet(toAdd); + long s = 0; + if (sizeChange) { + trt.includeTimestamp(toAdd); + s = heapSizeChange(toAdd, notpresent); + this.totalSize.addAndGet(s); + if (notpresent) this.size.addAndGet(sizeInCellBlock(toAdd)); + } + return s; + } + + private boolean addToCellSet(Cell e) { + boolean b = this.cellSet.add(e); + setOldestEditTimeToNow(); + return b; + } + + void setOldestEditTimeToNow() { + if (timeOfOldestEdit == Long.MAX_VALUE) { + timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); + } + } + + private static long heapSizeChange(final Cell cell, final boolean notpresent) { + return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + + CellUtil.estimatedHeapSizeOf(cell)) : 0; + } + + @Override + public long timeOfOldestEdit() { + return this.timeOfOldestEdit; + } + + @Override + public void rollback(Cell cell) { + // If the key is in the snapshot, delete it. We should not update + // this.size, because that tracks the size of only the memstore and + // not the snapshot. The flush of this snapshot to disk has not + // yet started because Store.flush() waits for all rwcc transactions to + // commit before starting the flush to disk. + Cell found = this.snapshot.get(cell); + boolean removed = false; + if (found != null && found.getSequenceId() == cell.getSequenceId()) { + this.snapshot.remove(cell); + removed = true; + // Removed from the snapshot, we have to change the snapshot size. + this.snapshotSize -= heapSizeChange(cell, true); + } + // If the key is in the memstore, delete it. Update this.size. + found = this.cellSet.get(cell); + if (found != null && found.getSequenceId() == cell.getSequenceId()) { + removeFromCellSet(cell); + long s = heapSizeChange(cell, true); + this.totalSize.addAndGet(-s); + this.size.addAndGet(-(sizeInCellBlock(cell))); + removed = true; + } + // Not trying to delete from snapshotForCellBlock or kvsetForCellBlock as they might be getting + // traversed or already got flushed to a cellblock + if (!removed) { + // When it is not present in the kvset and moved to CellBlock!! + // We need to make it as a delete entry. Later during flush or CellBlock compaction this will + // get eliminated. This has to be a exact version delete type. + KeyValue deleteCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.Delete, null, 0, 0); + internalAdd(deleteCell, false); + long s = heapSizeChange(cell, true); + this.totalSize.addAndGet(-s); + // this.snapshotSize > 0 means in btw this KV add and this rollback, a snapshot op has + // happened. Also we are here that the kv is not found in this.snapshot set. So mostly it + // might be in snapshotForCellBlock set. We have to adjust the snapshot size. + if (this.snapshotSize > 0) { + this.snapshotSize -= s; + } + } + } + + private boolean removeFromCellSet(Cell e) { + boolean b = this.cellSet.remove(e); + setOldestEditTimeToNow(); + return b; + } + + @Override + public long delete(Cell deleteCell) { + return add(deleteCell).getFirst(); + } + + // This will be used from the code flow, only for META table. We know for META table we wont add + // in memory flush. So below impl alone can be enough as of now. 
+ @Override + public void getRowKeyAtOrBefore(GetClosestRowBeforeTracker state) { + if (isMetaRegion) { + getRowKeyAtOrBefore(this.cellSet, state); + getRowKeyAtOrBefore(this.snapshot, state); + } + // Not supporting getRowOrBefore API on non META tables + throw new UnsupportedOperationException(); + } + + // All copy pasted from DefaultMemStore.java + private void getRowKeyAtOrBefore(final NavigableSet set, + final GetClosestRowBeforeTracker state) { + if (set.isEmpty()) { + return; + } + if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) { + // Found nothing in row. Try backing up. + getRowKeyBefore(set, state); + } + } + + private boolean walkForwardInSingleRow(final SortedSet set, final KeyValue firstOnRow, + final GetClosestRowBeforeTracker state) { + boolean foundCandidate = false; + SortedSet tail = set.tailSet(firstOnRow); + if (tail.isEmpty()) return foundCandidate; + for (Iterator i = tail.iterator(); i.hasNext();) { + Cell cell = i.next(); + // Did we go beyond the target row? If so break. + if (state.isTooFar(cell, firstOnRow)) break; + if (state.isExpired(cell)) { + i.remove(); + continue; + } + // If we added something, this row is a contender. break. + if (state.handle(cell)) { + foundCandidate = true; + break; + } + } + return foundCandidate; + } + + private void getRowKeyBefore(NavigableSet set, GetClosestRowBeforeTracker state) { + KeyValue firstOnRow = state.getTargetKey(); + for (Member p = memberOfPreviousRow(set, state, firstOnRow); p != null; p = memberOfPreviousRow( + p.set, state, firstOnRow)) { + // Make sure we don't fall out of our table. + if (!state.isTargetTable(p.cell)) break; + // Stop looking if we've exited the better candidate range. + if (!state.isBetterCandidate(p.cell)) break; + // Make into firstOnRow + firstOnRow = new KeyValue(p.cell.getRowArray(), p.cell.getRowOffset(), p.cell.getRowLength(), + HConstants.LATEST_TIMESTAMP); + // If we find something, break; + if (walkForwardInSingleRow(p.set, firstOnRow, state)) break; + } + } + + private static class Member { + final Cell cell; + final NavigableSet set; + + Member(final NavigableSet s, final Cell cell) { + this.cell = cell; + this.set = s; + } + } + + private Member memberOfPreviousRow(NavigableSet set, + final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) { + NavigableSet head = set.headSet(firstOnRow, false); + if (head.isEmpty()) + return null; + for (Iterator i = head.descendingIterator(); i.hasNext();) { + Cell found = i.next(); + if (state.isExpired(found)) { + i.remove(); + continue; + } + return new Member(head, found); + } + return null; + } + + @Override + public long upsert(Iterable cells, long readpoint) { + long size = 0; + for (Cell cell : cells) { + size += upsert(cell, readpoint); + } + return size; + } + + private long upsert(Cell cell, long readpoint) { + long addedSize = internalAdd(cell, true); + + // Get the KeyValues for the row/family/qualifier regardless of timestamp. 
+ // For this case we want to clean up any other puts + KeyValue firstKv = KeyValueUtil.createFirstOnRow( + cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + SortedSet ss = cellSet.tailSet(firstKv); + Iterator it = ss.iterator(); + // versions visible to oldest scanner + int versionsVisible = 0; + while ( it.hasNext() ) { + Cell cur = it.next(); + if (cell == cur) { + // ignore the one just put in + continue; + } + // check that this is the row and column we are interested in, otherwise bail + if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) { + // only remove Puts that concurrent scanners cannot possibly see + if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && + cur.getSequenceId() <= readpoint) { + if (versionsVisible > 1) { + // if we get here we have seen at least one version visible to the oldest scanner, + // which means we can prove that no scanner will see this version + // false means there was a change, so give us the size. + long delta = heapSizeChange(cur, true); + addedSize -= delta; + this.totalSize.addAndGet(-delta); + this.size.addAndGet(-(sizeInCellBlock(cur))); + it.remove(); + setOldestEditTimeToNow(); + } else { + versionsVisible++; + } + } + } else { + // past the row or column, done + break; + } + } + return addedSize; + } + + @Override + public List getScanners(long readPt) { + List scanners = new ArrayList(); + synchronized (this.lock) { + scanners.add(new MemStoreScanner(readPt)); + for (CellBlock cb : this.cellBlocks) { + scanners.add(cb.getScanner(readPt)); + } + for (CellBlock cb : this.snapshotCellBlocks) { + scanners.add(cb.getScanner(readPt)); + } + } + return scanners; + } + + @Override + public long heapSize() { + return totalSize.get(); + } + + @Override + public long size() { + return heapSize(); + } + + private long keySize() { + return heapSize() - DEEP_OVERHEAD; + } + + @Override + public long updateColumnValue(byte[] row, byte[] family, byte[] qualifier, long newValue, long now) { + // This is not used in code at all and so not implementing here + throw new UnsupportedOperationException(); + } + + /** + * Check if this memstore may contain the required keys + * + * @param scan + * @return False if the key definitely does not exist in this Memstore + */ + public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { + return (trt.includesTimeRange(scan.getTimeRange()) + || snapshotTrt.includesTimeRange(scan.getTimeRange()) || trtForCellBlock + .includesTimeRange(scan.getTimeRange())) + && (Math.max(Math.max(trt.getMaximumTimestamp(), trtForCellBlock.getMaximumTimestamp()), + snapshotTrt.getMaximumTimestamp()) >= oldestUnexpiredTS); + } + + private class MemStoreScanner extends NonLazyKeyValueScanner { + // Next row information for either cellSet or snapshot + private Cell cellSetNextRow = null; + private Cell cellSetForCellBlockNextRow = null; + private Cell snapshotNextRow = null; + private Cell snapshotForCellBlockNextRow = null; + + // last iterated Cells for kvset and snapshot (to restore iterator state after reseek) + private Cell cellSetItRow = null; + private Cell cellSetForCellBlockItRow = null; + private Cell snapshotItRow = null; + private Cell snapshotForCellBlockItRow = null; + + // iterator based scanning. 
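This scanner merges four sorted sources, the active cell set, the set currently being flushed to a CellBlock, and their two snapshot counterparts, by repeatedly emitting the lowest head among the iterators and advancing only the iterator it came from, which is what seekInSubLists() and next() below do. A stripped-down sketch of that lowest-head merge over two plain sorted sets (hypothetical class LowestHeadMergeSketch, illustrative data; not part of the patch):

import java.util.Arrays;
import java.util.Iterator;
import java.util.TreeSet;

// Stripped-down sketch of the merge MemStoreScanner performs over its four
// sorted sources; two TreeSets of row keys stand in for them here.
public class LowestHeadMergeSketch {
  public static void main(String[] args) {
    Iterator<String> a = new TreeSet<>(Arrays.asList("r0", "r2", "r5")).iterator();
    Iterator<String> b = new TreeSet<>(Arrays.asList("r1", "r3", "r4")).iterator();
    String headA = a.hasNext() ? a.next() : null;
    String headB = b.hasNext() ? b.next() : null;

    // Emit the lowest head, then advance only the iterator it came from.
    while (headA != null || headB != null) {
      boolean takeA = headB == null || (headA != null && headA.compareTo(headB) <= 0);
      if (takeA) {
        System.out.println(headA);
        headA = a.hasNext() ? a.next() : null;
      } else {
        System.out.println(headB);
        headB = b.hasNext() ? b.next() : null;
      }
    }
    // Prints r0, r1, r2, r3, r4, r5: a single sorted view over both sources.
  }
}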
+ private Iterator cellSetIt; + private Iterator cellSetForCellBlockIt; + private Iterator snapshotIt; + private Iterator snapshotForCellBlockIt; + + // The cellSet and snapshot at the time of creating this scanner + private CellSkipListSet cellSetAtCreation; + private CellSkipListSet cellSetForCellBlockAtCreation; + private CellSkipListSet snapshotAtCreation; + private CellSkipListSet snapshotForCellBlockAtCreation; + + // the pre-calculated Cell to be returned by peek() or next() + private Cell theNext; + + // A flag represents whether could stop skipping Cells for MVCC + // if have encountered the next row. Only used for reversed scan + private boolean stopSkippingCellsIfNextRow = false; + + private long readPoint; + + MemStoreScanner(long readPoint) { + this.readPoint = readPoint; + cellSetAtCreation = cellSet; + cellSetForCellBlockAtCreation = cellSetForCellBlock; + snapshotAtCreation = snapshot; + snapshotForCellBlockAtCreation = snapshotForCellBlock; + } + + private Cell getNext(Iterator it) { + Cell startCell = theNext; + Cell v = null; + try { + while (it.hasNext()) { + v = it.next(); + if (v.getSequenceId() <= this.readPoint) { + return v; + } + if (stopSkippingCellsIfNextRow && startCell != null + && comparator.compareRows(v, startCell) > 0) { + return null; + } + } + + return null; + } finally { + if (v != null) { + // in all cases, remember the last KV iterated to + if (it == cellSetForCellBlockIt) { + cellSetForCellBlockItRow = v; + } else if (it == cellSetIt) { + cellSetItRow = v; + } else if (it == snapshotIt) { + snapshotItRow = v; + } else { + snapshotForCellBlockItRow = v; + } + } + } + } + + /** + * Set the scanner at the seek key. Must be called only once: there is no thread safety between + * the scanner and the memStore. + * @param key seek key + * @return false if the key is null or if there is no data + */ + @Override + public synchronized boolean seek(Cell key) { + if (key == null) { + close(); + return false; + } + // kvset and snapshot will never be null. + // if tailSet can't find anything, SortedSet is empty (not null). + cellSetIt = cellSetAtCreation.tailSet(key).iterator(); + cellSetForCellBlockIt = cellSetForCellBlockAtCreation.tailSet(key).iterator(); + snapshotIt = snapshotAtCreation.tailSet(key).iterator(); + snapshotForCellBlockIt = snapshotForCellBlockAtCreation.tailSet(key).iterator(); + cellSetItRow = null; + cellSetForCellBlockItRow = null; + snapshotItRow = null; + snapshotForCellBlockItRow = null; + + return seekInSubLists(key); + } + + /** + * (Re)initialize the iterators after a seek or a reseek. + */ + private synchronized boolean seekInSubLists(Cell key) { + cellSetNextRow = getNext(cellSetIt); + cellSetForCellBlockNextRow = getNext(cellSetForCellBlockIt); + snapshotNextRow = getNext(snapshotIt); + snapshotForCellBlockNextRow = getNext(snapshotForCellBlockIt); + + // Calculate the next value + theNext = getLowest(getLowest(cellSetNextRow, cellSetForCellBlockNextRow), + getLowest(snapshotNextRow, snapshotForCellBlockNextRow)); + + // has data + return (theNext != null); + } + + /** + * Move forward on the sub-lists set previously by seek. 
+ * @param key seek key (should be non-null) + * @return true if there is at least one Cell to read, false otherwise + */ + @Override + public synchronized boolean reseek(Cell key) { + cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator(); + cellSetForCellBlockIt = cellSetForCellBlockAtCreation.tailSet( + getHighest(key, cellSetForCellBlockItRow)).iterator(); + snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator(); + snapshotForCellBlockIt = snapshotForCellBlockAtCreation.tailSet( + getHighest(key, snapshotForCellBlockItRow)).iterator(); + return seekInSubLists(key); + } + + @Override + public synchronized Cell peek() { + return theNext; + } + + @Override + public synchronized Cell next() { + if (theNext == null) { + return null; + } + final Cell ret = theNext; + // Advance one of the iterators + if (theNext == cellSetNextRow) { + cellSetNextRow = getNext(cellSetIt); + } else if (theNext == cellSetForCellBlockNextRow) { + cellSetForCellBlockNextRow = getNext(cellSetForCellBlockIt); + } else if (theNext == snapshotNextRow) { + snapshotNextRow = getNext(snapshotIt); + } else { + snapshotForCellBlockNextRow = getNext(snapshotForCellBlockIt); + } + // Calculate the next value + theNext = getLowest(getLowest(cellSetNextRow, cellSetForCellBlockNextRow), + getLowest(snapshotNextRow, snapshotForCellBlockNextRow)); + return ret; + } + + /* + * Returns the lower of the two Cells, or null if they are both null. This uses + * comparator.compare() to compare the Cells using the memstore comparator. + */ + private Cell getLowest(Cell first, Cell second) { + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + int compare = comparator.compare(first, second); + return (compare <= 0 ? first : second); + } + return (first != null ? first : second); + } + + /* + * Returns the higher of the two Cells, or null if they are both null. This uses + * comparator.compare() to compare the Cells using the memstore comparator. + */ + private Cell getHighest(Cell first, Cell second) { + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + int compare = comparator.compare(first, second); + return (compare > 0 ? first : second); + } + return (first != null ? first : second); + } + + public synchronized void close() { + this.cellSetNextRow = null; + this.cellSetForCellBlockNextRow = null; + this.snapshotNextRow = null; + this.snapshotForCellBlockNextRow = null; + + this.cellSetIt = null; + this.cellSetForCellBlockIt = null; + this.snapshotIt = null; + this.snapshotForCellBlockIt = null; + + this.cellSetItRow = null; + this.cellSetForCellBlockItRow = null; + this.snapshotItRow = null; + this.snapshotForCellBlockItRow = null; + } + + /** + * MemStoreScanner returns max value as sequence id because it will always have the latest data + * among all files. + */ + @Override + public long getSequenceID() { + return Long.MAX_VALUE; + } + + @Override + public boolean shouldUseScanner(Scan scan, SortedSet columns, long oldestUnexpiredTS) { + return shouldSeek(scan, oldestUnexpiredTS); + } + + /** + * Seek scanner to the given key first. 
If it returns false(means peek()==null) or scanner's + * peek row is bigger than row of given key, seek the scanner to the previous row of given key + */ + @Override + public synchronized boolean backwardSeek(Cell key) { + seek(key); + if (peek() == null || comparator.compareRows(peek(), key) > 0) { + return seekToPreviousRow(key); + } + return true; + } + + /** + * Separately get the Cell before the specified key from cellSet and snapshotset, and use the + * row of higher one as the previous row of specified key, then seek to the first KeyValue of + * previous row + */ + @Override + public synchronized boolean seekToPreviousRow(Cell key) { + KeyValue firstKeyOnRow = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(), + key.getRowLength()); + SortedSet cellHead = cellSetAtCreation.headSet(firstKeyOnRow); + Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last(); + SortedSet snapshotHead = snapshotAtCreation.headSet(firstKeyOnRow); + Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead.last(); + Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow); + cellHead = cellSetForCellBlockAtCreation.headSet(firstKeyOnRow); + cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last(); + lastCellBeforeRow = getHighest(cellSetBeforeRow, lastCellBeforeRow); + snapshotHead = snapshotForCellBlockAtCreation.headSet(firstKeyOnRow); + snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead.last(); + lastCellBeforeRow = getHighest(snapshotBeforeRow, lastCellBeforeRow); + if (lastCellBeforeRow == null) { + theNext = null; + return false; + } + Cell firstKeyOnPreviousRow = KeyValueUtil.createFirstOnRow(lastCellBeforeRow.getRowArray(), + lastCellBeforeRow.getRowOffset(), lastCellBeforeRow.getRowLength()); + this.stopSkippingCellsIfNextRow = true; + seek(firstKeyOnPreviousRow); + this.stopSkippingCellsIfNextRow = false; + if (peek() == null || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) { + return seekToPreviousRow(lastCellBeforeRow); + } + return true; + } + + @Override + public synchronized boolean seekToLastRow() { + Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation.last(); + Cell second = cellSetForCellBlockAtCreation.isEmpty() ? null : cellSetForCellBlockAtCreation + .last(); + Cell higher = getHighest(first, second); + second = snapshotAtCreation.isEmpty() ? null : snapshotAtCreation.last(); + higher = getHighest(higher, second); + second = snapshotForCellBlockAtCreation.isEmpty() ? null : snapshotForCellBlockAtCreation + .last(); + higher = getHighest(higher, second); + if (higher == null) { + return false; + } + Cell firstKvOnLastRow = KeyValueUtil.createFirstOnRow(higher.getRowArray(), + higher.getRowOffset(), higher.getRowLength()); + if (seek(firstKvOnLastRow)) { + return true; + } else { + return seekToPreviousRow(higher); + } + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 48b78c2..124901d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -129,6 +129,12 @@ public class DefaultMemStore implements MemStore { } } + public DefaultMemStore(final Configuration conf, final KeyValue.KVComparator c, + final HRegion region) { + // No need to use HRegion instance in this MemStore. 
+    this(conf, c);
+  }
+
   void dump() {
     for (Cell cell: this.cellSet) {
       LOG.info(cell);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 8e7f576..576ad18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -242,7 +242,8 @@ public class HStore implements Store {
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
     this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
-        Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
+        Configuration.class, KeyValue.KVComparator.class, HRegion.class }, new Object[] { conf,
+        this.comparator, this.region });
     this.offPeakHours = OffPeakHours.getInstance(conf);
 
     // Setting up cache configuration for this family
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellBlocksBackedMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellBlocksBackedMemStore.java
new file mode 100644
index 0000000..a93412d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellBlocksBackedMemStore.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({RegionServerTests.class, MediumTests.class}) +public class TestCellBlocksBackedMemStore { + private static final byte[] FAMILY = Bytes.toBytes("cf"); + private static final byte[] Q = Bytes.toBytes("q1"); + private static final byte[] VALUE = Bytes.toBytes("v1"); + + private static Configuration conf; + + private static HBaseTestingUtility TEST_UTIL; + + @BeforeClass + public static void setUp() throws Exception { + conf = HBaseConfiguration.create(); + conf.setFloat("hbase.hregion.memstore.chunkpool.maxsize", 0.5f); + conf.setInt(CellBlocksBackedMemStore.CELLBLOCK_MIN_SIZE_KEY, 1024); + conf.setInt(CellBlock.CELL_BLOCK_FRAGMENT_SIZE_KEY, 500); + conf.setInt(HeapMemStoreLAB.CHUNK_SIZE_KEY, 1000); + conf.set("hbase.regionserver.memstore.class", CellBlocksBackedMemStore.class.getName()); + TEST_UTIL = HBaseTestingUtility.createLocalHTU(conf); + } + + @Test + public void testFlushToCellBlock() throws Exception { + byte[] r = Bytes.toBytes("r"); + byte[] r0 = Bytes.toBytes("r0"); + byte[] r11 = Bytes.toBytes("r11"); + byte[] r2 = Bytes.toBytes("r2"); + byte[] r21 = Bytes.toBytes("r21"); + byte[] r23 = Bytes.toBytes("r23"); + byte[] r3 = Bytes.toBytes("r3"); + byte[] r4 = Bytes.toBytes("r4"); + byte[] r5 = Bytes.toBytes("r5"); + byte[] r6 = Bytes.toBytes("r6"); + byte[] r7 = Bytes.toBytes("r7"); + byte[] r72 = Bytes.toBytes("r72"); + + TableName tableName = TableName.valueOf("testFlushToCellBlock"); + HRegion region = TEST_UTIL.createLocalHRegion(tableName.getName(), HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, "testFlushToCellBlock", conf, false, Durability.SKIP_WAL, null, + FAMILY); + + Put p = new Put(r11); + p.add(FAMILY, Q, VALUE);// 26 + 2 + 8 + region.put(p); + p = new Put(r2); + p.add(FAMILY, Q, VALUE);// 26 + 2 + 8 + region.put(p); + p = new Put(r3); + p.add(FAMILY, Q, new byte[500]);// 26 + 500 + 8 + region.put(p); + p = new Put(r7); + p.add(FAMILY, Q, new byte[400]);// 26 + 400 + 8 + region.put(p); + p = new Put(r5); + p.add(FAMILY, Q, new byte[300]);// 26 + 300 + 8 + region.put(p); // ** 1040 - > 606 + Thread.sleep(1000); + p = new Put(r6); + p.add(FAMILY, Q, new byte[100]);// 26 + 100 + 8 + region.put(p); + p = new Put(r4); + p.add(FAMILY, Q, new byte[100]);// 26 + 100 + 8 + region.put(p); + p = new Put(r0); + p.add(FAMILY, Q, new byte[400]);// 26 + 400 + 8 + region.put(p); // ** 1036 -> 602 + p = new Put(r21); + p.add(FAMILY, Q, new byte[200]);// 26 + 200 + 8 + region.put(p); // 1103 + + Scan scan = new Scan(); + RegionScanner rs = region.getScanner(scan); + try { + List cells = new ArrayList(); + Cell next; + 
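The order the assertions below expect is simply the lexicographic, byte-wise ordering of the row keys, which is why "r11" comes before "r2" and "r21" before "r23" before "r3". A one-line check of that ordering (hypothetical class RowOrderSketch, illustrative only):

import java.util.Arrays;

// The expected scan order is just the sorted order of the row keys used above.
public class RowOrderSketch {
  public static void main(String[] args) {
    String[] rows = {"r", "r0", "r11", "r2", "r21", "r23", "r3", "r4", "r5", "r6", "r7", "r72"};
    Arrays.sort(rows);
    System.out.println(Arrays.toString(rows));
    // [r, r0, r11, r2, r21, r23, r3, r4, r5, r6, r7, r72]
  }
}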
+
+  @Test
+  public void testFlushToCellBlock() throws Exception {
+    byte[] r = Bytes.toBytes("r");
+    byte[] r0 = Bytes.toBytes("r0");
+    byte[] r11 = Bytes.toBytes("r11");
+    byte[] r2 = Bytes.toBytes("r2");
+    byte[] r21 = Bytes.toBytes("r21");
+    byte[] r23 = Bytes.toBytes("r23");
+    byte[] r3 = Bytes.toBytes("r3");
+    byte[] r4 = Bytes.toBytes("r4");
+    byte[] r5 = Bytes.toBytes("r5");
+    byte[] r6 = Bytes.toBytes("r6");
+    byte[] r7 = Bytes.toBytes("r7");
+    byte[] r72 = Bytes.toBytes("r72");
+
+    TableName tableName = TableName.valueOf("testFlushToCellBlock");
+    HRegion region = TEST_UTIL.createLocalHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
+        HConstants.EMPTY_END_ROW, "testFlushToCellBlock", conf, false, Durability.SKIP_WAL, null,
+        FAMILY);
+
+    Put p = new Put(r11);
+    p.add(FAMILY, Q, VALUE);// 26 + 2 + 8
+    region.put(p);
+    p = new Put(r2);
+    p.add(FAMILY, Q, VALUE);// 26 + 2 + 8
+    region.put(p);
+    p = new Put(r3);
+    p.add(FAMILY, Q, new byte[500]);// 26 + 500 + 8
+    region.put(p);
+    p = new Put(r7);
+    p.add(FAMILY, Q, new byte[400]);// 26 + 400 + 8
+    region.put(p);
+    p = new Put(r5);
+    p.add(FAMILY, Q, new byte[300]);// 26 + 300 + 8
+    region.put(p); // ** 1040 - > 606
+    Thread.sleep(1000);
+    p = new Put(r6);
+    p.add(FAMILY, Q, new byte[100]);// 26 + 100 + 8
+    region.put(p);
+    p = new Put(r4);
+    p.add(FAMILY, Q, new byte[100]);// 26 + 100 + 8
+    region.put(p);
+    p = new Put(r0);
+    p.add(FAMILY, Q, new byte[400]);// 26 + 400 + 8
+    region.put(p); // ** 1036 -> 602
+    p = new Put(r21);
+    p.add(FAMILY, Q, new byte[200]);// 26 + 200 + 8
+    region.put(p); // 1103
+
+    Scan scan = new Scan();
+    RegionScanner rs = region.getScanner(scan);
+    try {
+      List<Cell> cells = new ArrayList<Cell>();
+      Cell next;
+
+      /*assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r0, 0,
+        r0.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r1, 0,
+        r1.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r2, 0,
+        r2.length));*/
+      rs.reseek(r21);
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r21, 0,
+        r21.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r3, 0,
+        r3.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r4, 0,
+        r4.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r5, 0,
+        r5.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r6, 0,
+        r6.length));
+      cells.clear();
+      assertFalse(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r7, 0,
+        r7.length));
+      cells.clear();
+      assertFalse(rs.next(cells));
+      assertEquals(0, cells.size());
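Not part of the patch: the next()/assertEquals()/Bytes.equals() pattern above repeats for every expected row here and in the scans that follow. A small helper along these lines (hypothetical, and needing an extra java.io.IOException import) would shrink each check to one line while asserting exactly the same things:

    // Hypothetical helper: advance the scanner one row and assert that exactly one
    // cell comes back whose row matches the expected key. Returns next()'s "has more".
    private static boolean assertNextRow(RegionScanner rs, List<Cell> cells, byte[] expectedRow)
        throws IOException {
      cells.clear();
      boolean hasMore = rs.next(cells);
      assertEquals(1, cells.size());
      Cell c = cells.get(0);
      assertTrue(Bytes.equals(c.getRowArray(), c.getRowOffset(), c.getRowLength(),
          expectedRow, 0, expectedRow.length));
      return hasMore;
    }

    // Usage, mirroring the assertions in this test:
    //   assertTrue(assertNextRow(rs, cells, r21));
    //   assertTrue(assertNextRow(rs, cells, r3));
    //   ...
    //   assertFalse(assertNextRow(rs, cells, r7));  // last row: next() reports no more rows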
+
+      //**********
+      HStore store = (HStore) region.stores.values().iterator().next();
+      store.memstore.snapshot();
+      // some more writes
+      p = new Put(r72);
+      p.add(FAMILY, Q, new byte[600]);
+      region.put(p);
+      p = new Put(r23);
+      p.add(FAMILY, Q, new byte[430]);
+      region.put(p);//**
+      p = new Put(r);
+      p.add(FAMILY, Q, new byte[20]);
+      region.put(p);
+
+      rs = region.getScanner(scan);
+
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r, 0,
+        r.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r0, 0,
+        r0.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r11, 0,
+        r11.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r2, 0,
+        r2.length));
+      rs.reseek(r21);
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r21, 0,
+        r21.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r23, 0,
+        r23.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r3, 0,
+        r3.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r4, 0,
+        r4.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r5, 0,
+        r5.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r6, 0,
+        r6.length));
+      cells.clear();
+      assertTrue(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r7, 0,
+        r7.length));
+      cells.clear();
+      assertFalse(rs.next(cells));
+      assertEquals(1, cells.size());
+      next = cells.get(0);
+      assertTrue(Bytes.equals(next.getRowArray(), next.getRowOffset(), next.getRowLength(), r72, 0,
+        r72.length));
+      cells.clear();
+      assertFalse(rs.next(cells));
+      assertEquals(0, cells.size());
+
+    } finally {
+      rs.close();
+    }
+
+    // Backward seek testing
+    scan = new Scan();
+    scan.setReversed(true);
+    rs = region.getScanner(scan);
+    List<Cell> cells = new ArrayList<Cell>();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    Cell cell = cells.get(0);
+    assertTrue(Bytes.equals(r72, 0, r72.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r7, 0, r7.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r6, 0, r6.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r5, 0, r5.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r4, 0, r4.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r3, 0, r3.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r23, 0, r23.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r21, 0, r21.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r2, 0, r2.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r11, 0, r11.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r0, 0, r0.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r, 0, r.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+
+    scan = new Scan();
+    rs = region.getScanner(scan);
+    rs.reseek(r72);
+    cells.clear();
+    rs.next(cells);
+    assertEquals(1, cells.size());
+    cell = cells.get(0);
+    assertTrue(Bytes.equals(r72, 0, r72.length, cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength()));
+  }
+}
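One further hardening worth considering (again hypothetical, not in the patch): most of the ordering assertions above would also hold for the default memstore, so the test silently loses its point if the "hbase.regionserver.memstore.class" setting is not honoured. A one-line sanity check placed right after createLocalHRegion(...), using the same package-private access the test already relies on, would catch that:

    // Hypothetical sanity check: the region really is running the configured memstore.
    HStore store = (HStore) region.stores.values().iterator().next();
    assertTrue("expected CellBlocksBackedMemStore but got " + store.memstore.getClass(),
        store.memstore instanceof CellBlocksBackedMemStore);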