Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-10035

Improve the AbstractResetIntegrationTest

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • None
    • 2.7.0
    • streams, unit tests
    • None

    Description

      In the test: AbstractResetIntegrationTest, there are several places like below:

       

      streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100);
      

      which leverage `Long` to `String` conversion as a workaround.

       

      streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, STREAMS_CONSUMER_TIMEOUT * 100);
      

      or exception will be thrown if it is like:

      {{org.apache.kafka.common.config.ConfigException: Invalid value 200000 for configuration session.timeout.ms: Expected value to be a 32-bit integer, but it was a java.lang.Long
       at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672)
       at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
       at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
       at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
       at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
       at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:606)
       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
       at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
       at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313)
       at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:766)
       at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:652)
       at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:562)
       at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270)
       at org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)}}

      This may not seem very intuitive and need enhancement. 

       

      Attachments

        Issue Links

          Activity

            People

              serj567 Sergei
              feyman Haoran Xuan
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: