Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-9841

Connector and Task duplicated when a worker join with old generation assignment

    XMLWordPrintableJSON

Details

    Description

      When using IncrementalCooperativeAssignor.class to assign connectors and tasks.

      Suppose there is a worker 'W' got some connection issue with the coordinator.

      During the connection issue, the connectors/tasks on 'W' are assigned to the others worker

      When the connection issue disappear, 'W' will join the group with an old generation assignment. Then the group leader will get duplicated connectors/tasks in the metadata sent by the workers. But the duplicated connectors/tasks will not be revoked.

       

      Generation 3:

      Worker1:

      [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Worker2:

      [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Worker3:

      [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.dist 1480 ributed.DistributedHerder)

      Worker4:

      [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[misc], taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Worker5:

      [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

       

      Generation 4:

      Worker1:

      [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Worker2:

      [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Worker3:

      [2020-03-17 04:32:35,489] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Group coordinator xxxxxx:9092 (id: 2147483631 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
      [2020-03-17 04:32:35,590] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Discovered group coordinator xxxxxx:9092 (id: 2147483631 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
      [2020-03-17 04:32:36,910] INFO WorkerSourceTask{id=misc-3} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2020-03-17 04:32:36,910] INFO WorkerSourceTask{id=misc-3} flushing 86 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
      [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
      [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

      Worker4:

      [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[misc], taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Worker5:

      [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

       

      Generation 5:

      Worker1:

      [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Worker2:

      [2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Worker3:

      [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Worker4:

      [2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[misc], taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Worker5:

      [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

       

       

      Attachments

        Issue Links

          Activity

            People

              LucentWong Yu Wang
              LucentWong Yu Wang
              Votes:
              0 Vote for this issue
              Watchers:
              9 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: