Kafka / KAFKA-4086

long processing consumer restart will stall

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.10.0.0
    • Fix Version/s: None
    • Component/s: consumer

      Description

      Jason Gustafson
      We have a long-processing consumer. Whenever a new consumer tries to join the group while the long-processing consumer is busy processing, the new consumer stalls.
      If we kill the long-processing consumer and restart it, both consumers stall.
      When we kill the long-processing consumer, it tries to send a LeaveGroup request, but the request fails, seemingly because of the client request timeout.
      When we start the long-processing consumer again, it seems to send topic metadata to the broker and then issue a JoinGroup request, which returns a future. However, when I check the server log, I don't see the corresponding request in kafka-request.log.
      We construct the consumer with the following code (based on the kafka-python library):

              self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                                            value_deserializer=deserializer,
                                            group_id=self.user_defined_sub_name,
                                            heartbeat_interval_ms=10000,
                                            session_timeout_ms=300000,
                                            enable_auto_commit=False)
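
      The settings above can be sanity-checked with a small helper. This is only a sketch: `check_consumer_timeouts` is a hypothetical function, not part of kafka-python, and it assumes that the coordinator may hold a JoinGroup request open for up to the session timeout while a rebalance completes. The 40000 ms request timeout is taken from the client log in this report; it is not set in the constructor call.

```python
def check_consumer_timeouts(heartbeat_interval_ms, session_timeout_ms, request_timeout_ms):
    """Return a list of warnings about the relationship between the timeouts.

    Hypothetical helper for illustration only; not a kafka-python API.
    """
    warnings = []

    # Assumption: a JoinGroup request can stay pending on the coordinator for
    # up to the session timeout, so the client-side request timeout should be
    # larger than the session timeout.
    if request_timeout_ms <= session_timeout_ms:
        warnings.append(
            "request_timeout_ms (%d) is not larger than session_timeout_ms (%d)"
            % (request_timeout_ms, session_timeout_ms)
        )

    # Common guidance: keep the heartbeat interval at no more than one third
    # of the session timeout.
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append(
            "heartbeat_interval_ms (%d) is more than a third of session_timeout_ms (%d)"
            % (heartbeat_interval_ms, session_timeout_ms)
        )

    return warnings


# Values from this report: the first two come from the constructor call,
# the request timeout from the "timed out after 40000 ms" log message.
issues = check_consumer_timeouts(
    heartbeat_interval_ms=10000,
    session_timeout_ms=300000,
    request_timeout_ms=40000,
)
for issue in issues:
    print(issue)
```

      With the values from this report, the helper flags the request timeout as shorter than the session timeout. Whether raising `request_timeout_ms` resolves the stall has not been verified here.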
      

      On the server side, we use 0.10.0.0 with default settings.
      It looks like a `RebalanceInProgressError` is thrown:

      2016-08-22 20:39:08,984 - kafka.coordinator - INFO - Discovered coordinator 100 for group v1.user.queue
      2016-08-22 20:39:08,984 - kafka.coordinator.consumer - INFO - Revoking previously assigned partitions set() for group v1.user.queue
      2016-08-22 20:39:08,990 - kafka.cluster - DEBUG - Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 1)
      2016-08-22 20:39:08,990 - kafka.coordinator - INFO - (Re-)joining group v1.user.queue
      2016-08-22 20:39:08,990 - kafka.coordinator - DEBUG - Sending JoinGroup (JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])) to coordinator 100
      2016-08-22 20:39:08,991 - kafka.conn - DEBUG - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> Request 5: JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])
      2016-08-22 20:43:04,576 - kafka.conn - WARNING - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> timed out after 40000 ms. Closing connection.
      2016-08-22 20:43:04,576 - kafka.client - WARNING - Node 100 connection failed -- refreshing metadata
      2016-08-22 20:43:04,576 - kafka.coordinator - ERROR - Error sending JoinGroupRequest_v0 to node 100 [Error 7 RequestTimedOutError: Request timed out after 40000 ms]
      2016-08-22 20:43:04,576 - kafka.coordinator - WARNING - Marking the coordinator dead (node 100) for group v1.user.queue: None.
      2016-08-22 20:43:04,678 - kafka.coordinator - DEBUG - Sending group coordinator request for group v1.user.queue to broker 100
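
      For reference, the gap between the JoinGroup request (Request 5) and the timeout warning in the log above can be computed from the two timestamps. A minimal sketch using only the Python standard library; `elapsed_seconds` is a hypothetical helper name:

```python
from datetime import datetime

# Timestamp layout used by the client log lines in this report.
LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def elapsed_seconds(start, end):
    """Return the number of seconds between two log timestamps."""
    started = datetime.strptime(start, LOG_TIME_FORMAT)
    ended = datetime.strptime(end, LOG_TIME_FORMAT)
    return (ended - started).total_seconds()


# Timestamps copied from the log: Request 5 sent, then the timeout warning.
request_sent_at = "2016-08-22 20:39:08,991"
timed_out_at = "2016-08-22 20:43:04,576"

gap = elapsed_seconds(request_sent_at, timed_out_at)
print("Seconds between request and timeout warning: %.3f" % gap)
```

      The gap is about 235.6 seconds, well beyond the 40000 ms reported in the timeout message.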
      

      FYI, we turned on the following in log4j:

      log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
      log4j.additivity.kafka.server.KafkaApis=true
      log4j.logger.kafka.request.logger=TRACE, requestAppender
      log4j.additivity.kafka.request.logger=true
      log4j.logger.kafka.controller=TRACE, controllerAppender
      log4j.additivity.kafka.controller=true
      log4j.logger.state.change.logger=TRACE, stateChangeAppender
      log4j.additivity.state.change.logger=true
      

      Please let us know how we can proceed to find the root cause.

            People

            • Assignee: Unassigned
            • Reporter: Dale Jin (dalejin2010@gmail.com)
            • Reviewer: Jason Gustafson