Accumulo / ACCUMULO-4164

Avoid copy of RFile Index blocks when in cache

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.5, 1.7.1
    • Fix Version/s: 1.6.6, 1.7.2, 1.8.0
    • Component/s: None
    • Labels: None

      Description

      I have been doing performance experiments with RFile. During the course of these experiments I noticed that RFile is not as fast as it should be when index blocks are in cache and the RFile is not already open. The reason is that the RFile code copies and deserializes the index data even though it's already in memory.

      I made the following changes to RFile in a branch.

      • Avoid copying index data when it is in cache
      • Deserialize offsets lazily (instead of upfront) during binary search
      • Stop calling lots of synchronized methods during deserialization of index info. The existing code uses ByteArrayInputStream, which results in lots of fine-grained synchronization. Switching to an input stream that offers the same functionality without synchronization showed a measurable performance difference.
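The first two bullets can be illustrated with a minimal sketch. It assumes a hypothetical block layout (a table of int offsets followed by length-prefixed UTF-8 keys) and a hypothetical class name, `LazyIndexSketch`; neither is the actual RFile index format or API. The point is only that the binary search decodes an offset and a key when it probes them, reading straight from the cached array without copying it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical layout (an assumption for illustration, not the RFile format):
 * [numOffsets x int offset][entries], where each entry is [int length][UTF-8 key bytes]
 * and each offset is relative to the start of the entries region.
 */
final class LazyIndexSketch {
  private final ByteBuffer buf; // wraps the cached array; no copy is made
  private final int offsetsOffset;
  private final int numOffsets;
  private final int indexOffset;

  LazyIndexSketch(byte[] cachedBlock, int offsetsOffset, int numOffsets, int indexOffset) {
    this.buf = ByteBuffer.wrap(cachedBlock);
    this.offsetsOffset = offsetsOffset;
    this.numOffsets = numOffsets;
    this.indexOffset = indexOffset;
  }

  /** Decodes only the offset and key for position i, on demand. */
  String keyAt(int i) {
    int entryStart = indexOffset + buf.getInt(offsetsOffset + 4 * i);
    int len = buf.getInt(entryStart);
    return new String(buf.array(), entryStart + 4, len, StandardCharsets.UTF_8);
  }

  /** Returns the position of the first key >= target, or numOffsets if there is none. */
  int lowerBound(String target) {
    int lo = 0;
    int hi = numOffsets;
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (keyAt(mid).compareTo(target) < 0) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    return lo;
  }

  /** Helper that serializes already-sorted keys into the hypothetical layout. */
  static LazyIndexSketch of(String... sortedKeys) {
    int n = sortedKeys.length;
    byte[][] encoded = new byte[n][];
    int entriesSize = 0;
    for (int i = 0; i < n; i++) {
      encoded[i] = sortedKeys[i].getBytes(StandardCharsets.UTF_8);
      entriesSize += 4 + encoded[i].length;
    }
    ByteBuffer out = ByteBuffer.allocate(4 * n + entriesSize);
    int offset = 0;
    for (byte[] e : encoded) {
      out.putInt(offset);
      offset += 4 + e.length;
    }
    for (byte[] e : encoded) {
      out.putInt(e.length);
      out.put(e);
    }
    return new LazyIndexSketch(out.array(), 0, n, 4 * n);
  }
}
```

A lookup over n entries touches roughly log2(n) offsets and keys, instead of deserializing all n offsets before the search starts.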

      These changes improve performance in the following two situations:

      • When an RFile's data is in cache, but the RFile is not open on the tserver.
      • For RFiles with multilevel indexes with index data in cache. Currently an open RFile only keeps the root node in memory. Lower level index nodes are always read from the cache or DFS. The changes I made would always avoid the copy and deserialization of lower level index nodes when in cache.

      I have seen significant performance improvements testing with the two cases above. My tests are currently based on a new API I am creating for RFile, so I cannot easily share them until I get that pushed.

      For the case where a tserver has all files frequently in use already open and those files have a single level index, these changes should not make a significant performance difference.

      These changes should result in less memory use when the same RFile is opened multiple times for different scans (when data is in cache). In this case all of the RFile readers would share the same byte array holding the serialized index data.
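The memory saving described above comes from readers holding a reference to the cached array rather than a private copy. A minimal sketch, with a hypothetical class name and no connection to the real RFile reader classes:

```java
/** Hypothetical reader: keeps a reference to the cached bytes plus its own cursor. */
final class SharedIndexBlockSketch {
  private final byte[] block; // reference to the cached array, never cloned
  private int pos; // per-reader cursor, so readers do not disturb each other

  SharedIndexBlockSketch(byte[] cachedBlock) {
    this.block = cachedBlock;
  }

  /** Returns the next unsigned byte, or -1 at the end of the block. */
  int next() {
    return pos < block.length ? block[pos++] & 0xff : -1;
  }

  /** True when both readers are backed by the very same array object. */
  boolean sharesStorageWith(SharedIndexBlockSketch other) {
    return this.block == other.block;
  }
}
```

Each additional reader then costs a cursor and a reference, not another copy of the serialized index.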

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user keith-turner opened a pull request:

          https://github.com/apache/accumulo/pull/80

          ACCUMULO-4164 Avoid copying rfile index when in cache. Avoid sync wh…

          …en deserializing index.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/keith-turner/accumulo rfile-no-index-copy

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/accumulo/pull/80.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #80


          commit c86ec0b2627a7660d13a3a01a73573b12423fd9c
          Author: Keith Turner <kturner@apache.org>
          Date: 2016-03-12T00:38:38Z

          ACCUMULO-4164 Avoid copying rfile index when in cache. Avoid sync when deserializing index.


          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/80#discussion_r56753645

          — Diff: core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java —
          @@ -0,0 +1,132 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.accumulo.core.file.blockfile.impl;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +
          +/**
          + * This class is like byte array input stream with two differences. It supports seeking and avoids synchronization.
          + */
          +public class SeekableByteArrayInputStream extends InputStream {
          +
          +  // make this volatile to ensure data set by one thread can be seen by another
          +  private volatile byte buffer[];
          +  private int cur;
          +  private int max;
          +
          +  @Override
          +  public int read() {
          +    if (cur < max) {
          +      return buffer[cur++] & 0xff;
          +    } else {
          +      return -1;
          +    }
          +  }
          +
          +  @Override
          +  public int read(byte b[], int offset, int length) {
          +    if (b == null) {
          +      throw new NullPointerException();
          +    }
          +
          +    if (length < 0 || offset < 0 || length > b.length - offset) {
          +      throw new IndexOutOfBoundsException();
          +    }
          +
          +    if (length == 0) {
          +      return 0;
          +    }
          +
          +    int avail = max - cur;
          +
          +    if (avail <= 0) {
          +      return -1;
          +    }
          +
          +    if (length > avail) {
          +      length = avail;
          +    }
          +
          +    System.arraycopy(buffer, cur, b, offset, length);
          +    cur += length;
          +    return length;
          +  }
          +
          +  @Override
          +  public long skip(long requestedSkip) {
          +    long actualSkip = max - cur;
          +    if (requestedSkip < actualSkip)
          +      if (requestedSkip < 0)
          +        actualSkip = 0;
          +      else
          +        actualSkip = requestedSkip;
          +
          +    cur += actualSkip;
          +    return actualSkip;
          +  }
          +
          +  @Override
          +  public int available() {
          +    return max - cur;
          +  }
          +
          +  @Override
          +  public boolean markSupported() {
          +    return false;
          +  }
          +
          +  @Override
          +  public void mark(int readAheadLimit) {
          +    throw new UnsupportedOperationException();
          +  }
          +
          +  @Override
          +  public void reset() {
          +    throw new UnsupportedOperationException();
          — End diff –

          Isn't this simple to implement? Is there a reason not to?
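For context on the question above, here is a minimal sketch of what mark/reset support could look like for a stream backed by an in-memory array. The class name `MarkableByteStreamSketch` is hypothetical and this is not the code from the pull request; it only shows that, with the whole array in memory, mark() can record the cursor and reset() can restore it.

```java
import java.io.InputStream;

/** Hypothetical, trimmed-down stream used only to illustrate mark/reset. */
final class MarkableByteStreamSketch extends InputStream {
  private final byte[] buffer;
  private int cur;
  private int markPos; // position restored by reset(); starts at 0

  MarkableByteStreamSketch(byte[] buf) {
    this.buffer = buf;
  }

  @Override
  public int read() {
    return cur < buffer.length ? buffer[cur++] & 0xff : -1;
  }

  @Override
  public boolean markSupported() {
    return true;
  }

  @Override
  public void mark(int readAheadLimit) {
    // The read-ahead limit is irrelevant here because the whole array stays in memory.
    markPos = cur;
  }

  @Override
  public void reset() {
    cur = markPos;
  }
}
```

Note that `InputStream` declares `mark` and `reset` as synchronized; the overrides here drop that modifier, in keeping with the unsynchronized design the diff describes.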

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/80#discussion_r56753652

          — Diff: core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java —
          @@ -0,0 +1,132 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.accumulo.core.file.blockfile.impl;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +
          +/**
          + * This class is like byte array input stream with two differences. It supports seeking and avoids synchronization.
          + */
          +public class SeekableByteArrayInputStream extends InputStream {
          +
          +  // make this volatile to ensure data set by one thread can be seen by another
          +  private volatile byte buffer[];
          +  private int cur;
          +  private int max;
          +
          +  @Override
          +  public int read() {
          +    if (cur < max) {
          +      return buffer[cur++] & 0xff;
          +    } else {
          +      return -1;
          +    }
          +  }
          +
          +  @Override
          +  public int read(byte b[], int offset, int length) {
          +    if (b == null) {
          +      throw new NullPointerException();
          +    }
          +
          +    if (length < 0 || offset < 0 || length > b.length - offset) {
          +      throw new IndexOutOfBoundsException();
          +    }
          +
          +    if (length == 0) {
          +      return 0;
          +    }
          +
          +    int avail = max - cur;
          +
          +    if (avail <= 0) {
          +      return -1;
          +    }
          +
          +    if (length > avail) {
          +      length = avail;
          +    }
          +
          +    System.arraycopy(buffer, cur, b, offset, length);
          +    cur += length;
          +    return length;
          +  }
          +
          +  @Override
          +  public long skip(long requestedSkip) {
          +    long actualSkip = max - cur;
          +    if (requestedSkip < actualSkip)
          +      if (requestedSkip < 0)
          +        actualSkip = 0;
          +      else
          +        actualSkip = requestedSkip;
          +
          +    cur += actualSkip;
          +    return actualSkip;
          +  }
          +
          +  @Override
          +  public int available() {
          +    return max - cur;
          +  }
          +
          +  @Override
          +  public boolean markSupported() {
          +    return false;
          +  }
          +
          +  @Override
          +  public void mark(int readAheadLimit) {
          +    throw new UnsupportedOperationException();
          +  }
          +
          +  @Override
          +  public void reset() {
          +    throw new UnsupportedOperationException();
          +  }
          +
          +  @Override
          +  public void close() throws IOException {}
          +
          +  public SeekableByteArrayInputStream(byte[] buf) {
          +    this.buffer = buf;
          — End diff –

          `Objects.requireNonNull(buf)`

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/80#discussion_r56753657

          — Diff: core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java —
          @@ -0,0 +1,132 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.accumulo.core.file.blockfile.impl;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +
          +/**
          + * This class is like byte array input stream with two differences. It supports seeking and avoids synchronization.
          + */
          +public class SeekableByteArrayInputStream extends InputStream {
          +
          +  // make this volatile to ensure data set by one thread can be seen by another
          +  private volatile byte buffer[];
          +  private int cur;
          +  private int max;
          +
          +  @Override
          +  public int read() {
          +    if (cur < max) {
          +      return buffer[cur++] & 0xff;
          +    } else {
          +      return -1;
          +    }
          +  }
          +
          +  @Override
          +  public int read(byte b[], int offset, int length) {
          +    if (b == null) {
          +      throw new NullPointerException();
          +    }
          +
          +    if (length < 0 || offset < 0 || length > b.length - offset) {
          +      throw new IndexOutOfBoundsException();
          +    }
          +
          +    if (length == 0) {
          +      return 0;
          +    }
          +
          +    int avail = max - cur;
          +
          +    if (avail <= 0) {
          +      return -1;
          +    }
          +
          +    if (length > avail) {
          +      length = avail;
          +    }
          +
          +    System.arraycopy(buffer, cur, b, offset, length);
          +    cur += length;
          +    return length;
          +  }
          +
          +  @Override
          +  public long skip(long requestedSkip) {
          +    long actualSkip = max - cur;
          +    if (requestedSkip < actualSkip)
          +      if (requestedSkip < 0)
          +        actualSkip = 0;
          +      else
          +        actualSkip = requestedSkip;
          +
          +    cur += actualSkip;
          +    return actualSkip;
          +  }
          +
          +  @Override
          +  public int available() {
          +    return max - cur;
          +  }
          +
          +  @Override
          +  public boolean markSupported() {
          +    return false;
          +  }
          +
          +  @Override
          +  public void mark(int readAheadLimit) {
          +    throw new UnsupportedOperationException();
          +  }
          +
          +  @Override
          +  public void reset() {
          +    throw new UnsupportedOperationException();
          +  }
          +
          +  @Override
          +  public void close() throws IOException {}
          +
          +  public SeekableByteArrayInputStream(byte[] buf) {
          +    this.buffer = buf;
          +    this.cur = 0;
          +    this.max = buf.length;
          +  }
          +
          +  public SeekableByteArrayInputStream(byte[] buf, int maxOffset) {
          +    this.buffer = buf;
          — End diff –

          `Objects.requireNonNull(buf)`
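A minimal sketch of what the two constructor comments suggest, using a hypothetical stand-in class rather than the class under review. `Objects.requireNonNull` is part of `java.util.Objects` (Java 7 and later); it throws a `NullPointerException` at construction time, instead of letting a null buffer surface later on the first read.

```java
import java.util.Objects;

/** Hypothetical stand-in for the stream's constructors; not the code from the pull request. */
final class NullCheckedStreamSketch {
  private final byte[] buffer;
  private final int cur;
  private final int max;

  NullCheckedStreamSketch(byte[] buf) {
    // Fails fast with a NullPointerException if buf is null.
    this.buffer = Objects.requireNonNull(buf, "buf");
    this.cur = 0;
    this.max = buf.length;
  }

  NullCheckedStreamSketch(byte[] buf, int maxOffset) {
    this.buffer = Objects.requireNonNull(buf, "buf");
    this.cur = 0;
    this.max = maxOffset;
  }

  /** Number of bytes between the cursor and the configured end. */
  int available() {
    return max - cur;
  }
}
```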

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/80#discussion_r56753672

          — Diff: core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java —
          @@ -0,0 +1,132 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.accumulo.core.file.blockfile.impl;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +
          +/**
          + * This class is like byte array input stream with two differences. It supports seeking and avoids synchronization.
          + */
          +public class SeekableByteArrayInputStream extends InputStream {
          +
          +  // make this volatile to ensure data set by one thread can be seen by another
          +  private volatile byte buffer[];
          +  private int cur;
          +  private int max;
          +
          +  @Override
          +  public int read() {
          +    if (cur < max) {
          +      return buffer[cur++] & 0xff;
          +    } else {
          +      return -1;
          +    }
          +  }
          +
          +  @Override
          +  public int read(byte b[], int offset, int length) {
          +    if (b == null) {
          +      throw new NullPointerException();
          +    }
          +
          +    if (length < 0 || offset < 0 || length > b.length - offset) {
          +      throw new IndexOutOfBoundsException();
          +    }
          +
          +    if (length == 0) {
          +      return 0;
          +    }
          +
          +    int avail = max - cur;
          +
          +    if (avail <= 0) {
          +      return -1;
          +    }
          +
          +    if (length > avail) {
          +      length = avail;
          +    }
          +
          +    System.arraycopy(buffer, cur, b, offset, length);
          +    cur += length;
          +    return length;
          +  }
          +
          +  @Override
          +  public long skip(long requestedSkip) {
          +    long actualSkip = max - cur;
          +    if (requestedSkip < actualSkip)
          +      if (requestedSkip < 0)
          +        actualSkip = 0;
          +      else
          +        actualSkip = requestedSkip;
          +
          +    cur += actualSkip;
          +    return actualSkip;
          +  }
          +
          +  @Override
          +  public int available() {
          +    return max - cur;
          +  }
          +
          +  @Override
          +  public boolean markSupported() {
          +    return false;
          +  }
          +
          +  @Override
          +  public void mark(int readAheadLimit) {
          +    throw new UnsupportedOperationException();
          +  }
          +
          +  @Override
          +  public void reset() {
          +    throw new UnsupportedOperationException();
          +  }
          +
          +  @Override
          +  public void close() throws IOException {}
          +
          +  public SeekableByteArrayInputStream(byte[] buf) {
          +    this.buffer = buf;
          +    this.cur = 0;
          +    this.max = buf.length;
          +  }
          +
          +  public SeekableByteArrayInputStream(byte[] buf, int maxOffset) {
          +    this.buffer = buf;
          +    this.cur = 0;
          +    this.max = maxOffset;
          +  }
          +
          +  public void seek(int position) {
          +    if (position < 0 || position >= max)
          +      throw new IllegalArgumentException("position = " + position + " maxOffset = " + max);
          +    this.cur = position;
          +  }
          +
          +  public int getPosition() {
          +    return this.cur;
          +  }
          +
          +  public byte[] getBuffer() {
          — End diff –

          Javadoc to be explicit that this is the actual array (changes to it will be reflected in the InputStream)? Do we want this to be `public`?
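One way the requested Javadoc could read, shown on a hypothetical stand-in class. The wording and the package-private visibility are assumptions that illustrate one possible answer to the question, not what the pull request ended up doing.

```java
/** Hypothetical holder used only to illustrate documenting a live backing array. */
final class BackingArrayViewSketch {
  private final byte[] buffer;

  BackingArrayViewSketch(byte[] buf) {
    this.buffer = buf;
  }

  /**
   * Returns the backing array itself, not a copy. Any write to the returned array is visible to
   * subsequent reads through this object, so callers should treat the array as read-only.
   * Package-private here to limit who can reach the shared array (an assumption of this sketch).
   */
  byte[] getBuffer() {
    return buffer;
  }

  /** Reads the unsigned byte at index i directly from the backing array. */
  int byteAt(int i) {
    return buffer[i] & 0xff;
  }
}
```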

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/80#discussion_r56753765

          — Diff: core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java —
          @@ -129,85 +130,114 @@ public int hashCode() {
               }
             }

          -  // a list that deserializes index entries on demand
          -  private static class SerializedIndex extends AbstractList<IndexEntry> implements List<IndexEntry>, RandomAccess {
          +  private static abstract class SerializedIndexBase<T> extends AbstractList<T> implements List<T>, RandomAccess {
          +    protected int[] offsets;
          +    protected byte[] data;

          -    private int[] offsets;
          -    private byte[] data;
          -    private boolean newFormat;
          +    protected SeekableByteArrayInputStream sbais;
          +    protected DataInputStream dis;
          +    protected int offsetsOffset;
          +    protected int indexOffset;
          +    protected int numOffsets;
          +    protected int indexSize;

          -    SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
          +    SerializedIndexBase(int[] offsets, byte[] data) {
                 this.offsets = offsets;
                 this.data = data;
          -      this.newFormat = newFormat;
          +      sbais = new SeekableByteArrayInputStream(data);
          +      dis = new DataInputStream(sbais);
               }

          -    @Override
          -    public IndexEntry get(int index) {
          -      int len;
          -      if (index == offsets.length - 1)
          -        len = data.length - offsets[index];
          -      else
          -        len = offsets[index + 1] - offsets[index];
          +    SerializedIndexBase(byte[] data, int offsetsOffset, int numOffsets, int indexOffset, int indexSize) {
          +      sbais = new SeekableByteArrayInputStream(data, indexOffset + indexSize);
          +      dis = new DataInputStream(sbais);
          +      this.offsetsOffset = offsetsOffset;
          +      this.indexOffset = indexOffset;
          +      this.numOffsets = numOffsets;
          +      this.indexSize = indexSize;
          +    }

          -      ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
          -      DataInputStream dis = new DataInputStream(bais);
          +    protected abstract T newValue() throws IOException;
              • End diff –

          Add javadoc for what implementations should do with this abstract method.

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on the pull request:

          https://github.com/apache/accumulo/pull/80#issuecomment-198774097

          Tried to give this a once-over. I've not looked at the MultiLevelIndex class though, so I don't have much understanding about the actual use (and the caching issue). Will try to give it a closer look, but don't wait up on me.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/80#discussion_r56990429

          — Diff: core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java —
          @@ -0,0 +1,132 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.accumulo.core.file.blockfile.impl;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +
          +/**
          + * This class is like byte array input stream with two differences. It supports seeking and avoids synchronization.
          + */
          +public class SeekableByteArrayInputStream extends InputStream {
          +
          +  // make this volatile to ensure data set by one thread can be seen by another
          +  private volatile byte buffer[];
          +  private int cur;
          +  private int max;
          +
          +  @Override
          +  public int read() {
          +    if (cur < max) {
          +      return buffer[cur++] & 0xff;
          +    } else {
          +      return -1;
          +    }
          +  }
          +
          +  @Override
          +  public int read(byte b[], int offset, int length) {
          +    if (b == null) {
          +      throw new NullPointerException();
          +    }
          +
          +    if (length < 0 || offset < 0 || length > b.length - offset) {
          +      throw new IndexOutOfBoundsException();
          +    }
          +
          +    if (length == 0) {
          +      return 0;
          +    }
          +
          +    int avail = max - cur;
          +
          +    if (avail <= 0) {
          +      return -1;
          +    }
          +
          +    if (length > avail) {
          +      length = avail;
          +    }
          +
          +    System.arraycopy(buffer, cur, b, offset, length);
          +    cur += length;
          +    return length;
          +  }
          +
          +  @Override
          +  public long skip(long requestedSkip) {
          +    long actualSkip = max - cur;
          +    if (requestedSkip < actualSkip)
          +      if (requestedSkip < 0)
          +        actualSkip = 0;
          +      else
          +        actualSkip = requestedSkip;
          +
          +    cur += actualSkip;
          +    return actualSkip;
          +  }
          +
          +  @Override
          +  public int available() {
          +    return max - cur;
          +  }
          +
          +  @Override
          +  public boolean markSupported() {
          +    return false;
          +  }
          +
          +  @Override
          +  public void mark(int readAheadLimit) {
          +    throw new UnsupportedOperationException();
          +  }
          +
          +  @Override
          +  public void reset() {
          +    throw new UnsupportedOperationException();
          — End diff –

          > Is there a reason not to?

          Because nothing would call it at the moment, and I would have to write a test for it. The other code is well tested indirectly by the RFile tests.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/80#issuecomment-199835734

          > I've not looked at the MultiLevelIndex class though, so I don't have much understanding about the actual use (and the caching issue).

          When reading an RFile index, MultiLevelIndex leaves the index in serialized form and does binary searches on the serialized data. This reduces RFile open time (it avoids deserializing all keys up front) and saves memory. Previously, when the index block was in cache, the bytes were copied unnecessarily. Now the binary search on the serialized index can be done directly on the cached data.
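The idea described above can be illustrated with a minimal sketch. This is not the MultiLevelIndex code: the class name `LazyIndexSketch`, the nested `SerializedKeys` list, and the byte layout (an entry count, then a table of offsets, then length-prefixed UTF-8 keys) are all assumptions made up for illustration. It only shows how a read-only list view can decode one entry per `get` call, so a binary search touches a handful of entries and never copies the backing array.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractList;
import java.util.Collections;
import java.util.RandomAccess;

public class LazyIndexSketch {

  /** Read-only list view over serialized keys; entries are decoded only when requested. */
  public static final class SerializedKeys extends AbstractList<String> implements RandomAccess {
    private final ByteBuffer data; // wraps the caller's array (e.g. a cached block), no copy
    private final int numEntries;

    public SerializedKeys(byte[] block) {
      this.data = ByteBuffer.wrap(block);
      this.numEntries = data.getInt(0);
    }

    @Override
    public String get(int index) {
      if (index < 0 || index >= numEntries) {
        throw new IndexOutOfBoundsException("index = " + index);
      }
      // The offset table starts at byte 4; each slot holds the start of one entry.
      int entryStart = data.getInt(4 + 4 * index);
      int len = data.getInt(entryStart);
      return new String(data.array(), entryStart + 4, len, StandardCharsets.UTF_8);
    }

    @Override
    public int size() {
      return numEntries;
    }
  }

  /** Hypothetical layout: [count][offset 0 .. offset n-1][len, bytes][len, bytes]... */
  public static byte[] serialize(String... sortedKeys) {
    byte[][] encoded = new byte[sortedKeys.length][];
    int size = 4 + 4 * sortedKeys.length;
    for (int i = 0; i < sortedKeys.length; i++) {
      encoded[i] = sortedKeys[i].getBytes(StandardCharsets.UTF_8);
      size += 4 + encoded[i].length;
    }
    ByteBuffer buf = ByteBuffer.allocate(size);
    buf.putInt(sortedKeys.length);
    int offset = 4 + 4 * sortedKeys.length;
    for (byte[] e : encoded) {
      buf.putInt(offset);
      offset += 4 + e.length;
    }
    for (byte[] e : encoded) {
      buf.putInt(e.length);
      buf.put(e);
    }
    return buf.array();
  }

  public static void main(String[] args) {
    byte[] block = serialize("apple", "cherry", "mango", "peach");
    SerializedKeys keys = new SerializedKeys(block);
    // Binary search decodes only the entries it probes.
    System.out.println(Collections.binarySearch(keys, "mango")); // prints 2
  }
}
```

Because the list implements `RandomAccess`, `Collections.binarySearch` uses indexed access, so the number of decoded entries grows logarithmically with the index size.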

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/80#discussion_r56993564

          — Diff: core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java —
          @@ -0,0 +1,132 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.accumulo.core.file.blockfile.impl;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +
          +/**
          + * This class is like byte array input stream with two differences. It supports seeking and avoids synchronization.
          + */
          +public class SeekableByteArrayInputStream extends InputStream {
          +
          +  // make this volatile to ensure data set by one thread can be seen by another
          +  private volatile byte buffer[];
          +  private int cur;
          +  private int max;
          +
          +  @Override
          +  public int read() {
          +    if (cur < max) {
          +      return buffer[cur++] & 0xff;
          +    } else {
          +      return -1;
          +    }
          +  }
          +
          +  @Override
          +  public int read(byte b[], int offset, int length) {
          +    if (b == null) {
          +      throw new NullPointerException();
          +    }
          +
          +    if (length < 0 || offset < 0 || length > b.length - offset) {
          +      throw new IndexOutOfBoundsException();
          +    }
          +
          +    if (length == 0) {
          +      return 0;
          +    }
          +
          +    int avail = max - cur;
          +
          +    if (avail <= 0) {
          +      return -1;
          +    }
          +
          +    if (length > avail) {
          +      length = avail;
          +    }
          +
          +    System.arraycopy(buffer, cur, b, offset, length);
          +    cur += length;
          +    return length;
          +  }
          +
          +  @Override
          +  public long skip(long requestedSkip) {
          +    long actualSkip = max - cur;
          +    if (requestedSkip < actualSkip)
          +      if (requestedSkip < 0)
          +        actualSkip = 0;
          +      else
          +        actualSkip = requestedSkip;
          +
          +    cur += actualSkip;
          +    return actualSkip;
          +  }
          +
          +  @Override
          +  public int available() {
          +    return max - cur;
          +  }
          +
          +  @Override
          +  public boolean markSupported() {
          +    return false;
          +  }
          +
          +  @Override
          +  public void mark(int readAheadLimit) {
          +    throw new UnsupportedOperationException();
          +  }
          +
          +  @Override
          +  public void reset() {
          +    throw new UnsupportedOperationException();
          +  }
          +
          +  @Override
          +  public void close() throws IOException {}
          +
          +  public SeekableByteArrayInputStream(byte[] buf) {
          +    this.buffer = buf;
          +    this.cur = 0;
          +    this.max = buf.length;
          +  }
          +
          +  public SeekableByteArrayInputStream(byte[] buf, int maxOffset) {
          +    this.buffer = buf;
          — End diff –

          This patch is against 1.6, so I added some Preconditions checks. I will change those to `Objects.requireNonNull(buf)` when merging up.
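As a rough illustration of the kind of constructor check mentioned here, the sketch below validates its arguments with `Objects.requireNonNull`. The class `BoundedBufferSketch` and the range check on `maxOffset` are assumptions for illustration only; the thread does not say which checks were actually added to `SeekableByteArrayInputStream`.

```java
import java.util.Objects;

public class BoundedBufferSketch {
  private final byte[] buffer;
  private final int max;

  public BoundedBufferSketch(byte[] buf, int maxOffset) {
    // Fail fast with a NullPointerException naming the offending argument.
    this.buffer = Objects.requireNonNull(buf, "buf");
    // Assumed extra check (not from the thread): the limit must lie within the array.
    if (maxOffset < 0 || maxOffset > buf.length) {
      throw new IllegalArgumentException("maxOffset = " + maxOffset + " buf.length = " + buf.length);
    }
    this.max = maxOffset;
  }

  /** Number of readable bytes, counted from the start of the array. */
  public int limit() {
    return max;
  }

  public static void main(String[] args) {
    System.out.println(new BoundedBufferSketch(new byte[8], 5).limit()); // prints 5
  }
}
```

Validating in the constructor means a bad argument is reported where the stream is created, not later on the first `read` call.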

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner closed the pull request at:

          https://github.com/apache/accumulo/pull/80

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on the pull request:

          https://github.com/apache/accumulo/pull/80#issuecomment-204620229

          @keith-turner just glanced at the updates in your last commit. They're very nice. I appreciate you taking the time to add them.

          afuchs Adam Fuchs added a comment -

          I would love to see the perf test results for this change. Can you post them, Keith Turner?

          kturner Keith Turner added a comment - - edited

          I finally got around to creating something to do this. I was going to use the new RFile API in 1.8.0 to show the change, but 1.8.0 has always had this improvement. So I wrote something that uses the internal RFile APIs in 1.6 and 1.7. The following repo has a random seek test that continually does 1,000 random seeks against an RFile with 10M key values. Running this test against 1.7.1, the times converge to ~97ms for 1,000 random seeks. Running it against 1.7.2, the times converge to ~11ms for 1,000 random seeks.

          https://github.com/keith-turner/rfile-pert-test
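For readers who want the general shape of such a measurement, here is a minimal sketch of a repeated random-seek timing loop. It is not the test in the repository above and does not touch RFile at all: the class `RandomSeekTimingSketch`, the sorted `long[]` stand-in for the file, and the round count are all assumptions chosen so the example is self-contained.

```java
import java.util.Arrays;
import java.util.Random;

public class RandomSeekTimingSketch {

  /** Runs the given number of random lookups and returns the elapsed time in nanoseconds. */
  public static long timeBatch(long[] sortedKeys, int seeks, Random rand) {
    long sink = 0; // accumulate results so the lookups cannot be optimized away
    long start = System.nanoTime();
    for (int i = 0; i < seeks; i++) {
      long target = sortedKeys[rand.nextInt(sortedKeys.length)];
      sink += Arrays.binarySearch(sortedKeys, target);
    }
    long elapsed = System.nanoTime() - start;
    if (sink < 0) {
      // Every target exists in the array, so each position is non-negative.
      throw new IllegalStateException("unexpected negative search result");
    }
    return elapsed;
  }

  public static void main(String[] args) {
    long[] keys = new long[1_000_000];
    for (int i = 0; i < keys.length; i++) {
      keys[i] = 2L * i; // strictly increasing, so the array is sorted
    }
    Random rand = new Random(42);
    // Repeat the batch so later rounds reflect warmed-up (JIT-compiled) code.
    for (int round = 0; round < 20; round++) {
      long nanos = timeBatch(keys, 1000, rand);
      System.out.printf("round %d: %.3f ms for 1,000 seeks%n", round, nanos / 1e6);
    }
  }
}
```

Reporting each round separately, instead of a single total, makes it visible when the times have converged.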

          elserj Josh Elser added a comment -

          Wow, great stuff Keith Turner


            People

            • Assignee: Keith Turner (kturner)
            • Reporter: Keith Turner (kturner)
            • Votes: 0
            • Watchers: 4

            Time Tracking

            • Original Estimate: Not Specified
            • Remaining Estimate: 0h
            • Time Spent: 0.5h