Description
In our production environment, when two Kafka Connect workers are started, the leader bounces between worker1 and worker2 during the first couple of minutes, and many tasks throw a "Task already exists in this worker" exception on worker2.
The sequence of events:
Worker2 (hostname: sinkdp2)
gen3 assign
Start task 1
gen4 assign task 1
gen5 assign task 1
gen6 skip stopping task 1 and removal due to rebalance unresolved
revoke
gen7 assign task 1
Start task 1 (Task already exists error)
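The failure mode in the sequence above can be replayed with a toy model. This is only a sketch under stated assumptions: `TaskRegistry` and `replay` are hypothetical names, not Kafka Connect classes, and the event list is a simplification of the summary (start in gen3, stop skipped in gen6, start again in gen7). It shows only that a start request for a task that was never removed from the worker's registry fails with the reported message.

```python
class TaskRegistry:
    """Toy stand-in for a worker's set of running tasks (hypothetical, not Kafka Connect code)."""

    def __init__(self):
        self.tasks = set()

    def start(self, task_id):
        # Refuse to start a task that is still registered on this worker.
        if task_id in self.tasks:
            raise RuntimeError(f"Task already exists in this worker: {task_id}")
        self.tasks.add(task_id)

    def stop(self, task_id):
        self.tasks.discard(task_id)


def replay(events):
    """Apply (generation, action, task_id) events and collect start failures."""
    registry = TaskRegistry()
    errors = []
    for generation, action, task_id in events:
        try:
            if action == "start":
                registry.start(task_id)
            elif action == "stop":
                registry.stop(task_id)
            # "skip_stop" deliberately leaves the registry untouched.
        except RuntimeError as exc:
            errors.append((generation, str(exc)))
    return errors


# Simplified version of the worker2 summary: the stop is skipped in gen6.
SKIPPED = [(3, "start", "task-1"), (6, "skip_stop", "task-1"), (7, "start", "task-1")]
# Same sequence, but with the task actually stopped in gen6.
STOPPED = [(3, "start", "task-1"), (6, "stop", "task-1"), (7, "start", "task-1")]

errors_skipped = replay(SKIPPED)
errors_stopped = replay(STOPPED)
print(errors_skipped)
print(errors_stopped)
```

With the stop skipped, the gen7 start fails; when the stop is applied, the same sequence completes without error.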
Worker1 (hostname: sinkdp1)
03:36:07,340 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:36:10,460 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 1 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842', leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[dp-hive-sink-connector-dptask_475_22, dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:36:10,694 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Starting task dp-hive-sink-connector-dptask_475_22-0 [pool-9-thread-5][DistributedHerder.java:1073]
03:36:10,979 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0 with version 0.14.0-SNAPSHOT of type com.datapipeline.sink.connector.hive.HiveConnectorTask [pool-9-thread-5][Worker.java:426]
03:36:37,692 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:36:37,806 [INFO ] Stopping task dp-hive-sink-connector-dptask_475_22-0 [pool-9-thread-5][Worker.java:702]
03:40:09,721 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Finished stopping tasks in preparation for rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
03:40:09,722 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 2 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842', leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[dp-hive-sink-connector-dptask_599_20, dp-tidb-connector-dptask_580_13, dp-hive-sink-connector-dpta
03:40:09,722 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:41:10,650 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks [DistributedHerder-connect-1][DistributedHerder.java:1517]
03:41:10,650 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 4 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4', leaderUrl='http://sinkdp2:8083/', offset=6457, connectorIds=[], taskIds=[], revokedConnectorIds=[dp-hive-sink-connector-dptask_599_20, dp-tidb-connector-dptask
03:41:10,651 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:42:10,815 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 5 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17', leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[dp-hive-sink-connector-dptask_475_22, dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:42:10,953 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Starting task dp-hive-sink-connector-dptask_475_22-0 [pool-9-thread-8][DistributedHerder.java:1073]
03:42:10,953 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0 with version 0.14.0-SNAPSHOT of type com.datapipeline.sink.connector.hive.HiveConnectorTask [pool-9-thread-8][Worker.java:426]
03:42:29,429 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:43:28,336 [INFO ] Stopping task dp-hive-sink-connector-dptask_475_22-0 [pool-9-thread-2][Worker.java:702]
03:46:05,804 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Finished stopping tasks in preparation for rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
03:46:05,806 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 6 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17', leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[dp-hive-sink-connector-dptask_599_20, dp-tidb-connector-dptask_580_13, dp-hive-sink-connector-dpta
03:46:05,806 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:47:06,564 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks [DistributedHerder-connect-1][DistributedHerder.java:1517]
Worker2 (hostname: sinkdp2)
03:36:35,984 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:36:37,780 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 2 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842', leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 [Dist
03:37:40,789 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:37:40,916 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 3 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4', leaderUrl='http://sinkdp2:8083/', offset=6457, connectorIds=[dp-hive-sink-connector-dptask_475_22, dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:37:41,151 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Starting task dp-hive-sink-connector-dptask_475_22-0 [pool-9-thread-1][DistributedHerder.java:1073]
03:37:41,507 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0 with version 0.14.0-SNAPSHOT of type com.datapipeline.sink.connector.hive.HiveConnectorTask [pool-9-thread-1][Worker.java:426]
03:40:13,254 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:42:27,376 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Finished stopping tasks in preparation for rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
03:42:27,377 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 4 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4', leaderUrl='http://sinkdp2:8083/', offset=6457, connectorIds=[dp-hive-sink-connector-dptask_475_22, dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:42:27,378 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:43:28,190 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks [DistributedHerder-connect-1][DistributedHerder.java:1517]
03:43:28,191 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 6 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17', leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[], revokedConnectorIds=[dp-hive-sink-connector-dptask_475_22, dp-hive-sink-connector-d
03:43:28,191 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:44:28,358 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 7 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-8494a329-d5a8-443c-8c90-ba710cc44c2d', leaderUrl='http://sinkdp2:8083/', offset=6457, connectorIds=[dp-hive-sink-connector-dptask_475_22, dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:44:28,692 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Starting task dp-hive-sink-connector-dptask_475_22-0 [pool-9-thread-7][DistributedHerder.java:1073]
kafka.connect.errors.ConnectException: Task already exists in this worker: dp-hive-sink-connector-dptask_475_22-0
03:46:07,401 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:48:07,024 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Finished stopping tasks in preparation for rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
03:48:07,246 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 8 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-8494a329-d5a8-443c-8c90-ba710cc44c2d', leaderUrl='http://sinkdp2:8083/', offset=6457, connectorIds=[dp-hive-sink-connector-dptask_475_22, dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:48:07,246 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:49:07,446 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks [DistributedHerder-connect-1][DistributedHerder.java:1517]
03:49:07,446 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 10 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-2a498edc-a517-457b-b982-c84cc9ff4521', leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[], revokedConnectorIds=[dp-hive-sink-connector-dptask_475_22, dp-hive-sink-connector-
03:49:07,447 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Rebalance started [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:50:07,677 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Joined group at generation 11 with protocol version 1 and got assignment: Assignment{error=0, leader='connect-1-d0be6a23-8dfa-4289-a818-8a655effa48d', leaderUrl='http://sinkdp2:8083/', offset=6457, connectorIds=[dp-hive-sink-connector-dptask_475_22, dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpt
03:50:08,079 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] Starting task dp-hive-sink-connector-dptask_475_22-0 [pool-9-thread-3][DistributedHerder.java:1073]
kafka.connect.errors.ConnectException: Task already exists in this worker: dp-hive-sink-connector-dptask_475_22-0
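To make the leader bouncing easier to see in logs like the ones above, the generation and `leaderUrl` of each "Joined group" entry can be extracted with a small script. This is a rough sketch: the function names are hypothetical, the regular expression assumes the message format shown in these logs, and `SAMPLE` is an abbreviated excerpt of the first four worker2 join entries (only the generation and `leaderUrl` fields are kept).

```python
import re

# Matches the generation number and leaderUrl of a "Joined group" log entry.
JOIN_RE = re.compile(r"Joined group at generation (\d+) .*?leaderUrl='([^']*)'")


def leader_by_generation(log_text):
    """Return (generation, leader_url) pairs in the order they appear."""
    return [(int(gen), url) for gen, url in JOIN_RE.findall(log_text)]


def leader_changes(pairs):
    """Return (generation, old_leader, new_leader) for each leader switch."""
    changes = []
    previous = None
    for generation, url in pairs:
        if previous is not None and url != previous:
            changes.append((generation, previous, url))
        previous = url
    return changes


# Abbreviated excerpt of the worker2 log (fields other than leaderUrl omitted).
SAMPLE = """\
03:36:37,780 Joined group at generation 2 with protocol version 1 and got assignment: Assignment{error=0, leaderUrl='http://sinkdp1:8083/'
03:37:40,916 Joined group at generation 3 with protocol version 1 and got assignment: Assignment{error=0, leaderUrl='http://sinkdp2:8083/'
03:42:27,377 Joined group at generation 4 with protocol version 1 and got assignment: Assignment{error=0, leaderUrl='http://sinkdp2:8083/'
03:43:28,191 Joined group at generation 6 with protocol version 1 and got assignment: Assignment{error=0, leaderUrl='http://sinkdp1:8083/'
"""

pairs = leader_by_generation(SAMPLE)
changes = leader_changes(pairs)
for generation, old, new in changes:
    print(f"generation {generation}: leader moved from {old} to {new}")
```

Running the same functions over the full worker log files would list every generation at which the leader URL switched between sinkdp1 and sinkdp2.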