Apache Storm / STORM-963

Frozen topology (KafkaSpout + Multilang bolt)

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.9.4, 0.9.5, 0.9.6
    • None
    • Environment:
      - VMware ESX 5.5
      - Ubuntu Server 14.04 LTS (kernel 3.16.0-41-generic)
      - Java (TM) SE Runtime Environment (build 1.8.0_45-b14)
      - Python 2.7.6 (default, Jun 22 2015, 17:58:13)
      - Zookeeper 3.4.6

    Description

      Hi,

      We've got a pretty simple topology running on Storm 0.9.5 (we also tried 0.9.4 and 0.9.6-INCUBATING) in a 3-machine cluster:

      kafkaSpout (3) -----> processBolt (12)

      Some info:

      • kafkaSpout reads from a topic with 3 partitions and a replication factor of 2
      • processBolt iterates through the message and saves the results in MongoDB
      • processBolt is implemented in Python and calls storm.log("I'm doing something") just to add a simple debug message to the logs
      • The messages can be quite big (~25-40 MB) and are in JSON format
      • The Kafka topic has a retention of 2 hours
      • We use the same ZooKeeper cluster for both Kafka and Storm

      The topology freezes after running for several hours (not days). We don't see any more messages in the logs. In fact, the periodic messages from s.k.KafkaUtils and s.k.ZkCoordinator disappear and, as you can imagine, so do the messages from the bolt. Logs are pasted below. If we redeploy the topology, everything works again until the next freeze.

      Our kafkaSpout config is:

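      // ZooKeeper ensemble used to discover the Kafka brokers
      ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
      // Arguments: hosts, topic, ZooKeeper root for the spout's offsets, consumer id
      SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic", "/topic/ourclientid", "ourclientid");
      // Emit each Kafka message as a single string
      kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
      // 50 MiB fetch and buffer sizes, to fit the ~25-40 MB messages
      kafkaConfig.fetchSizeBytes = 50*1024*1024;
      kafkaConfig.bufferSizeBytes = 50*1024*1024;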
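
As a sanity check on the sizes above: 50*1024*1024 is 52428800 bytes, which has to hold at least one whole message of up to ~40 MB. The sketch below only does that arithmetic. The class and method names are hypothetical, and reading "40 MB" as 40 MiB is an assumption.

```java
public class FetchSizeCheck {
    // Same expression as kafkaConfig.fetchSizeBytes in the report.
    public static final int FETCH_SIZE_BYTES = 50 * 1024 * 1024;

    /** Bytes left in one fetch after a single message of the given size. */
    public static long headroomBytes(long messageBytes) {
        return FETCH_SIZE_BYTES - messageBytes;
    }

    public static void main(String[] args) {
        // Assumption: the largest message is 40 MiB.
        long largestMessage = 40L * 1024 * 1024;
        System.out.println("fetchSizeBytes = " + FETCH_SIZE_BYTES);
        System.out.println("headroom       = " + headroomBytes(largestMessage));
    }
}
```

A negative headroom would mean a single message cannot fit in one fetch. With the values in the report the headroom is positive, so the fetch size alone does not look like the limiting factor.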
      

      We've also tried setting the following options:

      kafkaConfig.forceFromStart = true;
      kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // Also with kafka.api.OffsetRequest.LatestTime();
      kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
      

      Right now the topology is running without acking the messages since there's a bug in kafkaSpout with failed messages and deleted offsets in Kafka.

      This is what can be seen in the logs in one of the workers:

      2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
      2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
      2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
      2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
      2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
      2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
      2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2, partition=1}]
      2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
      2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
      2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
      2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
      2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
      2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
      2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
      2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
      2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
      2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
      2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
      2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
      2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
      2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
      2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
      2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
      2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
      2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
      2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
      2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
      2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
      2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
      2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
      2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
      2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
      2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
      2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
      2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
      2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
      

      After that, the topology freezes. Nothing is written to the nimbus log. We've checked the offsets in ZooKeeper and they're not being updated:

      {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
      cZxid = 0x100028958
      ctime = Wed Jul 01 12:22:36 CEST 2015
      mZxid = 0x100518527
      mtime = Thu Jul 23 12:42:41 CEST 2015
      pZxid = 0x100028958
      cversion = 0
      dataVersion = 446913
      aclVersion = 0
      ephemeralOwner = 0x0
      dataLength = 183
      numChildren = 0
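
One way to narrow down when each component went quiet is to take the last timestamp per logger from the worker log and compare it with the znode mtime above. In the excerpt, the last b.s.t.ShellBolt line is at 12:38:41 and the last s.k.ZkCoordinator line is at 12:42:02, while the offset znode was last modified at 12:42:41. The sketch below is a minimal, hypothetical helper (the class and method names are not part of Storm). It assumes every line starts with a timestamp followed by the logger name, as in the excerpt, and that lines are in chronological order.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LastSeenPerLogger {
    // Matches timestamps such as 2015-07-23T12:42:02.737+0200
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZ");

    /** Maps each logger name (second column) to the timestamp of its last line. */
    public static Map<String, OffsetDateTime> lastSeen(List<String> lines) {
        Map<String, OffsetDateTime> last = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+", 3);
            if (parts.length < 3) {
                continue; // not a "<timestamp> <logger> <rest>" line
            }
            try {
                // Later lines overwrite earlier ones, so the last one wins.
                last.put(parts[1], OffsetDateTime.parse(parts[0], TS));
            } catch (DateTimeParseException e) {
                // First column is not a timestamp (e.g. a stack trace line): skip it.
            }
        }
        return last;
    }

    public static void main(String[] args) {
        List<String> excerpt = Arrays.asList(
            "2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something",
            "2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing");
        lastSeen(excerpt).forEach((logger, ts) ->
            System.out.println(logger + " last seen at " + ts));
    }
}
```

If the excerpt is complete, this ordering suggests the bolt stopped logging a few minutes before the spout did, and that the spout still wrote its offset once after its last logged refresh.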
      

      Any idea what we could be missing?

      PS: This was sent to the Storm users mailing list and got 0 replies :\

      Attachments

        1. jstack-bopcat.txt
          45 kB
          Kirill Prasalov
        2. jstack.txt
          97 kB
          Abhishek Agarwal
        3. dump
          241 kB
          darsh221

            People

              Assignee: Unassigned
              Reporter: Alex Sobrino (alexsobrino)
              Votes: 2
              Watchers: 6