Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-2675

KafkaTridentSpoutOpaque not committing offsets to Kafka

    Details

      Description

      Every time I restart the topology the spout was picking the earliest message even though poll strategy is set UNCOMMITTED_EARLIEST. I looked at Kafka's __consumer_offsets topic to see if spout (consumer) is committing the offsets but did not find any commits. I am not even able to locate the code in the KafkaTridentSpoutEmitter class where we are updating the commits?

      conf.put(Config.TOPOLOGY_DEBUG, true);
      conf.put(Config.TOPOLOGY_WORKERS, 1);
      conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); //tried with1 as well
      conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
      conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new String[]

      {"localhost"}

      ));
      conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);

      protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig()

      { ByTopicRecordTranslator<String, String> byTopic = new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), r.value()), new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM); return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(), StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName()) .setMaxPartitionFectchBytes(10 * 1024) // 10 KB .setRetry(getRetryService()) .setOffsetCommitPeriodMs(10_000) .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) .setMaxUncommittedOffsets(250) .setProp("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer") .setProp("schema.registry.url","http://localhost:8081") .setProp("specific.avro.reader",true) .setGroupId(AGGREGATION_CONSUMER_GROUP) .setRecordTranslator(byTopic).build(); }

      Stream pmStatStream =
      topology.newStream("statStream", new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1)

      storm-version - 1.1.0

        Attachments

          Activity

            People

            • Assignee:
              Srdo Stig Rohde Døssing
              Reporter:
              Preet Preet Puri
            • Votes:
              1 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Not Specified
                Remaining:
                Remaining Estimate - 0h
                0h
                Logged:
                Time Spent - 1h 40m
                1h 40m