Description
In our Scala application I am trying to implement custom naming for Kafka Streams application nodes.
We are using topicPattern for our stream source.
Here is the API call I am making:
val topicsPattern = "t-[A-Za-z0-9-].suffix"
val operations: KStream[MyKey, MyValue] =
  builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))(
    Consumed.`with`[MyKey, MyValue].withName("my-fancy-name")
  )
Although I am providing Consumed with a custom name, the topology description still shows "KSTREAM-SOURCE-0000000000" as the name of our stream source.
The problem does not occur when I pass a topic name instead of a pattern. However, our application needs to consume messages from a set of topics selected by topic-name pattern matching.
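As a side note on the pattern-based subscription, the following minimal sketch uses only java.util.regex to show which topic names a pattern like the one above would accept. The topic names and the alternative pattern are hypothetical and only illustrate regex semantics; whether the original pattern was meant to match names of arbitrary length is an assumption.

```java
import java.util.regex.Pattern;

class TopicPatternSketch {
    // Returns true when the whole topic name matches the regex.
    static boolean matches(String regex, String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Pattern as written in the report: one character from the class,
        // then '.' (any single character), then the literal "suffix".
        String asWritten = "t-[A-Za-z0-9-].suffix";
        // Hypothetical alternative: one or more characters, then a literal dot.
        String alternative = "t-[A-Za-z0-9-]+\\.suffix";

        // Hypothetical topic names, for illustration only.
        String[] topics = {"t-a.suffix", "t-orders.suffix", "t-aXsuffix"};
        for (String topic : topics) {
            System.out.println(topic
                + " asWritten=" + matches(asWritten, topic)
                + " alternative=" + matches(alternative, topic));
        }
    }
}
```

The naming problem described here is independent of which topics the pattern matches; the sketch only helps confirm that the source actually subscribes to the intended topics.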
After checking the Kafka code, I see that
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder (on line 103) has a bug:
public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInternal<K, V> consumed) {
    final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
    final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
The node name construction does not take the name of the consumed parameter into account.
For comparison, the code for the other stream API call, which takes topic names, does it correctly:
final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
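The difference between the two code paths can be illustrated with a small self-contained sketch. It is not Kafka's implementation: the class SourceNameSketch, its nested NameGenerator, and the method resolveName are hypothetical stand-ins, and the zero-padded ten-digit counter is an assumption inferred from the generated name "KSTREAM-SOURCE-0000000000" shown above.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

class SourceNameSketch {
    // Hypothetical stand-in for the builder's name generator (not a Kafka class).
    static final class NameGenerator {
        private final AtomicInteger index = new AtomicInteger(0);

        // Assumed format: prefix followed by a zero-padded, ten-digit counter.
        String newProcessorName(String prefix) {
            return prefix + String.format(Locale.ROOT, "%010d", index.getAndIncrement());
        }
    }

    // Expected behavior: prefer the user-supplied name and
    // generate one only when no name was given.
    static String resolveName(String userName, NameGenerator generator, String prefix) {
        return userName != null ? userName : generator.newProcessorName(prefix);
    }

    public static void main(String[] args) {
        NameGenerator generator = new NameGenerator();
        // With a custom name, the custom name is used.
        System.out.println(resolveName("my-fancy-name", generator, "KSTREAM-SOURCE-"));
        // Without a name, a generated one is used.
        System.out.println(resolveName(null, generator, "KSTREAM-SOURCE-"));
    }
}
```

The pattern-based stream method currently behaves as if resolveName were always called with a null name, which would explain why the custom name never appears in the topology description.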