Details
- Type: New Feature
- Status: Open
- Priority: P3
- Resolution: Unresolved
Description
I would like to use the Python transforms apache_beam.io.kafka.ReadFromKafka and apache_beam.io.kafka.WriteToKafka together with Confluent Schema Registry, looking up and registering schemas when reading and writing `protobuf` messages.
In Java it is possible to achieve this:
PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
    .apply(KafkaIO.<Long, GenericRecord>read()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        // Use Confluent Schema Registry, specify schema registry URL and value subject
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
    ...
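For comparison, a hedged sketch of the closest thing one can try from Python today (not verified end-to-end): ReadFromKafka accepts fully qualified Java deserializer class names and forwards consumer properties verbatim, so one might pass Confluent's KafkaProtobufDeserializer plus the schema.registry.url property. This assumes the Confluent serializer jars are available on the cross-language expansion service's classpath, which stock Beam does not provide, hence this request for first-class support.

```python
# Sketch of a possible workaround, under the assumption (untested here) that
# the expansion service has the Confluent serializer jars on its classpath.

# Plain Kafka/Confluent consumer properties, forwarded verbatim to the
# underlying Java consumer by the cross-language KafkaIO transform.
CONSUMER_CONFIG = {
    "bootstrap.servers": "broker_1:9092,broker_2:9092",
    "schema.registry.url": "http://localhost:8081",  # Confluent-specific property
}


def build_read_transform():
    # Imported lazily so the config above can be inspected without Beam installed.
    from apache_beam.io.kafka import ReadFromKafka

    return ReadFromKafka(
        consumer_config=CONSUMER_CONFIG,
        topics=["my_topic"],
        # Deserializers are given as fully qualified Java class names.
        key_deserializer="org.apache.kafka.common.serialization.LongDeserializer",
        value_deserializer="io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer",
    )
```

Even if the classpath issue is solved, the returned values are raw bytes/objects from the Java side; what this ticket asks for is proper Python-side support, analogous to ConfluentSchemaRegistryDeserializerProvider in Java.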