Description
Hello,
We found the following unpredictable behavior of Kafka streams:
public void buildStreams(final BuilderHelper builder) { KTable<TableId, TableValue> table = builder.table(); KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable = workflowTable(builder); table .mapValues(value -> mappers.mainDTO(value)) .leftJoin(workflowTable, mappers::joinWorkflows) .toStream() .map((key, value) -> KeyValue.pair( AggregateId.newBuilder().setId(value.getId()).build(), mappers.aggregateDTO(value))) .peek((k, v) -> logSinkRecord(v)) .filter((id, dto) -> !isReprocessing) .to(...); } private static KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable(BuilderHelper builderHelper) { return builderHelper.workflowTable() .groupBy((id, workflow) -> KeyValue.pair( TableId.newBuilder().setId(workflow.getTableId()).build(), mappers.mapWorkflow(workflow)), Grouped.with(...)) .aggregate(ArrayList::new, (key, value, agg) -> { agg.add(value); return agg; }, (key, value, agg) -> { agg.remove(value); return agg; }, Materialized.with(...)); }
it is a small part of our topology but it shows the error flow.
Data structure:
We have two many-partition topics: entity and workflow. Every topic is represented as KTable.
Data error that causes application shutdown:
Our final event(join the entity and workflow ktables) expects a not-null field in the entity but for some reason, it comes for one event. The whole aggregator fails in mappers.aggregateDTO(value) of the buildStreams method
We have a health check which restarts the aggregator if it fails.
When incorrect data comes to one partition, the partition processing is stuck but other partitions are processed.
It causes that at every restart, workflowTable topology repeats .aggregate() add/remove flows and puts new List into the repartition topic. But offsets are not moved for processed partitions due to the aggregator's shutdown.
This behavior generates/sinks a lot of final entity duplicates at every restart because the flow is successful for data from a not-corrupted partition but offsets are not moved for them.
And it also causes troubles if @EqualsAndHashCode is defined to use all fields to compare. At every restart, the topology tries to remove the old value(not existing after the first run) and adds a new value at the end of the list. The list grows after each restart(contains the same - new value values).
I also attached the topology description. To visualize: https://zz85.github.io/kafka-streams-viz/
Current workaround:
To redefine @EqualsAndHashCode to use entities' ids only.
Not solved issue:
Sink events duplication at every restart.
Thank you in advance!
Attachments
Attachments
Issue Links
- is fixed by
-
KAFKA-13676 When processing in ALOS, when one task encounters a task-specific exception we could still commit progress made by other tasks
- Resolved