KAFKA-7432: API Method on Kafka Streams for processing chunks/batches of data


Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      For many situations in Big Data it is preferable to work with a small buffer of records at a go, rather than one record at a time.

      The natural example is calling some external API that supports batching for efficiency.

      How can we do this in Kafka Streams? I cannot find anything in the API that looks like what I want.

      So far I have:

      builder.stream[String, String]("my-input-topic")
        .mapValues(externalApiCall)
        .to("my-output-topic")

      What I want is:

      builder.stream[String, String]("my-input-topic")
        .batched(chunkSize = 2000)
        .map(externalBatchedApiCall)
        .to("my-output-topic")

      In plain Scala collections and in Akka Streams the operation is called grouped (Akka Streams also has batch). In Spark Structured Streaming we can do mapPartitions(_.grouped(2000).map(externalBatchedApiCall)).

      https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams


People

    Assignee: Unassigned
    Reporter: sams
    Votes: 11
    Watchers: 18
