Description
SamzaContainer and TaskStorageManager improperly begin reading changelog streams from the "latest" offset in the default case, when using a KafkaSystemFactory for the changelog stream.
In the logs you can see:
2014-02-11 02:38:33 SamzaContainer$ [INFO] Got change log system streams: Map(realtime-state-store -> SystemStream [system=kafka, stream=realtime-state-store])
...
2014-02-11 02:38:36 SamzaContainer [INFO] Starting task instance stores.
...
2014-02-11 02:38:36 BrokerProxy [INFO] Creating new SimpleConsumer for host localhost:10251 for system kafka
2014-02-11 02:38:36 GetOffset [INFO] Checking if auto.offset.reset is defined for topic realtime-state-store
2014-02-11 02:38:36 GetOffset [INFO] Got reset of type largest.
2014-02-11 02:38:36 GetOffset [INFO] Final offset to be returned for Topic and Partition [realtime-state-store,96] = 3358324
This is wrong: it means we never read any of the messages already in the changelog, so the store is never restored. In Kafka's case (and in general), we want to start reading from the EARLIEST offset in the log (zero for Kafka, unless retention has already deleted the oldest log segments). The cause is TaskStorageManager's startConsumers method, which calls register with a null offset. For a KafkaSystemConsumer, a null offset means "defer to the auto.offset.reset setting", which defaults to "largest" if it isn't defined.
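To make the failure mode concrete, here is a minimal Java model of the offset decision described above. It is hypothetical: OffsetResolutionSketch, PartitionOffsets, and resolveStartOffset are illustrative names, not Samza's GetOffset or KafkaSystemConsumer code. The latest offset is taken from the log excerpt, and the earliest is assumed to be zero.

```java
import java.util.Map;

/**
 * Hypothetical model of how a Kafka-backed consumer picks its starting offset
 * when register(...) is called with a null offset. Not Samza's actual code.
 */
public class OffsetResolutionSketch {

    /** Earliest retained offset and log-end offset of one topic partition. */
    record PartitionOffsets(long earliest, long latest) {}

    /**
     * A non-null registered offset is used directly (simplified).
     * A null offset falls back to auto.offset.reset, which defaults to "largest".
     */
    static long resolveStartOffset(String registeredOffset,
                                   Map<String, String> consumerConfig,
                                   PartitionOffsets offsets) {
        if (registeredOffset != null) {
            return Long.parseLong(registeredOffset);
        }
        String reset = consumerConfig.getOrDefault("auto.offset.reset", "largest");
        return "smallest".equals(reset) ? offsets.earliest() : offsets.latest();
    }

    public static void main(String[] args) {
        // Partition 96 of realtime-state-store, using the log-end offset from the logs.
        PartitionOffsets changelog = new PartitionOffsets(0L, 3358324L);

        // Current behavior: null offset and no auto.offset.reset configured.
        long start = resolveStartOffset(null, Map.of(), changelog);
        System.out.println("start=" + start + " messagesRestored=" + (changelog.latest() - start));

        // With auto.offset.reset=smallest, the whole changelog is replayed.
        long fixed = resolveStartOffset(null, Map.of("auto.offset.reset", "smallest"), changelog);
        System.out.println("start=" + fixed + " messagesRestored=" + (changelog.latest() - fixed));
    }
}
```

With the default "largest" reset, the consumer starts at the log end, so zero messages are restored; only an explicit "smallest" (or an earliest-offset request) replays the changelog.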
Some possible ways that I can think of to fix this:
1. Introduce some kind of EARLIEST/LATEST constants for SystemConsumer.register, have TaskStorageManager use it, and KafkaSystemConsumer honor it.
2. Have KafkaSystemFactory.getConsumer default auto.offset.reset to smallest (earliest) for changelog streams when it isn't explicitly defined.
3. Change KafkaSystemFactory's default auto.offset.reset to smallest (earliest) for all streams.
4. Add some kind of alternative "getChangelogConsumer" method to SystemConsumer.
5. Change the meaning of SystemConsumer.register(..., null) to always mean "earliest". Right now it means "defer to the consumer to decide what an appropriate offset is, since I don't have one."
There are many more tweaks we could make as well. In general, the question is whether we want to fix this at the Kafka level or at the framework level. (2) and (3) are Kafka-specific; (1), (4), and (5) are framework-wide and would apply to all SystemConsumers.
I'd rather fix this at the framework level, and right now something like (1) seems like the best solution. It does add yet another constraint on SystemConsumer implementations, though, which is annoying.
What do you think about having a SystemConsumer.START_FROM_EARLIEST_OFFSET = "__samza_EARLIEST" constant, and using that in TaskStorageManager.startConsumers? SystemConsumers that don't honor this constant couldn't be used to consume changelog streams.
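A rough sketch of how (1) could fit together. It uses a hypothetical, trimmed-down SystemConsumer (string partitions, string messages, a single poll method) rather than Samza's real interface; only the constant name and its "__samza_EARLIEST" value come from the proposal above. InMemoryConsumer and restoreStore are illustrative stand-ins for a real consumer and for TaskStorageManager's restore path.

```java
import java.util.ArrayList;
import java.util.List;

public class EarliestOffsetSketch {

    /** Hypothetical, trimmed-down SystemConsumer carrying the proposed sentinel. */
    interface SystemConsumer {
        String START_FROM_EARLIEST_OFFSET = "__samza_EARLIEST";

        void register(String streamPartition, String offset);

        List<String> poll();
    }

    /** Toy consumer over one partition's in-memory log that honors the sentinel. */
    static class InMemoryConsumer implements SystemConsumer {
        private final List<String> log;
        private int position;
        private boolean registered;

        InMemoryConsumer(List<String> log) {
            this.log = log;
        }

        @Override
        public void register(String streamPartition, String offset) {
            if (START_FROM_EARLIEST_OFFSET.equals(offset)) {
                position = 0;                         // earliest retained message
            } else if (offset == null) {
                position = log.size();                // like "largest": only new messages
            } else {
                position = Integer.parseInt(offset);  // explicit offset
            }
            registered = true;
        }

        @Override
        public List<String> poll() {
            if (!registered) {
                throw new IllegalStateException("register() must be called before poll()");
            }
            List<String> batch = new ArrayList<>(log.subList(position, log.size()));
            position = log.size();
            return batch;
        }
    }

    /** Hypothetical stand-in for TaskStorageManager.startConsumers plus store restore. */
    static List<String> restoreStore(SystemConsumer consumer, String changelogPartition) {
        consumer.register(changelogPartition, SystemConsumer.START_FROM_EARLIEST_OFFSET);
        return consumer.poll();
    }

    public static void main(String[] args) {
        List<String> changelog = List.of("k1=v1", "k2=v2", "k1=v3");
        List<String> restored = restoreStore(new InMemoryConsumer(changelog), "realtime-state-store/96");
        System.out.println(restored);
    }
}
```

The design choice is that the sentinel is an ordinary String, so the register signature stays unchanged; the cost, as noted above, is that every consumer used for changelogs must recognize it.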
In the meantime, as a workaround, you can forcibly configure the auto.offset.reset setting to be smallest for all changelog streams.
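The workaround can be applied mechanically by deriving one override per changelog stream from the job's stores.*.changelog entries. This sketch assumes the changelog is configured as stores.<store>.changelog=<system>.<stream>, and that the Kafka consumer accepts a per-stream override key of the form systems.<system>.streams.<stream>.consumer.auto.offset.reset; check both against the configuration docs for your Samza version. The class and method names are hypothetical.

```java
import java.util.Properties;

public class ChangelogResetWorkaround {

    /**
     * Copies the job config and, for every "stores.<store>.changelog=<system>.<stream>"
     * entry, adds "systems.<system>.streams.<stream>.consumer.auto.offset.reset=smallest".
     * The override key format is an assumption about Samza's Kafka configuration.
     */
    static Properties withSmallestResetForChangelogs(Properties jobConfig) {
        Properties result = new Properties();
        for (String key : jobConfig.stringPropertyNames()) {
            String value = jobConfig.getProperty(key);
            result.setProperty(key, value);

            if (key.startsWith("stores.") && key.endsWith(".changelog")) {
                String systemStream = value.trim();
                int dot = systemStream.indexOf('.');
                if (dot <= 0 || dot == systemStream.length() - 1) {
                    throw new IllegalArgumentException(
                        "Expected <system>.<stream> for " + key + ": " + value);
                }
                String system = systemStream.substring(0, dot);
                String stream = systemStream.substring(dot + 1);
                result.setProperty(
                    "systems." + system + ".streams." + stream + ".consumer.auto.offset.reset",
                    "smallest");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("stores.realtime-state-store.changelog", "kafka.realtime-state-store");
        Properties patched = withSmallestResetForChangelogs(config);
        System.out.println(patched.getProperty(
            "systems.kafka.streams.realtime-state-store.consumer.auto.offset.reset"));
    }
}
```

Per-stream overrides leave the reset policy of ordinary input streams untouched, which is why the sketch does not simply change a system-wide default.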