KAFKA-6260

AbstractCoordinator does not clearly handle NullPointerException

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.0
    • Fix Version/s: 1.0.1, 1.1.0
    • Component/s: consumer
    • Labels: None
    • Environment: RedHat Linux

      Description

      The error reporting is not clear, but it seems that the Kafka heartbeat thread shuts down the application because of a NullPointerException triggered by "fake" disconnections.

      One more comment: we process messages in the stream, but sometimes we have to block processing for minutes because the consumers cannot handle the load. Is it possible that while the stream is waiting, the heartbeat is blocked as well?

      Can you check that?

      2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Received successful Heartbeat response
      2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Sending Heartbeat request to coordinator cljp01.eb.lan.at:9093 (id: 2147483646 rack: null)
      2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Sending HEARTBEAT {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08} with correlation id 24 to node 2147483646
      2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT with correlation id 24, received {throttle_time_ms=0,error_code=0}
      2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Received successful Heartbeat response
      2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout.
      2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Cancelled request {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]} with correlation id 21 due to node 1 being disconnected
      2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, apiVersion=6, clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, correlationId=21) with correlation id 21 due to node 1 being disconnected
      2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Fetch request {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1, maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) failed org.apache.kafka.common.errors.DisconnectException: null
      2017-11-23 23:54:52 TRACE NetworkClient:123 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Found least loaded node cljp01.eb.lan.at:9093 (id: 1 rack: DC-1)
      2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Initialize connection to node cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) for sending metadata request
      2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Initiating connection to node cljp01.eb.lan.at:9093 (id: 1 rack: DC-1)
      2017-11-23 23:54:52 ERROR AbstractCoordinator:296 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Heartbeat thread for group kafka-endpoint failed due to unexpected error
      java.lang.NullPointerException: null
              at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:436) ~[my-kafka-endpoint.jar:?]
              at org.apache.kafka.common.network.Selector.poll(Selector.java:395) ~[my-kafka-endpoint.jar:?]
              at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) ~[my-kafka-endpoint.jar:?]
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) ~[my-kafka-endpoint.jar:?]
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:275) ~[my-kafka-endpoint.jar:?]
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:934) [my-kafka-endpoint.jar:?]
      2017-11-23 23:54:52 DEBUG AbstractCoordinator:177 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Heartbeat thread has closed
      2017-11-23 23:55:16 INFO  KafkaElasticsearchEndpoint:61 - Got shutdown requests ...
      2017-11-23 23:55:16 INFO  StreamProcessor:106 - Streams closing ...
      2017-11-23 23:55:16 INFO  StreamProcessor:111 - with 5 [s] timeout
      2017-11-23 23:55:16 DEBUG KafkaStreams:183 - stream-client [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31]Stopping Streams client with timeoutMillis = 5000 ms.
      2017-11-23 23:55:16 INFO  KafkaStreams:346 - stream-client [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31]State transition from RUNNING to PENDING_SHUTDOWN
      2017-11-23 23:55:16 INFO  StreamThread:336 - stream-thread [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1] Informed to shut down
      2017-11-23 23:55:16 INFO  StreamThread:346 - stream-thread [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
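
On the question of whether blocked processing also blocks the heartbeat: since KIP-62 (Kafka 0.10.1) the Java consumer sends heartbeats from a dedicated background thread, so time spent between polls is bounded by max.poll.interval.ms rather than by session.timeout.ms. The log above shows that background thread terminating on an unexpected NullPointerException. The sketch below illustrates the general pattern of a background loop that records an unexpected error so the processing thread can notice it later. It is a simplified illustration only: the class HeartbeatSketch and its methods are hypothetical names and are not Kafka's actual implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a background heartbeat loop; not Kafka's actual class. */
final class HeartbeatSketch {
    // Holds the error that stopped the loop, if any.
    private final AtomicReference<RuntimeException> failure = new AtomicReference<>();
    private final Thread thread;

    HeartbeatSketch(Runnable networkPoll, int beats) {
        thread = new Thread(() -> {
            try {
                for (int i = 0; i < beats; i++) {
                    networkPoll.run(); // may throw, e.g. a NullPointerException
                }
            } catch (RuntimeException e) {
                // Record the error instead of letting the thread die unnoticed.
                failure.set(e);
            }
        }, "heartbeat-sketch");
        thread.setDaemon(true);
    }

    void start() {
        thread.start();
    }

    /** Waits until the background loop has finished (used by the demo only). */
    void awaitStop() {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Called from the processing thread; rethrows whatever stopped the loop. */
    void checkHealthy() {
        RuntimeException e = failure.get();
        if (e != null) {
            throw new IllegalStateException("heartbeat thread failed", e);
        }
    }

    public static void main(String[] args) {
        HeartbeatSketch hb = new HeartbeatSketch(() -> {
            throw new NullPointerException();
        }, 3);
        hb.start();
        hb.awaitStop();
        try {
            hb.checkHealthy();
        } catch (IllegalStateException e) {
            System.out.println("processing thread saw: " + e.getCause());
        }
    }
}
```

The point of the pattern is that the processing thread learns about the failure with its original cause attached, rather than only seeing a later session expiry or shutdown.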
      

      Kafka settings:

              auto.commit.interval.ms = 5000
              auto.offset.reset = earliest
              bootstrap.servers = [xxx:9093, yyy:9093]
              check.crcs = true
              client.id = kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer
              connections.max.idle.ms = 540000
              enable.auto.commit = false
              exclude.internal.topics = true
              fetch.max.bytes = 52428800
              fetch.max.wait.ms = 6000
              fetch.min.bytes = 1
              group.id = kafka-endpoint
              heartbeat.interval.ms = 3000
              interceptor.classes = null
              internal.leave.group.on.close = false
              isolation.level = read_uncommitted
              key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
              max.partition.fetch.bytes = 1048576
              max.poll.interval.ms = 10000000
              max.poll.records = 2000
              metadata.max.age.ms = 300000
              metric.reporters = []
              metrics.num.samples = 2
              metrics.recording.level = INFO
              metrics.sample.window.ms = 30000
              partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
              receive.buffer.bytes = 65536
              reconnect.backoff.max.ms = 1000
              reconnect.backoff.ms = 50
              request.timeout.ms = 7000
              retry.backoff.ms = 100
              sasl.jaas.config = null
              sasl.kerberos.kinit.cmd = /usr/bin/kinit
              sasl.kerberos.min.time.before.relogin = 60000
              sasl.kerberos.service.name = null
              sasl.kerberos.ticket.renew.jitter = 0.05
              sasl.kerberos.ticket.renew.window.factor = 0.8
              sasl.mechanism = GSSAPI
              security.protocol = SSL
              send.buffer.bytes = 131072
              session.timeout.ms = 6000
              ssl.cipher.suites = null
              ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
              ssl.endpoint.identification.algorithm = null
              ssl.key.password = [hidden]
              ssl.keymanager.algorithm = SunX509
              ssl.keystore.location = /data/my/etc/kafka/client-keystore
              ssl.keystore.password = [hidden]
              ssl.keystore.type = JKS
              ssl.protocol = TLS
              ssl.provider = null
              ssl.secure.random.implementation = SHA1PRNG
              ssl.trustmanager.algorithm = PKIX
              ssl.truststore.location = /data/my/etc/kafka/truststore
              ssl.truststore.password = [hidden]
              ssl.truststore.type = JKS
              value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
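
The timing values in the settings above interact with each other, so a quick consistency check can help when reading them. The sketch below is a minimal, stdlib-only illustration. The class name ConsumerTimingCheck and its method are hypothetical. The first rule follows the guidance in the Kafka consumer configuration documentation that heartbeat.interval.ms should typically be no higher than one third of session.timeout.ms. The other two rules assume that request.timeout.ms has to exceed both session.timeout.ms and fetch.max.wait.ms, which is treated here as an assumption about clients of this generation.

```java
import java.util.ArrayList;
import java.util.List;

/** Sanity checks for consumer timing settings; the rules are assumptions, see the text. */
final class ConsumerTimingCheck {

    /** Returns one human-readable warning per rule that the given values break. */
    static List<String> check(long heartbeatIntervalMs, long sessionTimeoutMs,
                              long requestTimeoutMs, long fetchMaxWaitMs) {
        List<String> warnings = new ArrayList<>();
        // Guideline: heartbeat.interval.ms at most one third of session.timeout.ms.
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            warnings.add("heartbeat.interval.ms=" + heartbeatIntervalMs
                    + " exceeds one third of session.timeout.ms=" + sessionTimeoutMs);
        }
        // Assumed rule: a request must be allowed to outlive the session timeout.
        if (requestTimeoutMs <= sessionTimeoutMs) {
            warnings.add("request.timeout.ms=" + requestTimeoutMs
                    + " is not greater than session.timeout.ms=" + sessionTimeoutMs);
        }
        // Assumed rule: a fetch may legitimately wait fetch.max.wait.ms on the broker.
        if (requestTimeoutMs <= fetchMaxWaitMs) {
            warnings.add("request.timeout.ms=" + requestTimeoutMs
                    + " is not greater than fetch.max.wait.ms=" + fetchMaxWaitMs);
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Values copied from the settings listed in this report.
        for (String w : check(3000, 6000, 7000, 6000)) {
            System.out.println("WARN " + w);
        }
    }
}
```

With the reported values only the heartbeat guideline is flagged. The request timeout of 7000 ms passes the other two checks but leaves just one second of slack over fetch.max.wait.ms = 6000; whether that is related to the "Disconnecting from node 1 due to request timeout" entry in the log is not established by this report.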
      

    People

    • Assignee: Jason Gustafson (hachikuji)
    • Reporter: Seweryn Habdank-Wojewodzki (habdank)