KAFKA STREAMS APP IN SCALA:

/* I have tried to simplify the code as much as possible. What I'm doing here is
   incremental batch processing using window aggregation. */

// Custom Serdes have been defined for both of the classes below.
case class Record(@JsonProperty("key") key: Array[Byte], @JsonProperty("value") value: Array[Byte])

class RecordAccumulator {
  var batch: List[Record] = List[Record]()

  def add(r: Record): RecordAccumulator = {
    batch = batch :+ r
    this
  }
}

MAIN STREAMS SNIPPET:

def streamRecords(conf: IndexerServiceConfiguration, props: Properties) = {
  val builder = new StreamsBuilder()
  val source: KStream[Array[Byte], Array[Byte]] = builder.stream(conf.topic)

  // Stream records, map to (partition id, rec) and window by time; windowSize: 3
  val windowedRecords = source
    .map[Integer, Record](
      new KeyValueMapper[Array[Byte], Array[Byte], KeyValue[Integer, Record]] {
        def apply(k: Array[Byte], v: Array[Byte]): KeyValue[Integer, Record] = {
          // NOTE: the key expression was missing in the pasted code; partitionIdFor(k) is a
          // placeholder for however the partition id is actually derived in the real app.
          new KeyValue(partitionIdFor(k), Record(k, v))
        }
      }
    )
    .groupByKey(Serialized.`with`(serdesConfig.integerSerde, serdesConfig.recordSerde)) // recordSerde is a custom serde for the Record class
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(conf.windowSize))
      .until(TimeUnit.MINUTES.toMillis(conf.windowSize + 3)))

  // Aggregate records -> convert the records into a batch by building a list of records per window.
  val batchedByWindowRecords: KStream[Windowed[Integer], RecordAccumulator] = windowedRecords
    .aggregate(
      new Initializer[RecordAccumulator] {
        def apply(): RecordAccumulator = {
          new RecordAccumulator()
        }
      },
      new Aggregator[Integer, Record, RecordAccumulator] {
        def apply(partitionId: Integer, newRec: Record, recAcc: RecordAccumulator) = {
          recAcc.add(newRec)
          recAcc
        }
      },
      Materialized
        .as[Integer, RecordAccumulator, WindowStore[Bytes, Array[Byte]]]("localstore")
        .withKeySerde(serdesConfig.integerSerde)
        .withValueSerde(serdesConfig.recordAccSerde)
    )
    .toStream

  // Send the batch for processing.
  batchedByWindowRecords.mapValues[Unit](new ValueMapper[RecordAccumulator, Unit] {
    override def apply(recAcc: RecordAccumulator): Unit = {
      if (recAcc.batch.nonEmpty) {
        log.info(s"batch size:${recAcc.batch.size}")
        process(recAcc.batch.toArray) // send batch to process
      }
    }
  })

  val topology = builder.build()
  val streams = new KafkaStreams(topology, props)

  // To handle unexpected streams errors.
  streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
    override def uncaughtException(thread: Thread, throwable: Throwable): Unit = {
      System.exit(-1)
    }
  })

  streams.cleanUp()
  streams.start()
}
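The custom serdes referenced above (serdesConfig.recordSerde etc.) are not shown. For reference, a minimal sketch of what a Record serde could look like, assuming a Jackson-based implementation; the RecordSerdes object, the ObjectMapper setup and the Serdes.serdeFrom wiring are illustrative assumptions, not the actual serdesConfig used above:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

object RecordSerdes {
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  // Serializer: Record -> JSON bytes (Jackson Base64-encodes the Array[Byte] fields).
  private val recordSerializer = new Serializer[Record] {
    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
    override def serialize(topic: String, data: Record): Array[Byte] =
      if (data == null) null else mapper.writeValueAsBytes(data)
    override def close(): Unit = ()
  }

  // Deserializer: JSON bytes -> Record.
  private val recordDeserializer = new Deserializer[Record] {
    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
    override def deserialize(topic: String, bytes: Array[Byte]): Record =
      if (bytes == null) null else mapper.readValue(bytes, classOf[Record])
    override def close(): Unit = ()
  }

  // Combined Serde that can be handed to Serialized.`with`(...) or Materialized.withValueSerde(...).
  val recordSerde: Serde[Record] = Serdes.serdeFrom(recordSerializer, recordDeserializer)
}

A serde of the same shape would be needed for RecordAccumulator (serdesConfig.recordAccSerde), since it is the value type of the windowed state store.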