  Kafka / KAFKA-3775

Throttle maximum number of tasks assigned to a single KafkaStreams

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 0.10.0.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

      Description

      As of today, if I start a Kafka Streams app on a single machine, consisting of a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
      As we're using it to process topics with a huge number of partitions and heavy message traffic, it is a problem that we don't have a way of throttling the maximum number of partitions assigned to a single instance.

      In fact, when we started a Kafka Streams app that consumes a topic with more than 10MB/sec of traffic per partition, we saw all partitions get assigned to the first instance, and the app soon died with an OOM error.
      I know there are some workarounds worth considering here, for example:

      • Start multiple instances at once so the partitions are distributed evenly.
        => Might work, but as Kafka Streams is a library rather than an execution framework, there is no predefined procedure for starting Kafka Streams apps, so some users may want the option of starting a single instance first and checking that it works as expected with a smaller number of partitions.
      • Adjust config parameters such as buffered.records.per.partition, max.partition.fetch.bytes and max.poll.records to reduce the heap pressure (see the sketch just after this list).
        => Might work, but IMO there are still two problems:
      • It still leads to a traffic explosion with high-throughput processing, as the instance accepts all incoming messages from hundreds of partitions.
      • In the first place, by distributed-system principles, it is weird that users have no way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to specify the maximum number of partitions that a single instance (or host) can reasonably be expected to process.
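
      For reference, a minimal sketch of how these existing knobs can be set on a Kafka Streams app; the application id, bootstrap servers, and the concrete values below are illustrative placeholders, not tuned recommendations:

      import java.util.Properties;

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.streams.StreamsConfig;

      public class HeapPressureConfig {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");        // placeholder
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder

              // Streams-level buffer: fewer records queued per partition.
              props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
              // Consumer-level knobs, passed through to the embedded consumer.
              props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024); // 1 MiB
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
              // These props would then be handed to the KafkaStreams constructor.
          }
      }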

      Here, I'd like to introduce a new configuration parameter, max.tasks.assigned, which limits the number of tasks (each a group of partitions) assigned to a single processId (which identifies a single KafkaStreams instance).
      At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate incomplete assignments. That is, Kafka Streams should keep working on the subset of partitions that did get assigned even when some partitions are left unassigned, in order to support the scenario above: a user may want to start the first single instance and check whether it works as expected with a smaller number of partitions.

      I've implemented a rough POC for this. PTAL, and if it makes sense I will continue refining it.
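
      For illustration only, the proposed configuration would be set like any other streams config. The key name max.tasks.assigned and the value below come from this proposal (it was never merged), so this is a hypothetical sketch rather than released API:

      import java.util.Properties;

      import org.apache.kafka.streams.StreamsConfig;

      public class ProposedThrottleConfig {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
              // Proposed cap on tasks per KafkaStreams instance (hypothetical key, never merged).
              props.put("max.tasks.assigned", 8);                                  // illustrative value
          }
      }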


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user kawamuray opened a pull request:

          https://github.com/apache/kafka/pull/1460

          KAFKA-3775: Throttle maximum number of tasks assigned to a single KafkaStreams

          Issue: https://issues.apache.org/jira/browse/KAFKA-3775

          POC. Discussion in progress.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kawamuray/kafka KAFKA-3775-throttle-tasks

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/1460.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1460


          commit fefe259b2c97bb1bbf14b572533ca74348651c0d
          Author: Yuto Kawamura <kawamuray.dadada@gmail.com>
          Date: 2016-06-02T03:46:51Z

          MINOR: Add toString() to ClientState for debugging

          commit c4f363d32d9a496c0f4b4e66ee846429a2a2eda5
          Author: Yuto Kawamura <kawamuray.dadada@gmail.com>
          Date: 2016-06-02T03:51:34Z

          MINOR: Remove meanglessly repeated assertions in unit test

          commit 3c173fa5d029277e5d1974c104d7e66939b5cd17
          Author: Yuto Kawamura <kawamuray.dadada@gmail.com>
          Date: 2016-06-02T03:55:10Z

          KAFKA-3775: Intorduce new streams configuration max.tasks.assigned

          This configuration limits the maximum number of tasks assigned to a single KafkaStreams instance.
          As a task consists of single partition for more than 1 topic, setting this value to lower is useful
          to prevent huge number of partitions are assigned to an instance which started first.


          kawamuray Yuto Kawamura added a comment -

          Guozhang Wang What do you think?

          mjsax Matthias J. Sax added a comment -

          I have some concerns about this:
          1) a KStreams application should process the whole topic and not parts of it – limiting the number of partitions is kinda artificial from my point of view
          2) even if we limit the number of partitions, it is quite random which partitions would get processed and which not – I would assume that users would like to have a more transparent assignment
          3) last but not least, under the hood we are using the standard Java KafkaConsumer: looking at your patch (just briefly), it seems you changed the task assignment – however, this is independent from the partitions assignment of the used consumer – thus, the consumer would still poll all partitions but would not be able to assign records for some partitions as the corresponding tasks are missing.

          Having elaborated on 3), this raises a more complex problem: right now, Kafka Streams relies on Kafka's internal partition assignment. If we want to assign only some partitions, we cannot use the standard high-level Java KafkaConsumer and would need to implement our own assignment strategy to allow for partial assignment within a consumer group (i.e., allow a consumer group that does not assign all partitions).

          kawamuray Yuto Kawamura added a comment -

          Thanks for the feedback, Matthias J. Sax.

          > 1) a KStreams application should process the whole topic and not parts of it – limiting the number of partitions is kinda artificial from my point of view

          So the question is what a "KStreams application" consists of. I know that Kafka Streams is designed to work equally well standalone, but the main purpose of making it runnable standalone is easy development and testing IIUC. Practically, if we try to run it against production traffic consisting of hundreds of partitions, it is effectively impossible to assign all partitions to a single instance transparently. Indeed, restricting the maximum number of partitions per instance is an artificial control, but it should be available precisely because Kafka Streams is not an execution framework, as I said. Users have almost full control over how to construct the Kafka Streams app cluster; that is, it should be possible to start instances gradually, one by one, instead of starting the necessary number of instances at once, but that's impossible with the existing implementation for the reason I described.

          > 2) even if we limit the number of partitions, it is quite random which partitions would get processed and which not – I would assume that users would like to have a more transparent assignment

          I think Kafka Streams partition assignment already isn't transparent. Unless the sticky partition assignment strategy is enabled, StreamPartitionAssignor chooses which task (partition) is assigned to which instance in round robin with some randomness introduced. That is, by nature we have no control over which partition gets assigned to which instance.
          At least you can ensure that all partitions are assigned if you start more instances than partitions / `max.tasks.assigned`, and it also remains possible to opt out of this option by leaving the configuration at its default value (Integer.MAX_VALUE), which guarantees that a single instance still accepts all tasks (partitions) assigned.

          > 3) last but not least, under the hood we are using the standard Java KafkaConsumer: looking at your patch (just briefly), it seems you changed the task assignment – however, this is independent from the partitions assignment of the used consumer – thus, the consumer would still poll all partitions but would not be able to assign records for some partitions as the corresponding tasks are missing.

          Hmm, I'm not sure I understand your explanation correctly, but this sounds different from what I know.
          First, KafkaStreams provides a custom PartitionAssignor, StreamPartitionAssignor, which takes full control of which partition is assigned to which consumer thread of which instance.
          Second, the consumer polls only the partitions it has been assigned by the group coordinator, which relies on the PartitionAssignor to decide the actual assignment. That is, an instance will never get a record from a partition that isn't assigned to it, so what you're concerned about will never happen IIUC.
          Am I misunderstanding something?
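
          To illustrate the second point, here is a minimal plain-consumer sketch (broker address, group id, and topic name are placeholders, and the 0.10-era poll(long) signature is assumed): poll() only ever returns records from the partitions the group coordinator assigned to this member, which is what consumer.assignment() reports.

          import java.util.Collections;
          import java.util.Properties;

          import org.apache.kafka.clients.consumer.ConsumerConfig;
          import org.apache.kafka.clients.consumer.ConsumerRecord;
          import org.apache.kafka.clients.consumer.ConsumerRecords;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import org.apache.kafka.common.serialization.StringDeserializer;

          public class AssignmentDemo {
              public static void main(String[] args) {
                  Properties props = new Properties();
                  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
                  props.put(ConsumerConfig.GROUP_ID_CONFIG, "assignment-demo");         // placeholder
                  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

                  try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                      consumer.subscribe(Collections.singletonList("my-topic"));        // placeholder
                      ConsumerRecords<String, String> records = consumer.poll(1000);
                      // Only partitions assigned to this group member show up here.
                      System.out.println("Assigned partitions: " + consumer.assignment());
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.println(record.topic() + "-" + record.partition() + " @ " + record.offset());
                      }
                  }
              }
          }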

          BigAndy Andy Coates added a comment - edited

          I wonder how such a design would maintain the level of resiliency currently offered. At the moment, in a running multi-process cluster, the other processes pick up the slack if one of them fails. With the proposed design some partitions would remain without a consumer. This seems like a fundamental switch away from Kafka's current model, and a risky one IMHO.

          Could you also elaborate on why settings such as 'max.poll.records' don't help stop your initial instance going pop? Maybe there are other alternative solutions here...

          kawamuray Yuto Kawamura added a comment - edited

          Thanks for the feedback, Andy Coates.

          > With the proposed design some partitions would remain without a consumer. This seems like a fundamental switch away from Kafka's current model, and a risky one IMHO.

          Some partitions would remain without a consumer only if the number of living instances drops below (number of partitions / max.tasks.assigned).
          Let's say you have 100 partitions and launch 50 KafkaStreams instances with max.tasks.assigned=5. Once you have started all 50 instances, each instance gets about 2 partitions assigned, which is the desired distribution.
          Then what happens when an instance fails? The 2 partitions held by the dead instance are reassigned to the remaining instances without any problem, as the other instances still have plenty of headroom under max.tasks.assigned.
          If more than 30 instances are dead at that moment (leaving fewer than the 100 / 5 = 20 instances needed to cover all partitions), then yes, some partitions will remain unassigned, but that case is out of scope: the value of max.tasks.assigned is chosen with the available system resources (CPU, memory, network bandwidth) in mind, which means those unassigned partitions could never be processed normally anyway, even if they were reassigned to the living instances, because hardware resources are limited.

          > This seems like a fundamental switch away from Kafka's current model, and a risky one IMHO.

          BTW, may I ask what you meant by "Kafka's current model" and, more concretely, what risk you expect? (That users won't notice the existence of unassigned partitions?)

          > Could you also elaborate on why settings such as 'max.poll.records' don't help stop your initial instance going pop? Maybe there are other alternative solutions here...

          Because even if I set max.poll.records lower, it reduces the number of records returned at a time, while the number of fetch requests increases instead. That means the total throughput wouldn't change, which still leads to traffic bursting.
          At the same time, it doesn't make sense to me to tune max.poll.records under the assumption that a single instance gets all partitions assigned, because in practice I can set that value much higher once other instances join the group and the partitions are distributed evenly.

          BigAndy Andy Coates added a comment -

          Yes, the risk I'm talking about is unassigned partitions.

          I would expect that you should be able to set max.poll.records to a level that your application can handle without OOM'ing. This should be the same level regardless of the number of partitions assigned.

          But I guess what this doesn't take into account is any per-partition state that is being maintained by your application as it processes the messages from the different partitions. Is this what is causing your application to OOM? If not, can you shed light on what is eating up your heap and causing the OOM?

          guozhang Guozhang Wang added a comment -

          Hi Matthias J. Sax, Yuto Kawamura, great discussion regarding 1) above! Personally I think it is useful flexibility to let users specify which partitions, among all the topics the topology defines as sources, are to be processed, and that is better from a user-experience point of view.

          Actually, it is not completely true that "users don't have a way to control the maximum number of 'partitions' assigned to a single shard (an instance of KafkaStreams here)". In fact, the user-customizable PartitionGrouper is used exactly for that: it takes the list of all topic-partitions as input and generates the tasks, with each task assigned some of the topic-partitions. The DefaultPartitionGrouper of course tries to capture all topic-partitions and generates multiple tasks for them. But users can also customize it by, for example, generating only one task which takes one partition from each of the input topics, and this single task will be assigned to the ONLY instance in your case. NOTE that this partition grouper is global, such that if you have two instances, both of them will execute the same PartitionGrouper, and if only one task is generated, one instance will become completely idle; this needs to be communicated clearly to users. Does that sound good?
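
          A rough sketch of the approach Guozhang describes, assuming the 0.10-era org.apache.kafka.streams.processor.PartitionGrouper interface; the cap value and the choice to delegate to DefaultPartitionGrouper and then truncate are purely illustrative, and any task dropped here is simply never created, so its partitions go unprocessed:

          import java.util.LinkedHashMap;
          import java.util.Map;
          import java.util.Set;

          import org.apache.kafka.common.Cluster;
          import org.apache.kafka.common.TopicPartition;
          import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
          import org.apache.kafka.streams.processor.PartitionGrouper;
          import org.apache.kafka.streams.processor.TaskId;

          public class CappedPartitionGrouper implements PartitionGrouper {

              private static final int MAX_TASKS = 10; // illustrative cap

              private final DefaultPartitionGrouper defaultGrouper = new DefaultPartitionGrouper();

              @Override
              public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups,
                                                                      Cluster metadata) {
                  // Build the normal task groups, then keep only the first MAX_TASKS of them.
                  Map<TaskId, Set<TopicPartition>> all = defaultGrouper.partitionGroups(topicGroups, metadata);
                  Map<TaskId, Set<TopicPartition>> capped = new LinkedHashMap<>();
                  for (Map.Entry<TaskId, Set<TopicPartition>> entry : all.entrySet()) {
                      if (capped.size() >= MAX_TASKS) {
                          break;
                      }
                      capped.put(entry.getKey(), entry.getValue());
                  }
                  return capped;
              }
          }

          Such a class would be registered via the partition.grouper config (StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG) and, as noted above, it runs identically on every instance, so it shrinks the global task set rather than capping what one instance takes.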

          jkreps Jay Kreps added a comment -

          I'd have to agree. I think this is a memory management problem, and we should solve it directly rather than throttling tasks, which is a very roundabout way to control memory and would then lead to unprocessed partitions. I think the problems are:
          1. The Kafka consumer does a poor job of controlling memory usage (known issue, needs to be fixed).
          2. We may exacerbate it by providing memory-related configs that are per-task when they should likely be global.
          3. Maybe others?

          mjsax Matthias J. Sax added a comment -

          I want to push this discussion further. As Guozhang Wang suggested, it might be better to hand in a custom PartitionGrouper instead of patching StreamPartitionAssignor. Would this work for your use case, Yuto Kawamura? If yes, we could close this as "not a problem" and keep the list of configurable parameters short.

          kawamuray Yuto Kawamura added a comment -

          Sorry for leaving this discussion for a while, and thanks for all your comments. I was busy with other work until the middle of this week, and just yesterday I finished deploying my Kafka Streams app to production. I've gained some experience through that, so let me summarize it here.

          So the initial configuration at the first moment looked like this:

          poll.ms=100(default)
          buffered.records.per.partition=1000(default)
          max.poll.records=Integer.MAX_VALUE(default)
          max.partition.fetch.bytes=1MiB(default)
          num.stream.threads=1
          

          First I took a heap dump of the Kafka Streams process just before it died. I found 918772 instances of ConsumerRecord (most of them should already have been garbage, as I took the heap dump with the -F switch) which, together with the referenced byte arrays, consumed more than 500MB of heap at that moment. There was no other significant heap usage by objects unrelated to ConsumerRecord, so I'm sure this is what caused the OOM of my app.

          So I tried several configuration adjustments to avoid the OOM. Here's what I tried:

          • Decrease buffered.records.per.partition from 1000 (default) to 10 => No luck. Still OOM.
          • Decrease max.partition.fetch.bytes => Couldn't, as we allow messages of up to 1MiB.
          • Decrease max.poll.records from Integer.MAX_VALUE (default) to 10 => Worked. No more OOM.

          So by decreasing max.poll.records my application stopped dying of OOM. Before that, each poll() invocation could return all records fetched for every partition, so memory could be exhausted very easily. (I had misunderstood this point: I thought poll() would never be called as long as every task had more than buffered.records.per.partition records buffered, but in fact it is called continually regardless of that, because of poll.ms expiration.)
          Network traffic increased by about 300Mbps on that host, but that is still not problematic ATM, as the throughput was likely limited by the single thread (num.stream.threads=1).
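
          As a rough back-of-envelope illustration of why an all-partitions assignment can exhaust the heap even with default per-partition settings (the partition count below is made up, not taken from this report):

          public class FetchPressureEstimate {
              public static void main(String[] args) {
                  int partitions = 500;                     // illustrative partition count
                  int maxPartitionFetchBytes = 1024 * 1024; // 1 MiB, the default
                  long worstCase = (long) partitions * maxPartitionFetchBytes;
                  // With every partition on one instance, buffered fetch data can approach
                  // partitions * max.partition.fetch.bytes before the records are processed.
                  System.out.printf("worst-case buffered fetch data: ~%d MiB%n", worstCase >> 20);
              }
          }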

          After all the instances were up, I confirmed that the total throughput wasn't enough, as I saw the consumption lag keep increasing. I increased num.stream.threads to 3 and did the same deployment again (I know I could have performed a rolling restart, but I just wanted to see what would happen with an increased number of threads).
          So again, the first instance survived without OOM, but this time the traffic on the NIC increased by about 600Mbps, which is almost a critical level for our network spec. As I started the rest of the instances, all partitions were distributed equally, and now they are running pretty well.

          So my conclusion is:

          • Decreasing max.poll.records to a proper value works in terms of OOM. Still, it's not intuitive that it controls memory pressure, as the heap-usage throttling is just a side effect of this adjustment (its purpose isn't that, but rather to keep the interval between consumer.poll() calls short enough to avoid assignment expiration, IIUC).
          • I still couldn't throttle the network traffic. As I wrote above, when I started a single instance with num.stream.threads=3, the traffic on the NIC of that host reached its maximum capacity (1Gbps) while it was doing catch-up reads. This could be serious in terms of packet dropping, as we deploy other service daemons on the same nodes.
          • I'm still not certain what the best way of doing it is, but I believe it would be worthwhile to have an option to throttle the maximum number of incoming messages to a single instance (in other words, the maximum capacity of a single KafkaStreams instance), with regard to both memory pressure and traffic. So I'm basically +1 on the idea that Jay Kreps suggested (global memory allocation throttling), but I'm still wondering what option you can suggest for throttling the network traffic.

          And about PartitionGrouper:
          So it can be used to reduce the number of target tasks, but that can't be changed without rewriting the configuration (to revert partition.grouper) and restarting an instance, right?
          If so, that's too cumbersome a two-step deployment to perform: first I'd have to deploy a single instance with the custom partition.grouper, then deploy the rest of the instances, and finally revert the configuration and redeploy the first instance? No way.

          guozhang Guozhang Wang added a comment -

          Hi Yuto Kawamura, thanks for sharing your usage scenario; it is very helpful for us in making user-experience improvements.

          In the long run, we definitely would like to provide convenient memory management in Kafka Streams, since 1) many users may start their applications in a container with a strict memory limit, and 2) we want to handle the case where task migration caused by, say, failures leads to cascading OOMs on other instances because of the sudden increase in memory for the new tasks; this is a similar scenario to yours but in reverse order: going from more instances to fewer. And I agree that the static partition.grouper config is not best suited here. There is already some discussion in the KIP-63 thread, which I will try to summarize in a centralized wiki.

          In the near term, we can remove the continuous poll(0) used just for rebalancing once KIP-62 is adopted, since it will handle the consumer's heartbeat mechanism and hence Streams will not need to worry about frequent polling just for that. After this change, the memory pressure from ConsumerRecord should be reduced.

          Does that sound good to you?

          kawamuray Yuto Kawamura added a comment -

          Guozhang Wang Yes, both your short- and long-term measures sound reasonable to me.

          Personally, I'm satisfied with this overall discussion as far as avoiding memory pressure when there are very few Kafka Streams instances is concerned.
          As I explained, for now max.poll.records worked well, at least to stop my app from dying of OOM. I also think it would be better to have a more explicit way of configuring memory management in Kafka Streams, as you said, but that's not urgent for me right now.

          I still don't know what to do about concentrated network traffic once it reaches a serious level, but I will file or reopen an issue once it becomes an actual problem, for example killing colocated services by exhausting NIC capacity.

          So I'm going to close my PR and this issue, as it's likely not a good way of introducing throttling, given that most people disagreed with introducing the risk of unassigned partitions. I expect we'll continue discussing the approaches you suggested in the appropriate KIP or issue.

          Again, thanks for all your help

          githubbot ASF GitHub Bot added a comment -

          Github user kawamuray closed the pull request at:

          https://github.com/apache/kafka/pull/1460

          wushujames James Cheng added a comment -

          Yuto Kawamura, for the network traffic, could you use Kafka's quota feature? http://kafka.apache.org/documentation.html#design_quotas Kafka lets you specify how much network bandwidth a given client-id is allowed to use. If Kafka Streams lets you specify a client id, then this would let you limit how much network traffic your application uses.
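
          A minimal sketch of the client-id side of that suggestion (the application id, servers, and client.id value are placeholders; the byte-rate quota itself is configured separately on the broker side and is not shown here):

          import java.util.Properties;

          import org.apache.kafka.streams.StreamsConfig;

          public class QuotaFriendlyClientId {
              public static void main(String[] args) {
                  Properties props = new Properties();
                  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
                  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
                  // Explicit client.id: Kafka Streams derives the embedded clients' ids from this
                  // value, which the broker-side client-id quota configuration can then reference.
                  props.put(StreamsConfig.CLIENT_ID_CONFIG, "my-streams-app-client");  // placeholder
              }
          }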

          mjsax Matthias J. Sax added a comment -

          Teng Yutong James Cheng Kafka Streams allows you to specify consumer and producer configurations the same way as for the regular Java consumer/producer via StreamsConfig (some restrictions apply; for example, Kafka Streams does not allow enabling "auto.commit"). But using the quota feature should work.

          guozhang Guozhang Wang added a comment - edited

          Yuto Kawamura I dumped my thoughts about memory management in Kafka Streams in this wiki: https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams

          Feel free to comment on it.


            People

            • Assignee: kawamuray Yuto Kawamura
            • Reporter: kawamuray Yuto Kawamura
            • Reviewer: Matthias J. Sax
            • Votes: 0
            • Watchers: 9

