Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.2.0
    • Component/s: None
    • Labels: None

      Description

      The plan is to take a dump of the producer code "as is" and then do a series of post-commit reviews to get it into shape. This bug tracks just the code dump.

      Attachments

      1. KAFKA-1227.patch (467 kB) - Jay Kreps
      2. KAFKA-1227.patch (101 kB) - Jay Kreps
      3. KAFKA-1227.patch (105 kB) - Jay Kreps

        Issue Links

          Activity

          Jun Rao added a comment -

          Resolving this one since all comments have been addressed.

          Jun Rao added a comment -

          Addressed comments 20-26 in KAFKA-1307.

          Jun Rao added a comment -

          Accumulated some more review comments.

          20. Regarding VIP. We need to think through another corner case. When the brokers for all replicas of an existing topic die, the admin can start new brokers with existing broker ids to start the topic from scratch. Those new brokers can be added to the VIP. However, if the producer only uses the VIP once during startup, there is no way for the producer to identify the new brokers unless it's restarted.

          21. AbstractConfig:
          21.1 To be consistent, we probably should change the return value of the following api from Long to long.
          public Long getLong(String key)
          21.2 Could we also add getDouble(String key)?
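
          As a rough illustration of 21.1 and 21.2, a primitive-returning accessor pair could look like the sketch below; the class name and the backing map are assumptions for illustration, not the actual AbstractConfig internals.

          import java.util.Map;

          // Minimal sketch only: assumes parsed values live in a Map<String, Object>.
          // This is not the real AbstractConfig, just the shape of the proposed accessors.
          public abstract class AbstractConfigSketch {
              private final Map<String, Object> values;

              protected AbstractConfigSketch(Map<String, Object> values) {
                  this.values = values;
              }

              public long getLong(String key) {
                  return (Long) values.get(key);   // primitive return, per 21.1
              }

              public double getDouble(String key) {
                  return (Double) values.get(key); // proposed addition, per 21.2
              }
          }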

          22. Metadata.fetch(): I am a bit puzzled by the following code. Not sure what the intention for the handling of InterruptedException is. This code will be called in the producer client thread, which will probably be interrupted during shutdown. When that happens, we probably should just throw the InterruptedException back to the caller. Otherwise, the producer may have to wait metadataFetchTimeoutMs before it can shut down. Also, it seems that we need to adjust maxWaitMs in each loop. Otherwise, we will be waiting longer than we should.
          try {
              wait(maxWaitMs);
          } catch (InterruptedException e) {
              /* this is fine, just try again */
          }
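
          For illustration only, a deadline-based variant that re-arms the wait with the remaining time and lets InterruptedException propagate could look like the sketch below; the class and method names are assumptions, not the actual Metadata code.

          // Standalone sketch, not the real Metadata class: the remaining wait time is
          // recomputed on each loop iteration and InterruptedException is propagated, so
          // a shutdown interrupt is not swallowed.
          public class MetadataWaitSketch {
              private boolean updated = false;

              public synchronized void awaitUpdate(long maxWaitMs) throws InterruptedException {
                  long deadline = System.currentTimeMillis() + maxWaitMs;
                  while (!updated) {
                      long remaining = deadline - System.currentTimeMillis();
                      if (remaining <= 0)
                          throw new RuntimeException("Timed out waiting for a metadata update");
                      wait(remaining); // throws InterruptedException instead of catching it
                  }
              }

              public synchronized void update() {
                  updated = true;
                  notifyAll();
              }
          }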

          23. Sender: We learned in KAFKA-1228 that it's not enough to just handle IOException in the following code. UnresolvedAddressException is a RuntimeException, not an IOException.
          private void initiateConnect(Node node, long now) {
              try {
                  selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                                   this.socketSendBuffer, this.socketReceiveBuffer);
                  this.nodeStates.connecting(node.id(), now);
              } catch (IOException e) {
                  /* attempt failed, we'll try again after the backoff */
                  nodeStates.disconnected(node.id());
                  /* maybe the problem is our metadata, update it */
                  metadata.forceUpdate();
              }
          }
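
          A hedged sketch of the shape comment 23 suggests, not the committed fix: catch UnresolvedAddressException (from java.nio.channels, an unchecked exception) alongside IOException. The fields used here simply mirror the snippet above.

          // Sketch only; selector, nodeStates and metadata are the fields from the snippet above,
          // and java.nio.channels.UnresolvedAddressException is the extra import needed.
          private void initiateConnect(Node node, long now) {
              try {
                  selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                                   this.socketSendBuffer, this.socketReceiveBuffer);
                  this.nodeStates.connecting(node.id(), now);
              } catch (UnresolvedAddressException e) {
                  /* the host name could not be resolved; treat it like a failed connect */
                  nodeStates.disconnected(node.id());
                  metadata.forceUpdate();
              } catch (IOException e) {
                  /* attempt failed, we'll try again after the backoff */
                  nodeStates.disconnected(node.id());
                  /* maybe the problem is our metadata, update it */
                  metadata.forceUpdate();
              }
          }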

          24. BufferPool.allocate(): Could we add some comments on when this call is blocked?

          25. MemoryRecords: The following constructor doesn't seem to be used.
          public MemoryRecords(int size)

          26. Spellings:
          26.1 ConfigDef: seperated list, programmatically
          26.2 Metadata: ellapsed

          Jay Kreps added a comment -

          The later one. I think I haven't mastered our uber-patch tool.

          Edward Ribeiro added a comment -

          Two review board entries were created for the same issue. Which one is valid?

          Jay Kreps added a comment -

          Created reviewboard https://reviews.apache.org/r/17688/
          against branch trunk

          Jay Kreps added a comment -

          Created reviewboard https://reviews.apache.org/r/17653/
          against branch trunk

          Guozhang Wang added a comment -

          Regarding the offset in RecordSend, the value should be

          RecordSend.relative_offset + RecordSend.ProduceRequestResult.base_offset.

          not just

          RecordSend.relative_offset

          Right? If yes, we'd better update KafkaProducer's comments accordingly.
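
          A tiny illustrative helper for the arithmetic being described; the names are assumptions, not the actual RecordSend API.

          // Illustrative only: the final offset of a record is the base offset the broker
          // assigned to the produce request plus the record's position within the batch.
          static long absoluteOffset(long baseOffset, long relativeOffset) {
              return baseOffset + relativeOffset;
          }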

          Jay Kreps added a comment -

          Hey Edward,

          What I actually want for the Cluster constructor is to make it private to my jar, but since that isn't possible I went with public so it is accessible to unit tests. In general that API wouldn't be used, as the client tells you about the cluster, not vice versa. But there is no harm in using the more general types, so I'll change those to Collection.

          The partitionsFor call does need to return a list as one use case for that is being able to fetch by index, which is not part of Collection. This is useful when implementing a partitioner.
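
          For example, a partitioner sketch along these lines relies on positional access, which List provides and Collection does not; the PartitionInfoStub class below is an assumption used purely for illustration.

          import java.util.List;

          // Standalone sketch of index-based access in a partitioner; PartitionInfoStub
          // stands in for the real PartitionInfo class.
          public class IndexedPartitionerSketch {
              public int choosePartition(int keyHash, List<PartitionInfoStub> partitions) {
                  int index = (keyHash & 0x7fffffff) % partitions.size();
                  return partitions.get(index).partition(); // List.get(int) is not on Collection
              }

              public static class PartitionInfoStub {
                  private final int partition;
                  public PartitionInfoStub(int partition) { this.partition = partition; }
                  public int partition() { return partition; }
              }
          }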

          Edward Ribeiro added a comment -

          Hello folks,

          I've just started to look into your new API design and would like to register a few observations, from an API design perspective, for now. I hope you enjoy my suggestions and, please, let me know what you think about them. Excuse me in advance for the long message. Well, let's start:

          It is good practice to replace the implementation-specific parameter type (List) with a more general (abstract or interface) type, so

          Cluster(java.util.List<Node> nodes, java.util.List<PartitionInfo> partitions) 
          

          becomes

          Cluster(java.util.Collection<Node> nodes, java.util.Collection<PartitionInfo> partitions) 
          

          This makes it possible to pass a Set or a List, for example. The same goes for

          bootstrap(java.util.List<java.net.InetSocketAddress> addresses) 
          

          that becomes

          bootstrap(java.util.Collection<java.net.InetSocketAddress> addresses) 
          

          This can seem futile, but I have seen parts of the ZooKeeper API that needed a fix but were effectively frozen because the API published a concrete class.

          Also, methods that return a collection should return a more generic collection type, so that a future swap (say, changing the List to a Set) doesn't become too difficult. Therefore,

          java.util.List<PartitionInfo>	partitionsFor(java.lang.String topic) 
          

          becomes

          java.util.Collection<PartitionInfo>	partitionsFor(java.lang.String topic) 
          

          I have also looked into the empty() method. Hey, it returns a new object each time it's called! See

              /**
               * Create an empty cluster instance with no nodes and no topic-partitions.
               */
              public static Cluster empty() {
                  return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0));
              }
          

          There's no need to do this. You can create an EMPTY_CLUSTER field as below and then return it each time the method is called. See

             private static final Cluster EMPTY_CLUSTER = new Cluster(Collections.<Node>emptyList(), Collections.<PartitionInfo>emptyList());
          
              ... 
          
              /**
               * Create an empty cluster instance with no nodes and no topic-partitions.
               */
              public static Cluster empty() {
                  return EMPTY_CLUSTER;
              }
          

          This option saves the creation of unnecessary objects, and makes it easy to perform comparisons such as "if (myCluster == Cluster.empty())" on the client side.

          I also see the need to add an 'isEmpty()' method so that users can check whether the Cluster is empty. If we add isEmpty(), then the declaration of EMPTY_CLUSTER becomes

             private static final Cluster EMPTY_CLUSTER = new Cluster(Collections.<Node>emptyList(), Collections.<PartitionInfo>emptyList()) {
                 public boolean isEmpty() {
                     return true;
                 }
             };
          
          

          As I said, this was just a first glance over the code. I may have further suggestions, more algorithm-oriented or again from an API design perspective, but that's all for now. I hope you like it.

          Cheers,
          Edward Ribeiro

          Jay Kreps added a comment -

          Hey Guozhang, thanks for the detailed stylistic questions. I think these are important to discuss. Quick responses inline:

          1. How do we decide where to put Exception definitions? Currently we have an errors folder in kafka.comm and some folders also have their own exceptions.

          That package was meant to be explicitly API errors. I.e. those errors which are defined in ErrorKeys.java with a registered error code and have a bidirectional mapping to this code. These represent communication between the client and server so I wanted to put them in a special place. In general exceptions should be kept with the package with which they most naturally fit (ConfigException goes with config, etc).

          The most important code organization principle I had in mind was that each package should be either public or private. Only public packages will be javadoc'd. All classes in a public package are exposed to the user and are part of the public interface. The idea is that we would be very careful with these public packages. I considered actually differentiating these as something like kafka.clients.producer.pub or something but I thought that was a bit ugly--maybe there is another way to differentiate or annotate classes so that we are very explicit about public or private. Essentially any change to interfaces in these packages is breaking all our users so we have to think very carefully about API design and change. The rest of the classes (most of them actually) are really just an implementation detail and we can change them at will.

          Currently the public packages are just:
          kafka.clients.producer
          kafka.common
          kafka.common.errors

          One item I wanted to document and discuss as a separate thread was code organization as I think these kinds of conventions only work if they are documented and broadly understood.

          2. Shall we merge kafka.comm.protocol and kafka.comm.request folders since the request definitions are highly dependent on the protocol class?

          I would rather not. The kafka.common.network package defines a low-level network framing based on size delimited messages. This is fully generic--the unit test tests an "echo server". It is not tied to any details of our protocol and it is really important that people not leak details of our protocol into it!!!!

          The protocol is just a bunch of message definitions and isn't tied to the network transport or framing at all. It is just a way of laying out bytes.

          The request package combines the protocol definition and network framing.

          I am hoping to keep these things orthogonal.

          Once we get our build fixed (ahem), I'd really like us to get checkstyle integrated so we can enforce these kinds of package dependencies and keep some kind of logical coherence. It has been a struggle otherwise.

          3. Shall we put Node, Cluster, Partition, TopicPartition in kafka.comm into one sub-folder, for example, called kafka.comm.metadata?

          I'm open to that. I liked having the current flat package for simplicity for the user (fewer things to import). Basically I am trying to make the javadoc for the producer as simple and flat as possible.

          4. Shall we put the Serializer classes into the protocol folder?

          The Serializer is the PUBLIC interface for users to serialize their messages. It actually isn't related to our protocol definition.

          5. Shall we move the kafka.clients.common.network sub-folder to kafka.common?

          Yeah I think that is actually how it is. I previously had it separate and the rationale was that many of the classes...e.g. the Selector were really written with the clients in mind. Theoretically the same Selector class could be the basis for the socket server but I didn't really think those use cases through.

          1. Since nextNode uses a global round robin, we need to make sure no more than one object accesses a single Cluster’s nextNode.

          That may just be a bad name. The goal of that method was load balancing not iterating over the nodes. So actually the intention was to give a different node to each thread in the multithreaded case.

          1. Shall we put config names such as ENCODING_CONFIG all in a single file?

          I planned to do a discussion on config. The way it works is that configs are defined by the ConfigDef. However we allow plug-in interfaces (Serializer, Partitioner, etc). These may need configs too, but these are (generally speaking) user classes. So we allow including user-defined configs. So StringSerializer is essentially a user plug-in that seemed useful enough to include in the main code. I think it makes more sense to document its configs with the class rather than elsewhere.

          — kafka.common.AbstractIterator
          1. makeNext is not supposed to be left in states other than DONE and READY?

          Yeah this is basically a transliteration of the same class in the main code base which is a transliteration of the iterator in Google Collections.

          1. kafka.common.protocol.Schema: Will a difference in Field order make for different schemas?

          Yes our protocol is based on position not name.

          1. kafka.common.protocol.ProtoUtil: parseMetadataResponse: after reading the function I feel that using TopicInfo/PartitionInfo objects for parsing might be preferable. We can put these objects in the Protocol.java file so any protocol change would only require one file edit.

          I'd like to have a discussion about this. I tried this approach. The problem is that the mixing of protocol definition with business logic leads to leaking of logic into the protocol and makes the protocol hard to read. There are several other options. It would be good to discuss.

          1. kafka.common.record.LogEntry: Maybe we can rename to OffsetRecord?
          Hmm, I'm open to changing it but I'm not sure that's better. I try to avoid names like TopicPartition which are just the concatenation of all the fields as I feel the purpose of a name is to capture the concept the fields describe. I.e. if we add a new field we shouldn't need to lengthen the name!

          1. kafka.common.record.Record: Do we expect MIN_HEADER_SIZE and RECORD_OVERHEAD to be different in the future? Currently their values are the same and the way they are computed is also identical.

          Good point, I'll look into this; these are just copied from the Scala.

          1. kafka.common.request.RequestHeader: Is it better to define "client_id" strings as static field in the Protocol.java?
          Unlikely. Currently you can access a field by the string name or the field instance. The field instance is an array access and the name is a hash table lookup to get the field followed by an array access. So the two reasonable options are to have static variables for the Fields or to access with Strings. Let's procrastinate that to the discussion of handling request definitions.

          2. kafka.client.common.NetworkReceive: Does REQUEST/RESPONSE_HEADER also need to be versioned?

          If we want to change it. We didn't have a version number for these in the protocol so we can't add one now. Any header change today is non-backwards compatible.

          1. In the first constructor, why not also initialize the size buffer to ByteBuffer.allocate(4)?

          The point of the size buffer is to read the size to allocate and read the message buffer, but that constructor takes an already allocated/read message buffer. I was using that constructor for unit testing to fake responses that weren't really being read.

          2. Why is NetworkReceive not extending ByteBufferReceive?

          Yes this distressed me as well. Here is the issue. I want ByteBufferSend/NetworkSend to work on an array of bytebuffers to handle the case where you already have serialized chunks of data (i.e. message sets). But in the case of a receive we don't currently have a good way to concatenate buffers so reading into multiple buffers isn't useful. This is basically a limitation of the ByteBuffer api.

          1. kafka.client.common.Selector: “transmissions.send.remaining() <= 0”, under what condition can remaining() be < 0?

          None, I think.

          2. “if (trans != null) this.disconnected.add(trans.id); “, should it be trans == null?

          Well that would give a null pointer, no? What this is saying is "if we have an id for this connection, record it as disconnected".

          1. kafka.client.producer.internals.BufferPool: In the freeUp() function, should we use this.free.pollLast().capacity() instead of limit()?

          Yeah that would be better, technically anything on the free list must have capacity==limit but I think it is bad to depend on that.
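
          A standalone sketch of that change, with assumed field names rather than the real BufferPool internals:

          import java.nio.ByteBuffer;
          import java.util.ArrayDeque;
          import java.util.Deque;

          // Sketch only: reclaim memory from the free list using capacity() rather than
          // limit(), so the accounting does not depend on capacity == limit holding.
          public class FreeListSketch {
              private final Deque<ByteBuffer> free = new ArrayDeque<ByteBuffer>();
              private long availableMemory = 0;

              void freeUp(long size) {
                  while (!free.isEmpty() && availableMemory < size)
                      availableMemory += free.pollLast().capacity(); // capacity(), not limit()
              }
          }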

          2. What is the rationale of having just one poolable size?

          Basically just to avoid implementing malloc. I guess the choice is either to implement a generic BufferPool or one specific to the needs of the producer. Since most of the time the producer will be doing allocations of that poolable size it makes sense to keep those around. If you try to keep arbitrary sizes I think things quickly get complex but since we will almost always be allocating the same size it seemed simpler to just handle that case. I'm open to generalizing it if it isn't too complex.

          — kafka.clients.producer.internals.Metadata
          1. After configs are added, we need to remove the hard-coded default values. So for all of these places we could leave a TODO mark for now.
          Yup.

          — kafka.clients.producer.internals.ProduceRequestResult
          1. Its member fields are dependent on Protocol.java, so once we change the protocol we would probably also need to change this file.

          I don't believe this is dependent on the protocol, maybe you can elaborate?

          1. kafka.clients.producer.internals.RecordAccumulator: Typo: “Get a list of topic-partitions which are ready to be send.”

          Ack.

          — kafka.clients.producer.internals.Sender
          1. One corner case we may need to consider is the following: if a partition becomes unavailable and the producer keeps sending data to it, that partition could eventually exhaust the memory, leaving other partitions unable to take more messages and blocked waiting.

          If I understand the case you are describing you are saying that the producer could use up the full buffer on a partition which is not available and the producer will then block. This is correct and that is the intention. This shouldn't block the sender, though, it will keep trying to send until the partition becomes available again. I think this is what you want: you can buffer for a while but eventually must either block or drop data if memory is bounded.

          2. In handling disconnection, the ProduceRequestResult will set the exception, and if await() is called this exception will be thrown and the callback will not be executed. Since this exception is already stored in the RecordSend, I think a better way is to not throw the exception on await() but let the callback function handle it. That would make the application code cleaner, since otherwise the application needs to try-catch the await() call.

          I think the callback is always executed...if there is a case this doesn't happen it is a bug.

          I agree that
          if(result.hasError())
          // do something
          is easier to read. The problem is that it is incumbent on you to check and if you don't it silently fails. This is the complaint people have about mongodb. The principle I am going on is that
          producer.send(message).await()
          should be exactly interchangeable with a blocking call. Anyhow I am sympathetic to your point, let's move it into the public api discussion.

          3. In closing the producer, there is another corner case: the I/O thread can keep trying to send the rest of the data and failing. Probably we could add another option to drop whatever is in the buffer and let the application's callback functions handle it.

          I think what you are saying is that close() blocks until all data is sent. That is the intention. Since send is async I think it is counterintuitive to fail/drop in-progress calls as the user may not know their calls aren't completed.

          Jay Kreps added a comment - - edited

          Hey Jun, quick responses. I'll try to get a patch up with some of the minor things, though a few of the others I'd like to do post-commit.

          1. WRT closing connections. Yes, this is true. I agree it is needed but decided to punt on it for the first pass. It is an important follow-up item. There are two cases to handle: metadata fetches and leadership handoffs.
          Obviously the Selector will not handle these special cases which are specific to this use case. Theoretically this could all be done in the Sender logic but it would be a bit complex. I think the best solution is just to have us time out idle connections after some configurable period of disuse (30 seconds, say).

          2. I think the VIP problem can be handled by just timing out idle connections. Special cases related to metadata won't help because non-metadata related connections can also be idle. Not retaining the bootstrap urls is intentional: future metadata requests should use the full broker set the producer is connecting to. You mention that this will cause the producer to prefer to fetch metadata from a broker to which it already has a connection for subsequent metadata fetches after the initial bootstrap, but this was the idea--no need to setup and then timeout another connection if we already have one.
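
          A minimal sketch of the idle-timeout bookkeeping under these assumptions (the names are illustrative, not the Selector's actual API):

          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.Iterator;
          import java.util.List;
          import java.util.Map;

          // Sketch only: track the last-use time per node id and report which connections
          // have been idle longer than the configured maximum so the caller can close them.
          public class IdleConnectionTrackerSketch {
              private final long maxIdleMs;
              private final Map<Integer, Long> lastUsed = new HashMap<Integer, Long>();

              public IdleConnectionTrackerSketch(long maxIdleMs) {
                  this.maxIdleMs = maxIdleMs;
              }

              public void recordActivity(int nodeId, long nowMs) {
                  lastUsed.put(nodeId, nowMs);
              }

              public List<Integer> expiredConnections(long nowMs) {
                  List<Integer> expired = new ArrayList<Integer>();
                  Iterator<Map.Entry<Integer, Long>> it = lastUsed.entrySet().iterator();
                  while (it.hasNext()) {
                      Map.Entry<Integer, Long> entry = it.next();
                      if (nowMs - entry.getValue() > maxIdleMs) {
                          expired.add(entry.getKey());
                          it.remove();
                      }
                  }
                  return expired;
              }
          }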

          3. WRT initializing the partitioner to 0: Yeah we can initialize to something random. This problem would be a bit pathological as you would have to start all your producers the same instant and send exactly the same number of messages through them for this to persist.
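
          A minimal sketch of the random-start idea; the names are assumptions, not the committed DefaultPartitioner.

          import java.util.Random;
          import java.util.concurrent.atomic.AtomicInteger;

          // Sketch only: seeding the round-robin counter with a random value avoids every
          // producer instance starting at partition 0.
          public class RandomStartRoundRobinSketch {
              private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

              public int partition(int numPartitions) {
                  // mask the sign bit rather than using Math.abs, which overflows for Integer.MIN_VALUE
                  return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
              }
          }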

          4. I included the numPartitions even though it is easily computable from cluster as all partitioners will need to mod by number of partitions, but the vast majority won't need to use the cluster. So it just seemed more intuitive rather than the user having to figure out that they can get it by calling into cluster and worrying about the underlying performance of that just to give it to them.

          5.1 Yes, that is a bug.
          5.2 It is a bit slangy

          6. I don't prevent this: a zero hash code will be recomputed each time, but this is an unlikely case and recomputing is what would happen in all cases if we didn't cache.
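
          The caching pattern being described looks roughly like the sketch below; the class shape is an assumption, not the actual TopicPartition source.

          // Sketch only: a cached value of 0 doubles as "not yet computed", so a hash that
          // genuinely equals 0 is simply recomputed on each call, which is correct but
          // occasionally redundant.
          public final class CachedHashCodeSketch {
              private final String topic;
              private final int partition;
              private int hash = 0;

              public CachedHashCodeSketch(String topic, int partition) {
                  this.topic = topic;
                  this.partition = partition;
              }

              @Override
              public int hashCode() {
                  if (hash != 0)
                      return hash;
                  int result = 31 * topic.hashCode() + partition;
                  this.hash = result;
                  return result;
              }
          }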

          7. Good point, I'll improve the error message.

          8.1 I'll try to think of a better name.
          8.2 Yes, we can do that. I think that would be good for latency in the case where we had to allocate a non-standard size

          9. I think you could argue either way in terms of the preferable sequencing. However, I wanted to reuse the RecordSend object as the argument to the callback rather than introduce another object. This means I do need to complete the record send first, otherwise the callback will block trying to access fields in the send.

          10. Ah, very good point.

          11. Thanks

          12. I am not too picky. 2 spaces is the recommended style in Scala and 4 spaces is the classic "Sun Java style". I would like to get our style formally specified in an IDE formatter. That is what I am using for Eclipse and it is very nice: it does all the formatting for you and ensures a very consistent style. I will start a thread on this one as likely everyone has an opinion.

          13. I had the same thought. I'm not sure if it is better to give the current value or the average over the window. Thoughts? Since we mostly look at graphs polled every 30 seconds if we do the instantaneous measurement it amounts to just a single data point for the whole 30 seconds but that may be okay...

          14. I figured we would need to discuss logging so I just punted for now. The standard insofar as there is one is really slf4j, which I consider kind of silly. There are really just a couple of places that need logging so maybe it would be fine to just use java.util.logging which comes with the jvm. I'll start a thread on this.

          15. In a client I think this is something we should leave to the client. Printing lots of messages in their logs is a bit rude. I think it is better to give an API to get information about the configs.

          Guozhang Wang added a comment -

          Some more comments:

          — General

          1. How do we decide where to put Exception definitions? Currently we have an errors folder in kafka.comm and some folders also have their own exceptions.

          2. Shall we merge kafka.comm.protocol and kafka.comm.request folders since the request definitions are highly dependent on the protocol class?

          3. Shall we put Node, Cluster, Partition, TopicPartition in kafka.comm into one sub-folder, for example, called kafka.comm.metadata?

          4. Shall we put the Serializer classes into the protocol folder?

          5. Shall we move the kafka.clients.common.network sub-folder to kafka.common?

          — kafka.common.Cluster

          1. Since nextNode uses a global round robin, we need to make sure no more than one object accesses a single Cluster’s nextNode.

          — kafka.common.StringSerialization

          1. Shall we put config names such as ENCODING_CONFIG all in a single file?

          — kafka.common.AbstractIterator

          1. makeNext is not supposed to be left in states other than DONE and READY?

          — kafka.common.protocol.Schema

          1. Will a difference in Field order make for different schemas?

          — kafka.common.protocol.ProtoUtil

          1. parseMetadataResponse: after reading the function I feel that using TopicInfo/PartitionInfo objects for parsing might be preferable. We can put these objects in the Protocol.java file so any protocol change would only require one file edit.

          — kafka.common.record.LogEntry

          1. Maybe we can rename to OffsetRecord?

          — kafka.common.record.Record

          1. Do we expect MIN_HEADER_SIZE and RECORD_OVERHEAD to be different in the future? Currently their values are the same and the way they are computed is also identical.

          — kafka.common.request.RequestHeader

          1. Is it better to define "client_id" strings as static field in the Protocol.java?

          2. Does REQUEST/RESPONSE_HEADER also need to be versioned?

          — kafka.client.common.NetworkReceive

          1. In the first constructor, why not also initialize the size buffer to ByteBuffer.allocate(4)?

          2. Why is NetworkReceive not extending ByteBufferReceive?

          — kafka.client.common.Selector

          1. “transmissions.send.remaining() <= 0”, under what condition can remaining() be < 0?

          2. “if (trans != null) this.disconnected.add(trans.id); “, should it be trans == null?

          — kafka.client.producer.internals.BufferPool:

          1. In the freeUp() function, should we use this.free.pollLast().capacity() instead of limit()?

          2. What is the rationale of having just one poolable size?

          — kafka.clients.producer.internals.Metadata

          1. After configs are added, we need to remove the hard-coded default values. So for all of these places we could leave a TODO mark for now.

          — kafka.clients.producer.internals.ProduceRequestResult

          1. Its member fields are dependent on Protocol.java, so once we change the protocol we would probably also need to change this file.

          — kafka.clients.producer.internals.RecordAccumulator

          1. Typo: “Get a list of topic-partitions which are ready to be send.”

          — kafka.clients.producer.internals.Sender

          1. One corner case we may need to consider is the following: if a partition becomes unavailable and the producer keeps sending data to it, that partition could eventually exhaust the memory, leaving other partitions unable to take more messages and blocked waiting.

          2. In handling disconnection, the ProduceRequestResult will set the exception, and if await() is called this exception will be thrown and the callback will not be executed. Since this exception is already stored in the RecordSend, I think a better way is to not throw the exception on await() but let the callback function handle it. That would make the application code cleaner, since otherwise the application needs to try-catch the await() call.

          3. In closing the producer, there is another corner case: the I/O thread can keep trying to send the rest of the data and failing. Probably we could add another option to drop whatever is in the buffer and let the application's callback functions handle it.

          Jun Rao added a comment -

          I made a pass of the producer client code. The following are my comments.

          1. Selector: It seems that the selector never closes an existing socket on its own (other than when the selector itself is closed). For example, no existing sockets are closed after metadata refresh. This has the implication that it may increase the # of socket connections that a client has to maintain. For example, if every client uses all brokers as the metadata broker list, it means that every client will maintain a socket connection to every broker, which doesn't seem to be very scalable. Also, if a partition is moved to some new brokers, the client will still be maintaining the socket connections to the old brokers. In 0.8, we close all existing sockets every time the metadata is refreshed.

          2. Metadata: We need to think through the case when the clients use a VIP in the metadata broker list. In this patch, it seems that we only use the VIP once and then switch to the actual broker list after the first metadata update. This means that the producer can only issue metadata requests to brokers to which replicas are assigned. In 0.8, we always fetch metadata requests using the metadata broker list. Another thing that we do in 0.8 is to close the socket connection after each metadata request. When using a VIP, an idle socket connection can be killed by the load balancer. If the VIP is not configured properly, it may take a long time (e.g., 8 minutes) to detect that the socket is already killed, which will slow down the fetching of metadata.

3. DefaultPartitioner:
3.1 This has the issue that every producer instance always starts with partition 0, which could create imbalanced load if multiple producers are created at the same time.
3.2 Also, a better default when no partition key is provided is probably to select a random "available" (i.e., leader node exists) partition, instead of just a random partition; see the sketch below.
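A minimal sketch of what 3.1 and 3.2 together could look like, under the assumption that the cluster metadata can report which partitions currently have a leader (the class and method names here are made up for illustration, not the code in the patch):

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

public class AvailableAwarePartitionerSketch {

    // Seed with a random value so producers created at the same time
    // do not all start writing to partition 0 (point 3.1).
    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    public int partition(byte[] key, List<PartitionView> partitions) {
        if (key == null) {
            // Point 3.2: prefer partitions that currently have a leader.
            List<PartitionView> available = new ArrayList<PartitionView>();
            for (PartitionView p : partitions)
                if (p.hasLeader())
                    available.add(p);
            List<PartitionView> candidates = available.isEmpty() ? partitions : available;
            return candidates.get(toPositive(counter.getAndIncrement()) % candidates.size()).id();
        }
        // Keyed records: keep the key-to-partition mapping stable across all partitions.
        return toPositive(java.util.Arrays.hashCode(key)) % partitions.size();
    }

    private static int toPositive(int n) {
        return n & 0x7fffffff; // mask the sign bit; Math.abs(Integer.MIN_VALUE) is still negative
    }

    // Minimal stand-in for whatever the client's cluster metadata exposes.
    public interface PartitionView {
        int id();
        boolean hasLeader();
    }
}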

4. Partitioner.partition(): From the cluster, we can get the partition list for a topic. Is the passed-in numPartitions redundant?

          5. Sender:
          5.1 run(): It seems that it's possible to have a produce request and metadata request to be sent to the same node in one iteration. This will cause selector.poll() to fail since we can't send more than 1 request to the same node per poll.
          5.2 produceRequest(): topicDatas is weird since data is the plural form of datum.

6. TopicPartition: How do we prevent the computed hash code from being exactly 0?
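The usual concern here is a lazily cached hash where 0 doubles as the "not computed yet" sentinel: if the real hash happens to be 0, it gets recomputed on every call. One common fix, as a sketch (not the actual TopicPartition code), is to remap a computed 0 to a non-zero value so the cache always takes effect:

public final class TopicPartitionSketch {
    private final String topic;
    private final int partition;
    private int hash; // 0 means "not computed yet"

    public TopicPartitionSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public int hashCode() {
        int h = hash;
        if (h == 0) {
            h = 31 * topic.hashCode() + partition;
            if (h == 0)
                h = 1; // remap the rare real value 0 so we never recompute forever
            hash = h;
        }
        return h;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicPartitionSketch)) return false;
        TopicPartitionSketch other = (TopicPartitionSketch) o;
        return partition == other.partition && topic.equals(other.topic);
    }
}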

          7. BufferExhaustedException: It's probably useful to include the requested size in the exception.
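For point 7, a sketch of what that could look like (the field, constructor arguments and message wording are illustrative only):

// Carry the requested size (and optionally the available memory) in the exception
// so callers can see how large the failed allocation was.
public class BufferExhaustedExceptionSketch extends RuntimeException {
    private final int requestedSize;

    public BufferExhaustedExceptionSketch(int requestedSize, long availableMemory) {
        super("Failed to allocate " + requestedSize + " bytes; only "
                + availableMemory + " bytes of buffer memory are currently free");
        this.requestedSize = requestedSize;
    }

    public int requestedSize() {
        return requestedSize;
    }
}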

          8. RecordAccumulator:
          8.1 Should we call free bufferPool?
8.2 ready(): Should a partition also be considered ready if it has only one RecordBatch whose size is exactly batchSize?

          9. RecordBatch.done(): Should we unblock RecordSend after registered callbacks are called?
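For point 9, roughly the ordering being suggested, as a self-contained sketch (the real RecordBatch/RecordSend types look different):

import java.util.List;
import java.util.concurrent.CountDownLatch;

// Run registered callbacks first, then release threads blocked in await(),
// so a caller woken from await() always observes a fully completed batch.
final class BatchCompletionSketch {
    private final CountDownLatch completion = new CountDownLatch(1);
    private final List<Runnable> callbacks;

    BatchCompletionSketch(List<Runnable> callbacks) {
        this.callbacks = callbacks;
    }

    void done() {
        for (Runnable callback : callbacks) {
            try {
                callback.run();          // user callback sees the finished result
            } catch (RuntimeException e) {
                // a real implementation would log the callback failure here
            }
        }
        completion.countDown();          // only now unblock await()
    }

    void await() throws InterruptedException {
        completion.await();
    }
}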

          10. RecordSend: We should include at least the partition number and probably the topic itself.

11. Various misspellings:
11.1 ProducerRecord: "chosing"
11.2 KafkaProducer:
11.2.1 comments above send(): "messaging waiting" => "messages waiting"
11.2.2 {@link kafka.clients.producer.RecordSend.await() await()}: "await()" appears twice
11.3 RecordBatch: "sufficent"

          12. Formatting: Should we use 4-space indentation vs 2-space? The latter is what we have been using in scala.

          The following can be added to the TODO list:

13. BufferPool: When we add jmx, it's probably useful to have one metric for the size of the waiter list and another for the available memory.

14. logging: It seems that there are no logging messages and we use e.printStackTrace() in a few places. Should we use log4j?
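For point 14, a minimal example of the slf4j-over-log4j pattern instead of e.printStackTrace() (assuming slf4j is the facade chosen; the class and message are illustrative):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SenderLoggingExample {
    private static final Logger log = LoggerFactory.getLogger(SenderLoggingExample.class);

    void handleConnectFailure(int nodeId, Exception e) {
        // Instead of e.printStackTrace(): the stack trace is preserved but routed
        // through whatever logging backend the application configures.
        log.warn("Failed to connect to node {}, will retry after backoff", nodeId, e);
    }
}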

          15. Configs: It would be useful to log every overridden value and unused property name.
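And for point 15, a sketch of the kind of check meant here (the method and its parameters are made up for illustration; the real AbstractConfig may track this differently):

import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfigAuditSketch {
    private static final Logger log = LoggerFactory.getLogger(ConfigAuditSketch.class);

    // Warn about supplied properties the config definition never recognized,
    // and log every value that overrides a default.
    public static void logOverridesAndUnused(Map<String, ?> supplied, Set<String> definedKeys) {
        for (Map.Entry<String, ?> entry : supplied.entrySet()) {
            if (definedKeys.contains(entry.getKey()))
                log.info("Overriding config {} = {}", entry.getKey(), entry.getValue());
            else
                log.warn("Config '{}' was supplied but is not a known config and will be ignored", entry.getKey());
        }
    }
}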

          Hide
          Jay Kreps added a comment -

          Created reviewboard https://reviews.apache.org/r/17263/
          against branch trunk


            People

• Assignee:
  Jay Kreps
• Reporter:
  Jay Kreps
• Votes:
  1
• Watchers:
  6
