Description
See KIP-1112: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping
Implementation plan/wrapping procedure for DSL operators:
- make sure processor is added via ProcessorParameters#addProcessorTo instead of calling the InternalTopologyBuilder's #addProcessor and #addStateStore methods directly
- convert stateful operators to implement the ProcessorSupplier#stores method, rather than directly calling #addStateStore and/or #connectProcessorAndStateStore
- update and/or add ProcessorWrapper tests to StreamsBuilderTest (some existing tests may use processors that haven't been converted yet and are expected to break/need fixing)
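The first two steps can be sketched with a simplified model. It suggests why routing every processor through one entry point, and declaring state stores via a `stores()` method, lets a wrapper see both the processor and its stores. All types below (`Processor`, `ProcessorSupplier`, `ProcessorWrapper`, `Builder`, `CountSupplier`, `RecordingWrapper`) are hypothetical stand-ins written for this illustration. They are not the real Kafka Streams interfaces, and store names stand in for store builders.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class WrappingSketch {

    // Stand-in for a processor: handles one value at a time.
    interface Processor {
        void process(String value);
    }

    // Stand-in for a processor supplier: creates processors and declares
    // the stores they need. Store names are used here for brevity.
    interface ProcessorSupplier {
        Processor get();

        default Set<String> stores() {
            return Set.of(); // assumption of this sketch: stateless by default
        }
    }

    // Stand-in for a processor wrapper: may decorate or replace the supplier.
    interface ProcessorWrapper {
        ProcessorSupplier wrap(String processorName, ProcessorSupplier supplier);
    }

    // A stateful operator that declares its store through stores()
    // instead of registering it with the builder out-of-band.
    static final class CountSupplier implements ProcessorSupplier {
        static final String STORE = "count-store";

        @Override
        public Processor get() {
            return value -> { }; // processing logic omitted
        }

        @Override
        public Set<String> stores() {
            return Set.of(STORE);
        }
    }

    // Hypothetical wrapper that records which stores each processor declares.
    static final class RecordingWrapper implements ProcessorWrapper {
        final Map<String, Set<String>> seenStores = new LinkedHashMap<>();

        @Override
        public ProcessorSupplier wrap(String processorName, ProcessorSupplier supplier) {
            seenStores.put(processorName, supplier.stores());
            return supplier;
        }
    }

    // Stand-in for the topology builder: the single entry point wraps the
    // supplier first, then connects whatever stores the wrapped supplier declares.
    static final class Builder {
        private final ProcessorWrapper wrapper;
        final Map<String, Set<String>> connectedStores = new LinkedHashMap<>();

        Builder(ProcessorWrapper wrapper) {
            this.wrapper = wrapper;
        }

        void addProcessorTo(String processorName, ProcessorSupplier supplier) {
            ProcessorSupplier wrapped = wrapper.wrap(processorName, supplier);
            connectedStores.put(processorName, wrapped.stores());
        }
    }

    public static void main(String[] args) {
        RecordingWrapper wrapper = new RecordingWrapper();
        Builder builder = new Builder(wrapper);
        builder.addProcessorTo("count", new CountSupplier());
        System.out.println("wrapper saw:   " + wrapper.seenStores);
        System.out.println("builder wired: " + builder.connectedStores);
    }
}
```

In this model, a store registered directly on the builder would never pass through `wrap`, which is the gap the conversion steps above are meant to close.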
TODO list:
- Non-source/sink PAPI processors: https://github.com/apache/kafka/pull/17892 -- ableegoldman
- ProcessorGraphNode (stateless DSL KStream operators): https://github.com/apache/kafka/pull/17892 -- ableegoldman
- KTableSource: https://github.com/apache/kafka/pull/17903 -- agavra
- KStream/TableAggregate (ie count, reduce, aggregate): https://github.com/apache/kafka/pull/17929 -- ableegoldman
- TableProcessorNode (stateless table operators eg KTable#filter):
  - filter: https://github.com/apache/kafka/pull/18205 -- rohan
  - mapValues: -- rohan
  - transformValues: won't do, will be replaced with processValues anyway
- StreamToTableNode: https://github.com/apache/kafka/pull/18149 -- agavra
- Stream-Table join: https://github.com/apache/kafka/pull/18047 -- ableegoldman
- StreamStreamJoinNode: https://github.com/apache/kafka/pull/18111 -- ableegoldman
- KTableKTableJoinNode: https://github.com/apache/kafka/pull/18048 -- agavra
- StatefulProcessorNode:
  - TableSuppressNode: https://github.com/apache/kafka/pull/18150 -- agavra
  - ForeignTableJoinNode: https://github.com/apache/kafka/pull/18194 -- ableegoldman
- clean up StatefulProcessorNode and migrate all converted operators to use ProcessorGraphNode instead: https://github.com/apache/kafka/pull/18195 -- sophie
Follow up:
- convert source & sink nodes to use ProcessorSupplier and wrap them too
- clean up StoreFactory<->StoreBuilder wrapping and configuration
- future-proof the wrapping mechanism:
  - ensure new processor implementations get wrapped, eg by protecting the InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
  - protect #addStateStore from being called out-of-band to prevent new state stores (whether from new DSL operators or modifications to existing ones) from being added to processors without being returned by the ProcessorSupplier#stores method
- consider deprecating older alternative to ProcessorSupplier#stores
  - cons: using lambdas for processor suppliers is very nice
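One reading of the lambda trade-off noted above, as a small sketch: a lambda can only implement a supplier's single abstract method, so it cannot also override `stores()`. The `Processor` and `ProcessorSupplier` types here are hypothetical stand-ins, not the real Kafka Streams interfaces, and the empty-set default is an assumption of this sketch.

```java
import java.util.Set;

public class LambdaSupplierSketch {

    // Stand-in for a processor.
    interface Processor {
        void process(String value);
    }

    // Stand-in for a supplier with one abstract method plus a default stores().
    interface ProcessorSupplier {
        Processor get();

        default Set<String> stores() {
            return Set.of(); // assumption of this sketch: no stores by default
        }
    }

    // Concise, but a lambda has no way to override stores().
    static ProcessorSupplier lambdaSupplier() {
        return () -> value -> { };
    }

    // Declaring a store requires a full class (anonymous or named).
    static ProcessorSupplier explicitSupplier() {
        return new ProcessorSupplier() {
            @Override
            public Processor get() {
                return value -> { };
            }

            @Override
            public Set<String> stores() {
                return Set.of("my-store"); // hypothetical store name
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("lambda stores:   " + lambdaSupplier().stores());
        System.out.println("explicit stores: " + explicitSupplier().stores());
    }
}
```

Under this reading, making `stores()` the only way to attach state would push stateful suppliers toward the more verbose second form.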