diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index a35851544..7e9ee4b7d 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -169,34 +169,30 @@ class TransactionStateManager(brokerId: Int, def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = { responses.foreach { case (topicPartition, response) => - response.error match { - case Errors.NONE => - inReadLock(stateLock) { - val toRemove = transactionalIdByPartition(topicPartition.partition()) - transactionMetadataCache.get(topicPartition.partition) - .foreach { txnMetadataCacheEntry => - toRemove.foreach { idCoordinatorEpochAndMetadata => - val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId) - txnMetadata.inLock { - if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch - && txnMetadata.pendingState.contains(Dead) - && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch - ) - txnMetadataCacheEntry.metadataPerTransactionalId.remove(idCoordinatorEpochAndMetadata.transactionalId) - else { - debug(s"failed to remove expired transactionalId: ${idCoordinatorEpochAndMetadata.transactionalId}" + - s" from cache. pendingState: ${txnMetadata.pendingState} producerEpoch: ${txnMetadata.producerEpoch}" + - s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}" + - s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch} expected coordinatorEpoch: " + - s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}") - txnMetadata.pendingState = None - } - } + inReadLock(stateLock) { + val toRemove = transactionalIdByPartition(topicPartition.partition()) + transactionMetadataCache.get(topicPartition.partition) + .foreach { txnMetadataCacheEntry => + toRemove.foreach { idCoordinatorEpochAndMetadata => + val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId) + txnMetadata.inLock { + if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch + && txnMetadata.pendingState.contains(Dead) + && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch + && response.error == Errors.NONE + ) + txnMetadataCacheEntry.metadataPerTransactionalId.remove(idCoordinatorEpochAndMetadata.transactionalId) + else { + warn(s"failed to remove expired transactionalId: ${idCoordinatorEpochAndMetadata.transactionalId}" + + s" from cache. pendingState: ${txnMetadata.pendingState} producerEpoch: ${txnMetadata.producerEpoch}" + + s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}" + + s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch} expected coordinatorEpoch: " + + s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}") + txnMetadata.pendingState = None } } - } - case _ => - debug(s"writing transactionalId tombstones for partition: ${topicPartition.partition} failed with error: ${response.error.message()}") + } + } } } } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 34b82d9ea..1b42a52fe 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -439,8 +439,8 @@ class TransactionStateManagerTest { @Test def shouldNotRemoveExpiredTransactionalIdsIfLogAppendFails(): Unit = { setupAndRunTransactionalIdExpiration(Errors.NOT_ENOUGH_REPLICAS, CompleteAbort) - verifyMetadataDoesExist(transactionalId1) - verifyMetadataDoesExist(transactionalId2) + verifyMetadataDoesExist(transactionalId1, _.pendingState.isEmpty) + verifyMetadataDoesExist(transactionalId2, _.pendingState.isEmpty) } @Test @@ -464,11 +464,11 @@ class TransactionStateManagerTest { verifyMetadataDoesExist(transactionalId2) } - private def verifyMetadataDoesExist(transactionalId: String) = { + private def verifyMetadataDoesExist(transactionalId: String, metadataCheck: TransactionMetadata => Boolean = _ => true) = { transactionManager.getTransactionState(transactionalId) match { case Left(errors) => fail("shouldn't have been any errors") case Right(None) => fail("metadata should have been removed") - case Right(Some(metadata)) => // ok + case Right(Some(metadata)) => assertTrue(metadataCheck(metadata.transactionMetadata)) } }