KAFKA-671

DelayedProduce requests should not hold full producer request data

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.1
    • Component/s: None
    • Labels: None

      Description

      Per summary, this leads to unnecessary memory usage.

      Attachments

      1. outOfMemFix-v1.patch (8 kB) - Sriram Subramanian
      2. outOfMemFix-v2.patch (19 kB) - Sriram Subramanian
      3. outOfMemFix-v2-rebase.patch (19 kB) - Sriram Subramanian
      4. outOfMemFix-v3.patch (14 kB) - Sriram Subramanian
      5. outOfMemFix-v4.patch (15 kB) - Sriram Subramanian
      6. outOfMemFix-v4-rebase.patch (17 kB) - Sriram Subramanian
      7. outOfMemFix-v5.patch (18 kB) - Sriram Subramanian

        Activity

        Jay Kreps added a comment -

        I am concerned this may be a blocker for production usage. If this adds 200k per request, then with 5000 outstanding requests that is 1G of memory. This is not too far out there for production usage with a high number of producers.

        Sriram Subramanian added a comment -

        This is one possible fix. The final fix will largely depend on the decision for KAFKA-703, but I wanted one possible fix to be available to help make a better decision on the other bug.

        1. We now create a map of topic partition -> message set size, and DelayedProduce uses this instead of the entire data payload.
        2. We reset the data payload at the end of HandleProduceRequest.

        The only way I have found to reset the immutable maps is to make them vars. (The other option is to convert them to mutable maps, but that involves a ton of code changes: all the places where the data objects are passed around would need to be changed to mutable maps.) Let me know of a better option in Scala.

        The expectation is that the data object will not be used after the reset method is called.
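
        For illustration, a minimal self-contained sketch of the idea described above, using stand-in names and types rather than the actual patch code:

            object ProducerRequestV1Sketch {
              type TopicAndPartition = (String, Int) // stand-in for the real topic/partition type

              class ProducerRequestData(var data: Map[TopicAndPartition, Array[Byte]]) {
                // Cached up front; this small map is all the delayed request needs later.
                val topicPartitionMessageSizeMap: Map[TopicAndPartition, Int] =
                  data.map { case (tp, messages) => tp -> messages.length }

                // Called at the end of request handling so the payload becomes unreachable.
                // The expectation is that `data` is never read again after this call.
                def resetData(): Unit = data = Map.empty
              }
            }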

        Neha Narkhede added a comment - edited

        Overall, the changes look good. Minor suggestions -

        1. KafkaApis
        1.1 Can we rename messageSizeInBytes to messageSetSize?
        1.2 Similarly, rename messageSizes to messageSetSize.

        2. ProducerRequest
        I'm guessing the resetData() API is added with the goal of helping with garbage collection? I think this might not be required, since once the produce request is handled, whether or not it entered the purgatory, it will be eligible for garbage collection.

        Sriram Subramanian added a comment -

        2. I don't think so, since DelayedProduce is still holding on to RequestChannel's Request object, which contains the produceRequest. Let me know if that is not the case.

        Neha Narkhede added a comment -

        2. You may be right. However, given a large enough young gen compared to the total heap size, I wonder what the impact of nullifying the data is. It would be worth doing a quick experiment to study GC patterns and confirm that it does help. The only downside of this approach is that it looks ugly.

        Sriram Subramanian added a comment -

        2. I am not sure if it is an issue in the first place. It depends on how long these objects stay in the queue. If 5000 outstanding requests can be reached, as Jay suggests above, then it does seem like an issue. Let me know what you find in your GC investigation. I agree it is ugly. Another option is to remove the request object dependency from DelayedItems entirely, but that would be a larger change.

        Jun Rao added a comment -

        Thanks for the patch. I agree with Neha that ProducerRequest.resetData() seems a bit hacky. Another thing is that currently, we log the request object in trace in RequestChannel.Request when processing the response. Resetting the data in the produce request may break the logic there.

        Maybe we should test it out and see if this is really a problem. It's true that there could be many producers. However, it may take some time for them to generate 1GB of data.

        Jun Rao added a comment -

        Thinking about this a bit more, there is another approach. In KafkaApis.handle(), after the request is handled, we set RequestChannel.Request.requestObj to null. At this point, the response has not been sent yet. Currently, the only place requestObj is needed when sending a response is RequestChannel.Request.updateRequestMetrics(), where we need to print requestObj (at trace level). What we can do is, in RequestChannel.Request, after requestObj is constructed, get and cache the string value of the request (set the string to null if trace is not enabled). We also need to cache isFromFollower, which is obtained from requestObj. This way, we don't need ProducerRequest.resetData(), and we can guarantee that after KafkaApis.handle(), each requestObj can be GC-ed.
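
        A minimal sketch of this alternative, with stand-in types rather than the real RequestChannel code, might look like this:

            object RequestChannelSketch {
              trait RequestOrResponse { def isFromFollower: Boolean }

              class Request(private var requestObj: RequestOrResponse, traceEnabled: Boolean) {
                // Cached while requestObj is still present; only these values are needed
                // later when updating metrics and trace-logging the response.
                val cachedRequestLog: String = if (traceEnabled) requestObj.toString else null
                val isFromFollower: Boolean = requestObj.isFromFollower

                // In this sketch, called at the end of KafkaApis.handle(); afterwards the
                // deserialized request can be GC-ed even before the response is sent.
                def dropRequestObj(): Unit = requestObj = null
              }
            }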

        Sriram Subramanian added a comment -

        I think to do this right, we should limit the change to the produce request instead of getting rid of requestObj completely. The reason is, as you mention, that you would need to cache a bunch of fields from requestObj to be used in updateRequestMetrics while sending the response. This may be two fields today, but going forward you may need more fields, and this would need to be constantly maintained. I think if ProducerRequest handles data being null, the fix is more robust and isolates the changes within the produce request.

        Neha Narkhede added a comment -

        Nullifying the request object seems like a bigger change. I'm wondering about the precise GC impact these changes will introduce. If the impact is just that the objects get garbage collected within 1 iteration of the young gen collector vs 2-3 iterations, I would say the performance upside of this change is not worth the risk. But if it significantly reduces garbage collection overhead, it might be worth looking into further. Even if we have to do this, I agree with Sriram that his earlier change has a smaller impact than nullifying the request object and caching a bunch of things to get around it.

        Neha Narkhede added a comment -

        Thinking about this a little more, the real problem seems to be that we hang onto the request object in DelayedProduce until we send out the response. There are 2 reasons for this -
        1. The request latency metrics are part of the request object. These need to be updated when the response is created.
        2. To send out the response, we need the selector key, which is inside the request object.

        To handle delayed produce requests without hanging onto the produce request data, we will need to -
        1. Remove the request object from DelayedProduce
        2. Pass in the selector key into DelayedProduce
        3. Define the request metrics in a separate object and remove those from the Request object. Pass in the new RequestMetrics object into DelayedProduce

        Since this requires changing the DelayedRequest object as well, it will affect all requests. My guess is that this refactoring is not that big of a change, but I could be wrong.
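
        As a rough, purely hypothetical sketch of the shape such a refactoring could take (none of these names come from the patch, and this is not what was ultimately committed):

            import java.nio.channels.SelectionKey

            object DelayedProduceRefactorSketch {
              // Metrics live in their own object instead of inside the Request.
              class RequestMetrics {
                @volatile var requestQueueTimeMs: Long = 0L
                @volatile var totalTimeMs: Long = 0L
              }

              // DelayedProduce keeps only the selector key and the metrics object,
              // not the whole RequestChannel.Request with its payload.
              class DelayedProduce(
                  val responseKey: SelectionKey, // enough to route the response back
                  val metrics: RequestMetrics,   // updated when the response is created
                  val delayMs: Long)
            }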

        Sriram Subramanian added a comment -
        • Ignore the changes in bin and system tests.
        • This change is cleaner and a lot safer.
        • The data is now a mutable map.
        • We just cache the topicPartition -> message size map in the producer request.
        • We clear the data map after handling the request.
        • The toString implementation of ProducerRequest uses the cached map instead of the data.
        • The byteBuffer in RequestChannel.Request is now made private and is set to null after deserialization.
        • Test cases are updated to work with these changes.
        • I will be updating the thread with how the heap characteristics look before and after this change. (A short sketch of this approach follows below.)
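
        A self-contained sketch of the two main details above (stand-in types, not the actual Kafka source):

            import scala.collection.mutable

            object ProducerRequestV2Sketch {
              // ProducerRequest: data is a mutable map that is cleared after handling;
              // toString reports the cached topicPartition -> size map, never the payload.
              class ProducerRequest(val data: mutable.Map[(String, Int), Array[Byte]]) {
                val topicPartitionMessageSizeMap: Map[(String, Int), Int] =
                  data.map { case (tp, bytes) => tp -> bytes.length }.toMap

                def emptyData(): Unit = data.clear()

                override def toString: String =
                  "ProducerRequest(" + topicPartitionMessageSizeMap.mkString(",") + ")"
              }

              // RequestChannel.Request: the receive buffer is private and dropped right
              // after deserialization so it is not retained for the request's lifetime.
              class Request(private var buffer: java.nio.ByteBuffer) {
                val requestObj: AnyRef = buffer.duplicate() // stand-in for real deserialization
                buffer = null
              }
            }
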
        Sriram Subramanian added a comment -

        rebased

        Neha Narkhede added a comment -

        Thanks for the v2 patch; a few review comments -

        1. kafka-run-class
        Revert these changes; the default heap size of 5g is too large. Also, there is another JIRA tracking the GC configs.
        2. ProducerRequest
        How about renaming emptyData() to clear() for consistency?
        3. What is the purpose of the mutable map changes in DefaultEventHandler ?

        Sriram Subramanian added a comment -

        1. You can ignore the changes in system test and bin. I will update with a patch that does not have them.
        2. emptyData seems clearer about what is happening in the ProducerRequest object vs. clear(). Let me know if you think otherwise.
        3. You need them, since the ProducerRequest data gets set from the methods in DefaultEventHandler. Is there any other way specific to Scala?

        Jay Kreps added a comment -

        Took a look at this. Looks reasonable.

        Other atrocities have occurred inside RequestQueue, but they aren't from this patch. Re-opened KAFKA-683.

        For maps it is nicer to import scala.collection.mutable and then refer to mutable.Map rather than spelling out scala.collection.mutable.Map.

        I think the question is why we need to hang onto the ProducerRequest object at all. We are doing work to null it out, but why can't we just take the one or two fields we need from it in the DelayedProduce? If we do that, then won't the ProducerRequest be out of scope after handleProduce and get GC'd? Is the root cause of this the fact that we moved deserialization into the network thread and shoved the API object into the request?
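
        For example, one common way to follow that import convention (illustrative only, not code from the patch):

            import scala.collection.mutable

            object ImportStyleExample {
              // Keeping the "mutable." prefix at the use site makes the mutability obvious.
              val sizes: mutable.Map[String, Int] = mutable.Map("topic-0" -> 1024)
            }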

        Sriram Subramanian added a comment -

        The issue is that even if we pass only the required fields from the produce request to DelayedProduce, we also pass the actual Request itself to DelayedProduce, and it is used in multiple places. That Request contains the requestObj, and hence there is still a non-zero reference to it. Further, when sending the response we depend on requestObj in multiple places in updateRequestMetrics, one of them being the requirement to log the complete request. We would have to make some non-trivial changes to get rid of the request object completely, which would probably need to wait until a later time.

        Jay Kreps added a comment -

        Okay let's sync up. I think requestObj is the devil.

        Sriram Subramanian added a comment -

        KAFKA-745 is tracking the cleanup of RequestChannel.

        Sriram Subramanian added a comment -

        Jay - are you fine with checking this in? We would like to see the impact in shadow. We are using KAFKA-745 to track the cleanup of RequestChannel.

        Jay Kreps added a comment -

        I think hacking requestObj is fine as an intermediate step, but we need a plan to get rid of it in 0.8. Jun, what's the plan? I am +1 once we have figured that out.

        One nitpick in the current code is:
        val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
        buffer.rewind()
        + buffer = null

        Why are we keeping the buffer around at all if we immediately null it out (and we probably don't need to rewind it first)...?

        Sriram Subramanian added a comment -

        rebased

        Jun Rao added a comment -

        Patch v4 doesn't apply; 0.8 has moved since the patch was uploaded. Could you rebase again?

        Sriram Subramanian added a comment -

        rebased

        Neha Narkhede added a comment -

        Looks good overall. Just one comment -

        In the toString(), the map operation is not correct. If we just want to print a list of topic partitions, shouldn't it just be topicPartitionMessageSizeMap.keys.mkString(",")?

        Other than that, I'm +1 on this. If others don't have any concerns, I can check this in after the above change is made.

        Sriram Subramanian added a comment -

        We want to print the topicPartition and the size.

        Neha Narkhede added a comment -

        I see, in that case, I still don't see the use of the map(). It is not transforming anything -
        topicPartitionMessageSizeMap.map(r => r._1 -> r._2).toMap.mkString(",")
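
        A tiny illustration of the simplification being discussed (values are made up):

            object ToStringSimplification {
              val topicPartitionMessageSizeMap = Map(("topic-a", 0) -> 1024, ("topic-a", 1) -> 2048)

              // The identity map() adds nothing; mkString on the map already renders
              // each entry as "key -> value".
              val verbose = topicPartitionMessageSizeMap.map(r => r._1 -> r._2).toMap.mkString(",")
              val simple  = topicPartitionMessageSizeMap.mkString(",")
              // Both yield the same listing for this example.
            }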

        Sriram Subramanian added a comment -

        Yes it can be simplified.

        Neha Narkhede added a comment -

        +1. If others don't have any other input, I will go ahead and check this in.

        Neha Narkhede added a comment -

        Checked in patch v5


          People

  • Assignee: Sriram Subramanian
  • Reporter: Joel Koshy
  • Votes: 0
  • Watchers: 5
