Description
We have seen an issue lately where, after some time, a container will stop receiving messages from some partitions.
After examining the container, it appears that this is triggered by an abdication for a BrokerProxy that was not previously instantiated during the KafkaSystemConsumer.start() method. Here's what we see:
18:12:01,417 INFO KafkaSystemConsumer:128 - Abdicating for [MY-TOPIC,105]
18:12:01,418 INFO VerifiableProperties:68 - Verifying properties
18:12:01,419 INFO VerifiableProperties:68 - Property client.id is overridden to samza_consumer-job-thingy-i001-1395955794446-1
18:12:01,419 INFO VerifiableProperties:68 - Property metadata.broker.list is overridden to localhost:10251
18:12:01,419 INFO VerifiableProperties:68 - Property request.timeout.ms is overridden to 6000
18:12:01,420 INFO ClientUtils$:68 - Fetching metadata from broker id:0,host:localhost,port:10251 with correlation id 0 for 1 topic(s) Set(MY-TOPIC)
18:12:01,421 INFO SyncProducer:68 - Connected to localhost:10251 for producing
18:12:01,436 INFO SyncProducer:68 - Disconnecting from localhost:10251
18:12:01,437 INFO BrokerProxy:128 - Creating new SimpleConsumer for host localhost:10251 for system kafka
18:12:01,442 INFO GetOffset:128 - Validating offset 2872503 for topic and partition [MY-TOPIC,105]
18:12:01,456 INFO GetOffset:128 - Able to successfully read from offset 2872503 for topic and partition [MY-TOPIC,105]. Using it to instantiate consumer.
Notice that this log line never appears from BrokerProxy:
def start {
  info("Starting " + toString)
  ...
}
Digging in a bit, KafkaSystemConsumer.refreshBrokers can create a new BrokerProxy that wasn't created in the KafkaSystemConsumer.start() method in cases where a partition was moved to a broker that it hasn't yet created a proxy for.
brokerOption match {
  case Some(broker) =>
    def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)

    brokerProxies
      .getOrElseUpdate((broker.host, broker.port), createBrokerProxy)
      .addTopicPartition(head, Option(nextOffset))
  case None => warn("No such topic-partition: %s, dropping." format head)
}
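But it never calls start on the newly created BrokerProxy, so its thread never runs and the partitions assigned to it are never fetched.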
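One possible direction for a fix, shown here only as a minimal sketch and not as the actual Samza patch: start the proxy at the moment it is first created inside getOrElseUpdate. ProxySketch, RefreshSketch, and proxyFor are hypothetical stand-ins invented for this illustration; the real BrokerProxy takes many more constructor arguments and runs a fetch thread when started.

```scala
import scala.collection.mutable

// Hypothetical stand-in for BrokerProxy: it only records whether start() was called.
class ProxySketch(val host: String, val port: Int) {
  @volatile var started = false
  val partitions = mutable.Set[(String, Int)]()

  def start(): Unit = {
    started = true // the real class would launch its fetch thread here
  }

  def addTopicPartition(topic: String, partition: Int): Unit =
    partitions += ((topic, partition))
}

// Hypothetical stand-in for the proxy bookkeeping done during a broker refresh.
object RefreshSketch {
  val brokerProxies = mutable.Map[(String, Int), ProxySketch]()

  // Look up the proxy for a broker; create AND start it if it is new.
  def proxyFor(host: String, port: Int): ProxySketch =
    brokerProxies.getOrElseUpdate((host, port), {
      val proxy = new ProxySketch(host, port)
      proxy.start() // evaluated only when the key is absent
      proxy
    })
}
```

Because the second argument of getOrElseUpdate is passed by name, the block that creates and starts the proxy is evaluated only when no proxy exists yet for that host and port, so proxies created in start() would not be started twice. A call such as RefreshSketch.proxyFor("localhost", 10251).addTopicPartition("MY-TOPIC", 105) then always lands on a started proxy.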
Issue Links
- is related to SAMZA-579 KafkaSystemConsumer drops SSPs on failure (Resolved)