  1. Kafka
  2. KAFKA-17049

Incremental rebalances assign too many tasks for the same connector together


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: connect
    • Labels: None

    Description

      This follows https://issues.apache.org/jira/browse/KAFKA-10413

      When running the following script, which:

      1. runs one worker
      2. declares two connectors
      3. adds two more workers

       

      #!/bin/bash
      set -xe
      dkill() {
        docker stop "$1" || true
        docker rm -v -f "$1" || true
      }
      launch_minio() {
        # Launch Minio (Fake S3)
        docker run --network host -d --name minio \
          -e MINIO_ROOT_USER=minioadmin \
          -e MINIO_ROOT_PASSWORD=minioadmin \
          minio/minio server --console-address :9001 /data
        docker exec -it minio mkdir /data/my-minio-bucket
      }
      launch_kafka_connect() {
        # Start Kafka Connect with S3 Connector
        docker run --network host -d --name "kafka-connect$1" \
          -e  AWS_ACCESS_KEY_ID=minioadmin \
          -e  AWS_SECRET_ACCESS_KEY=minioadmin \
          -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
          -e  CONNECT_LISTENERS="http://localhost:808$1" \
          -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
          -e  CONNECT_REST_PORT="808$1" \
          -e  CONNECT_GROUP_ID="connect-cluster" \
          -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
          -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
          -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
          -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
          -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
          -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
          -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
          -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
          -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
          -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
          --entrypoint bash \
          confluentinc/cp-kafka-connect:7.6.1 \
          -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
      }
      cleanup_docker_env() {
        docker volume prune -f
        for container in kafka-connect{1..9} kafka minio
        do
          dkill "$container"
        done
      }
      launch_kafka() {
        docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
        for i in {1..2}
        do
          # Create a Kafka topic
          docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 --topic "test_topic$i"
        done
        for topic in connect-configs connect-offsets connect-status
        do
          # Connect's internal topics must be compacted; the config topic must have exactly 1 partition (1 is used for all three here for simplicity)
          docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic "$topic" --config cleanup.policy=compact
        done
      }
      cleanup_docker_env
      launch_kafka
      launch_minio
      launch_kafka_connect 1
      while true
      do
        sleep 5
        # Check if Kafka Connect is up
        curl http://localhost:8081/ || continue
        break
      done
      sleep 10
      for i in {1..2}
      do
        # Set up a connector
        curl -X POST -H "Content-Type: application/json" --data '{
          "name": "s3-connector'"$i"'",
          "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": "12",
            "topics": "test_topic'"$i"'",
            "s3.region": "us-east-1",
            "store.url": "http://0.0.0.0:9000",
            "s3.bucket.name": "my-minio-bucket",
            "s3.part.size": "5242880",
            "flush.size": "3",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
            "schema.compatibility": "NONE"
          }
        }' http://localhost:8081/connectors
      done
      launch_kafka_connect 2
      launch_kafka_connect 3
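
The script above polls worker 1 in an unbounded loop and does not wait for workers 2 and 3 before it exits. A bounded retry helper along the following lines could cover both cases. This is only a sketch: `retry_until` is a hypothetical name, not part of the original script.

```bash
# retry_until ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# (hypothetical helper, not part of the original script)
# Runs COMMAND until it succeeds or ATTEMPTS runs have been used up.
# Returns 0 on the first success, 1 if every attempt failed.
retry_until() {
  local attempts="$1"
  local delay="$2"
  shift 2
  local i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@"; then
      return 0
    fi
    # No need to pause after the last failed attempt
    if [ "$i" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    i=$((i + 1))
  done
  return 1
}
```

Assuming the workers listen on the ports the script assigns (808$1), waiting for worker 2 would look like:

      retry_until 60 5 curl -sf http://localhost:8082/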
      

       

       

      When the script ends, I have the first worker taking all the connectors/tasks:

      ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c
          12     "worker_id": "k1:8081"
      ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c
          12     "worker_id": "k1:8081"
      

       

      Then I wait a few minutes and get the final state:

      ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c
           6     "worker_id": "k2:8082"
           6     "worker_id": "k3:8083"

       

      ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c
           8     "worker_id": "k1:8081"
           2     "worker_id": "k2:8082"
           2     "worker_id": "k3:8083"
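
To make the imbalance easier to compare between runs, the tallies above can be reduced to a single number per connector. The helper below is a sketch under assumptions: `worker_spread` is a hypothetical name, it expects one worker id per line on stdin (one line per task), and it takes the full list of workers as arguments so that a worker holding zero tasks still counts.

```bash
# worker_spread WORKER_ID...
# (hypothetical helper) Reads one worker_id per line on stdin, one line per task.
# Prints the task count of every worker, then the difference between the
# busiest and the idlest worker ("spread").
worker_spread() {
  awk -v workers="$*" '
    BEGIN {
      # Seed every expected worker with 0 so idle workers are reported too
      n = split(workers, order, " ")
      for (i = 1; i <= n; i++) count[order[i]] = 0
    }
    NF {
      # A worker missing from the argument list is appended on first sight
      if (!($1 in count)) order[++n] = $1
      count[$1]++
    }
    END {
      for (i = 1; i <= n; i++) {
        c = count[order[i]]
        if (i == 1 || c < min) min = c
        if (i == 1 || c > max) max = c
        printf "%d %s\n", c, order[i]
      }
      printf "spread %d\n", max - min
    }'
}
```

It could be fed from the same status endpoint used above:

      curl -s http://localhost:8081/connectors/s3-connector1/status | jq -r '.tasks[].worker_id' | worker_spread k1:8081 k2:8082 k3:8083

With the final state reported here, both connectors would show a spread of 6 (8/2/2 and 0/6/6), whereas a (4, 4, 4) layout would show 0.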
      

       

      In the end, each worker does hold 8 tasks, but I think the distribution should be (4, 4, 4) for each connector. Not all connectors do the same amount of work, so grouping most of one connector's tasks on a single worker leads to an overall processing/network imbalance.
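
For reference, the per-connector layout expected here is plain integer division of a connector's tasks across the workers. The sketch below only illustrates that arithmetic; `even_split` is a hypothetical helper and does not describe how Connect's assignor is implemented.

```bash
# even_split TASKS WORKERS
# (hypothetical helper, illustration only)
# Prints how many of one connector's tasks each worker would hold if the
# tasks were spread as evenly as possible.
even_split() {
  local tasks="$1"
  local workers="$2"
  if [ "$workers" -le 0 ]; then
    echo "even_split: need at least one worker" >&2
    return 1
  fi
  local base=$((tasks / workers))
  local extra=$((tasks % workers))
  local i=1
  local out=""
  while [ "$i" -le "$workers" ]; do
    # The first $extra workers absorb the remainder, one task each
    if [ "$i" -le "$extra" ]; then
      out="$out $((base + 1))"
    else
      out="$out $base"
    fi
    i=$((i + 1))
  done
  # Drop the leading space added by the first append
  echo "${out# }"
}
```

For the setup in this report (12 tasks per connector, 3 workers), `even_split 12 3` prints `4 4 4`.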

      In my test I always get the same outcome.

      This is consistent with what I see in production, which makes autoscaling impossible to use as is.

            People

              yazgoo yazgoo