Kafka / KAFKA-10035

Improve the AbstractResetIntegrationTest

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.7.0
    • Component/s: streams, unit tests
    • Labels: None

      Description

      In the test AbstractResetIntegrationTest, there are several places like the one below:

       

      streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100);
      

      which rely on `Long`-to-`String` conversion as a workaround. Without the conversion, as in:

      streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, STREAMS_CONSUMER_TIMEOUT * 100);

      the following exception is thrown:

      org.apache.kafka.common.config.ConfigException: Invalid value 200000 for configuration session.timeout.ms: Expected value to be a 32-bit integer, but it was a java.lang.Long
       at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672)
       at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
       at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
       at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
       at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
       at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:606)
       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
       at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
       at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313)
       at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:766)
       at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:652)
       at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:562)
       at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270)
       at org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)

      This is not very intuitive and should be improved.
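
One possible way to avoid the string workaround, sketched here under stated assumptions rather than taken from the actual fix: make sure the value put into the config is an `Integer` instead of a `Long`. The class name `SessionTimeoutSketch` and both helper methods are hypothetical. The key `session.timeout.ms` and the base timeout of 2000 are inferred from the exception message above (200000 / 100). The sketch uses plain `java.util.Properties` so that it runs without Kafka on the classpath.

```java
import java.util.Properties;

public class SessionTimeoutSketch {
    // Key name as it appears in the exception message above.
    static final String SESSION_TIMEOUT_MS = "session.timeout.ms";

    // Assumption: the timeout is declared as an int instead of a long.
    static final int STREAMS_CONSUMER_TIMEOUT = 2000;

    // Option 1: int * int stays an int, so the value autoboxes to Integer.
    static Properties buildConfig() {
        Properties streamsConfig = new Properties();
        streamsConfig.put(SESSION_TIMEOUT_MS, STREAMS_CONSUMER_TIMEOUT * 100);
        return streamsConfig;
    }

    // Option 2: keep a long at the call site and narrow it explicitly.
    // Math.toIntExact throws ArithmeticException if the value does not fit in an int.
    static Properties buildConfigFromLong(long timeoutMs) {
        Properties streamsConfig = new Properties();
        streamsConfig.put(SESSION_TIMEOUT_MS, Math.toIntExact(timeoutMs * 100));
        return streamsConfig;
    }

    public static void main(String[] args) {
        // Both values are stored as java.lang.Integer.
        System.out.println(buildConfig().get(SESSION_TIMEOUT_MS).getClass().getName());
        System.out.println(buildConfigFromLong(2000L).get(SESSION_TIMEOUT_MS).getClass().getName());
    }
}
```

Either option stores an `Integer`, which is the type the exception message asks for, so the `"" +` concatenation is no longer needed.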

       

              People

              • Assignee: serj567 Sergei
              • Reporter: feyman Haoran Xuan
              • Votes: 0
              • Watchers: 6