Details
- Type: New Feature
- Status: Open
- Priority: Major
- Resolution: Unresolved
Description
In many Big Data situations it is preferable to work on a small buffer of records at once, rather than one record at a time.
The natural example is calling some external API that supports batching for efficiency.
How can we do this in Kafka Streams? I cannot find anything in the API that looks like what I want.
So far I have:
builder.stream[String, String]("my-input-topic")
  .mapValues(externalApiCall)
  .to("my-output-topic")
What I want is:
builder.stream[String, String]("my-input-topic")
  .batched(chunkSize = 2000)
  .map(externalBatchedApiCall)
  .to("my-output-topic")
In the Scala collections library and in Akka Streams this operation is called grouped (Akka Streams also has batch). In Spark Structured Streaming we can do mapPartitions(_.grouped(2000).map(externalBatchedApiCall)).
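In the meantime, something close to this can be approximated with the Processor API. Below is a minimal sketch, assuming a hypothetical externalBatchedApiCall of type Seq[(String, String)] => Seq[(String, String)] and the chunk size of 2000 from the example above. A Transformer buffers records in memory and flushes either when the buffer reaches the chunk size or when a wall-clock punctuation fires, so a partially filled buffer is not held forever. Since a transformer runs per stream task, batches never mix records from different partitions.

import java.time.Duration
import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType}

class BatchingTransformer(
    chunkSize: Int,
    flushInterval: Duration,
    call: Seq[(String, String)] => Seq[(String, String)])
    extends Transformer[String, String, KeyValue[String, String]] {

  private val buffer = ArrayBuffer.empty[(String, String)]
  private var context: ProcessorContext = _

  override def init(ctx: ProcessorContext): Unit = {
    context = ctx
    // Flush partially filled buffers periodically so records are not delayed forever.
    ctx.schedule(flushInterval, PunctuationType.WALL_CLOCK_TIME, _ => flush())
  }

  override def transform(key: String, value: String): KeyValue[String, String] = {
    buffer += (key -> value)
    if (buffer.size >= chunkSize) flush()
    null // results are forwarded from flush(); nothing is returned here
  }

  private def flush(): Unit =
    if (buffer.nonEmpty) {
      // One external round trip for the whole chunk.
      call(buffer.toSeq).foreach { case (k, v) => context.forward(k, v) }
      buffer.clear()
    }

  // forward() is not allowed from close(), so anything still buffered here is
  // dropped; a production version would back the buffer with a state store.
  override def close(): Unit = buffer.clear()
}

builder.stream[String, String]("my-input-topic")
  .transform(() => new BatchingTransformer(2000, Duration.ofSeconds(1), externalBatchedApiCall))
  .to("my-output-topic")

Note the caveat in close(): the in-memory buffer is not fault tolerant, because offsets of records sitting in the buffer may already be committed when a failure occurs. Making this robust requires spilling the buffer to a state store, which is part of why first-class support in the DSL would be valuable.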
Issue Links
- is related to: KAFKA-6989 Support Async Processing in Streams (Open)