Hey Guozhang, thanks for the detailed stylistic questions. I think these are important to discuss. Quick responses inline:
1. How do we decide where to put Exception definitions? Currently we have an errors folder in kafka.comm and some folders also have their own exceptions.
That package was meant to be explicitly for API errors, i.e. those errors which are defined in ErrorKeys.java with a registered error code and a bidirectional mapping to that code. These represent communication between the client and server, so I wanted to put them in a special place. In general exceptions should be kept in the package with which they most naturally fit (ConfigException goes with config, etc.).
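To make the bidirectional mapping concrete, here is a minimal sketch in the spirit of ErrorKeys.java; the class name, error codes, and exception names below are illustrative, not the actual API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a bidirectional mapping between wire error codes and
// exception classes, in the spirit of ErrorKeys.java. All names and
// codes here are illustrative, not the real definitions.
public class ApiErrors {
    public static class OffsetOutOfRangeException extends RuntimeException {}
    public static class UnknownTopicException extends RuntimeException {}

    private static final Map<Short, Class<?>> codeToClass = new HashMap<>();
    private static final Map<Class<?>, Short> classToCode = new HashMap<>();

    private static void register(short code, Class<?> cls) {
        codeToClass.put(code, cls);
        classToCode.put(cls, code);
    }

    static {
        register((short) 1, OffsetOutOfRangeException.class);
        register((short) 3, UnknownTopicException.class);
    }

    // Server side: turn an exception into a code to put on the wire.
    public static short codeFor(Class<?> cls) { return classToCode.get(cls); }

    // Client side: turn a code from the wire back into an exception class.
    public static Class<?> forCode(short code) { return codeToClass.get(code); }
}
```

The point of keeping these in one registry is that the code-to-exception mapping is part of the client-server contract, unlike ordinary internal exceptions.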
The most important code organization principle I had in mind was that each package should be either public or private. Only public packages will be javadoc'd. All classes in a public package are exposed to the user and are part of the public interface. The idea is that we would be very careful with these public packages. I considered actually differentiating these as something like kafka.clients.producer.pub, but I thought that was a bit ugly--maybe there is another way to differentiate or annotate classes so that we are very explicit about public versus private. Essentially any change to interfaces in these packages breaks all our users, so we have to think very carefully about API design and change. The rest of the classes (most of them, actually) are really just an implementation detail and we can change them at will.
Currently the public packages are just:
One item I wanted to document and discuss as a separate thread was code organization as I think these kinds of conventions only work if they are documented and broadly understood.
2. Shall we merge kafka.comm.protocol and kafka.comm.request folders since the requests definitions are highly dependent on the protocol class?
I would rather not. The kafka.common.network package defines low-level network framing based on size-delimited messages. This is fully generic--the unit test exercises an "echo server". It is not tied to any details of our protocol, and it is really important that people not leak details of our protocol into it!
The protocol is just a bunch of message definitions and isn't tied to the network transport or framing at all. It is just a way of laying out bytes.
The request package combines the protocol definition and network framing.
I am hoping to keep these things orthogonal.
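To illustrate that orthogonality: size-delimited framing amounts to nothing more than a 4-byte length prefix followed by opaque bytes. The sketch below (not the actual network code) never looks inside the payload, which is exactly the property the network package should keep:

```java
import java.nio.ByteBuffer;

// Sketch of size-delimited framing: a 4-byte length prefix followed by
// an opaque payload. Nothing here knows or cares what the payload means.
public class Framing {
    // Wrap a payload in a frame: [int32 size][payload bytes].
    public static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Read one frame back out: the size tells us how many bytes follow.
    public static byte[] unframe(ByteBuffer buf) {
        int size = buf.getInt();
        byte[] payload = new byte[size];
        buf.get(payload);
        return payload;
    }
}
```

The protocol package, by contrast, only decides how to lay out the bytes inside the payload; the request package is where the two meet.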
Once we get our build fixed (ahem), I'd really like us to get checkstyle integrated so we can enforce these kinds of package dependencies and keep some kind of logical coherence. It has been a struggle otherwise.
3. Shall we put Node, Cluster, Partition, TopicPartition in kafka.comm into one sub-folder, for example, called kafka.comm.metadata?
I'm open to that. I liked having the current flat package for simplicity for the user (fewer things to import). Basically I am trying to make the javadoc for the producer as simple and flat as possible.
4. Shall we put the Serializer classes into the protocol folder?
The Serializer is the PUBLIC interface for users to serialize their messages. It actually isn't related to our protocol definition at all.
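A minimal sketch of what that public plug-in point looks like (simplified, not the exact interface signature in the code):

```java
import java.nio.charset.StandardCharsets;

// Simplified sketch of the user-facing serialization plug-in point.
// The real interface may differ; this shows the shape of the contract.
public class SerializerDemo {
    // Users implement this to turn their objects into bytes.
    public interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // A trivial user-style implementation: Strings become UTF-8 bytes.
    public static class StringSerializer implements Serializer<String> {
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }
}
```

The producer only ever sees the resulting byte[]; how the user produced it is entirely their business, which is why this lives in the public API rather than next to the protocol definition.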
5. Shall we move the kafka.clients.common.network sub-folder to kafka.common?
Yeah, I think that is actually how it is. I previously had it separate, and the rationale was that many of the classes (e.g. the Selector) were really written with the clients in mind. Theoretically the same Selector class could be the basis for the socket server, but I didn't really think those use cases through.
1. Since nextNode uses a global round robin, we need to make sure no more than one object accesses a single Cluster's nextNode.
That may just be a bad name. The goal of that method was load balancing not iterating over the nodes. So actually the intention was to give a different node to each thread in the multithreaded case.
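A sketch of the intended load-balancing semantics (illustrative names, not the Cluster implementation): a shared counter hands out nodes round-robin, so concurrent callers naturally land on different nodes rather than needing exclusive access:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of round-robin load balancing across nodes. The shared atomic
// counter means each call (from any thread) gets the next node in turn,
// which is the point: spread load, not iterate.
public class NodePicker {
    private final List<String> nodes;
    private final AtomicInteger counter = new AtomicInteger(0);

    public NodePicker(List<String> nodes) {
        this.nodes = nodes;
    }

    public String nextNode() {
        // Mask to keep the index non-negative even after counter overflow.
        int i = (counter.getAndIncrement() & Integer.MAX_VALUE) % nodes.size();
        return nodes.get(i);
    }
}
```

Under this reading, two threads calling nextNode() concurrently getting different nodes is the desired behavior, not a race to guard against.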
1. Shall we put config names such as ENCODING_CONFIG all in a single file?
I planned to do a discussion on config. The way it works is that configs are defined by the ConfigDef. However, we allow plug-in interfaces (Serializer, Partitioner, etc.). These may need configs too, but these are (generally speaking) user classes, so we allow including user-defined configs. StringSerializer is essentially a user plug-in that seemed useful enough to include in the main code. I think it makes more sense to document its configs with the class rather than elsewhere.
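A sketch of that convention (the key name ENCODING_CONFIG comes from the question; everything else here is made up): the plug-in class owns and documents its config key, reading it out of the shared config map rather than having it declared in a central file:

```java
import java.nio.charset.Charset;
import java.util.Map;

// Sketch of a plug-in that owns its own config key. The key is declared
// and documented here, next to the only class that reads it, instead of
// in a central config file. Names are illustrative.
public class EncodingSerializer {
    // The encoding to use when converting strings to bytes.
    public static final String ENCODING_CONFIG = "serializer.encoding";

    private final Charset charset;

    public EncodingSerializer(Map<String, Object> configs) {
        Object value = configs.get(ENCODING_CONFIG);
        this.charset = Charset.forName(value == null ? "UTF8" : value.toString());
    }

    public byte[] serialize(String data) {
        return data.getBytes(charset);
    }
}
```

The producer's ConfigDef covers the core configs; user-defined keys like this one simply pass through the config map untouched until the plug-in picks them up.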
1. makeNext is not supposed to leave the iterator in states other than DONE and READY?
Yeah this is basically a transliteration of the same class in the main code base which is a transliteration of the iterator in Google Collections.
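For reference, that Google Collections-style contract can be sketched like this (simplified, not the actual class): makeNext() either returns an element or calls allDone(), so after hasNext() the iterator is only ever READY or DONE:

```java
import java.util.NoSuchElementException;

// Sketch of the Google Collections-style iterator. Subclasses implement
// makeNext(), which either returns the next element or calls allDone().
public abstract class AbstractIter<T> {
    private enum State { READY, NOT_READY, DONE }

    private State state = State.NOT_READY;
    private T next;

    // Return the next element, or call allDone() and return its result.
    protected abstract T makeNext();

    // Signal exhaustion; the null return value is never handed to callers.
    protected T allDone() {
        state = State.DONE;
        return null;
    }

    public boolean hasNext() {
        if (state == State.NOT_READY) {
            next = makeNext();
            if (state != State.DONE)
                state = State.READY;
        }
        return state == State.READY;
    }

    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY;
        return next;
    }
}
```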
1. kafka.common.protocol.Schema: Will a difference in Field order make for different schemas?
Yes our protocol is based on position not name.
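A tiny illustration of why order matters in a positional protocol (the field layout here is made up): the wire format carries no names or tags, just the fields laid out in schema order, so swapping two fields yields incompatible bytes:

```java
import java.nio.ByteBuffer;

// Sketch of positional encoding: the bytes are just the fields in schema
// order, with nothing on the wire to say which field is which. Swap the
// order and the layout changes, so both sides must agree on the schema.
public class PositionalLayout {
    // Schema A: an int32 followed by an int16.
    public static byte[] write(int first, short second) {
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.putInt(first);
        buf.putShort(second);
        return buf.array();
    }

    // Schema B: the same two fields in the opposite order.
    public static byte[] writeSwapped(short first, int second) {
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.putShort(first);
        buf.putInt(second);
        return buf.array();
    }
}
```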
1. kafka.common.protocol.ProtoUtil: parseMetadataResponse: after reading the function I feel that TopicInfo/PartitionInfo objects for parsing might be preferable. We could put these objects in the Protocol.java file so any protocol change would only require editing one file.
I'd like to have a discussion about this. I tried this approach. The problem is that the mixing of protocol definition with business logic leads to leaking of logic into the protocol and makes the protocol hard to read. There are several other options. It would be good to discuss.
1. kafka.common.record.LogEntry: Maybe we can rename to OffsetRecord?
Hmm, I'm open to changing it but I'm not sure that's better. I try to avoid names like TopicPartition which are just the concatenation of all the fields as I feel the purpose of a name is to capture the concept the fields describe. I.e. if we add a new field we shouldn't need to lengthen the name!
1. kafka.common.record.Record: Do we expect MIN_HEADER_SIZE and RECORD_OVERHEAD to be different in the future? Currently their values are the same and the way they are computed are also identical.
Good point, I'll look into this; these are just copied from the Scala.
1. kafka.common.request.RequestHeader: Is it better to define "client_id" strings as static field in the Protocol.java?
Unlikely. Currently you can access a field by the string name or the field instance. The field instance is an array access, while the name is a hash table lookup to get the field followed by an array access. So the two reasonable options are to have static variables for the Fields or to access with Strings. Let's defer that to the discussion of handling request definitions.
2. kafka.client.common.NetworkReceive: Does REQUEST/RESPONSE_HEADER also need to be versioned?
If we want to change it. We didn't have a version number for these in the protocol so we can't add one now. Any header change today is non-backwards compatible.
1. In the first constructor, why not also initialize the size buffer to ByteBuffer.allocate(4)?
The point of the size buffer is to read the size to allocate and read the message buffer, but that constructor takes an already allocated/read message buffer. I was using that constructor for unit testing to fake responses that weren't really being read.
2. Why does NetworkReceive not extend ByteBufferReceive?
Yes, this distressed me as well. Here is the issue: I want ByteBufferSend/NetworkSend to work on an array of ByteBuffers to handle the case where you already have serialized chunks of data (i.e. message sets). But in the case of a receive we don't currently have a good way to concatenate buffers, so reading into multiple buffers isn't useful. This is basically a limitation of the ByteBuffer API.
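To illustrate why the send side benefits from a buffer array (a sketch, not NetworkSend itself): pre-serialized chunks can be written out in sequence without first copying them into one contiguous buffer:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Sketch of a multi-buffer send: already-serialized chunks (say, a size
// header and a message set) are written out one after another. A real
// send would hand the ByteBuffer[] to a GatheringByteChannel; here we
// collect into a byte array just to show the sequencing.
public class MultiBufferSend {
    public static byte[] writeAll(ByteBuffer[] buffers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (ByteBuffer b : buffers) {
            byte[] chunk = new byte[b.remaining()];
            b.get(chunk);
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }
}
```

There is no symmetric trick on the receive side: incoming bytes have to land somewhere, and ByteBuffer gives no cheap way to treat several buffers as one logical buffer for parsing.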
1. kafka.client.common.Selector: “transmissions.send.remaining() <= 0”, under what condition can remaining() be < 0?
None, I think.
2. “if (trans != null) this.disconnected.add(trans.id); “, should it be trans == null?
Well that would give a null pointer, no? What this is saying is "if we have an id for this connection, record it as disconnected".
1. kafka.clients.producer.internals.BufferPool: In the freeUp() function, should it use this.free.pollLast().capacity() instead of limit()?
Yeah that would be better, technically anything on the free list must have capacity==limit but I think it is bad to depend on that.
2. What is the rationale for having just one poolable size?
Basically just to avoid implementing malloc. I guess the choice is either to implement a generic BufferPool or one specific to the needs of the producer. Since most of the time the producer will be doing allocations of that poolable size it makes sense to keep those around. If you try to keep arbitrary sizes I think things quickly get complex but since we will almost always be allocating the same size it seemed simpler to just handle that case. I'm open to generalizing it if it isn't too complex.
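A sketch of the single-poolable-size idea (illustrative, not the real BufferPool): only buffers of exactly the poolable size are recycled on a free list; anything else is allocated fresh and left to the garbage collector:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of a pool with one poolable size. Since the producer almost
// always allocates batches of the same size, only that size is worth
// keeping on a free list; other sizes pass straight through to the GC.
public class SimplePool {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    public SimplePool(int poolableSize) {
        this.poolableSize = poolableSize;
    }

    public ByteBuffer allocate(int size) {
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        return ByteBuffer.allocate(size);
    }

    public void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);
        }
        // Odd-sized buffers are simply dropped; no size-class bookkeeping.
    }

    public int freeCount() {
        return free.size();
    }
}
```

Supporting arbitrary sizes would mean size classes, splitting, coalescing, and fragmentation concerns, i.e. reimplementing malloc, for a case that rarely occurs.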
1. After configs are added, we need to remove the hard-coded default values, so for all of these places we could leave a TODO mark for now.
1. Its member fields are dependent on Protocol.java, so once we change the protocol we would probably also need to change this file.
I don't believe this is dependent on the protocol, maybe you can elaborate?
1. kafka.clients.producer.internals.RecordAccumulator: Typo: "Get a list of topic-partitions which are ready to be send."
1. One corner case we may need to consider: if a partition becomes unavailable and the producer keeps sending data to it, that partition could eventually exhaust the memory, leaving other partitions unable to take more messages and blocked waiting.
If I understand the case you are describing you are saying that the producer could use up the full buffer on a partition which is not available and the producer will then block. This is correct and that is the intention. This shouldn't block the sender, though, it will keep trying to send until the partition becomes available again. I think this is what you want: you can buffer for a while but eventually must either block or drop data if memory is bounded.
2. In handling disconnection, the ProduceRequestResult will set the exception, and if await() is called this exception will be thrown and the callback not executed. Since this exception is already stored in the RecordSend, I think a better way is to not throw the exception on await() but to let the callback function handle it. That would make the application code cleaner, since otherwise the application needs to try-catch the await() call.
I think the callback is always executed...if there is a case this doesn't happen it is a bug.
I agree that
// do something
is easier to read. The problem is that it is incumbent on you to check the result, and if you don't it silently fails. This is the complaint people have about MongoDB. The principle I am going on is that await() should be exactly interchangeable with a blocking call. Anyhow, I am sympathetic to your point; let's move it into the public API discussion.
3. In closing the producer, there is another corner case: the I/O thread can keep trying to send the rest of the data and failing. Probably we could add another option to drop whatever is in the buffer and let the callback functions of the application handle them.
I think what you are saying is that close() blocks until all data is sent. That is the intention. Since send is async I think it is counterintuitive to fail/drop in-progress calls as the user may not know their calls aren't completed.