  Apache Storm / STORM-2231

NULL in DisruptorQueue while multi-threaded ack

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.1, 1.1.0
    • Fix Version/s: 2.0.0, 1.2.0, 1.1.2, 1.0.5
    • Component/s: storm-core
    • Labels: None

      Description

      I use a simple topology with one spout (9 workers) and one bolt (9 workers).
      I have topology.backpressure.enable: false in storm.yaml.
      The spouts emit about 10,000,000 tuples in 10 minutes. Max spout pending is 80,000.
      The bolts buffer their tuples for 60 seconds, then flush them to a database and ack them in parallel (10 threads).
      I read that OutputCollector can safely be used from multiple threads, so that is what I do.
      I don't have any bottleneck in the bolts (flushing to the database) or the spouts (Kafka spout), but about 2% of tuples fail due to the tuple-processing timeout (the failures are recorded in the spout stats only).
      I am sure the bolts ack all tuples, but some of the acks never reach the spouts.

      While acking from multiple threads, I see many errors like the following in the worker logs:
      2016-12-01 13:21:10.741 o.a.s.u.DisruptorQueue [ERROR] NULL found in disruptor-executor[3 3]-send-queue:853877

      I tried wrapping the OutputCollector in a synchronized wrapper to fix the error, but it didn't help.

      I found a workaround that helps: I still do all of the bolt's processing in multiple threads, but I call OutputCollector.ack from one single separate thread.
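
      A minimal sketch of this pattern (the class and names are illustrative, not my actual topology code; it uses the same imports as the test case further below):

          // Illustrative sketch: processing stays multi-threaded, but only one
          // dedicated thread ever calls OutputCollector.ack.
          public static class SingleAckThreadBolt extends BaseRichBolt {

              private transient OutputCollector collector;
              private transient BlockingQueue<Tuple> ackQueue;
              private transient ExecutorService workers;

              @Override
              public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
                  collector = outputCollector;
                  ackQueue = new LinkedBlockingQueue<>();
                  workers = Executors.newFixedThreadPool(10); // processing threads

                  Thread ackThread = new Thread(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              while (true) {
                                  collector.ack(ackQueue.take()); // the only thread that acks
                              }
                          } catch (InterruptedException ex) {
                              Thread.currentThread().interrupt();
                          }
                      }
                  });
                  ackThread.setDaemon(true);
                  ackThread.start();
              }

              @Override
              public void execute(final Tuple input) {
                  workers.submit(new Runnable() {
                      @Override
                      public void run() {
                          // ... buffer and flush to the database here ...
                          ackQueue.add(input); // hand off instead of acking from this thread
                      }
                  });
              }

              @Override
              public void declareOutputFields(OutputFieldsDeclarer declarer) {
              }
          }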

      I think Storm has a bug in the multi-threaded use of OutputCollector.

      If my topology has much less load, e.g. 500,000 tuples per 10 minutes, then I don't lose any acks.


          Activity

          Kevin Conaway added a comment:

          I'm also seeing this in 1.1.0

          Jungtaek Lim added a comment:

          Alexander Kharitonov Kevin Conaway
          Could you give us some more information about this? Could you also share why you're launching threads in the bolt and acking from there, instead of increasing the task count or adding more bolts? It seems like a plausible use case to me, but I hadn't actually seen it in practice.

          Jungtaek Lim added a comment:

          Alexander Kharitonov Kevin Conaway
          I think I've found a suspicious spot, but the possible fixes may affect performance, so I'd like to spend some time running performance tests on them.
          I haven't been able to reproduce the issue and don't know how easy it is to reproduce, so it would be really helpful if one of you could test and verify the patch.

          Kevin Conaway added a comment:

          In our case the bolts send data to an external service asynchronously. The tuples are acked in a callback listener on the future that is returned from this call.

          We see this occur when we get a large spike in traffic.

          We are happy to take a look at the patch

          Jungtaek Lim added a comment:

          Kevin Conaway
          Please take a look at https://github.com/apache/storm/pull/2293
          Also, please let me know if you'd like to help reproduce this but would prefer a patched jar for your specific version rather than building it yourself. I'll build and provide it.

          Kevin Conaway added a comment:

          Will this patch apply to 1.1.x?

          Is the workaround here to synchronize on the OutputCollector before using it? Did I read your PR comment correctly?

          Jungtaek Lim added a comment:

          Kevin Conaway
          I just merged this into all 1.x version lines, so the patch will ship in 1.1.2. I would appreciate it if you could test the patch before the release.

          By the way, I suspect the only case where wrapping the output collector with the synchronized keyword doesn't help is N tasks : 1 executor. If that is not your setup and wrapping the output collector still doesn't work for you, please let me know the details of your topology.
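
          (For reference, a minimal sketch of an N tasks : 1 executor declaration; the component names are illustrative:)

              // One executor (parallelism hint 1) multiplexing four tasks of this bolt.
              builder.setBolt("bolt", new ExampleBolt(), 1)
                     .setNumTasks(4)
                     .shuffleGrouping("spout");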

          Jungtaek Lim added a comment:

          Merged to master, 1.x, 1.1.x, 1.0.x branches.

          Kevin Conaway added a comment (edited):

          Jungtaek Lim Per your suggestion, I am now synchronizing on the OutputCollector before acking or failing a tuple. However, I still see the issue when a spike of traffic arrives.
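
          (That is, every ack/fail now happens inside a block like this sketch, not the actual production code:)

              synchronized (outputCollector) {  // outputCollector is the bolt's OutputCollector field
                  outputCollector.ack(input);   // likewise for outputCollector.fail(input)
              }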

          Kevin Conaway added a comment:

          Jungtaek Lim btw I can confirm that your patch fixes the issue with this test case that I wrote:

          Given that the underlying issue is a race condition, you may need to run it a few times, but the error always shows up for me.

          Per my comment above, attempting to synchronize on the OutputCollector also doesn't work when the disruptor-queue producer type is single-threaded. I'm not sure what the workaround would be without your patch in place.

          package org.apache.storm;
          
          import org.apache.storm.task.OutputCollector;
          import org.apache.storm.task.TopologyContext;
          import org.apache.storm.testing.AckFailMapTracker;
          import org.apache.storm.testing.FeederSpout;
          import org.apache.storm.testing.MkClusterParam;
          import org.apache.storm.testing.TestJob;
          import org.apache.storm.testing.TrackedTopology;
          import org.apache.storm.topology.OutputFieldsDeclarer;
          import org.apache.storm.topology.TopologyBuilder;
          import org.apache.storm.topology.base.BaseRichBolt;
          import org.apache.storm.tuple.Fields;
          import org.apache.storm.tuple.Tuple;
          import org.apache.storm.tuple.Values;
          import org.junit.Test;
          
          import java.util.ArrayList;
          import java.util.List;
          import java.util.Map;
          import java.util.UUID;
          import java.util.concurrent.BlockingQueue;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;
          import java.util.concurrent.LinkedBlockingQueue;
          import java.util.concurrent.TimeUnit;
          
          public class Storm2231 {
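              // Reproduces STORM-2231: feeds a high volume of tuples through a local
              // cluster while the bolt acks them from background threads; the
              // "NULL found in disruptor-executor..." errors appear in the worker logs.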
          
              @Test
              public void runTest() {
                  Config daemonConfig = new Config();
                  daemonConfig.put(Config.STORM_LOCAL_HOSTNAME, "localhost");
          
                  MkClusterParam clusterParams = new MkClusterParam();
                  clusterParams.setDaemonConf(daemonConfig);
          
                  Testing.withTrackedCluster(clusterParams, new TestJob() {
                      @Override
                      public void run(ILocalCluster cluster) throws Exception {
                          AckFailMapTracker tracker = new AckFailMapTracker();
          
                          final FeederSpout spout = new FeederSpout(new Fields("message"));
                          spout.setAckFailDelegate(tracker);
          
                          TopologyBuilder builder = new TopologyBuilder();
                          builder.setSpout("spout", spout, 50);
                          builder.setBolt("bolt", new ExampleBolt(), 10).shuffleGrouping("spout");
          
                          Config topologyConfig = new Config();
                          topologyConfig.setDebug(false);
          
                          TrackedTopology tracked = Testing.mkTrackedTopology(cluster, builder.createTopology());
                          cluster.submitTopology(String.valueOf(System.nanoTime()), topologyConfig, tracked.getTopology());
          
                          ExecutorService service = Executors.newCachedThreadPool();
          
                          for (int i=0; i < 20; i++) {
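                              // Each of the 20 feeder threads emits just over 500k
                              // uniquely-identified tuples, simulating a traffic spike.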
                              service.submit(new Runnable() {
                                  @Override
                                  public void run() {
                                      for (int j=0; j <= 500_000; j++) {
                                          String messageId = Thread.currentThread().getId() + "-" + UUID.randomUUID().toString();
                                          synchronized (spout) {
                                              spout.feed(new Values(messageId), messageId);
                                          }
                                      }
                                  }
                              });
                          }
          
                          service.shutdown();
                          service.awaitTermination(1, TimeUnit.DAYS);
                      }
                  });
              }
          
              public static class ExampleBolt extends BaseRichBolt {
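                  // Defers every ack to a pool of background threads, reproducing the
                  // multi-threaded ack pattern from the bug report.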
          
                  private transient ExecutorService executorService;
                  private transient BlockingQueue<Runnable> callbackQueue;
                  private transient OutputCollector outputCollector;
          
                  @Override
                  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                      outputCollector = collector;
                      callbackQueue = new LinkedBlockingQueue<>();
                      executorService = Executors.newCachedThreadPool();
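                      // Five consumer threads drain the callback queue and perform the acks.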
                      for (int i = 0; i < 5; i++) {
                          executorService.submit(new Runnable() {
                              @Override
                              public void run() {
                                  while (!Thread.currentThread().isInterrupted()) {
                                      List<Runnable> callbacks = new ArrayList<>();
                                      callbackQueue.drainTo(callbacks);
                                      for (Runnable callback : callbacks) {
                                          callback.run();
                                      }
                                      try {
                                          Thread.sleep(100);
                                      } catch (InterruptedException ex) {
                                          Thread.currentThread().interrupt();
                                      }
                                  }
                              }
                          });
                      }
                  }
          
                  @Override
                  public void execute(final Tuple input) {
                      try {
                          callbackQueue.put(new Runnable() {
                              @Override
                              public void run() {
                                  outputCollector.ack(input);
                              }
                          });
                      } catch (InterruptedException ex) {
                          Thread.currentThread().interrupt();
                          outputCollector.fail(input);
                      }
                  }
          
                  @Override
                  public void declareOutputFields(OutputFieldsDeclarer declarer) {
          
                  }
          
                  @Override
                  public void cleanup() {
                      executorService.shutdownNow();
                  }
              }
          }
          

            People

            • Assignee: Jungtaek Lim
            • Reporter: Alexander Kharitonov
            • Votes: 1
            • Watchers: 4


                Time Tracking

                • Original Estimate: Not Specified
                • Remaining Estimate: 0h
                • Time Spent: 1h 20m
