Details
- Type: Improvement
- Status: Resolved
- Priority: Minor
- Resolution: Fixed
Description
In the test AbstractResetIntegrationTest, there are several places like the one below:
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100);
These rely on `Long` to `String` conversion as a workaround. If the `Long` value is passed directly, like this:
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, STREAMS_CONSUMER_TIMEOUT * 100);
the following exception is thrown:
org.apache.kafka.common.config.ConfigException: Invalid value 200000 for configuration session.timeout.ms: Expected value to be a 32-bit integer, but it was a java.lang.Long
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672)
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:606)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
    at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
    at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:766)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:652)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:562)
    at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270)
    at org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)
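This is not very intuitive: a `Long` whose value fits in 32 bits is rejected, while the same value passed as a `String` is accepted. It needs enhancement.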
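To illustrate why the `String` workaround passes while the `Long` does not, here is a minimal sketch of a type check that behaves the way the exception message suggests. It is a simplified model, not the actual ConfigDef code: `parseInt32`, `accepted`, and the class name are hypothetical, and the value `2000L` for STREAMS_CONSUMER_TIMEOUT is an assumption inferred from the reported value 200000.

```java
public class SessionTimeoutTypeSketch {
    // Assumed value: the trace reports 200000, consistent with 2000L * 100.
    static final long STREAMS_CONSUMER_TIMEOUT = 2000L;

    // Hypothetical stand-in for an INT-typed config check; not Kafka's code.
    // Accepts an Integer or a String holding an integer, rejects everything else.
    static int parseInt32(String name, Object value) {
        if (value instanceof Integer) {
            return (Integer) value;
        }
        if (value instanceof String) {
            return Integer.parseInt(((String) value).trim());
        }
        throw new IllegalArgumentException("Invalid value " + value + " for configuration " + name
                + ": Expected value to be a 32-bit integer, but it was a " + value.getClass().getName());
    }

    // Returns true if the hypothetical check accepts the value.
    static boolean accepted(Object value) {
        try {
            parseInt32("session.timeout.ms", value);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        long timeout = STREAMS_CONSUMER_TIMEOUT * 100;
        // String workaround used in the test: accepted.
        System.out.println(accepted("" + timeout));
        // Raw long, boxed to Long: rejected by the strict type check.
        System.out.println(accepted(timeout));
        // Explicit narrowing; toIntExact throws ArithmeticException on overflow.
        System.out.println(accepted(Math.toIntExact(timeout)));
    }
}
```

Under this model, narrowing with `Math.toIntExact` could be an alternative to string concatenation in the test, since it produces an `Integer` and fails loudly if the value ever exceeds the 32-bit range.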