Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.1
    • Component/s: None
    • Labels:

      Description

      Currently the consumer writes its offsets directly to ZooKeeper. There are two problems with this: (1) it is a poor use of ZooKeeper, and we need to replace it with a more scalable offset store, and (2) it makes the behavior hard to carry over to clients in other languages. A first step towards fixing both is to add a proper Kafka API for committing offsets. The initial version would just write to ZooKeeper as we do today, but in the future we would have the option of changing that.

      This API likely needs to take a sequence of consumer-group/topic/partition/offset entries and commit them all in one request.

      It would be good to write up a wiki design for how this would work and reach consensus on that first.
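      As a rough illustration of such a batched commit, here is a sketch of how a request body carrying group/topic/partition/offset entries might be laid out. The short-string encoding follows the convention of the other Kafka APIs; the exact field order and types here are assumptions for illustration, not the committed protocol:

```python
import struct

def short_string(s):
    # Kafka-style "short string": 16-bit big-endian length prefix, then the bytes
    b = s.encode("utf-8")
    return struct.pack(">h", len(b)) + b

def encode_offset_commit(group, entries):
    # entries maps topic -> {partition: offset}; the field layout is illustrative only
    payload = short_string(group)
    payload += struct.pack(">i", len(entries))         # topic count
    for topic, partitions in sorted(entries.items()):
        payload += short_string(topic)
        payload += struct.pack(">i", len(partitions))  # partition count
        for partition, offset in sorted(partitions.items()):
            payload += struct.pack(">iq", partition, offset)
    return payload

buf = encode_offset_commit("my-group", {"events": {0: 42, 1: 17}})
```

Batching all entries into one request keeps a commit of many partitions to a single round-trip, which matters once the backing store is something slower than an in-memory map.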

      1. KAFKA-657v8.patch
        56 kB
        David Arthur
      2. KAFKA-657v7.patch
        45 kB
        David Arthur
      3. KAFKA-657v6.patch
        45 kB
        David Arthur
      4. KAFKA-657v5.patch
        44 kB
        David Arthur
      5. KAFKA-657v4.patch
        30 kB
        David Arthur
      6. KAFKA-657v3.patch
        30 kB
        David Arthur
      7. KAFKA-657v2.patch
        30 kB
        David Arthur
      8. KAFKA-657v1.patch
        8 kB
        David Arthur

        Issue Links

          Activity

          korebantic2 added a comment - edited

          All,

          I'm looking to update a client to support this new addition in 0.8.1. I just wanted to check in: does the guide here reflect the latest version of the protocol?

          https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI

          It hasn't been updated since December so just wanted to make sure.

          Jay Kreps added a comment -

          I updated the property name. Normally I am against commenting out code since it is in version control anyway, but in this case those tests are actually useful and not in version control so probably makes sense to leave them.

          David Arthur added a comment -

          Jun,

          80. Those commented out tests will be valid once the metadata is actually stored. I left it to save someone the effort later on

          81. +1

          Since this patch has been committed maybe you or Jay can just make these changes on trunk.

          Jun Rao added a comment -

          v8 looks good. Just some minor comments.

          80. OffsetCommitTest.testCommitAndFetchOffsets(): Could you remove the commented out code? Also, remove unused imports.

          81. We are trying to standardize the config names in kafka-648. Should we rename offset.metadata.max.size to offset.metadata.max.bytes?

          Jay Kreps added a comment -

          Committed.

          Neha Narkhede added a comment -

          +1 on v8

          Jay Kreps added a comment -

          This looks good to me. If no further objections I am taking this on trunk.

          David Arthur added a comment -

          Attaching v8

          70. I have consolidated (offset, metadata, error) into a case class OffsetMetadataAndError
          71. Added InvalidOffset=-1 and NoMetadata="" to OffsetMetadataAndError as constants
          72. Done
          73. Fixed by using OffsetMetadataAndError
          74. Done
          75. Removed: redundant test cases, unused imports, misspellings
          76. Done
          77. Added metadata to OffsetCommitRequest and OffsetFetchResponse. Currently not doing anything with the metadata

          Other changes:

          • Added "offset.metadata.max.size" to KafkaConfig, default 1024
          • Pass KafkaConfig from KafkaServer to KafkaApis (through the constructor). Not sure if there is a more elegant way to do this, but this works
          • Added OffsetMetadataTooLargeException
          • Better error handling (maybe?) in the handle* methods. Instead of returning UnknownCode always, I handle specific cases and the catch-all uses ErrorMapping.codeFor instead of just UnknownCode

          Also, I found a bug in ApiUtils.writeShortString while testing this code. I opened up KAFKA-680 with a fix attached. Until that fix is incorporated, OffsetCommitTest.testLargeMetadataPayload will fail.
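          The size limit David mentions implies a check like the following sketch; the constant and exception names are taken from his comment above, but the logic itself is an assumption, not the patch's actual code:

```python
OFFSET_METADATA_MAX_SIZE = 1024  # default for "offset.metadata.max.size" per the comment

class OffsetMetadataTooLargeError(Exception):
    """Stand-in for the patch's OffsetMetadataTooLargeException."""

def validate_offset_metadata(metadata, max_size=OFFSET_METADATA_MAX_SIZE):
    # Reject commit metadata whose encoded size exceeds the configured limit
    size = len(metadata.encode("utf-8"))
    if size > max_size:
        raise OffsetMetadataTooLargeError(
            "metadata is %d bytes, limit is %d" % (size, max_size))
    return metadata
```

Measuring the encoded byte length (rather than the character count) is what matters once the string is serialized into the request.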

          Jay Kreps added a comment -

          Works for me.

          David Arthur added a comment -

          I have "offset.metadata.max.size" for the time being with a default of 1024

          Jay Kreps added a comment -

          Yeah that is the right place for a new config. It is worth discussing the name as part of the review since this ends up being kind of part of our "api" to the operator.

          I would just skip storing the metadata for now (i.e. just throw it away). If we make the change in zk we need a script to grandfather from the old format to the new format. Since we will need a conversion script when we move off zk anyway it makes sense to avoid two conversions for users (one to add the metadata, and another to move off zk).

          David Arthur added a comment -

          We had talked about limiting the size of the metadata to 1k or something and making this configurable. My question is: where does this config belong, and how to get it into KafkaApis? Right now, I've added this config to KafkaConfig and have passed in the "config" variable from KafkaServer to KafkaApis.

          Also, for the offset payloads, I'm opting to store them as a JSON object in ZooKeeper:

          {"offset": 42, "metadata": "foo"}
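          That JSON payload round-trips easily; a minimal sketch, assuming only the two fields shown above (the helper names are hypothetical, not from the patch):

```python
import json

def encode_offset_payload(offset, metadata=""):
    # Store the offset and its metadata as the JSON object sketched above
    return json.dumps({"offset": offset, "metadata": metadata})

def decode_offset_payload(payload):
    # Tolerate entries that carry no metadata field
    data = json.loads(payload)
    return data["offset"], data.get("metadata", "")
```

Making `metadata` optional on the read side matters if older plain-offset znodes coexist with the new JSON format.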
          Jay Kreps added a comment -

          Looks good to me. A few things in addition to the other comments:
          76. We baselined the other request versions to 0, but this one is starting at 1. Can we change it to 0 for consistency?
          77. It would be good to add the metadata string field.

          Jun Rao added a comment -

          Thanks for patch v7. Some more comments:

          70. OffsetFetchResponse: requestInfo can be defined as Map[TopicAndPartition, (Long, Short)] without referencing Tuple2 directly.

          71. KafkaApis.handle():
          case (topicAndPartition) => (topicAndPartition, Tuple2(-1L, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))) can be
          case (topicAndPartition) => (topicAndPartition, (-1L, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
          Also, instead of using -1, can we define a constant like InvalidOffset? Use that constant in handleOffsetFetchRequest() too.

          72. javaapi.OffsetCommitResponse and javaapi.OffsetFetchResponse: Remove unused import ByteBuffer.

          73. javaapi.OffsetFetchResponse: Tuple2 is a scala thing. It would be weird to return that directly to the java caller. One way is to change the type of requestInfo in the scala version of OffsetFetchResponse to Map[TopicAndPartition, OffsetAndError]. This also follows our convention of limiting the usage of Tuple.

          74. java version of OffsetCommit

          {Request/Response} and OffsetFetch{Request/Response}

          : We don't really need the methods sizeInBytes() and writeTo(). They are only used for serialization and we don't serialize the java version of the request/response. This issue seems to already exist in the java version of OffsetRequest and FetchRequest. Could you remove these two methods there too?

          75. OffsetCommitTest:
          75.1 Remove unused imports.
           75.2 Fix the misspelling "non-existant".
          75.3 When handling OffsetCommitRequests, we do exactly the same thing whether a topic/partition exists in KafkaApis. So, it doesn't seem that we need to test unknown topic/partition separately.

          David Arthur added a comment -

          Attaching patch v7 - addresses Joel's comments.

          Joel Koshy added a comment -

          Hey David, the patch (and the wiki) looks great.

          • For error handling. I think what Jun was referring to is the giant catch clause in handle - i.e., the new keys
            should be added as a case. That junk block of code really needs to be cleaned up
          • KafkaApis: if(offsetStr == null) : I don't think this can happen.
          • Default client id should probably be "" in all the request/responses i.e., to follow convention.
          • It would be better to use 1.toShort or val CurrentVersion: Short = 1 (instead of 1.shortValue); although it's more
            or less a non-issue as it's in the object.
          David Arthur added a comment -

          Post-holiday bump. Anyone had a chance to look at the v6 patch?

          David Arthur added a comment -

          Finally figured out what you guys meant by adding error handling to the handle method (the method named "handle", of course). Attaching v6 of the patch.

          Neha Narkhede added a comment -

          +1 on v5. Just a minor comment which I think Jun raised as well -

          KafkaApis
          Today, we have a pretty awkward way of handling error codes for the various APIs in the handle() method. We should probably fix that, but until then, it will be good to maintain consistency. The new APIs are missing that error handling.

          David Arthur added a comment -

          Attaching patch v5. Added ASL header and javaapi. Not really sure how to test the javaapi wrapper - tried writing a Java junit test but SBT was not too happy about that.

          Jun, regarding point 42, I'm not sure what you mean, I have the "case e => ..." in both handle functions. What more do I need to add for exception handling here?

          Jay, this does not include the metadata field. I will work on that next.

          Jay Kreps added a comment -

          Also I had one more generalization I would like to add--metadata field. Sent proposal to the dev list. If no objections I will update the wiki.

          Jay Kreps added a comment -

          It probably makes sense to do this in two phases. Let's get the patch in that adds the api, then make the changes to the consumers to make use of it as phase 2.

          Jun Rao added a comment -

          Thanks for patch v4. Some comments:

          40. Could you add the Apache license header to all new files?

          41. SimpleConsumer is a public API. So we need to add the new requests to the javaapi version of SimpleConsumer. We likely need a java version of the new requests/responses.

          42. KafkaApis.handle(): Currently, for each type of requests, we catch all unexpected exceptions and send a corresponding response with an error code to the client. We need to do this for the 2 new types of requests too.

          43. Do we plan to use the new API to commit offsets in the high level consumer?

          David Arthur added a comment -

          I regenerated the patch after rebasing from trunk. Jun, see if it works now: git apply -p1 KAFKA-657v4.patch

          Jun Rao added a comment -

          I was trying to apply patch v3 in trunk (on a revision before the 0.8 merge patch), but got the following error. Anyone know what the issue is?

          git apply -p0 ~/Downloads/KAFKA-657v3.patch
          fatal: git apply: bad git-diff - inconsistent new filename on line 5

          David Arthur added a comment -

          Jay, the one thing I'm still unclear on are the various failure scenarios. Could you double check that bit of the patch (in KafkaApis.scala)

          David Arthur added a comment -

          v3 of the patch

          1. info -> debug
          2. catching general exceptions and returning UnknownCode
          3. removed ZK path checks

          Jay Kreps added a comment -

          Makes sense. The API is versioned so we can always add that in as the use cases come up.

          David Arthur added a comment -

          Yes, they are somewhat orthogonal. The only real use I can think of is concurrency control around offset updates, however if that stuff has been serialized in the Scala consumer then it is a non-issue. Perhaps it's best to punt on conditional commits for the time being.

          Jay Kreps added a comment -

          I think that actually covers an orthogonal problem right?
          1. Checking topic/partition covers bugs in the client impl that set the wrong values.
          2. Check and set catches a bug that might lead to you clobbering your offset due to a concurrency issue where there are two processes both trying to update the same offset.

          Originally my concern with (2) was that I wasn't sure if we could implement it in a post-zk world. Now that we wrote up that proposal in a lot more detail I think we can.

          We wouldn't want to make the last offset mandatory because in the case that you are manually resetting your offset to 0 (or some low number) you might not know the previous value. But I think what you are proposing is that we could have a current_offset field in the request, and if it is set we would only update the offset if the current offset equals the given offset. We could make it optional by having the value -1 indicate "don't care, clobber whatever is there".

          The question is, what is the use case for this? Our approach to the scala client has been to ensure mutual exclusion for the consumer processes, at which point this basically can't happen. I wonder how an alternative client implementation could make use of it? It would be good to work that out before including it.
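          The optional check-and-set described above could look roughly like this. This is an illustration of the semantics only; `commit_offset`, `DONT_CARE`, and the in-memory store are hypothetical names, not code from the patch:

```python
DONT_CARE = -1  # Jay's proposed sentinel: clobber whatever offset is stored

def commit_offset(store, key, new_offset, expected_current=DONT_CARE):
    # Conditionally update the offset for key = (group, topic, partition):
    # commit unconditionally when expected_current is DONT_CARE, otherwise
    # only when the stored offset matches the caller's expectation.
    if expected_current != DONT_CARE and store.get(key) != expected_current:
        return False  # stale view of the offset; refuse to clobber it
    store[key] = new_offset
    return True
```

A losing writer gets `False` back and can re-read the current offset before retrying, which is exactly the concurrency protection the conditional form is meant to provide.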

          David Arthur added a comment -

          Re 3: Maybe this is a case for the check-and-set functionality I originally proposed. The default case could update ZK with no checks (which would cover your two use cases), and a special case could do the check as well as check the last offset stored for a conditional update. Thoughts?

          Jay Kreps added a comment -

          This looks great!

          Three minor things:
          1. Can you change the logging for the common case to debug? Our logging policy is that you should be able to run in INFO and have all messages be things you need to know.
          2. Can you handle any exceptions from ZK and send back an UnknownException
          3. Can you remove the checks on topic/partition validity?

           (3) is maybe more controversial. Here is my rationale. First, ZK is a huge bottleneck, so adding two more ZK round-trips will be a problem. Second, we actually have two use cases for allowing the user to store offsets for non-existent topics or partitions. The first is that if you are doing mirroring between two clusters in different data centers (a common case), it probably makes sense to store the offsets in whichever data center the mirroring process runs in; however, there is no requirement that the two clusters have the same partitioning. The second is probably specific to our usage: we have several systems that produce "offset"-like markers, and being able to commit these all together to mark a single "point in consumption time" across all systems is nice.

          David Arthur added a comment -

          v2 of the patch. Includes OffsetFetch and OffsetCommit APIs, some unit tests for both.

          This checks for the existence of a topic+partition in ZK before fetching or committing. Not sure if this is the behavior we want - simple enough to change.

          Jay Kreps added a comment -

          I wonder if you could take a look at the updated docs and see if they seem clear. I tried to cover those, but, well, documentation is hard: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

          Summary:
          version id is the version of this api format. In the future if we decide we missed an important field (e.g. lastOffset) we will add it and bump the version number and handle both cases on the server side based on the version we see.
          client id is a logical name for the client that could be used across many client servers. This is useful for logging and metrics (i.e. figuring out WHY you are suddenly getting 5x the qps, or whatever) if you have lots of clients.
          replica id is just in the fetch request and shouldn't be in this request. A fetch request can be issued either by a normal consumer or by a replica and the broker has slightly different behavior in each case (e.g. whether uncommitted messages are visible...)
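          The shared header fields described above (minus the fetch-only replica id) can be sketched as follows. The field widths mirror the Scala snippet quoted later in the thread; the api key value in the comment is a placeholder, not a confirmed constant:

```python
import struct

def encode_request_header(api_key, api_version, correlation_id, client_id):
    # api_key identifies the request type, api_version lets the format evolve,
    # correlation_id matches responses back to requests, and client_id is a
    # logical name used for logging and metrics
    cid = client_id.encode("utf-8")
    return struct.pack(">hhih", api_key, api_version, correlation_id, len(cid)) + cid

# Hypothetical usage; api_key=8 is assumed here purely for illustration
header = encode_request_header(8, 0, 1, "mirror-maker")
```

Because the version travels in every request, the broker can parse old and new layouts side by side, which is what makes the "bump the version and handle both" evolution strategy work.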

          David Arthur added a comment -

          Thanks Jay Kreps, that clears things up quite a bit. Another question I have is around the request envelope (clientId, correlationId, etc).

          I understand correlationId is used to allow multiplexing requests/response, but what about replicaId, clientId, etc. I mostly copied these from other Request classes - a bit of cargo-cult programming I guess

              val versionId = buffer.getShort
              val correlationId = buffer.getInt
              val clientId = readShortString(buffer)
              val replicaId = buffer.getInt
          

          Are all of these necessary for the OffsetCommitRequest/Response? Specifically, is replicaId necessary?

          Jay Kreps added a comment -

          The existing API describes the offset ranges contained in log segments on the server. It is poorly named and we should really rename it to something like LogMetadataRequest, and we should really generalize it a bit to include things other than segment offset beginnings. The intended use case for the existing API is for new consumers when they consume for the first time in an existing stream. When they first start consuming they have no position in the log to read from (or to save out using your new api). They want to start consuming, but to start consuming they need a valid offset to start at. What offsets are valid depends on what is available on the server, so they need to be able to ask the server "what offset ranges do you have" and then they could choose to start consuming either at the beginning or end of that (or somewhere in the middle). Your api on the other hand answers the question "what is the latest offset I have 'committed' (i.e. recorded as consumed)." This would be used when a restart or rebalancing of the consumers occurs. Hope that makes sense? We could rename the existing API as part of this change to avoid the muddle.
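
A hypothetical client-side startup flow contrasting the two APIs: the older "what offset ranges do you have?" request versus the new "what did I last commit?" request. The function names `fetch_committed_offset` and `available_offset_range` are placeholders for illustration, not real client calls.

```python
def starting_offset(fetch_committed_offset, available_offset_range,
                    group, topic, partition, from_beginning=True):
    # Restart/rebalance case: resume from the position saved via the
    # new offset commit/fetch API.
    committed = fetch_committed_offset(group, topic, partition)
    if committed is not None:
        return committed
    # First-time consumer: no saved position, so ask the server what
    # offsets it actually has (the older, LogMetadataRequest-style API)
    # and start at one end of the available range.
    earliest, latest = available_offset_range(topic, partition)
    return earliest if from_beginning else latest
```

The two requests are complementary: the new API alone cannot bootstrap a consumer that has never committed anything.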

          David Arthur added a comment -

          I'm fine working on the rest of 1. 1a is simple enough, 1c I might need some direction on. For 1b, how exactly is it different from the existing Offsets API? I have never really been clear what the purpose of the old API is.

          Jay Kreps added a comment -

          Oh yes, two other things:
          1. We don't have a response in this api yet. We should at least have a way to indicate if the request failed (i.e. we got an error writing to zk, etc).
          2. Would be good to replace the println with a proper log statement.
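
A sketch of point (1): a response that at least reports whether the commit succeeded, e.g. an int16 error code alongside the correlation id. The field layout here is illustrative, not the final API.

```python
import struct

NO_ERROR = 0  # hypothetical error-code value for illustration

def encode_commit_response(correlation_id, error_code):
    # int32 correlation id + int16 error code; nonzero error code would
    # signal a failure such as an error writing to ZooKeeper.
    return struct.pack(">ih", correlation_id, error_code)

def decode_commit_response(buf):
    return struct.unpack(">ih", buf)
```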

          Jay Kreps added a comment -

          This looks great. To confirm, the final format for the commit response is
          group [topic [partition offset]]

          I think logically there are two phases of work around fixing offset management
          1. Add the API and convert the consumer to use it
          a. CommitOffsetRequest/Response (to save your position)
          b. FetchOffsetRequest/Response (to read back a saved position)
          c. Integration into the consumer (using the new api in the scala client)
          d. Unit test coverage for these (say in kafka.integration.PrimitiveApiTest)
          2. Move offsets out of zookeeper, since zookeeper doesn't scale well for writes

          It would be nice to do (1) more or less together, and if we do it right (2) can be a follow-up item and need not be done by you unless you want to. We can definitely break (1) into successive patches if that is helpful to keep the individual changes small--I am happy to take what you have now if you are up for finishing the other items in (1). I would like to get people to brainstorm a little on (2) in parallel as it could potentially have some impact on (1). We have some time to fiddle with the API if we think of improvements before it would be released and we would have to start versioning changes, though, so we probably don't need to block on that.

          So let me know if you are up for the rest of the items in (1)
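
The nested "group [topic [partition offset]]" shape confirmed above can be sketched as a wire encoding. The specific wire types here (int32 array lengths and partition ids, int64 offsets, int16-length-prefixed strings) are guesses for illustration; the real layout lives on the design wiki.

```python
import struct

def short_string(s):
    # int16 length prefix + UTF-8 bytes, the "short string" convention.
    b = s.encode("utf-8")
    return struct.pack(">h", len(b)) + b

def encode_commit_request(group, offsets):
    # offsets: {topic: {partition: offset}} -- the
    # group [topic [partition offset]] nesting from the comment above.
    out = short_string(group) + struct.pack(">i", len(offsets))
    for topic, partitions in offsets.items():
        out += short_string(topic) + struct.pack(">i", len(partitions))
        for partition, offset in partitions.items():
            out += struct.pack(">iq", partition, offset)
    return out
```

Taking a sequence of entries like this is what lets a consumer commit its whole position set in one request, per the issue description.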

          David Arthur added a comment -

          First pass at adding a commit offset request

          David Arthur added a comment -

          I have started a wiki page for this design discussion https://cwiki.apache.org/confluence/display/KAFKA/Commit+Offset+API+-+Proposal


            People

            • Assignee: Unassigned
            • Reporter: Jay Kreps
            • Votes: 0
            • Watchers: 7
