Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
3.1.0
-
None
Description
In one cluster I had a partially crashed MirrorCheckpointConnector instance. It had stopped mirroring offsets and emitting metrics completely but the connector and its single task were still reporting as running in the REST API.
Looking at the logs, I found this stacktrace:
java.lang.NullPointerException at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:187) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:171) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:173) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:157) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:139) at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:291) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) WARN [prod-source->sc-prod-target.MirrorCheckpointConnector|task-0] Failure polling consumer state for checkpoints. (org.apache.kafka.connect.mirror.MirrorCheckpointTask) [task-thread-prod-source->sc-prod-target.MirrorCheckpointConnector-0]
Not sure if it's related but prior this exception, there's quite a lot of:
ERROR [prod-source->sc-prod-target.MirrorCheckpointConnector|task-0] WorkerSourceTask{id=prod-source->sc-prod-target.MirrorCheckpointConnector-0} failed to send record to prod-source.checkpoints.internal: (org.apache.kafka.connect.runtime.WorkerSourceTask) [kafka-producer-network-thread | connector-producer-prod-source->sc-prod-target.MirrorCheckpointConnector-0] org.apache.kafka.common.KafkaException: Producer is closed forcefully. at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) at java.base/java.lang.Thread.run(Thread.java:829)
and some users had started consumers in the target cluster hence causing these log lines:
ERROR [prod-source->sc-prod-target.MirrorCheckpointConnector|task-0] [AdminClient clientId=adminclient-137] OffsetCommit request for group id <GROUP_ID> and partition <TP> failed due to unexpected error UNKNOWN_MEMBER_ID. (org.apache.kafka.clients.admin.internals.AlterConsumerGroupOffsetsHandler) [kafka-admin-client-thread | adminclient-137]
Unfortunately I don't have the full history, so it's unclear if this happened while stopping but the connector stayed in this state for several hours until it was explicitly deleted via the REST API.
Attachments
Issue Links
- duplicates
-
KAFKA-14545 MirrorCheckpointTask throws NullPointerException when group hasn't consumed from some partitions
- Resolved