diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java index 135d25c..ab4a16d 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.PositionedByteRange; /** * Various types of HFile blocks. Ordinal values of these enum constants must not be relied upon. @@ -134,6 +135,11 @@ public enum BlockType { public void write(ByteBuffer buf) { buf.put(magic); } + + public void write(PositionedByteRange buf) { + buf.put(0, magic); + buf.setPosition(magic.length + buf.getPosition()); + } public BlockCategory getCategory() { return metricCat; @@ -154,6 +160,23 @@ public enum BlockType { throw new IOException("Invalid HFile block magic: " + Bytes.toStringBinary(buf, offset, MAGIC_LENGTH)); } + + public static BlockType parse(PositionedByteRange buf, int offset, int length) + throws IOException { + if (length != MAGIC_LENGTH) { + throw new IOException("Magic record of invalid length: " + + Bytes.toStringBinary(buf, offset, length)); + } + + for (BlockType blockType : values()) { + if (buf.compare(offset, MAGIC_LENGTH, blockType.magic, 0, MAGIC_LENGTH) == 0) { + return blockType; + } + } + + throw new IOException("Invalid HFile block magic: " + + Bytes.toStringBinary(buf, offset, MAGIC_LENGTH)); + } public static BlockType read(DataInputStream in) throws IOException { byte[] buf = new byte[MAGIC_LENGTH]; @@ -170,6 +193,13 @@ public enum BlockType { buf.position(buf.position() + MAGIC_LENGTH); return blockType; } + + public static BlockType read(PositionedByteRange buf) throws IOException { + BlockType blockType = parse(buf, buf.getOffset() + buf.getPosition(), + Math.min(buf.getLimit() - buf.getPosition(), MAGIC_LENGTH)); + buf.setPosition(buf.getPosition() + MAGIC_LENGTH); + return blockType; + } /** * Put the magic record out to the specified byte array position. diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteRangeArray.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteRangeArray.java new file mode 100644 index 0000000..081f39d --- /dev/null +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteRangeArray.java @@ -0,0 +1,233 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; + +/** + * This class manages an array of ByteRanges, each with a default size of 4MB. These byte + * ranges are sequential and can be considered as one large byte range. It + * supports reading/writing data from this large byte range with a position and + * offset + */ +public class ByteRangeArray { + static final Log LOG = LogFactory.getLog(ByteRangeArray.class); + + static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; + private PositionedByteRange byteRanges[]; + private Lock locks[]; + private int bufferSize; + private int bufferCount; + + /** + * We allocate a number of byte ranges to make up the requested capacity. In order not to go out + * of the array bounds for the last byte (see {@link ByteBufferArray#multiple}), + * we will allocate one additional range with capacity 0. + * + * @param capacity + * total size of the byte range array + * @param directByteBuffer + * true if we allocate direct (off-heap) byte ranges + */ + public ByteRangeArray(long capacity, boolean directByteBuffer) { + this.bufferSize = DEFAULT_BUFFER_SIZE; + if (this.bufferSize > (capacity / 16)) + this.bufferSize = (int) roundUp(capacity / 16, 32768); + this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize); + LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity) + ", sizePerBuffer=" + + StringUtils.byteDesc(bufferSize) + ", count=" + bufferCount); + if (directByteBuffer) { + byteRanges = new SimplePositionedOffHeapByteRange[bufferCount + 1]; + } else { + byteRanges = new SimplePositionedByteRange[bufferCount + 1]; + } + locks = new Lock[bufferCount + 1]; + for (int i = 0; i <= bufferCount; i++) { + locks[i] = new ReentrantLock(); + if (i < bufferCount) { + if (directByteBuffer) { + byteRanges[i] = new SimplePositionedOffHeapByteRange(bufferSize); + } else { + byteRanges[i] = new SimplePositionedByteRange(bufferSize); + } + } else { + // TODO : This should be on-heap actually + byteRanges[i] = new SimplePositionedByteRange(0); + } + + } + } + + private long roundUp(long n, long to) { + return ((n + to - 1) / to) * to; + } + + /** + * Transfers bytes from this buffer array into the given destination array + * + * @param start + * start position in the ByteRangeArray + * @param len + * The maximum number of bytes to be written to the given array + * @param dstArray + * The array into which bytes are to be written + * @return number of bytes read + */ + public int getMultiple(long start, int len, byte[] dstArray) { + return getMultiple(start, len, dstArray, 0); + } + + /** + * Transfers bytes from this buffer array into the given destination array + * + * @param start + * start offset of this buffer array + * @param len + * The maximum number of bytes to be written to the given array + * @param dstArray + * The array into which bytes are to be written + * @param dstOffset + * The offset within the given array of the first byte to be written + * @return number of bytes read + */ + public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) { + multiple(start, len, dstArray, dstOffset, new Visitor() { + public void visit(PositionedByteRange bb, byte[] array, int arrayIdx, int len) { + bb.get(array, arrayIdx, len); + } + }); + return len; + } + + /** + * Transfers bytes from the given source array into this buffer array + * + * @param start
+ * start offset of this buffer array + * @param len + * The maximum number of bytes to be read from the given array + * @param srcArray + * The array from which bytes are to be read + */ + public void putMultiple(long start, int len, byte[] srcArray) { + putMultiple(start, len, srcArray, 0); + } + + /** + * Transfers bytes from the given source array into this buffer array + * + * @param start + * start offset of this buffer array + * @param len + * The maximum number of bytes to be read from the given array + * @param srcArray + * The array from which bytes are to be read + * @param srcOffset + * The offset within the given array of the first byte to be read + */ + public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) { + multiple(start, len, srcArray, srcOffset, new Visitor() { + public void visit(PositionedByteRange bb, byte[] array, int arrayIdx, int len) { + bb.put(array, arrayIdx, len); + } + }); + } + + private interface Visitor { + /** + * Visit the given byte buffer, if it is a read action, we will transfer the + * bytes from the buffer to the destination array, else if it is a write + * action, we will transfer the bytes from the source array to the buffer + * + * @param bb + * byte buffer + * @param array + * a source or destination byte array + * @param arrayOffset + * offset of the byte array + * @param len + * read/write length + */ + void visit(PositionedByteRange bb, byte[] array, int arrayOffset, int len); + } + + /** + * Access(read or write) this buffer array with a position and length as the + * given array. Here we will only lock one buffer even if it may be need visit + * several buffers. The consistency is guaranteed by the caller. + * + * @param start + * start offset of this buffer array + * @param len + * The maximum number of bytes to be accessed + * @param array + * The array from/to which bytes are to be read/written + * @param arrayOffset + * The offset within the given array of the first byte to be read or + * written + * @param visitor + * implement of how to visit the byte buffer + */ + void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) { + assert len >= 0; + long end = start + len; + int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize); + int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize); + assert array.length >= len + arrayOffset; + assert startBuffer >= 0 && startBuffer < bufferCount; + assert endBuffer >= 0 && endBuffer < bufferCount + || (endBuffer == bufferCount && endOffset == 0); + if (startBuffer >= locks.length || startBuffer < 0) { + String msg = "Failed multiple, start=" + start + ",startBuffer=" + startBuffer + + ",bufferSize=" + bufferSize; + LOG.error(msg); + throw new RuntimeException(msg); + } + int srcIndex = 0, cnt = -1; + for (int i = startBuffer; i <= endBuffer; ++i) { + Lock lock = locks[i]; + lock.lock(); + try { + PositionedByteRange bb = byteRanges[i]; + if (i == startBuffer) { + cnt = bufferSize - startOffset; + if (cnt > len) + cnt = len; + bb.setLimit(startOffset + cnt).setPosition(startOffset); + } else if (i == endBuffer) { + cnt = endOffset; + bb.setLimit(cnt).setPosition(0); + } else { + cnt = bufferSize; + bb.setLimit(cnt).setPosition(0); + } + visitor.visit(bb, array, srcIndex + arrayOffset, cnt); + srcIndex += cnt; + } finally { + lock.unlock(); + } + } + assert srcIndex == len; + } +} diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java 
hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java index deae034..0c19349 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -443,6 +443,35 @@ public class Bytes { } return result.toString(); } + + /** + * Write a printable representation of a byte array. Non-printable + * characters are hex escaped in the format \\x%02X, eg: + * \x00 \x05 etc + * + * @param b array to write out + * @param off offset to start at + * @param len length to write + * @return string output + */ + public static String toStringBinary(final PositionedByteRange b, int off, int len) { + StringBuilder result = new StringBuilder(); + // Just in case we are passed a 'len' that is > buffer length... + if (off >= b.getLength()) return result.toString(); + if (off + len > b.getLength()) len = b.getLength() - off; + for (int i = off; i < off + len ; ++i ) { + int ch = b.get(i) & 0xFF; + if ( (ch >= '0' && ch <= '9') + || (ch >= 'A' && ch <= 'Z') + || (ch >= 'a' && ch <= 'z') + || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0 ) { + result.append((char)ch); + } else { + result.append(String.format("\\x%02X", ch)); + } + } + return result.toString(); + } private static boolean isHexDigit(char c) { return diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java new file mode 100644 index 0000000..ec2fd2b --- /dev/null +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java @@ -0,0 +1,171 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.List; + +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; + +/** + * Utilities for interacting with and monitoring DirectByteBuffer allocations. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class DirectMemoryUtils { + private static final Log LOG = LogFactory.getLog(DirectMemoryUtils.class); + private static final String MEMORY_USED = "MemoryUsed"; + private static final MBeanServer BEAN_SERVER; + private static final ObjectName NIO_DIRECT_POOL; + private static final boolean HAS_MEMORY_USED_ATTRIBUTE; + + static { + // initialize singletons. Only maintain a reference to the MBeanServer if + // we're able to consume it -- hence convoluted logic. + ObjectName n = null; + MBeanServer s = null; + Object a = null; + try { + n = new ObjectName("java.nio:type=BufferPool,name=direct"); + } catch (MalformedObjectNameException e) { + LOG.warn("Unable to initialize ObjectName for DirectByteBuffer allocations."); + } finally { + NIO_DIRECT_POOL = n; + } + if (NIO_DIRECT_POOL != null) { + s = ManagementFactory.getPlatformMBeanServer(); + } + BEAN_SERVER = s; + if (BEAN_SERVER != null) { + try { + a = BEAN_SERVER.getAttribute(NIO_DIRECT_POOL, MEMORY_USED); + } catch (JMException e) { + LOG.debug("Failed to retrieve nio.BufferPool direct MemoryUsed attribute.", e); + } + } + HAS_MEMORY_USED_ATTRIBUTE = a != null; + } + + /** + * @return the setting of -XX:MaxDirectMemorySize as a long. Returns 0 if + * -XX:MaxDirectMemorySize is not set. + */ + public static long getDirectMemorySize() { + RuntimeMXBean runtimemxBean = ManagementFactory.getRuntimeMXBean(); + List<String> arguments = runtimemxBean.getInputArguments(); + long multiplier = 1; //for the byte case. + for (String s : arguments) { + if (s.contains("-XX:MaxDirectMemorySize=")) { + String memSize = s.toLowerCase() + .replace("-xx:maxdirectmemorysize=", "").trim(); + + if (memSize.contains("k")) { + multiplier = 1024; + } + + else if (memSize.contains("m")) { + multiplier = 1048576; + } + + else if (memSize.contains("g")) { + multiplier = 1073741824; + } + memSize = memSize.replaceAll("[^\\d]", ""); + + long retValue = Long.parseLong(memSize); + return retValue * multiplier; + } + } + return 0; + } + + /** + * @return the current amount of direct memory used. + */ + public static long getDirectMemoryUsage() { + if (BEAN_SERVER == null || NIO_DIRECT_POOL == null || !HAS_MEMORY_USED_ATTRIBUTE) return 0; + try { + Long value = (Long) BEAN_SERVER.getAttribute(NIO_DIRECT_POOL, MEMORY_USED); + return value == null ? 0 : value; + } catch (JMException e) { + // should print further diagnostic information? + return 0; + } + } + + /** + * DirectByteBuffers are garbage collected by using a phantom reference and a + * reference queue. Every once in a while, the JVM checks the reference queue and + * cleans the DirectByteBuffers. However, as this doesn't happen + * immediately after discarding all references to a DirectByteBuffer, it's + * easy to OutOfMemoryError yourself using DirectByteBuffers. This function + * explicitly calls the Cleaner method of a DirectByteBuffer. + * + * @param toBeDestroyed + * The DirectByteBuffer that will be "cleaned". Utilizes reflection.
+ * + */ + public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed) + throws IllegalArgumentException, IllegalAccessException, + InvocationTargetException, SecurityException, NoSuchMethodException { + + Preconditions.checkArgument(toBeDestroyed.isDirect(), + "toBeDestroyed isn't direct!"); + + Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner"); + cleanerMethod.setAccessible(true); + Object cleaner = cleanerMethod.invoke(toBeDestroyed); + Method cleanMethod = cleaner.getClass().getMethod("clean"); + cleanMethod.setAccessible(true); + cleanMethod.invoke(cleaner); + } + + /** + * DirectByteBuffers are garbage collected by using a phantom reference and a + * reference queue. Every once in a while, the JVM checks the reference queue and + * cleans the DirectByteBuffers. However, as this doesn't happen + * immediately after discarding all references to a DirectByteBuffer, it's + * easy to OutOfMemoryError yourself using DirectByteBuffers. This function + * explicitly calls the Cleaner method of a DirectByteBuffer. + * + * @param toBeDestroyed + * The direct {@link PositionedByteRange} that will be "cleaned". Utilizes reflection. + * + */ + public static void destroyDirectByteRange(PositionedByteRange toBeDestroyed) + throws IllegalArgumentException, IllegalAccessException, + InvocationTargetException, SecurityException, NoSuchMethodException { + toBeDestroyed.clean(); + } +} diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/PositionedByteRange.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/PositionedByteRange.java index 7d49538..02d4ed5 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/PositionedByteRange.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/PositionedByteRange.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.util; +import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; @@ -55,11 +56,27 @@ public interface PositionedByteRange extends ByteRange { public PositionedByteRange setPosition(int position); /** + * Limits the byte range up to a specified value. The limit cannot be greater than + * the capacity + * + * @param limit + * @return + */ + public PositionedByteRange setLimit(int limit); + + /** + * Return the current limit + * + * @return + */ + public int getLimit(); + + /** * The number of bytes remaining between position and the end of the range. */ public int getRemaining(); /** * Retrieve the next byte from this range without incrementing position. */ public byte peek(); @@ -114,6 +131,98 @@ public interface PositionedByteRange extends ByteRange { * @return this. */ public PositionedByteRange put(byte[] val, int offset, int length); + + /** + * Resets the position and offset + */ + public void rewind(); + + public PositionedByteRange clear(); + + /** + * Returns true if the implementation of the {@link PositionedByteRange} is off heap + * @return true/false + */ + public boolean hasArray(); + + /** + * Adds an int at the current position. The position also gets moved + * by 4 + * @param val + */ + public void putInt(int val); + + /** + * Adds a long at the current position. The position also gets moved + * by 8 + * @param val + */ + public void putLong(long val); + + /** + * Adds a short at the current position.
The position also gets moved + by 2 + * @param val + */ + public void putShort(short val); + /** + * Returns an int at the current position + * @return integer + */ + public int getInt(); + + /** + * Returns a long at the current position + * + * @return long + */ + public long getLong(); + + /** + * Returns a short at the current position + * + * @return + */ + public short getShort(); + + /** + * Returns a short at the given index + * @param index + * @return + */ + public short getShort(int index); + + /** + * In case of a DBB, we need to call this method to clean the buffers allocated + * off heap + */ + public void clean() throws IllegalArgumentException, IllegalAccessException, + InvocationTargetException, SecurityException, NoSuchMethodException; + + /** + * Compares the bytes retrieved from the {@link PositionedByteRange} with the specified byte[] b + * @param srcOffset - offset from which the source has to be read for comparison + * @param srcLength - length up to which the source has to be compared + * @param b - byte array with which the comparison is to happen + * @param offset - offset of the array to be compared + * @param length - length of the array to be compared + * @return + */ + public int compare(int srcOffset, int srcLength, byte[] b, int offset, int length); + + /** + * Store {@code buf}, another {@link PositionedByteRange}, into the current one + * @param buf + * the byte range to store. + * + */ + public void put(PositionedByteRange buf); + + /** + * Returns a shallow copy of the ByteRange with the position reset to the initial position + * @return + */ + public PositionedByteRange asReadOnlyByteRange(); // override parent interface declarations to return this interface. diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimpleByteRange.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimpleByteRange.java index 6f71c66..efc09d4 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimpleByteRange.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimpleByteRange.java @@ -54,6 +54,8 @@ public class SimpleByteRange implements ByteRange { * frequently used ranges, long-lived ranges, or long ranges. */ private int hash = UNSET_HASH_VALUE; + + protected int limit = 0; /** * Create a new {@code ByteRange} lacking a backing array and with an diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimplePositionedByteRange.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimplePositionedByteRange.java index c13539d..e9a25c8 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimplePositionedByteRange.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimplePositionedByteRange.java @@ -206,8 +206,8 @@ public class SimplePositionedByteRange extends SimpleByteRange implements Positi * Similar to {@link ByteBuffer#clear()}. Sets position to 0, length to * capacity.
*/ - @VisibleForTesting - PositionedByteRange clear() { + @Override + public PositionedByteRange clear() { clearHashCache(); position = 0; length = bytes.length - offset; @@ -258,4 +258,100 @@ public class SimplePositionedByteRange extends SimpleByteRange implements Positi clone.position = this.position; return clone; } + + @Override + public void rewind() { + this.position = 0; + this.offset = 0; + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public void putInt(int val) { + Bytes.putInt(getBytes(), getPosition() + getOffset(), val); + setPosition(getPosition() + Bytes.SIZEOF_INT); + } + + @Override + public void putLong(long val) { + Bytes.putLong(getBytes(), getPosition() + getOffset(), val); + setPosition(getPosition() + Bytes.SIZEOF_LONG); + } + + @Override + public int getInt() { + int res = Bytes.toInt(getBytes(), getOffset() + getPosition()); + setPosition(getPosition() + Bytes.SIZEOF_INT); + return res; + } + + @Override + public long getLong() { + long res = Bytes.toLong(getBytes(), getOffset() + getPosition()); + setPosition(getPosition() + Bytes.SIZEOF_LONG); + return res; + } + + @Override + public void putShort(short val) { + Bytes.putShort(getBytes(), getPosition() + getOffset(), val); + setPosition(getPosition() + Bytes.SIZEOF_SHORT); + } + + @Override + public short getShort() { + short res = Bytes.toShort(getBytes(), getOffset() + getPosition()); + setPosition(getPosition() + Bytes.SIZEOF_SHORT); + return res; + } + + @Override + public PositionedByteRange setLimit(int limit) { + this.limit = limit; + return this; + } + + @Override + public int getLimit() { + return limit; + } + + @Override + public void clean() { + throw new UnsupportedOperationException(); + } + + @Override + public int compare(int srcOffset, int srcLength, byte[] b, int offset, int length) { + return Bytes.compareTo(getBytes(), srcOffset, srcLength, b, offset, length); + } + + @Override + public int compareTo(ByteRange other) { + return super.compareTo(other); + } + + @Override + public void put(PositionedByteRange buf) { + for(int i = 0; i < buf.getLimit(); i++) { + put(buf.get(i)); + } + } + + @Override + public PositionedByteRange asReadOnlyByteRange() { + SimplePositionedByteRange clone = new SimplePositionedByteRange(bytes, offset, length); + clone.position = 0; + clone.limit = this.limit; + return clone; + } + + @Override + public short getShort(int index) { + return Bytes.toShort(getBytes(), index); + } } diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimplePositionedOffHeapByteRange.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimplePositionedOffHeapByteRange.java new file mode 100644 index 0000000..e21fc3a --- /dev/null +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/SimplePositionedOffHeapByteRange.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; + +/** + * A simple positioned based off heap byte range backed by Java ByteBuffer. + * Future implementation can evolve from Netty ByteBuf, IBM Java + * BoxedPackedObject, etc. + */ +public class SimplePositionedOffHeapByteRange implements + PositionedByteRange { + + private int position; + + private int limit; + + private ByteBuffer directBuf; + + public SimplePositionedOffHeapByteRange() { + super(); + } + + public SimplePositionedOffHeapByteRange(int capacity) { + this.directBuf = ByteBuffer.allocateDirect(capacity); + this.limit = capacity; + } + + public SimplePositionedOffHeapByteRange(ByteBuffer buffer) { + this.directBuf = buffer; + this.limit = buffer.limit(); + } + + @Override + public byte[] getBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public int getOffset() { + throw new UnsupportedOperationException(); + } + + @Override + public int getLength() { + return directBuf.capacity(); + } + + @Override + public boolean isEmpty() { + return directBuf.capacity() == 0; + } + + @Override + public byte get(int index) { + byte b = directBuf.get(index); + this.position += index; + return b; + } + + @Override + public byte[] deepCopyToNewArray() { + throw new UnsupportedOperationException(); + } + + @Override + public void deepCopyTo(byte[] destination, int destinationOffset) { + throw new UnsupportedOperationException(); + } + + @Override + public void deepCopySubRangeTo(int innerOffset, int copyLength, byte[] destination, + int destinationOffset) { + + } + + @Override + public int compareTo(ByteRange br) { + int n = this.getPosition() + + Math.min(this.getRemaining(), ((PositionedByteRange) br).getRemaining()); + for (int i = this.getPosition(), j = ((PositionedByteRange) br).getPosition(); + i < n; i++, j++) { + byte v1 = this.get(i); + byte v2 = br.get(j); + if (v1 == v2) + continue; + if ((v1 != v1) && (v2 != v2)) // For float and double + continue; + if (v1 < v2) + return -1; + return +1; + } + return this.getRemaining() - ((PositionedByteRange) br).getRemaining(); + } + + @Override + public int getPosition() { + return position; + } + + @Override + public PositionedByteRange setPosition(int position) { + directBuf.position(position); + this.position = position; + return this; + } + + @Override + public int getRemaining() { + return directBuf.capacity() - this.position; + } + + @Override + public byte peek() { + byte b = directBuf.get(); + setPosition(directBuf.position() - 1); + return b; + } + + @Override + public byte get() { + byte b = directBuf.get(); + this.position++; + return b; + } + + @Override + public PositionedByteRange get(byte[] dst) { + int i = 0; + if (dst != null && dst.length > 0) { + for (; i < dst.length; i++) { + dst[i] = directBuf.get(); + this.position++; + } + } + return this; + } + + @Override + public PositionedByteRange get(byte[] dst, int offset, int length) { + if (offset < 0 || length < 0) { + throw new IllegalArgumentException("offset and length cannot be negative"); + } + if (offset 
> length) { + throw new IllegalArgumentException("dst[] offset cannot be greater than length"); + } + if (length > dst.length) { + throw new IllegalArgumentException("length cannot be greater dst's length"); + } + int i = offset; + if (dst != null && dst.length > 0) { + for (; i < length; i++) { + dst[i] = directBuf.get(); + this.position++; + } + } + return this; + } + + @Override + public PositionedByteRange put(byte val) { + this.directBuf.put(val); + this.position++; + return this; + } + + @Override + public PositionedByteRange put(byte[] val) { + if (val == null) { + throw new IllegalArgumentException("val cannot be null"); + } + this.directBuf.put(val); + this.position += val.length; + return this; + } + + @Override + public PositionedByteRange put(byte[] val, int offset, int length) { + if (offset < 0 || length < 0) { + throw new IllegalArgumentException("offset and length cannot be negative"); + } + if (offset > length) { + throw new IllegalArgumentException("val[] offset cannot be greater than length"); + } + if (length > val.length) { + throw new IllegalArgumentException("length cannot be greater dst's length"); + } + if (val != null && val.length > 0) { + this.directBuf.put(val, offset, length); + this.position += length; + } + return this; + } + + @Override + public void rewind() { + this.directBuf.rewind(); + this.position = 0; + } + + @Override + public boolean hasArray() { + return true; + } + + @Override + public PositionedByteRange unset() { + return null; + } + + @Override + public PositionedByteRange set(int capacity) { + this.position = 0; + this.directBuf = ByteBuffer.allocateDirect(capacity); + limit = capacity; + return this; + } + + @Override + public PositionedByteRange set(byte[] bytes) { + throw new UnsupportedOperationException(); + } + + @Override + public PositionedByteRange set(byte[] bytes, int offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public PositionedByteRange setOffset(int offset) { + throw new UnsupportedOperationException(); + } + + @Override + public PositionedByteRange setLength(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public PositionedByteRange get(int index, byte[] dst) { + if (dst == null) { + throw new IllegalArgumentException("dst cannot be null."); + } + if (index < 0) { + throw new IllegalArgumentException("Index cannot be negative."); + } + int i = 0; + this.directBuf.position(index); + for (; i < dst.length; i++) { + dst[i] = this.directBuf.get(); + } + this.directBuf.position(index + dst.length); + this.position += index + dst.length; + return this; + } + + @Override + public PositionedByteRange get(int index, byte[] dst, int offset, int length) { + if (dst == null) { + throw new IllegalArgumentException("dst cannot be null."); + } + if (index < 0 || offset < 0 || length < 0) { + throw new IllegalArgumentException("Index/offset/length cannot be negative."); + } + if (offset > length) { + throw new IllegalArgumentException("dst[] offset cannot be greater than length"); + } + if (length > dst.length) { + throw new IllegalArgumentException("length cannot be greater dst's length"); + } + int i = offset; + this.directBuf.position(index); + for (; i < dst.length; i++) { + dst[i] = this.directBuf.get(); + } + this.directBuf.position(index + length); + this.position += index + dst.length; + return this; + } + + @Override + public PositionedByteRange put(int index, byte val) { + if (index < 0) { + throw new IllegalArgumentException("index cannot be negative"); + } + 
this.directBuf.position(index); + this.directBuf.put(val); + this.position += index + 1; + return this; + } + + @Override + public PositionedByteRange put(int index, byte[] val) { + if (index < 0) { + throw new IllegalArgumentException("index cannot be negative"); + } + if (val == null) { + throw new IllegalArgumentException("val[] cannot be null"); + } + this.directBuf.position(index); + this.directBuf.put(val); + this.position += index + val.length; + return this; + } + + @Override + public PositionedByteRange put(int index, byte[] val, int offset, int length) { + if (index < 0 || offset < 0 || length < 0) { + throw new IllegalArgumentException("Index/offset/length cannot be negative."); + } + if (val == null) { + throw new IllegalArgumentException("val[] cannot be null"); + } + if (offset > length) { + throw new IllegalArgumentException("val[] offset cannot be greater than length"); + } + this.directBuf.position(index); + this.directBuf.put(val, offset, length); + this.position += index + length; + return this; + } + + @Override + public PositionedByteRange deepCopy() { + SimplePositionedOffHeapByteRange copy = new SimplePositionedOffHeapByteRange(this.getLength()); + this.directBuf.rewind(); + while (directBuf.hasRemaining()) { + copy.put(directBuf.get()); + } + return copy; + } + + @Override + public PositionedByteRange shallowCopy() { + SimplePositionedOffHeapByteRange clone = new SimplePositionedOffHeapByteRange(directBuf); + clone.setPosition(this.getPosition()); + return clone; + } + + @Override + public PositionedByteRange shallowCopySubRange(int innerOffset, int copyLength) { + this.directBuf.position(innerOffset); + this.directBuf.limit(copyLength); + SimplePositionedOffHeapByteRange clone = new SimplePositionedOffHeapByteRange(directBuf.slice()); + return clone; + } + + @Override + public void putInt(int val) { + int pos = getPosition(); + for (int i = 3; i > 0; i--) { + put(pos + i, (byte) val); + val >>>= 8; + } + put(pos, (byte) val); + setPosition(pos + 4); + } + + @Override + public void putLong(long val) { + int pos = getPosition(); + for (int i = 7; i > 0; i--) { + put(pos + i, (byte) val); + val >>>= 8; + } + put(pos, (byte) val); + setPosition(pos + 8); + } + + @Override + public int getInt() { + int n = 0; + for (int i = 0; i < Bytes.SIZEOF_INT; i++) { + n <<= 8; + n ^= get() & 0xFF; + } + return n; + } + + @Override + public long getLong() { + long l = 0; + for (int i = 0; i < Bytes.SIZEOF_LONG; i++) { + l <<= 8; + l ^= get() & 0xFF; + } + return l; + } + + @Override + public void putShort(short val) { + int pos = getPosition(); + put(pos + 1, (byte) val); + val >>= 8; + put(pos, (byte) val); + setPosition(pos + 2); + } + + @Override + public short getShort() { + short n = 0; + n ^= get() & 0xFF; + n <<= 8; + n ^= get() & 0xFF; + return n; + } + + @Override + public PositionedByteRange setLimit(int limit) { + this.directBuf.limit(limit); + this.limit = limit; + return this; + } + + @Override + public int getLimit() { + return limit; + } + + @Override + public PositionedByteRange clear() { + this.directBuf.clear(); + this.limit = this.directBuf.capacity(); + return this; + } + + @Override + public void clean() throws IllegalArgumentException, SecurityException, IllegalAccessException, + InvocationTargetException, NoSuchMethodException { + DirectMemoryUtils.destroyDirectByteBuffer(this.directBuf); + } + + @Override + public int compare(int srcOffset, int srcLength, byte[] bytes, int offset, int length) { + // Bring WritableComparator code local + int end1 = srcOffset 
+ srcLength; + int end2 = offset + length; + for (int i = srcOffset, j = offset; i < end1 && j < end2; i++, j++) { + int a = (directBuf.get() & 0xff); + int b = (bytes[j] & 0xff); + if (a != b) { + return a - b; + } + } + return srcLength - length; + } + + @Override + public PositionedByteRange asReadOnlyByteRange() { + SimplePositionedOffHeapByteRange clone = new SimplePositionedOffHeapByteRange( + directBuf.asReadOnlyBuffer()); + clone.setPosition(0); + return clone; + } + + @Override + public void put(PositionedByteRange buf) { + for(int i = 0; i < buf.getLength(); i++) { + put(buf.get(i)); + } + } + + @Override + public short getShort(int offset) { + short n = 0; + n ^= get(offset) & 0xFF; + n <<= 8; + n ^= get(offset + 1) & 0xFF; + return n; + } +} diff --git hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestSimplePositionedOffHeapByteRange.java hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestSimplePositionedOffHeapByteRange.java new file mode 100644 index 0000000..c4811e6 --- /dev/null +++ hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestSimplePositionedOffHeapByteRange.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSimplePositionedOffHeapByteRange { + + @Test + public void testBasics() { + SimplePositionedOffHeapByteRange br = new SimplePositionedOffHeapByteRange(100); + byte[] dst = new byte[100]; + for(int i = 0; i < dst.length; i++) { + dst[i] = (byte)i; + } + br.put(dst); + byte b = br.get(50); + assertEquals(b, (byte)50); + br.setPosition(50); + b = br.peek(); + assertEquals(b, (byte)50); + b = br.get(); + assertEquals(b, (byte)50); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java index 5145199..5027094 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.PositionedByteRange; /** * Cacheable is an interface that allows for an object to be cached. If using an @@ -48,6 +49,8 @@ public interface Cacheable extends HeapSize { * Serializes its data into destination.
*/ void serialize(ByteBuffer destination); + + void serialize(PositionedByteRange destination); /** * Returns CacheableDeserializer instance which reconstructs original object from ByteBuffer. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java index b99341c..3fc2acf 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.PositionedByteRange; /** * Interface for a deserializer. Throws an IOException if the serialized data is @@ -33,17 +33,17 @@ public interface CacheableDeserializer<T extends Cacheable> { * * @return T the deserialized object. */ - T deserialize(ByteBuffer b) throws IOException; + T deserialize(PositionedByteRange b) throws IOException; /** * * @param b - * @param reuse true if Cacheable object can use the given buffer as its + * @param reuse true if Cacheable object can use the given byte range as its * content * @return T the deserialized object. * @throws IOException */ - T deserialize(ByteBuffer b, boolean reuse) throws IOException; + T deserialize(PositionedByteRange b, boolean reuse) throws IOException; /** * Get the identifier of this deserialiser. Identifier is unique for each diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 89702c5..e516dc1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1,5 +1,5 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.
The ASF licenses this file @@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CompoundBloomFilter; +import org.apache.hadoop.hbase.util.PositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedByteRange; import org.apache.hadoop.io.IOUtils; import com.google.common.base.Preconditions; @@ -120,33 +122,35 @@ public class HFileBlock implements Cacheable { private static final CacheableDeserializer blockDeserializer = new CacheableDeserializer() { - public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{ - buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); - ByteBuffer newByteBuffer; + @Override + public int getDeserialiserIdentifier() { + return deserializerIdentifier; + } + + @Override + public Cacheable deserialize(PositionedByteRange b) throws IOException { + return deserialize(b, false); + } + + @Override + public Cacheable deserialize(PositionedByteRange buf, boolean reuse) throws IOException { + buf.setLimit(buf.getLimit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); + PositionedByteRange newByteBuffer; if (reuse) { - newByteBuffer = buf.slice(); + newByteBuffer = buf.shallowCopySubRange(buf.getOffset(), buf.getLength()); } else { - newByteBuffer = ByteBuffer.allocate(buf.limit()); + newByteBuffer = new SimplePositionedByteRange(buf.getLimit()); newByteBuffer.put(buf); } - buf.position(buf.limit()); - buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE); + buf.setPosition(buf.getLimit()); + buf.setLimit(buf.getLimit() + HFileBlock.EXTRA_SERIALIZATION_SPACE); boolean usesChecksum = buf.get() == (byte)1; - HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum); + HFileBlock ourBuffer = new HFileBlock(newByteBuffer, + usesChecksum); ourBuffer.offset = buf.getLong(); ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt(); return ourBuffer; } - - @Override - public int getDeserialiserIdentifier() { - return deserializerIdentifier; - } - - @Override - public HFileBlock deserialize(ByteBuffer b) throws IOException { - return deserialize(b, false); - } }; private static final int deserializerIdentifier; static { @@ -169,7 +173,7 @@ public class HFileBlock implements Cacheable { private final int onDiskDataSizeWithHeader; /** The in-memory representation of the hfile block */ - private ByteBuffer buf; + private PositionedByteRange buf; /** Meta data that holds meta information on the hfileblock**/ private HFileContext fileContext; @@ -211,7 +215,7 @@ public class HFileBlock implements Cacheable { * @param fileContext HFile meta data */ HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, - int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf, + int uncompressedSizeWithoutHeader, long prevBlockOffset, PositionedByteRange buf, boolean fillHeader, long offset, int onDiskDataSizeWithHeader, HFileContext fileContext) { this.blockType = blockType; @@ -234,7 +238,7 @@ public class HFileBlock implements Cacheable { * because majorNumbers indicate the format of a HFile whereas minorNumbers * indicate the format inside a HFileBlock. 
*/ - HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException { + HFileBlock(PositionedByteRange b, boolean usesHBaseChecksum) throws IOException { b.rewind(); blockType = BlockType.read(b); onDiskSizeWithoutHeader = b.getInt(); @@ -244,7 +248,8 @@ public class HFileBlock implements Cacheable { contextBuilder.withHBaseCheckSum(usesHBaseChecksum); if (usesHBaseChecksum) { contextBuilder.withChecksumType(ChecksumType.codeToType(b.get())); - contextBuilder.withBytesPerCheckSum(b.getInt()); + contextBuilder + .withBytesPerCheckSum(b.getInt()); this.onDiskDataSizeWithHeader = b.getInt(); } else { contextBuilder.withChecksumType(ChecksumType.NULL); @@ -326,8 +331,8 @@ public class HFileBlock implements Cacheable { * @return the buffer with header skipped */ public ByteBuffer getBufferWithoutHeader() { - return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(), - buf.limit() - headerSize() - totalChecksumBytes()).slice(); + return ByteBuffer.wrap(buf.getBytes(), buf.getOffset() + headerSize(), + buf.getLimit() - headerSize() - totalChecksumBytes()).slice(); } /** @@ -340,8 +345,8 @@ public class HFileBlock implements Cacheable { * @return the buffer of this block for read-only operations */ public ByteBuffer getBufferReadOnly() { - return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), - buf.limit() - totalChecksumBytes()).slice(); + return ByteBuffer.wrap(buf.getBytes(), buf.getOffset(), + buf.getLimit() - totalChecksumBytes()).slice(); } /** @@ -352,7 +357,7 @@ public class HFileBlock implements Cacheable { * @return the byte buffer with header included for read-only operations */ public ByteBuffer getBufferReadOnlyWithHeader() { - return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice(); + return ByteBuffer.wrap(buf.getBytes(), buf.getOffset(), buf.getLimit()).slice(); } /** @@ -362,7 +367,8 @@ public class HFileBlock implements Cacheable { * @return the byte buffer with header included */ ByteBuffer getBufferWithHeader() { - ByteBuffer dupBuf = buf.duplicate(); + ByteBuffer dupBuf = ByteBuffer.wrap(buf.shallowCopy().getBytes(), 0, + buf.getLimit()); dupBuf.rewind(); return dupBuf; } @@ -391,18 +397,23 @@ public class HFileBlock implements Cacheable { blockTypeFromBuf + ", block type field: " + blockType); } } - - sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader, - "onDiskSizeWithoutHeader"); - - sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader, - "uncompressedSizeWithoutHeader"); - - sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlocKOffset"); + sanityCheckAssertion( + buf.getInt(), + onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); + sanityCheckAssertion( + buf.getInt(), + uncompressedSizeWithoutHeader, "uncompressedSizeWithoutHeader"); + sanityCheckAssertion( + buf.getLong(), + prevBlockOffset, "prevBlocKOffset"); if (this.fileContext.isUseHBaseChecksum()) { sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); - sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum"); - sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, + sanityCheckAssertion( + buf.getInt(), + this.fileContext.getBytesPerChecksum(), "bytesPerChecksum"); + sanityCheckAssertion( + buf.getInt(), + onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); } @@ -410,17 +421,18 @@ public class HFileBlock implements Cacheable { int hdrSize = headerSize(); int expectedBufLimit = uncompressedSizeWithoutHeader + headerSize() + cksumBytes; - if (buf.limit() != 
expectedBufLimit) { + // Need to see how to change the limit() api + if (buf.getLimit() != expectedBufLimit) { throw new AssertionError("Expected buffer limit " + expectedBufLimit - + ", got " + buf.limit()); + + ", got " + buf.getLimit()); } // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next // block's, header, so there are two sensible values for buffer capacity. int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes; - if (buf.capacity() != size && - buf.capacity() != size + hdrSize) { - throw new AssertionError("Invalid buffer capacity: " + buf.capacity() + + if (buf.getLength() != size && + buf.getLength() != size + hdrSize) { + throw new AssertionError("Invalid buffer capacity: " + buf.getLength() + ", expected " + size + " or " + (size + hdrSize)); } } @@ -436,8 +448,8 @@ public class HFileBlock implements Cacheable { + ", prevBlockOffset=" + prevBlockOffset + ", dataBeginsWith=" - + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), - Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())) + + Bytes.toStringBinary(buf, buf.getOffset() + headerSize(), + Math.min(32, buf.getLimit() - buf.getOffset() - headerSize())) + ", fileOffset=" + offset; } @@ -446,8 +458,8 @@ public class HFileBlock implements Cacheable { if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) { String blockInfoMsg = "Block offset: " + offset + ", data starts with: " - + Bytes.toStringBinary(buf.array(), buf.arrayOffset(), - buf.arrayOffset() + Math.min(32, buf.limit())); + + Bytes.toStringBinary(buf, buf.getOffset(), + buf.getOffset() + Math.min(32, buf.getLimit())); throw new IOException("On-disk size without header provided is " + expectedOnDiskSizeWithoutHeader + ", but block " + "header contains " + onDiskSizeWithoutHeader + ". " + @@ -469,14 +481,14 @@ public class HFileBlock implements Cacheable { cksumBytes + (extraBytes ? headerSize() : 0); - ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded); + PositionedByteRange newBuf = new SimplePositionedByteRange(capacityNeeded); // Copy header bytes. - System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(), - newBuf.arrayOffset(), headerSize()); + buf.get(buf.getOffset() + buf.getPosition(), newBuf.getBytes(), + newBuf.getOffset() + newBuf.getPosition(), headerSize()); buf = newBuf; - buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes); + buf.setLimit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes); } /** An additional sanity-check in case no compression is being used. */ @@ -514,8 +526,8 @@ public class HFileBlock implements Cacheable { * @return a byte stream reading the data section of this block */ public DataInputStream getByteStream() { - return new DataInputStream(new ByteArrayInputStream(buf.array(), - buf.arrayOffset() + headerSize(), buf.limit() - headerSize())); + return new DataInputStream(new ByteArrayInputStream(buf.getBytes(), + buf.getOffset() + headerSize(), buf.getLimit() - headerSize())); } @Override @@ -535,7 +547,7 @@ public class HFileBlock implements Cacheable { if (buf != null) { // Deep overhead of the byte buffer. Needs to be aligned separately. 
- size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE); + size += ClassSize.align(buf.getLength() + BYTE_BUFFER_HEAP_SIZE); } return ClassSize.align(size); @@ -989,6 +1001,11 @@ public class HFileBlock implements Cacheable { expectState(State.BLOCK_READY); return ByteBuffer.wrap(uncompressedBytesWithHeader); } + + PositionedByteRange getUncompressedByteRangeWithHeader() { + expectState(State.BLOCK_READY); + return new SimplePositionedByteRange(uncompressedBytesWithHeader); + } private void expectState(State expectedState) { if (state != expectedState) { @@ -1033,7 +1050,7 @@ public class HFileBlock implements Cacheable { .withIncludesTags(fileContext.isIncludesTags()) .build(); return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), - getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedBufferWithHeader(), + getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedByteRangeWithHeader(), DONT_FILL_HEADER, startOffset, onDiskBytesWithHeader.length + onDiskChecksum.length, newContext); } @@ -1247,7 +1264,8 @@ public class HFileBlock implements Cacheable { private static class PrefetchedHeader { long offset = -1; byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; - ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE); + PositionedByteRange buf = new SimplePositionedByteRange(header, 0, + HConstants.HFILEBLOCK_HEADER_SIZE); } /** Reads version 2 blocks from the filesystem. */ @@ -1409,7 +1427,7 @@ public class HFileBlock implements Cacheable { // And we also want to skip reading the header again if it has already // been read. PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); - ByteBuffer headerBuf = prefetchedHeader.offset == offset ? + PositionedByteRange headerBuf = prefetchedHeader.offset == offset ? prefetchedHeader.buf : null; int nextBlockOnDiskSize = 0; @@ -1435,10 +1453,9 @@ public class HFileBlock implements Cacheable { if (headerBuf != null) { // the header has been read when reading the previous block, copy // to this block's header - System.arraycopy(headerBuf.array(), - headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); + headerBuf.get(headerBuf.getOffset(), onDiskBlock, 0, hdrSize); } else { - headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); + headerBuf = new SimplePositionedByteRange(onDiskBlock, 0, hdrSize); } // We know the total on-disk size but not the uncompressed size. Read // the entire block into memory, then parse the header and decompress @@ -1479,17 +1496,15 @@ public class HFileBlock implements Cacheable { // operations. This might happen when we are doing the first read // in a series of reads or a random read, and we don't have access // to the block index. This is costly and should happen very rarely. 
- headerBuf = ByteBuffer.allocate(hdrSize); - readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), + headerBuf = new SimplePositionedByteRange(hdrSize); + readAtOffset(is, headerBuf.getBytes(), headerBuf.getOffset(), hdrSize, false, offset, pread); } b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; - System.arraycopy(headerBuf.array(), - headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); - nextBlockOnDiskSize = - readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader() - - hdrSize, true, offset + hdrSize, pread); + headerBuf.get(headerBuf.getOffset(), onDiskBlock, 0, hdrSize); + nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader() + - hdrSize, true, offset + hdrSize, pread); onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize; } @@ -1525,8 +1540,8 @@ public class HFileBlock implements Cacheable { } if (nextBlockOnDiskSize > 0) { // Copy next block's header bytes into the new block if we have them. - System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(), - b.buf.arrayOffset() + hdrSize + System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.getBytes(), + b.buf.getOffset() + hdrSize + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), hdrSize); } @@ -1535,8 +1550,8 @@ public class HFileBlock implements Cacheable { // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already // contains the header of next block, so no need to set next // block's header in it. - b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, - onDiskSizeWithHeader), this.fileContext.isUseHBaseChecksum()); + b = new HFileBlock(new SimplePositionedByteRange(onDiskBlock, 0, onDiskSizeWithHeader), + this.fileContext.isUseHBaseChecksum()); } b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; @@ -1583,14 +1598,22 @@ public class HFileBlock implements Cacheable { @Override public int getSerializedLength() { if (buf != null) { - return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE; + return this.buf.getLimit() + HFileBlock.EXTRA_SERIALIZATION_SPACE; } return 0; } @Override public void serialize(ByteBuffer destination) { - ByteBuffer dupBuf = this.buf.duplicate(); + PositionedByteRange dupBuf = this.buf.shallowCopy(); + dupBuf.rewind(); + destination.put(ByteBuffer.wrap(dupBuf.getBytes(), dupBuf.getOffset(), dupBuf.getLength())); + serializeExtraInfo(destination); + } + + @Override + public void serialize(PositionedByteRange destination) { + PositionedByteRange dupBuf = this.buf.shallowCopy(); dupBuf.rewind(); destination.put(dupBuf); serializeExtraInfo(destination); @@ -1602,6 +1625,13 @@ public class HFileBlock implements Cacheable { destination.putInt(this.nextBlockOnDiskSizeWithHeader); destination.rewind(); } + + public void serializeExtraInfo(PositionedByteRange destination) { + destination.put(this.fileContext.isUseHBaseChecksum() ? 
+        (byte) 1 : (byte) 0);
+    destination.putLong(this.offset);
+    destination.putInt(this.nextBlockOnDiskSizeWithHeader);
+    destination.rewind();
+  }
 
   @Override
   public CacheableDeserializer<Cacheable> getDeserializer() {
@@ -1643,10 +1673,10 @@ public class HFileBlock implements Cacheable {
     if (this.buf.compareTo(castedComparison.buf) != 0) {
       return false;
     }
-    if (this.buf.position() != castedComparison.buf.position()){
+    if (this.buf.getPosition() != castedComparison.buf.getPosition()){
       return false;
     }
-    if (this.buf.limit() != castedComparison.buf.limit()){
+    if (this.buf.getLimit() != castedComparison.buf.getLimit()){
       return false;
     }
     return true;
@@ -1758,5 +1788,32 @@ public class HFileBlock implements Cacheable {
            " bytesPerChecksum " + bytesPerChecksum +
            " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
   }
+
+  /**
+   * Convert the contents of the block header into a human readable string.
+   * This is mostly helpful for debugging. This assumes that the block
+   * has minor version > 0.
+   */
+  static String toStringHeader(PositionedByteRange buf) throws IOException {
+    // Peek at the magic, then rewind so BlockType.read() consumes it from the
+    // current position; all remaining fields are read positionally.
+    long magic = buf.getLong();
+    buf.setPosition(buf.getPosition() - Bytes.SIZEOF_LONG);
+    BlockType bt = BlockType.read(buf);
+    int compressedBlockSizeNoHeader = buf.getInt();
+    int uncompressedBlockSizeNoHeader = buf.getInt();
+    long prevBlockOffset = buf.getLong();
+    byte cksumtype = buf.get();
+    long bytesPerChecksum = buf.getInt();
+    long onDiskDataSizeWithHeader = buf.getInt();
+    return " Header dump: magic: " + magic +
+                   " blockType " + bt +
+                   " compressedBlockSizeNoHeader " +
+                   compressedBlockSizeNoHeader +
+                   " uncompressedBlockSizeNoHeader " +
+                   uncompressedBlockSizeNoHeader +
+                   " prevBlockOffset " + prevBlockOffset +
+                   " checksumType " + ChecksumType.codeToType(cksumtype) +
+                   " bytesPerChecksum " + bytesPerChecksum +
+                   " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
+  }
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 33ef789..d83954d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -59,11 +59,11 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.ConcurrentIndex;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.ImmutableList;
@@ -372,7 +372,8 @@ public class BucketCache implements BlockCache, HeapSize {
         throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
       }
       Cacheable cachedBlock = bucketEntry.deserializerReference(
-          deserialiserMap).deserialize(bb, true);
+          deserialiserMap).deserialize(
+          new SimplePositionedByteRange(bb.array(), bb.arrayOffset(), bb.limit()), true);
       long timeTaken = System.nanoTime() - start;
       cacheStats.hit(caching);
       cacheStats.ioHit(timeTaken);
@@ -1159,6 +1160,7 @@ public class BucketCache implements BlockCache, HeapSize {
       bucketEntry.setDeserialiserReference(data.getDeserializer(),
           deserialiserMap);
       try {
         if (data instanceof HFileBlock) {
+          // TODO: this path still hands a ByteBuffer to the IOEngine and needs to be
+          // converted to PositionedByteRange as well.
           ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();
           sliceBuf.rewind();
           assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 50f93be..9270715 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteBufferArray;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
 
 /**
  * IO engine that stores data on the memory using an array of ByteBuffers
@@ -98,4 +99,16 @@ public class ByteBufferIOEngine implements IOEngine {
   public void shutdown() {
 
   }
+
+  @Override
+  public int read(PositionedByteRange dstByteRange, long offset) throws IOException {
+    // TODO: PositionedByteRange reads are not implemented for this engine yet
+    return 0;
+  }
+
+  @Override
+  public void write(PositionedByteRange srcByteRange, long offset) throws IOException {
+    // TODO: PositionedByteRange writes are not implemented for this engine yet
+  }
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteRangeIOEngine.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteRangeIOEngine.java
new file mode 100644
index 0000000..94fbaf1
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteRangeIOEngine.java
@@ -0,0 +1,100 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.ByteRangeArray;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
+
+/**
+ * IO engine that stores data in memory using an array of byte ranges
+ * ({@link ByteRangeArray})
+ */
+public class ByteRangeIOEngine implements IOEngine {
+
+  private ByteRangeArray byteRangeArray;
+
+  /**
+   * Construct the ByteRangeIOEngine with the given capacity
+   * @param capacity total size of the underlying byte range array
+   * @param direct true to allocate direct (off-heap) byte ranges
+   * @throws IOException
+   */
+  public ByteRangeIOEngine(long capacity, boolean direct) throws IOException {
+    byteRangeArray = new ByteRangeArray(capacity, direct);
+  }
+
+  @Override
+  public boolean isPersistent() {
+    return false;
+  }
+
+  /**
+   * Transfers data from the buffer array to the given byte buffer
+   * @param dstBuffer the given byte buffer into which bytes are to be written
+   * @param offset The offset in the ByteRangeArray of the first byte to be read
+   * @return number of bytes read
+   * @throws IOException
+   */
+  @Override
+  public int read(ByteBuffer dstBuffer, long offset) throws IOException {
+    assert dstBuffer.hasArray();
+    return byteRangeArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(),
+        dstBuffer.arrayOffset());
+  }
+
+  /**
+   * Transfers data from the given byte buffer to the buffer array
+   * @param srcBuffer the given byte buffer from which bytes are to be read
+   * @param offset The offset in the ByteRangeArray of the first byte to be written
+   * @throws IOException
+   */
+  @Override
+  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
+    assert srcBuffer.hasArray();
+    byteRangeArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
+        srcBuffer.arrayOffset());
+  }
+
+  /**
+   * No operation for the sync in the memory IO engine
+   */
+  @Override
+  public void sync() {
+
+  }
+
+  /**
+   * No operation for the shutdown in the memory IO engine
+   */
+  @Override
+  public void shutdown() {
+
+  }
+
+  /**
+   * Transfers data from the buffer array to the given byte range
+   * @param dstByteRange the given byte range into which bytes are to be written
+   * @param offset The offset in the ByteRangeArray of the first byte to be read
+   * @return number of bytes read
+   * @throws IOException
+   */
+  @Override
+  public int read(PositionedByteRange dstByteRange, long offset) throws IOException {
+    return byteRangeArray.getMultiple(offset, dstByteRange.getRemaining(), dstByteRange.getBytes(),
+        dstByteRange.getOffset());
+  }
+
+  /**
+   * Transfers data from the given byte range to the buffer array
+   * @param srcByteRange the given byte range from which bytes are to be read
+   * @param offset The offset in the ByteRangeArray of the first byte to be written
+   * @throws IOException
+   */
+  @Override
+  public void write(PositionedByteRange srcByteRange, long offset) throws IOException {
+    byteRangeArray.putMultiple(offset, srcByteRange.getRemaining(), srcByteRange.getBytes(),
+        srcByteRange.getOffset());
+  }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index a1eea0b..409c0bc 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -26,6 +26,7 @@ import java.nio.channels.FileChannel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -108,4 +109,16 @@ public class FileIOEngine implements IOEngine {
       LOG.error("Can't shutdown cleanly", ex);
     }
   }
+
+  @Override
+  public int read(PositionedByteRange dstByteRange, long offset) throws IOException {
+    // TODO: PositionedByteRange reads are not implemented for the file engine yet
+    return 0;
+  }
+
+  @Override
+  public void write(PositionedByteRange srcByteRange, long offset) throws IOException {
+    // TODO: PositionedByteRange writes are not implemented for the file engine yet
+  }
 }
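A minimal usage sketch for the PositionedByteRange-based engine added above (illustrative only, not part of the patch; the engine capacity, offset and block contents below are made up):

    import org.apache.hadoop.hbase.io.hfile.bucket.ByteRangeIOEngine;
    import org.apache.hadoop.hbase.util.PositionedByteRange;
    import org.apache.hadoop.hbase.util.SimplePositionedByteRange;

    public class ByteRangeIOEngineExample {
      public static void main(String[] args) throws Exception {
        // 8 MB engine backed by on-heap byte ranges ('false' = not direct)
        ByteRangeIOEngine engine = new ByteRangeIOEngine(8 * 1024 * 1024, false);

        // a fake 4 KB serialized block
        byte[] block = new byte[4096];
        for (int i = 0; i < block.length; i++) {
          block[i] = (byte) i;
        }

        // write the block at offset 0, then read it back into a fresh range
        engine.write(new SimplePositionedByteRange(block), 0);
        PositionedByteRange dst = new SimplePositionedByteRange(block.length);
        engine.read(dst, 0);
        // dst.getBytes() now holds the same 4096 bytes that were written
      }
    }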
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
index 09313ab..6ddd49d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
 
 /**
  * A class implementing IOEngine interface could support data services for
@@ -43,6 +44,15 @@ public interface IOEngine {
    * @throws IOException
    */
   int read(ByteBuffer dstBuffer, long offset) throws IOException;
+
+  /**
+   * Transfers data from IOEngine to the given byte range
+   * @param dstByteRange the given byte range into which bytes are to be written
+   * @param offset The offset in the IO engine where the first byte is to be read
+   * @return number of bytes read
+   * @throws IOException
+   */
+  int read(PositionedByteRange dstByteRange, long offset) throws IOException;
 
   /**
    * Transfers data from the given byte buffer to IOEngine
@@ -52,6 +62,15 @@
    * @throws IOException
    */
   void write(ByteBuffer srcBuffer, long offset) throws IOException;
+
+  /**
+   * Transfers data from the given byte range to IOEngine
+   * @param srcByteRange the given byte range from which bytes are to be read
+   * @param offset The offset in the IO engine where the first byte is to be
+   *          written
+   * @throws IOException
+   */
+  void write(PositionedByteRange srcByteRange, long offset) throws IOException;
 
   /**
    * Sync the data to IOEngine after writing
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
index 7e713d6..59eaabd 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile.slab;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.cache.CacheBuilder;
@@ -119,7 +119,7 @@ public class SingleSizeCache implements BlockCache, HeapSize {
 
   @Override
   public void cacheBlock(BlockCacheKey blockName, Cacheable toBeCached) {
-    ByteBuffer storedBlock;
+    PositionedByteRange storedBlock;
 
     try {
       storedBlock = backingStore.alloc(toBeCached.getSerializedLength());
@@ -167,7 +167,7 @@ public class SingleSizeCache implements BlockCache, HeapSize {
         return null;
       }
       return contentBlock.deserializer
-          .deserialize(contentBlock.serializedData.asReadOnlyBuffer());
+          .deserialize(contentBlock.serializedData.asReadOnlyByteRange());
       }
     } catch (Throwable t) {
       LOG.error("Deserializer threw an exception. This may indicate a bug.", t);
@@ -199,7 +199,7 @@ public class SingleSizeCache implements BlockCache, HeapSize {
       return;
     }
     evictedHeap = evictedBlock.heapSize();
-    ByteBuffer bb = evictedBlock.serializedData;
+    PositionedByteRange bb = evictedBlock.serializedData;
     evictedBlock.serializedData = null;
     backingStore.free(bb);
@@ -329,11 +329,11 @@ public class SingleSizeCache implements BlockCache, HeapSize {
   /* Just a pair class, holds a reference to the parent cacheable */
   private static class CacheablePair implements HeapSize {
     final CacheableDeserializer<Cacheable> deserializer;
-    ByteBuffer serializedData;
+    PositionedByteRange serializedData;
     AtomicLong recentlyAccessed;
 
     private CacheablePair(CacheableDeserializer<Cacheable> deserializer,
-        ByteBuffer serializedData) {
+        PositionedByteRange serializedData) {
       this.recentlyAccessed = new AtomicLong();
       this.deserializer = deserializer;
       this.serializedData = serializedData;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
index 77bea49..5424d3f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile.slab;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -27,6 +26,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.DirectMemoryUtils;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
+import org.apache.hadoop.hbase.util.SimplePositionedOffHeapByteRange;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -40,18 +42,18 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
   static final Log LOG = LogFactory.getLog(Slab.class);
 
   /** This is where our items, or blocks of the slab, are stored. */
-  private LinkedBlockingQueue<ByteBuffer> buffers;
+  private LinkedBlockingQueue<PositionedByteRange> buffers;
 
   /** This is where our Slabs are stored */
-  private ConcurrentLinkedQueue<ByteBuffer> slabs;
+  private ConcurrentLinkedQueue<PositionedByteRange> slabs;
 
   private final int blockSize;
   private final int numBlocks;
   private long heapSize;
 
   Slab(int blockSize, int numBlocks) {
-    buffers = new LinkedBlockingQueue<ByteBuffer>();
-    slabs = new ConcurrentLinkedQueue<ByteBuffer>();
+    buffers = new LinkedBlockingQueue<PositionedByteRange>();
+    slabs = new ConcurrentLinkedQueue<PositionedByteRange>();
 
     this.blockSize = blockSize;
     this.numBlocks = numBlocks;
@@ -73,11 +75,10 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
   }
 
   private void allocateAndSlice(int size, int sliceSize) {
-    ByteBuffer newSlab = ByteBuffer.allocateDirect(size);
+    PositionedByteRange newSlab = new SimplePositionedOffHeapByteRange(size);
     slabs.add(newSlab);
-    for (int j = 0; j < newSlab.capacity(); j += sliceSize) {
-      newSlab.limit(j + sliceSize).position(j);
-      ByteBuffer aSlice = newSlab.slice();
+    for (int j = 0; j < newSlab.getLength(); j += sliceSize) {
+      // each slice starts at inner offset j and is sliceSize bytes long
+      PositionedByteRange aSlice = newSlab.shallowCopySubRange(j, sliceSize);
       buffers.add(aSlice);
       heapSize += ClassSize.estimateBase(aSlice.getClass(), false);
     }
@@ -89,9 +90,9 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
    * deconstructor in C++.
*/ void shutdown() { - for (ByteBuffer aSlab : slabs) { + for (PositionedByteRange aSlab : slabs) { try { - DirectMemoryUtils.destroyDirectByteBuffer(aSlab); + DirectMemoryUtils.destroyDirectByteRange(aSlab); } catch (Exception e) { LOG.warn("Unable to deallocate direct memory during shutdown", e); } @@ -114,17 +115,18 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize { * Throws an exception if you try to allocate a * bigger size than the allocator can handle. Alloc will block until a buffer is available. */ - ByteBuffer alloc(int bufferSize) throws InterruptedException { + PositionedByteRange alloc(int bufferSize) throws InterruptedException { int newCapacity = Preconditions.checkPositionIndex(bufferSize, blockSize); - ByteBuffer returnedBuffer = buffers.take(); + PositionedByteRange returnedBuffer = buffers.take(); - returnedBuffer.clear().limit(newCapacity); + // Not adding the limit method here? May be we need the limit method here + returnedBuffer.clear().setLimit(newCapacity); return returnedBuffer; } - void free(ByteBuffer toBeFreed) { - Preconditions.checkArgument(toBeFreed.capacity() == blockSize); + void free(PositionedByteRange toBeFreed) { + Preconditions.checkArgument(toBeFreed.getLength() == blockSize); buffers.add(toBeFreed); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java deleted file mode 100644 index 466a498..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.util; - -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -import com.google.common.base.Preconditions; - -import javax.management.JMException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -/** - * Utilities for interacting with and monitoring DirectByteBuffer allocations. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class DirectMemoryUtils { - private static final Log LOG = LogFactory.getLog(DirectMemoryUtils.class); - private static final String MEMORY_USED = "MemoryUsed"; - private static final MBeanServer BEAN_SERVER; - private static final ObjectName NIO_DIRECT_POOL; - private static final boolean HAS_MEMORY_USED_ATTRIBUTE; - - static { - // initialize singletons. Only maintain a reference to the MBeanServer if - // we're able to consume it -- hence convoluted logic. - ObjectName n = null; - MBeanServer s = null; - Object a = null; - try { - n = new ObjectName("java.nio:type=BufferPool,name=direct"); - } catch (MalformedObjectNameException e) { - LOG.warn("Unable to initialize ObjectName for DirectByteBuffer allocations."); - } finally { - NIO_DIRECT_POOL = n; - } - if (NIO_DIRECT_POOL != null) { - s = ManagementFactory.getPlatformMBeanServer(); - } - BEAN_SERVER = s; - if (BEAN_SERVER != null) { - try { - a = BEAN_SERVER.getAttribute(NIO_DIRECT_POOL, MEMORY_USED); - } catch (JMException e) { - LOG.debug("Failed to retrieve nio.BufferPool direct MemoryUsed attribute.", e); - } - } - HAS_MEMORY_USED_ATTRIBUTE = a != null; - } - - /** - * @return the setting of -XX:MaxDirectMemorySize as a long. Returns 0 if - * -XX:MaxDirectMemorySize is not set. - */ - public static long getDirectMemorySize() { - RuntimeMXBean runtimemxBean = ManagementFactory.getRuntimeMXBean(); - List arguments = runtimemxBean.getInputArguments(); - long multiplier = 1; //for the byte case. - for (String s : arguments) { - if (s.contains("-XX:MaxDirectMemorySize=")) { - String memSize = s.toLowerCase() - .replace("-xx:maxdirectmemorysize=", "").trim(); - - if (memSize.contains("k")) { - multiplier = 1024; - } - - else if (memSize.contains("m")) { - multiplier = 1048576; - } - - else if (memSize.contains("g")) { - multiplier = 1073741824; - } - memSize = memSize.replaceAll("[^\\d]", ""); - - long retValue = Long.parseLong(memSize); - return retValue * multiplier; - } - } - return 0; - } - - /** - * @return the current amount of direct memory used. - */ - public static long getDirectMemoryUsage() { - if (BEAN_SERVER == null || NIO_DIRECT_POOL == null || !HAS_MEMORY_USED_ATTRIBUTE) return 0; - try { - Long value = (Long) BEAN_SERVER.getAttribute(NIO_DIRECT_POOL, MEMORY_USED); - return value == null ? 0 : value; - } catch (JMException e) { - // should print further diagnostic information? - return 0; - } - } - - /** - * DirectByteBuffers are garbage collected by using a phantom reference and a - * reference queue. Every once a while, the JVM checks the reference queue and - * cleans the DirectByteBuffers. However, as this doesn't happen - * immediately after discarding all references to a DirectByteBuffer, it's - * easy to OutOfMemoryError yourself using DirectByteBuffers. This function - * explicitly calls the Cleaner method of a DirectByteBuffer. - * - * @param toBeDestroyed - * The DirectByteBuffer that will be "cleaned". Utilizes reflection. 
- * - */ - public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed) - throws IllegalArgumentException, IllegalAccessException, - InvocationTargetException, SecurityException, NoSuchMethodException { - - Preconditions.checkArgument(toBeDestroyed.isDirect(), - "toBeDestroyed isn't direct!"); - - Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner"); - cleanerMethod.setAccessible(true); - Object cleaner = cleanerMethod.invoke(toBeDestroyed); - Method cleanMethod = cleaner.getClass().getMethod("clean"); - cleanMethod.setAccessible(true); - cleanMethod.invoke(cleaner); - } -} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index cf11575..fb473ba 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.hadoop.hbase.util.PositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedByteRange; public class CacheTestUtils { @@ -73,7 +75,8 @@ public class CacheTestUtils { conf); final AtomicInteger totalQueries = new AtomicInteger(); - final ConcurrentLinkedQueue blocksToTest = new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue blocksToTest = + new ConcurrentLinkedQueue(); final AtomicInteger hits = new AtomicInteger(); final AtomicInteger miss = new AtomicInteger(); @@ -253,7 +256,12 @@ public class CacheTestUtils { new CacheableDeserializer() { @Override - public Cacheable deserialize(ByteBuffer b) throws IOException { + public int getDeserialiserIdentifier() { + return deserializerIdentifier; + } + + @Override + public Cacheable deserialize(PositionedByteRange b) throws IOException { int len = b.getInt(); Thread.yield(); byte buf[] = new byte[len]; @@ -262,13 +270,7 @@ public class CacheTestUtils { } @Override - public int getDeserialiserIdentifier() { - return deserializerIdentifier; - } - - @Override - public Cacheable deserialize(ByteBuffer b, boolean reuse) - throws IOException { + public Cacheable deserialize(PositionedByteRange b, boolean reuse) throws IOException { return deserialize(b); } }; @@ -312,6 +314,14 @@ public class CacheTestUtils { public BlockType getBlockType() { return BlockType.DATA; } + + @Override + public void serialize(PositionedByteRange destination) { + destination.putInt(buf.length); + Thread.yield(); + destination.put(buf); + destination.rewind(); + } } @@ -326,9 +336,9 @@ public class CacheTestUtils { // declare our data size to be smaller than it by the serialization space // required. 
- ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize + PositionedByteRange cachedBuffer = new SimplePositionedByteRange(blockSize - HFileBlock.EXTRA_SERIALIZATION_SPACE); - rand.nextBytes(cachedBuffer.array()); + rand.nextBytes(cachedBuffer.getBytes()); cachedBuffer.rewind(); int onDiskSizeWithoutHeader = blockSize - HFileBlock.EXTRA_SERIALIZATION_SPACE; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java index d7f9cbb..10bce37 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java @@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.io.hfile; import java.nio.ByteBuffer; import junit.framework.TestCase; + import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.PositionedByteRange; import org.junit.experimental.categories.Category; @Category(SmallTests.class) @@ -140,6 +142,12 @@ public class TestCachedBlockQueue extends TestCase { return BlockType.DATA; } + @Override + public void serialize(PositionedByteRange destination) { + // TODO Auto-generated method stub + + } + }, accessTime, false); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 0a2187d..0c77381 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.SimplePositionedByteRange; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.compress.Compressor; import org.junit.Before; @@ -853,7 +854,7 @@ public class TestHFileBlock { .withCompression(Algorithm.NONE) .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM) .withChecksumType(ChecksumType.NULL).build(); - HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, + HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, new SimplePositionedByteRange(buf.array()), HFileBlock.FILL_HEADER, -1, 0, meta); long byteBufferExpectedSize = diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java index 1d48509..a244eed 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.hadoop.hbase.util.PositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedByteRange; import org.apache.hadoop.io.compress.Compressor; import org.junit.Before; import org.junit.Test; @@ -705,9 +707,9 @@ public class TestHFileBlockCompatibility { * * @return uncompressed block for caching on write in the form of a buffer */ - public 
ByteBuffer getUncompressedBufferWithHeader() { + public PositionedByteRange getUncompressedBufferWithHeader() { byte[] b = getUncompressedDataWithHeader(); - return ByteBuffer.wrap(b, 0, b.length); + return new SimplePositionedByteRange(b, 0, b.length); } /** diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java index b005d41..8076385 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.hadoop.hbase.util.PositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedByteRange; import org.apache.hadoop.hbase.util.test.RedundantKVGenerator; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -108,10 +110,10 @@ public class TestHFileDataBlockEncoder { ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer( generator.generateTestKeyValues(60, useTags), includesMemstoreTS); int size = keyValues.limit(); - ByteBuffer buf = ByteBuffer.allocate(size + headerSize); - buf.position(headerSize); + PositionedByteRange buf = new SimplePositionedByteRange(size + headerSize); + buf.setPosition(headerSize); keyValues.rewind(); - buf.put(keyValues); + buf.put(keyValues.array()); HFileContext hfileContext = new HFileContextBuilder().withHBaseCheckSum(false) .withIncludesMvcc(includesMemstoreTS) .withIncludesTags(useTags) @@ -135,7 +137,7 @@ public class TestHFileDataBlockEncoder { byte[] encodedBytes = context.getUncompressedBytesWithHeader(); size = encodedBytes.length - block.getDummyHeaderForVersion().length; return new HFileBlock(context.getBlockType(), size, size, -1, - ByteBuffer.wrap(encodedBytes), HFileBlock.FILL_HEADER, 0, + new SimplePositionedByteRange(encodedBytes), HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), block.getHFileContext()); } @@ -168,10 +170,10 @@ public class TestHFileDataBlockEncoder { ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer( generator.generateTestKeyValues(60, useTag), includesMemstoreTS); int size = keyValues.limit(); - ByteBuffer buf = ByteBuffer.allocate(size + HConstants.HFILEBLOCK_HEADER_SIZE); - buf.position(HConstants.HFILEBLOCK_HEADER_SIZE); + PositionedByteRange buf = new SimplePositionedByteRange(size + HConstants.HFILEBLOCK_HEADER_SIZE); + buf.setPosition(HConstants.HFILEBLOCK_HEADER_SIZE); keyValues.rewind(); - buf.put(keyValues); + buf.put(keyValues.array()); HFileContext meta = new HFileContextBuilder() .withIncludesMvcc(includesMemstoreTS) .withIncludesTags(useTag) diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java index 2de7608..2e6595f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.io.HeapSize; import 
org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -778,6 +779,12 @@ public class TestLruBlockCache {
       return BlockType.DATA;
     }
 
+    @Override
+    public void serialize(PositionedByteRange destination) {
+      // not needed for these tests
+
+    }
+
   }
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteRangeIOEngine.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteRangeIOEngine.java
new file mode 100644
index 0000000..c24dfe8
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteRangeIOEngine.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
+import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Basic test for {@link ByteRangeIOEngine}
+ */
+@Category(SmallTests.class)
+public class TestByteRangeIOEngine {
+
+  @Test
+  public void testByteRangeIOEngine() throws Exception {
+    int capacity = 32 * 1024 * 1024; // 32 MB
+    int testNum = 100;
+    int maxBlockSize = 64 * 1024;
+    ByteRangeIOEngine ioEngine = new ByteRangeIOEngine(capacity, false);
+    int testOffsetAtStartNum = testNum / 10;
+    int testOffsetAtEndNum = testNum / 10;
+    for (int i = 0; i < testNum; i++) {
+      byte val = (byte) (Math.random() * 255);
+      int blockSize = (int) (Math.random() * maxBlockSize);
+      if (blockSize == 0) {
+        blockSize = 1;
+      }
+      byte[] byteArray = new byte[blockSize];
+      for (int j = 0; j < byteArray.length; ++j) {
+        byteArray[j] = val;
+      }
+      PositionedByteRange srcBuffer = new SimplePositionedByteRange(byteArray);
+      int offset = 0;
+      if (testOffsetAtStartNum > 0) {
+        testOffsetAtStartNum--;
+        offset = 0;
+      } else if (testOffsetAtEndNum > 0) {
+        testOffsetAtEndNum--;
+        offset = capacity - blockSize;
+      } else {
+        offset = (int) (Math.random() * (capacity - maxBlockSize));
+      }
+      ioEngine.write(srcBuffer, offset);
+      PositionedByteRange dstBuffer = new SimplePositionedByteRange(blockSize);
+      ioEngine.read(dstBuffer, offset);
+      byte[] byteArray2 = dstBuffer.getBytes();
+      for (int j = 0; j < byteArray.length; ++j) {
+        assertTrue(byteArray[j] == byteArray2[j]);
+      }
+    }
+    assert testOffsetAtStartNum == 0;
+    assert testOffsetAtEndNum == 0;
+  }
+}
diff --git
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlab.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlab.java index 71d708a..034c8ee 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlab.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlab.java @@ -19,11 +19,13 @@ package org.apache.hadoop.hbase.io.hfile.slab; -import static org.junit.Assert.*; -import java.nio.ByteBuffer; +import static org.junit.Assert.assertEquals; import org.apache.hadoop.hbase.SmallTests; -import org.junit.*; +import org.apache.hadoop.hbase.util.PositionedByteRange; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.junit.experimental.categories.Category; /**Test cases for Slab.java*/ @@ -32,7 +34,7 @@ public class TestSlab { static final int BLOCKSIZE = 1000; static final int NUMBLOCKS = 100; Slab testSlab; - ByteBuffer[] buffers = new ByteBuffer[NUMBLOCKS]; + PositionedByteRange[] buffers = new PositionedByteRange[NUMBLOCKS]; @Before public void setUp() { @@ -48,7 +50,7 @@ public class TestSlab { public void testBasicFunctionality() throws InterruptedException { for (int i = 0; i < NUMBLOCKS; i++) { buffers[i] = testSlab.alloc(BLOCKSIZE); - assertEquals(BLOCKSIZE, buffers[i].limit()); + assertEquals(BLOCKSIZE, buffers[i].getLimit()); } // write an unique integer to each allocated buffer. @@ -68,7 +70,7 @@ public class TestSlab { for (int i = 0; i < NUMBLOCKS; i++) { buffers[i] = testSlab.alloc(BLOCKSIZE); - assertEquals(BLOCKSIZE, buffers[i].limit()); + assertEquals(BLOCKSIZE, buffers[i].getLimit()); } }
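For reference, the length-prefixed round trip that the CacheTestUtils changes above exercise can be reproduced directly against the PositionedByteRange positional API. A minimal sketch (illustrative only, not part of the patch; the class name and payload are made up, and rewind() is the position-reset method this patch already relies on):

    import org.apache.hadoop.hbase.util.PositionedByteRange;
    import org.apache.hadoop.hbase.util.SimplePositionedByteRange;

    public class ByteRangeRoundTripExample {
      public static void main(String[] args) {
        byte[] payload = { 1, 2, 3, 4, 5 };

        // serialize: 4-byte length prefix followed by the payload, mirroring the
        // serialize(PositionedByteRange) implementation in CacheTestUtils above
        PositionedByteRange range = new SimplePositionedByteRange(4 + payload.length);
        range.putInt(payload.length);
        range.put(payload);
        range.rewind();

        // deserialize: read the length back, then the payload
        int len = range.getInt();
        byte[] restored = new byte[len];
        range.get(restored);
        // restored now holds the same bytes as payload
      }
    }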