Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
Context:
When using the high-level API to join streams, Samza automatically sets up a couple of RocksDB stores in order to keep track of each side of the join. The retention time of the RocksDB stores is set to the join TTL. These RocksDB stores are backed up by Kafka changelogs. Samza will automatically create these changelogs in Kafka, and the retention time of the changelogs is set to the join TTL as well. See the JoinOperatorSpec.java class for the configuration set-up.
Issue:
If the Samza job is initially deployed with a certain join TTL, then the Kafka changelogs will be created with the retention time set to that initial join TTL. If the Samza job is then redeployed with a different join TTL, the RocksDB TTL will be updated to the new value, but the retention time of the existing Kafka changelog will not. This leaves the RocksDB TTL and the Kafka changelog retention time inconsistent. The inconsistency becomes a problem when the Kafka changelog is needed to bootstrap a container, because the changelog will not properly reflect the data that existed in the corresponding RocksDB store on the previous container. For example, if the join TTL is raised from 1 hour to 24 hours, the changelog still retains only roughly the last hour of messages, so a store restored from it can be missing entries that the previous container's RocksDB store still held.
Potential resources for solution:
Kafka has an AdminUtils class (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala) to fetch and change topic configurations (although it seems to be deprecated and replaced by AdminClient). Kafka 0.11 has an AdminClient (https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html) which allows for describing and altering configs for topics. One potential solution is that, on startup, the Samza job could check the retention time config of the changelog topic and update it if necessary.
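The startup check proposed above could look roughly like the sketch below. It is only an illustration of the compare-and-update logic, not existing Samza code: the class name ChangelogRetentionSync and the TopicConfigAdmin interface are hypothetical and were made up for this example. A real implementation would presumably back that interface with AdminClient's describeConfigs and alterConfigs calls; those are left out so the sketch compiles without the Kafka client on the classpath. The only Kafka-specific detail assumed here is the topic-level config key retention.ms.

```java
import java.time.Duration;
import java.util.Optional;

/**
 * Sketch of a startup check that keeps a join changelog topic's retention
 * in line with the currently configured join TTL. Hypothetical class, not
 * part of Samza.
 */
final class ChangelogRetentionSync {

    /** Kafka topic-level config key for time-based retention. */
    static final String RETENTION_MS = "retention.ms";

    /**
     * Hypothetical abstraction over topic config access. In a real job this
     * could wrap Kafka's AdminClient (describeConfigs / alterConfigs).
     */
    interface TopicConfigAdmin {
        /** Returns the current value of the given topic config, if one is set. */
        Optional<String> getTopicConfig(String topic, String key);

        /** Sets the given topic config to the given value. */
        void setTopicConfig(String topic, String key, String value);
    }

    private ChangelogRetentionSync() {
    }

    /**
     * Compares the changelog topic's retention.ms with the join TTL and
     * updates the topic config when they differ.
     *
     * @return true if the topic config was changed, false if it already matched
     */
    static boolean syncRetention(TopicConfigAdmin admin, String changelogTopic, Duration joinTtl) {
        String desired = Long.toString(joinTtl.toMillis());
        Optional<String> current = admin.getTopicConfig(changelogTopic, RETENTION_MS);

        if (current.isPresent() && current.get().equals(desired)) {
            // Retention already matches the join TTL; nothing to do.
            return false;
        }

        // Retention is missing or stale (e.g. left over from an earlier deploy).
        admin.setTopicConfig(changelogTopic, RETENTION_MS, desired);
        return true;
    }
}
```

One thing to watch for if this is built on AdminClient: as far as I understand, the non-incremental alterConfigs call replaces the topic's whole set of overrides, so an implementation would need to read the existing configs first and write them back together with the new retention.ms, otherwise other overrides on the changelog topic could be reset to their defaults.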