Description
Using the Processor API, users need to pass in a `ProcessorSupplier` that must return a new `Processor` instance each time `get()` is called.
Users violate this rule on a regular basis and return the same instance on `get()`. This mistake leads to a (not very informative) `NullPointerException` during runtime. (Cf: https://stackoverflow.com/questions/61790984/kafka-stream-forward-method-throwing-nullpointerexception-because-processornode/61978396)
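To illustrate the mistake, here is a minimal sketch that uses `java.util.function.Supplier` and a hypothetical `CountingProcessor` class as stand-ins for `ProcessorSupplier` and `Processor` (neither stand-in is part of Kafka Streams, and the class and method names are made up for this example):

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful processor; NOT the Kafka Streams Processor interface.
class CountingProcessor {
    private int seen = 0;

    // Returns how many times this particular instance has been invoked.
    int process() {
        return ++seen;
    }
}

class SupplierPatternExample {
    // Wrong: the lambda captures a single instance, so every get() returns the same object.
    static Supplier<CountingProcessor> sharedInstanceSupplier() {
        CountingProcessor shared = new CountingProcessor();
        return () -> shared;
    }

    // Right: the constructor reference creates a new object on every get().
    static Supplier<CountingProcessor> freshInstanceSupplier() {
        return CountingProcessor::new;
    }
}
```

With the shared supplier, every caller of `get()` ends up working on the same object and its state, which is the situation the rule above is meant to prevent.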
We could improve the error message by checking if `currentNode()` returns `null` (https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L183) and throwing an exception with an informative message in that case.
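A rough sketch of such a guard, assuming a hypothetical helper `CurrentNodeGuard.requireCurrentNode` (the helper, the exception type, and the message wording are illustrative assumptions, not the existing `ProcessorContextImpl` code):

```java
// Hypothetical helper; the real check would live next to the currentNode() call site.
class CurrentNodeGuard {
    // Returns the node unchanged, or fails with an explanatory message instead of
    // letting a later dereference surface as a bare NullPointerException.
    static <T> T requireCurrentNode(T currentNode) {
        if (currentNode == null) {
            throw new IllegalStateException(
                "Current node is unknown. This can happen if the ProcessorSupplier "
                    + "returns the same Processor instance on every get() call; "
                    + "it must return a new instance each time.");
        }
        return currentNode;
    }
}
```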
Furthermore, we could do a sanity check within the `KafkaStreams` constructor before we start the processing threads: we get all suppliers for the `Topology` and call `get()` twice on each supplier to check that the returned object references are different. If they are the same, we throw an exception with an informative error message.
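The sanity check could look roughly like the following sketch. It uses `java.util.function.Supplier` as a stand-in for the Kafka Streams supplier interfaces; the class name `SupplierSanityCheck`, the method name, and the choice of `IllegalArgumentException` are assumptions made for illustration:

```java
import java.util.function.Supplier;

// Hypothetical helper; in Kafka Streams this would run over all suppliers of the Topology.
class SupplierSanityCheck {
    // Calls get() twice and compares the references (identity, not equals()).
    static void checkReturnsNewInstances(String nodeName, Supplier<?> supplier) {
        Object first = supplier.get();
        Object second = supplier.get();
        if (first == second) {
            throw new IllegalArgumentException(
                "Supplier for node '" + nodeName + "' returned the same instance twice. "
                    + "get() must return a new instance on every call.");
        }
    }
}
```

Note that this check creates two throwaway instances per supplier, so it assumes that constructing a processor is cheap and free of side effects.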
We should improve the JavaDocs, too: https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java#L34 (also for `Transformer` et al.); it seems to be too subtle what "new" means. The same applies to https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/Topology.java and https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java (`process()`, `transform()`, etc.)
Furthermore, we should improve the docs to explain the supplier pattern explicitly: https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html