Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.8.0
    • Component/s: core, tserver
    • Labels: None

      Description

      In discussing ACCUMULO-4166 with Keith Turner, we decided that the underlying issue is that major compactions can overwhelm a tablet server, rendering it nearly unresponsive.

      To address this, we should take a cue from Apache Cassandra and restrict how quickly we perform major compactions. Rate limiting reads and writes involved in major compactions will directly affect the IO load caused by major compactions, and should also indirectly affect the CPU load.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user ShawnWalker opened a pull request:

          https://github.com/apache/accumulo/pull/90

          ACCUMULO-4187: Added rate limiting for major compactions.

          Added configuration property `tserver.compaction.major.throughput` of type `PropertyType.MEMORY` to control rate limiting of major compactions on each tserver.

          Specifying a value of `0B` (the default) disables rate limiting.

          If a positive value is specified, then all tablet servers will limit the I/O performed during major compaction accordingly. For example, with `tserver.compaction.major.throughput=30M`, each tserver will read no more than 30MiB per second and write no more than 30MiB per second, combined over all major compaction threads.

          This change involved adding an optional `RateLimiter` parameter to `FileOperations.openReader(...)` and `FileOperations.openWriter(...)`. Most of the file changes involve adding an appropriate `null` to invocations of these methods.
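To make the optional limiter concrete, here is a minimal sketch of a rate-limited output stream decorator. The names `ByteRateLimiter` and `ThrottledOutputStream` are hypothetical and are not the classes added by this pull request; the sketch only assumes that a limiter exposes a blocking `acquire` call and that a `null` limiter means "unlimited", mirroring the `null` argument described above.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical limiter interface; an assumption, not the actual Accumulo RateLimiter. */
interface ByteRateLimiter {
  /** Blocks, if necessary, until the given number of bytes may be transferred. */
  void acquire(long bytes);
}

/** Hypothetical decorator that charges the limiter before every write. */
class ThrottledOutputStream extends FilterOutputStream {
  private final ByteRateLimiter limiter; // null means no rate limiting

  ThrottledOutputStream(OutputStream out, ByteRateLimiter limiter) {
    super(out);
    this.limiter = limiter;
  }

  @Override
  public void write(int b) throws IOException {
    if (limiter != null) {
      limiter.acquire(1);
    }
    out.write(b);
  }

  @Override
  public void write(byte[] buf, int off, int len) throws IOException {
    if (limiter != null) {
      limiter.acquire(len);
    }
    // Delegate the whole range at once instead of byte-by-byte.
    out.write(buf, off, len);
  }
}
```

Sharing one limiter instance across all compaction threads is what would make the limit apply to their combined throughput rather than to each thread separately.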

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/ShawnWalker/accumulo ACCUMULO-4187

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/accumulo/pull/90.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #90


          commit 51396335b275b980c09964ad3ca95bf2a6fe5842
          Author: Shawn Walker <accumulo@shawn-walker.net>
          Date: 2016-04-06T17:18:09Z

          ACCUMULO-4187: Added rate limiting for major compactions.

          Added configuration property tserver.compaction.major.throughput of type PropertyType.MEMORY with a default of 0B (unlimited). If another value is specified (e.g. 30M), then all tablet servers will limit the I/O performed during major compaction accordingly (e.g. neither reading nor writing more than 30MiB per second combined over all major compaction threads).


          ShawnWalker Shawn Walker added a comment -

          Created pull request https://github.com/apache/accumulo/pull/90 addressing this issue.

          elserj Josh Elser added a comment -

          Thanks, Shawn Walker! I've gone ahead and added you as a contributor and assigned this issue to you.

          githubbot ASF GitHub Bot added a comment -

          Github user ctubbsii commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59459590

          --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          --- End diff --

          Unfortunately, LongAdder is only available in JDK8, and we're still on JDK7.
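One JDK 7-compatible option is to fall back to `AtomicLong`, which has been available since JDK 5. The sketch below is an assumption about how such a counter could look, not necessarily what the pull request ended up doing; `ByteCounter` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical JDK 7-compatible counter standing in for a LongAdder-based one. */
class ByteCounter {
  private final AtomicLong total = new AtomicLong();

  /** Adds the given number of bytes to the running total. */
  void add(long bytes) {
    total.addAndGet(bytes);
  }

  /** Returns the running total and resets it to zero in one atomic step. */
  long sumThenReset() {
    return total.getAndSet(0);
  }
}
```

`AtomicLong` can see more contention than `LongAdder` when many threads update it, so this trades some throughput under heavy load for compatibility.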

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59489073

          --- Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---
          @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
          */
          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;

          /**
          * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
          *
          */
          - public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
          - throws IOException;
          + public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
          --- End diff --

          I wonder if it wouldn't be a good idea to consolidate the various "Accumulo" configuration objects into one struct-like object. Presently, that would contain the AccumuloConfiguration and the new RateLimiter. This would prevent us from having to change the method signature every time something new is added here (so far, there are a lot of modifications from this already).
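The struct-like object suggested here could be sketched roughly as follows. Every name in this block (`TableConfig`, `IoLimiter`, `FileAccessOptions`) is hypothetical; the two interfaces are empty stand-ins for `AccumuloConfiguration` and `RateLimiter` so that the example compiles on its own.

```java
/** Stand-in for AccumuloConfiguration (hypothetical, for illustration only). */
interface TableConfig {}

/** Stand-in for the new RateLimiter type (hypothetical, for illustration only). */
interface IoLimiter {}

/** Hypothetical parameter object bundling what a file-open call needs. */
final class FileAccessOptions {
  private final TableConfig tableConf;
  private final IoLimiter limiter; // null means no rate limiting

  private FileAccessOptions(Builder b) {
    this.tableConf = b.tableConf;
    this.limiter = b.limiter;
  }

  TableConfig tableConf() {
    return tableConf;
  }

  IoLimiter limiter() {
    return limiter;
  }

  /** Starts a builder; the table configuration is the only required field. */
  static Builder forTable(TableConfig tableConf) {
    return new Builder(tableConf);
  }

  static final class Builder {
    private final TableConfig tableConf;
    private IoLimiter limiter;

    private Builder(TableConfig tableConf) {
      if (tableConf == null) {
        throw new NullPointerException("tableConf");
      }
      this.tableConf = tableConf;
    }

    /** Optional: callers that do not need throttling simply skip this call. */
    Builder withLimiter(IoLimiter limiter) {
      this.limiter = limiter;
      return this;
    }

    FileAccessOptions build() {
      return new FileAccessOptions(this);
    }
  }
}
```

With an object like this, a future option becomes a new builder method instead of another positional parameter, so existing call sites would not need to change.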

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59489093

          --- Diff: core/src/main/java/org/apache/accumulo/core/conf/Property.java ---
          @@ -293,6 +293,8 @@
          "The maximum number of concurrent tablet migrations for a tablet server"),
          TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3", PropertyType.COUNT,
          "The maximum number of concurrent major compactions for a tablet server"),
          + TSERV_MAJC_THROUGHPUT("tserver.compaction.major.throughput", "0B", PropertyType.MEMORY,
          + "Maximum number of bytes to read or write per second over all major compactions, or 0 for unlimited."),
          --- End diff --

          ".... in a TabletServer." Clarify that this is not global, but local to a tserver.

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59489344

          --- Diff: core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java ---
          @@ -139,11 +146,12 @@ public long getStartPos() throws IOException {
          *
          */
          public static class Reader implements BlockFileReader {
          + private final RateLimiter readLimiter;
          private BCFile.Reader _bc;
          private String fileName = "not_available";
          private BlockCache _dCache = null;
          private BlockCache _iCache = null;
          - private FSDataInputStream fin = null;
          + private Closeable fin = null;
          --- End diff --

          Any reason for `Closeable` instead of `InputStream`?

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59489500

          --- Diff: core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java ---
          @@ -132,7 +136,8 @@ FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, Accumul
          sampler = SamplerFactory.newSampler(samplerConfig, acuconf);
          }

          - CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf, acuconf);
          + CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(fs.create(new Path(file), false, bufferSize, (short) rep, block),
          --- End diff --

          nit: I would be ok with splitting up this line. It's getting a little busy

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59489612

          --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java ---
          @@ -14,24 +14,22 @@
          * License for the specific language governing permissions and limitations under
          * the License.
          */
          -
          -package org.apache.accumulo.core.file.rfile.bcfile;
          +package org.apache.accumulo.core.file.streams;

          import java.io.IOException;
          import java.io.InputStream;
          import java.security.AccessController;
          import java.security.PrivilegedActionException;
          import java.security.PrivilegedExceptionAction;

          -import org.apache.hadoop.fs.FSDataInputStream;
          +import org.apache.hadoop.fs.Seekable;
          --- End diff --

          Seekable is public/evolving, which means it's probably OK for us to expect it to be stable, but I'm mentioning it to make sure we consciously acknowledge that.

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59489718

          --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
          @@ -0,0 +1,55 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import org.apache.hadoop.fs.FSDataOutputStream;
          +
          +public class PositionedOutputs {
          --- End diff --

          Missing javadoc on class and methods. If class shouldn't be instantiated, make a private constructor.
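For reference, the usual shape of a documented, non-instantiable utility class looks something like the sketch below. `OutputStreams` and its `buffered` helper are hypothetical names chosen for illustration; they are not part of this pull request.

```java
import java.io.BufferedOutputStream;
import java.io.OutputStream;

/**
 * Static factory helpers for output streams. Hypothetical example class, not part of Accumulo.
 */
final class OutputStreams {

  /** Not instantiable: the class only hosts static helpers. */
  private OutputStreams() {
    throw new AssertionError("no instances");
  }

  /**
   * Wraps the given stream in a buffer.
   *
   * @param out
   *          the stream to wrap
   * @return a buffered view of {@code out}
   */
  static OutputStream buffered(OutputStream out) {
    return new BufferedOutputStream(out);
  }
}
```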

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59489739

          --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
          @@ -0,0 +1,55 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import org.apache.hadoop.fs.FSDataOutputStream;
          +
          +public class PositionedOutputs {
          + public static PositionedOutputStream wrap(final OutputStream fout) {
          + if (fout instanceof FSDataOutputStream) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((FSDataOutputStream) fout).getPos();
          + }
          + };
          + } else if (fout instanceof PositionedOutput) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((PositionedOutput) fout).position();
          + }
          + };
          + } else {
          + return new PositionedOutputStream(fout) {
          --- End diff --

          check for non-null?
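The comment does not say which reference it means; assuming it refers to the `fout` argument, a fail-fast check could look like this sketch. `CheckedWrap` is a hypothetical class, and `Objects.requireNonNull` has been available since JDK 7.

```java
import java.io.FilterOutputStream;
import java.io.OutputStream;
import java.util.Objects;

/** Hypothetical helper showing a fail-fast null check in a wrapping factory method. */
final class CheckedWrap {
  private CheckedWrap() {}

  static FilterOutputStream wrap(OutputStream fout) {
    // Fail at the call site rather than on the first write to the wrapper.
    Objects.requireNonNull(fout, "fout must not be null");
    return new FilterOutputStream(fout);
  }
}
```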

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59489818

          --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
          @@ -0,0 +1,55 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import org.apache.hadoop.fs.FSDataOutputStream;
          +
          +public class PositionedOutputs {
          + public static PositionedOutputStream wrap(final OutputStream fout) {
          + if (fout instanceof FSDataOutputStream) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((FSDataOutputStream) fout).getPos();
          + }
          + };
          + } else if (fout instanceof PositionedOutput) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((PositionedOutput) fout).position();
          + }
          + };
          + } else {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + throw new UnsupportedOperationException("Underlying stream does not support position()");
          + }
          + };
          + }
          + }
          +
          + public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
          --- End diff --

          Would prefer to see this in its own file since it's used outside of this class.

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59489844

          — Diff: core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java —
          @@ -0,0 +1,67 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.hadoop.fs.Seekable;
          +
          +/** A decorator for an {@code InputStream} which limits the rate at which reads are performed. */
          — End diff –

          Expand the javadoc please (style-nit)
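
For readers following the thread, here is a minimal sketch of what such a decorator with fuller Javadoc could look like. It is not the code from the pull request: `PermitLimiter` and `ThrottledInputStream` are hypothetical names, and the sketch assumes one permit corresponds to one byte.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for the PR's RateLimiter; one permit is assumed to be one byte. */
interface PermitLimiter {
  /** Consumes the given number of permits, blocking if the implementation chooses to. */
  void acquire(long permits);
}

/**
 * A decorator for an {@code InputStream} that charges a limiter for every byte
 * actually read. The limiter decides how long to block, so several streams that
 * share one limiter are throttled as a group.
 */
class ThrottledInputStream extends FilterInputStream {
  private final PermitLimiter limiter;

  ThrottledInputStream(InputStream in, PermitLimiter limiter) {
    super(in);
    this.limiter = limiter;
  }

  /** Reads a single byte and charges one permit unless end of stream was reached. */
  @Override
  public int read() throws IOException {
    int b = in.read();
    if (b >= 0) {
      limiter.acquire(1);
    }
    return b;
  }

  /** Reads up to {@code len} bytes and charges one permit per byte returned. */
  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    int count = in.read(buf, off, len);
    if (count > 0) {
      limiter.acquire(count);
    }
    return count;
  }
}
```

Charging after the read means the limiter only accounts for bytes that were really transferred, at the cost of letting a single large read through before any blocking occurs.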

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59489957

          — Diff: core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java —
          @@ -0,0 +1,55 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with this
          + * work for additional information regarding copyright ownership. The ASF
          + * licenses this file to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance with the License.
          + * You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
          + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
          + * License for the specific language governing permissions and limitations under
          + * the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +
          +/** A decorator for {@code OutputStream} which limits the rate at which data may be written. */
          — End diff –

          Expand the comment (style-nit)

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59490202

          — Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java —
          @@ -0,0 +1,27 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.util.ratelimit;
          +
          +public interface RateLimiter {
          + /**
          + * Get current QPS of the rate limiter, with a nonpositive rate indicating no limit.
          — End diff –

          "QPS"? Essentially bytes per second in our compaction limiting scope?

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59490394

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          + if (instance == null) {
          + instance = new SharedRateLimiterFactory();
          +
          + // Update periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.update();
          + }
          + }, UPDATE_RATE, UPDATE_RATE);
          +
          + // Report periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.report();
          + }
          + }, REPORT_RATE, REPORT_RATE);
          + }
          + return instance;
          + }
          + }
          +
          + public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
          + return getInstance(SimpleTimer.getInstance(conf));
          — End diff –

          Reusing the SimpleTimer singleton instance might have unexpected implications on the rest of the server. I think having a Timer instance specifically for rate limiting would be good.
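
A rough sketch of the suggestion above, using only the JDK: a scheduler that is owned by the rate-limiting code, so slow update or report tasks cannot delay unrelated server work. `RateLimitScheduler` and its constructor parameters are hypothetical and do not appear in the pull request.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical scheduler used only for rate-limiter housekeeping. */
class RateLimitScheduler {
  private final long updatePeriodMs;
  private final long reportPeriodMs;

  // A single daemon thread, so the scheduler never keeps the JVM alive on shutdown.
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "rate-limiter-housekeeping");
        t.setDaemon(true);
        return t;
      });

  RateLimitScheduler(long updatePeriodMs, long reportPeriodMs) {
    this.updatePeriodMs = updatePeriodMs;
    this.reportPeriodMs = reportPeriodMs;
  }

  /** Schedules both periodic tasks; a task that throws is not run again. */
  void start(Runnable update, Runnable report) {
    executor.scheduleWithFixedDelay(update, updatePeriodMs, updatePeriodMs, TimeUnit.MILLISECONDS);
    executor.scheduleWithFixedDelay(report, reportPeriodMs, reportPeriodMs, TimeUnit.MILLISECONDS);
  }

  /** Cancels pending tasks and stops the worker thread. */
  void stop() {
    executor.shutdownNow();
  }
}
```

With the constants from the diff this would be created as `new RateLimitScheduler(1000, 60000)`.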

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59490427

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          — End diff –

          Why not just `public static synchronized`?
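
To illustrate the point of this question: in Java, a `static synchronized` method locks the monitor of the enclosing class object, which is the same monitor the explicit block in the diff uses. The sketch below uses a hypothetical `LazySingleton` class to show the two forms side by side.

```java
/** Hypothetical lazily initialized singleton showing two equivalent locking forms. */
class LazySingleton {
  private static LazySingleton instance = null;

  private LazySingleton() {}

  // Form used in the diff: an explicit block on the class object.
  static LazySingleton viaBlock() {
    synchronized (LazySingleton.class) {
      if (instance == null) {
        instance = new LazySingleton();
      }
      return instance;
    }
  }

  // Form suggested in review: the modifier acquires the LazySingleton.class monitor too.
  static synchronized LazySingleton viaModifier() {
    if (instance == null) {
      instance = new LazySingleton();
    }
    return instance;
  }
}
```

Because both methods contend for the same lock, they can be mixed freely; the modifier form simply removes one level of nesting.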

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59490490

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          + if (instance == null) {
          + instance = new SharedRateLimiterFactory();
          +
          + // Update periodically
          + timer.schedule(new Runnable() {
          — End diff –

          Can we re-poll the configuration and reschedule the task to make the implementation responsive to dynamic configuration update? Having to restart the server to get a new configuration value sucks.
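
One way the re-polling idea could be sketched, assuming the rate is supplied through a `Callable<Long>` as in the `create(String, Callable<Long>)` method of this diff: a periodic task calls the callback again and adopts whatever value it returns. `PolledRate` is a hypothetical class, and nothing here is claimed to match what the pull request's `update()` actually does.

```java
import java.util.concurrent.Callable;

/** Hypothetical holder for a rate that is re-read from its source on every update. */
class PolledRate {
  private final Callable<Long> rateGenerator;
  private volatile long currentRate;

  PolledRate(Callable<Long> rateGenerator) {
    this.rateGenerator = rateGenerator;
    this.currentRate = poll();
  }

  // Wraps checked exceptions the same way the diff does for the initial rate.
  private long poll() {
    try {
      return rateGenerator.call();
    } catch (Exception ex) {
      throw new IllegalStateException(ex);
    }
  }

  /** Re-reads the rate; meant to be called from a periodic task. Returns true if it changed. */
  boolean update() {
    long latest = poll();
    boolean changed = latest != currentRate;
    currentRate = latest;
    return changed;
  }

  /** Returns the most recently polled rate. */
  long getRate() {
    return currentRate;
  }
}
```

If the callback reads the live configuration, a changed property value is picked up at the next tick without a restart.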

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59490576

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          + if (instance == null) {
          + instance = new SharedRateLimiterFactory();
          +
          + // Update periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.update();
          + }
          + }, UPDATE_RATE, UPDATE_RATE);
          +
          + // Report periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.report();
          + }
          + }, REPORT_RATE, REPORT_RATE);
          + }
          + return instance;
          + }
          + }
          +
          + public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
          + return getInstance(SimpleTimer.getInstance(conf));
          + }
          +
          + /**
          + * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
          + *
          + * @param name
          + * key for the rate limiter
          + * @param rateGenerator
          + * a function which can be called to get what the current rate for the rate limiter should be.
          + */
          + public RateLimiter create(String name, Callable<Long> rateGenerator) {
          + synchronized (activeLimiters) {
          + if (activeLimiters.containsKey(name)) {
          + SharedRateLimiter limiter = activeLimiters.get(name);
          + return limiter;
          + } else {
          + long initialRate;
          + try {
          + initialRate = rateGenerator.call();
          + } catch (Exception ex) {
          + throw new IllegalStateException(ex);
          + }
          + SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
          + activeLimiters.put(name, limiter);
          + return limiter;
          + }
          + }
          + }
          +
          + protected void update() {
          — End diff –

          Some simple javadoc on the below methods/class would be nice.

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59490681

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          + if (instance == null) {
          + instance = new SharedRateLimiterFactory();
          +
          + // Update periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.update();
          + }
          + }, UPDATE_RATE, UPDATE_RATE);
          +
          + // Report periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.report();
          + }
          + }, REPORT_RATE, REPORT_RATE);
          + }
          + return instance;
          + }
          + }
          +
          + public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
          + return getInstance(SimpleTimer.getInstance(conf));
          + }
          +
          + /**
          + * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
          + *
          + * @param name
          + * key for the rate limiter
          + * @param rateGenerator
          + * a function which can be called to get what the current rate for the rate limiter should be.
          + */
          + public RateLimiter create(String name, Callable<Long> rateGenerator) {
          + synchronized (activeLimiters) {
          + if (activeLimiters.containsKey(name)) {
          + SharedRateLimiter limiter = activeLimiters.get(name);
          + return limiter;
          + } else {
          + long initialRate;
          + try {
          + initialRate = rateGenerator.call();
          + } catch (Exception ex) {
          + throw new IllegalStateException(ex);
          + }
          + SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
          + activeLimiters.put(name, limiter);
          + return limiter;
          + }
          + }
          + }
          +
          + protected void update() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.update();
          + }
          + }
          +
          + protected void report() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.report();
          + }
          + }
          +
          + protected class SharedRateLimiter extends GuavaRateLimiter {
          + private final LongAdder permitsAcquired = new LongAdder();
          + private final Callable<Long> rateCallable;
          + private final String name;
          +
          + SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
          + super(initialRate);
          + this.name = name;
          + this.rateCallable = rateCallable;
          + }
          +
          + @Override
          + public void acquire(long permits) {
          + super.acquire(permits);
          + permitsAcquired.add(permits);
          + }
          +
          + public void update() {
          + try {
          + // Reset rate if needed
          + long rate = rateCallable.call();
          + if (rate != getRate()) {
          + setRate(rate);
          + }
          + } catch (Exception ex) {
          + logger.debug("Failed to update rate limiter", ex);
          + }
          + }
          +
          + public void report() {
          + long sum = permitsAcquired.sumThenReset();
          + if (sum > 0) {
          + logger.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, sum * 1000L / REPORT_RATE, getRate()));
          — End diff –

          Wrap this in a `logger.isDebugEnabled()` conditional, please. Typically, Logger instances are named `log`, not `logger`. You could also do the string formatting "natively" with SLF4J's `{}` replacement syntax.
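
A minimal sketch of the guard being asked for, under stated assumptions. SLF4J is not on the classpath here, so `java.util.logging` stands in: `log.isLoggable(Level.FINE)` plays the role of SLF4J's `isDebugEnabled()`, and `log.fine(...)` the role of `debug(...)`. The class and method names (`RateReportSketch`, `permitsPerSecond`, `describe`) are hypothetical and not taken from the PR.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;

class RateReportSketch {
  // Named `log`, following the reviewer's naming remark.
  private static final Logger log = Logger.getLogger(RateReportSketch.class.getName());
  static final long REPORT_RATE_MS = 60000;

  /** Converts a permit count gathered over one report interval into permits/second. */
  static long permitsPerSecond(long permitsInInterval) {
    return permitsInInterval * 1000L / REPORT_RATE_MS;
  }

  /** Builds the message; only worth calling when it will actually be logged. */
  static String describe(String name, long permitsInInterval, long limit) {
    return String.format("RateLimiter '%s': %,d of %,d permits/second",
        name, permitsPerSecond(permitsInInterval), limit);
  }

  static void report(String name, LongAdder permitsAcquired, long limit) {
    // The counter is reset every interval, whether or not anything is logged.
    long sum = permitsAcquired.sumThenReset();
    // Guard: skip the formatting work unless the level is enabled.
    if (sum > 0 && log.isLoggable(Level.FINE)) {
      log.fine(describe(name, sum, limit));
    }
  }
}
```

With SLF4J itself, the guarded call would look roughly like `if (log.isDebugEnabled()) { log.debug("RateLimiter '{}': {} of {} permits/second", name, rate, limit); }`. One trade-off to keep in mind: the `{}` placeholders only substitute the arguments' string form, so the thousands separators produced by `%,d` would be lost unless the numbers are formatted separately.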

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59490771

          — Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java —
          @@ -3089,4 +3093,20 @@ public void removeBulkImportState(List<String> files) {
          bulkImportStatus.removeBulkImportStatus(files);
          }

          + private static final String MAJC_READ_LIMITER_KEY = "tserv_majc_read";
          + private static final String MAJC_WRITE_LIMITER_KEY = "tserv_majc_write";
          + private final Callable<Long> rateProvider = new Callable<Long>() {
          + @Override
          + public Long call() throws Exception {
          + return getConfiguration().getMemoryInBytes(Property.TSERV_MAJC_THROUGHPUT);
          + }
          + };
          +
          + public final RateLimiter getMajorCompactionReadLimiter() {
          — End diff –

          Javadoc, please

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-209218793

          Made a first pass through the code. Wow! Great work for a first contribution @ShawnWalker! Some general themes:

          • nit-picky stylistic things
          • Missing javadoc on public classes/methods

          Some new tests on these new classes (testing the rate limiting components and input/output streams should be really important) would really make this even better.

          I'll have to go back to reread about the use of `<T extends Class & Interface>` littered everywhere with a fresh mind. First time I've run across it and I don't think I entirely grokked the point.
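
For readers who have not met the `<T extends Class & Interface>` form before, here is a small sketch of what such an intersection bound buys. It is illustrative only: `Positioned`, `CountingStream`, and `IntersectionSketch` are hypothetical names, not classes from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical interface standing in for something like a "seekable" or "positioned" capability. */
interface Positioned {
  long position();
}

/** A stream that is both an InputStream and Positioned. */
class CountingStream extends ByteArrayInputStream implements Positioned {
  CountingStream(byte[] data) {
    super(data);
  }

  @Override
  public long position() {
    return pos; // `pos` is the protected read index inherited from ByteArrayInputStream
  }
}

class IntersectionSketch {
  /** T must be a subclass of InputStream AND implement Positioned. */
  static <T extends InputStream & Positioned> long readTwoThenPosition(T in) {
    try {
      in.read(); // available because T is an InputStream
      in.read();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return in.position(); // available because T is Positioned; no cast needed
  }
}
```

The bound lets one method demand both capabilities at compile time, instead of accepting a plain `InputStream` and casting (or checking `instanceof`) at run time. Whether that is worth the extra generic noise at every call site is the judgment call the review is raising.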

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59563956

          — Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java —
          @@ -0,0 +1,55 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import org.apache.hadoop.fs.FSDataOutputStream;
          +
          +public class PositionedOutputs {
          + public static PositionedOutputStream wrap(final OutputStream fout) {
          + if (fout instanceof FSDataOutputStream) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((FSDataOutputStream) fout).getPos();
          + }
          + };
          + } else if (fout instanceof PositionedOutput) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((PositionedOutput) fout).position();
          + }
          + };
          + } else {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + throw new UnsupportedOperationException("Underlying stream does not support position()");
          + }
          + };
          + }
          + }
          +
          + public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
          — End diff –

          I don't believe it is, in fact.

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59564578

          — Diff: core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java —
          @@ -0,0 +1,67 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.hadoop.fs.Seekable;
          +
          +/** A decorator for an {@code InputStream} which limits the rate at which reads are performed. */
          — End diff –

          Any suggestions? I'm uncertain what more to say.
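
As one way of making the one-line class comment concrete, the decorator idea can be sketched as below. This is not the PR's `RateLimitedInputStream`; `Limiter` and `ThrottledInputStream` are hypothetical names, and the only thing borrowed from the diff is the `acquire(long permits)` shape.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Local stand-in for a rate limiter; only acquire(long) is needed here. */
interface Limiter {
  void acquire(long permits);
}

/** Wraps a stream and charges the limiter one permit per byte actually read. */
class ThrottledInputStream extends FilterInputStream {
  private final Limiter limiter;

  ThrottledInputStream(InputStream in, Limiter limiter) {
    super(in);
    this.limiter = limiter;
  }

  @Override
  public int read() throws IOException {
    int b = in.read();
    if (b >= 0) {
      limiter.acquire(1);
    }
    return b;
  }

  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    int n = in.read(buf, off, len);
    if (n > 0) {
      limiter.acquire(n); // charge for the bytes delivered, not the bytes requested
    }
    return n;
  }

  /** Helper for the example: reads to end of stream and returns the byte count. */
  static long drain(InputStream in) {
    byte[] buf = new byte[4];
    long total = 0;
    try {
      int n;
      while ((n = in.read(buf)) > 0) {
        total += n;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return total;
  }
}
```

Details like these (permits are bytes, the charge happens after the read, `skip` is not throttled in this sketch) are the kind of thing a fuller class-level javadoc could state.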

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59564604

          — Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java —
          @@ -0,0 +1,55 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import org.apache.hadoop.fs.FSDataOutputStream;
          +
          +public class PositionedOutputs {
          + public static PositionedOutputStream wrap(final OutputStream fout) {
          + if (fout instanceof FSDataOutputStream) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((FSDataOutputStream) fout).getPos();
          + }
          + };
          + } else if (fout instanceof PositionedOutput) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((PositionedOutput) fout).position();
          + }
          + };
          + } else {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + throw new UnsupportedOperationException("Underlying stream does not support position()");
          + }
          + };
          + }
          + }
          +
          + public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
          — End diff –

          Oh, right you are. I thought callers of `PositionedOutputs.wrap` would be using it, but apparently not. As long as `PositionedOutputs.wrap` is returning `PositionedOutputStream`, I think this should be separated. If you want to change `PositionedOutputs.wrap` to just return a `FilterOutputStream`, we can make this `private static abstract ...`.

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59565074

          — Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java —
          @@ -0,0 +1,27 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.util.ratelimit;
          +
          +public interface RateLimiter {
          + /**
          + * Get current QPS of the rate limiter, with a nonpositive rate indicating no limit.
          — End diff –

          Yes. I chose to say QPS (queries per second) here as it is the terminology used by `com.google.common.util.concurrent.RateLimiter` to describe its functionality. Things other than byte counts might potentially wish to be rate limited.
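
To illustrate why a unit-neutral term fits here, the following is a minimal sketch of a limiter whose permits are abstract units. The names `PermitSketch`, `Limiter`, and `Unlimited` are hypothetical and are not taken from the pull request; the only convention borrowed from the diff is that a nonpositive rate means no limit.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch; not the interface from the pull request. */
public class PermitSketch {
  /** Permits are abstract units: bytes, queries, or anything else worth throttling. */
  public interface Limiter {
    /** Permits per second; a nonpositive value means "no limit". */
    long getRate();

    /** Take the given number of permits; a throttling implementation may block here. */
    void acquire(long permits);
  }

  /** A limiter that never blocks, standing in for the "no limit" case. */
  public static final class Unlimited implements Limiter {
    public final AtomicLong acquired = new AtomicLong();

    @Override
    public long getRate() {
      return 0;
    }

    @Override
    public void acquire(long permits) {
      acquired.addAndGet(permits); // only counts; never waits
    }
  }

  /** True when the limiter actually restricts throughput. */
  public static boolean isLimited(Limiter limiter) {
    return limiter.getRate() > 0;
  }
}
```

A caller throttling file I/O would pass byte counts to `acquire`, while a caller throttling requests would pass `1` per request; the interface itself does not care which.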

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59565511

          — Diff: core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java —
          @@ -0,0 +1,67 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.hadoop.fs.Seekable;
          +
          +/** A decorator for an {@code InputStream} which limits the rate at which reads are performed. */
          — End diff –

          No no, I meant add line returns so it's:
          ```
          /**
           * A decorator for an {@code InputStream} which limits the rate at which reads are performed.
           */
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59566722

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          + if (instance == null) {
          + instance = new SharedRateLimiterFactory();
          +
          + // Update periodically
          + timer.schedule(new Runnable() {
          — End diff –

          Indeed, this is the entire purpose of calling the `update()` method on each rate limiter on a timer. The only reason that `SharedRateLimiterFactory` even touches `AccumuloConfiguration` is so that it can grab hold of an instance of `SimpleTimer`. Moving this polling to its own `Timer` object will make `SharedRateLimiterFactory` a self-contained singleton.
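
A rough sketch of what a self-contained singleton with its own timer could look like, assuming `java.util.Timer` as the replacement for `SimpleTimer`. The class name `SelfContainedFactory` and the `updates` counter are hypothetical and exist only to make the sketch observable; this is not the code that was eventually committed.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a singleton that owns its polling timer. */
public final class SelfContainedFactory {
  private static final long UPDATE_RATE_MS = 1000;
  private static SelfContainedFactory instance = null;

  // Daemon timer, so the polling thread never keeps the JVM alive on its own.
  private final Timer timer = new Timer("rate-limiter-poll", true);
  public final AtomicLong updates = new AtomicLong();

  private SelfContainedFactory() {
    // No configuration object is needed any more: the factory schedules itself.
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        update();
      }
    }, UPDATE_RATE_MS, UPDATE_RATE_MS);
  }

  public static synchronized SelfContainedFactory getInstance() {
    if (instance == null) {
      instance = new SelfContainedFactory();
    }
    return instance;
  }

  /** A real factory would refresh each limiter's rate here; the sketch only counts calls. */
  public void update() {
    updates.incrementAndGet();
  }
}
```

With this shape, `getInstance()` takes no arguments, which removes the dependency on `AccumuloConfiguration` that the comment describes.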

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59568240

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          + if (instance == null) {
          + instance = new SharedRateLimiterFactory();
          +
          + // Update periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.update();
          + }
          + }, UPDATE_RATE, UPDATE_RATE);
          +
          + // Report periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.report();
          + }
          + }, REPORT_RATE, REPORT_RATE);
          + }
          + return instance;
          + }
          + }
          +
          + public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
          + return getInstance(SimpleTimer.getInstance(conf));
          + }
          +
          + /**
          + * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
          + *
          + * @param name
          + * key for the rate limiter
          + * @param rateGenerator
          + * a function which can be called to get what the current rate for the rate limiter should be.
          + */
          + public RateLimiter create(String name, Callable<Long> rateGenerator) {
          + synchronized (activeLimiters) {
          + if (activeLimiters.containsKey(name)) {
          + SharedRateLimiter limiter = activeLimiters.get(name);
          + return limiter;
          + } else {
          + long initialRate;
          + try {
          + initialRate = rateGenerator.call();
          + } catch (Exception ex) {
          + throw new IllegalStateException(ex);
          + }
          + SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
          + activeLimiters.put(name, limiter);
          + return limiter;
          + }
          + }
          + }
          +
          + protected void update() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.update();
          + }
          + }
          +
          + protected void report() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.report();
          + }
          + }
          +
          + protected class SharedRateLimiter extends GuavaRateLimiter {
          + private final LongAdder permitsAcquired = new LongAdder();
          + private final Callable<Long> rateCallable;
          + private final String name;
          +
          + SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
          + super(initialRate);
          + this.name = name;
          + this.rateCallable = rateCallable;
          + }
          +
          + @Override
          + public void acquire(long permits) {
          + super.acquire(permits);
          + permitsAcquired.add(permits);
          + }
          +
          + public void update() {
          + try {
          + // Reset rate if needed
          + long rate = rateCallable.call();
          + if (rate != getRate()) {
          + setRate(rate);
          + }
          + } catch (Exception ex) {
          + logger.debug("Failed to update rate limiter", ex);
          + }
          + }
          +
          + public void report() {
          + long sum = permitsAcquired.sumThenReset();
          + if (sum > 0) {
          + logger.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, sum * 1000L / REPORT_RATE, getRate()));
          — End diff –

          Can the logger configuration be changed at runtime? It might make sense simply not to schedule the reporting task if debug logging is disabled at startup.
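
One alternative that sidesteps the runtime-reconfiguration question is to keep the task scheduled and check the log level each time it runs. The sketch below assumes `java.util.logging` purely to stay self-contained; with slf4j, as used in the diff, the analogous check would be `logger.isDebugEnabled()`. The class name `ReportSketch` and the message format are hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch: decide at report time whether debug output is wanted. */
public class ReportSketch {
  private final Logger logger;
  private final LongAdder permitsAcquired = new LongAdder();

  public ReportSketch(Logger logger) {
    this.logger = logger;
  }

  public void acquire(long permits) {
    permitsAcquired.add(permits);
  }

  /** Returns true if a report line was logged. */
  public boolean report() {
    // Always reset, so a later report is not inflated by intervals that went unlogged.
    long sum = permitsAcquired.sumThenReset();
    if (sum > 0 && logger.isLoggable(Level.FINE)) {
      logger.fine(String.format("%,d permits acquired in the last interval", sum));
      return true;
    }
    return false;
  }
}
```

The cost when debug logging is off is one `sumThenReset()` call per reporting interval, which is small next to the once-a-minute schedule in the diff.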

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59571157

          — Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java —
          @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
          */
          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;

          /**
          * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
          *
          */
          - public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
          - throws IOException;
          + public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
          — End diff –

          I was thinking perhaps a fluent syntax for opening readers/writers would make sense.

          E.g. so that it admits syntax like the following:
          ```java
          FileSKVIterator iterator = fileOperations.openReader(accumuloConfiguration)
              .ofFile(filename, fileSystem, fsConfiguration)
              .withRateLimiter(rateLimiter) // optional
              .withCache(dataCache, indexCache) // optional
              .forScanning(range, columnFamilies, inclusive);
          ```

          Unfortunately, introducing such a change seemed a bit much for the scope of what I wanted to accomplish.
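
For reference, the general shape of such a fluent opener can be sketched with stand-in types. Everything here is hypothetical: `FluentOpenSketch`, `ReaderBuilder`, and `withRateLimit` are illustrative names, the rate is a plain `long` instead of a `RateLimiter`, and the terminal call returns a summary string where a real version would return a `FileSKVIterator`.

```java
/** Hypothetical sketch of a fluent opener; none of these names exist in Accumulo. */
public class FluentOpenSketch {
  /** Collects optional settings, then a terminal call produces the result. */
  public static final class ReaderBuilder {
    private final String file;
    private long bytesPerSecond = 0; // nonpositive: no limit
    private boolean useCache = false;

    private ReaderBuilder(String file) {
      this.file = file;
    }

    /** Optional: throttle reads to the given rate. */
    public ReaderBuilder withRateLimit(long bytesPerSecond) {
      this.bytesPerSecond = bytesPerSecond;
      return this;
    }

    /** Optional: enable block caching. */
    public ReaderBuilder withCache() {
      this.useCache = true;
      return this;
    }

    /** Terminal operation; a real version would open the file and return an iterator. */
    public String forScanning(String range) {
      return file + " range=" + range + " limit=" + bytesPerSecond + " cache=" + useCache;
    }
  }

  /** Entry point: required arguments go here, optional ones on the builder. */
  public static ReaderBuilder ofFile(String file) {
    return new ReaderBuilder(file);
  }
}
```

The appeal of this shape is that adding a new optional setting means adding one builder method, without touching existing call sites or inserting `null` arguments into them.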

          githubbot ASF GitHub Bot added a comment -

          Github user ctubbsii commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59571367

          — Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java —
          @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
          */
          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;

          /**
          * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
          *
          */
          - public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
          - throws IOException;
          + public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
          — End diff –

          So, I've had some thoughts on this as well. A lot of our internal passing around of AccumuloConfiguration makes it easy to lose track of where the configuration object came from, making it harder to ensure we're not using it outside the intended scope (for example, if it represents a TableConfiguration).

          In many parts of our internal code, I would prefer we grab what we need from AccumuloConfiguration, combine it with any additional configuration appropriate for that specific context, and create a new context-specific configuration object to pass around within that context.

          If we were to do something like that, what would the context granularity be here? "file operations options", "file reading options", something else?
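
As one possible reading of "file reading options" granularity, the sketch below copies the needed values out of a general configuration once and carries them in a small immutable object. The class name `FileReadOptionsSketch` and the property keys `example.block.size` and `example.cache.enabled` are made up for illustration, and a plain `Map<String,String>` stands in for `AccumuloConfiguration`.

```java
import java.util.Map;

/** Hypothetical sketch of a context-specific options object; the keys are invented. */
public final class FileReadOptionsSketch {
  private final long blockSize;
  private final boolean cacheEnabled;

  private FileReadOptionsSketch(long blockSize, boolean cacheEnabled) {
    this.blockSize = blockSize;
    this.cacheEnabled = cacheEnabled;
  }

  /** Extracts only what file reading needs, so the broader configuration is not passed on. */
  public static FileReadOptionsSketch from(Map<String,String> conf) {
    long blockSize = Long.parseLong(conf.getOrDefault("example.block.size", "65536"));
    boolean cache = Boolean.parseBoolean(conf.getOrDefault("example.cache.enabled", "false"));
    return new FileReadOptionsSketch(blockSize, cache);
  }

  public long blockSize() {
    return blockSize;
  }

  public boolean cacheEnabled() {
    return cacheEnabled;
  }
}
```

Because the object is immutable and built at a single point, the scope it was derived from (table, system, or otherwise) is fixed where `from` is called and cannot drift further down the call chain.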

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59578033

          — Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java —
          @@ -0,0 +1,51 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.util.ratelimit;
          +
          +/** Rate limiter from the Guava library. */
          +public class GuavaRateLimiter implements RateLimiter {
          + private final com.google.common.util.concurrent.RateLimiter rateLimiter;
          + private long currentRate;
          +
          + public GuavaRateLimiter(long initialRate) {
          + this.currentRate = initialRate;
          — End diff –

          Should there be a sanity check here to ensure the rate is non-negative, or are there sufficient checks elsewhere?
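One shape such a check could take, as a sketch only: `RateCheck` and `requireNonNegative` are hypothetical names, not part of the pull request. Treating 0 as valid is an assumption based on the PR description, where `0B` disables rate limiting.

```java
/** Hypothetical validation helper; not part of the pull request. */
final class RateCheck {
  private RateCheck() {}

  /** Returns the rate unchanged if it is non-negative, otherwise throws. */
  static long requireNonNegative(long rate) {
    if (rate < 0) {
      throw new IllegalArgumentException("rate must be non-negative, got " + rate);
    }
    return rate;
  }
}
```

The constructor could then assign `this.currentRate = RateCheck.requireNonNegative(initialRate);`, and a setter could reuse the same helper.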

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59578436

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          — End diff –

          I was unaware of LongAdder... it's really neat. Yet another nice thing in JDK8 we can't use yet.
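If `LongAdder` is unavailable on the target JDK, a counter with the same two operations could be sketched with `AtomicLong`, which has been in `java.util.concurrent.atomic` since Java 5. `PermitCounter` is a hypothetical name, and this is only one possible substitute: it gives up `LongAdder`'s lower contention under many concurrent writers in exchange for portability.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical pre-JDK 8 stand-in for the LongAdder used to count acquired permits. */
final class PermitCounter {
  private final AtomicLong permitsAcquired = new AtomicLong();

  /** Records that the given number of permits were acquired. */
  void add(long permits) {
    permitsAcquired.addAndGet(permits);
  }

  /** Atomically returns the running total and resets it to zero. */
  long sumThenReset() {
    return permitsAcquired.getAndSet(0L);
  }
}
```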

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59579043

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          + if (instance == null) {
          + instance = new SharedRateLimiterFactory();
          +
          + // Update periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.update();
          + }
          + }, UPDATE_RATE, UPDATE_RATE);
          +
          + // Report periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.report();
          + }
          + }, REPORT_RATE, REPORT_RATE);
          + }
          + return instance;
          + }
          + }
          +
          + public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
          + return getInstance(SimpleTimer.getInstance(conf));
          + }
          +
          + /**
          + * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
          + *
          + * @param name
          + * key for the rate limiter
          + * @param rateGenerator
          + * a function which can be called to get what the current rate for the rate limiter should be.
          + */
          + public RateLimiter create(String name, Callable<Long> rateGenerator) {
          + synchronized (activeLimiters) {
          + if (activeLimiters.containsKey(name)) {
          + SharedRateLimiter limiter = activeLimiters.get(name);
          + return limiter;
          + } else {
          + long initialRate;
          + try {
          + initialRate = rateGenerator.call();
          + } catch (Exception ex) {
          + throw new IllegalStateException(ex);
          + }
          + SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
          + activeLimiters.put(name, limiter);
          + return limiter;
          + }
          + }
          + }
          +
          + protected void update() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.update();
          + }
          + }
          +
          + protected void report() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.report();
          + }
          + }
          +
          + protected class SharedRateLimiter extends GuavaRateLimiter {
          + private final LongAdder permitsAcquired = new LongAdder();
          + private final Callable<Long> rateCallable;
          + private final String name;
          +
          + SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
          + super(initialRate);
          + this.name = name;
          + this.rateCallable = rateCallable;
          + }
          +
          + @Override
          + public void acquire(long permits) {
          + super.acquire(permits);
          + permitsAcquired.add(permits);
          + }
          +
          + public void update() {
          + try {
          + // Reset rate if needed
          + long rate = rateCallable.call();
          + if (rate != getRate()) {
          + setRate(rate);
          + }
          + } catch (Exception ex) {
          + logger.debug("Failed to update rate limiter", ex);
          + }
          + }
          +
          + public void report() {
          + long sum = permitsAcquired.sumThenReset();
          + if (sum > 0) {
          + logger.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, sum * 1000L / REPORT_RATE, getRate()));
          — End diff –

          The timer will not always call report at the requested frequency. Could track the last report time and use that to calculate the rate.
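A rough sketch of that suggestion, under stated assumptions: `ReportRate` is a hypothetical helper, not code from the pull request, and the caller is assumed to pass timestamps from a monotonic clock such as `System.nanoTime()`.

```java
/** Hypothetical helper: turns a permit count into permits/second using measured elapsed time. */
final class ReportRate {
  private long lastReportNanos;

  ReportRate(long startNanos) {
    this.lastReportNanos = startNanos;
  }

  /**
   * Returns the permits per second observed since the previous call, then records nowNanos as the
   * new reference point. Returns 0 if no time has elapsed, to avoid dividing by zero.
   */
  synchronized long permitsPerSecond(long permits, long nowNanos) {
    long elapsedNanos = nowNanos - lastReportNanos;
    lastReportNanos = nowNanos;
    if (elapsedNanos <= 0) {
      return 0L;
    }
    // Floating point avoids long overflow when permits is large.
    return (long) (permits * 1e9 / elapsedNanos);
  }
}
```

In `report()`, the fixed `sum * 1000L / REPORT_RATE` term could then be replaced by something like `reportRate.permitsPerSecond(sum, System.nanoTime())`, so a late timer tick no longer inflates the logged rate.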

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59579078

          — Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java —
          @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
          */
          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;

          /**
          * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
          *
          */
          - public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
          - throws IOException;
          + public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
          — End diff –

          > In many parts of our internal code, I would prefer we grab what we need from AccumuloConfiguration, combine it with any additional configuration appropriate for that specific context, and create a new context-specific configuration object to pass around within that context.

          Somehow get the RateLimiterFactory directly from the config? That'd be a stop-gap.

          > I was thinking perhaps a fluent syntax for opening readers/writers would make sense.
          > That said, introducing such a change seemed a bit ambitious.

          Yeah, I totally understand. Finding some middle ground to avoid lots of unrelated changes to your actual feature is ideal.
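For reference, the fluent approach mentioned in the quote might look roughly like the sketch below. All names here are hypothetical: `ReaderRequest` is not an Accumulo API, and `Limiter` is a minimal stand-in for the PR's `RateLimiter` that models only `acquire(long)`.

```java
/** Hypothetical stand-in for the PR's RateLimiter; only acquire(long) is modeled. */
interface Limiter {
  void acquire(long permits);
}

/** Hypothetical fluent request for opening a reader; not an Accumulo API. */
final class ReaderRequest {
  private final String file;
  private boolean seekToBeginning = false;
  private Limiter readLimiter = null; // null is assumed to mean "no rate limiting"

  private ReaderRequest(String file) {
    this.file = file;
  }

  static ReaderRequest forFile(String file) {
    return new ReaderRequest(file);
  }

  ReaderRequest seekToBeginning(boolean seek) {
    this.seekToBeginning = seek;
    return this;
  }

  ReaderRequest withRateLimiter(Limiter limiter) {
    this.readLimiter = limiter;
    return this;
  }

  String file() {
    return file;
  }

  boolean seeksToBeginning() {
    return seekToBeginning;
  }

  Limiter readLimiter() {
    return readLimiter;
  }
}
```

The appeal of this shape is that an optional argument such as the rate limiter gets a default, so existing call sites would not each need an extra `null` when a new option is introduced.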

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59579811

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          + if (instance == null) {
          + instance = new SharedRateLimiterFactory();
          +
          + // Update periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.update();
          + }
          + }, UPDATE_RATE, UPDATE_RATE);
          +
          + // Report periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.report();
          + }
          + }, REPORT_RATE, REPORT_RATE);
          + }
          + return instance;
          + }
          + }
          +
          + public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
          + return getInstance(SimpleTimer.getInstance(conf));
          + }
          +
          + /**
          + * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
          + *
          + * @param name
          + * key for the rate limiter
          + * @param rateGenerator
          + * a function which can be called to get what the current rate for the rate limiter should be.
          + */
          + public RateLimiter create(String name, Callable<Long> rateGenerator) {
          + synchronized (activeLimiters) {
          + if (activeLimiters.containsKey(name)) {
          + SharedRateLimiter limiter = activeLimiters.get(name);
          + return limiter;
          + } else {
          + long initialRate;
          + try {
          + initialRate = rateGenerator.call();
          + } catch (Exception ex) {
          + throw new IllegalStateException(ex);
          + }
          + SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
          + activeLimiters.put(name, limiter);
          + return limiter;
          + }
          + }
          + }
          +
          + protected void update() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.update();
          + }
          + }
          +
          + protected void report() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.report();
          + }
          + }
          +
          + protected class SharedRateLimiter extends GuavaRateLimiter {
          + private final LongAdder permitsAcquired = new LongAdder();
          + private final Callable<Long> rateCallable;
          + private final String name;
          +
          + SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
          + super(initialRate);
          + this.name = name;
          + this.rateCallable = rateCallable;
          + }
          +
          + @Override
          + public void acquire(long permits) {
          + super.acquire(permits);
          + permitsAcquired.add(permits);
          + }
          +
          + public void update() {
          + try {
          + // Reset rate if needed
          + long rate = rateCallable.call();
          + if (rate != getRate()) {
          + setRate(rate);
          + }
          + } catch (Exception ex) {
          + logger.debug("Failed to update rate limiter", ex);
          — End diff –

          Why log at debug here? Not advocating for another level, just curious why this level was chosen.

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59579822

          — Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java —
          @@ -0,0 +1,55 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import org.apache.hadoop.fs.FSDataOutputStream;
          +
          +public class PositionedOutputs {
          + public static PositionedOutputStream wrap(final OutputStream fout) {
          + if (fout instanceof FSDataOutputStream) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((FSDataOutputStream) fout).getPos();
          + }
          + };
          + } else if (fout instanceof PositionedOutput) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((PositionedOutput) fout).position();
          + }
          + };
          + } else {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + throw new UnsupportedOperationException("Underlying stream does not support position()");
          + }
          + };
          + }
          + }
          +
          + public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
          — End diff –

          What I really want to say is "This method returns an `OutputStream` which implements `PositionedOutput`." But there's not really a way to say that in Java.

          I'm probably just being too clever with `PositionedOutput` anyways. The underlying issue is that `BCFile.Writer` needs an `OutputStream` implementing `DataOutput` with some facsimile of the `getPos()` method from `FSDataOutputStream`. `RateLimitedOutputStream` doesn't want to care about or interfere with either of these. But there doesn't seem to be any way to separate concerns cleanly. `getPos()` doesn't have an interface (like `Seekable`), and `java.io.DataOutputStream` (and so `FSDataOutputStream`) isn't implemented in a way which readily supports subclassing/inheritance.

          githubbot ASF GitHub Bot added a comment -

          Github user benjumanji commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59581047

          — Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java —
          @@ -0,0 +1,55 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import org.apache.hadoop.fs.FSDataOutputStream;
          +
          +public class PositionedOutputs {
          + public static PositionedOutputStream wrap(final OutputStream fout) {
          + if (fout instanceof FSDataOutputStream) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((FSDataOutputStream) fout).getPos();
          + }
          + };
          + } else if (fout instanceof PositionedOutput) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((PositionedOutput) fout).position();
          + }
          + };
          + } else {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + throw new UnsupportedOperationException("Underlying stream does not support position()");
          + }
          + };
          + }
          + }
          +
          + public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
          — End diff –

          `<T extends OutputStream & PositionedOutput> T foo()` would work? Bit gross though.

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59581242

          — Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java —
          @@ -0,0 +1,51 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.util.ratelimit;
          +
          +/** Rate limiter from the Guava library. */
          +public class GuavaRateLimiter implements RateLimiter {
          + private final com.google.common.util.concurrent.RateLimiter rateLimiter;
          + private long currentRate;
          +
          + public GuavaRateLimiter(long initialRate) {
          + this.currentRate = initialRate;
          — End diff –

          I'm adopting the convention that a non-positive rate should mean "unlimited", and so allowing non-positive values as the current rate.
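
The convention can be illustrated with a minimal sketch. This is not the `GuavaRateLimiter` from the pull request: `SketchRateLimiter` is a hypothetical name, and the simple pacing logic stands in for Guava's limiter so that the example needs only the JDK.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration only; not the GuavaRateLimiter from the pull request. */
class SketchRateLimiter {
  private long currentRate; // permits per second; a value <= 0 means "unlimited"
  private long nextFreeNanos = System.nanoTime();

  SketchRateLimiter(long initialRate) {
    this.currentRate = initialRate; // non-positive values are accepted on purpose
  }

  synchronized long getRate() {
    return currentRate;
  }

  synchronized void setRate(long newRate) {
    this.currentRate = newRate;
  }

  /** Blocks until the permits are available; returns the nanoseconds waited (0 when unlimited). */
  long acquire(long permits) {
    long waitNanos;
    synchronized (this) {
      if (currentRate <= 0) {
        return 0; // unlimited: never block
      }
      long now = System.nanoTime();
      long start = Math.max(now, nextFreeNanos);
      waitNanos = start - now;
      // Reserve the time these permits "cost" at the current rate.
      nextFreeNanos = start + (long) (permits * 1e9 / currentRate);
    }
    if (waitNanos > 0) {
      try {
        TimeUnit.NANOSECONDS.sleep(waitNanos);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return waitNanos;
  }
}
```

With this shape, a configured value of `0` needs no special casing by callers: they always call `acquire(...)`, and the limiter decides whether to block.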

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59581868

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          + if (instance == null) {
          + instance = new SharedRateLimiterFactory();
          +
          + // Update periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.update();
          + }
          + }, UPDATE_RATE, UPDATE_RATE);
          +
          + // Report periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.report();
          + }
          + }, REPORT_RATE, REPORT_RATE);
          + }
          + return instance;
          + }
          + }
          +
          + public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
          + return getInstance(SimpleTimer.getInstance(conf));
          + }
          +
          + /**
          + * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
          + *
          + * @param name
          + * key for the rate limiter
          + * @param rateGenerator
          + * a function which can be called to get what the current rate for the rate limiter should be.
          + */
          + public RateLimiter create(String name, Callable<Long> rateGenerator) {
          + synchronized (activeLimiters) {
          + if (activeLimiters.containsKey(name)) {
          + SharedRateLimiter limiter = activeLimiters.get(name);
          + return limiter;
          + } else {
          + long initialRate;
          + try {
          + initialRate = rateGenerator.call();
          + } catch (Exception ex) {
          + throw new IllegalStateException(ex);
          + }
          + SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
          + activeLimiters.put(name, limiter);
          + return limiter;
          + }
          + }
          + }
          +
          + protected void update() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.update();
          + }
          + }
          +
          + protected void report() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.report();
          + }
          + }
          +
          + protected class SharedRateLimiter extends GuavaRateLimiter {
          + private final LongAdder permitsAcquired = new LongAdder();
          + private final Callable<Long> rateCallable;
          + private final String name;
          +
          + SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
          + super(initialRate);
          + this.name = name;
          + this.rateCallable = rateCallable;
          + }
          +
          + @Override
          + public void acquire(long permits) {
          + super.acquire(permits);
          + permitsAcquired.add(permits);
          + }
          +
          + public void update() {
          + try {
          + // Reset rate if needed
          + long rate = rateCallable.call();
          + if (rate != getRate()) {
          + setRate(rate);
          + }
          + } catch (Exception ex) {
          + logger.debug("Failed to update rate limiter", ex);
          — End diff –

          1. "This should never happen."
          2. It's not a tragedy if it does happen.

          Suggestions?

          I suppose I could introduce a new interface (e.g. `SharedRateLimiterFactory.RateProvider`) instead of just using `Callable<Long>`. By making `RateProvider`'s method not declare any checked exceptions, we could avoid the issue.
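
A rough sketch of that idea, for comparison with `Callable<Long>`. The method name `getDesiredRate`, the wrapper class `RateProviderSketch`, and both helper methods are assumptions made for illustration; only the interface name `RateProvider` comes from the comment above.

```java
import java.util.concurrent.Callable;

class RateProviderSketch {
  /** Hypothetical interface: like Callable<Long>, but it declares no checked exceptions. */
  interface RateProvider {
    long getDesiredRate();
  }

  /** With Callable<Long>, the caller must catch Exception even if it "should never happen". */
  static long rateFromCallable(Callable<Long> callable, long fallback) {
    try {
      return callable.call();
    } catch (Exception ex) {
      // This is the branch where a log level would have to be chosen.
      return fallback;
    }
  }

  /** With RateProvider there is nothing to catch, so the logging question goes away. */
  static long rateFromProvider(RateProvider provider) {
    return provider.getDesiredRate();
  }
}
```

A provider could still throw an unchecked exception, so this narrows the problem rather than removing every failure mode.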

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59582717

          — Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java —
          @@ -0,0 +1,55 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.file.streams;
          +
          +import java.io.FilterOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import org.apache.hadoop.fs.FSDataOutputStream;
          +
          +public class PositionedOutputs {
          + public static PositionedOutputStream wrap(final OutputStream fout) {
          + if (fout instanceof FSDataOutputStream) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((FSDataOutputStream) fout).getPos();
          + }
          + };
          + } else if (fout instanceof PositionedOutput) {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + return ((PositionedOutput) fout).position();
          + }
          + };
          + } else {
          + return new PositionedOutputStream(fout) {
          + @Override
          + public long position() throws IOException {
          + throw new UnsupportedOperationException("Underlying stream does not support position()");
          + }
          + };
          + }
          + }
          +
          + public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
          — End diff –

          Wouldn't work. The caller would have to explicitly specify what `T` is (since the compiler can't determine it), and `foo` would be obliged to return a `T`.

          It might work to make `PositionedOutputStream` private. Then nobody could actually do anything more with it than make use of the fact that it extends `OutputStream` and implements `PositionedOutput`.
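
The trade-off can be sketched in a small standalone example. Everything here is a simplified stand-in: `IntersectionSketch`, `writeAndReport`, and the `ByteArrayOutputStream`-based `wrap` are hypothetical and are not the code in the pull request.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

class IntersectionSketch {
  /** Stand-in for the PositionedOutput interface in the pull request. */
  interface PositionedOutput {
    long position() throws IOException;
  }

  /** A named type that is both an OutputStream and a PositionedOutput. */
  abstract static class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
    PositionedOutputStream(OutputStream out) {
      super(out);
    }
  }

  // An intersection-typed return does not work as a factory: the caller picks T,
  // so the body cannot return its own anonymous class without an unchecked cast.
  //
  //   static <T extends OutputStream & PositionedOutput> T wrap(OutputStream out) {
  //     return new PositionedOutputStream(out) { ... }; // error: incompatible types
  //   }

  /** Returning the named type works; callers need only rely on its two supertypes. */
  static PositionedOutputStream wrap(final ByteArrayOutputStream out) {
    return new PositionedOutputStream(out) {
      @Override
      public long position() {
        return out.size(); // bytes written so far
      }
    };
  }

  /** On the consuming side an intersection bound is fine, because T is inferred from the argument. */
  static <T extends OutputStream & PositionedOutput> long writeAndReport(T stream, byte[] data) {
    try {
      stream.write(data);
      stream.flush();
      return stream.position();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Narrowing the visibility of `PositionedOutputStream`, as suggested above, would leave this call pattern intact, since `writeAndReport` never names the concrete type.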

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59584225

          — Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java —
          @@ -0,0 +1,152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.server.util.ratelimit;
          +
          +import com.google.common.collect.ImmutableList;
          +import java.util.List;
          +import java.util.WeakHashMap;
          +import java.util.concurrent.Callable;
          +import java.util.concurrent.atomic.LongAdder;
          +import org.apache.accumulo.core.conf.AccumuloConfiguration;
          +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
          +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
          +import org.apache.accumulo.server.util.time.SimpleTimer;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +public class SharedRateLimiterFactory {
          + private static final long REPORT_RATE = 60000;
          + private static final long UPDATE_RATE = 1000;
          + private static SharedRateLimiterFactory instance = null;
          + private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
          + private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
          +
          + private SharedRateLimiterFactory() {}
          +
          + public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
          + synchronized (SharedRateLimiterFactory.class) {
          + if (instance == null) {
          + instance = new SharedRateLimiterFactory();
          +
          + // Update periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.update();
          + }
          + }, UPDATE_RATE, UPDATE_RATE);
          +
          + // Report periodically
          + timer.schedule(new Runnable() {
          + @Override
          + public void run() {
          + instance.report();
          + }
          + }, REPORT_RATE, REPORT_RATE);
          + }
          + return instance;
          + }
          + }
          +
          + public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
          + return getInstance(SimpleTimer.getInstance(conf));
          + }
          +
          + /**
          + * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
          + *
          + * @param name
          + * key for the rate limiter
          + * @param rateGenerator
          + * a function which can be called to get what the current rate for the rate limiter should be.
          + */
          + public RateLimiter create(String name, Callable<Long> rateGenerator) {
          + synchronized (activeLimiters) {
          + if (activeLimiters.containsKey(name)) {
          + SharedRateLimiter limiter = activeLimiters.get(name);
          + return limiter;
          + } else {
          + long initialRate;
          + try {
          + initialRate = rateGenerator.call();
          + } catch (Exception ex) {
          + throw new IllegalStateException(ex);
          + }
          + SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
          + activeLimiters.put(name, limiter);
          + return limiter;
          + }
          + }
          + }
          +
          + protected void update() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.update();
          + }
          + }
          +
          + protected void report() {
          + List<SharedRateLimiter> limiters;
          + synchronized (activeLimiters) {
          + limiters = ImmutableList.copyOf(activeLimiters.values());
          + }
          + for (SharedRateLimiter limiter : limiters) {
          + limiter.report();
          + }
          + }
          +
          + protected class SharedRateLimiter extends GuavaRateLimiter {
          + private final LongAdder permitsAcquired = new LongAdder();
          + private final Callable<Long> rateCallable;
          + private final String name;
          +
          + SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
          + super(initialRate);
          + this.name = name;
          + this.rateCallable = rateCallable;
          + }
          +
          + @Override
          + public void acquire(long permits) {
          + super.acquire(permits);
          + permitsAcquired.add(permits);
          + }
          +
          + public void update() {
          + try {
          + // Reset rate if needed
          + long rate = rateCallable.call();
          + if (rate != getRate()) {
          + setRate(rate);
          + }
          + } catch (Exception ex) {
          + logger.debug("Failed to update rate limiter", ex);
          — End diff –

          OIC, it's because of the Callable. No suggestions; this is fine.
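The broad `catch (Exception ex)` in the quoted `update()` follows from `Callable.call()` being declared to throw `Exception`. The sketch below isolates that pattern without Guava or Accumulo on the classpath. `RefreshableRate` is a hypothetical name used only for illustration, not a class from the pull request, and it returns a boolean where the quoted code logs at debug level.

```java
import java.util.concurrent.Callable;

/** Hypothetical holder for a rate that is periodically re-read from a supplier. */
class RefreshableRate {
  private final Callable<Long> rateCallable;
  private volatile long currentRate;

  RefreshableRate(Callable<Long> rateCallable, long initialRate) {
    this.rateCallable = rateCallable;
    this.currentRate = initialRate;
  }

  long getRate() {
    return currentRate;
  }

  /** Re-reads the rate. Returns false, keeping the previous rate, if the supplier fails. */
  boolean update() {
    try {
      // Callable.call() is declared as "throws Exception", so the catch must be this broad.
      long rate = rateCallable.call();
      if (rate != currentRate) {
        currentRate = rate;
      }
      return true;
    } catch (Exception ex) {
      // Assumption for this sketch: report failure to the caller instead of logging.
      return false;
    }
  }
}
```

A failing supplier leaves the last good rate in place, so a transient configuration read error does not disturb a limiter that is already running.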

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59585544

          — Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java —
          @@ -0,0 +1,51 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.util.ratelimit;
          +
          +/** Rate limiter from the Guava library. */
          +public class GuavaRateLimiter implements RateLimiter {
          + private final com.google.common.util.concurrent.RateLimiter rateLimiter;
          + private long currentRate;
          +
          + public GuavaRateLimiter(long initialRate) {
          + this.currentRate = initialRate;
          — End diff –

          The docs for `tserver.compaction.major.throughput` specify using 0 for unlimited. Is specifying 0 or negative documented elsewhere?

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59586564

          — Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java —
          @@ -0,0 +1,51 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.util.ratelimit;
          +
          +/** Rate limiter from the Guava library. */
          +public class GuavaRateLimiter implements RateLimiter {
          + private final com.google.common.util.concurrent.RateLimiter rateLimiter;
          + private long currentRate;
          +
          + public GuavaRateLimiter(long initialRate) {
          + this.currentRate = initialRate;
          — End diff –

          Looking into the validation associated with `PropertyType.MEMORY`, it seems to check for >= 0.

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59586665

          — Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java —
          @@ -0,0 +1,51 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.accumulo.core.util.ratelimit;
          +
          +/** Rate limiter from the Guava library. */
          +public class GuavaRateLimiter implements RateLimiter {
          + private final com.google.common.util.concurrent.RateLimiter rateLimiter;
          + private long currentRate;
          +
          + public GuavaRateLimiter(long initialRate) {
          + this.currentRate = initialRate;
          — End diff –

          There's a hint in the javadocs for `RateLimiter.getRate()`:
          ```java
          /** Get current QPS of the rate limiter, with a nonpositive rate indicating no limit. */
          public long getRate();
          ```
          But I'll add more explicit comments to `GuavaRateLimiter` and to `SharedRateLimiterFactory`.
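As an illustration of the "nonpositive rate means no limit" convention described in that javadoc, here is a minimal sketch. `SketchRateLimiter` and `delayNanosFor` are hypothetical names, and the arithmetic only computes the spacing a caller would need. It does not reproduce Guava's algorithm or the pull request's `GuavaRateLimiter`.

```java
/** Hypothetical limiter showing only the "nonpositive rate means unlimited" convention. */
class SketchRateLimiter {
  private volatile long rate; // permits per second; a value <= 0 means no limit

  SketchRateLimiter(long initialRate) {
    this.rate = initialRate;
  }

  long getRate() {
    return rate;
  }

  void setRate(long newRate) {
    this.rate = newRate;
  }

  boolean isUnlimited() {
    return rate <= 0;
  }

  /** Nanoseconds by which a request for the given permits would be spaced out at the current rate. */
  long delayNanosFor(long permits) {
    long r = rate; // read once so a concurrent setRate cannot change it mid-computation
    if (r <= 0 || permits <= 0) {
      return 0L; // unlimited, or nothing requested
    }
    return (long) (permits * 1_000_000_000.0 / r);
  }
}
```

Treating both zero and negative values as "unlimited" in one place means a `0B` property value needs no special handling by callers.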

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59589138

          — Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java —
          @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
          */

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;

          /**
          * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
          *
          */
          - public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
          - throws IOException;
          + public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
              • End diff –

          To @ctubbsii's comment: Sounds interesting, but is it relevant to my pull request? It sounds like you're advocating a more general refactoring of `AccumuloConfiguration` handling.

          kturner Keith Turner added a comment -

          Recording offline discussions I had with Shawn about this issue.

          Shawn Walker experimented with per-table rate limits while working on this. He identified a serious issue: a table with a low rate limit could monopolize all of the compaction threads, causing problems for all other tables. One possible solution we discussed was Compaction Resource Groups (CRGs). Each CRG would have its own thread pool and rate limit. Multiple named CRGs could be defined at the system level, and tables and user-initiated compactions could be assigned to a CRG. This would allow a table to have a low rate limit without impacting tables with higher rate limits.
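To make the Compaction Resource Group idea concrete, here is a rough sketch of the data structure it implies: each named group owns a thread pool and a rate limit, and tables are mapped to groups. All class, field, and method names are hypothetical, and nothing here is taken from the Accumulo code base. The convention that a rate of 0 means unlimited is borrowed from the `tserver.compaction.major.throughput` property.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical compaction resource group: its own thread pool plus its own rate limit. */
class CompactionResourceGroup {
  final String name;
  final long bytesPerSecond; // assumption: 0 means unlimited
  final ExecutorService pool;

  CompactionResourceGroup(String name, int threads, long bytesPerSecond) {
    this.name = name;
    this.bytesPerSecond = bytesPerSecond;
    this.pool = Executors.newFixedThreadPool(threads);
  }
}

/** Hypothetical registry mapping tables to named groups, falling back to a default group. */
class CompactionResourceGroups {
  private final Map<String,CompactionResourceGroup> groups = new ConcurrentHashMap<>();
  private final Map<String,String> tableToGroup = new ConcurrentHashMap<>();
  private final CompactionResourceGroup defaultGroup;

  CompactionResourceGroups(CompactionResourceGroup defaultGroup) {
    this.defaultGroup = defaultGroup;
    groups.put(defaultGroup.name, defaultGroup);
  }

  void define(CompactionResourceGroup group) {
    groups.put(group.name, group);
  }

  void assign(String table, String groupName) {
    if (!groups.containsKey(groupName)) {
      throw new IllegalArgumentException("Unknown group: " + groupName);
    }
    tableToGroup.put(table, groupName);
  }

  /** Returns the group for a table; unassigned tables use the default group. */
  CompactionResourceGroup groupFor(String table) {
    String name = tableToGroup.get(table);
    return name == null ? defaultGroup : groups.get(name);
  }

  void shutdown() {
    for (CompactionResourceGroup g : groups.values()) {
      g.pool.shutdown();
    }
  }
}
```

Because a slow table's compactions would queue on its own group's pool, they could not occupy threads that belong to other groups, which is the monopolization problem described above.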

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59596175

          — Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java —
          @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
          */

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
          - AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
          + RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;

          /**
          * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
          *
          */
          - public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
          - throws IOException;
          + public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
              • End diff –

          > It sounds like you're advocating a more general refactoring of AccumuloConfiguration handling.

          Well, it's relevant in that without some change, you have `, null` sprinkled everywhere.

          githubbot ASF GitHub Bot added a comment -

          Github user ctubbsii commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59597441

          — Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java —
          @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
          */

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,

          • AccumuloConfiguration tableConf) throws IOException;
            + RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,

          • AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
            + RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;

          /**

          • Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
            *
            */
          • public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
          • throws IOException;
            + public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
              • End diff –

          > Somehow get the RateLimiterFactory directly from the config? That'd be a stop-gap.

          @joshelser Not sure exactly what you mean, but what I meant was along the lines of creating a "FileOptions" (or similar) object to pass around instead of the AccumuloConfiguration object which is narrowly applicable to the context of those utility methods (context: operations on files?), and which is composed of the relevant configs from wherever they are sourced.

          @ShawnWalker I would consider it an optional improvement to this pull request. If the code modified here can be clearly demarcated as a distinct context, it may benefit from this kind of improvement as an alternative to adding the extra parameter everywhere. It can also be done at some future point in time, as a more general improvement (and a much bigger one... as it would require identifying multiple distinct contexts instead of possibly doing one at a time).
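
          As a rough illustration of the "FileOptions" idea (not code from the pull request): the class name, its fields, the builder, and the stand-in `RateLimiter` interface below are all hypothetical, and the real reader methods take more parameters than are shown here.

          ```java
          import java.util.Objects;

          /** Stand-in for a rate limiter; the interface in the pull request may differ. */
          interface RateLimiter {
            void acquire(long permits);
          }

          /** Hypothetical bundle of the options needed to open a file reader. */
          final class FileOptions {
            private final String file;
            private final boolean seekToBeginning;
            private final RateLimiter readLimiter; // null means "no rate limiting"

            private FileOptions(Builder b) {
              this.file = Objects.requireNonNull(b.file, "file");
              this.seekToBeginning = b.seekToBeginning;
              this.readLimiter = b.readLimiter;
            }

            String file() { return file; }
            boolean seekToBeginning() { return seekToBeginning; }
            RateLimiter readLimiter() { return readLimiter; }

            /** Entry point: only the file name is mandatory. */
            static Builder forFile(String file) { return new Builder(file); }

            static final class Builder {
              private final String file;
              private boolean seekToBeginning;
              private RateLimiter readLimiter;

              private Builder(String file) { this.file = file; }

              Builder seekToBeginning(boolean v) { this.seekToBeginning = v; return this; }
              Builder readLimiter(RateLimiter l) { this.readLimiter = l; return this; }
              FileOptions build() { return new FileOptions(this); }
            }
          }
          ```

          With something along these lines, a call site that does not care about rate limiting would write `FileOptions.forFile(path).build()` and never mention the limiter, rather than passing an explicit `null`.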

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/90#discussion_r59598295

          — Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java —
          @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
          */

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,

          • AccumuloConfiguration tableConf) throws IOException;
            + RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;

          public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,

          • AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
            + RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;

          /**

          • Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
            *
            */
          • public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
          • throws IOException;
            + public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
              • End diff –

          > Not sure exactly what you mean, but what I meant was along the lines of creating a "FileOptions" (or similar) object to pass around instead of the AccumuloConfiguration object which is narrowly applicable to the context of those utility methods (context: operations on files?), and which is composed of the relevant configs from wherever they are sourced.

          Something like:
          ```
          T impl = acuConf.getImpl(Property.MY_FACTORY_IMPLEMENTATION);
          ```
          Concretely:
          ```
          RATE_LIMITER_FACTORY("tserver.rate.limiter.factory", SharedRateLimiterFactory.class, PropertyType.IMPL, "...")
          ```
          ```
          RateLimiterFactory factory = acuConf.getImpl(Property.RATE_LIMITER_FACTORY);
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-209647745

          I played around with this branch locally. I created a table named test_ingest with 10,000,000 entries using the following commands.

          ```
          ./bin/accumulo shell -u root -p secret -e "createtable test_ingest"
          ./bin/accumulo org.apache.accumulo.test.TestIngest -u root -p secret --timestamp 1 --size 50 --random 56 --rows 10000000 --start 0 --cols 1 --instance instance16
          ```
          I set the rate limit to 5M and forced a compaction. I saw the following in the tserver logs.

          ```
          Compaction 2<< 10,000,000 read | 10,000,000 written | 122,925 entries/sec | 81.350 secs | 431,758,096 bytes | 5307413.596 byte/sec
          ```

          Then I split the table into 8 tablets and forced a compaction to test the rate limit for multiple threads. I had the default of 3 compaction threads. I saw the following in the logs for this test.

          ```
          Compaction 2;row_0003749;row_00025 1,249,000 read | 1,249,000 written | 41,866 entries/sec | 29.833 secs | 53,926,291 bytes | 1807605.370 byte/sec
          Compaction 2;row_00025;row_000125 1,250,000 read | 1,250,000 written | 41,899 entries/sec | 29.833 secs | 53,970,229 bytes | 1809078.168 byte/sec
          Compaction 2;row_000125< 1,250,000 read | 1,250,000 written | 41,783 entries/sec | 29.916 secs | 53,969,343 bytes | 1804029.382 byte/sec
          Compaction 2;row_000625;row_0005 1,250,000 read | 1,250,000 written | 42,134 entries/sec | 29.667 secs | 53,970,847 bytes | 1819221.593 byte/sec
          Compaction 2;row_0005;row_0003749 1,251,000 read | 1,251,000 written | 42,109 entries/sec | 29.708 secs | 54,012,874 bytes | 1818125.555 byte/sec
          Compaction 2;row_00075;row_000625 1,250,000 read | 1,250,000 written | 41,881 entries/sec | 29.846 secs | 53,969,549 bytes | 1808267.406 byte/sec
          Compaction 2;row_000875;row_00075 1,250,000 read | 1,250,000 written | 63,909 entries/sec | 19.559 secs | 53,969,511 bytes | 2759318.523 byte/sec
          Compaction 2<;row_000875 1,250,000 read | 1,250,000 written | 63,798 entries/sec | 19.593 secs | 53,969,987 bytes | 2754554.535 byte/sec
          ```
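
          A quick arithmetic check of the figures above, as a sketch: it compares the logged byte/sec values with a 5M limit taken as 5 MiB per second (5,242,880 bytes/sec). Grouping the first three ~1.8 MB/s compactions as one concurrent batch and the last two as another is an assumption; the logs do not show which compactions overlapped. The class and method names are made up for the example.

          ```java
          final class ThroughputCheck {
            /** 5M interpreted as 5 MiB per second. */
            static final double LIMIT_BYTES_PER_SEC = 5.0 * 1024 * 1024;

            /** Sum of per-compaction rates, in bytes per second. */
            static double sum(double... rates) {
              double total = 0;
              for (double r : rates) {
                total += r;
              }
              return total;
            }

            /** Combined rate expressed as a multiple of the configured limit. */
            static double ratioToLimit(double... rates) {
              return sum(rates) / LIMIT_BYTES_PER_SEC;
            }

            public static void main(String[] args) {
              // Single-tablet compaction.
              System.out.printf("single: %.3f%n", ratioToLimit(5307413.596));
              // Assumed concurrent batch of three (first three log lines).
              System.out.printf("three:  %.3f%n",
                  ratioToLimit(1807605.370, 1809078.168, 1804029.382));
              // Assumed concurrent batch of two (last two log lines).
              System.out.printf("two:    %.3f%n", ratioToLimit(2759318.523, 2754554.535));
            }
          }
          ```

          Under that grouping, each combined rate comes out a few percent above the nominal 5 MiB per second, both for the single compaction and for the multi-threaded runs.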

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-210089684

          I've made changes to address most of the comments on this thread. I've also addressed a performance concern that Keith Turner noticed (`PositionedOutputs.PositionedOutputStream` was behaving poorly).

          I've additionally fixed an issue with tracing in `TabletServerBatchWriter` that was causing the test `ShellServerIT.trace(...)` to fail for me for reasons unrelated to my changes. Perhaps I should split that out into a separate issue/patch?

          githubbot ASF GitHub Bot added a comment -

          Github user ctubbsii commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-210122789

          @ShawnWalker That sounds great! If you can separate out the issue with ShellServerIT as a separate issue, that'd be helpful. I'm guessing it affects older branches, as well.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-211398304

          @ShawnWalker I suspect the issue you fixed that was causing ShellServerIT to fail was introduced by ACCUMULO-1755

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-211402498

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-211409828

          I've reverted the changes to `TabletServerBatchWriter` (which were moved to ACCUMULO-4191), and then squashed the changeset into a single commit.

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-211414852

          I think everything I asked about has been taken care of. Thanks for that, Shawn.

          The only thing I don't see (and I didn't say it explicitly earlier, so I don't think it's a blocker to merge this in) is a high-level test. I see you added some tests for the rate-limiter piece. I'm wondering if we could make an integration test specifically to test this feature. It's nice when we have a general test class (built around a minicluster) available so that we can easily test potential bugs and add new tests easily.

          @keith-turner LMK if you have time to merge this in and run the tests. Otherwise, I'll kick off something myself.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-211418879

          @joshelser I can merge it. An end-to-end test would be nice to detect regressions. I suppose the test would make sure a compaction doesn't run too fast?

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-211419757

          > I suppose the test would make sure a compaction doesn't run too fast?

          Yeah, I was thinking about how best to test this. You can only reasonably assert a lower bound on compaction time (to avoid performance skew on certain hosts). Maybe turning off compression for a table, writing a bunch of data, and then asserting that a compaction takes at least X time is easiest. You'll still have to account for "compression" from the run-length encoding, but at least that should be uniform across hosts.
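
          A minimal sketch of such a lower-bound check, assuming the test knows roughly how many bytes the compaction has to move. This is not the IT that was added to the pull request; the class, the method names, and the `slack` parameter are hypothetical.

          ```java
          import java.time.Duration;

          final class CompactionLowerBound {
            /** Shortest time a compaction of `bytes` could take at `bytesPerSecond`. */
            static Duration minimumDuration(long bytes, long bytesPerSecond) {
              if (bytesPerSecond <= 0) {
                return Duration.ZERO; // a limit of 0 means rate limiting is disabled
              }
              long millis = Math.floorDiv(Math.multiplyExact(bytes, 1000L), bytesPerSecond);
              return Duration.ofMillis(millis);
            }

            /**
             * True if the observed time is at least `slack` times the theoretical minimum.
             * A slack below 1.0 tolerates a limiter that briefly runs ahead of its nominal rate.
             */
            static boolean respectsLimit(Duration observed, long bytes, long bytesPerSecond,
                double slack) {
              long minMillis = minimumDuration(bytes, bytesPerSecond).toMillis();
              return observed.toMillis() >= (long) (minMillis * slack);
            }
          }
          ```

          Only the lower bound is asserted, so a slow host cannot make the check fail. Some slack is probably needed in practice: the single-tablet run logged earlier in this thread moved 431,758,096 bytes in 81.350 secs, slightly faster than a strict 5 MiB per second bound would allow.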

          githubbot ASF GitHub Bot added a comment -

          Github user ShawnWalker commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-211546144

          With Keith's help, I've added a small end-to-end IT on rate limiting of major compactions.

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-211550045

          > With Keith's help, I've added a small end-to-end IT on rate limiting of major compactions.

          Looks good. Great work guys! :+1:

          githubbot ASF GitHub Bot added a comment -

          Github user ctubbsii commented on the pull request:

          https://github.com/apache/accumulo/pull/90#issuecomment-211579493

          Looks like there are a few trivial findbugs issues to address in the IT.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/accumulo/pull/90


            People

            • Assignee:
              ShawnWalker Shawn Walker
              Reporter:
              ShawnWalker Shawn Walker
            • Votes:
              0
              Watchers:
              4

              Dates

              • Created:
                Updated:
                Resolved:

                Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Remaining:
                Remaining Estimate - 0h
                Logged:
                Time Spent - 20m

                  Development