Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java	(revision b2a5633a2c7e5d6de45a46d406f08a968916ab7f)
+++ clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java	(revision 0627f0e38bb055c3b0482828a06741f6a18f4eb6)
@@ -441,7 +441,7 @@
         AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
         Node targetNode = null;
         try {
-            targetNode = awaitNodeReady(nextRequestHandler.coordinatorType());
+            targetNode = awaitNodeReady(nextRequestHandler.coordinatorType(), nextRequestHandler.coordinatorKey());
             if (targetNode == null) {
                 maybeFindCoordinatorAndRetry(nextRequestHandler);
                 return true;
@@ -509,9 +509,9 @@
         return running;
     }
 
-    private Node awaitNodeReady(FindCoordinatorRequest.CoordinatorType coordinatorType) throws IOException {
+    private Node awaitNodeReady(FindCoordinatorRequest.CoordinatorType coordinatorType, String coordinatorKey) throws IOException {
         Node node = coordinatorType != null ?
-                transactionManager.coordinator(coordinatorType) :
+                transactionManager.coordinator(coordinatorType, coordinatorKey) :
                 client.leastLoadedNode(time.milliseconds());
 
         if (node != null && NetworkClientUtils.awaitReady(client, node, time, requestTimeoutMs)) {
Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java	(revision b2a5633a2c7e5d6de45a46d406f08a968916ab7f)
+++ clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java	(revision 0627f0e38bb055c3b0482828a06741f6a18f4eb6)
@@ -222,7 +222,7 @@
 
     private int inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID;
     private Node transactionCoordinator;
-    private Node consumerGroupCoordinator;
+    private final Map<String, Node> consumerGroupCoordinator;
     private boolean coordinatorSupportsBumpingEpoch;
 
     private volatile State currentState = State.UNINITIALIZED;
@@ -296,7 +296,7 @@
         this.log = logContext.logger(TransactionManager.class);
         this.transactionTimeoutMs = transactionTimeoutMs;
         this.transactionCoordinator = null;
-        this.consumerGroupCoordinator = null;
+        this.consumerGroupCoordinator = new HashMap<>();
         this.newPartitionsInTransaction = new HashSet<>();
         this.pendingPartitionsInTransaction = new HashSet<>();
         this.partitionsInTransaction = new HashSet<>();
@@ -913,10 +913,10 @@
         }
     }
 
-    Node coordinator(FindCoordinatorRequest.CoordinatorType type) {
+    Node coordinator(CoordinatorType type, String coordinatorKey) {
         switch (type) {
             case GROUP:
-                return consumerGroupCoordinator;
+                return consumerGroupCoordinator.get(coordinatorKey);
             case TRANSACTION:
                 return transactionCoordinator;
             default:
@@ -1120,7 +1120,7 @@
     private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) {
         switch (type) {
             case GROUP:
-                consumerGroupCoordinator = null;
+                consumerGroupCoordinator.remove(coordinatorKey);
                 break;
             case TRANSACTION:
                 transactionCoordinator = null;
@@ -1515,7 +1515,7 @@
                 Node node = findCoordinatorResponse.node();
                 switch (coordinatorType) {
                     case GROUP:
-                        consumerGroupCoordinator = node;
+                        consumerGroupCoordinator.put(builder.data().key(), node);
                         break;
                     case TRANSACTION:
                         transactionCoordinator = node;
Index: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java	(revision b2a5633a2c7e5d6de45a46d406f08a968916ab7f)
+++ clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java	(revision 0627f0e38bb055c3b0482828a06741f6a18f4eb6)
@@ -777,8 +777,8 @@
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
         prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, txnOffsetCommitResponse);
 
-        assertNull(transactionManager.coordinator(CoordinatorType.GROUP));
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP) != null);
+        assertNull(transactionManager.coordinator(CoordinatorType.GROUP, consumerGroupId));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP, consumerGroupId) != null);
         assertTrue(transactionManager.hasPendingOffsetCommits());
 
         runUntil(() -> !transactionManager.hasPendingOffsetCommits());
@@ -797,11 +797,11 @@
         // It finds the coordinator and then gets a PID.
         transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, true, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) == null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) == null);
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
     }
 
     @Test
@@ -830,7 +830,7 @@
     public void testUnsupportedInitTransactions() {
         transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
         assertFalse(transactionManager.hasError());
 
         client.prepareUnsupportedVersionResponse(body -> {
@@ -882,7 +882,7 @@
 
         prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch);
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP) != null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP, consumerGroupId) != null);
 
         client.prepareResponse(request -> {
             TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) request;
@@ -916,7 +916,7 @@
 
         prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch);
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP) != null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP, consumerGroupId) != null);
 
         client.prepareResponse(request -> {
             TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) request;
@@ -950,7 +950,7 @@
 
         prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch);
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP) != null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP, consumerGroupId) != null);
 
         prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, singletonMap(tp, Errors.ILLEGAL_GENERATION));
         client.prepareResponse(request -> {
@@ -975,22 +975,22 @@
         // It finds the coordinator and then gets a PID.
         TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
 
         prepareInitPidResponse(Errors.NONE, true, producerId, epoch);
         // send pid to coordinator, should get disconnected before receiving the response, and resend the
         // FindCoordinator and InitPid requests.
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) == null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) == null);
 
-        assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
         assertFalse(initPidResult.isCompleted());
         assertFalse(transactionManager.hasProducerId());
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
 
-        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
         assertFalse(initPidResult.isCompleted());
         prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
         runUntil(initPidResult::isCompleted);
@@ -1007,22 +1007,22 @@
         // It finds the coordinator and then gets a PID.
         TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
 
         client.disconnect(brokerNode.idString());
         client.blackout(brokerNode, 100);
         // send pid to coordinator. Should get disconnected before the send and resend the FindCoordinator
         // and InitPid requests.
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) == null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) == null);
         time.sleep(110);  // waiting for the blackout period for the node to expire.
 
         assertFalse(initPidResult.isCompleted());
         assertFalse(transactionManager.hasProducerId());
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
         assertFalse(initPidResult.isCompleted());
         prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
 
@@ -1038,18 +1038,18 @@
         // It finds the coordinator and then gets a PID.
         TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
 
         prepareInitPidResponse(Errors.NOT_COORDINATOR, false, producerId, epoch);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) == null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) == null);
 
         assertFalse(initPidResult.isCompleted());
         assertFalse(transactionManager.hasProducerId());
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
         assertFalse(initPidResult.isCompleted());
         prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
 
@@ -1078,8 +1078,8 @@
     public void testTransactionalIdAuthorizationFailureInInitProducerId() {
         TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
 
         prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, producerId, RecordBatch.NO_PRODUCER_EPOCH);
         runUntil(transactionManager::hasError);
@@ -1768,11 +1768,11 @@
 
         transactionManager.transitionToAbortableError(new KafkaException());
         sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, epoch, producerId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) == null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) == null);
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
         assertTrue(transactionManager.hasAbortableError());
     }
 
@@ -1918,8 +1918,8 @@
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
         prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, txnOffsetCommitResponse);
 
-        assertNull(transactionManager.coordinator(CoordinatorType.GROUP));
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP) != null);
+        assertNull(transactionManager.coordinator(CoordinatorType.GROUP, consumerGroupId));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP, consumerGroupId) != null);
         assertTrue(transactionManager.hasPendingOffsetCommits());
 
         runUntil(transactionManager::hasPendingOffsetCommits);  // The TxnOffsetCommit failed.
@@ -2085,10 +2085,10 @@
 //        prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, groupInstanceId, memberId, generationId, txnOffsetCommitResponse);
         prepareTxnCommitResponse.run();
 
-        assertNull(transactionManager.coordinator(CoordinatorType.GROUP));
+        assertNull(transactionManager.coordinator(CoordinatorType.GROUP, consumerGroupId));
         sender.runOnce();  // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator
         sender.runOnce();  // send find coordinator for group request
-        assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP));
+        assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP, consumerGroupId));
         assertTrue(transactionManager.hasPendingOffsetCommits());
         return addOffsetsResult;
     }
@@ -2980,10 +2980,10 @@
     @Test
     public void testCanBumpEpochDuringCoordinatorDisconnect() {
         doInitTransactions(0, (short) 0);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
         assertTrue(transactionManager.canBumpEpoch());
 
-        apiVersions.remove(transactionManager.coordinator(CoordinatorType.TRANSACTION).idString());
+        apiVersions.remove(transactionManager.coordinator(CoordinatorType.TRANSACTION, null).idString());
         assertTrue(transactionManager.canBumpEpoch());
     }
 
@@ -3334,8 +3334,8 @@
     private void doInitTransactions(long producerId, short epoch) {
         transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION, null) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION, null));
 
         prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
         runUntil(transactionManager::hasProducerId);
