// This excerpt shows Kafka producer and consumer logs in which a message is processed twice even though exactly-once semantics are enabled.
// Setup: 3 brokers, 1 ZooKeeper node. We focus on topic0 partition 3.
// At the start, consumer-1 is assigned all partitions. We then bring down one broker.
// The message "3" is written to topic0 partition 3 at offset 4 (key="3", value="3", offset=4).
// Consumer-1 starts a transaction with test-producer-1 and produces the message to topic1 (a plain echo with the same key and value). Its AddOffsetsToTxnRequest then hangs for about 15 seconds; meanwhile the broker finishes shutting down and comes back up.
// The consumers are configured with max.poll.interval.ms=5000, so the group rebalances after 5 seconds while consumer-1 is stuck and not polling.
// Consumer-4 is now assigned partition 3. Because consumer-1's transaction never completed, the committed offset is still 3, so consumer-4 receives the message at offset 4 again.
// Consumer-4 sends the message to topic1 and commits the offset and the transaction using test-producer-2. At this point offset 4 has been handled once.
// About seven seconds later, test-producer-1 receives an error (COORDINATOR_NOT_AVAILABLE) in its AddOffsetsToTxnResponse, followed by a successful AddOffsetsToTxnResponse.
// Since no exception is raised, test-producer-1 goes on to complete the original transaction, and the commit succeeds. At this point [topic0 partition 3 offset 4] has been handled twice, breaking exactly-once semantics.
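The sequence above can be replayed in a toy, in-memory model. This is a sketch under stated assumptions, not Kafka client code: the `Txn` class, its `relay`/`commit` methods, and the collections standing in for topic1 and the group's committed offsets are all hypothetical. It only encodes the ordering seen in the log (test-producer-1 stalls, test-producer-2 commits, test-producer-1 then commits) and shows that, when nothing in the commit path checks whether the committing producer still owns topic0-3, both copies of "3" become visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the log's timeline. NOT Kafka client code: the hypothetical Txn class and the
// collections below only stand in for topic1 and the group's committed offsets.
final class DoubleProcessingTimeline {

    /** Stand-in for topic1: values made visible by committed transactions. */
    static final List<String> topic1 = new ArrayList<>();
    /** Stand-in for the group's committed offsets, keyed by "topic-partition". */
    static final Map<String, Long> committedOffsets = new HashMap<>();
    /** Offset consumer-4 resumed from in the last run. */
    static long consumer4ResumeOffset = -1;

    /** Hypothetical transaction: buffered output plus offsets committed atomically with it. */
    static final class Txn {
        final String transactionalId; // label only; the model never uses it to fence anyone
        final List<String> pendingRecords = new ArrayList<>();
        final Map<String, Long> pendingOffsets = new HashMap<>();

        Txn(String transactionalId) {
            this.transactionalId = transactionalId;
        }

        /** Roughly send() plus sendOffsetsToTransaction(): nothing is visible yet. */
        void relay(String value, String inputPartition, long nextOffset) {
            pendingRecords.add(value);
            pendingOffsets.put(inputPartition, nextOffset);
        }

        /** Roughly commitTransaction(): output and offsets become visible together.
         *  Note the missing check: does this producer still own the input partition? */
        void commit() {
            topic1.addAll(pendingRecords);
            committedOffsets.putAll(pendingOffsets);
        }
    }

    /** Replays the log's ordering and returns how many copies of "3" reach topic1. */
    static long run() {
        topic1.clear();
        committedOffsets.clear();
        committedOffsets.put("topic0-3", 3L); // committed offset before the scenario (OFFSET_FETCH)

        // consumer-1 / test-producer-1 relays offset 4 ("3"), then stalls on AddOffsetsToTxn.
        Txn stalled = new Txn("test-producer-1");
        stalled.relay("3", "topic0-3", 5L);

        // Rebalance: consumer-4 resumes from the last *committed* offset, still 3.
        consumer4ResumeOffset = committedOffsets.get("topic0-3");
        Txn fresh = new Txn("test-producer-2");
        fresh.relay("3", "topic0-3", 5L);
        fresh.commit(); // first copy of "3" in topic1

        // The stalled request eventually succeeds and test-producer-1 commits as well.
        stalled.commit(); // second copy of "3" in topic1

        return topic1.stream().filter("3"::equals).count();
    }

    public static void main(String[] args) {
        long copies = run();
        System.out.println("consumer-4 resumed from offset " + consumer4ResumeOffset);
        System.out.println("copies of \"3\" in topic1: " + copies);
    }
}
```

Running it reports two copies of "3". In real Kafka, producer fencing is keyed by `transactional.id` and producer epoch; in this log the two producers use different ids (test-producer-1 with producerId=2000, test-producer-2 with producerId=1001), so that mechanism does not relate them. The model mirrors this by carrying the id only as a label.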
// consumer-1 is assigned all partitions, including partition 3
21:09:03.637 INFO thread=[pool-31-thread-1] class=[ConsumerCoordinator] [Consumer clientId=consumer-1, groupId=group1-uQ4rn] Setting newly assigned partitions [topic0-6, topic0-7, topic0-4, topic0-5, topic0-8, topic0-9, topic0-2, topic0-3, topic0-0, topic0-1]
// we bring down one of the 3 brokers
21:09:10.712 INFO thread=[specs2-3] class=[KafkaServer] [KafkaServer id=29711] Starting controlled shutdown
// consumer-1 starts a transaction with test-producer-1: the topic0 consumer for partition 3 sends "3" to topic1, and the producer adds the topic1 partitions to the transaction
21:09:10.780 TRACE thread=[pool-53-thread-1] class=[KafkaProducer] [Producer clientId=producer-MULTI-TOPIC-5, transactionalId=test-producer-1] Sending record ProducerRecord(topic=topic1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=3, value={"payload":"{\"msg\":\"3\"}","header":{"headerFields":{"originalMessageId":"96d86fc1-5351-4cb5-84fc-3d1b44ab0eba","messageId":"e7b47295-47df-4d94-bf20-02c5a622b89e"}}}, timestamp=null) with callback com.wixpress.greyhound.AsyncProducerCallbackBuilder$$Lambda$2453/2135261528@393c5982 to topic topic1 partition 3
21:09:10.795 DEBUG thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-5] class=[Sender] [Producer clientId=producer-MULTI-TOPIC-5, transactionalId=test-producer-1] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=test-producer-1, producerId=2000, producerEpoch=0, partitions=[topic1-9, topic1-3]) to node localhost:8575 (id: 90600 rack: null)
21:09:10.804 TRACE thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-5] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-5, transactionalId=test-producer-1] Received transactional response AddPartitionsToTxnResponse(errors={topic1-9=NONE, topic1-3=NONE}, throttleTimeMs=0) for request (type=AddPartitionsToTxnRequest, transactionalId=test-producer-1, producerId=2000, producerEpoch=0, partitions=[topic1-9, topic1-3])
21:09:10.804 DEBUG thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-5] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-5, transactionalId=test-producer-1] Successfully added partitions [topic1-9, topic1-3] to transaction
// sending the consumed offsets to the transaction
21:09:10.819 DEBUG thread=[greyhound-thread-cached-g-consumers-20-1-[248]] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-5, transactionalId=test-producer-1] Begin adding offsets {topic0-3=OffsetAndMetadata{offset=5, metadata=''}, topic0-1=OffsetAndMetadata{offset=5, metadata=''}, topic0-4=OffsetAndMetadata{offset=5, metadata=''}} for consumer group group0-Eu1oa to transaction
21:09:10.820 DEBUG thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-5] class=[Sender] [Producer clientId=producer-MULTI-TOPIC-5, transactionalId=test-producer-1] Sending transactional request (type=AddOffsetsToTxnRequest, transactionalId=test-producer-1, producerId=2000, producerEpoch=0, consumerGroupId=group0-Eu1oa) to node localhost:8575 (id: 90600 rack: null)
// this AddOffsetsToTxnRequest only completes about 15 seconds later; with max.poll.interval.ms=5000 and no poll in the meantime, the group rebalances
21:09:10.859 INFO thread=[specs2-3] class=[KafkaServer] [KafkaServer id=29711] Controlled shutdown succeeded
// the broker is restarted
21:09:15.341 INFO thread=[specs2-3] class=[KafkaServer] [KafkaServer id=29711] started
// after the rebalance, consumer-4 is assigned topic0 partition 3
21:09:18.695 INFO thread=[pool-68-thread-1] class=[ConsumerCoordinator] [Consumer clientId=consumer-4, groupId=group0-Eu1oa] Setting newly assigned partitions [topic0-4, topic0-2, topic0-3, topic0-0, topic0-1]
21:09:18.696 TRACE thread=[pool-68-thread-1] class=[NetworkClient] [Consumer clientId=consumer-4, groupId=group0-Eu1oa] Completed receive from node 2147393047 for OFFSET_FETCH with correlation id 55, received {throttle_time_ms=0,responses=[{topic=topic0,partition_responses=[{partition=0,offset=3,metadata=,error_code=0},{partition=1,offset=3,metadata=,error_code=0},{partition=2,offset=5,metadata=,error_code=0},{partition=3,offset=3,metadata=,error_code=0},{partition=4,offset=3,metadata=,error_code=0}]}],error_code=0}
21:09:18.696 DEBUG thread=[pool-68-thread-1] class=[ConsumerCoordinator] [Consumer clientId=consumer-4, groupId=group0-Eu1oa] Setting offset for partition topic0-3 to the committed offset 3
// the committed offset for topic0-3 is still 3 because consumer-1's transaction, which would commit offset 5, has not finished
21:09:18.946 DEBUG thread=[pool-68-thread-1] class=[Fetcher] [Consumer clientId=consumer-4, groupId=group0-Eu1oa] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(topic0-3, topic0-0), toForget=(topic0-6, topic0-9), implied=()) to broker localhost:8575 (id: 90600 rack: null)
21:09:18.946 TRACE thread=[pool-68-thread-1] class=[NetworkClient] [Consumer clientId=consumer-4, groupId=group0-Eu1oa] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=884473993,epoch=12,topics=[{topic=topic0,partitions=[{partition=3,fetch_offset=3,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=3,log_start_offset=-1,max_bytes=1048576}]}],forgetten_topics_data=[{topic=topic0,partitions=[6,9]}]} with correlation id 57 to node 90600
21:09:18.947 TRACE thread=[pool-68-thread-1] class=[NetworkClient] [Consumer clientId=consumer-4, groupId=group0-Eu1oa] Completed receive from node 90600 for FETCH with correlation id 57, received {throttle_time_ms=0,error_code=0,session_id=884473993,responses=[{topic=topic0,partition_responses=[{partition_header={partition=3,error_code=0,high_watermark=12,last_stable_offset=12,log_start_offset=0,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=3, timestamp=1542229747819, key=4 bytes, value=6 bytes)), (record=DefaultRecord(offset=4, timestamp=1542229750681, key=1 bytes, value=108 bytes)), (record=DefaultRecord(offset=5, timestamp=1542229750712, key=4 bytes, value=6 bytes)), (record=DefaultRecord(offset=6, timestamp=1542229754743, key=1 bytes, value=108 bytes)), (record=DefaultRecord(offset=7, timestamp=1542229755006, key=4 bytes, value=6 bytes)), (record=DefaultRecord(offset=8, timestamp=1542229755008, key=1 bytes, value=108 bytes)), (record=DefaultRecord(offset=9, timestamp=1542229755057, key=4 bytes, value=6 bytes)), (record=DefaultRecord(offset=10, timestamp=1542229755349, key=1 bytes, value=108 bytes)), (record=DefaultRecord(offset=11, timestamp=1542229755384, key=4 bytes, value=6 bytes))]},{partition_header={partition=0,error_code=0,high_watermark=12,last_stable_offset=12,log_start_offset=0,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=3, timestamp=1542229747819, key=4 bytes, value=6 bytes)), (record=DefaultRecord(offset=4, timestamp=1542229750680, key=1 bytes, value=108 bytes)), (record=DefaultRecord(offset=5, timestamp=1542229750712, key=4 bytes, value=6 bytes)), (record=DefaultRecord(offset=6, timestamp=1542229754742, key=1 bytes, value=108 bytes)), (record=DefaultRecord(offset=7, timestamp=1542229755006, key=4 bytes, value=6 bytes)), (record=DefaultRecord(offset=8, timestamp=1542229755007, key=1 bytes, value=108 bytes)), (record=DefaultRecord(offset=9, timestamp=1542229755057, key=4 bytes, value=6 bytes)), (record=DefaultRecord(offset=10, timestamp=1542229755342, key=1 bytes, value=108 bytes)), (record=DefaultRecord(offset=11, timestamp=1542229755384, key=4 bytes, value=6 bytes))]}]}]}
// consumer-4 produces the same message that consumer-1 produced earlier (using test-producer-2) and adds the topic1 partition to its transaction
21:09:18.963 TRACE thread=[pool-65-thread-1] class=[KafkaProducer] [Producer clientId=producer-MULTI-TOPIC-7, transactionalId=test-producer-2] Sending record ProducerRecord(topic=topic1-jMIMm, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=3, value={"payload":"{\"msg\":\"3\"}","header":{"headerFields":{"originalMessageId":"96d86fc1-5351-4cb5-84fc-3d1b44ab0eba","messageId":"edd3e2c8-0dc9-4c94-aca6-c49a921119c0"}}}, timestamp=null) with callback com.wixpress.greyhound.AsyncProducerCallbackBuilder$$Lambda$2453/2135261528@7c5679c1 to topic topic1-jMIMm partition
21:09:18.967 DEBUG thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-7] class=[Sender] [Producer clientId=producer-MULTI-TOPIC-7, transactionalId=test-producer-2] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=test-producer-2, producerId=1001, producerEpoch=0, partitions=[topic1-3]) to node localhost:8275 (id: 29711 rack: null)
21:09:18.969 TRACE thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-7] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-7, transactionalId=test-producer-2] Received transactional response AddPartitionsToTxnResponse(errors={topic1-3=NONE}, throttleTimeMs=0) for request (type=AddPartitionsToTxnRequest, transactionalId=test-producer-2, producerId=1001, producerEpoch=0, partitions=[topic1-3])
// adding the consumed offsets to the transaction
21:09:18.974 DEBUG thread=[greyhound-thread-cached-g-consumers-28-1-[278]] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-7, transactionalId=test-producer-2] Begin adding offsets {topic0-2=OffsetAndMetadata{offset=7, metadata=''}, topic0-3=OffsetAndMetadata{offset=5, metadata=''}} for consumer group group0-Eu1oa to transaction
21:09:18.975 DEBUG thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-7] class=[Sender] [Producer clientId=producer-MULTI-TOPIC-7, transactionalId=test-producer-2] Sending transactional request (type=AddOffsetsToTxnRequest, transactionalId=test-producer-2, producerId=1001, producerEpoch=0, consumerGroupId=group0-Eu1oa) to node localhost:8275 (id: 29711 rack: null)
21:09:18.978 TRACE thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-7] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-7, transactionalId=test-producer-2] Received transactional response AddOffsetsToTxnResponse(error=NONE, throttleTimeMs=0) for request (type=AddOffsetsToTxnRequest, transactionalId=test-producer-2, producerId=1001, producerEpoch=0, consumerGroupId=group0-Eu1oa)
21:09:18.981 TRACE thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-7] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-7, transactionalId=test-producer-2] Received transactional response TxnOffsetCommitResponse(errors={topic0-2=NONE, topic0-3=NONE}, throttleTimeMs=0) for request (type=TxnOffsetCommitRequest, transactionalId=test-producer-2, producerId=1001, producerEpoch=0, consumerGroupId=group0-Eu1oa, offsets={topic0-2=CommittedOffset(offset=7, metadata=''), topic0-3=CommittedOffset(offset=5, metadata='')})
// the transaction commits successfully: message "3" from [topic0 partition 3 offset 4] is now relayed to topic1 for the first time
21:09:18.984 TRACE thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-7] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-7, transactionalId=test-producer-2] Received transactional response EndTxnResponse(error=NONE, throttleTimeMs=0) for request (type=EndTxnRequest, transactionalId=test-producer-2, producerId=1001, producerEpoch=0, result=COMMIT)
....
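Consumer-4 starts fetching at the committed offset 3, yet the first record it relays is offset 4. In the FETCH response, the records at odd offsets (3, 5, 7, ...) all have 4-byte keys and 6-byte values, which is consistent with transaction control markers (a key of two int16 fields, a value of an int16 version plus an int32 coordinator epoch) rather than data. The hypothetical `FetchTraceMarkers` helper below applies that size heuristic to a few records copied from the trace. The heuristic is an assumption that happens to fit this log, where data records have a 1-byte key; it is not a general way to detect control records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Heuristic reader for the DefaultRecord(...) entries in the FETCH trace above.
// ASSUMPTION: a 4-byte key with a 6-byte value is a transaction control marker. This fits
// this log (data records there have 1-byte keys) but is not a general detection rule.
final class FetchTraceMarkers {

    private static final Pattern RECORD = Pattern.compile(
            "DefaultRecord\\(offset=(\\d+), timestamp=\\d+, key=(\\d+) bytes, value=(\\d+) bytes\\)");

    /** First five partition-3 records from the FETCH response with correlation id 57. */
    static final String PARTITION_3_SAMPLE =
            "(record=DefaultRecord(offset=3, timestamp=1542229747819, key=4 bytes, value=6 bytes)), "
            + "(record=DefaultRecord(offset=4, timestamp=1542229750681, key=1 bytes, value=108 bytes)), "
            + "(record=DefaultRecord(offset=5, timestamp=1542229750712, key=4 bytes, value=6 bytes)), "
            + "(record=DefaultRecord(offset=6, timestamp=1542229754743, key=1 bytes, value=108 bytes)), "
            + "(record=DefaultRecord(offset=7, timestamp=1542229755006, key=4 bytes, value=6 bytes))";

    /** Offsets at or after fetchOffset whose sizes do not match the assumed marker shape. */
    static List<Long> dataOffsets(String recordSet, long fetchOffset) {
        List<Long> result = new ArrayList<>();
        Matcher m = RECORD.matcher(recordSet);
        while (m.find()) {
            long offset = Long.parseLong(m.group(1));
            int keyBytes = Integer.parseInt(m.group(2));
            int valueBytes = Integer.parseInt(m.group(3));
            boolean looksLikeMarker = keyBytes == 4 && valueBytes == 6;
            if (offset >= fetchOffset && !looksLikeMarker) {
                result.add(offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // consumer-4 fetches topic0-3 from the committed offset 3.
        System.out.println(dataOffsets(PARTITION_3_SAMPLE, 3));
    }
}
```

On the sample it prints `[4, 6]`: offset 4, the message "3", is the first data record at or after the fetch offset.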
// the original producer (test-producer-1), whose AddOffsetsToTxnRequest has been pending, receives an error
21:09:25.826 TRACE thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-5] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-5, transactionalId=test-producer-1] Received transactional response AddOffsetsToTxnResponse(error=COORDINATOR_NOT_AVAILABLE, throttleTimeMs=0) for request (type=AddOffsetsToTxnRequest, transactionalId=test-producer-1, producerId=2000, producerEpoch=0, consumerGroupId=group0-Eu1oa)
// but then it succeeds
21:09:26.040 TRACE thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-5] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-5, transactionalId=test-producer-1] Received transactional response AddOffsetsToTxnResponse(error=NONE, throttleTimeMs=0) for request (type=AddOffsetsToTxnRequest, transactionalId=test-producer-1, producerId=2000, producerEpoch=0, consumerGroupId=group0-Eu1oa)
21:09:26.044 DEBUG thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-5] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-5, transactionalId=test-producer-1] Successfully added offsets {topic0-4=CommittedOffset(offset=5, metadata=''), topic0-3=CommittedOffset(offset=5, metadata=''), topic0-1=CommittedOffset(offset=5, metadata='')} from consumer group group0-Eu1oa to transaction.
// test-producer-1 then commits its transaction: message "3" from [topic0 partition 3 offset 4] is relayed to topic1 a second time, so it has been processed twice
21:09:26.047 TRACE thread=[kafka-producer-network-thread | producer-MULTI-TOPIC-5] class=[TransactionManager] [Producer clientId=producer-MULTI-TOPIC-5, transactionalId=test-producer-1] Received transactional response EndTxnResponse(error=NONE, throttleTimeMs=0) for request (type=EndTxnRequest, transactionalId=test-producer-1, producerId=2000, producerEpoch=0, result=COMMIT)
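Why test-producer-1 carries on without an exception can be illustrated with a retry loop: the Kafka producer treats coordinator errors such as COORDINATOR_NOT_AVAILABLE as retriable for AddOffsetsToTxn, looks the coordinator up again and re-sends the request, so the application only sees the final NONE. The sketch below is a hypothetical model; the `TxnError` enum, `Outcome`, `sendWithRetries` and the scripted responses are illustrative and are not the client's `TransactionManager`. It replays the two responses logged at 21:09:25.826 and 21:09:26.040.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Toy model of a client that silently retries retriable coordinator errors.
// The enum, Outcome and sendWithRetries are hypothetical, not Kafka's TransactionManager.
final class AddOffsetsRetryModel {

    /** Error names borrowed from the Kafka protocol; the set here is deliberately small. */
    enum TxnError { NONE, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, INVALID_PRODUCER_EPOCH }

    /** ASSUMPTION: a subset of the errors retried for AddOffsetsToTxn, not an exhaustive list. */
    static final Set<TxnError> RETRIABLE =
            EnumSet.of(TxnError.COORDINATOR_NOT_AVAILABLE, TxnError.NOT_COORDINATOR);

    /** What the application finally sees, plus the number of requests sent. */
    static final class Outcome {
        final TxnError error;
        final int attempts;

        Outcome(TxnError error, int attempts) {
            this.error = error;
            this.attempts = attempts;
        }
    }

    /** Consumes scripted broker responses, retrying until one is not retriable. */
    static Outcome sendWithRetries(List<TxnError> scriptedResponses) {
        Deque<TxnError> responses = new ArrayDeque<>(scriptedResponses);
        int attempts = 0;
        while (!responses.isEmpty()) {
            TxnError response = responses.poll();
            attempts++;
            if (!RETRIABLE.contains(response)) {
                // Final result handed to the caller: success for NONE, otherwise an error.
                return new Outcome(response, attempts);
            }
            // Retriable: the real client re-discovers the coordinator and re-sends without
            // notifying the application; nothing here rechecks ownership of the input partition.
        }
        throw new IllegalStateException("ran out of scripted responses");
    }

    public static void main(String[] args) {
        // Responses test-producer-1 received at 21:09:25.826 and 21:09:26.040.
        Outcome outcome = sendWithRetries(
                List.of(TxnError.COORDINATOR_NOT_AVAILABLE, TxnError.NONE));
        System.out.println(outcome.error + " after " + outcome.attempts + " attempts");
    }
}
```

Running `main` prints `NONE after 2 attempts`. As in the log, the stale transaction proceeds to EndTxn as soon as the retried request succeeds, because the retry path never asks whether consumer-1 still owns topic0-3.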