diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index b09dc9a..2bb820e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,10 +40,9 @@ public final class ByteBufferArray { public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; private ByteBuffer buffers[]; - private Lock locks[]; private int bufferSize; private int bufferCount; - private ByteBufferAllocator allocator; + /** * We allocate a number of byte buffers as the capacity. In order not to out * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}), @@ -65,10 +62,7 @@ public final class ByteBufferArray { + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + bufferCount + ", direct=" + directByteBuffer); buffers = new ByteBuffer[bufferCount + 1]; - locks = new Lock[bufferCount + 1]; - this.allocator = allocator; for (int i = 0; i <= bufferCount; i++) { - locks[i] = new ReentrantLock(); if (i < bufferCount) { buffers[i] = allocator.allocate(bufferSize, directByteBuffer); } else { @@ -109,8 +103,8 @@ public final class ByteBufferArray { private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() { @Override - public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) { - bb.get(array, arrayIdx, len); + public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) { + ByteBufferUtils.copyFromBufferToArray(array, bb, pos, arrayIdx, len); } }; @@ -138,8 +132,8 @@ public final class ByteBufferArray { private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() { @Override - public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) { - bb.put(array, arrayIdx, len); + public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) { + ByteBufferUtils.copyFromArrayToBuffer(bb, pos, array, arrayIdx, len); } }; @@ -149,11 +143,12 @@ public final class ByteBufferArray { * bytes from the buffer to the destination array, else if it is a write * action, we will transfer the bytes from the source array to the buffer * @param bb byte buffer + * @param pos Start position in ByteBuffer * @param array a source or destination byte array * @param arrayOffset offset of the byte array * @param len read/write length */ - void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len); + void visit(ByteBuffer bb, int pos, byte[] array, int arrayOffset, int len); } /** @@ -176,7 +171,7 @@ public final class ByteBufferArray { assert startBuffer >= 0 && startBuffer < bufferCount; assert endBuffer >= 0 && endBuffer < bufferCount || (endBuffer == bufferCount && endOffset == 0); - if (startBuffer >= locks.length || startBuffer < 0) { + if (startBuffer >= buffers.length || startBuffer < 0) { String msg = "Failed multiple, start=" + start + ",startBuffer=" + startBuffer + ",bufferSize=" + bufferSize; LOG.error(msg); @@ -184,26 +179,19 @@ public final class ByteBufferArray { } int srcIndex = 0, cnt = -1; for (int i = startBuffer; i <= endBuffer; ++i) { - Lock lock = locks[i]; - lock.lock(); - try { - ByteBuffer bb = buffers[i]; - if (i == startBuffer) { - cnt = bufferSize - startOffset; - if (cnt > len) cnt = len; - bb.limit(startOffset + cnt).position(startOffset); - } else if (i == endBuffer) { - cnt = endOffset; - bb.limit(cnt).position(0); - } else { - cnt = bufferSize; - bb.limit(cnt).position(0); - } - visitor.visit(bb, array, srcIndex + arrayOffset, cnt); - srcIndex += cnt; - } finally { - lock.unlock(); + ByteBuffer bb = buffers[i].duplicate(); + int pos = 0; + if (i == startBuffer) { + cnt = bufferSize - startOffset; + if (cnt > len) cnt = len; + pos = startOffset; + } else if (i == endBuffer) { + cnt = endOffset; + } else { + cnt = bufferSize; } + visitor.visit(bb, pos, array, srcIndex + arrayOffset, cnt); + srcIndex += cnt; } assert srcIndex == len; } @@ -231,7 +219,7 @@ public final class ByteBufferArray { assert startBuffer >= 0 && startBuffer < bufferCount; assert endBuffer >= 0 && endBuffer < bufferCount || (endBuffer == bufferCount && endBufferOffset == 0); - if (startBuffer >= locks.length || startBuffer < 0) { + if (startBuffer >= buffers.length || startBuffer < 0) { String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer + ",bufferSize=" + bufferSize; LOG.error(msg); @@ -239,32 +227,21 @@ public final class ByteBufferArray { } int srcIndex = 0, cnt = -1; ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1]; - for (int i = startBuffer,j=0; i <= endBuffer; ++i,j++) { - Lock lock = locks[i]; - lock.lock(); - try { - ByteBuffer bb = buffers[i]; - if (i == startBuffer) { - cnt = bufferSize - startBufferOffset; - if (cnt > len) cnt = len; - ByteBuffer dup = bb.duplicate(); - dup.limit(startBufferOffset + cnt).position(startBufferOffset); - mbb[j] = dup.slice(); - } else if (i == endBuffer) { - cnt = endBufferOffset; - ByteBuffer dup = bb.duplicate(); - dup.position(0).limit(cnt); - mbb[j] = dup.slice(); - } else { - cnt = bufferSize ; - ByteBuffer dup = bb.duplicate(); - dup.position(0).limit(cnt); - mbb[j] = dup.slice(); - } - srcIndex += cnt; - } finally { - lock.unlock(); + for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) { + ByteBuffer bb = buffers[i].duplicate(); + if (i == startBuffer) { + cnt = bufferSize - startBufferOffset; + if (cnt > len) cnt = len; + bb.limit(startBufferOffset + cnt).position(startBufferOffset); + } else if (i == endBuffer) { + cnt = endBufferOffset; + bb.position(0).limit(cnt); + } else { + cnt = bufferSize; + bb.position(0).limit(cnt); } + mbb[j] = bb.slice(); + srcIndex += cnt; } assert srcIndex == len; if (mbb.length > 1) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index df23614..9909f19 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -940,6 +940,28 @@ public final class ByteBufferUtils { } /** + * Copies bytes from given array's offset to length part into the given buffer. Puts the bytes + * to buffer's given position. + * @param out + * @param in + * @param inOffset + * @param length + */ + public static void copyFromArrayToBuffer(ByteBuffer out, int outOffset, byte[] in, int inOffset, + int length) { + if (out.hasArray()) { + System.arraycopy(in, inOffset, out.array(), out.arrayOffset() + outOffset, length); + } else if (UNSAFE_AVAIL) { + UnsafeAccess.copy(in, inOffset, out, outOffset, length); + } else { + int oldPos = out.position(); + out.position(outOffset); + out.put(in, inOffset, length); + out.position(oldPos); + } + } + + /** * Copies specified number of bytes from given offset of 'in' ByteBuffer to * the array. * @param out