  1. Apache Storm
  2. STORM-2675

KafkaTridentSpoutOpaque not committing offsets to Kafka


Details

    Description

      Every time I restart the topology, the spout starts again from the earliest message, even though the first poll offset strategy is set to UNCOMMITTED_EARLIEST. I checked Kafka's __consumer_offsets topic to see whether the spout (consumer) is committing offsets, but found no commits for it. I also cannot find the code in the KafkaTridentSpoutEmitter class where offsets are committed.
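
The expectation behind this report can be sketched in plain Java. This is a minimal model of the documented meaning of the four first-poll strategies, not code from storm-kafka-client: the class `FirstPollOffsetSketch`, its local `Strategy` enum, and the `startOffset` helper are hypothetical names introduced only for illustration, and the sketch makes no claim about what KafkaTridentSpoutEmitter actually does internally.

```java
import java.util.OptionalLong;

public class FirstPollOffsetSketch {

    // Local stand-in that mirrors the strategy names used by the spout config.
    public enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /**
     * Returns the offset a consumer would be expected to start from on (re)start.
     *
     * @param committed offset committed for the consumer group, if any
     * @param earliest  earliest offset available in the partition
     * @param latest    latest (end) offset of the partition
     */
    public static long startOffset(Strategy strategy, OptionalLong committed,
                                   long earliest, long latest) {
        switch (strategy) {
            case EARLIEST:
                return earliest;                    // always ignore commits
            case LATEST:
                return latest;                      // always ignore commits
            case UNCOMMITTED_EARLIEST:
                return committed.orElse(earliest);  // resume from commit, else earliest
            case UNCOMMITTED_LATEST:
                return committed.orElse(latest);    // resume from commit, else latest
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        long earliest = 0L;
        long latest = 100L;

        // With a commit present, a restart should resume from the committed offset.
        System.out.println(startOffset(Strategy.UNCOMMITTED_EARLIEST,
                OptionalLong.of(42L), earliest, latest));

        // With no commit ever written, every restart falls back to the earliest offset.
        System.out.println(startOffset(Strategy.UNCOMMITTED_EARLIEST,
                OptionalLong.empty(), earliest, latest));
    }
}
```

Under this model, a consumer group with no committed offsets falls back to the earliest offset on every restart, which matches the behaviour described above when nothing is written to __consumer_offsets.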

      conf.put(Config.TOPOLOGY_DEBUG, true);
      conf.put(Config.TOPOLOGY_WORKERS, 1);
      conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // tried with 1 as well
      conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
      conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new String[] {"localhost"}));
      conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);

      protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
          ByTopicRecordTranslator<String, String> byTopic = new ByTopicRecordTranslator<>(
                  (r) -> new Values(r.topic(), r.key(), r.value()),
                  new Fields(TOPIC, PARTITION_KEY, PAYLOAD),
                  SENSOR_STREAM);

          return new KafkaSpoutConfig.Builder<String, String>(
                      Utils.getBrokerHosts(), StringDeserializer.class, null,
                      Utils.getKafkaEnrichedPMSTopicName())
                  .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
                  .setRetry(getRetryService())
                  .setOffsetCommitPeriodMs(10_000)
                  .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                  .setMaxUncommittedOffsets(250)
                  .setProp("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
                  .setProp("schema.registry.url", "http://localhost:8081")
                  .setProp("specific.avro.reader", true)
                  .setGroupId(AGGREGATION_CONSUMER_GROUP)
                  .setRecordTranslator(byTopic)
                  .build();
      }

      Stream pmStatStream = topology
              .newStream("statStream", new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig()))
              .parallelismHint(1);

      Storm version: 1.1.0


          People

            srdo Stig Rohde Døssing
            Preet Preet Puri
            Votes: 1
            Watchers: 5

              Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 1h 40m