Flink / FLINK-3264

Add load shedding policy into Kafka Consumers


Details

    Description

      There are situations in which Flink's Kafka consumer cannot keep up with everything produced into a topic, for example when a single Flink instance is subscribed to a busy Kafka topic (see user request: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Frequent-exceptions-killing-streaming-job-td4323.html).

      I think we should allow users to control the behavior of the Kafka consumer in those situations.

      I had an offline discussion with StephanEwen about this, and we think that allowing users to pass a LoadSheddingPolicy to the KafkaConsumer would be the best solution.
      In the policy, users can define how often the consumer requests the latest offsets of the subscribed partitions. The requests can be based either on time (every n ms) or on record count (every n-th record). The policy can then decide to skip a certain number of offsets (possibly jumping straight to the latest offset).
      With this "offset skipping" approach, we avoid fetching records we cannot process anyway.
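      A minimal sketch of what such a policy could look like, assuming a pluggable interface. Only the name LoadSheddingPolicy comes from this issue; the method names, the two implementations and their parameters (checkEveryNRecords, maxLag, intervalMillis) are hypothetical and not part of any Flink release. One policy triggers on record count and keeps at most maxLag records of backlog; the other triggers on time and jumps to the latest offset. The current time is passed in rather than read from the clock so the decision logic stays deterministic and testable.

```java
/**
 * Hypothetical policy interface sketched from the proposal. The name comes from the
 * issue; the methods are assumptions, not a Flink API.
 */
interface LoadSheddingPolicy {
    /** Called once per record; returns true when the latest offset should be requested. */
    boolean shouldRequestLatestOffset(long nowMillis);

    /** Returns the offset to continue fetching from, given the current and the latest offset. */
    long nextFetchOffset(long currentOffset, long latestOffset);
}

/** Count-based trigger: checks every n-th record, keeps at most maxLag records of backlog. */
final class RecordCountSkipPolicy implements LoadSheddingPolicy {
    private final long checkEveryNRecords;
    private final long maxLag;
    private long recordsSinceCheck = 0;

    RecordCountSkipPolicy(long checkEveryNRecords, long maxLag) {
        if (checkEveryNRecords <= 0 || maxLag < 0) {
            throw new IllegalArgumentException("checkEveryNRecords must be > 0 and maxLag >= 0");
        }
        this.checkEveryNRecords = checkEveryNRecords;
        this.maxLag = maxLag;
    }

    @Override
    public boolean shouldRequestLatestOffset(long nowMillis) {
        // nowMillis is ignored; this policy only counts records.
        if (++recordsSinceCheck < checkEveryNRecords) {
            return false;
        }
        recordsSinceCheck = 0;
        return true;
    }

    @Override
    public long nextFetchOffset(long currentOffset, long latestOffset) {
        // Skip everything older than the newest maxLag records.
        long lag = latestOffset - currentOffset;
        return lag > maxLag ? latestOffset - maxLag : currentOffset;
    }
}

/** Time-based trigger: checks every intervalMillis and always jumps to the latest offset. */
final class TimeBasedSkipToLatestPolicy implements LoadSheddingPolicy {
    private final long intervalMillis;
    private long lastCheckMillis;

    TimeBasedSkipToLatestPolicy(long intervalMillis, long startMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be > 0");
        }
        this.intervalMillis = intervalMillis;
        this.lastCheckMillis = startMillis;
    }

    @Override
    public boolean shouldRequestLatestOffset(long nowMillis) {
        if (nowMillis - lastCheckMillis < intervalMillis) {
            return false;
        }
        lastCheckMillis = nowMillis;
        return true;
    }

    @Override
    public long nextFetchOffset(long currentOffset, long latestOffset) {
        // Never move backwards, even if the reported latest offset is stale.
        return Math.max(currentOffset, latestOffset);
    }
}
```

      For example, with RecordCountSkipPolicy(3, 200) every third record triggers an offset request, and a consumer at offset 100 facing a latest offset of 1000 resumes at 800, dropping the 700 oldest records.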

      In the 0.9 consumer, there doesn't seem to be an API for requesting the latest offset of a topicPartition. I'll ask on the Kafka mailing list what the status is there.
      With seek(), we can fetch from any offset.

      In the 0.8 SimpleConsumer, there is a method for requesting the offsets:

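      // Request the single latest offset (LatestTime, maxNumOffsets = 1) of one partition
      java.util.Map<kafka.common.TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> requestInfo = java.util.Collections.singletonMap(new kafka.common.TopicAndPartition(topic, partition), new kafka.api.PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
      kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
      kafka.javaapi.OffsetResponse response = consumer.getOffsetsBefore(request);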
      

      The fetch offset is controlled within the LegacyFetcher.
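      Since the fetcher owns the fetch offset, that is where offset skipping would have to be applied. The following is a hypothetical, self-contained sketch of such a loop, not Flink's actual LegacyFetcher: the class SheddingFetchLoop and its parameters are illustrative, a LongSupplier stands in for the latest-offset request to the broker (for example the 0.8 getOffsetsBefore() call), and the skip decision is passed in as a function of (fetchOffset, latestOffset). Instead of fetching real records, the loop just returns the offsets it would have emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongBinaryOperator;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch, not Flink's LegacyFetcher: a loop that owns the fetch offset
 * and, every n-th record, lets a skip decision move that offset forward.
 */
final class SheddingFetchLoop {
    private final long checkEveryNRecords;
    private final LongSupplier latestOffset;     // stands in for an offset request to the broker
    private final LongBinaryOperator nextOffset; // (fetchOffset, latestOffset) -> new fetch offset
    private long fetchOffset;
    private long recordsSinceCheck = 0;

    SheddingFetchLoop(long startOffset, long checkEveryNRecords,
                      LongSupplier latestOffset, LongBinaryOperator nextOffset) {
        if (checkEveryNRecords <= 0) {
            throw new IllegalArgumentException("checkEveryNRecords must be > 0");
        }
        this.fetchOffset = startOffset;
        this.checkEveryNRecords = checkEveryNRecords;
        this.latestOffset = latestOffset;
        this.nextOffset = nextOffset;
    }

    /** Simulates consuming count records and returns the offsets that were emitted. */
    List<Long> consume(int count) {
        List<Long> emitted = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            if (recordsSinceCheck == checkEveryNRecords) {
                long proposed = nextOffset.applyAsLong(fetchOffset, latestOffset.getAsLong());
                // Only ever skip forward, so a stale or faulty decision cannot cause re-reads.
                fetchOffset = Math.max(fetchOffset, proposed);
                recordsSinceCheck = 0;
            }
            emitted.add(fetchOffset++); // a real fetcher would fetch and emit the record here
            recordsSinceCheck++;
        }
        return emitted;
    }
}
```

      For example, with a fixed latest offset of 1000, a check every 2 records and a rule that keeps at most 10 records of backlog, `new SheddingFetchLoop(0, 2, () -> 1000L, (cur, latest) -> latest - cur > 10 ? latest - 10 : cur).consume(5)` returns `[0, 1, 990, 991, 992]`: the first check jumps from offset 2 to 990, the second finds a lag of only 8 and keeps going.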


            People

              Assignee: Unassigned
              Reporter: Robert Metzger (rmetzger)
              Votes: 0
              Watchers: 5
