Kafka / KAFKA-13681

Sink event duplicates for partition-stuck stream application

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.8.1
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Hello,

      We observed unexpected behavior in Kafka Streams with the following topology:

      public void buildStreams(final BuilderHelper builder) {
          KTable<TableId, TableValue> table = builder.table();
          KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable = workflowTable(builder);
          table
                  .mapValues(value -> mappers.mainDTO(value))
                  .leftJoin(workflowTable, mappers::joinWorkflows)
                  .toStream()
                  .map((key, value) -> KeyValue.pair(
                          AggregateId.newBuilder().setId(value.getId()).build(),
                          // Fails when the entity carries an unexpected null field.
                          mappers.aggregateDTO(value)))
                  .peek((k, v) -> logSinkRecord(v))
                  .filter((id, dto) -> !isReprocessing)
                  .to(...);
      }

      private static KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable(BuilderHelper builderHelper) {
          return builderHelper.workflowTable()
                  .groupBy((id, workflow) -> KeyValue.pair(
                          TableId.newBuilder().setId(workflow.getTableId()).build(),
                          mappers.mapWorkflow(workflow)),
                          Grouped.with(...))
                  .aggregate(ArrayList::new,
                          // Adder: appends the new value to the aggregate.
                          (key, value, agg) -> {
                              agg.add(value);
                              return agg;
                          },
                          // Subtractor: removes the old value; relies on equals().
                          (key, value, agg) -> {
                              agg.remove(value);
                              return agg;
                          },
                          Materialized.with(...));
      }

      This is only a small part of our topology, but it shows the flow that fails.

      Data structure:

      We have two multi-partition topics, entity and workflow. Each topic is represented as a KTable.

      Data error that causes application shutdown:

      Our final event (the join of the entity and workflow KTables) expects a non-null field in the entity, but for some reason the field arrives as null for one event. The whole aggregator then fails in mappers.aggregateDTO(value) in the buildStreams method.

      We have a health check that restarts the aggregator whenever it fails.

      When invalid data arrives on one partition, processing of that partition gets stuck while the other partitions continue to be processed.

      As a result, at every restart the workflowTable topology repeats the .aggregate() add/remove flows and puts a new List into the repartition topic. However, the offsets of the successfully processed partitions are not advanced, because the aggregator shuts down.

      This behavior sinks many duplicate final entity events at every restart: the flow succeeds for data from the non-corrupted partitions, but their offsets are never advanced.
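
The replay mechanism described above can be illustrated with a plain-Java simulation that uses no Kafka classes at all. It is only a sketch under simplifying assumptions: partitions are processed one after another, offsets are committed only when a whole run succeeds, and a null record models the event that breaks the mapper. The class and method names are hypothetical and are not part of our application.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReplayDuplicatesDemo {

    // Hypothetical input: partition 0 is healthy, partition 1 contains a bad record.
    static Map<Integer, List<String>> demoPartitions() {
        Map<Integer, List<String>> partitions = new LinkedHashMap<>();
        partitions.put(0, Arrays.asList("a", "b"));
        partitions.put(1, Arrays.asList("c", null)); // null models the record that breaks the mapper
        return partitions;
    }

    // Runs the "application" once, then once more per restart, and returns everything sent to the sink.
    static List<String> runWithRestarts(Map<Integer, List<String>> partitions, int restarts) {
        Map<Integer, Integer> committed = new HashMap<>();
        List<String> sink = new ArrayList<>();
        for (int attempt = 0; attempt <= restarts; attempt++) {
            Map<Integer, Integer> pending = new HashMap<>(committed);
            try {
                for (Map.Entry<Integer, List<String>> entry : partitions.entrySet()) {
                    int partition = entry.getKey();
                    List<String> records = entry.getValue();
                    for (int offset = committed.getOrDefault(partition, 0); offset < records.size(); offset++) {
                        String record = records.get(offset);
                        if (record == null) {
                            throw new IllegalStateException("unexpected null field");
                        }
                        sink.add(partition + ":" + record); // output is already visible downstream
                        pending.put(partition, offset + 1);
                    }
                }
                committed = pending; // offsets advance only when the run did not crash
            } catch (IllegalStateException crash) {
                // Simulated shutdown: pending offsets are lost, sink output stays.
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(runWithRestarts(demoPartitions(), 2));
    }
}
```

With two restarts, every healthy record reaches the sink three times in this model, because each run starts again from the same uncommitted offsets.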

      It also causes trouble if @EqualsAndHashCode is defined to compare all fields. At every restart, the topology tries to remove the old value (which no longer exists after the first run) and adds the new value to the end of the list. The list therefore grows after each restart, accumulating copies of the same new value.
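
The list growth can be reproduced outside Kafka with the same add/remove logic as the aggregator. The following is a minimal sketch: the Workflow class is a hypothetical stand-in for InternalWorkflowDTO with assumed fields, and its hand-written equals() mirrors an @EqualsAndHashCode that compares every field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

final class ListGrowthDemo {

    // Hypothetical stand-in for InternalWorkflowDTO; the fields are assumptions.
    static final class Workflow {
        final String id;
        final String status;

        Workflow(String id, String status) {
            this.id = id;
            this.status = status;
        }

        // Compares every field, like an all-fields @EqualsAndHashCode.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Workflow)) return false;
            Workflow other = (Workflow) o;
            return id.equals(other.id) && status.equals(other.status);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, status);
        }
    }

    // Replays the same old -> new update once per run, as when offsets are never advanced.
    static List<Workflow> replayUpdate(int runs) {
        Workflow oldValue = new Workflow("wf-1", "CREATED");
        Workflow newValue = new Workflow("wf-1", "DONE");
        List<Workflow> agg = new ArrayList<>();
        agg.add(oldValue); // aggregate state before the update is first processed
        for (int run = 0; run < runs; run++) {
            agg.remove(oldValue); // subtractor: finds nothing after the first run
            agg.add(newValue);    // adder: appends another copy of the new value
        }
        return agg;
    }

    public static void main(String[] args) {
        System.out.println(replayUpdate(1).size()); // 1
        System.out.println(replayUpdate(3).size()); // 3
    }
}
```

After the first run the old value is gone, so every later remove() is a no-op and each replay adds one more element.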

      I have also attached the topology description. It can be visualized with https://zz85.github.io/kafka-streams-viz/

      Current workaround:

      Redefine @EqualsAndHashCode so that it compares entities by id only.
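
A minimal sketch of why the id-only comparison helps, again in plain Java with a hypothetical Workflow class and assumed fields. The equals() here is written by hand; with Lombok the equivalent would presumably be @EqualsAndHashCode(onlyExplicitlyIncluded = true) together with @EqualsAndHashCode.Include on the id field, but that mapping is an assumption about our DTOs rather than the exact code we use.

```java
import java.util.ArrayList;
import java.util.List;

final class IdOnlyEqualityDemo {

    // Hypothetical stand-in for InternalWorkflowDTO; the fields are assumptions.
    static final class Workflow {
        final String id;
        final String status;

        Workflow(String id, String status) {
            this.id = id;
            this.status = status;
        }

        // Identity is defined by id alone; status is ignored.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Workflow)) return false;
            return id.equals(((Workflow) o).id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    // Replays the same old -> new update once per run, as when offsets are never advanced.
    static List<Workflow> replayUpdate(int runs) {
        Workflow oldValue = new Workflow("wf-1", "CREATED");
        Workflow newValue = new Workflow("wf-1", "DONE");
        List<Workflow> agg = new ArrayList<>();
        agg.add(oldValue); // aggregate state before the update is first processed
        for (int run = 0; run < runs; run++) {
            agg.remove(oldValue); // matches by id, so it also removes the earlier copy of newValue
            agg.add(newValue);
        }
        return agg;
    }

    public static void main(String[] args) {
        System.out.println(replayUpdate(3).size()); // 1
    }
}
```

Because remove() now matches on id, each replay removes the previously added copy before adding it again, so the list stays at one element. This bounds the aggregate but does nothing about records that are re-emitted to the sink.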

      Unresolved issue:

      Sink events are still duplicated at every restart.

      Thank you in advance!

      Attachments

        1. fail_topology.txt
          2 kB
          Mikhail Dubrovin

            People

              Assignee: Unassigned
              Reporter: Mikhail Dubrovin (DrozD_0)