WARN  10:22:32.848[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=true || exhausted=false || closed=false || flushInProgress=false 
WARN  10:22:37.822[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] beginFlush(): 0 -> 1 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.beginFlush(RecordAccumulator.java:701)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1123)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

WARN  10:22:37.827[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:37.830[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] awaitFlushCompletion(): 1 -> 0 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:720)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1126)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

WARN  10:22:37.943[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] beginFlush(): 0 -> 1 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.beginFlush(RecordAccumulator.java:701)
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:439)
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
	at java.base/java.lang.Thread.run(Thread.java:829)

WARN  10:22:38.624[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:39.622[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:40.622[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:41.623[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:42.624[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:43.624[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:44.625[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:45.626[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:46.627[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:47.627[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:48.029[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] beginFlush(): 1 -> 2 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.beginFlush(RecordAccumulator.java:701)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1123)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

WARN  10:22:48.030[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] awaitFlushCompletion(): 2 -> 1 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:720)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1126)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

WARN  10:22:48.630[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:49.628[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:50.629[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:51.629[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:52.630[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:53.630[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:54.631[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:55.632[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:56.633[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:57.633[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:58.134[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] beginFlush(): 1 -> 2 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.beginFlush(RecordAccumulator.java:701)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1123)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

WARN  10:22:58.134[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] awaitFlushCompletion(): 2 -> 1 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:720)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1126)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

WARN  10:22:58.636[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:22:59.634[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:00.634[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:01.636[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:02.636[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:03.638[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:04.637[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:05.637[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:06.639[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:07.638[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:08.241[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] beginFlush(): 1 -> 2 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.beginFlush(RecordAccumulator.java:701)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1123)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

WARN  10:23:08.241[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] awaitFlushCompletion(): 2 -> 1 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:720)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1126)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

WARN  10:23:08.642[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:09.640[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:10.641[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:11.641[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:12.641[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:13.642[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:14.642[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:15.643[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:16.644[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:17.645[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:18.348[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] beginFlush(): 1 -> 2 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.beginFlush(RecordAccumulator.java:701)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1123)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

WARN  10:23:18.348[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] awaitFlushCompletion(): 2 -> 1 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:720)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1126)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

WARN  10:23:18.646[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:19.645[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:20.645[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:21.646[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:22.647[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:23.647[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:24.647[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:25.648[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:26.649[kafka-producer-network-thread | forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] Batch is sendable because full=false || expired=false || exhausted=false || closed=false || flushInProgress=true 
WARN  10:23:27.752[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] beginFlush(): 1 -> 2 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.beginFlush(RecordAccumulator.java:701)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1123)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.tryCloseCleanAllActiveTasks(TaskManager.java:848)
	at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:791)
	at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1073)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:561)

WARN  10:23:27.752[forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1] RecordAccumulator - [Producer clientId=forwarding-stream-d0aed0d0-6f35-46a5-a601-07f4d727339c-StreamThread-1-0_0-producer, transactionalId=forwarding-stream-0_0] awaitFlushCompletion(): 2 -> 1 java.lang.Exception: null
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:720)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1126)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.flush(StreamsProducer.java:316)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:258)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:391)
	at org.apache.kafka.streams.processor.internals.TaskManager.tryCloseCleanAllActiveTasks(TaskManager.java:848)
	at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:791)
	at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1073)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:561)