It would be good to benchmark Samza vs the raw Kafka producer using the test harness in 0.8.2 (example commands are linked here: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines).
The most important optimization in the producer is batching of messages. For small messages this is the difference between about 20k messages/sec and 1M messages/sec.
This patch is likely good in an all-out performance test. However, it may turn out to add more load to the Kafka cluster than the old implementation.
The reason is that, I think, the old implementation did its own batching and hence was pretty aggressive about not sending messages until there were multiple things to send.
The new implementation does no batching of its own and just relies on the producer to batch. The producer will batch whenever it can't send out requests fast enough. In practice this is kind of a race between the I/O thread that is sending requests and the Samza thread that is generating messages. As the Samza thread generates more and more load, the I/O thread will fall behind, at which point batching kicks in and makes each send a lot more efficient.
However, an issue here is that a slightly slower Samza job might generate a ton of messages, just not enough to overwhelm the I/O thread, and hence each will be sent immediately as its own request. There is nothing inherently wrong with this, and it is the only way to get 0 latency, but it will definitely add Kafka load.
The setting that controls this is linger.ms. linger.ms=5 says, look, just because you got one message, don't send it right away, wait up to 5 ms for more messages to be written. Even linger.ms=1 will dramatically drop the request count at only a 1 ms latency hit.
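To make the effect of linger.ms on request count concrete, here is a rough sketch of a simplified model, not the producer's actual logic. It assumes a batch opens when a message arrives, stays open for the linger interval, and then goes out as one request; it ignores batch size limits, partitions, and how busy the I/O thread is. The class and method names (LingerSketch, steadyArrivals, countRequests) are made up for illustration.

```java
public class LingerSketch {

    // Evenly spaced arrival timestamps in microseconds: 0, gap, 2*gap, ...
    public static long[] steadyArrivals(int count, long gapMicros) {
        long[] arrivals = new long[count];
        for (int i = 0; i < count; i++) {
            arrivals[i] = i * gapMicros;
        }
        return arrivals;
    }

    // Counts requests under the simplified model: the first message of a batch
    // sets a deadline of (arrival + linger); every message arriving at or before
    // that deadline rides in the same request. Arrivals must be sorted ascending.
    public static int countRequests(long[] sortedArrivalsMicros, long lingerMicros) {
        int requests = 0;
        long batchDeadline = 0;
        for (long t : sortedArrivalsMicros) {
            if (requests == 0 || t > batchDeadline) {
                requests++;                      // open a new batch, i.e. a new request
                batchDeadline = t + lingerMicros;
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        // Assumed workload: 10,000 messages, one every 0.1 ms.
        long[] arrivals = steadyArrivals(10_000, 100);
        System.out.println("linger 0 ms: " + countRequests(arrivals, 0) + " requests");
        System.out.println("linger 1 ms: " + countRequests(arrivals, 1_000) + " requests");
    }
}
```

Under this assumed workload the model gives one request per message with no linger and roughly one request per eleven messages with a 1 ms linger. Real numbers will differ, since the real producer also batches whenever the I/O thread falls behind.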
The problem is that with linger.ms > 0 there is no way to immediately flush the client; you always wait that long. Hence if you set linger.ms=1, then when you want to commit you will likely wait a millisecond while things sit in the producer queue, which is silly.
To fix this we need to have a flush() call in the producer that immediately makes any queued message ready for sending (irrespective of linger.ms) and blocks on completion of all these requests.
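The intended semantics can be sketched with a toy, single-threaded accumulator. This is an illustration under stated assumptions, not the producer's code: ToyAccumulator and its methods are hypothetical names, time is passed in explicitly, and network round-trip time is ignored, so "completion" here just means the batch was handed off.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushSketch {

    // Hypothetical toy model of a lingering send queue; not the Kafka producer.
    public static final class ToyAccumulator {
        private final long lingerMicros;
        private final List<String> pending = new ArrayList<>();
        private long deadlineMicros;
        private int requestsSent = 0;

        public ToyAccumulator(long lingerMicros) {
            this.lingerMicros = lingerMicros;
        }

        // Queue a message; the first message in a batch starts the linger clock.
        public void send(String message, long nowMicros) {
            if (pending.isEmpty()) {
                deadlineMicros = nowMicros + lingerMicros;
            }
            pending.add(message);
        }

        // Without flush: the caller can only wait for the linger deadline.
        // Returns the time at which the queued batch goes out.
        public long drainAfterLinger(long nowMicros) {
            if (pending.isEmpty()) {
                return nowMicros;
            }
            long sentAt = Math.max(nowMicros, deadlineMicros);
            drain();
            return sentAt;
        }

        // With flush: queued messages become sendable right away,
        // irrespective of the linger setting.
        public long flush(long nowMicros) {
            if (!pending.isEmpty()) {
                drain();
            }
            return nowMicros;
        }

        private void drain() {
            pending.clear();
            requestsSent++;
        }

        public int requestsSent() {
            return requestsSent;
        }
    }

    public static void main(String[] args) {
        ToyAccumulator waiting = new ToyAccumulator(1_000); // linger of 1 ms
        waiting.send("a", 0);
        waiting.send("b", 100);
        long commitAt = 200;
        System.out.println("wait without flush (us): " + (waiting.drainAfterLinger(commitAt) - commitAt));

        ToyAccumulator flushed = new ToyAccumulator(1_000);
        flushed.send("a", 0);
        flushed.send("b", 100);
        System.out.println("wait with flush (us): " + (flushed.flush(commitAt) - commitAt));
    }
}
```

In both cases the two messages still go out as a single request; the only difference is that the commit path no longer pays the remaining linger time.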
If we do this, we can set linger.ms to something greater than 0 and all will be great.