diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java new file mode 100644 index 0000000..cccbae0 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java @@ -0,0 +1,63 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This class is an extension to KeyValue where rowLen and keyLen are cached. + * Parsing the backing byte[] every time to get these values hurts performance. + * In the read path we read these values many times, in the Comparator, SQM, etc. + * Note: please do not use these objects in the write path, as they increase heap space usage. + * See https://issues.apache.org/jira/browse/HBASE-13448 + */ +@InterfaceAudience.Private +@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS") +public class SizeCachedKeyValue extends KeyValue { + private static final int HEAP_SIZE_OVERHEAD = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT; + + private short rowLen; + private int keyLen; + + public SizeCachedKeyValue(byte[] bytes, int offset, int length, long seqId) { + super(bytes, offset, length); + // We will read all these cached values at least once, so initialize them now + // rather than checking for an uninitialized value on every call + rowLen = super.getRowLength(); + keyLen = super.getKeyLength(); + setSequenceId(seqId); + } + + @Override + public short getRowLength() { + return rowLen; + } + + @Override + public int getKeyLength() { + return this.keyLen; + } + + @Override + public long heapSize() { + return super.heapSize() + HEAP_SIZE_OVERHEAD; + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java new file mode 100644 index 0000000..d28d1a8 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java @@ -0,0 +1,52 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This class is an extension to SizeCachedKeyValue for Cells that carry no tags. + * Note: please do not use these objects in the write path, as they increase heap space usage. + * See https://issues.apache.org/jira/browse/HBASE-13448 + */ +@InterfaceAudience.Private +public class SizeCachedNoTagsKeyValue extends SizeCachedKeyValue { + + public SizeCachedNoTagsKeyValue(byte[] bytes, int offset, int length, long seqId) { + super(bytes, offset, length, seqId); + } + + @Override + public int getTagsLength() { + return 0; + } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + ByteBufferUtils.putInt(out, this.length); + out.write(this.bytes, this.offset, this.length); + return this.length + Bytes.SIZEOF_INT; + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java index 67d18ed..ba767ee 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java @@ -16,15 +16,15 @@ */ package org.apache.hadoop.hbase.io.encoding; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + import java.io.IOException; import java.io.OutputStream; import java.util.HashMap; import java.util.Map; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.util.Bytes; - /** * Provide access to all data block encoding algorithms. All of the algorithms * are required to have unique id which should NEVER be changed.
If you @@ -43,7 +43,8 @@ public enum DataBlockEncoding { FAST_DIFF(4, "org.apache.hadoop.hbase.io.encoding.FastDiffDeltaEncoder"), // id 5 is reserved for the COPY_KEY algorithm for benchmarking // COPY_KEY(5, "org.apache.hadoop.hbase.io.encoding.CopyKeyDataBlockEncoder"), - PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec"); + PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec"), + ROW_INDEX_V1(7, "org.apache.hadoop.hbase.io.encoding.rowindexV1.RowIndexCodecV1"); private final short id; private final byte[] idInBytes; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexCodecV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexCodecV1.java new file mode 100644 index 0000000..5ddd107 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexCodecV1.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding.rowindexV1; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.EncodingState; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; + +@InterfaceAudience.Private +public class RowIndexCodecV1 implements DataBlockEncoder { + + private static class RowIndexEncodingState extends EncodingState { + RowIndexEncoderV1 builder = null; + } + + @Override + public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, + DataOutputStream out) throws IOException { + if 
(blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) { + throw new IOException(this.getClass().getName() + " only accepts " + + HFileBlockDefaultEncodingContext.class.getName() + " as the " + + "encoding context."); + } + + HFileBlockDefaultEncodingContext encodingCtx = (HFileBlockDefaultEncodingContext) blkEncodingCtx; + encodingCtx.prepareEncoding(out); + + RowIndexEncoderV1 builder = new RowIndexEncoderV1(out, encodingCtx); + RowIndexEncodingState state = new RowIndexEncodingState(); + state.builder = builder; + blkEncodingCtx.setEncodingState(state); + } + + @Override + public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, + DataOutputStream out) throws IOException { + RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx + .getEncodingState(); + RowIndexEncoderV1 builder = state.builder; + builder.write(cell); + int size = KeyValueUtil.length(cell); + if (encodingCtx.getHFileContext().isIncludesMvcc()) { + size += WritableUtils.getVIntSize(cell.getSequenceId()); + } + return size; + } + + @Override + public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, + DataOutputStream out, byte[] uncompressedBytesWithHeader) + throws IOException { + RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx + .getEncodingState(); + RowIndexEncoderV1 builder = state.builder; + builder.flush(); + if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) { + encodingCtx.postEncoding(BlockType.ENCODED_DATA); + } else { + encodingCtx.postEncoding(BlockType.DATA); + } + } + + @Override + public ByteBuffer decodeKeyValues(DataInputStream source, + HFileBlockDecodingContext decodingCtx) throws IOException { + ByteBuffer sourceAsBuffer = ByteBufferUtils + .drainInputStreamToBuffer(source); // wasteful: copies the whole remaining stream into a new buffer + sourceAsBuffer.mark(); + if (!decodingCtx.getHFileContext().isIncludesTags()) { + sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT); + int onDiskSize = sourceAsBuffer.getInt(); + sourceAsBuffer.reset(); + ByteBuffer dup = sourceAsBuffer.duplicate(); + dup.position(sourceAsBuffer.position()); + dup.limit(sourceAsBuffer.position() + onDiskSize); + return dup.slice(); + } else { + RowIndexSeekerV1 seeker = new RowIndexSeekerV1(CellComparator.COMPARATOR, + decodingCtx); + seeker.setCurrentBuffer(new SingleByteBuff(sourceAsBuffer)); + List<Cell> kvs = new ArrayList<Cell>(); + kvs.add(seeker.getCell()); + while (seeker.next()) { + kvs.add(seeker.getCell()); + } + boolean includesMvcc = decodingCtx.getHFileContext().isIncludesMvcc(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + for (Cell cell : kvs) { + KeyValue currentCell = KeyValueUtil.copyToNewKeyValue(cell); + out.write(currentCell.getBuffer(), currentCell.getOffset(), + currentCell.getLength()); + if (includesMvcc) { + WritableUtils.writeVLong(out, cell.getSequenceId()); + } + } + out.flush(); + return ByteBuffer.wrap(baos.getBuffer(), 0, baos.size()); + } + } + + @Override + public HFileBlockEncodingContext newDataBlockEncodingContext( + DataBlockEncoding encoding, byte[] header, HFileContext meta) { + return new HFileBlockDefaultEncodingContext(encoding, header, meta); + } + + @Override + public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) { + return new HFileBlockDefaultDecodingContext(meta); + } + + @Override + public Cell getFirstKeyCellInBlock(ByteBuff block) { + block.mark(); + int keyLength = block.getInt(); + block.getInt(); // skip the value length + ByteBuffer key =
block.asSubByteBuffer(keyLength).duplicate(); + block.reset(); + return createFirstKeyCell(key, keyLength); + } + + @Override + public EncodedSeeker createSeeker(CellComparator comparator, + HFileBlockDecodingContext decodingCtx) { + return new RowIndexSeekerV1(comparator, decodingCtx); + } + + protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) { + if (key.hasArray()) { + return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + + key.position(), keyLength); + } else { + return new ByteBufferedKeyOnlyKeyValue(key, key.position(), keyLength); + } + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexEncoderV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexEncoderV1.java new file mode 100644 index 0000000..7eb19de --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexEncoderV1.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.io.encoding.rowindexV1; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.CellOutputStream; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.io.WritableUtils; + +@InterfaceAudience.Private +public class RowIndexEncoderV1 implements CellOutputStream { + private static final Log LOG = LogFactory.getLog(RowIndexEncoderV1.class); + + /** Key comparator. Used to ensure we write in order. */ + private final CellComparator comparator; + /** The Cell previously appended. Becomes the last cell in the block. */ + private Cell lastCell = null; + + private DataOutputStream out; + private HFileBlockDefaultEncodingContext encodingCtx; + private List<Integer> rowsOffset = new ArrayList<Integer>(); + private int onDiskSize = 0; + + public RowIndexEncoderV1(DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) { + this.out = out; + this.encodingCtx = encodingCtx; + this.comparator = CellComparator.COMPARATOR; + } + + @Override + public void write(Cell cell) throws IOException { + // checkRow uses comparator to check we are writing in order.
+ boolean dupRow = checkRow(cell); + if (!dupRow) { + rowsOffset.add(out.size()); + // if (LOG.isTraceEnabled()) { + // LOG.trace("Row: " + Bytes.toString(CellUtil.cloneRow(cell)) + + // ", size: " + out.size()); + // } + } + int klength = KeyValueUtil.keyLength(cell); + int vlength = cell.getValueLength(); + out.writeInt(klength); + out.writeInt(vlength); + CellUtil.writeFlatKey(cell, out); + // Write the value part + CellUtil.writeValue(out, cell, vlength); + // Write the additional tag into the stream + if (encodingCtx.getHFileContext().isIncludesTags()) { + int tagsLength = cell.getTagsLength(); + out.writeShort(tagsLength); + if (tagsLength > 0) { + CellUtil.writeTags(out, cell, tagsLength); + } + } + if (encodingCtx.getHFileContext().isIncludesMvcc()) { + WritableUtils.writeVLong(out, cell.getSequenceId()); + } + lastCell = cell; + } + + protected boolean checkRow(final Cell cell) throws IOException { + boolean isDuplicateRow = false; + if (cell == null) { + throw new IOException("Key cannot be null or empty"); + } + if (lastCell != null) { + int keyComp = comparator.compareRows(lastCell, cell); + if (keyComp > 0) { + throw new IOException("Added a key not lexically larger than" + + " previous. Current cell = " + cell + ", lastCell = " + lastCell); + } else if (keyComp == 0) { + isDuplicateRow = true; + } + } + return isDuplicateRow; + } + + @Override + public void flush() throws IOException { + int startOffset = rowsOffset.get(0); + onDiskSize = out.size() - startOffset; + out.writeInt(rowsOffset.size()); + for (int i = 0; i < rowsOffset.size(); i++) { + out.writeInt(rowsOffset.get(i) - startOffset); + } + out.writeInt(onDiskSize); + if (LOG.isTraceEnabled()) { + LOG.trace("RowNumber: " + rowsOffset.size()); + LOG.trace("onDiskSize: " + onDiskSize); + } + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexSeekerV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexSeekerV1.java new file mode 100644 index 0000000..ff8e6ea --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/rowindexV1/RowIndexSeekerV1.java @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.encoding.rowindexV1; + +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ByteBufferedCell; +import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.OffheapKeyValue; +import org.apache.hadoop.hbase.SizeCachedKeyValue; +import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ObjectIntPair; + +@InterfaceAudience.Private +public class RowIndexSeekerV1 implements EncodedSeeker { + private static final Log LOG = LogFactory.getLog(RowIndexSeekerV1.class); + + private HFileBlockDecodingContext decodingCtx; + private final CellComparator comparator; + + // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too + // many object creations. + protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<ByteBuffer>(); + + private ByteBuff block; + private ByteBuff currentBuffer; + private SeekerState current = new SeekerState(); // always valid + private SeekerState previous = new SeekerState(); // may not be valid + + private int rowNumber; + private ByteBuff rowOffsets = null; + + public RowIndexSeekerV1(CellComparator comparator, + HFileBlockDecodingContext decodingCtx) { + this.comparator = comparator; + this.decodingCtx = decodingCtx; + } + + @Override + public void setCurrentBuffer(ByteBuff buffer) { + this.block = buffer; + int onDiskSize = block.getInt(block.limit() - Bytes.SIZEOF_INT); + + // Data part + ByteBuff dup = block.duplicate(); + dup.position(block.position()); + dup.limit(block.position() + onDiskSize); + currentBuffer = dup.slice(); + current.currentBuffer = currentBuffer; + block.skip(onDiskSize); + + // Row offset + rowNumber = block.getInt(); + int totalRowOffsetsLength = Bytes.SIZEOF_INT * rowNumber; + ByteBuff rowDup = block.duplicate(); + rowDup.position(block.position()); + rowDup.limit(block.position() + totalRowOffsetsLength); + rowOffsets = rowDup.slice(); + + decodeFirst(); + } + + @Override + public Cell getKey() { + if (current.keyBuffer.hasArray()) { + return new KeyValue.KeyOnlyKeyValue(current.keyBuffer.array(), + current.keyBuffer.arrayOffset() + current.keyBuffer.position(), + current.keyLength); + } else { + byte[] key = new byte[current.keyLength]; + ByteBufferUtils.copyFromBufferToArray(key, current.keyBuffer, + current.keyBuffer.position(), 0, current.keyLength); + return new KeyValue.KeyOnlyKeyValue(key, 0, current.keyLength); + } + } + + @Override + public ByteBuffer getValueShallowCopy() { + currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, + tmpPair); + ByteBuffer dup = tmpPair.getFirst().duplicate(); + dup.position(tmpPair.getSecond()); + dup.limit(tmpPair.getSecond() + current.valueLength); + return dup.slice(); + } + + @Override + public Cell getCell() { + return current.toCell(); + } + + @Override + public void rewind() {
currentBuffer.rewind(); + decodeFirst(); + } + + @Override + public boolean next() { + if (!currentBuffer.hasRemaining()) { + return false; + } + decodeNext(); + previous.invalidate(); + return true; + } + + private int binarySearch(Cell seekCell, boolean seekBefore) { + int low = 0; + int high = rowNumber - 1; + int mid = (low + high) >>> 1; + int comp = 0; + while (low <= high) { + mid = (low + high) >>> 1; + ByteBuffer row = getRow(mid); + // LOG.info("row: " + // + ByteBufferUtils.toStringBinary(row, row.position(), + // row.remaining())); + comp = compareRows(row, seekCell); + if (comp < 0) { + low = mid + 1; + } else if (comp > 0) { + high = mid - 1; + } else { + // key found + if (seekBefore) { + return mid - 1; + } else { + return mid; + } + } + } + // key not found. + if (comp > 0) { + return mid - 1; + } else { + return mid; + } + } + + private int compareRows(ByteBuffer row, Cell seekCell) { + if (seekCell instanceof ByteBufferedCell) { + return ByteBufferUtils.compareTo(row, row.position(), row.remaining(), + ((ByteBufferedCell) seekCell).getRowByteBuffer(), + ((ByteBufferedCell) seekCell).getRowPosition(), + seekCell.getRowLength()); + } else { + return ByteBufferUtils.compareTo(row, row.position(), row.remaining(), + seekCell.getRowArray(), seekCell.getRowOffset(), + seekCell.getRowLength()); + } + } + + private ByteBuffer getRow(int index) { + int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT); + ByteBuff block = currentBuffer.duplicate(); + block.position(offset + Bytes.SIZEOF_LONG); + short rowLen = block.getShort(); + block.asSubByteBuffer(block.position(), rowLen, tmpPair); + ByteBuffer row = tmpPair.getFirst(); + row.position(tmpPair.getSecond()).limit(tmpPair.getSecond() + rowLen); + return row; + } + + @Override + public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) { + previous.invalidate(); + int index = binarySearch(seekCell, seekBefore); + if (index < 0) { + return HConstants.INDEX_KEY_MAGIC; // using optimized index key + } else { + int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT); + if (offset != 0) { + decodeAtPosition(offset); + } + } + do { + int comp; + comp = comparator.compareKeyIgnoresMvcc(seekCell, current.currentKey); + // if (LOG.isTraceEnabled()) { + // LOG.trace("scan cell: " + r.toString()); + // } + if (comp == 0) { // exact match + if (seekBefore) { + if (!previous.isValid()) { + // The caller (seekBefore) has to ensure that we are not at the + // first key in the block. 
+ throw new IllegalStateException("Cannot seekBefore if " + + "positioned at the first key in the block: key=" + + Bytes.toStringBinary(seekCell.getRowArray())); + } + moveToPrevious(); + return 1; + } + return 0; + } + + if (comp < 0) { // already too large, check previous + if (previous.isValid()) { + moveToPrevious(); + } else { + return HConstants.INDEX_KEY_MAGIC; // using optimized index key + } + return 1; + } + + // move to next, if more data is available + if (currentBuffer.hasRemaining()) { + previous.copyFromNext(current); + decodeNext(); + } else { + break; + } + } while (true); + + // we hit the end of the block, not an exact match + return 1; + } + + private void moveToPrevious() { + if (!previous.isValid()) { + throw new IllegalStateException("Can move back only once, and only when not positioned at the first key in the block."); + } + + SeekerState tmp = previous; + previous = current; + current = tmp; + + // move after last key value + currentBuffer.position(current.nextKvOffset); + previous.invalidate(); + } + + @Override + public int compareKey(CellComparator comparator, Cell key) { + return comparator.compareKeyIgnoresMvcc(key, current.currentKey); + } + + protected void decodeFirst() { + decodeNext(); + previous.invalidate(); + } + + protected void decodeAtPosition(int position) { + currentBuffer.position(position); + decodeNext(); + previous.invalidate(); + } + + protected void decodeNext() { + current.startOffset = currentBuffer.position(); + long ll = currentBuffer.getLongAfterPosition(0); + // Read top half as an int of key length and bottom int as value length + current.keyLength = (int) (ll >> Integer.SIZE); + current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll); + currentBuffer.skip(Bytes.SIZEOF_LONG); + + currentBuffer.asSubByteBuffer(currentBuffer.position(), current.keyLength, + tmpPair); + + ByteBuffer key = tmpPair.getFirst().duplicate(); + key.position(tmpPair.getSecond()).limit( + tmpPair.getSecond() + current.keyLength); + current.keyBuffer = key; + + currentBuffer.skip(current.keyLength); + // value part + current.valueOffset = currentBuffer.position(); + currentBuffer.skip(current.valueLength); + if (includesTags()) { + decodeTags(); + } + if (includesMvcc()) { + current.memstoreTS = ByteBuff.readVLong(currentBuffer); + } else { + current.memstoreTS = 0; + } + current.nextKvOffset = currentBuffer.position(); + current.currentKey.setKey(current.keyBuffer, tmpPair.getSecond(), + current.keyLength); + } + + protected boolean includesMvcc() { + return this.decodingCtx.getHFileContext().isIncludesMvcc(); + } + + protected boolean includesTags() { + return this.decodingCtx.getHFileContext().isIncludesTags(); + } + + protected void decodeTags() { + current.tagsLength = currentBuffer.getShortAfterPosition(0); + currentBuffer.skip(Bytes.SIZEOF_SHORT); + current.tagsOffset = currentBuffer.position(); + currentBuffer.skip(current.tagsLength); + } + + private class SeekerState implements Cell { + /** + * The size of a (key length, value length) tuple that prefixes each entry + * in a data block.
+ */ + public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + + protected ByteBuff currentBuffer; + protected int startOffset = -1; + protected int valueOffset = -1; + protected int keyLength; + protected int valueLength; + protected int tagsLength = 0; + protected int tagsOffset = -1; + + protected ByteBuffer keyBuffer = null; + protected long memstoreTS; + protected int nextKvOffset; + // buffer backed keyonlyKV + private ByteBufferedKeyOnlyKeyValue currentKey = new ByteBufferedKeyOnlyKeyValue(); + + protected boolean isValid() { + return valueOffset != -1; + } + + protected void invalidate() { + valueOffset = -1; + currentKey = new ByteBufferedKeyOnlyKeyValue(); + currentBuffer = null; + } + + protected void setKey(ByteBuffer key) { + currentKey.setKey(key, key.position(), key.remaining()); + } + + /** + * Copy the state from the next one into this instance (the previous state placeholder). Used to + * save the previous state when we are advancing the seeker to the next key/value. + */ + protected void copyFromNext(SeekerState nextState) { + keyBuffer = nextState.keyBuffer; + currentKey = nextState.currentKey; + + startOffset = nextState.startOffset; + valueOffset = nextState.valueOffset; + keyLength = nextState.keyLength; + valueLength = nextState.valueLength; + nextKvOffset = nextState.nextKvOffset; + memstoreTS = nextState.memstoreTS; + currentBuffer = nextState.currentBuffer; + tagsOffset = nextState.tagsOffset; + tagsLength = nextState.tagsLength; + } + + @Override + public byte[] getRowArray() { + return currentKey.getRowArray(); + } + + @Override + public int getRowOffset() { + return currentKey.getRowOffset(); + } + + @Override + public short getRowLength() { + return currentKey.getRowLength(); + } + + @Override + public byte[] getFamilyArray() { + return currentKey.getFamilyArray(); + } + + @Override + public int getFamilyOffset() { + return currentKey.getFamilyOffset(); + } + + @Override + public byte getFamilyLength() { + return currentKey.getFamilyLength(); + } + + @Override + public byte[] getQualifierArray() { + return currentKey.getQualifierArray(); + } + + @Override + public int getQualifierOffset() { + return currentKey.getQualifierOffset(); + } + + @Override + public int getQualifierLength() { + return currentKey.getQualifierLength(); + } + + @Override + public long getTimestamp() { + return currentKey.getTimestamp(); + } + + @Override + public byte getTypeByte() { + return currentKey.getTypeByte(); + } + + @Override + public long getSequenceId() { + return memstoreTS; + } + + @Override + public byte[] getValueArray() { + // TODO + return currentBuffer.array(); + } + + @Override + public int getValueOffset() { + // TODO + return currentBuffer.arrayOffset() + valueOffset; + } + + @Override + public int getValueLength() { + return valueLength; + } + + @Override + public byte[] getTagsArray() { + // TODO + return currentBuffer.array(); + } + + @Override + public int getTagsOffset() { + // TODO + return currentBuffer.arrayOffset() + tagsOffset; + } + + @Override + public int getTagsLength() { + return tagsLength; + } + + @Override + public String toString() { + return CellUtil.getCellKeyAsString(toCell()); + } + + protected int getCellBufSize() { + int kvBufSize = KEY_VALUE_LEN_SIZE + keyLength + valueLength; + if (includesTags()) { + kvBufSize += Bytes.SIZEOF_SHORT + tagsLength; + } + return kvBufSize; + } + + public Cell toCell() { + Cell ret; + int cellBufSize = getCellBufSize(); + long seqId = 0l; + if (includesMvcc()) { + seqId = memstoreTS; + } + if 
(currentBuffer.hasArray()) { + // TODO : reduce the varieties of KV here. Check if based on a boolean + // we can handle the 'no tags' case. + if (tagsLength > 0) { + ret = new SizeCachedKeyValue(currentBuffer.array(), + currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId); + } else { + ret = new SizeCachedNoTagsKeyValue(currentBuffer.array(), + currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId); + } + } else { + currentBuffer.asSubByteBuffer(startOffset, cellBufSize, tmpPair); + ByteBuffer buf = tmpPair.getFirst(); + if (buf.isDirect()) { + ret = new OffheapKeyValue(buf, tmpPair.getSecond(), cellBufSize, + tagsLength > 0, seqId); + } else { + if (tagsLength > 0) { + ret = new SizeCachedKeyValue(buf.array(), buf.arrayOffset() + + tmpPair.getSecond(), cellBufSize, seqId); + } else { + ret = new SizeCachedNoTagsKeyValue(buf.array(), buf.arrayOffset() + + tmpPair.getSecond(), cellBufSize, seqId); + } + } + } + return ret; + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java deleted file mode 100644 index cccbae0..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * This class is an extension to KeyValue where rowLen and keyLen are cached. - * Parsing the backing byte[] every time to get these values will affect the performance. - * In read path, we tend to read these values many times in Comparator, SQM etc. - * Note: Please do not use these objects in write path as it will increase the heap space usage. - * See https://issues.apache.org/jira/browse/HBASE-13448 - */ -@InterfaceAudience.Private -@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS") -public class SizeCachedKeyValue extends KeyValue { - private static final int HEAP_SIZE_OVERHEAD = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT; - - private short rowLen; - private int keyLen; - - public SizeCachedKeyValue(byte[] bytes, int offset, int length, long seqId) { - super(bytes, offset, length); - // We will read all these cached values at least once. 
Initialize now itself so that we can - // avoid uninitialized checks with every time call - rowLen = super.getRowLength(); - keyLen = super.getKeyLength(); - setSequenceId(seqId); - } - - @Override - public short getRowLength() { - return rowLen; - } - - @Override - public int getKeyLength() { - return this.keyLen; - } - - @Override - public long heapSize() { - return super.heapSize() + HEAP_SIZE_OVERHEAD; - } -} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java deleted file mode 100644 index d28d1a8..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ByteBufferUtils; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * This class is an extension to ContentSizeCachedKeyValue where there are no tags in Cell. - * Note: Please do not use these objects in write path as it will increase the heap space usage. 
- * See https://issues.apache.org/jira/browse/HBASE-13448 - */ -@InterfaceAudience.Private -public class SizeCachedNoTagsKeyValue extends SizeCachedKeyValue { - - public SizeCachedNoTagsKeyValue(byte[] bytes, int offset, int length, long seqId) { - super(bytes, offset, length, seqId); - } - - @Override - public int getTagsLength() { - return 0; - } - - @Override - public int write(OutputStream out, boolean withTags) throws IOException { - ByteBufferUtils.putInt(out, this.length); - out.write(this.bytes, this.offset, this.length); - return this.length + Bytes.SIZEOF_INT; - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java index 21941f7..46cc9f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java @@ -102,7 +102,7 @@ public class TestSeekToBlockWithEncoders { KeyValue kv4 = new KeyValue(Bytes.toBytes("aad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("val")); sampleKv.add(kv4); - KeyValue kv5 = new KeyValue(Bytes.toBytes("aaaad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), + KeyValue kv5 = new KeyValue(Bytes.toBytes("aaddd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("val")); sampleKv.add(kv5); KeyValue toSeek = new KeyValue(Bytes.toBytes("aaaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), @@ -125,7 +125,7 @@ public class TestSeekToBlockWithEncoders { KeyValue kv3 = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("val")); sampleKv.add(kv3); - KeyValue kv4 = new KeyValue(Bytes.toBytes("aaae"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), + KeyValue kv4 = new KeyValue(Bytes.toBytes("aade"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("val")); sampleKv.add(kv4); KeyValue kv5 = new KeyValue(Bytes.toBytes("bbbcd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
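For reference, the block format implied by RowIndexEncoderV1 above, written by write() and flush() and parsed back by RowIndexSeekerV1#setCurrentBuffer(), can be summarized as follows. This is a sketch derived from reading the patch, not normative format documentation:

    // Cell data section, one entry per cell (RowIndexEncoderV1#write):
    //   int    keyLength
    //   int    valueLength
    //   byte[] flat key (row, family, qualifier, timestamp, type)
    //   byte[] value
    //   short  tagsLength     -- only if the HFileContext includes tags
    //   byte[] tags           -- only if tagsLength > 0
    //   vlong  sequenceId     -- only if the HFileContext includes MVCC
    // Trailer (RowIndexEncoderV1#flush):
    //   int    rowCount                  -- number of distinct rows in the block
    //   int[]  rowOffset, rowCount slots -- offset of each row's first cell,
    //                                       relative to the first cell in the block
    //   int    onDiskSize                -- byte length of the cell data section
    // The seeker binary-searches the rowOffset array by row key (binarySearch()),
    // then walks the row's cells linearly (seekToKeyInBlock()/decodeNext()).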
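The two length fields are read back in RowIndexSeekerV1#decodeNext() with a single 8-byte read that is then split in two. Below is a minimal standalone sketch of that arithmetic, assuming (as its use in decodeNext() implies) that Bytes.MASK_FOR_LOWER_INT_IN_LONG has all bits of the upper int set; the class and variable names are illustrative only:

    public class KeyValueLengthSplitDemo {
      // Assumed to mirror org.apache.hadoop.hbase.util.Bytes.MASK_FOR_LOWER_INT_IN_LONG:
      // XOR-ing with it flips only the upper half, which the (int) cast then drops.
      private static final long MASK_FOR_LOWER_INT_IN_LONG = 0xFFFFFFFF00000000L;

      public static void main(String[] args) {
        int keyLength = 37;     // arbitrary example values
        int valueLength = 1024;
        // On disk the key length int immediately precedes the value length int,
        // so a single big-endian 8-byte read sees them packed like this:
        long ll = ((long) keyLength << Integer.SIZE) | (valueLength & 0xFFFFFFFFL);
        // The split used by decodeNext():
        int decodedKeyLength = (int) (ll >> Integer.SIZE);
        int decodedValueLength = (int) (MASK_FOR_LOWER_INT_IN_LONG ^ ll);
        System.out.println(decodedKeyLength + ", " + decodedValueLength); // prints: 37, 1024
      }
    }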
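Once ROW_INDEX_V1 is registered in DataBlockEncoding, it can be enabled per column family like any other block encoding. A usage sketch against the standard client API; the table name "demo_table" and family "f1" are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

    public class CreateRowIndexV1Table {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
            Admin admin = connection.getAdmin()) {
          HTableDescriptor table = new HTableDescriptor(TableName.valueOf("demo_table"));
          HColumnDescriptor family = new HColumnDescriptor("f1");
          // Store this family's data blocks in the row-index format.
          family.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1);
          table.addFamily(family);
          admin.createTable(table);
        }
      }
    }

Scans and gets need no client-side change; the encoding only affects how data blocks are laid out inside HFiles and in the block cache.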