ACCUMULO-1755

BatchWriter blocks all addMutation calls while binning mutations

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.6.6, 1.7.2, 1.8.0
    • Component/s: client
    • Labels:
      None

      Description

Through code inspection, we found that the BatchWriter bins mutations inside of a synchronized block that covers calls to addMutation. Binning potentially involves lookups of tablet metadata and processes a fair amount of information. We will get better parallelism if we can either release the lock while binning, dedicate another thread to do the binning, or use one of the send threads to do the binning.

      This has not been verified empirically yet, so there is not yet any profiling info to indicate the level of improvement that we should expect. Profiling and repeatable demonstration of this performance bottleneck should be the first step on this ticket.
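As a rough illustration of the "dedicate another thread" option, binning could be handed to a single background thread so that callers of addMutation are not held inside the lock while tablet-metadata lookups run. This is a sketch only; the names below (BinningOffloadSketch, queueForBinning, binMutations) are hypothetical and are not part of the actual TabletServerBatchWriter API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical sketch only: offload binning to one background daemon thread.
class BinningOffloadSketch {
  private final ExecutorService binExecutor =
      Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
          Thread t = new Thread(r, "batch-writer-binner");
          t.setDaemon(true); // do not keep the client JVM alive
          return t;
        }
      });

  void queueForBinning(final Object mutationSet) {
    binExecutor.submit(new Runnable() {
      @Override
      public void run() {
        // Tablet-metadata lookups run here, off the caller's addMutation() path.
        binMutations(mutationSet);
      }
    });
  }

  void binMutations(Object mutationSet) {
    // Placeholder for the real binning work.
  }
}
```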

Attachments

      1. TSBWBinningSimulator.java
        3 kB
        Adam Fuchs
      2. Screen Shot 2016-03-08 at 9.47.39 AM.png
        73 kB
        Adam Fuchs
      3. ACCUMULO-1755.patch
        16 kB
        Dave Marion
      4. 1755-perf-test.patch
        5 kB
        Dave Marion
      5. 1755-nosync-perf-test.patch
        27 kB
        Dave Marion

        Issue Links

          Activity

          afuchs Adam Fuchs added a comment -

Dave Marion et al., I did a bit of theory work around this and built a simulator to illustrate performance with different pipelining strategies. Here are my calculations to go with it:

          Variables:
          N = Batch Size
          S = Send Time per mutation
          B = Binning Time per Mutation
          G = Generation Time per Mutation
          T = # Send Threads
          P = # Generation Threads

          Current 2-Stage Pipeline Throughput
          = N / max(stage 1 time, stage 2 time)
          = N / max(BN+GN/P, SN/T)
          = 1 / max(B+G/P, S/T)
          = min(1/(B+G/P), T/S)

          Async 3-Stage Pipeline Throughput
          = N / max(stage1, stage2, stage3)
          = N / max(BN, GN/P, SN/T)
          = 1 / max(B, G/P, S/T)
          = min(1/B, P/G, T/S)

(Assumes no CPU contention and an even workload across threads)

          Theoretical implications include:

          • When binning time, per-thread generation time, and per-thread send time are close we can speed up the pipeline by 2x by adding another stage.
          • We can balance the pipeline between generation threads and send threads but not binning threads – may need a pool for that to further optimize TSBW throughput
          • If send threads or downstream elements are the bottleneck then we will have less impact by adding a third stage to the pipeline
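The formulas above can be checked numerically; the following is a minimal, illustrative Java sketch (not the attached TSBWBinningSimulator.java) of the two throughput expressions, using made-up per-mutation costs chosen so that B ≈ G/P ≈ S/T.

```java
// Illustrative only: evaluates the 2-stage and 3-stage throughput formulas above.
public class PipelineThroughputSketch {

  // Current 2-stage pipeline: generation + binning share stage 1, sending is stage 2.
  static double twoStage(double b, double g, double s, int p, int t) {
    return Math.min(1.0 / (b + g / p), t / s);
  }

  // Proposed 3-stage pipeline: binning gets its own stage.
  static double threeStage(double b, double g, double s, int p, int t) {
    return Math.min(Math.min(1.0 / b, p / g), t / s);
  }

  public static void main(String[] args) {
    double b = 1e-6, g = 4e-6, s = 3e-6; // seconds per mutation (example values)
    int p = 4, t = 3;                    // generation and send threads
    System.out.printf("2-stage: %.0f mutations/sec%n", twoStage(b, g, s, p, t));
    System.out.printf("3-stage: %.0f mutations/sec%n", threeStage(b, g, s, p, t));
  }
}
```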
          dlmarion Dave Marion added a comment -

I wrote a new test that sends 1M mutations total using N threads with BatchWriter buffers of different sizes. The test is first run twice with those times discarded, to account for JVM startup. Then the test is run 10 times and the average (in seconds) is reported for total time and time to add mutations.

          First, I added some code to the TSBW to determine that with my test data I was sending the following number of batches using 1, 10, and 100MB buffers:

BatchWriter Max Memory   Flushes to Accumulo
1MB                      515
10MB                     52
100MB                    6

          Here are the results of the test:

          master branch

          Using the patch 1755-perf-test.patch

          Total Time

          Threads 1MB 10MB 100MB
          1 3.121 2.818 3.158
          2 3.102 2.414 2.950
          4 3.367 2.573 3.114
          8 3.422 2.569 3.140
          16 3.590 2.741 3.332

          Add Mutation Time

          Threads 1MB 10MB 100MB
          1 3.114 2.733 2.498
          2 3.088 2.350 2.371
          4 3.360 2.506 2.472
          8 3.414 2.516 2.509
          16 3.582 2.692 2.696

          master branch with modifications to remove sync on addMutation()

          I successfully modified the TSBW to remove the synchronization modifier from the addMutation method. The multi-threaded binning test passes so I have some confidence that the data is correct. Use patch 1755-nosync-perf-test.patch

          Total Time

          Threads 1MB 10MB 100MB
          1 3.080 2.766 3.255
          2 2.972 2.420 3.137
          4 3.162 2.492 3.190
          8 3.100 2.658 3.623
          16 3.393 2.898 3.743

          Add Mutation Time

          Threads 1MB 10MB 100MB
          1 3.072 2.653 2.517
          2 2.965 2.371 2.527
          4 3.155 2.441 2.589
          8 3.092 2.602 2.961
          16 3.385 2.839 2.891

I think the results are inconclusive. The tests run against MiniAccumuloCluster (MAC) on localhost, so this is likely a best-case scenario. I'd be interested to see this re-run on a real cluster.

          dlmarion Dave Marion added a comment -

          Adam Fuchs Keith Turner FWIW, I have been doing some testing locally. I have not been able to show any real performance improvement. Running an application with this patch still shows multiple client threads blocking on TabletServerBatchWriter.addMutation() because of the synchronization on that method. All this patch did was make 1 of the client threads execute that method faster. I think the real performance improvement will be removing the synchronization modifier from the addMutation method.

          dlmarion Dave Marion added a comment -

          The previous implementation blocked all client threads from calling BatchWriter.addMutation(), meaning the clients could not do any work. In the new implementation the clients will be able to continue to do work, adding mutations, and even binning them in their own thread if necessary, before blocking.

          My statement from above is incorrect. We didn't remove the synchronization from TabletServerBatchWriter.addMutation. We only made it such that the binning is done either in a background thread or the current thread.

          dlmarion Dave Marion added a comment -

I took the test that I created and ran it against master and my feature branch with 1 to 6 threads. I didn't see much difference, but looking back at it now I think it's because the test pre-creates all of the mutations and adds them as fast as possible. The test is really for multi-threaded correctness rather than performance. In the new code there is still a synchronization point when adding the binned mutations to the queues for the tablet servers. The send threads in the test (local MiniAccumuloCluster) must be able to keep up with the binned mutations being added. I don't expect that to be the case in a real deployment. Good news - performance wasn't worse.

I think a better test is to write a simple multi-threaded client that creates and adds mutations to a common batch writer. Then, time the application as a whole trying to insert N mutations with 1 to N client threads. The previous implementation blocked all client threads from calling BatchWriter.addMutation(), meaning the clients could not do any work. In the new implementation the clients will be able to continue to do work, adding mutations, and even binning them in their own thread if necessary, before blocking. I'll see if I can re-test with this new approach in the next few days. Do you have a different thought about how to test this?
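A rough sketch of that kind of harness follows (it assumes a BatchWriter created elsewhere, e.g. via connector.createBatchWriter(table, config); row/column values and thread counts are placeholders, not what was actually run):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;

public class SharedBatchWriterLoadSketch {

  // Time how long numThreads client threads take to push totalMutations
  // through one shared BatchWriter.
  static long runMillis(final BatchWriter bw, int numThreads, int totalMutations) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    final int perThread = totalMutations / numThreads;
    long start = System.nanoTime();
    for (int t = 0; t < numThreads; t++) {
      final int id = t;
      pool.submit(new Runnable() {
        @Override
        public void run() {
          try {
            for (int i = 0; i < perThread; i++) {
              Mutation m = new Mutation("row_" + id + "_" + i);
              m.put("cf", "cq", new Value(("v" + i).getBytes()));
              bw.addMutation(m); // the contention point under test
            }
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
    bw.flush();
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }
}
```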

          afuchs Adam Fuchs added a comment -

          Thanks, Dave. Got any perf testing results that show how much this improved things?

          dlmarion Dave Marion added a comment -

          Attaching original patch

          dlmarion Dave Marion added a comment -

Committed to 1.6 and merged up to master. Built with 'mvn clean verify -DskipITs' on each branch and ran the new IT separately.

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion closed the pull request at:

          https://github.com/apache/accumulo/pull/75

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-191403081

          Will apply manually. Thx.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-191376367

          > I guarded the two methods that update the stats with trace logging checks.

That's a nice improvement.

I think this patch looks good now. +1

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-191295793

Java 8 added accumulateAndGet to AtomicInteger, which can be used with lambdas to compute min/max. Java 8 is so nice, but we can't use it yet.

In Java 8 you could do the following:

          ```java
          atomicInt.accumulateAndGet(update, Math::min)
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-191293786

To make findbugs happy, you could CAS in a loop to compute the min and max, something like:

```java
private static void computeMin(AtomicInteger stat, int update) {
  int old = stat.get();
  while (!stat.compareAndSet(old, Math.min(old, update))) {
    old = stat.get();
  }
}
```

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-191289655

          > which is a findbugs issue, and I don't see the issue that it's complaining about

          Can you reproduce it locally via `mvn verify -DskipTests -Dcheckstyle.skip`? I know the jenkins output can sometimes be... a little weird to parse for w/e reason.

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-191251406

          I looked into the build failure, which is a findbugs issue, and I don't see the issue that it's complaining about. AtomicInteger / Long do not implement Lock.

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-191248202

          So, I took a different approach. I believe that I resolved the race conditions by synchronizing on the objects being updated. This would still cause the performance penalty that you are talking about going to main memory. However, the stats objects being updated are only used if trace logging is enabled, so I guarded the two methods that update the stats with trace logging checks. Therefore, you will only pay a performance penalty if trace logging is enabled, but by turning on trace logging you should expect a little bit of a performance hit anyway.
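For illustration, the guard described above amounts to something like the following (the exact call sites in the patch may differ; `log` is assumed to be the class's Logger):

```java
// Skip the stats bookkeeping entirely unless trace logging is enabled.
if (log.isTraceEnabled()) {
  updateBinningStats(count, time, binnedMutations);
  updateBatchStats(binnedMutations);
}
```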

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54645749

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -427,11 +437,11 @@ public void updateBinningStats(int count, long time, Map<String,TabletServerMuta
          updateBatchStats(binnedMutations);
          }

- private synchronized void updateBatchStats(Map<String,TabletServerMutations<Mutation>> binnedMutations) {
-   tabletServersBatchSum += binnedMutations.size();
+ private void updateBatchStats(Map<String,TabletServerMutations<Mutation>> binnedMutations) {
+   tabletServersBatchSum.addAndGet(binnedMutations.size());
-   minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size());
-   maxTabletServersBatch = Math.max(maxTabletServersBatch, binnedMutations.size());
+   minTabletServersBatch.set(Math.min(minTabletServersBatch.get(), binnedMutations.size()));
+   maxTabletServersBatch.set(Math.max(maxTabletServersBatch.get(), binnedMutations.size()));
— End diff —

This method of updating has a race condition. Multiple threads could call get() before calling set(). Also all of these atomic vars require round trips to main memory (not sure how much this matters). I can think of two possible solutions. Both involve creating a BatchWriterStats class to make the code more manageable.

1. Could add a synchronized updateBatchStats method to BatchWriterStats. No longer syncing on the main lock or making lots of trips to main memory.
2. Could have an AtomicRef<BatchWriterStats>. To update batch writer stats, read the ref, clone it, make updates to the clone, then update the ref using CAS to ensure the ref has not changed. If the ref changed, start over. This avoids the lock, race conditions, and lots of trips to main memory (sketched below).
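A minimal sketch of option 2, assuming a hypothetical immutable BatchWriterStats holder; the patch as committed ultimately went a different route, synchronizing on the stats objects and guarding the updates with trace-logging checks.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable stats snapshot; fields are illustrative.
class BatchWriterStats {
  final long batchSum;
  final int minBatch;
  final int maxBatch;

  BatchWriterStats(long batchSum, int minBatch, int maxBatch) {
    this.batchSum = batchSum;
    this.minBatch = minBatch;
    this.maxBatch = maxBatch;
  }
}

class StatsUpdater {
  private final AtomicReference<BatchWriterStats> stats =
      new AtomicReference<BatchWriterStats>(new BatchWriterStats(0, Integer.MAX_VALUE, 0));

  void updateBatchStats(int batchSize) {
    while (true) {
      BatchWriterStats old = stats.get();
      BatchWriterStats updated = new BatchWriterStats(
          old.batchSum + batchSize,
          Math.min(old.minBatch, batchSize),
          Math.max(old.maxBatch, batchSize));
      // CAS: if another thread updated the ref first, loop and try again.
      if (stats.compareAndSet(old, updated)) {
        return;
      }
    }
  }
}
```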

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-190927615

          Since MutationWriter.getLocator() is no longer called in sync, race conditions can occur. Could make the entire method synchronize on locators.

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-190894684

          This is looking really nice, @dlmarion. :+1: from me.

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-190894445

          Thanks josh, made the changes locally.

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-190888928

          Realized I left a System.out.println in the test. Will remove locally and wait for feedback before pushing that change.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54590327

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +710,33 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

- void addMutations(MutationSet mutationsToSend) {
+ void queueMutations(final MutationSet mutationsToSend) throws InterruptedException {
+   if (null == mutationsToSend)
+     return;
+   boolean transferred = queue.tryTransfer(new Runnable() {
— End diff —

I was playing around with this locally. I added some prints and noticed the background thread was never binning. This was because the background thread was never started (adding via the thread pool API will start it). I fixed this issue by prestarting the threads; however, I think it would be safer to add using the Executor API (because threads could idle time out). I was reading and found the following.

          • ThreadPoolExecutor javadocs recommend only using the queue for debugging and monitoring
          • The same thing can be accomplished using ThreadPoolExecutor.CallerRunsPolicy() and a SynchronousQueue()

          I got .CallerRunsPolicy+SynchronousQueue working here keith-turner/accumulo@bb28195

          I made some modifications to BatchWriterFlushIT in keith-turner/accumulo@3d3c252 . Running that I was able to see that sometimes the background thread binned and sometimes the foreground thread binned.
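For reference, the configuration described above looks roughly like the following sketch (the thread count, keep-alive, and timeout values are assumptions, not necessarily what keith-turner/accumulo@bb28195 uses):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BinningExecutorSketch {
  // One binning thread fed through a SynchronousQueue. If that thread is busy,
  // the hand-off fails and CallerRunsPolicy makes the submitting (client) thread
  // do the binning itself, the same fall-back behavior as the tryTransfer() approach.
  static ThreadPoolExecutor newBinningExecutor() {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, 1, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
    executor.allowCoreThreadTimeOut(true); // let an idle binning thread exit
    return executor;
  }
}
```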

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54426939

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +711,33 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

- void addMutations(MutationSet mutationsToSend) {
+ void queueMutations(final MutationSet mutationsToSend) throws InterruptedException {
+   if (null == mutationsToSend)
+     return;
+   boolean transferred = queue.tryTransfer(new Runnable() {
+     final MutationSet m = mutationsToSend;
+
+     @Override
+     public void run() {
+       if (null != m) {
+         try {
+           addMutations(m);
+         } catch (Exception e) {
+           updateUnknownErrors("Error processing mutation set", e);
+         }
+       }
+     }
+   });
+   if (!transferred) {
+     try {
+       addMutations(mutationsToSend);

— End diff —

          Agreed. I believe that binning can be done concurrently. There is still a synchronization point later, after the binning, but I believe that is the right place to block. I updated this PR to make the Executor a daemon with a name.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54425522

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +711,33 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

- void addMutations(MutationSet mutationsToSend) {
+ void queueMutations(final MutationSet mutationsToSend) throws InterruptedException {
+   if (null == mutationsToSend)
+     return;
+   boolean transferred = queue.tryTransfer(new Runnable() {
+     final MutationSet m = mutationsToSend;
+
+     @Override
+     public void run() {
+       if (null != m) {
+         try {
+           addMutations(m);
+         } catch (Exception e) {
+           updateUnknownErrors("Error processing mutation set", e);
+         }
+       }
+     }
+   });
+   if (!transferred) {
+     try {
+       addMutations(mutationsToSend);

— End diff —

I like this. It's an interesting approach of having multiple threads bin if it happens to be busy. All batch writer code was written with the assumption that one thread would bin, so this change needs careful review of all code called by the binning code.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54424394

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -629,6 +639,8 @@ public void run() {

          private static final int MUTATION_BATCH_SIZE = 1 << 17;
          private final ExecutorService sendThreadPool;
          + private final LinkedTransferQueue<Runnable> queue = new LinkedTransferQueue<>();
          + private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.SECONDS, queue);
          — End diff –

Would be nice to name the thread. Also it should be a daemon thread (I think it will not be by default). These are things I think SimpleThreadPool does (may have the name wrong; you were using it earlier).

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-190235910

Updated based on Keith's concerns; changed the implementation such that:

          1. MutationWriter uses a ThreadPoolExecutor with 1 thread instead of a Timer. We might be able to change this later, not sure.
          2. The ThreadPoolExecutor uses a LinkedTransferQueue as its work queue
          3. MutationWriter.queueMutations attempts to put the MutationSet onto the work queue with no timeout. If there is a consumer available, then the MutationSet will get put onto the work queue and processed by a background thread. If no consumer is available (e.g. it's currently busy), then the MutationSet will be binned in the current thread.

          I also changed some variables to Atomic objects and removed the synchronization modifier from TSBW.updateBatchStats()
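Condensed, the queueing behavior in points 2 and 3 works roughly like the following (simplified; error handling and the Runnable construction from the PR are omitted):

```java
// Simplified: hand the task to the waiting binning thread if there is one;
// otherwise the calling thread does the binning itself rather than queueing
// work behind a busy binner. "queue" is the LinkedTransferQueue<Runnable>.
void queueOrRun(java.util.concurrent.LinkedTransferQueue<Runnable> queue, Runnable binningTask) {
  if (!queue.tryTransfer(binningTask)) {
    binningTask.run();
  }
}
```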

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54322064

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -638,6 +648,21 @@ public MutationWriter(int numSendThreads) {
          queued = new HashSet<String>();
          sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
          locators = new HashMap<String,TabletLocator>();
          + binTimer.schedule(new TimerTask() {
          + @Override
          + public void run() {
          + MutationSet m = queue.poll();
          + while (null != m) {
+ try {
+ addMutations(m);
+ } catch (Exception e) {
+ updateUnknownErrors("Error processing mutation set", e);
+ }
          + m = queue.poll();
          + }
          + }
          + }, 0, 500);
          — End diff –

          LOL, fair enough. I'm going to leave this as-is for the weekend. We can either leave it single threaded and lower the period, or change it to another Executor with multiple threads reading off a bounded/unbounded queue.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189540148

          > Leaving at 10 (or some other number) provides back pressure to the client threads adding mutations.

I believe doing this makes it harder to understand the batch writer from a user perspective. The batch writer already provides back pressure based on memory usage. This constraint would provide back pressure in another way that I would not want to attempt to write up in user-facing javadocs.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54321821

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -638,6 +648,21 @@ public MutationWriter(int numSendThreads) {
          queued = new HashSet<String>();
          sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
          locators = new HashMap<String,TabletLocator>();
          + binTimer.schedule(new TimerTask() {
          + @Override
          + public void run() {
          + MutationSet m = queue.poll();
          + while (null != m) {
+ try {
+ addMutations(m);
+ } catch (Exception e) {
+ updateUnknownErrors("Error processing mutation set", e);
+ }
          + m = queue.poll();
          + }
          + }
          + }, 0, 500);
          — End diff –

          > Well, in the previous revision I was adding a MutationSet to an ExecutorService that had an unbounded queue and 1 thread. That's not immediate

Yeah, it's certainly not immediate. However, it's probably much less than 1ms between adding to the thread pool and the thread calling run() on the Runnable (when the thread is idle). Relative to ~250ms, it's effectively immediate.

          > Can MutationSets be binned concurrently?

I have no idea. I could not say yes without much more review of the code than I want to do late on a Friday afternoon. I suspect something would blow up.

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54321264

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -638,6 +648,21 @@ public MutationWriter(int numSendThreads) {
          queued = new HashSet<String>();
          sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
          locators = new HashMap<String,TabletLocator>();
          + binTimer.schedule(new TimerTask() {
          + @Override
          + public void run() {
          + MutationSet m = queue.poll();
          + while (null != m) {
+ try {
+ addMutations(m);
+ } catch (Exception e) {
+ updateUnknownErrors("Error processing mutation set", e);
+ }
          + m = queue.poll();
          + }
          + }
          + }, 0, 500);
          — End diff –

          Well, in the previous revision I was adding a MutationSet to an ExecutorService that had an unbounded queue and 1 thread. That's not immediate. Can MutationSets be binned concurrently? If so, then I can change the timer from a fixed delay to fixed rate. Thoughts?

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54320991

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -638,6 +648,21 @@ public MutationWriter(int numSendThreads) {
          queued = new HashSet<String>();
          sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
          locators = new HashMap<String,TabletLocator>();
          + binTimer.schedule(new TimerTask() {
          + @Override
          + public void run() {
          + MutationSet m = queue.poll();
          + while (null != m) {
+ try {
+ addMutations(m);
+ } catch (Exception e) {
+ updateUnknownErrors("Error processing mutation set", e);
+ }
          + m = queue.poll();
          + }
          + }
          + }, 0, 500);
          — End diff –

OK, I think mutations should be binned and sent immediately. There are currently two cases when mutations are submitted to be binned and sent: when the user calls flush and when memory is 1/2 full. I don't see a good reason to wait in either case.

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189534801

          Changing to unbounded is easy. Leaving at 10 (or some other number) provides back pressure to the client threads adding mutations. It's not 10 mutations, it's 10 MutationSets.

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54320640

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -638,6 +648,21 @@ public MutationWriter(int numSendThreads) {
          queued = new HashSet<String>();
          sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
          locators = new HashMap<String,TabletLocator>();
          + binTimer.schedule(new TimerTask() {
          + @Override
          + public void run() {
          + MutationSet m = queue.poll();
          + while (null != m) {
+ try {
+ addMutations(m);
+ } catch (Exception e) {
+ updateUnknownErrors("Error processing mutation set", e);
+ }
          + m = queue.poll();
          + }
          + }
          + }, 0, 500);
          — End diff –

          Yes, wake up every 1/2 second and bin mutations if there are any. No rationale, just thought it made sense. You have a different approach?

          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189532729

          Thanks for doing this, @dlmarion. I think this approach is much easier to follow.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54319650

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -638,6 +648,21 @@ public MutationWriter(int numSendThreads) {
          queued = new HashSet<String>();
          sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
          locators = new HashMap<String,TabletLocator>();
          + binTimer.schedule(new TimerTask() {
          + @Override
          + public void run() {
          + MutationSet m = queue.poll();
          + while (null != m) {
+ try {
+ addMutations(m);
+ } catch (Exception e) {
+ updateUnknownErrors("Error processing mutation set", e);
+ }
          + m = queue.poll();
          + }
          + }
          + }, 0, 500);
          — End diff –

Does this mean every 500ms we check for something on the queue to bin? What's the rationale for this?

          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189530495

          Updated from comments by Keith and Josh.

          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189526782

          > I added a catch clause to startProcessing to catch the RejectedExecutionException that can be thrown. Were you thinking of something else?

What you did is needed, but I was thinking of something else. If the call to `addMutations()` in the Runnable throws an unexpected exception, nothing will be done about that exception. The batch writer should transition to an error state. I'm thinking we need to do something like the following.

```java
void queueMutations(final MutationSet mutationsToSend) {
  queueThreadPool.submit(new Runnable() {
    public void run() {
      try {
        addMutations(mutationsToSend);
      } catch (Exception e) {
        updateUnknownErrors(e);
      }
    }
  });
}
```

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54316255

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          — End diff –

          Taking your idea and going further, if we are only using one thread then we can use a timer driven approach and create zero Runnable objects. I'll update the code shortly.
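
          For readers following along, here is a minimal standalone sketch of the timer-driven shape described above. The `MutationSet`, `addMutations`, and 500ms-style period mirror the patch discussion; the class itself, the `ConcurrentLinkedQueue`, and the `close()` handling are illustrative stand-ins, not the actual TabletServerBatchWriter code.

          ```java
          import java.util.Timer;
          import java.util.TimerTask;
          import java.util.concurrent.ConcurrentLinkedQueue;

          // Illustrative sketch only: one daemon Timer thread periodically drains a queue
          // of MutationSet batches, so producers never allocate a Runnable per batch.
          class TimerDrivenBinner {

            static class MutationSet { /* placeholder for the real MutationSet */ }

            private final ConcurrentLinkedQueue<MutationSet> queue = new ConcurrentLinkedQueue<>();
            private final Timer binTimer = new Timer("BinMutations", true); // daemon thread

            TimerDrivenBinner(long periodMillis) {
              binTimer.schedule(new TimerTask() {
                @Override
                public void run() {
                  // Drain everything queued right now, then sleep until the next period.
                  MutationSet m = queue.poll();
                  while (m != null) {
                    addMutations(m);
                    m = queue.poll();
                  }
                }
              }, 0, periodMillis);
            }

            void queueMutations(MutationSet m) {
              queue.offer(m); // producers never block and never create a Runnable
            }

            void addMutations(MutationSet m) {
              // stand-in for the real binning and hand-off to the send threads
            }

            void close() {
              binTimer.cancel(); // stop the timer thread when the writer is closed
            }
          }
          ```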

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189523047

          I added a catch clause to startProcessing to catch the RejectedExecutionException that can be thrown. Were you thinking of something else?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189513944

          > Addressed Keith's comments in the update.

          What about my comment about exceptions?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54311365

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          — End diff –

          Ultimately, I really don't have that strong of an opinion here. I think using the ExecutorService as a queue for MutationSets is a smell (instead of directly maintaining such a queue), but I can understand the argument for using the ExecutorService because it encompasses both approaches. I don't think there's a big reason one way or the other; I simply think that the pseudo-code outline above is clearer than Dave's current changes. If Keith and Dave are happy, then merge it and move on.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54308100

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          — End diff –

          Keith was chatting with me on IRC about this. I've pseudocoded what I think would be a cleaner approach since we seem to be talking past each other:

          https://paste.apache.org/0GY6

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54307702

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          — End diff –

          > I didn't say there were multiple threads

          Didn't intend to imply you said that. I was comparing a thread pool with 1 thread to the following statement.

          > You just have one thread always running, looking for mutations to add.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54306350

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          — End diff –

          I didn't say there were multiple threads; there are multiple Runnables which need to be executed on that thread. That is what is unnecessary, IMO.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189464917

          Addressed Keith's comments in the update. Waiting for resolution on Josh's comment.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54293942

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          +     public void run() {
          +       addMutations(mutationsToSend);
          — End diff –

          I meant unexpected exception

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54293643

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          — End diff –

          There is a single thread. The ExecutorService has one thread that is waiting on a queue.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54292893

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          — End diff –

          > Are you suggesting re-using the existing ExecutorService?

          Nope. I'm saying just make a single thread which is reading off some queue. The clients would be submitting mutations onto that queue. You just have one thread always running, looking for mutations to add. You avoid all the creating, scheduling, and starting of Runnables.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54292664

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          — End diff –

          Are you suggesting re-using the existing ExecutorService?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54290231

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -629,6 +630,7 @@ public void run() {

          private static final int MUTATION_BATCH_SIZE = 1 << 17;
          private final ExecutorService sendThreadPool;
          + private final ExecutorService queueThreadPool = new SimpleThreadPool(1, "QueueMutations");
          — End diff –

          I think a name with mutations and binning or binner in it would be more useful.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54290076

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          +     public void run() {
          +       addMutations(mutationsToSend);
          — End diff –

          Need to ensure any exceptions are handled properly. If an expected exception happens here, mutations are lost, and the batch writer keeps working; that's not good.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user joshelser commented on a diff in the pull request:

          https://github.com/apache/accumulo/pull/75#discussion_r54289506

          — Diff: core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java —
          @@ -699,7 +701,16 @@ else if (Tables.getTableState(context.getInstance(), table) == TableState.OFFLIN

          }

          - void addMutations(MutationSet mutationsToSend) {
          + void queueMutations(final MutationSet mutationsToSend) {
          +   queueThreadPool.submit(new Runnable() {
          — End diff –

          I think it would be simpler to use a producer/consumer model for the `MutationSet`s, with a single consumer thread, instead of constantly submitting new `Runnable`s. The consumer thread can essentially do the same thing: poll for a `MutationSet`, and then run `addMutations` with that object. It eliminates the need for another ExecutorService and repeatedly creating new Runnables. We might also be able to implement some back-pressure on the client via that Queue as well.
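
          As an editorial aside, here is a minimal sketch of the producer/consumer shape being suggested, assuming a bounded `LinkedBlockingQueue` and one long-lived consumer thread. The `MutationSet` and `addMutations` names mirror the discussion; the class name, queue capacity, and shutdown handling are illustrative. The bounded capacity is what would provide the back-pressure mentioned at the end of the comment.

          ```java
          import java.util.concurrent.BlockingQueue;
          import java.util.concurrent.LinkedBlockingQueue;

          // Illustrative sketch only: client threads enqueue MutationSets; one consumer
          // thread takes them off the queue and bins them, replacing per-batch Runnables.
          class QueueingBinner {

            static class MutationSet { /* placeholder for the real MutationSet */ }

            private final BlockingQueue<MutationSet> queue = new LinkedBlockingQueue<>(16);
            private final Thread consumer;

            QueueingBinner() {
              consumer = new Thread(() -> {
                try {
                  while (true) {
                    MutationSet m = queue.take(); // blocks until work arrives
                    addMutations(m);              // binning happens on this thread only
                  }
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt(); // shutdown handling omitted for brevity
                }
              }, "MutationBinner");
              consumer.setDaemon(true);
              consumer.start();
            }

            void queueMutations(MutationSet m) throws InterruptedException {
              queue.put(m); // a full queue applies back-pressure to producers
            }

            void addMutations(MutationSet m) {
              // stand-in for binning by tablet and handing off to the send threads
            }
          }
          ```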

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189420402

          Talked with Keith offline and he suggested a simpler route. New changes uploaded.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189385925

          > The set of mutations is still global,

          OIC. You're right, it's not as bad as I thought. Although there is still a problem: with the per-thread mutation writers, mutations bound for the same tserver may never be merged in some cases. These situations are less likely to occur, though. I am thinking of something like the following.

          • Thread 1 bins 1000 mutations for 20 tservers
          • 3 tserver queues are processed by send threads
          • Thread 2 bins 1000 mutations for 20 tservers (at this point 17 tservers from the 1st binning have not been processed by send threads, so those mutations are not merged together).

          This makes me wonder: why have a mutation writer per thread? Why not just add to the mutation writer outside of the sync?
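
          To make the "add to the mutation writer outside of the sync" idea concrete, here is a rough, hedged sketch: only the buffer swap happens under the lock, and a single shared writer does the expensive binning afterwards, so batches headed to the same tserver can still merge. The class and field names are illustrative placeholders, not the real TabletServerBatchWriter internals.

          ```java
          // Illustrative sketch only: one shared MutationWriter, with binning done
          // outside the synchronized block; only the cheap buffer swap is locked.
          class SharedWriterOutsideSync {

            static class MutationSet { long memoryUsed; /* mutations omitted in this stub */ }

            static class MutationWriter {
              // The real writer bins by tablet and merges mutations per tablet server
              // before sending; this stub only marks where that would happen.
              synchronized void addMutations(MutationSet m) { /* bin + merge + queue sends */ }
            }

            private final MutationWriter writer = new MutationWriter();
            private final long maxMemory = 50L * 1024 * 1024;
            private MutationSet buffer = new MutationSet();

            void addMutation(long mutationSize) {
              MutationSet full = null;
              synchronized (this) {
                // Cheap work under the lock: account memory and decide whether to swap.
                buffer.memoryUsed += mutationSize;
                if (buffer.memoryUsed > maxMemory) {
                  full = buffer;
                  buffer = new MutationSet(); // other threads keep writing into the new set
                }
              }
              if (full != null) {
                writer.addMutations(full); // expensive binning happens outside the lock
              }
            }
          }
          ```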

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user dlmarion commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189378286

          The set of mutations is still global: if 3 threads add a mutation for the same tablet server and the MutationSet has not been swapped out (in startProcessing), then those mutations will be in the same RPC.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user keith-turner commented on the pull request:

          https://github.com/apache/accumulo/pull/75#issuecomment-189376744

          I think this change will result in an RPC per thread using the batch writer. If 3 threads add mutations for the same tablet server, then it will no longer send all of those in one RPC. If 100 threads add mutations for 100 tablet servers, the batch writer will make ~10,000 RPCs instead of ~100.

          I have some ideas on how to improve the batch writer based on what I learned while writing the conditional writer; I will try to write those up. They may or may not be useful.

          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user dlmarion opened a pull request:

          https://github.com/apache/accumulo/pull/75

          ACCUMULO-1755: Modified TSBW so that all client threads will not bloc…

          I ditched review board and went back to the drawing board.

          Before this change, all client threads that were adding mutations to a BatchWriter would block
          when the mutations needed to be written to the tablet servers. This change gives each client
          thread its own MutationWriter object using a ThreadLocal. With this change, only one client
          thread should block on adding the mutations to the sendThreadPool object, which allows the other
          client threads to push mutations onto a new MutationSet.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/apache/accumulo ACCUMULO-1755

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/accumulo/pull/75.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #75


          commit 9bd48cdbd25a7baf19bbda007a8e36f8ec81cf07
          Author: Dave Marion <dlmarion@apache.org>
          Date: 2016-02-26T16:26:53Z

          ACCUMULO-1755: Modified TSBW so that all client threads will not block on binMutations

          Before this change, all client threads that were adding mutations to a BatchWriter would block
          when the mutations needed to be written to the tablet servers. This change gives each client
          thread its own MutationWriter object using a ThreadLocal. With this change, only one client
          thread should block on adding the mutations to the sendThreadPool object, which allows the other
          client threads to push mutations onto a new MutationSet.
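
          For illustration only, a stripped-down sketch of the per-thread-writer idea described in the commit message above, using ThreadLocal. The names are placeholders rather than the patch's actual classes, and the review comments that follow discuss the RPC-merging trade-off of this approach.

          ```java
          // Illustrative sketch only: each client thread gets its own writer, so one
          // thread binning or flushing does not block the others on a shared lock.
          class PerThreadWriters {

            static class MutationSet { /* placeholder for the real MutationSet */ }

            static class MutationWriter {
              void addMutations(MutationSet m) { /* bin and hand off to the send threads */ }
            }

            private final ThreadLocal<MutationWriter> writer =
                ThreadLocal.withInitial(MutationWriter::new);

            void addMutations(MutationSet m) {
              // Each caller only touches its own writer, so there is no contention here,
              // at the cost of fewer chances to merge batches bound for the same tserver.
              writer.get().addMutations(m);
            }
          }
          ```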


          Hide
          afuchs Adam Fuchs added a comment -

          I don't think the continuous ingest client uses multiple client threads per process (outside of those created in the BatchWriter). However, you could probably tweak the client to make it closer to your scenario. I think the overall test framework probably gives enough parameters to fit it to your scenario unless you're doing something with a very non-uniform distribution across tablets or very non-uniform key/value sizes.

          Hide
          dlmarion Dave Marion added a comment -

          No performance numbers, just seeing BLOCKED threads in a stack trace. I'll see what I can do about getting some performance numbers with and without my final patch. Do you think continuous ingest would be a good framework for this?

          Hide
          afuchs Adam Fuchs added a comment -

          Sounds like you have a good scenario to support the changes you're proposing. Do you have any performance measurements or profiling results you can share?

          Hide
          dlmarion Dave Marion added a comment -

          Adam, I have this issue now where I have N clients sharing a batch writer. As you noted in the description, all the threads wait on binning mutations. I could use a batch writer per thread, and that may be the solution in the end. I think I can remove the synchronized modifier from addMutation, but in the end I may just be pushing the problem to an area of the code that the client has no control over. I'm interested in solving this issue, though; any time you can spare would be appreciated.

          Hide
          afuchs Adam Fuchs added a comment -

          Dave, thanks for resurrecting this ticket! Looks like your implementation is oriented towards the "don't hold a lock while binning" branch. This should work well for a multi-threaded client sharing a BatchWriter, but probably won't affect the performance of a single-threaded client. Do you have any perf tests you're using to evaluate how well your solution works? I don't remember whether any of our examples use a multi-threaded "mutation producer" pattern.

          One thing we might consider is whether we want to try to optimize the case of users writing simpler single-threaded clients and letting Accumulo handle all of the parallelization. That might lead us to an alternative implementation of moving the binning operation to another thread.

          Hide
          elserj Josh Elser added a comment -

          > At a glance, wouldn't a simple queue that we could dump mutations on be a simple change (would just require some memory mgmt controls)?

          In hindsight, this is pretty much what MutationSet is already doing

          Hide
          elserj Josh Elser added a comment -

          Just saw the reviewboard link. Thanks, Dave Marion!

          Hide
          dlmarion Dave Marion added a comment -

          https://reviews.apache.org/r/43957/
          Hide
          elserj Josh Elser added a comment -

          > Maybe something for 2.0? I wasn't looking to do an entire rewrite, just remove some of the locking.

          Perhaps. If it doesn't make things really complicated, it's fine, obviously. I was just thinking that it seems like there is a much easier way to tackle this by throwing some memory at the problem (maybe that's a flawed idea in itself, too). Are you planning to throw up a patch with your ideas? That would definitely help me see exactly what you were thinking.

          Hide
          dlmarion Dave Marion added a comment -

          Maybe something for 2.0? I wasn't looking to do an entire rewrite, just remove some of the locking.

          Hide
          elserj Josh Elser added a comment -

          At a glance, wouldn't a simple queue that we could dump mutations on be a simple change (would just require some memory mgmt controls)? Clients can push onto the queue to buffer mutations as long as there is free memory. Heck, we could even let users pass in the queue (and totally ignore the memory control problem). The BW can then consume and bin mutations in batches from the queue. It seems like that might be simpler to me, but I might be missing some details too.
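
          A small hedged sketch of the memory-controlled buffering described here: producers keep pushing until a byte budget is exhausted and then block, while the consumer releases budget as it drains and bins. The semaphore-per-byte accounting and all names are illustrative assumptions, not existing Accumulo code.

          ```java
          import java.util.concurrent.ConcurrentLinkedQueue;
          import java.util.concurrent.Semaphore;

          // Illustrative sketch only: a buffer bounded by estimated memory rather than
          // by element count; an exhausted budget blocks producers (back-pressure).
          class MemoryBoundedBuffer {

            static class MutationSet {
              final int estimatedBytes;
              MutationSet(int estimatedBytes) { this.estimatedBytes = estimatedBytes; }
            }

            private final ConcurrentLinkedQueue<MutationSet> queue = new ConcurrentLinkedQueue<>();
            private final Semaphore memoryBudget; // permits represent bytes of buffer space

            MemoryBoundedBuffer(int maxBufferedBytes) {
              memoryBudget = new Semaphore(maxBufferedBytes);
            }

            void put(MutationSet m) throws InterruptedException {
              memoryBudget.acquire(m.estimatedBytes); // block while the buffer is over budget
              queue.offer(m);
            }

            MutationSet poll() {
              MutationSet m = queue.poll();
              if (m != null) {
                memoryBudget.release(m.estimatedBytes); // free the space as it gets binned
              }
              return m;
            }
          }
          ```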

          Hide
          dlmarion Dave Marion added a comment - - edited

          We could solve this by the following (a rough sketch appears after the list):

          1. Making MutationSet.mutations a ConcurrentHashMap
          2. Making MutationSet.memoryUsed an AtomicLong
          3. Not synchronizing on access to TabletServerBatchWriter.mutations
          4. Changing TabletServerBatchWriter.mutations to an AtomicReference so that it is safe to swap it out in startProcessing()
          5. In startProcessing(), swap in a new MutationSet then add the mutations from the previous MutationSet to the writer.
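
          A rough sketch of steps 1-5, hedged: the field and method names echo the list above, but the class is a simplified stand-in, and it glosses over the race where a thread that already grabbed the old reference can still append to it briefly after the swap.

          ```java
          import java.util.List;
          import java.util.Map;
          import java.util.concurrent.ConcurrentHashMap;
          import java.util.concurrent.CopyOnWriteArrayList;
          import java.util.concurrent.atomic.AtomicLong;
          import java.util.concurrent.atomic.AtomicReference;

          // Illustrative sketch only: lock-free MutationSet plus an atomic swap in
          // startProcessing(), roughly following steps 1-5 above.
          class LockFreeBuffer {

            static class MutationSet {
              final Map<String, List<Object>> mutations = new ConcurrentHashMap<>(); // step 1
              final AtomicLong memoryUsed = new AtomicLong();                        // step 2
            }

            // step 4: the shared buffer is held in an AtomicReference and swapped, not locked
            private final AtomicReference<MutationSet> mutations = new AtomicReference<>(new MutationSet());
            private final long maxMemory = 50L * 1024 * 1024;

            void addMutation(String table, Object mutation, long size) {
              MutationSet current = mutations.get(); // step 3: no synchronized access
              current.mutations
                  .computeIfAbsent(table, t -> new CopyOnWriteArrayList<>())
                  .add(mutation);
              if (current.memoryUsed.addAndGet(size) > maxMemory) {
                startProcessing();
              }
            }

            void startProcessing() {
              // step 5: swap in a fresh set, then hand the previous one to the writer.
              // A thread that already read the old reference may still add to it after
              // the swap; the real implementation would have to handle that race.
              MutationSet previous = mutations.getAndSet(new MutationSet());
              addToWriter(previous);
            }

            void addToWriter(MutationSet m) { /* bin and send */ }
          }
          ```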

          Hide
          kturner Keith Turner added a comment -

          Limiting binning to one thread may be unnecessary. Could bin in parallel and do a synchronous merge, similar to map reduce job w/ one reducer.
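
          To illustrate the shape of that suggestion, a hedged sketch: each caller bins its batch into a private map in parallel, and only the merge into the shared per-server map is synchronized. The names and types here are placeholders, not the real binning code.

          ```java
          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;

          // Illustrative sketch only: parallel binning per caller, then a short
          // synchronized merge into the shared per-tserver map ("one reducer").
          class ParallelBinThenMerge {

            private final Map<String, List<Object>> binnedByServer = new HashMap<>();

            void binAndMerge(List<Object> batch) {
              // Parallel part: done on the calling thread against purely local state.
              Map<String, List<Object>> local = new HashMap<>();
              for (Object mutation : batch) {
                local.computeIfAbsent(lookupServer(mutation), s -> new ArrayList<>()).add(mutation);
              }

              // Synchronous part: a short critical section that only merges lists.
              synchronized (binnedByServer) {
                for (Map.Entry<String, List<Object>> e : local.entrySet()) {
                  binnedByServer.computeIfAbsent(e.getKey(), s -> new ArrayList<>()).addAll(e.getValue());
                }
              }
            }

            private String lookupServer(Object mutation) {
              return "tserver-placeholder"; // stand-in for the TabletLocator lookup
            }
          }
          ```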


            People

            • Assignee:
              dlmarion Dave Marion
              Reporter:
              afuchs Adam Fuchs
            • Votes:
              0
              Watchers:
              4


                Time Tracking

                 Estimated: Not Specified
                 Remaining: 0h
                 Logged: 2h
