Details
Type: Improvement
Status: Closed
Priority: Major
Resolution: Fixed
1.7
Description
The current design of KafkaStreamer looks incorrect to me. In particular, it extends StreamAdapter but ignores the tuple extractors provided there and uses native Kafka decoders instead. For example, this makes it impossible to produce several entries from one message, as can be done with StreamMultipleTupleExtractor in other streamers.
To fix this, we should:
- Declare KafkaStreamer like this:
KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[], byte[]>, K, V>
- Remove keyDecoder and valDecoder in favor of tuple extractors.
- Instead of calling getStreamer().addData(...) directly, call the addMessage(...) method, passing the raw message consumed from Kafka (MessageAndMetadata<byte[], byte[]>). This method ensures that the configured extractor is invoked and that all resulting entries are added to the IgniteDataStreamer.
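
To illustrate the proposed flow, here is a minimal, self-contained sketch. It does not use the real Ignite or Kafka classes: RawMessage, MultipleTupleExtractor, SketchStreamAdapter and SketchKafkaStreamer are hypothetical stand-ins for MessageAndMetadata<byte[], byte[]>, StreamMultipleTupleExtractor, StreamAdapter and KafkaStreamer, and a plain map stands in for IgniteDataStreamer. It only shows how routing each raw message through addMessage(...) lets one message yield several entries.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for Kafka's MessageAndMetadata<byte[], byte[]>.
final class RawMessage {
    final byte[] key;
    final byte[] message;

    RawMessage(byte[] key, byte[] message) {
        this.key = key;
        this.message = message;
    }
}

// Stand-in with the same shape as a multiple-tuple extractor: one message in, many entries out.
interface MultipleTupleExtractor<T, K, V> {
    Map<K, V> extract(T msg);
}

// Stand-in for StreamAdapter: addMessage applies the extractor and forwards all entries.
abstract class SketchStreamAdapter<T, K, V> {
    private final MultipleTupleExtractor<T, K, V> extractor;
    private final Map<K, V> sink = new HashMap<>(); // stands in for IgniteDataStreamer

    SketchStreamAdapter(MultipleTupleExtractor<T, K, V> extractor) {
        this.extractor = extractor;
    }

    protected void addMessage(T msg) {
        Map<K, V> entries = extractor.extract(msg);
        if (entries != null)
            sink.putAll(entries);
    }

    Map<K, V> sink() {
        return sink;
    }
}

// The streamer is typed on the raw message; no key/value decoders are involved.
final class SketchKafkaStreamer<K, V> extends SketchStreamAdapter<RawMessage, K, V> {
    SketchKafkaStreamer(MultipleTupleExtractor<RawMessage, K, V> extractor) {
        super(extractor);
    }

    // Would be called for each message consumed from Kafka.
    void onMessage(RawMessage msg) {
        addMessage(msg); // instead of adding a single decoded key/value pair directly
    }
}

class KafkaStreamerSketchDemo {
    static Map<String, Integer> run() {
        // Example extractor: counts the words in one message, producing one entry per distinct word.
        MultipleTupleExtractor<RawMessage, String, Integer> wordCounts = msg -> {
            Map<String, Integer> out = new HashMap<>();
            String text = new String(msg.message, StandardCharsets.UTF_8);
            for (String word : text.split(" "))
                out.merge(word, 1, Integer::sum);
            return out;
        };

        SketchKafkaStreamer<String, Integer> streamer = new SketchKafkaStreamer<>(wordCounts);
        streamer.onMessage(new RawMessage(null, "a b a".getBytes(StandardCharsets.UTF_8)));
        return streamer.sink();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch, the decoding logic lives entirely in the extractor supplied by the user, so the streamer itself needs no keyDecoder or valDecoder settings.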