Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
0.10.0.0
-
None
Description
hachikuji
We have a long processing consumer. Whenever a new consumer tries to join the group while the long processing consumer is processing, the new consumer will stall.
If we kill the long processing consumer and restart it again, it will stall both consumers.
When we kill the long processing consumer, that consumer tries to issue a leaveGroup command but it will fail seemingly due to the client request timeout.
When we try to start the long processing consumer again, it seems to be sending topic metadata to the broker then the subsequent join group request is issued and returning a future but when I check the server log I don't see the corresponding request in kafka-request.log.
When we construct the consumer, we use the following code (based on kafka-python library):
self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, value_deserializer=deserializer, group_id=self.user_defined_sub_name, heartbeat_interval_ms=10000, session_timeout_ms=300000, enable_auto_commit=False)
on the server side, we use 0.10.0.0 with default settings.
looks like a `RebalanceInProgressError` is thrown
2016-08-22 20:39:08,984 - kafka.coordinator - INFO - Discovered coordinator 100 for group v1.user.queue 2016-08-22 20:39:08,984 - kafka.coordinator.consumer - INFO - Revoking previously assigned partitions set() for group v1.user.queue 2016-08-22 20:39:08,990 - kafka.cluster - DEBUG - Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 1) 2016-08-22 20:39:08,990 - kafka.coordinator - INFO - (Re-)joining group v1.user.queue 2016-08-22 20:39:08,990 - kafka.coordinator - DEBUG - Sending JoinGroup (JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])) to coordinator 100 2016-08-22 20:39:08,991 - kafka.conn - DEBUG - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> Request 5: JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')]) 2016-08-22 20:43:04,576 - kafka.conn - WARNING - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> timed out after 40000 ms. Closing connection. 2016-08-22 20:43:04,576 - kafka.client - WARNING - Node 100 connection failed – refreshing metadata 2016-08-22 20:43:04,576 - kafka.coordinator - ERROR - Error sending JoinGroupRequest_v0 to node 100 [Error 7 RequestTimedOutError: Request timed out after 40000 ms] 2016-08-22 20:43:04,576 - kafka.coordinator - WARNING - Marking the coordinator dead (node 100) for group v1.user.queue: None. 2016-08-22 20:43:04,678 - kafka.coordinator - DEBUG - Sending group coordinator request for group v1.user.queue to broker 100
fyi, we turned on the following in log4j:
log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender log4j.additivity.kafka.server.KafkaApis=true log4j.logger.kafka.request.logger=TRACE, requestAppender log4j.additivity.kafka.request.logger=true log4j.logger.kafka.controller=TRACE, controllerAppender log4j.additivity.kafka.controller=true log4j.logger.state.change.logger=TRACE, stateChangeAppender log4j.additivity.state.change.logger=true
Please let us know how we can proceed forward to find out the root cause.