Kafka / KAFKA-683

Fix correlation ids in all requests sent to kafka

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: None

      Description

      We should fix the correlation ids in every request sent to Kafka and fix the request log on the broker to specify not only the type of request and who sent it, but also the correlation id. This will be very helpful while troubleshooting problems in production.

      1. kafka-683-v3-unit-test.patch
        55 kB
        Neha Narkhede
      2. kafka-683-v3.patch
        54 kB
        Neha Narkhede
      3. kafka-683-v2-rebased-twice.patch
        54 kB
        Neha Narkhede
      4. kafka-683-v2-rebased.patch
        54 kB
        Neha Narkhede
      5. kafka-683-v2.patch
        54 kB
        Neha Narkhede
      6. kafka-683-v1.patch
        52 kB
        Neha Narkhede

        Activity

        Jay Kreps made changes -
        Status Reopened [ 4 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Jay Kreps added a comment -

        Closing since everyone ignored my code quality pleas

        Jay Kreps added a comment -

        The design was that the network layer is fully generic and the interface it presents to the application code is basically a "queue" or "channel" to dequeue and enqueue requests and responses. All the network layer knows about requests and responses is that they are glorified byte arrays.

        The details of how we handle request processing, and the format of requests, are not part of the network system. There is a specific layer, KafkaRequestHandler, which is the Kafka-specific part. Note how this bit happens in the processing thread pool, not in the network thread pool.

        The thought I had was to eventually factor more out of the KafkaApis layer and into the KafkaRequestHandler layer to make KafkaApis easier to test in the absence of a network server (i.e. you should just call new KafkaApis and then test the individual handle() methods). That was the layer that is meant to adapt our "business logic" in KafkaApis to the details of serialization, network, etc. Neha and I had briefly discussed this a few times.

        I am not saying this plan is the right way to go, I just would have liked to discuss that approach.
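
        The layering Jay describes above can be sketched roughly as follows. This is a hypothetical illustration, not Kafka's actual classes: the network layer owns only queues of opaque byte payloads, and any parsing happens on the handler side.

        ```java
        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        // Sketch of a fully generic network-layer channel: it knows nothing
        // about Kafka request formats, only raw bytes in and raw bytes out.
        final class ByteRequestChannel {
            private final BlockingQueue<byte[]> requests;
            private final BlockingQueue<byte[]> responses;

            ByteRequestChannel(int capacity) {
                this.requests = new ArrayBlockingQueue<>(capacity);
                this.responses = new ArrayBlockingQueue<>(capacity);
            }

            // Called by network threads: enqueue raw bytes, no parsing here.
            void sendRequest(byte[] raw) throws InterruptedException {
                requests.put(raw);
            }

            // Called by handler threads: dequeue raw bytes and deserialize
            // at this layer, keeping the network package testable on its own.
            byte[] receiveRequest() throws InterruptedException {
                return requests.take();
            }

            void sendResponse(byte[] raw) throws InterruptedException {
                responses.put(raw);
            }

            byte[] receiveResponse() throws InterruptedException {
                return responses.take();
            }
        }
        ```

        With this shape, the KafkaRequestHandler layer would be the only place that maps bytes to typed request objects, which is the separation the comment argues for.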

        Jun Rao added a comment -

        Yes, the main changes were made by me in kafka-203. If we want to track the time per request, we have to somehow know the request type and potentially other fields (e.g., to distinguish a replica fetch request and a consumer fetch request). If there is a better way to do this, I'd be happy to make the change.

        On second thought, is deserializing the request in RequestChannel really bad? After all, it is supposed to be a request channel, and putting request objects into it seems to make sense too. This basically puts the deserialization overhead into the network threads instead of the API threads, which may not be ideal. However, the deserialization overhead should be small.

        Jay Kreps added a comment -

        Yeah, I am on the fence about the metrics stuff; maybe it does make sense to measure request metrics based on entrance to and exit from the queue, though clearly that implementation is super super nasty (I think we could make request metrics a general concept of name/start/stop and track them up and down the stack without too much unpleasantness).

        But I still see on 0.8 the thing I am complaining about, which is
        val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
        which, if I understand correctly, moves api deserialization into the network layer and into the network threads. This was not meant to be the definition of the network layer. It hard-codes our api set and format so you can no longer test that layer independently.

        But basically yes, I was doing the code review for the other bug and noticed that indeed virtually everyone on the team has come and done something mean to my beautiful little request queue (only Swapnil is innocent!). Each of these things was done as part of a useful feature, but since it was done by someone who doesn't care much about this layer, it was done without thinking about whether a request channel is a thing that instantiates producer requests, contains the list of all the metrics we track, parses requests, or whatever. I think each person was just trying to get their thing done and thought, ah, here is a nice place to shove my stuff. Probably everyone after the first person thought, "this code is real crap so no sense trying too hard" because once you have one or two WTFs in there no one else really puts in much effort.

        Neha Narkhede added a comment -

        Understand and agree with you here. However, we added a huge chunk of Kafka-specific code to RequestChannel in KAFKA-203 (git commit 2bc65dab67933e9f9cdade59bd0e45ff99afd338), and we have gotten rid of the changes to RequestChannel made by this patch in the current 0.8 branch. Given that, I don't think this JIRA is the problem.

        Jay Kreps made changes -
        Resolution Fixed [ 1 ]
        Status Resolved [ 5 ] Reopened [ 4 ]
        Jay Kreps added a comment -

        Guys, this patch adds request-specific stuff to RequestChannel. RequestChannel is really not the place to be parsing out various kafka api fields; the network package is not supposed to have ANY kafka-specific logic in it, and this change puts it in the request channel. Moving request parsing to the network threads is also not what we want, I think. I'm pretty frustrated because I keep spending time taking this stuff out and people keep adding it back in. I'm the maintainer for this package, so I would really like to be added on review for it.
        </end_rant>

        Neha Narkhede made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Neha Narkhede added a comment -

        Checked in the patch after reverting the test log4j.properties. Also filed KAFKA-696 to fix the toString() method.

        Neha Narkhede made changes -
        Status In Progress [ 3 ] Patch Available [ 10002 ]
        Jun Rao added a comment -

        The changes in core/src/test/resources/log4j.properties are not intended, right? Other than that, +1 on the latest patch. We can have another jira to clean up the request string format.

        Neha Narkhede made changes -
        Attachment kafka-683-v3-unit-test.patch [ 12564479 ]
        Neha Narkhede added a comment -

        Fixed correlation ids in AsyncProducerTest

        Neha Narkhede made changes -
        Attachment kafka-683-v3.patch [ 12564461 ]
        Jun Rao added a comment -

        Got the following compilation error on the latest rebase.

        [info] == core-kafka / compile ==
        [info] Source analysis: 20 new/modified, 62 indirectly invalidated, 0 removed.
        [info] Compiling main sources...
        [error] /Users/jrao/intellij_workspace/kafka_git/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala:68: not enough arguments for method updateInfo: (topics: Set[String],correlationId: Int)Unit.
        [error] Unspecified value parameter correlationId.
        [error] Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet))
        [error] ^
        [error] one error found

        Neha Narkhede made changes -
        Attachment kafka-683-v2-rebased-twice.patch [ 12564453 ]
        Neha Narkhede added a comment -

        Rebased yet again!

        Hide
        Jun Rao added a comment -

        For rebased v2 patch, the problem in #3 still exists. Also, it seems that you need to rebase again.

        Hide
        Jun Rao added a comment -

        We can include in the request string something like clientId:aaa,correlationId:bbb. Will this be good enough for debugging?
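
        As a hypothetical illustration of that request-string format (the class and field names below are assumptions, not Kafka's actual code), a compact toString() could look like:

        ```java
        // Minimal stand-in for a request header carrying the three common fields.
        final class RequestHeader {
            final short versionId;
            final int correlationId;
            final String clientId;

            RequestHeader(short versionId, int correlationId, String clientId) {
                this.versionId = versionId;
                this.correlationId = correlationId;
                this.clientId = clientId;
            }

            // Compact, greppable summary for the request log instead of
            // dumping the whole (possibly binary) request body.
            @Override
            public String toString() {
                return "clientId:" + clientId
                     + ",correlationId:" + correlationId
                     + ",versionId:" + versionId;
            }
        }
        ```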

        Hide
        Neha Narkhede added a comment -

        That is a good suggestion. However, that still doesn't get rid of the 3 special fields from RequestChannel since we don't have access to the individual request objects there unless we cast/convert, which I thought was unnecessary.

        Hide
        Jun Rao added a comment -

        I agree that seeing the whole request is not useful, especially Message, since it's binary. Maybe what we should do is to fix the toString() method in each request so that it only prints out meaningful info. This can be fixed in a separate jira. One benefit of this is that we can remove the extra deserialization of those 3 special fields in RequestChannel, which depends on all requests having those 3 fields in the same order and seems brittle.

        Neha Narkhede made changes -
        Attachment kafka-683-v2-rebased.patch [ 12564224 ]
        Neha Narkhede added a comment -

        I see, I misunderstood your suggestion then. The reason I added the version and correlation id information explicitly is to make it easier to trace requests through the socket server to the request handler. In fact, I think logging the entire request might not be very useful if we do a good job of logging the correlation id and request type properly throughout our codebase. My plan was to remove it after we are sufficiently satisfied with troubleshooting problems just based on the correlation id wiring that we have currently. I will do a follow-up to remove the logging of the request after that. Does that work for you?

        Hide
        Jun Rao added a comment -

        Patch v2 doesn't seem to apply on 0.8. Could you rebase?

        For 6, I didn't mean to remove the trace logging in RequestChannel. What I meant is that we already print out requestObj which includes every field in a request. So, there is no need to explicitly print out clientid, correlationid and versionid.

        Neha Narkhede made changes -
        Attachment kafka-683-v2.patch [ 12564011 ]
        Neha Narkhede added a comment -

        1. DefaultEventHandler: Good suggestion, included it
        2. FileMessageSet: Done
        3. KafkaMigrationTool: This was accidental, reverted it
        4.1 Makes sense, although we don't really need a shouldRoll variable. Refactored this code a bit
        4.2 Adding a function name probably will be ugly. I added a statement that allows us to understand where those log statements are coming from. Also, we already print the names of the log segments followed by whether or not the index and data files exist.
        5. Good point. One concern with adding a constructor without correlation id is that most users will use it, which will hide the correlation id information. And this data is useful for troubleshooting. Although, I wonder how many users would think that is annoying
        6. I found it useful while troubleshooting since that marks the time we received the request from the socket. In KafkaApis, the statement tells us the time when it trickles through the queues to actually get handled. This is at trace anyway and will not show up in the server log otherwise.
        7. Those changes to log4j.properties were not intended to be checked in. I was trying to see if log4j can be configured to delete older log files automatically. Turns out it cannot. Will revert those changes. Of course, by default, the server logs should go to ConsoleAppender
        8. State change log in this patch just means everything from kafka.controller. There is another patch to format the state change log correctly
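
        The two-constructor concern in point 5 above can be sketched in Java as follows. This is a simplified stand-in for javaapi.TopicMetadataRequest, with field names assumed for illustration: the convenience overload defaults the correlation id, which is exactly what makes requests harder to trace.

        ```java
        import java.util.List;

        // Simplified sketch of a Java-friendly request class with two
        // constructors, since Scala optional parameters are not visible
        // as overloads from Java.
        class TopicMetadataRequest {
            private final List<String> topics;
            private final int correlationId;

            // Preferred: callers supply their own correlation id for tracing.
            TopicMetadataRequest(List<String> topics, int correlationId) {
                this.topics = topics;
                this.correlationId = correlationId;
            }

            // Convenience overload: defaults the correlation id to 0,
            // which hides per-request tracing information in the broker log.
            TopicMetadataRequest(List<String> topics) {
                this(topics, 0);
            }

            int correlationId() { return correlationId; }
            List<String> topics() { return topics; }
        }
        ```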

        Hide
        Jun Rao added a comment -

        Thanks for the patch. Some comments:

        1. DefaultEventHandler.send(): Instead of
        val currentCorrelationId = correlationId.get()-1
        it's probably better to
        val currentCorrelationId = correlationId.getAndIncrement()
        at the beginning and then reuse it when needed.

        2. FileMessageSet: If initChannelPositionToEnd is true, we could be either creating a new segment or loading an existing segment during startup. So, we should rephrase the info message a bit.

        3. KafkaMigrationTool: Currently, the tool requires exactly one of whitelist or blacklist. So, we will not be able to use the default value of whitelist. We can probably leave whitelist as a required argument, but put in the description how to specify all topics correctly (i.e., .*).

        4. Log:
        4.1 maybeRoll(): Rechecking the condition has the problem that time-based condition may not return the same value. We can probably check each condition once and if the condition is true, log the cause and set a boolean var shouldRoll to true.
        4.2 markDeletedWhile(): For the new logging, should we somehow indicate that those logs are from this function? Also, it seems that we log whether all current index/data files exist. Should we log the name of the index/data files too so that we know which ones are missing?

        5. javaapi.TopicMetadataRequest: The scala optional parameter for correlationId won't work for java. We will have to manually create two constructors, one with correlation id and the other without.

        6. RequestChannel: We are already logging the whole request which includes clientid, correlationid and versionid. So, there is no need to log them explicitly.

        7. config/log4j.properties: All scripts in bin/ currently use this file. The changes are really intended for the Kafka broker. Perhaps we can create a new log4j file just for the broker and change the kafka broker scripts accordingly. Also, for the kafka broker, should we log to both file and console? Finally, I got the following warning when running the kafka server startup script.
        log4j:WARN No such property [maxBackupIndex] in org.apache.log4j.FileAppender.
        log4j:WARN No such property [maxFileSize] in org.apache.log4j.FileAppender.
        log4j:WARN No such property [maxBackupIndex] in org.apache.log4j.FileAppender.
        log4j:WARN No such property [maxFileSize] in org.apache.log4j.FileAppender.

        8. Was the state change log added? I didn't see the change in the scala code or the log4j property file.
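
        The race in point 1 above can be made concrete with a small sketch (the class and method names here are a simplified stand-in for DefaultEventHandler, not the actual patch): reading the counter and subtracting one can observe another thread's increment, while getAndIncrement() atomically reserves one id per send that can then be reused for logging and retries.

        ```java
        import java.util.concurrent.atomic.AtomicInteger;

        // Sketch of the suggested pattern: claim the correlation id once,
        // atomically, rather than re-deriving it with get() - 1 later.
        final class CorrelationIds {
            private final AtomicInteger correlationId = new AtomicInteger(0);

            // Atomically claim the id for this request; callers hold on to
            // the returned value and reuse it wherever the id is needed.
            int nextCorrelationId() {
                return correlationId.getAndIncrement();
            }
        }
        ```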

        Neha Narkhede made changes -
        Attachment kafka-683-v1.patch [ 12563344 ]
        Neha Narkhede added a comment -

        This patch does the following -
        1. Fixes the correlation id for all requests. The correlation id is not supposed to be globally unique, just unique per client for a sufficiently long time. Hence, for the controller, it is initialized to zero upon startup. For fetch requests, it is initialized on the builder creation. For produce requests, it is initialized on producer object creation.
        2. Fixes logging to output a kafka request log and a state change log. Kafka request log is a daily rolling file appender that contains every request received and served by a kafka broker. State change log contains log statements coming from the controller so that it doesn't pollute the server log.
        3. Fixed a bug in StopReplicaRequest since it didn't write the correlation id as part of serializing the request
        4. Added some log4j statements to kafka.log to specify the reason for rolling a log segment and to say when we create and delete data and index files
        5. Fixed the client id for the fetchers inside ZookeeperConsumerConnector to have not just the wired-in client id but also the fetcher thread name. So the requests sent by the individual fetchers from the consumer will appear with the real name in the Kafka broker's request log. This will allow us to correlate threads from thread dumps with the requests they sent to the broker. This is helpful during debugging.
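
        The request log described in point 2 above could be wired up with a log4j fragment along these lines. This is a hedged sketch: the appender name, file path, and logger name are illustrative assumptions, not the committed configuration. Note also that DailyRollingFileAppender rolls by date, so it avoids the maxFileSize/maxBackupIndex properties that plain FileAppender does not support.

        ```properties
        # Hypothetical daily rolling request-log appender for the broker.
        log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
        log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd
        log4j.appender.requestAppender.File=logs/kafka-request.log
        log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
        log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

        # Route the request log to its own file so it does not pollute the
        # server log (logger name assumed for illustration).
        log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
        log4j.additivity.kafka.network.RequestChannel$=false
        ```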

        Neha Narkhede made changes -
        Field Original Value New Value
        Status Open [ 1 ] In Progress [ 3 ]
        Neha Narkhede created issue -

          People

          • Assignee:
            Neha Narkhede
            Reporter:
            Neha Narkhede
          • Votes:
            0
            Watchers:
            3
