Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Duplicate
-
1.0.2
-
None
-
None
Description
KafkaSpout does not retry all failed tuples.
We used following Configuration:
Map<String, Object> props = new HashMap<>();
props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.bootstrapServer());
KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new String[]
{"test-topic"}).build();
KafkaSpoutTuplesBuilder<byte[], byte[]> kafkaSpoutTuplesBuilder = new KafkaSpoutTuplesBuilder.Builder<>(new KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
KafkaSpoutRetryService retryService = new KafkaSpoutLoggedRetryExponentialBackoff(KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), 3, KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
KafkaSpoutConfig<byte[], byte[]> config = new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, retryService)
.setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
.setMaxUncommittedOffsets(30)
.setOffsetCommitPeriodMs(10)
.setMaxRetries(3)
.build();
kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
The downstream bolt fails every tuple and we expect, that those tuple will all be replayed. But that's not the case for every tuple.
Attachments
Issue Links
- duplicates
-
STORM-2087 Storm-kafka-client: Failed tuples are not always replayed
- Resolved
-
STORM-2229 KafkaSpout does not resend failed tuples
- Closed