Details
Description
Concurrent tests being added under KAFKA-6096 for transaction coordinator fail to complete some transactions when multiple transactions are completed concurrently.
The problem is with the following code snippet - there are two very similar uses of concurrent map in TransactionMarkerChannelManager and the test fails because some transaction markers are discarded. getOrElseUpdate in scala maps are not atomic. The test passes consistently with one thread.
val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = new ConcurrentHashMap[Int, TxnMarkerQueue]().asScala
val brokerRequestQueue = markersQueuePerBroker.getOrElseUpdate(brokerId, new TxnMarkerQueue(broker))
Attachments
Issue Links
- links to