Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-7662

Avro schema upgrade not supported on globalTable

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.0.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels:

      Description

      I did quite a bit of testing around the avro upgrades, and it did not behave as I would have expected when the avro is used as a Key for a global table with a rocksDB store

      setup:

      • local confluent suite 4.0.2
      • test with stream app and producer (v 1.0.0)
      • schemas (key) :

      schemas :

      schema version @1
      {
      "namespace": "com.bell.cts.livecms.livemedia.topic",
      "type" : "record",
      "name" : "EventKey",
      "fields" : [
      {"name" : "keyAttribute1", "type" : "string"}
      ]
      }
      schema version @2
      {
      "namespace": "com.bell.cts.livecms.livemedia.topic",
      "type" : "record",
      "name" : "EventKey",
      "fields" : [
      {"name" : "keyAttribute1", "type" : "string"},
      {"name" : "keyAttribute2", "type" : ["null", "string"], "default" : null}
      ]
      }

       

      • TEST1 (PASS)
        • using schema version @1 
        • produce record1=[k@1, v@1] 
        • stream apps loads record1 in global table and store locally in rocksdb 
        • asyncAssert that store.get(k@1)=v@1 : PASS
      • TEST2 (PASS)
        • using schema version @1
        • delete local store (and checkpoint)
        • stream apps loads record1 in global table and store locally in rocksdb
        • asyncAssert that store.get(k@1)=v@1 : PASS
      • TEST3 (FAIL)
        • using schema version @2 
        • keep local store
        • stream apps does not reload record1 from topic because of local offset
        • asyncAssert that store.get(k@1)=v@1 : FAIL
        • however store.all().next().key.equals(k@2) , as built using schema version 2
        • this would be explained by the fact that the rocksdb store has some magic byte persisted of the record based on schema version 1
        • Not ideal, but I could consider accceptable to delete the local store in this cases.
      • TEST4 (FAIL)
        • using schema version @2
        • delete local store (and checkpoint)
        • stream apps loads record1 (produced from schema @1) in global table and store locally in rocksdb
        • asyncAssert that store.get(k@2)=v@2 : FAIL
        • however store.all().next().key.equals(k@2) , as built using schema version 2
        • I can't quite understand this one. I would have expected that the rockdb store should now be provisioned with a serialized version of the record based on the schema v2 (as it went though the stream app underpinning the store materialization)
      • TEST5 (FAIL)
        • using schema version @2 
        • produce record2=[k@2, v@2] (meant to be backward compatible and logically equals to record1) 
        • stream apps does the processing of record1(produced from schema @1) and record2 (produced from schema @2) and materialize the global table stored locally in rocksdb
        • asyncAssert that store.get(k@2)=v@2 : PASS but the store now has 2 entries !!!
        • it looks as if the stream.groupBy(key) of the topic underpinning the globaltable materialization did not group the 2 record keys together, although record1.key.equals(record2.key) is true in Java (by looping in the store)

      reading from the upstream raw topic throughout the testing :

      /tmp$ kafka-avro-console-consumer --topic topic-test-5 --bootstrap-server localhost:9092 --property schema.registry.url=http://127.0.0.1:8081 --property print.key=true --from-beginning 
      {"keyAttribute1":"key-attribute-1"}	{"valueAttribute1":"value-1"}
      {"keyAttribute1":"key-attribute-1"}	{"valueAttribute1":"value-1"}
      {"keyAttribute1":"key-attribute-1"}	{"valueAttribute1":"value-1"}
      {"keyAttribute1":"key-attribute-1","keyAttribute2":null}	{"valueAttribute1":"value-1"}

        Attachments

        1. avro-registry-http.txt
          42 kB
          Frederic Tardif
        2. kafka-avro-upgrade-testing.zip
          978 kB
          Frederic Tardif

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              frederic.tardif Frederic Tardif
            • Votes:
              2 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated: