diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java index 67d18ed..1242567 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java @@ -43,7 +43,9 @@ public enum DataBlockEncoding { FAST_DIFF(4, "org.apache.hadoop.hbase.io.encoding.FastDiffDeltaEncoder"), // id 5 is reserved for the COPY_KEY algorithm for benchmarking // COPY_KEY(5, "org.apache.hadoop.hbase.io.encoding.CopyKeyDataBlockEncoder"), - PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec"); + PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec"), + ROW_INDEX_V1(7, "org.apache.hadoop.hbase.io.encoding.rowindexV1.RowIndexCodecV1"), + ROW_INDEX_V2(8, "org.apache.hadoop.hbase.io.encoding.rowindexV2.RowIndexCodecV2"); private final short id; private final byte[] idInBytes; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexCodecV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexCodecV1.java new file mode 100644 index 0000000..82c6fbc --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexCodecV1.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.hadoop.hbase.io.encoding.rowindexV1;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.EncodingState;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Private
+public class RowIndexCodecV1 implements DataBlockEncoder {
+
+  private static class RowIndexEncodingState extends EncodingState {
+    RowIndexEncoderV1 builder = null;
+  }
+
+  @Override
+  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
+      throws IOException {
+    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
+      throw new IOException(this.getClass().getName() + " only accepts "
+          + HFileBlockDefaultEncodingContext.class.getName() + " as the encoding context.");
+    }
+
+    HFileBlockDefaultEncodingContext encodingCtx =
+        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
+    encodingCtx.prepareEncoding(out);
+    if (encodingCtx.getHFileContext().isIncludesTags()
+        && encodingCtx.getHFileContext().isCompressTags()) {
+      if (encodingCtx.getTagCompressionContext() != null) {
+        // Reuse the existing TagCompressionContext rather than creating a new one
+        // for every block encoding.
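+        // Clearing resets its dictionary so state from the previous block cannot leak into this one.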
+        encodingCtx.getTagCompressionContext().clear();
+      } else {
+        try {
+          TagCompressionContext tagCompressionContext = new TagCompressionContext(
+              LRUDictionary.class, Byte.MAX_VALUE);
+          encodingCtx.setTagCompressionContext(tagCompressionContext);
+        } catch (Exception e) {
+          throw new IOException("Failed to initialize TagCompressionContext", e);
+        }
+      }
+    }
+
+    RowIndexEncoderV1 builder = new RowIndexEncoderV1(out, encodingCtx);
+    RowIndexEncodingState state = new RowIndexEncodingState();
+    state.builder = builder;
+    blkEncodingCtx.setEncodingState(state);
+  }
+
+  @Override
+  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException {
+    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx.getEncodingState();
+    RowIndexEncoderV1 builder = state.builder;
+    builder.write(cell);
+    int size = KeyValueUtil.length(cell);
+    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
+      size += WritableUtils.getVIntSize(cell.getSequenceId());
+    }
+    return size;
+  }
+
+  @Override
+  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+      byte[] uncompressedBytesWithHeader) throws IOException {
+    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx.getEncodingState();
+    RowIndexEncoderV1 builder = state.builder;
+    builder.flush();
+    // Blocks written with a real encoding are ENCODED_DATA; NONE falls back to plain DATA.
+    if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
+      encodingCtx.postEncoding(BlockType.ENCODED_DATA);
+    } else {
+      encodingCtx.postEncoding(BlockType.DATA);
+    }
+  }
+
+  @Override
+  public ByteBuffer decodeKeyValues(DataInputStream source, HFileBlockDecodingContext decodingCtx)
+      throws IOException {
+    if (!decodingCtx.getHFileContext().isIncludesTags()) {
+      // Without tags the cell section is already a plain KeyValue stream (plus mvcc vlongs
+      // when enabled), so slice it out using the onDiskSize int that trails the block
+      // instead of replaying every cell through the seeker.
+      ByteBuffer sourceAsBuffer = ByteBufferUtils.drainInputStreamToBuffer(source);
+      sourceAsBuffer.mark();
+      sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT);
+      int onDiskSize = sourceAsBuffer.getInt();
+      sourceAsBuffer.reset();
+      ByteBuffer dup = sourceAsBuffer.duplicate();
+      dup.position(sourceAsBuffer.position());
+      dup.limit(sourceAsBuffer.position() + onDiskSize);
+      return dup.slice();
+    } else {
+      // wasteful: copies the whole remaining stream into a fresh on-heap buffer
+      ByteBuffer sourceAsBuffer = ByteBufferUtils.drainInputStreamToBuffer(source);
+      sourceAsBuffer.mark();
+      RowIndexSeekerV1 seeker = new RowIndexSeekerV1(KeyValue.COMPARATOR, decodingCtx);
+      seeker.setCurrentBuffer(sourceAsBuffer);
+      List<ByteBuffer> kvs = new ArrayList<ByteBuffer>();
+      kvs.add(seeker.getKeyValueBuffer(true));
+      while (seeker.next()) {
+        kvs.add(seeker.getKeyValueBuffer(true));
+      }
+      int totalLength = 0;
+      for (ByteBuffer buf : kvs) {
+        totalLength += buf.remaining();
+      }
+      byte[] keyValueBytes = new byte[totalLength];
+      ByteBuffer result = ByteBuffer.wrap(keyValueBytes);
+      for (ByteBuffer buf : kvs) {
+        result.put(buf);
+      }
+      return result;
+    }
+  }
+
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    block.mark();
+    // Each cell starts with [keyLength int][valueLength int][key...].
+    int keyLength = block.getInt();
+    block.getInt();
+    int pos = block.position();
+    block.reset();
+    ByteBuffer dup = block.duplicate();
+    dup.position(pos);
+    dup.limit(pos + keyLength);
+    return dup.slice();
+  }
+
+  @Override
+  public EncodedSeeker createSeeker(KVComparator comparator, HFileBlockDecodingContext decodingCtx) {
+    return new RowIndexSeekerV1(comparator, decodingCtx);
+  }
+
+  @Override
+  public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
+      byte[] header, HFileContext meta) {
+    return new HFileBlockDefaultEncodingContext(encoding, header, meta);
+  }
+
+  @Override
+  public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
+    return new HFileBlockDefaultDecodingContext(meta);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexEncoderV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexEncoderV1.java
new file mode 100644
index 0000000..70b4487
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexEncoderV1.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding.rowindexV1;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.CellOutputStream;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Private
+public class RowIndexEncoderV1 implements CellOutputStream {
+  private static final Log LOG = LogFactory.getLog(RowIndexEncoderV1.class);
+
+  /** Key comparator. Used to ensure we write in order. */
+  private final KVComparator comparator;
+  /** The Cell previously appended. Becomes the last cell in the block. */
+  private Cell lastCell = null;
+
+  private DataOutputStream out;
+  private HFileBlockDefaultEncodingContext encodingCtx;
+  private List<Integer> rowsOffset = new ArrayList<Integer>();
+  private int onDiskSize = 0;
+
+  public RowIndexEncoderV1(DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) {
+    this.out = out;
+    this.encodingCtx = encodingCtx;
+    this.comparator = KeyValue.COMPARATOR;
+  }
+
+  @Override
+  public void write(Cell cell) throws IOException {
+    // checkKey uses comparator to check we are writing in order.
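+    // A true return means the cell continues the current row, so no new row-index entry is added.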
+ boolean dupKey = checkKey(cell); + if (!dupKey) { + rowsOffset.add(out.size()); + if (LOG.isTraceEnabled()) { + LOG.trace("Row: " + Bytes.toString(CellUtil.cloneRow(cell)) + ", size: " + out.size()); + } + } + int klength = KeyValueUtil.keyLength(cell); + int vlength = cell.getValueLength(); + out.writeInt(klength); + out.writeInt(vlength); + CellUtil.writeFlatKey(cell, out); + // Write the value part + out.write(cell.getValueArray(), cell.getValueOffset(), vlength); + // Write the additional tag into the stream + afterEncodingKeyValue(cell, out, encodingCtx); + lastCell = cell; + } + + protected boolean checkKey(final Cell cell) throws IOException { + boolean isDuplicateKey = false; + if (cell == null) { + throw new IOException("Key cannot be null or empty"); + } + if (lastCell != null) { + int keyComp = comparator.compareRows(lastCell, cell); + if (keyComp > 0) { + throw new IOException("Added a key not lexically larger than" + + " previous. Current cell = " + cell + ", lastCell = " + lastCell); + } else if (keyComp == 0) { + isDuplicateKey = true; + } + } + return isDuplicateKey; + } + + @Override + public void flush() throws IOException { + int startOffset = rowsOffset.get(0); + onDiskSize = out.size() - startOffset; + out.writeInt(rowsOffset.size()); + for (int i = 0; i < rowsOffset.size(); i++) { + out.writeInt(rowsOffset.get(i) - startOffset); + } + out.writeInt(onDiskSize); + if (LOG.isTraceEnabled()) { + LOG.trace("RowNumber: " + rowsOffset.size()); + LOG.trace("onDiskSize: " + onDiskSize); + } + } + + protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out, + HFileBlockDefaultEncodingContext encodingCtx) throws IOException { + int size = 0; + if (encodingCtx.getHFileContext().isIncludesTags()) { + int tagsLength = cell.getTagsLength(); + ByteBufferUtils.putCompressedInt(out, tagsLength); + // There are some tags to be written + if (tagsLength > 0) { + TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext(); + // When tag compression is enabled, tagCompressionContext will have a not null value. Write + // the tags using Dictionary compression in such a case + if (tagCompressionContext != null) { + tagCompressionContext.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), + tagsLength); + } else { + out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength); + } + } + size += tagsLength + KeyValue.TAGS_LENGTH_SIZE; + } + if (encodingCtx.getHFileContext().isIncludesMvcc()) { + // Copy memstore timestamp from the byte buffer to the output stream. + long memstoreTS = cell.getSequenceId(); + WritableUtils.writeVLong(out, memstoreTS); + // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be + // avoided. + size += WritableUtils.getVIntSize(memstoreTS); + } + return size; + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexSeekerV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexSeekerV1.java new file mode 100644 index 0000000..f619eb5 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexSeekerV1.java @@ -0,0 +1,824 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding.rowindexV1; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.SettableSequenceId; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.TagCompressionContext; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.util.LRUDictionary; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.SimpleByteRange; +import org.apache.hadoop.hbase.util.SimpleMutableByteRange; +import org.apache.hadoop.io.WritableUtils; + +@InterfaceAudience.Private +public class RowIndexSeekerV1 implements EncodedSeeker { + private static final Log LOG = LogFactory.getLog(RowIndexSeekerV1.class); + + private static int INITIAL_KEY_BUFFER_SIZE = 512; + + private HFileBlockDecodingContext decodingCtx; + private final KVComparator comparator; + private TagCompressionContext tagCompressionContext = null; + + private ByteBuffer block; + private ByteBuffer currentBuffer; + private SeekerState current = new SeekerState(); // always valid + private SeekerState previous = new SeekerState(); // may not be valid + + private int rowNumber; + private ByteBuffer rowOffsets = null; + + public RowIndexSeekerV1(KVComparator comparator, HFileBlockDecodingContext decodingCtx) { + this.comparator = comparator; + this.decodingCtx = decodingCtx; + if (decodingCtx.getHFileContext().isCompressTags()) { + try { + tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize TagCompressionContext", e); + } + } + } + + @Override + public void setCurrentBuffer(ByteBuffer buffer) { + this.block = buffer; + if (this.tagCompressionContext != null) { + this.tagCompressionContext.clear(); + } + block.mark(); + block.position(block.limit() - Bytes.SIZEOF_INT); + int onDiskSize = block.getInt(); + block.reset(); + ByteBuffer dup = block.duplicate(); + dup.position(block.position()); + dup.limit(block.position() + onDiskSize); + currentBuffer = dup.slice(); + current.currentBuffer = currentBuffer; + ByteBufferUtils.skip(block, onDiskSize); + + rowNumber = block.getInt(); + int totalRowOffsetsLength = Bytes.SIZEOF_INT * rowNumber; + ByteBuffer rowDup = block.duplicate(); + 
rowDup.position(block.position()); + rowDup.limit(block.position() + totalRowOffsetsLength); + rowOffsets = rowDup.slice(); + + if (tagCompressionContext != null) { + current.tagCompressionContext = tagCompressionContext; + } + decodeFirst(); + } + + @Override + public ByteBuffer getKeyDeepCopy() { + ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength); + keyBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(), current.keyLength); + keyBuffer.rewind(); + return keyBuffer; + } + + @Override + public ByteBuffer getValueShallowCopy() { + ByteBuffer dup = currentBuffer.duplicate(); + dup.position(current.valueOffset); + dup.limit(current.valueOffset + current.valueLength); + return dup.slice(); + } + + // @Override + public ByteBuffer getKeyValueBuffer() { + return getKeyValueBuffer(false); + } + + ByteBuffer getKeyValueBuffer(boolean addMvcc) { + ByteBuffer kvBuffer = createKVBuffer(addMvcc); + kvBuffer.putInt(current.keyLength); + kvBuffer.putInt(current.valueLength); + kvBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(), current.keyLength); + ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset, + current.valueLength); + if (current.tagsLength > 0) { + // Put short as unsigned + kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff)); + kvBuffer.put((byte) (current.tagsLength & 0xff)); + if (current.tagsOffset != -1) { + // the offset of the tags bytes in the underlying buffer is marked. So the temp + // buffer,tagsBuffer was not been used. + ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset, + current.tagsLength); + } else { + // When tagsOffset is marked as -1, tag compression was present and so the tags were + // uncompressed into temp buffer, tagsBuffer. 
Let us copy it from there + kvBuffer.put(current.tagsBuffer, 0, current.tagsLength); + } + } + if (addMvcc && includesMvcc()) { + ByteBufferUtils.writeVLong(kvBuffer, current.getSequenceId()); + } + kvBuffer.rewind(); + return kvBuffer; + } + + protected ByteBuffer createKVBuffer(boolean addMvcc) { + int kvBufSize = + (int) KeyValue.getKeyValueDataStructureSize(current.keyLength, current.valueLength, + current.tagsLength); + if (addMvcc && includesMvcc()) { + kvBufSize += WritableUtils.getVIntSize(current.getSequenceId()); + } + ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize); + return kvBuffer; + } + + @Override + public Cell getKeyValue() { + return current.shallowCopy(); + } + + @Override + public void rewind() { + currentBuffer.rewind(); + if (tagCompressionContext != null) { + tagCompressionContext.clear(); + } + decodeFirst(); + } + + @Override + public boolean next() { + if (!currentBuffer.hasRemaining()) { + return false; + } + decodeNext(); + previous.invalidate(); + return true; + } + + @Override + public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) { + return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore); + } + + private int binarySearch(Cell seekCell, boolean seekBefore) { + int low = 0; + int high = rowNumber - 1; + int mid = (low + high) >>> 1; + int comp = 0; + while (low <= high) { + mid = (low + high) >>> 1; + SimpleByteRange row = getRow(mid); + comp = + comparator.compareRows(row.getBytes(), row.getOffset(), row.getLength(), + seekCell.getRowArray(), seekCell.getRowOffset(), seekCell.getRowLength()); + if (comp < 0) { + low = mid + 1; + } else if (comp > 0) { + high = mid - 1; + } else { + // key found + if (seekBefore) { + return mid - 1; + } else { + return mid; + } + } + } + // key not found. + if (comp > 0) { + return mid - 1; + } else { + return mid; + } + } + + private SimpleByteRange getRow(int index) { + int offset = + Bytes.toIntUnsafe(rowOffsets.array(), rowOffsets.arrayOffset() + index * Bytes.SIZEOF_INT); + ByteBuffer block = currentBuffer.duplicate(); + block.position(offset + Bytes.SIZEOF_LONG); + short rowLen = block.getShort(); + SimpleByteRange row = + new SimpleByteRange(block.array(), block.arrayOffset() + block.position(), rowLen); + return row; + } + + @Override + public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) { + previous.invalidate(); + int index = binarySearch(seekCell, seekBefore); + if (index < 0) { + return HConstants.INDEX_KEY_MAGIC; // using optimized index key + } else { + int offset = + Bytes + .toIntUnsafe(rowOffsets.array(), rowOffsets.arrayOffset() + index * Bytes.SIZEOF_INT); + if (offset != 0) { + decodeAtPosition(offset); + } + } + do { + int comp; + Cell r = + new KeyValue.KeyOnlyKeyValue(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(), + current.keyBuffer.getLength()); + comp = comparator.compareOnlyKeyPortion(seekCell, r); + // if (LOG.isTraceEnabled()) { + // LOG.trace("scan cell: " + r.toString()); + // } + if (comp == 0) { // exact match + if (seekBefore) { + // TODO + if (!previous.isValid()) { + // The caller (seekBefore) has to ensure that we are not at the + // first key in the block. 
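+          // Without a valid previous state there is no earlier cell to step back to.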
+ throw new IllegalStateException("Cannot seekBefore if " + + "positioned at the first key in the block: key=" + + Bytes.toStringBinary(seekCell.getRowArray())); + } + moveToPrevious(); + return 1; + } + return 0; + } + + if (comp < 0) { // already too large, check previous + if (previous.isValid()) { + moveToPrevious(); + } else { + return HConstants.INDEX_KEY_MAGIC; // using optimized index key + } + return 1; + } + + // move to next, if more data is available + if (currentBuffer.hasRemaining()) { + previous.copyFromNext(current); + decodeNext(); + } else { + break; + } + } while (true); + + // we hit the end of the block, not an exact match + return 1; + } + + private void moveToPrevious() { + if (!previous.isValid()) { + throw new IllegalStateException("Can move back only once and not in first key in the block."); + } + + SeekerState tmp = previous; + previous = current; + current = tmp; + + // move after last key value + currentBuffer.position(current.nextKvOffset); + // Already decoded the tag bytes. We cache this tags into current state and also the total + // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode + // the tags again. This might pollute the Data Dictionary what we use for the compression. + // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip + // 'tagsCompressedLength' bytes of source stream. + // See in decodeTags() + current.tagsBuffer = previous.tagsBuffer; + current.tagsCompressedLength = previous.tagsCompressedLength; + current.uncompressTags = false; + current.setKey(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(), current.keyBuffer.getLength()); + previous.invalidate(); + } + + @Override + public int compareKey(KVComparator comparator, byte[] key, int offset, int length) { + return comparator.compareFlatKey(key, offset, length, current.keyBuffer.getBytes(), + current.keyBuffer.getOffset(), current.keyBuffer.getLength()); + } + + @Override + public int compareKey(KVComparator comparator, Cell key) { + return comparator.compareOnlyKeyPortion(key, + new KeyValue.KeyOnlyKeyValue(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(), + current.keyBuffer.getLength())); + } + + protected void decodeFirst() { + decodeNext(); + previous.invalidate(); + } + + protected void decodeAtPosition(int position) { + currentBuffer.position(position); + decodeNext(); + previous.invalidate(); + } + + protected void decodeNext() { + int p = currentBuffer.position() + currentBuffer.arrayOffset(); + long ll = Bytes.toLong(currentBuffer.array(), p); + // Read top half as an int of key length and bottom int as value length + current.keyLength = (int) (ll >> Integer.SIZE); + current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll); + ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_LONG); + // current.keyLength = currentBuffer.getInt(); + // current.valueLength = currentBuffer.getInt(); + current.keyBuffer.set(currentBuffer.array(), + currentBuffer.arrayOffset() + currentBuffer.position(), current.keyLength); + ByteBufferUtils.skip(currentBuffer, current.keyLength); + current.valueOffset = currentBuffer.position(); + ByteBufferUtils.skip(currentBuffer, current.valueLength); + if (includesTags()) { + decodeTags(); + } + if (includesMvcc()) { + current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); + } else { + current.memstoreTS = 0; + } + current.nextKvOffset = currentBuffer.position(); + current.setKey(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(), + 
current.keyBuffer.getLength()); + } + + protected boolean includesMvcc() { + return this.decodingCtx.getHFileContext().isIncludesMvcc(); + } + + protected boolean includesTags() { + return this.decodingCtx.getHFileContext().isIncludesTags(); + } + + protected void decodeTags() { + current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer); + if (tagCompressionContext != null) { + if (current.uncompressTags) { + // Tag compression is been used. uncompress it into tagsBuffer + current.ensureSpaceForTags(); + try { + current.tagsCompressedLength = + tagCompressionContext.uncompressTags(currentBuffer, current.tagsBuffer, 0, + current.tagsLength); + } catch (IOException e) { + throw new RuntimeException("Exception while uncompressing tags", e); + } + } else { + ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength); + current.uncompressTags = true;// Reset this. + } + current.tagsOffset = -1; + } else { + // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer. + // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer() + current.tagsOffset = currentBuffer.position(); + ByteBufferUtils.skip(currentBuffer, current.tagsLength); + } + } + + protected static class SeekerState implements Cell { + protected ByteBuffer currentBuffer; + protected TagCompressionContext tagCompressionContext; + protected int valueOffset = -1; + protected int keyLength; + protected int valueLength; + protected int tagsLength = 0; + protected int tagsOffset = -1; + protected int tagsCompressedLength = 0; + protected boolean uncompressTags = true; + + protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE]; + + protected SimpleMutableByteRange keyBuffer = new SimpleMutableByteRange(); + protected long memstoreTS; + protected int nextKvOffset; + protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue(); + + protected boolean isValid() { + return valueOffset != -1; + } + + protected void invalidate() { + valueOffset = -1; + tagsCompressedLength = 0; + currentKey = new KeyValue.KeyOnlyKeyValue(); + uncompressTags = true; + currentBuffer = null; + } + + protected void ensureSpaceForTags() { + if (tagsLength > tagsBuffer.length) { + // rare case, but we need to handle arbitrary length of tags + int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2; + while (tagsLength > newTagsBufferLength) { + newTagsBufferLength *= 2; + } + byte[] newTagsBuffer = new byte[newTagsBufferLength]; + System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length); + tagsBuffer = newTagsBuffer; + } + } + + protected void setKey(byte[] key, int offset, int length) { + currentKey.setKey(key, offset, length); + } + + protected void setMemstoreTS(long memTS) { + memstoreTS = memTS; + } + + /** + * Copy the state from the next one into this instance (the previous state placeholder). Used to + * save the previous state when we are advancing the seeker to the next key/value. 
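+     * Only references are copied; both states keep pointing into the shared block buffer.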
+ */ + protected void copyFromNext(SeekerState nextState) { + keyBuffer.set(nextState.keyBuffer.getBytes(), nextState.keyBuffer.getOffset(), + nextState.keyBuffer.getLength()); + currentKey = nextState.currentKey; + + valueOffset = nextState.valueOffset; + keyLength = nextState.keyLength; + valueLength = nextState.valueLength; + nextKvOffset = nextState.nextKvOffset; + memstoreTS = nextState.memstoreTS; + currentBuffer = nextState.currentBuffer; + tagsOffset = nextState.tagsOffset; + tagsLength = nextState.tagsLength; + if (nextState.tagCompressionContext != null) { + tagCompressionContext = nextState.tagCompressionContext; + } + } + + @Override + public byte[] getRowArray() { + return currentKey.getRowArray(); + } + + @Override + public int getRowOffset() { + return Bytes.SIZEOF_SHORT; + } + + @Override + public short getRowLength() { + return currentKey.getRowLength(); + } + + @Override + public byte[] getFamilyArray() { + return currentKey.getFamilyArray(); + } + + @Override + public int getFamilyOffset() { + return currentKey.getFamilyOffset(); + } + + @Override + public byte getFamilyLength() { + return currentKey.getFamilyLength(); + } + + @Override + public byte[] getQualifierArray() { + return currentKey.getQualifierArray(); + } + + @Override + public int getQualifierOffset() { + return currentKey.getQualifierOffset(); + } + + @Override + public int getQualifierLength() { + return currentKey.getQualifierLength(); + } + + @Override + public long getTimestamp() { + return currentKey.getTimestamp(); + } + + @Override + public byte getTypeByte() { + return currentKey.getTypeByte(); + } + + @Override + public long getMvccVersion() { + return memstoreTS; + } + + @Override + public long getSequenceId() { + return memstoreTS; + } + + @Override + public byte[] getValueArray() { + return currentBuffer.array(); + } + + @Override + public int getValueOffset() { + return currentBuffer.arrayOffset() + valueOffset; + } + + @Override + public int getValueLength() { + return valueLength; + } + + @Override + public byte[] getTagsArray() { + if (tagCompressionContext != null) { + return tagsBuffer; + } + return currentBuffer.array(); + } + + @Override + public int getTagsOffset() { + if (tagCompressionContext != null) { + return 0; + } + return currentBuffer.arrayOffset() + tagsOffset; + } + + @Override + public int getTagsLength() { + return tagsLength; + } + + @Override + @Deprecated + public byte[] getValue() { + throw new UnsupportedOperationException("getValue() not supported"); + } + + @Override + @Deprecated + public byte[] getFamily() { + throw new UnsupportedOperationException("getFamily() not supported"); + } + + @Override + @Deprecated + public byte[] getQualifier() { + throw new UnsupportedOperationException("getQualifier() not supported"); + } + + @Override + @Deprecated + public byte[] getRow() { + throw new UnsupportedOperationException("getRow() not supported"); + } + + @Override + public String toString() { + return KeyValue.keyToString(this.keyBuffer.getBytes(), this.keyBuffer.getOffset(), + KeyValueUtil.keyLength(this)) + + "/vlen=" + getValueLength() + "/seqid=" + memstoreTS; + } + + public Cell shallowCopy() { + return new ClonedSeekerState(currentBuffer, currentKey.getRowArray(), + currentKey.getRowOffset(), currentKey.getRowLength(), currentKey.getFamilyOffset(), + currentKey.getFamilyLength(), keyLength, currentKey.getQualifierOffset(), + currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(), + valueLength, valueOffset, memstoreTS, tagsOffset, 
tagsLength, tagCompressionContext, + tagsBuffer); + } + } + + /** + * Copies only the key part of the keybuffer by doing a deep copy and passes the seeker state + * members for taking a clone. Note that the value byte[] part is still pointing to the + * currentBuffer and the represented by the valueOffset and valueLength + */ + // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId + // there. So this has to be an instance of SettableSequenceId. SeekerState need not be + // SettableSequenceId as we never return that to top layers. When we have to, we make + // ClonedSeekerState from it. + protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId { + private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) + + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY)); + + private ByteBuffer currentBuffer; + private byte[] keyOnlyBuffer; + private int rowOffset; + private short rowLength; + private int familyOffset; + private byte familyLength; + private int qualifierOffset; + private int qualifierLength; + private long timestamp; + private byte typeByte; + private int valueOffset; + private int valueLength; + private int tagsLength; + private int tagsOffset; + private byte[] cloneTagsBuffer; + private long seqId; + private TagCompressionContext tagCompressionContext; + + protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, int rowOffset, + short rowLength, int familyOffset, byte familyLength, int keyLength, int qualOffset, + int qualLength, long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId, + int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext, + byte[] tagsBuffer) { + this.currentBuffer = currentBuffer; + this.keyOnlyBuffer = keyBuffer; + this.tagCompressionContext = tagCompressionContext; + this.rowOffset = rowOffset; + this.rowLength = rowLength; + this.familyOffset = familyOffset; + this.familyLength = familyLength; + this.qualifierOffset = qualOffset; + this.qualifierLength = qualLength; + this.timestamp = timeStamp; + this.typeByte = typeByte; + this.valueLength = valueLen; + this.valueOffset = valueOffset; + this.tagsOffset = tagsOffset; + this.tagsLength = tagsLength; + if (tagCompressionContext != null) { + this.cloneTagsBuffer = new byte[tagsLength]; + System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength); + } + setSequenceId(seqId); + } + + @Override + public byte[] getRowArray() { + return keyOnlyBuffer; + } + + @Override + public byte[] getFamilyArray() { + return keyOnlyBuffer; + } + + @Override + public byte[] getQualifierArray() { + return keyOnlyBuffer; + } + + @Override + public int getRowOffset() { + return rowOffset; + } + + @Override + public short getRowLength() { + return rowLength; + } + + @Override + public int getFamilyOffset() { + return familyOffset; + } + + @Override + public byte getFamilyLength() { + return familyLength; + } + + @Override + public int getQualifierOffset() { + return qualifierOffset; + } + + @Override + public int getQualifierLength() { + return qualifierLength; + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public byte getTypeByte() { + return typeByte; + } + + @Override + @Deprecated + public long getMvccVersion() { + return getSequenceId(); + } + + @Override + public long getSequenceId() { + return seqId; + } + + @Override + public byte[] 
getValueArray() { + return currentBuffer.array(); + } + + @Override + public int getValueOffset() { + return currentBuffer.arrayOffset() + valueOffset; + } + + @Override + public int getValueLength() { + return valueLength; + } + + @Override + public byte[] getTagsArray() { + if (tagCompressionContext != null) { + return cloneTagsBuffer; + } + return currentBuffer.array(); + } + + @Override + public int getTagsOffset() { + if (tagCompressionContext != null) { + return 0; + } + return currentBuffer.arrayOffset() + tagsOffset; + } + + @Override + public int getTagsLength() { + return tagsLength; + } + + @Override + @Deprecated + public byte[] getValue() { + return CellUtil.cloneValue(this); + } + + @Override + @Deprecated + public byte[] getFamily() { + return CellUtil.cloneFamily(this); + } + + @Override + @Deprecated + public byte[] getQualifier() { + return CellUtil.cloneQualifier(this); + } + + @Override + @Deprecated + public byte[] getRow() { + return CellUtil.cloneRow(this); + } + + @Override + public String toString() { + return KeyValue.keyToString(this.keyOnlyBuffer, rowOffset, KeyValueUtil.keyLength(this)) + "/vlen=" + + getValueLength() + "/seqid=" + seqId; + } + + @Override + public void setSequenceId(long seqId) { + this.seqId = seqId; + } + + @Override + public long heapSize() { + return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength; + } + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV2/RowIndexCodecV2.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV2/RowIndexCodecV2.java new file mode 100644 index 0000000..563fccb --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV2/RowIndexCodecV2.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.hadoop.hbase.io.encoding.rowindexV2;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.EncodingState;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Private
+public class RowIndexCodecV2 implements DataBlockEncoder {
+
+  private static class RowIndexEncodingState extends EncodingState {
+    RowIndexEncoderV2 builder = null;
+  }
+
+  @Override
+  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
+      throws IOException {
+    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
+      throw new IOException(this.getClass().getName() + " only accepts "
+          + HFileBlockDefaultEncodingContext.class.getName() + " as the encoding context.");
+    }
+
+    HFileBlockDefaultEncodingContext encodingCtx =
+        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
+    encodingCtx.prepareEncoding(out);
+    if (encodingCtx.getHFileContext().isIncludesTags()
+        && encodingCtx.getHFileContext().isCompressTags()) {
+      if (encodingCtx.getTagCompressionContext() != null) {
+        // Reuse the existing TagCompressionContext rather than creating a new one
+        // for every block encoding.
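+        // Clearing resets its dictionary so state from the previous block cannot leak into this one.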
+        encodingCtx.getTagCompressionContext().clear();
+      } else {
+        try {
+          TagCompressionContext tagCompressionContext = new TagCompressionContext(
+              LRUDictionary.class, Byte.MAX_VALUE);
+          encodingCtx.setTagCompressionContext(tagCompressionContext);
+        } catch (Exception e) {
+          throw new IOException("Failed to initialize TagCompressionContext", e);
+        }
+      }
+    }
+
+    RowIndexEncoderV2 builder = new RowIndexEncoderV2(out, encodingCtx);
+    RowIndexEncodingState state = new RowIndexEncodingState();
+    state.builder = builder;
+    blkEncodingCtx.setEncodingState(state);
+  }
+
+  @Override
+  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException {
+    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx.getEncodingState();
+    RowIndexEncoderV2 builder = state.builder;
+    builder.write(cell);
+    int size = KeyValueUtil.length(cell);
+    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
+      size += WritableUtils.getVIntSize(cell.getSequenceId());
+    }
+    return size;
+  }
+
+  @Override
+  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+      byte[] uncompressedBytesWithHeader) throws IOException {
+    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx.getEncodingState();
+    RowIndexEncoderV2 builder = state.builder;
+    builder.flush();
+    // Blocks written with a real encoding are ENCODED_DATA; NONE falls back to plain DATA.
+    if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
+      encodingCtx.postEncoding(BlockType.ENCODED_DATA);
+    } else {
+      encodingCtx.postEncoding(BlockType.DATA);
+    }
+  }
+
+  @Override
+  public ByteBuffer decodeKeyValues(DataInputStream source, HFileBlockDecodingContext decodingCtx)
+      throws IOException {
+    /**
+     * This method is not expected to be called during normal HBase operation, so efficiency
+     * is not critical.
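+     * It replays every cell through the seeker and concatenates the results into one buffer.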
+     */
+    // wasteful: copies the whole remaining stream into a fresh on-heap buffer
+    ByteBuffer sourceAsBuffer = ByteBufferUtils.drainInputStreamToBuffer(source);
+    sourceAsBuffer.mark();
+    RowIndexSeekerV2 seeker = new RowIndexSeekerV2(KeyValue.COMPARATOR, decodingCtx);
+    seeker.setCurrentBuffer(sourceAsBuffer);
+    List<ByteBuffer> kvs = new ArrayList<ByteBuffer>();
+    kvs.add(seeker.getKeyValueBuffer(true));
+    while (seeker.next()) {
+      kvs.add(seeker.getKeyValueBuffer(true));
+    }
+    int totalLength = 0;
+    for (ByteBuffer buf : kvs) {
+      totalLength += buf.remaining();
+    }
+    byte[] keyValueBytes = new byte[totalLength];
+    ByteBuffer result = ByteBuffer.wrap(keyValueBytes);
+    for (ByteBuffer buf : kvs) {
+      result.put(buf);
+    }
+    return result;
+  }
+
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    block.mark();
+    EncodedSeeker seeker = new RowIndexSeekerV2();
+    seeker.setCurrentBuffer(block);
+    ByteBuffer firstKey = seeker.getKeyDeepCopy();
+    block.reset();
+    return firstKey;
+  }
+
+  @Override
+  public EncodedSeeker createSeeker(KVComparator comparator, HFileBlockDecodingContext decodingCtx) {
+    return new RowIndexSeekerV2(comparator, decodingCtx);
+  }
+
+  @Override
+  public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
+      byte[] header, HFileContext meta) {
+    return new HFileBlockDefaultEncodingContext(encoding, header, meta);
+  }
+
+  @Override
+  public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
+    return new HFileBlockDefaultDecodingContext(meta);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV2/RowIndexEncoderV2.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV2/RowIndexEncoderV2.java
new file mode 100644
index 0000000..38e78ce
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV2/RowIndexEncoderV2.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding.rowindexV2;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.CellOutputStream;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Private
+public class RowIndexEncoderV2 implements CellOutputStream {
+  private static final Log LOG = LogFactory.getLog(RowIndexEncoderV2.class);
+
+  /** Key comparator. Used to ensure we write in order. */
+  private final KVComparator comparator;
+  /** The Cell previously appended. Becomes the last cell in the block. */
+  private Cell lastCell = null;
+
+  private DataOutputStream out;
+  private HFileBlockDefaultEncodingContext encodingCtx;
+  private List<Integer> rowsOffset = new ArrayList<Integer>();
+  private int onDiskSize = 0;
+
+  public RowIndexEncoderV2(DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) {
+    this.out = out;
+    this.encodingCtx = encodingCtx;
+    this.comparator = KeyValue.COMPARATOR;
+  }
+
+  @Override
+  public void write(Cell cell) throws IOException {
+    // checkKey uses comparator to check we are writing in order.
+    boolean dupKey = checkKey(cell);
+    if (!dupKey) {
+      rowsOffset.add(out.size());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Row: " + Bytes.toString(CellUtil.cloneRow(cell)) + ", size: " + out.size());
+      }
+    }
+    // Unlike V1, the per-cell key excludes the column family; flush() writes it once per block.
+    int klength = getKeyLengthExcludeFamily(cell);
+    int vlength = cell.getValueLength();
+    out.writeInt(klength);
+    out.writeInt(vlength);
+    writeFlatKeyExcludeFamily(cell, out);
+    // Write the value part
+    out.write(cell.getValueArray(), cell.getValueOffset(), vlength);
+    // Write the additional tags into the stream
+    afterEncodingKeyValue(cell, out, encodingCtx);
+    lastCell = cell;
+  }
+
+  static int getKeyLengthExcludeFamily(Cell cell) {
+    // exclude family
+    return KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE + cell.getRowLength()
+        + cell.getQualifierLength();
+  }
+
+  static void writeFlatKeyExcludeFamily(Cell cell, DataOutputStream out) throws IOException {
+    short rowLen = cell.getRowLength();
+    out.writeShort(rowLen);
+    out.write(cell.getRowArray(), cell.getRowOffset(), rowLen);
+    // The family is intentionally skipped here; flush() writes it once for the whole block.
+    out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+    out.writeLong(cell.getTimestamp());
+    out.writeByte(cell.getTypeByte());
+  }
+
+  protected boolean checkKey(final Cell cell) throws IOException {
+    boolean isDuplicateKey = false;
+    if (cell == null) {
+      throw new IOException("Key cannot be null or empty");
+    }
+    if (lastCell != null) {
+      int keyComp = comparator.compareRows(lastCell, cell);
+      if (keyComp > 0) {
+        throw new IOException("Added a key not lexically larger than"
+            + " previous. Current cell = " + cell + ", lastCell = " + lastCell);
+      } else if (keyComp == 0) {
+        isDuplicateKey = true;
+      }
+    }
+    return isDuplicateKey;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    int startOffset = rowsOffset.get(0);
+    onDiskSize = out.size() - startOffset;
+    out.writeInt(rowsOffset.size());
+    for (int i = 0; i < rowsOffset.size(); i++) {
+      out.writeInt(rowsOffset.get(i) - startOffset);
+    }
+    byte fLen = lastCell.getFamilyLength();
+    out.writeByte(fLen);
+    out.write(lastCell.getFamilyArray(), lastCell.getFamilyOffset(), fLen);
+    out.writeInt(onDiskSize);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("RowNumber: " + rowsOffset.size());
+      LOG.trace("onDiskSize: " + onDiskSize);
+    }
+  }
+
+  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
+      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
+    int size = 0;
+    if (encodingCtx.getHFileContext().isIncludesTags()) {
+      int tagsLength = cell.getTagsLength();
+      ByteBufferUtils.putCompressedInt(out, tagsLength);
+      // There are some tags to be written
+      if (tagsLength > 0) {
+        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
+        // When tag compression is enabled, tagCompressionContext will have a non-null value.
+        // Write the tags using Dictionary compression in such a case
+        if (tagCompressionContext != null) {
+          tagCompressionContext.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(),
+              tagsLength);
+        } else {
+          out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+        }
+      }
+      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
+    }
+    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
+      // Copy memstore timestamp from the byte buffer to the output stream.
+      long memstoreTS = cell.getSequenceId();
+      WritableUtils.writeVLong(out, memstoreTS);
+      // TODO use a writeVLong which returns the #bytes written so that parsing twice can be
+      // avoided.
+      size += WritableUtils.getVIntSize(memstoreTS);
+    }
+    return size;
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV2/RowIndexSeekerV2.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV2/RowIndexSeekerV2.java
new file mode 100644
index 0000000..88e486a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV2/RowIndexSeekerV2.java
@@ -0,0 +1,1060 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.hadoop.hbase.io.encoding.rowindexV2; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.SettableSequenceId; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.TagCompressionContext; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.util.LRUDictionary; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.ByteRange; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.SimpleByteRange; +import org.apache.hadoop.hbase.util.SimpleMutableByteRange; +import org.apache.hadoop.io.WritableUtils; + +@InterfaceAudience.Private +public class RowIndexSeekerV2 implements EncodedSeeker { + private static final Log LOG = LogFactory.getLog(RowIndexSeekerV2.class); + + private static int INITIAL_KEY_BUFFER_SIZE = 512; + + private HFileBlockDecodingContext decodingCtx; + private KVComparator comparator; + private TagCompressionContext tagCompressionContext = null; + + private ByteBuffer block; + private ByteBuffer currentBuffer; + private SeekerState current = new SeekerState(); // always valid + private SeekerState previous = new SeekerState(); // may not be valid + + private int rowNumber; + private ByteBuffer rowOffsets = null; + private SimpleByteRange family = null; + + public RowIndexSeekerV2() { + } + + public RowIndexSeekerV2(KVComparator comparator, HFileBlockDecodingContext decodingCtx) { + this.comparator = comparator; + this.decodingCtx = decodingCtx; + if (decodingCtx.getHFileContext().isCompressTags()) { + try { + tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize TagCompressionContext", e); + } + } + } + + @Override + public void setCurrentBuffer(ByteBuffer buffer) { + this.block = buffer; + if (this.tagCompressionContext != null) { + this.tagCompressionContext.clear(); + } + + // cells content + block.mark(); + block.position(block.limit() - Bytes.SIZEOF_INT); + int onDiskSize = block.getInt(); + block.reset(); + ByteBuffer dup = block.duplicate(); + dup.position(block.position()); + dup.limit(block.position() + onDiskSize); + currentBuffer = dup.slice(); + current.currentBuffer = currentBuffer; + ByteBufferUtils.skip(block, onDiskSize); + + // read row offsets + rowNumber = block.getInt(); + int totalRowOffsetsLength = Bytes.SIZEOF_INT * rowNumber; + ByteBuffer rowDup = block.duplicate(); + rowDup.position(block.position()); + rowDup.limit(block.position() + totalRowOffsetsLength); + rowOffsets = rowDup.slice(); + ByteBufferUtils.skip(block, totalRowOffsetsLength); + + // read family + byte fLen = block.get(); + family = new SimpleByteRange(block.array(), block.arrayOffset() + block.position(), fLen); + ByteBufferUtils.skip(block, fLen); + if (tagCompressionContext != null) { + current.tagCompressionContext = tagCompressionContext; 
+ } + decodeFirst(); + } + + @Override + public ByteBuffer getKeyDeepCopy() { + ByteBuffer keyBuffer = + ByteBuffer.allocate(current.keyLength + KeyValue.FAMILY_LENGTH_SIZE + family.getLength()); + writeWholeKey(keyBuffer); + keyBuffer.rewind(); + return keyBuffer; + } + + @Override + public ByteBuffer getValueShallowCopy() { + ByteBuffer dup = currentBuffer.duplicate(); + dup.position(current.valueOffset); + dup.limit(current.valueOffset + current.valueLength); + return dup.slice(); + } + + public void writeWholeKey(ByteBuffer keyBuffer) { + int rowlength = current.currentKey.getRowLength(); + keyBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(), rowlength + + KeyValue.ROW_LENGTH_SIZE); + keyBuffer.put((byte) family.getLength()); + keyBuffer.put(family.getBytes(), family.getOffset(), family.getLength()); + keyBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset() + rowlength + + KeyValue.ROW_LENGTH_SIZE, current.keyLength - rowlength - KeyValue.ROW_LENGTH_SIZE); + } + + // @Override + public ByteBuffer getKeyValueBuffer() { + return getKeyValueBuffer(false); + } + + ByteBuffer getKeyValueBuffer(boolean addMvcc) { + ByteBuffer kvBuffer = createKVBuffer(addMvcc); + kvBuffer.putInt(current.keyLength + KeyValue.FAMILY_LENGTH_SIZE + family.getLength()); + kvBuffer.putInt(current.valueLength); + // kvBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(), current.keyLength); + writeWholeKey(kvBuffer); + ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset, + current.valueLength); + if (current.tagsLength > 0) { + // Put short as unsigned + kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff)); + kvBuffer.put((byte) (current.tagsLength & 0xff)); + if (current.tagsOffset != -1) { + // the offset of the tags bytes in the underlying buffer is marked. So the temp + // buffer,tagsBuffer was not been used. + ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset, + current.tagsLength); + } else { + // When tagsOffset is marked as -1, tag compression was present and so the tags were + // uncompressed into temp buffer, tagsBuffer. 
+        kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
+      }
+    }
+    if (addMvcc && includesMvcc()) {
+      ByteBufferUtils.writeVLong(kvBuffer, current.getSequenceId());
+    }
+    kvBuffer.rewind();
+    return kvBuffer;
+  }
+
+  protected ByteBuffer createKVBuffer(boolean addMvcc) {
+    int kvBufSize =
+        (int) KeyValue.getKeyValueDataStructureSize(current.keyLength + KeyValue.FAMILY_LENGTH_SIZE
+            + family.getLength(), current.valueLength, current.tagsLength);
+    if (addMvcc && includesMvcc()) {
+      kvBufSize += WritableUtils.getVIntSize(current.getSequenceId());
+    }
+    ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
+    return kvBuffer;
+  }
+
+  @Override
+  public Cell getKeyValue() {
+    return current.shallowCopy();
+  }
+
+  @Override
+  public void rewind() {
+    currentBuffer.rewind();
+    if (tagCompressionContext != null) {
+      tagCompressionContext.clear();
+    }
+    decodeFirst();
+  }
+
+  @Override
+  public boolean next() {
+    if (!currentBuffer.hasRemaining()) {
+      return false;
+    }
+    decodeNext();
+    previous.invalidate();
+    return true;
+  }
+
+  @Override
+  public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) {
+    return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore);
+  }
+
+  /**
+   * Binary search over the per-row offset index. Returns the index of the row matching the seek
+   * key (or the one before it when seekBefore is set), the index of the last row that sorts
+   * before the seek key when there is no exact match, or -1 when the seek key sorts before the
+   * first row of the block.
+   */
+  private int binarySearch(Cell seekCell, boolean seekBefore) {
+    int low = 0;
+    int high = rowNumber - 1;
+    int mid = (low + high) >>> 1;
+    int comp = 0;
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      SimpleByteRange row = getRow(mid);
+      comp =
+          comparator.compareRows(row.getBytes(), row.getOffset(), row.getLength(),
+              seekCell.getRowArray(), seekCell.getRowOffset(), seekCell.getRowLength());
+      if (comp < 0) {
+        low = mid + 1;
+      } else if (comp > 0) {
+        high = mid - 1;
+      } else {
+        // key found
+        if (seekBefore) {
+          return mid - 1;
+        } else {
+          return mid;
+        }
+      }
+    }
+    // key not found
+    if (comp > 0) {
+      // seek key sorts before the row at mid
+      return mid - 1;
+    } else {
+      // seek key sorts after the row at mid
+      return mid;
+    }
+  }
+
+  private SimpleByteRange getRow(int index) {
+    int offset =
+        Bytes.toIntUnsafe(rowOffsets.array(), rowOffsets.arrayOffset() + index * Bytes.SIZEOF_INT);
+    ByteBuffer block = currentBuffer.duplicate();
+    // skip the key length and value length ints (Bytes.SIZEOF_LONG = 2 * Bytes.SIZEOF_INT)
+    block.position(offset + Bytes.SIZEOF_LONG);
+    short rowLen = block.getShort();
+    SimpleByteRange row =
+        new SimpleByteRange(block.array(), block.arrayOffset() + block.position(), rowLen);
+    return row;
+  }
+
+  /**
+   * Returns 0 on an exact match, 1 when positioned on the closest cell before the seek key, and
+   * HConstants.INDEX_KEY_MAGIC when the seeker cannot position at or before the seek key (the
+   * caller may then fall back to the optimized index key).
+   */
+  @Override
+  public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
+    previous.invalidate();
+    int index = binarySearch(seekCell, seekBefore);
+    if (index < 0) {
+      return HConstants.INDEX_KEY_MAGIC; // using optimized index key
+    } else {
+      int offset =
+          Bytes
+              .toIntUnsafe(rowOffsets.array(), rowOffsets.arrayOffset() + index * Bytes.SIZEOF_INT);
+      if (offset != 0) {
+        decodeAtPosition(offset);
+      }
+    }
+    do {
+      int comp;
+      Cell r = current.currentKey;
+      comp = comparator.compareOnlyKeyPortion(seekCell, r);
+      if (comp == 0) { // exact match
+        if (seekBefore) {
+          if (!previous.isValid()) {
+            // The caller (seekBefore) has to ensure that we are not at the
+            // first key in the block.
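+            // (The HFile-level reader is expected to step back to the previous block when the
+            // matched cell is the first cell of this block.)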
+            throw new IllegalStateException("Cannot seekBefore if "
+                + "positioned at the first key in the block: key="
+                + Bytes.toStringBinary(seekCell.getRowArray(), seekCell.getRowOffset(),
+                    seekCell.getRowLength()));
+          }
+          moveToPrevious();
+          return 1;
+        }
+        return 0;
+      }
+
+      if (comp < 0) { // already too large, check previous
+        if (previous.isValid()) {
+          moveToPrevious();
+        } else {
+          return HConstants.INDEX_KEY_MAGIC; // using optimized index key
+        }
+        return 1;
+      }
+
+      // move to next, if more data is available
+      if (currentBuffer.hasRemaining()) {
+        previous.copyFromNext(current);
+        decodeNext();
+      } else {
+        break;
+      }
+    } while (true);
+
+    // we hit the end of the block, not an exact match
+    return 1;
+  }
+
+  private void moveToPrevious() {
+    if (!previous.isValid()) {
+      throw new IllegalStateException(
+          "Can move back only once, and only when not at the first key in the block.");
+    }
+
+    SeekerState tmp = previous;
+    previous = current;
+    current = tmp;
+
+    // reposition the buffer just past the restored key/value
+    currentBuffer.position(current.nextKvOffset);
+    // The tag bytes were already decoded. We cache the uncompressed tags in the current state,
+    // along with the total compressed length of the tag bytes, so that the next decodeNext()
+    // does not have to decode them again (decoding them twice would pollute the dictionary
+    // that we use for tag compression). While current.uncompressTags is false, decodeTags()
+    // just reuses current.tagsBuffer and skips 'tagsCompressedLength' bytes of the source
+    // stream.
+    current.tagsBuffer = previous.tagsBuffer;
+    current.tagsCompressedLength = previous.tagsCompressedLength;
+    current.uncompressTags = false;
+    current.setKey(current.keyBuffer, family);
+    previous.invalidate();
+  }
+
+  @Override
+  public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
+    return comparator.compareOnlyKeyPortion(new KeyValue.KeyOnlyKeyValue(key, offset, length),
+        current.currentKey);
+  }
+
+  @Override
+  public int compareKey(KVComparator comparator, Cell key) {
+    return comparator.compareOnlyKeyPortion(key, current.currentKey);
+  }
+
+  protected void decodeFirst() {
+    decodeNext();
+    previous.invalidate();
+  }
+
+  protected void decodeAtPosition(int position) {
+    currentBuffer.position(position);
+    decodeNext();
+    previous.invalidate();
+  }
+
+  protected void decodeNext() {
+    int p = currentBuffer.position() + currentBuffer.arrayOffset();
+    long ll = Bytes.toLong(currentBuffer.array(), p);
+    // Read the top half of the long as the key length and the bottom half as the value length.
+    current.keyLength = (int) (ll >> Integer.SIZE);
+    current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
+    ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_LONG);
+    current.keyBuffer.set(currentBuffer.array(),
+        currentBuffer.arrayOffset() + currentBuffer.position(), current.keyLength);
+    ByteBufferUtils.skip(currentBuffer, current.keyLength);
+    current.valueOffset = currentBuffer.position();
+    ByteBufferUtils.skip(currentBuffer, current.valueLength);
+    if (includesTags()) {
+      decodeTags();
+    }
+    if (includesMvcc()) {
+      current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+    } else {
+      current.memstoreTS = 0;
+    }
+    current.nextKvOffset = currentBuffer.position();
+    current.setKey(current.keyBuffer, family);
+  }
+
+  protected boolean includesMvcc() {
+    if (this.decodingCtx != null) {
+      return this.decodingCtx.getHFileContext().isIncludesMvcc();
+    } else {
+      return false;
+    }
+  }
+
+  protected boolean includesTags() {
+    if (this.decodingCtx != null) {
+      return this.decodingCtx.getHFileContext().isIncludesTags();
+    } else {
+      return false;
+    }
+  }
+
+  protected void decodeTags() {
+    current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+    if (tagCompressionContext != null) {
+      if (current.uncompressTags) {
+        // Tag compression is in use; uncompress the tags into tagsBuffer.
+        current.ensureSpaceForTags();
+        try {
+          current.tagsCompressedLength =
+              tagCompressionContext.uncompressTags(currentBuffer, current.tagsBuffer, 0,
+                  current.tagsLength);
+        } catch (IOException e) {
+          throw new RuntimeException("Exception while uncompressing tags", e);
+        }
+      } else {
+        // moveToPrevious() already left the uncompressed tags in tagsBuffer; just skip over
+        // the compressed bytes.
+        ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
+        current.uncompressTags = true; // Reset this.
+      }
+      current.tagsOffset = -1;
+    } else {
+      // When tag compression is not used, avoid copying the tag bytes into tagsBuffer. Just
+      // record the tags offset, so the KV buffer can be built later in getKeyValueBuffer().
+      current.tagsOffset = currentBuffer.position();
+      ByteBufferUtils.skip(currentBuffer, current.tagsLength);
+    }
+  }
+
+  protected static class SeekerState implements Cell {
+    protected ByteBuffer currentBuffer;
+    protected TagCompressionContext tagCompressionContext;
+    protected int valueOffset = -1;
+    protected int keyLength;
+    protected int valueLength;
+    protected int tagsLength = 0;
+    protected int tagsOffset = -1;
+    protected int tagsCompressedLength = 0;
+    protected boolean uncompressTags = true;
+
+    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
+
+    protected SimpleMutableByteRange keyBuffer = new SimpleMutableByteRange();
+    protected long memstoreTS;
+    protected int nextKvOffset;
+    protected KeyOnlyCell currentKey = new KeyOnlyCell();
+
+    protected boolean isValid() {
+      return valueOffset != -1;
+    }
+
+    protected void invalidate() {
+      valueOffset = -1;
+      tagsCompressedLength = 0;
+      currentKey = new KeyOnlyCell();
+      uncompressTags = true;
+      currentBuffer = null;
+    }
+
+    protected void ensureSpaceForTags() {
+      if (tagsLength > tagsBuffer.length) {
+        // rare case, but we need to handle arbitrary length of tags
+        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
+        while (tagsLength > newTagsBufferLength) {
+          newTagsBufferLength *= 2;
+        }
+        byte[] newTagsBuffer = new byte[newTagsBufferLength];
+        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
+        tagsBuffer = newTagsBuffer;
+      }
+    }
+
+    protected void setKey(ByteRange keyBuffer, ByteRange family) {
+      currentKey.setKey(keyBuffer, family);
+    }
+
+    protected void setMemstoreTS(long memTS) {
+      memstoreTS = memTS;
+    }
+
+    /**
+     * Copy the state from the next one into this instance (the previous state placeholder).
+     * Used to save the previous state when we are advancing the seeker to the next key/value.
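+     * Only references, offsets and lengths are copied here; no key or value bytes are
+     * duplicated.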
+ */ + protected void copyFromNext(SeekerState nextState) { + keyBuffer.set(nextState.keyBuffer.getBytes(), nextState.keyBuffer.getOffset(), + nextState.keyBuffer.getLength()); + currentKey = nextState.currentKey; + + valueOffset = nextState.valueOffset; + keyLength = nextState.keyLength; + valueLength = nextState.valueLength; + nextKvOffset = nextState.nextKvOffset; + memstoreTS = nextState.memstoreTS; + currentBuffer = nextState.currentBuffer; + tagsOffset = nextState.tagsOffset; + tagsLength = nextState.tagsLength; + if (nextState.tagCompressionContext != null) { + tagCompressionContext = nextState.tagCompressionContext; + } + } + + @Override + public byte[] getRowArray() { + return currentKey.getRowArray(); + } + + @Override + public int getRowOffset() { + return currentKey.getRowOffset(); + } + + @Override + public short getRowLength() { + return currentKey.getRowLength(); + } + + @Override + public byte[] getFamilyArray() { + return currentKey.getFamilyArray(); + } + + @Override + public int getFamilyOffset() { + return currentKey.getFamilyOffset(); + } + + @Override + public byte getFamilyLength() { + return currentKey.getFamilyLength(); + } + + @Override + public byte[] getQualifierArray() { + return currentKey.getQualifierArray(); + } + + @Override + public int getQualifierOffset() { + return currentKey.getQualifierOffset(); + } + + @Override + public int getQualifierLength() { + return currentKey.getQualifierLength(); + } + + @Override + public long getTimestamp() { + return currentKey.getTimestamp(); + } + + @Override + public byte getTypeByte() { + return currentKey.getTypeByte(); + } + + @Override + public long getMvccVersion() { + return memstoreTS; + } + + @Override + public long getSequenceId() { + return memstoreTS; + } + + @Override + public byte[] getValueArray() { + return currentBuffer.array(); + } + + @Override + public int getValueOffset() { + return currentBuffer.arrayOffset() + valueOffset; + } + + @Override + public int getValueLength() { + return valueLength; + } + + @Override + public byte[] getTagsArray() { + if (tagCompressionContext != null) { + return tagsBuffer; + } + return currentBuffer.array(); + } + + @Override + public int getTagsOffset() { + if (tagCompressionContext != null) { + return 0; + } + return currentBuffer.arrayOffset() + tagsOffset; + } + + @Override + public int getTagsLength() { + return tagsLength; + } + + @Override + @Deprecated + public byte[] getValue() { + throw new UnsupportedOperationException("getValue() not supported"); + } + + @Override + @Deprecated + public byte[] getFamily() { + throw new UnsupportedOperationException("getFamily() not supported"); + } + + @Override + @Deprecated + public byte[] getQualifier() { + throw new UnsupportedOperationException("getQualifier() not supported"); + } + + @Override + @Deprecated + public byte[] getRow() { + throw new UnsupportedOperationException("getRow() not supported"); + } + + @Override + public String toString() { + String family = + familyToString(currentKey.getFamilyArray(), currentKey.getFamilyOffset(), + currentKey.getFamilyLength()); + return keyToString(this.keyBuffer.getBytes(), this.keyBuffer.getOffset(), + this.keyBuffer.getLength(), family) + + "/vlen=" + getValueLength() + "/seqid=" + memstoreTS; + } + + public Cell shallowCopy() { + return new ClonedSeekerState(currentBuffer, currentKey.getRowArray(), + currentKey.getRowOffset(), currentKey.getRowLength(), currentKey.getFamilyArray(), + currentKey.getFamilyOffset(), currentKey.getFamilyLength(), + 
currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
+          currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
+          memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
+    }
+  }
+
+  /**
+   * Copies only the key part of the keyBuffer (a deep copy) and takes over the seeker state
+   * members needed for a clone. Note that the value byte[] still points into the currentBuffer
+   * and is represented by the valueOffset and valueLength.
+   */
+  // We return this as a Cell to the upper layers of the read flow, and they might try setting a
+  // new SeqId on it. So this has to be an instance of SettableSequenceId. SeekerState need not
+  // be SettableSequenceId, as we never return it to the top layers; when we have to, we make a
+  // ClonedSeekerState from it.
+  protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId {
+    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+        + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
+
+    private ByteBuffer currentBuffer;
+    private byte[] keyOnlyBuffer;
+    private byte[] familyBuffer;
+    private int rowOffset;
+    private short rowLength;
+    private int familyOffset;
+    private byte familyLength;
+    private int qualifierOffset;
+    private int qualifierLength;
+    private long timestamp;
+    private byte typeByte;
+    private int valueOffset;
+    private int valueLength;
+    private int tagsLength;
+    private int tagsOffset;
+    private byte[] cloneTagsBuffer;
+    private long seqId;
+    private TagCompressionContext tagCompressionContext;
+
+    protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, int rowOffset,
+        short rowLength, byte[] familyBuffer, int familyOffset, byte familyLength, int qualOffset,
+        int qualLength, long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
+        int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
+        byte[] tagsBuffer) {
+      this.currentBuffer = currentBuffer;
+      this.keyOnlyBuffer = keyBuffer;
+      this.tagCompressionContext = tagCompressionContext;
+      this.rowOffset = rowOffset;
+      this.rowLength = rowLength;
+      this.familyBuffer = familyBuffer;
+      this.familyOffset = familyOffset;
+      this.familyLength = familyLength;
+      this.qualifierOffset = qualOffset;
+      this.qualifierLength = qualLength;
+      this.timestamp = timeStamp;
+      this.typeByte = typeByte;
+      this.valueLength = valueLen;
+      this.valueOffset = valueOffset;
+      this.tagsOffset = tagsOffset;
+      this.tagsLength = tagsLength;
+      if (tagCompressionContext != null) {
+        // The tags were uncompressed into a shared temp buffer; take a private copy so this
+        // cell stays valid after the seeker moves on.
+        this.cloneTagsBuffer = new byte[tagsLength];
+        System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
+      }
+      setSequenceId(seqId);
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return keyOnlyBuffer;
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return familyBuffer;
+    }
+
+    @Override
+    public byte[] getQualifierArray() {
+      return keyOnlyBuffer;
+    }
+
+    @Override
+    public int getRowOffset() {
+      return rowOffset;
+    }
+
+    @Override
+    public short getRowLength() {
+      return rowLength;
+    }
+
+    @Override
+    public int getFamilyOffset() {
+      return familyOffset;
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return familyLength;
+    }
+
+    @Override
+    public int getQualifierOffset() {
+      return qualifierOffset;
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return qualifierLength;
+    }
+
+    @Override
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return typeByte;
+    }
+
+    @Override
+    @Deprecated
+    public long getMvccVersion() {
+      return getSequenceId();
+    }
+
+    @Override
+    public long getSequenceId() {
+      return seqId;
+    }
+
+    @Override
+    public byte[] getValueArray() {
+      return currentBuffer.array();
+    }
+
+    @Override
+    public int getValueOffset() {
+      return currentBuffer.arrayOffset() + valueOffset;
+    }
+
+    @Override
+    public int getValueLength() {
+      return valueLength;
+    }
+
+    @Override
+    public byte[] getTagsArray() {
+      if (tagCompressionContext != null) {
+        return cloneTagsBuffer;
+      }
+      return currentBuffer.array();
+    }
+
+    @Override
+    public int getTagsOffset() {
+      if (tagCompressionContext != null) {
+        return 0;
+      }
+      return currentBuffer.arrayOffset() + tagsOffset;
+    }
+
+    @Override
+    public int getTagsLength() {
+      return tagsLength;
+    }
+
+    @Override
+    @Deprecated
+    public byte[] getValue() {
+      return CellUtil.cloneValue(this);
+    }
+
+    @Override
+    @Deprecated
+    public byte[] getFamily() {
+      return CellUtil.cloneFamily(this);
+    }
+
+    @Override
+    @Deprecated
+    public byte[] getQualifier() {
+      return CellUtil.cloneQualifier(this);
+    }
+
+    @Override
+    @Deprecated
+    public byte[] getRow() {
+      return CellUtil.cloneRow(this);
+    }
+
+    @Override
+    public String toString() {
+      if (this.keyOnlyBuffer == null || getRowArray() == null || getRowLength() == 0) {
+        return "empty";
+      }
+      int rowlength = getRowLength();
+      String row = Bytes.toStringBinary(getRowArray(), getRowOffset(), rowlength);
+      String family = familyToString(getFamilyArray(), getFamilyOffset(), getFamilyLength());
+      String qualifier =
+          getQualifierLength() == 0 ? "" : Bytes.toStringBinary(getQualifierArray(),
+              getQualifierOffset(), getQualifierLength());
+      long timestamp = getTimestamp();
+      String timestampStr = KeyValue.humanReadableTimestamp(timestamp);
+      byte type = getTypeByte();
+      return row + "/" + family + (family != null && family.length() > 0 ? ":" : "") + qualifier
+          + "/" + timestampStr + "/" + Type.codeToType(type) + "/vlen=" + getValueLength()
+          + "/seqid=" + getSequenceId();
+    }
+
+    @Override
+    public void setSequenceId(long seqId) {
+      this.seqId = seqId;
+    }
+
+    @Override
+    public long heapSize() {
+      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
+    }
+  }
+
+  /**
+   * A simple form of KeyValue that holds only the key part of the byte[]. Mainly used in places
+   * where we need to compare two cells while avoiding any copying of bytes. In places like
+   * block index keys, we need to compare a key byte[] with a cell; hence we create a KeyValue
+   * (aka Cell) that helps in comparing the two as cells.
+   */
+  public static class KeyOnlyCell implements Cell {
+    private ByteRange keyBuffer;
+    private ByteRange familyBuffer;
+
+    public KeyOnlyCell() {
+    }
+
+    public void setKey(ByteRange keyBuffer, ByteRange familyBuffer) {
+      this.keyBuffer = keyBuffer;
+      this.familyBuffer = familyBuffer;
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return keyBuffer.getBytes();
+    }
+
+    @Override
+    public int getRowOffset() {
+      return keyBuffer.getOffset() + Bytes.SIZEOF_SHORT;
+    }
+
+    @Override
+    public short getRowLength() {
+      return Bytes.toShort(keyBuffer.getBytes(), keyBuffer.getOffset());
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return familyBuffer.getBytes();
+    }
+
+    @Override
+    public int getFamilyOffset() {
+      return familyBuffer.getOffset();
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return (byte) familyBuffer.getLength();
+    }
+
+    @Override
+    public byte[] getQualifierArray() {
+      return keyBuffer.getBytes();
+    }
+
+    @Override
+    public int getQualifierOffset() {
+      return getRowOffset() + getRowLength();
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return keyBuffer.getLength() - getRowLength() - KeyValue.ROW_LENGTH_SIZE
+          - KeyValue.TIMESTAMP_TYPE_SIZE;
+    }
+
+    @Override
+    public long getTimestamp() {
+      int tsOffset = keyBuffer.getOffset() + keyBuffer.getLength() - KeyValue.TIMESTAMP_TYPE_SIZE;
+      return Bytes.toLong(keyBuffer.getBytes(), tsOffset);
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return keyBuffer.getBytes()[keyBuffer.getOffset() + keyBuffer.getLength() - 1];
+    }
+
+    @Override
+    public long getMvccVersion() {
+      return 0;
+    }
+
+    @Override
+    public long getSequenceId() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getValueArray() {
+      throw new IllegalArgumentException("KeyOnlyCell does not work with values.");
+    }
+
+    @Override
+    public int getValueOffset() {
+      throw new IllegalArgumentException("KeyOnlyCell does not work with values.");
+    }
+
+    @Override
+    public int getValueLength() {
+      throw new IllegalArgumentException("KeyOnlyCell does not work with values.");
+    }
+
+    @Override
+    public byte[] getTagsArray() {
+      return HConstants.EMPTY_BYTE_ARRAY;
+    }
+
+    @Override
+    public int getTagsOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getTagsLength() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getValue() {
+      throw new IllegalArgumentException("KeyOnlyCell does not support getValue().");
+    }
+
+    @Override
+    public byte[] getFamily() {
+      throw new IllegalArgumentException("KeyOnlyCell does not support getFamily().");
+    }
+
+    @Override
+    public byte[] getQualifier() {
+      throw new IllegalArgumentException("KeyOnlyCell does not support getQualifier().");
+    }
+
+    @Override
+    public byte[] getRow() {
+      throw new IllegalArgumentException("KeyOnlyCell does not support getRow().");
+    }
+
+    @Override
+    public String toString() {
+      if (this.keyBuffer == null || getRowArray() == null || getRowLength() == 0) {
+        return "empty";
+      }
+      String family = familyToString(this.familyBuffer);
+      return keyToString(this.keyBuffer.getBytes(), this.keyBuffer.getOffset(),
+          this.keyBuffer.getLength(), family)
+          + "/vlen=" + 0 + "/seqid=" + getSequenceId();
+    }
+  }
+
+  public static String familyToString(ByteRange familyBuffer) {
+    if (familyBuffer != null) {
+      return familyToString(familyBuffer.getBytes(), familyBuffer.getOffset(),
+          familyBuffer.getLength());
+    } else {
+      return "";
+    }
+  }
+
+  public static String familyToString(final byte[] familyBuffer, final int familyOffset,
+      final int familyLength) {
+    if (familyBuffer != null) {
+      return familyLength == 0 ? "" : Bytes
+          .toStringBinary(familyBuffer, familyOffset, familyLength);
+    } else {
+      return "";
+    }
+  }
+
+  public static String keyToString(final byte[] keyBuffer, final int keyOffset,
+      final int keyLength, String family) {
+    if (keyBuffer == null || keyLength == 0) {
+      return "empty";
+    }
+    int rowlength = Bytes.toShort(keyBuffer, keyOffset);
+    String row = Bytes.toStringBinary(keyBuffer, keyOffset + KeyValue.ROW_LENGTH_SIZE, rowlength);
+    int qualifierLength =
+        keyLength - rowlength - KeyValue.ROW_LENGTH_SIZE - KeyValue.TIMESTAMP_TYPE_SIZE;
+    String qualifier =
+        qualifierLength == 0 ? "" : Bytes.toStringBinary(keyBuffer, keyOffset + rowlength
+            + KeyValue.ROW_LENGTH_SIZE, qualifierLength);
+    int tsOffset = keyOffset + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE;
+    long timestamp = Bytes.toLong(keyBuffer, tsOffset);
+    String timestampStr = KeyValue.humanReadableTimestamp(timestamp);
+    byte type = keyBuffer[keyOffset + keyLength - 1];
+    return row + "/" + family + (family != null && family.length() > 0 ? ":" : "") + qualifier
+        + "/" + timestampStr + "/" + Type.codeToType(type);
+  }
+
+}
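+// Usage sketch: how a caller on the HFile read path might drive this seeker. The names
+// `decodedBlock`, `decodingCtx` and `searchKey` are illustrative; in practice they come from
+// the surrounding HFileBlock read machinery.
+//
+//   RowIndexSeekerV2 seeker = new RowIndexSeekerV2(KeyValue.COMPARATOR, decodingCtx);
+//   seeker.setCurrentBuffer(decodedBlock); // parses the row index, decodes the first cell
+//   int result = seeker.seekToKeyInBlock(searchKey, false);
+//   if (result != HConstants.INDEX_KEY_MAGIC) {
+//     Cell cell = seeker.getKeyValue(); // cell at, or just before, the seek key
+//     while (seeker.next()) {
+//       // iterate over the remaining cells of the block
+//     }
+//   }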