Kafka / KAFKA-3957

Consumer timeout not respected when the Kafka broker is not available


Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 0.9.0.1
    • Fix Version/s: None
    • Component/s: consumer
    • Labels: None

    Description

      KafkaConsumer v0.9:

      I have a consumer configured with session.timeout.ms set to 30 s. I make a call like

        consumer.poll(10000)

      but if the Kafka broker is down, that call hangs indefinitely instead of returning after 10 seconds.

      Digging into the code, it seems that the timeout isn't respected.

      KafkaConsumer.poll() calls pollOnce(), shown below:

        private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
            // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
            coordinator.ensureCoordinatorKnown();

            // ensure we have partitions assigned if we expect to
            if (subscriptions.partitionsAutoAssigned())
                coordinator.ensurePartitionAssignment();

            // fetch positions if we have partitions we're subscribed to that we
            // don't know the offset for
            if (!subscriptions.hasAllFetchPositions())
                updateFetchPositions(this.subscriptions.missingFetchPositions());

            // init any new fetches (won't resend pending fetches)
            Cluster cluster = this.metadata.fetch();
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

            // if data is available already, e.g. from a previous network client poll() call to commit,
            // then just return it immediately
            if (!records.isEmpty()) {
                return records;
            }

            fetcher.initFetches(cluster);
            client.poll(timeout);
            return fetcher.fetchedRecords();
        }
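The TODO referencing KAFKA-1894 points at the core problem: the timeout is only handed to the final client.poll(timeout), while the steps before it can block without limit. A minimal sketch of the usual remedy, assuming one deadline is computed on entry and every sub-step draws from it, is shown here. PollBudget is a hypothetical helper, not a Kafka 0.9 class, and the clock is injected so the arithmetic can be checked without real waiting.

```java
import java.util.function.LongSupplier;

// Hypothetical helper (not part of Kafka 0.9): one overall deadline that
// every sub-step of a poll draws its own timeout from.
final class PollBudget {
    private final LongSupplier clockMs; // current time in ms, injectable for tests
    private final long deadlineMs;

    PollBudget(LongSupplier clockMs, long timeoutMs) {
        this.clockMs = clockMs;
        long now = clockMs.getAsLong();
        // Saturate instead of overflowing when timeoutMs is huge (e.g. Long.MAX_VALUE).
        this.deadlineMs = timeoutMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMs;
    }

    /** Milliseconds left before the caller's timeout expires, never negative. */
    long remainingMs() {
        return Math.max(0L, deadlineMs - clockMs.getAsLong());
    }

    /** True once the whole budget has been used up. */
    boolean expired() {
        return remainingMs() == 0L;
    }
}
```

In pollOnce() the coordinator, assignment and offset steps would each be given budget.remainingMs(), which would require timeout-taking overloads that 0.9.0.1 does not have, and the last call would become client.poll(budget.remainingMs()).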

      The call gets stuck in coordinator.ensureCoordinatorKnown():

      AbstractCoordinator:

        public void ensureCoordinatorKnown() {
            while (coordinatorUnknown()) {
                RequestFuture<Void> future = sendGroupMetadataRequest();
                client.poll(future);

                if (future.failed()) {
                    if (future.isRetriable())
                        client.awaitMetadataUpdate();
                    else
                        throw future.exception();
                }
            }
        }
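The loop above exits only on success or a non-retriable error, so a retriable failure such as an unreachable broker repeats forever. A deadline-bounded variant might look like the following sketch. It rests on assumptions: the method and exception names are hypothetical, a single lookup attempt is assumed to return promptly (in the real code client.poll(future) would need a bound too), and time and back-off are injected so the logic can be tested with a fake clock.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Sketch of a deadline-bounded coordinator lookup. All names are hypothetical;
// Kafka 0.9's AbstractCoordinator.ensureCoordinatorKnown() takes no timeout.
final class BoundedCoordinatorLookup {

    /** Raised when no coordinator is found before the deadline. */
    static final class CoordinatorTimeoutException extends RuntimeException {
        CoordinatorTimeoutException(String message) { super(message); }
    }

    /**
     * Retries attemptLookup until it succeeds or timeoutMs elapses.
     *
     * @param clockMs       current time in ms (injectable for tests)
     * @param attemptLookup one lookup attempt; assumed to return promptly
     * @param backoff       waits for the given number of ms between attempts
     * @return the number of attempts made
     */
    static int ensureCoordinatorKnown(LongSupplier clockMs,
                                      BooleanSupplier attemptLookup,
                                      LongConsumer backoff,
                                      long retryBackoffMs,
                                      long timeoutMs) {
        long now = clockMs.getAsLong();
        long deadline = timeoutMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMs;
        int attempts = 0;
        while (true) {
            attempts++;
            if (attemptLookup.getAsBoolean())
                return attempts;
            long remaining = deadline - clockMs.getAsLong();
            if (remaining <= 0)
                throw new CoordinatorTimeoutException("coordinator not found within "
                        + timeoutMs + " ms (" + attempts + " attempts)");
            // Back off before retrying, but never past the caller's deadline.
            backoff.accept(Math.min(retryBackoffMs, remaining));
        }
    }
}
```

With the broker down, a caller passing a 10 000 ms budget would get a CoordinatorTimeoutException after roughly 10 s instead of hanging.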

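      In this case the future fails (because the broker is down), so client.awaitMetadataUpdate() is called. In ConsumerNetworkClient that method blocks forever, because it keeps calling poll(Long.MAX_VALUE) until the metadata version changes, which never happens while the broker is unreachable: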

        public void awaitMetadataUpdate() {
            int version = this.metadata.requestUpdate();
            do {
                poll(Long.MAX_VALUE);
            } while (this.metadata.version() == version);
        }
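Because poll(Long.MAX_VALUE) only returns when some network event occurs, nothing bounds this wait while the broker is unreachable. One possible bounded form is sketched here with a hypothetical signature: each poll is given only the time remaining, and the method returns false on timeout, leaving the caller (for example poll()) to decide whether to throw or return empty results. Clock, version and poll are injected as functions purely so the sketch is testable.

```java
import java.util.function.IntSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Sketch of a bounded awaitMetadataUpdate(). The signature is hypothetical:
// instead of poll(Long.MAX_VALUE), each poll gets only the remaining time.
final class BoundedMetadataWait {

    /**
     * @param clockMs         current time in ms (injectable for tests)
     * @param metadataVersion current metadata version
     * @param pollFor         network poll that blocks for at most the given ms
     * @return true if the version changed before timeoutMs elapsed
     */
    static boolean awaitMetadataUpdate(LongSupplier clockMs,
                                       IntSupplier metadataVersion,
                                       LongConsumer pollFor,
                                       long timeoutMs) {
        // In Kafka the starting version would come from metadata.requestUpdate().
        int version = metadataVersion.getAsInt();
        long now = clockMs.getAsLong();
        long deadline = timeoutMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMs;
        while (metadataVersion.getAsInt() == version) {
            long remaining = deadline - clockMs.getAsLong();
            if (remaining <= 0)
                return false; // timed out: the caller decides what to throw
            pollFor.accept(remaining);
        }
        return true;
    }
}
```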

      I believe this is a bug. When a timeout is set on a call to a blocking method, that timeout should be respected, and an exception should be thrown once it expires.
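Until the client itself honours the timeout, a caller-side workaround is to arm a watchdog that wakes the blocked call. KafkaConsumer.wakeup() is the consumer's thread-safe method intended for this: it makes a blocked poll() throw WakeupException. The sketch below is generic and hedged: Watchdog and callWithWakeup are hypothetical names, and for a KafkaConsumer the arguments would be something like () -> consumer.poll(10000) and consumer::wakeup.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical caller-side watchdog: runs a blocking call on the current
// thread and fires a wakeup action if the call outlives its timeout.
final class Watchdog {
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "poll-watchdog");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    /**
     * Runs blockingCall; if it has not returned after timeoutMs, runs wakeup
     * on the timer thread (for KafkaConsumer, consumer::wakeup).
     */
    static <T> T callWithWakeup(Supplier<T> blockingCall, Runnable wakeup, long timeoutMs) {
        ScheduledFuture<?> alarm = TIMER.schedule(wakeup, timeoutMs, TimeUnit.MILLISECONDS);
        try {
            return blockingCall.get();
        } finally {
            alarm.cancel(false); // call finished first: disarm the alarm
        }
    }
}
```

One caveat: if the call returns just as the alarm fires, wakeup can still run. For KafkaConsumer that makes the next blocking call throw WakeupException, so callers should be prepared to catch it.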

People

    • Assignee: Unassigned
    • Reporter: Vincent Fumo (vfumo)
    • Votes: 0
    • Watchers: 2
