Beam / BEAM-12871

Support for Confluent Schema Registry

Details

    • Type: New Feature
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Labels: cross-language, io-py-kafka

    Description

      I would like to use the Python transforms apache_beam.io.kafka.WriteToKafka and apache_beam.io.kafka.ReadFromKafka with Confluent Schema Registry, so that `protobuf` messages can be serialized and deserialized against registered schemas when writing to and reading from Kafka.

      In Java this is already possible:

       PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
           .apply(KafkaIO.<Long, GenericRecord>read()
               .withBootstrapServers("broker_1:9092,broker_2:9092")
               .withTopic("my_topic")
               .withKeyDeserializer(LongDeserializer.class)
               // Use Confluent Schema Registry: specify the registry URL and value subject
               .withValueDeserializer(
                   ConfluentSchemaRegistryDeserializerProvider.of(
                       "http://localhost:8081", "my_topic-value"))
           ...

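      For comparison, a minimal sketch of how far the Python side gets today. This assumes the current `ReadFromKafka` signature, which only accepts fully-qualified Java deserializer class names as strings; `schema.registry.url` is the standard Confluent client setting, but nothing in the Beam Python Kafka transforms consumes it yet:

      ```python
      # Consumer config following Kafka/Confluent client conventions.
      # "schema.registry.url" is the standard Confluent client setting, but
      # the Beam Python Kafka transforms do not act on it today.
      consumer_config = {
          "bootstrap.servers": "broker_1:9092,broker_2:9092",
          "schema.registry.url": "http://localhost:8081",
      }

      # Current Python usage is limited to naming Java deserializer classes:
      #
      #   from apache_beam.io.kafka import ReadFromKafka
      #
      #   ReadFromKafka(
      #       consumer_config=consumer_config,
      #       topics=["my_topic"],
      #       key_deserializer="org.apache.kafka.common.serialization.LongDeserializer",
      #       value_deserializer="org.apache.kafka.common.serialization.ByteArrayDeserializer",
      #   )
      #
      # There is no Python counterpart to Java's
      # ConfluentSchemaRegistryDeserializerProvider.of(...), so values arrive
      # as raw bytes and registry-aware protobuf decoding must be done by hand.
      ```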

          People

            Assignee: Unassigned
            Reporter: denesb Dénes Bartha
            Votes: 1
            Watchers: 5