  1. Kafka
  2. KAFKA-8705

NullPointerException thrown by topology optimization when two MergeNodes share a common KeyChangingNode

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.2.3, 2.5.0, 2.3.2, 2.4.1
    • Component/s: streams
    • Labels:
      None

      Description

      A NullPointerException is thrown by topology optimization when two MergeNodes share a common KeyChangingNode.

      Kafka Streams version: 2.3.0

      Code

      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.kstream.Consumed;
      import org.apache.kafka.streams.kstream.KStream;
      
      import java.util.Properties;
      
      public class Main {
          public static void main(String[] args) {
              final StreamsBuilder streamsBuilder = new StreamsBuilder();
              final KStream<Integer, Integer> parentStream = streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
                      .selectKey(Integer::sum);  // Makes parentStream a key-changing node
              final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
              final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
              final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
              childStream1
                      .merge(childStream2)
                      .merge(childStream3)
                      .to("outputTopic");
      
              final Properties properties = new Properties();
              properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
              streamsBuilder.build(properties);
          }
      }
      

      Expected result

      streamsBuilder.build should create the Topology without throwing an exception. The expected topology is:

      Topologies:
         Sub-topology: 0
          Source: KSTREAM-SOURCE-0000000000 (topics: [parentTopic])
            --> KSTREAM-KEY-SELECT-0000000001
          Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
            --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004
            <-- KSTREAM-SOURCE-0000000000
          Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
            --> KSTREAM-MERGE-0000000005
            <-- KSTREAM-KEY-SELECT-0000000001
          Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
            --> KSTREAM-MERGE-0000000005
            <-- KSTREAM-KEY-SELECT-0000000001
          Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
            --> KSTREAM-MERGE-0000000006
            <-- KSTREAM-KEY-SELECT-0000000001
          Processor: KSTREAM-MERGE-0000000005 (stores: [])
            --> KSTREAM-MERGE-0000000006
            <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003
          Processor: KSTREAM-MERGE-0000000006 (stores: [])
            --> KSTREAM-SINK-0000000007
            <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004
          Sink: KSTREAM-SINK-0000000007 (topic: outputTopic)
            <-- KSTREAM-MERGE-0000000006
      

      Actual result

      A NullPointerException is thrown with the following stack trace:

      Exception in thread "main" java.lang.NullPointerException
      	at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
      	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
      	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
      	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
      	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
      	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
      	at Main.main(Main.java:24)

      Cause

      This exception occurs in InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap.

          private void maybeUpdateKeyChangingRepartitionNodeMap() {
              final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
              for (final StreamsGraphNode mergeNode : mergeNodes) {
                  mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
                  final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
                  for (final StreamsGraphNode key : keys) {
                      final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
                      if (maybeParentKey != null) {
                          mergeNodesToKeyChangers.get(mergeNode).add(key);
                      }
                  }
              }
      
              for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
                  final StreamsGraphNode mergeKey = entry.getKey();
                  final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
                  final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>();
                  for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
                      repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
                      keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
                  }
      
                  keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, repartitionNodes);
              }
          }

      In the example, mergeNodesToKeyChangers contains two entries (KSTREAM-MERGE-0000000005 and KSTREAM-MERGE-0000000006), and both entries share the same key-changing parent (KSTREAM-KEY-SELECT-0000000001).
      keyChangingOperationsToOptimizableRepartitionNodes also contains a single entry, keyed by KSTREAM-KEY-SELECT-0000000001.

      When the first entry is processed in the second for loop, KSTREAM-MERGE-0000000005 is added to keyChangingOperationsToOptimizableRepartitionNodes and KSTREAM-KEY-SELECT-0000000001 is removed from it.
      As a result, when the second entry is processed in the second for loop, KSTREAM-KEY-SELECT-0000000001 is no longer found in keyChangingOperationsToOptimizableRepartitionNodes, so get returns null and repartitionNodes.addAll(null) throws the NullPointerException.
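
      The sequence described above can be reproduced with plain collections, without any Kafka dependency. This is only a minimal sketch of the mechanism: the class name SharedKeyChangerSketch, the method names, and the String stand-ins for graph nodes are hypothetical, and the empty repartition-node set is an assumption (its contents do not affect the failure).

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class SharedKeyChangerSketch {

    // Mirrors the second for loop of maybeUpdateKeyChangingRepartitionNodeMap,
    // with Strings standing in for StreamsGraphNode / OptimizableRepartitionNode.
    static void updateMap(final Map<String, Set<String>> mergeNodesToKeyChangers,
                          final Map<String, Set<String>> keyChangersToRepartitionNodes) {
        for (final Map.Entry<String, Set<String>> entry : mergeNodesToKeyChangers.entrySet()) {
            final Set<String> repartitionNodes = new LinkedHashSet<>();
            for (final String keyChanger : entry.getValue()) {
                // get() returns null if an earlier merge node already removed this key changer,
                // and addAll(null) then throws NullPointerException.
                repartitionNodes.addAll(keyChangersToRepartitionNodes.get(keyChanger));
                keyChangersToRepartitionNodes.remove(keyChanger);
            }
            keyChangersToRepartitionNodes.put(entry.getKey(), repartitionNodes);
        }
    }

    // Builds the state from the example: two merge nodes sharing one key changer.
    // Returns true if the update throws NullPointerException.
    static boolean throwsNpeForSharedKeyChanger() {
        final Map<String, Set<String>> keyChangersToRepartitionNodes = new LinkedHashMap<>();
        keyChangersToRepartitionNodes.put("KEY-SELECT-1", new LinkedHashSet<>());

        final Map<String, Set<String>> mergeNodesToKeyChangers = new LinkedHashMap<>();
        mergeNodesToKeyChangers.put("MERGE-5", Collections.singleton("KEY-SELECT-1"));
        mergeNodesToKeyChangers.put("MERGE-6", Collections.singleton("KEY-SELECT-1"));

        try {
            updateMap(mergeNodesToKeyChangers, keyChangersToRepartitionNodes);
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println("NPE for shared key changer: " + throwsNpeForSharedKeyChanger());
    }
}
```

      With a single merge node, the same loop completes normally, which is consistent with the failure appearing only when two MergeNodes share the key-changing parent.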

       

      This is my first time reporting an issue to Kafka. Please let me know if further information is required. Thank you.

            People

            • Assignee: Bill Bejeck (bbejeck)
            • Reporter: Hiroshi Nakahara (hiro116s)
            • Votes: 1
            • Watchers: 8