Description
As of today, the producer epoch keeps increasing until it hits Short.MAX_VALUE. The correct behavior at that point would be to make another InitProducerId call to allocate a new PID; instead, any further attempt to bump the epoch beyond Short.MAX_VALUE throws a fatal exception on the broker, which eventually kills the producer.
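A minimal sketch of the intended handling, assuming the coordinator can fall back to a fresh PID once the epoch space is exhausted (this is NOT the actual TransactionCoordinator code, and allocateNewProducerId is a hypothetical stand-in for the broker's producer-ID manager):
{code:scala}
// Sketch only: bump the epoch while it still fits in a Short; once it is
// exhausted, hand out a fresh producer ID at epoch 0 instead of overflowing.
def bumpEpochOrRotateProducerId(
    producerId: Long,
    producerEpoch: Short,
    allocateNewProducerId: () => Long // hypothetical; e.g. backed by the producer-ID manager
): (Long, Short) =
  if (producerEpoch < Short.MaxValue)
    (producerId, (producerEpoch + 1).toShort) // normal case: same PID, bumped epoch
  else
    (allocateNewProducerId(), 0.toShort)      // epoch exhausted: new PID, epoch reset to 0
{code}
Today the fallback branch does not exist: the coordinator reaches the overflow guard in TransactionMetadata.prepareFenceProducerEpoch and surfaces the IllegalStateException shown in the broker log below.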
Streams log:
[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) [2020-03-05 04:25:41,147] ERROR [stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3] Thread StreamsThread threadId: stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3
[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) TaskManager
MetadataState:
GlobalMetadata: []
GlobalStores: []
My HostInfo: HostInfo{host='unknown', port=-1}
Cluster(id = null, nodes = [], partitions = [], controller = null)
Active tasks:
Running:
Running Partitions:
New:
Restoring:
Restoring Partitions:
Restored Partitions:
Suspended:
Standby tasks:
Running:
Running Partitions:
New:
encountered an error processing soak test (org.apache.kafka.streams.StreamsSoakTest)
[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) org.apache.kafka.streams.errors.StreamsException: stream-thread [stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3] Failed to rebalance.
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:862)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The server experienced an unexpected error when processing the request.
at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:571)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at java.lang.Thread.run(Thread.java:748)
Broker log:
[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_broker_i-0d1ef6c9a1a1708f6_server-log) [2020-03-05 04:25:40,885] INFO [Transaction State Manager 1001]: TransactionalId stream-soak-test-1_0 append transaction log for TxnTransitMetadata(producerId=0, producerEpoch=576, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-0, stream-soak-test-logData10MinuteFinalCount-store-changelog-0, stream-soak-test-logData10MinuteSuppressedCount-store-changelog-0, stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-0, stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog-0, stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000025-changelog-0, stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-0, windowed-node-counts-0), txnStartTimestamp=1583382340885, txnLastUpdateTimestamp=1583382340885) transition failed due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Ongoing), aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the callback (kafka.coordinator.transaction.TransactionStateManager)
[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_broker_i-0d1ef6c9a1a1708f6_stdout) java.lang.IllegalStateException: Cannot fence producer with epoch equal to Short.MaxValue since this would overflow
at kafka.coordinator.transaction.TransactionMetadata.prepareFenceProducerEpoch(TransactionMetadata.scala:194)
at kafka.coordinator.transaction.TransactionCoordinator.kafka$coordinator$transaction$TransactionCoordinator$$prepareInitProduceIdTransit(TransactionCoordinator.scala:216)
at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2$$anonfun$apply$1.apply(TransactionCoordinator.scala:143)
at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2$$anonfun$apply$1.apply(TransactionCoordinator.scala:143)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2.apply(TransactionCoordinator.scala:142)
at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2.apply(TransactionCoordinator.scala:138)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:137)
at kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:1638)
at kafka.server.KafkaApis.handle(KafkaApis.scala:135)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:748)