Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-7523

TransformerSupplier/ProcessorSupplier enhancements

    XMLWordPrintableJSON

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.6.0
    • Component/s: streams
    • Labels:

      Description

      KIP-401: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756

      I have found that when writing "low level" Processors and Transformers that are stateful, often I want these processors to "own" one or more state stores, the details of which are not important to the business logic of the application.  However, when incorporating these into the topologies defined by the high level API, using KStream::transform or KStream::process, I'm forced to specify the stores so the topology is wired up correctly.  This creates an unfortunate pattern where the TransformerSupplier or ProcessorSupplier, who (according to the pattern I've been following) holds the information about the name of the state stores, must be defined above the "high level" "fluent API"-style pipeline, which makes it hard to understand the business logic data flow.

       

      What I currently have to do:

      TransformerSupplier transformerSupplier = new TransformerSupplierWithState(topology, val -> businessLogic(val));
      builder.stream("in.topic")
              .transform(transformerSupplier, transformerSupplier.stateStoreNames())
              .to("out.topic");

      I have to both define the TransformerSupplier above the "fluent block", and pass the topology in so I can call topology.addStateStore() inside the TransformerSupplier constructor and tell the StreamsBuilder what the state store names are for that point in the topology. The lambda val -> businessLogic(val) is really what I want to see in-line because that's the crux of what is happening, along with the name of some factory method describing what the transformer is doing for me internally. This issue is obviously exacerbated when the "fluent block" is much longer than this example - It gets worse the farther away val -> businessLogic(val) is from KStream::transform.

       
      An improvement:

      builder.stream("in.topic")
              .transform(transformerSupplierWithState(topology, val -> businessLogic(val)))
              .to("out.topic");

      Which implies the existence of a KStream::transform that takes a single argument that adheres to this interface:

      interface TransformerSupplierWithState<K, V, R> {
          Transformer<K, V, R> get();
          String[] stateStoreNames();
      }

      Or better yet, I wouldn't have to pass in the topology, the caller of TransformerSupplierWithState could also handle the job of "adding" its state stores to the topology:

      interface TransformerSupplierWithState<K, V, R> {
          Transformer<K, V, R> get();
          Map<String, StoreBuilder> stateStores();
      }

      Which would enable my ideal:

      builder.stream("in.topic")
              .transform(transformerSupplierWithState(val -> businessLogic(val)))
              .to("out.topic");

      I think this would be a huge improvement in the usability of low-level processors with the high-level DSL.

      Please let me know if I'm missing something as to why this cannot or should not happen, or if there is a better forum for this suggestion (presumably it would require a KIP?). I'd be happy to build it as well if there is a chance of it being merged, it doesn't seem like a huge challenge to me.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                pgwhalen Paul Whalen
                Reporter:
                pgwhalen Paul Whalen
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: