Description
After adding a processor and a sink to the topology after a global store, and then calling StreamsBuilder.build().describe() again (for debugging purposes and to check that I got the topology right), I got the following exception and stack trace:
Caused by: java.util.NoSuchElementException: null
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) ~[na:1.8.0_171]
at java.util.HashMap$KeyIterator.next(HashMap.java:1466) ~[na:1.8.0_171]
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323) ~[kafka-streams-1.1.0.jar:na]
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306) ~[kafka-streams-1.1.0.jar:na]
at org.apache.kafka.streams.Topology.describe(Topology.java:647) ~[kafka-streams-1.1.0.jar:na]
Snippet of code that caused this:
GlobalKTable<String, ServiceList> jsonRoutesToServices
    = builder.globalTable("routes-to-services",
        Consumed.with(Serdes.String(), jsonServiceListSerde),
        Materialized.<String, ServiceList, KeyValueStore<Bytes, byte[]>>as("routes-to-services"));
TopologyDescription td = builder.build().describe();
String parent = null;
// We get an iterator to a TreeSet sorted by processing order, and just want the last one.
for (TopologyDescription.GlobalStore store : td.globalStores()) {
    parent = store.processor().name();
}
TopologyDescription tdtd = builder.build().describe();
builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);
builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");
TopologyDescription tdtdtd = builder.build().describe();
Note that the exception is thrown on the last line of the code snippet; calling describe() again before adding anything works fine.
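For anyone triaging this: the top two frames of the stack trace are plain JDK behavior. HashMap$KeyIterator.next() throws NoSuchElementException when the iterator has nothing left to return, for example when next() is called on an empty key set without checking hasNext() first. The sketch below reproduces only that JDK behavior. It is not Kafka's code: the class and method names are hypothetical, and whether describeGlobalStore ends up iterating an empty collection is an assumption based on the trace, not something confirmed here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical stand-in for the failure mode suggested by the stack trace; not Kafka code.
public class EmptyKeySetDemo {

    // Takes the first key without checking hasNext(); throws on an empty map.
    static String firstKeyUnsafe(Map<String, String> map) {
        return map.keySet().iterator().next();
    }

    // Defensive variant: an empty map yields Optional.empty() instead of an exception.
    static Optional<String> firstKeySafe(Map<String, String> map) {
        return map.keySet().stream().findFirst();
    }

    public static void main(String[] args) {
        Map<String, String> empty = new HashMap<>();

        System.out.println("safe lookup present: " + firstKeySafe(empty).isPresent());

        try {
            firstKeyUnsafe(empty);
            System.out.println("no exception");
        } catch (NoSuchElementException e) {
            // Same exception type and JDK frames as in the report above.
            System.out.println("caught NoSuchElementException");
        }
    }
}
```

If the assumption holds, the fix on the library side would be a guard of the second kind; on the user side, the only workaround visible from this report is to avoid calling describe() after the extra nodes have been added.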