Details
Type: Bug
Status: Open
Priority: Blocker
Resolution: Unresolved
Affects Version/s: 3.6.1
Fix Version/s: None
Component/s: None
Environment: AWS EKS
Description
I am deploying Kafka inside a Kubernetes cluster in HA mode (multiple brokers). The deployment consists of:
- Kubernetes
- Kafka 3.6.1
Refer to the following files used in the deployment.
Dockerfile
FROM eclipse-temurin:17.0.9_9-jdk-jammy

ENV KAFKA_VERSION=3.6.1
ENV SCALA_VERSION=2.13
ENV KAFKA_HOME=/opt/kafka
ENV PATH=${PATH}:${KAFKA_HOME}/bin

LABEL name="kafka" version=${KAFKA_VERSION}

RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
 && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
 && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
 && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME}

COPY ./entrypoint.sh /
RUN ["chmod", "+x", "/entrypoint.sh"]
ENTRYPOINT ["/entrypoint.sh"]
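For reference, the image consumed by the StatefulSet below (myimage/kafka-kraft:1.0, the name taken from Kafka.yaml) would be built and pushed roughly as follows; the registry path is an assumption and should be whatever your cluster pulls from:

# Build and push the broker image used by the StatefulSet
docker build -t myimage/kafka-kraft:1.0 .
docker push myimage/kafka-kraft:1.0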
entrypoint.sh
#!/bin/bash

# Derive the node id from the pod ordinal (pod hostnames are of the form kafka-N)
NODE_ID=${HOSTNAME:6}
LISTENERS="SASL://:9092,CONTROLLER://:9093,INTERNAL://:29092"
ADVERTISED_LISTENERS="SASL://kraft-$NODE_ID:9092,INTERNAL://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc.cluster.local:29092"

# Build the controller quorum voter list from the replica count,
# stripping the trailing comma on the final iteration
CONTROLLER_QUORUM_VOTERS=""
for i in $(seq 0 $REPLICAS); do
    if [[ $i != $REPLICAS ]]; then
        CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093,"
    else
        CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS::-1}
    fi
done

mkdir -p $SHARE_DIR/$NODE_ID

# Node 0 generates the cluster id once; every other node reads it from the shared volume
if [[ ! -f "$SHARE_DIR/cluster_id" && "$NODE_ID" = "0" ]]; then
    CLUSTER_ID=$(kafka-storage.sh random-uuid)
    echo $CLUSTER_ID > $SHARE_DIR/cluster_id
else
    CLUSTER_ID=$(cat $SHARE_DIR/cluster_id)
fi

# Patch the stock KRaft server.properties with per-node values
sed -e "s+^node.id=.*+node.id=$NODE_ID+" \
    -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
    -e "s+^listeners=.*+listeners=$LISTENERS+" \
    -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
    -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \
    /opt/kafka/config/kraft/server.properties > server.properties.updated \
    && mv server.properties.updated /opt/kafka/config/kraft/server.properties

# Append SASL/PLAIN settings for the client-facing listener
JAAS="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\" user_admin=\"admin-secret\";"
echo -e "\nlistener.name.sasl.plain.sasl.jaas.config=${JAAS}" >> /opt/kafka/config/kraft/server.properties
echo -e "\nsasl.enabled.mechanisms=PLAIN" >> /opt/kafka/config/kraft/server.properties
echo -e "\nlistener.security.protocol.map=SASL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /opt/kafka/config/kraft/server.properties
echo -e "\ninter.broker.listener.name=INTERNAL" >> /opt/kafka/config/kraft/server.properties

kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties
exec kafka-server-start.sh /opt/kafka/config/kraft/server.properties
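With this entrypoint, every broker formats its storage against the shared cluster id and joins the controller quorum. Whether a quorum leader was actually elected can be checked with the kafka-metadata-quorum.sh tool that ships with Kafka 3.6 (a diagnostic sketch; the pod and DNS names follow from Kafka.yaml below, and the plaintext INTERNAL listener on 29092 avoids the SASL handshake):

# Show quorum state: leader id, voters, and replication lag
kubectl exec -n kafka-kraft kafka-0 -- kafka-metadata-quorum.sh \
  --bootstrap-server kafka-0.kafka-svc.kafka-kraft.svc.cluster.local:29092 \
  describe --status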
Kafka.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: kafka-kraft
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-volume
  labels:
    type: local
spec:
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: '/mnt/data'
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-pv-claim
  namespace: kafka-kraft
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 3Gi
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  serviceName: kafka-svc
  replicas: 5
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      volumes:
        - name: kafka-storage
          persistentVolumeClaim:
            claimName: kafka-pv-claim
      containers:
        - name: kafka-container
          image: myimage/kafka-kraft:1.0
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '5'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: kafka-kraft
            - name: SHARE_DIR
              value: /mnt/kafka
          volumeMounts:
            - name: kafka-storage
              mountPath: /mnt/kafka
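Once the pods are up, the configuration rendered by entrypoint.sh and the per-pod DNS records published by the headless Service (both the quorum voters and the INTERNAL advertised listeners depend on them) can be spot-checked with something like:

# Inspect the server.properties generated by entrypoint.sh on broker 0
kubectl exec -n kafka-kraft kafka-0 -- cat /opt/kafka/config/kraft/server.properties
# Confirm that per-pod DNS names resolve inside the cluster
kubectl exec -n kafka-kraft kafka-0 -- getent hosts kafka-1.kafka-svc.kafka-kraft.svc.cluster.local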
After the deployment, all the containers are up and running. I then connect to the brokers using the following command:
.\kafka-topics.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --command-config producer.properties --topic hello --create --replication-factor 5
producer.properties
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
metadata.max.age.ms=1000
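The console producer invocation itself is not shown; based on the client configuration it was presumably along these lines (a hypothetical command; the topic name is taken from the create command above, although the error below references hello2):

.\kafka-console-producer.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --producer.config producer.properties --topic hello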
A prompt is displayed to enter a message. Upon entering a sample text, it throws the following error:
[Producer clientId=console-producer] Received invalid metadata error in produce request on partition hello2-1 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
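For what it is worth, which broker the cluster currently reports as leader for each partition can be checked with the same client configuration (a diagnostic sketch):

.\kafka-topics.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --command-config producer.properties --describe --topic hello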
What I have tried so far
- Tried both ZooKeeper and KRaft modes
- Tried deleting and recreating the topics (this works only sporadically)
Unfortunately, the problem persists and I am not able to produce messages.