Flink / FLINK-22763

avro-confluent format does not allow for authorization credentials to be supplied to Confluent Schema Registry

    Details

      Description

      In PyFlink, connecting to an avro-confluent Kafka stream fails when the Confluent Schema Registry requires authorization.

       Table API definition:

      ddl_kafka_avro_confluent_source = f""" 
        CREATE TABLE gtt_records( 
          **table columns**
        ) WITH ( 
          'connector' = 'kafka', 
          'topic' = 'topic.avro-v1', 
          'properties.bootstrap.servers' = 'pkc-*****.ap-southeast-2.aws.confluent.cloud:9092', 
          'properties.security.protocol' = 'SASL_SSL', 
          'properties.sasl.mechanism' = 'PLAIN', 
          'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{KAFKA_API_KEY}" password="{KAFKA_API_SECRET}";', 
          'properties.basic.auth.credentials.source' = 'USER_INFO',   
          'properties.basic.auth.user.info' = '{SR_API_KEY}:{SR_API_SECRET}',
      
          'key.format' = 'avro-confluent', 
          'key.avro-confluent.schema-registry.url' = 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-*****.ap-southeast-2.aws.confluent.cloud', 
          'key.fields' = '**key fields**', 
      
          'value.format' = 'avro-confluent', 
          'value.avro-confluent.schema-registry.url' = 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-*****.ap-southeast-2.aws.confluent.cloud', 
          'value.fields-include' = 'ALL', 
      
          'key.avro-confluent.schema-registry.subject' = 'topic.avro-v1-key', 
          'value.avro-confluent.schema-registry.subject' = 'topic.avro-v1-value' 
      ) """
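
      The DDL above embeds the registry key and secret directly in the schema-registry.url value. As a side check, the sketch below shows how such a URL can be built with percent-encoded credentials and how Python parses the userinfo part. The helper names and the example host are hypothetical, and this report does not establish whether unencoded characters in the secret, or the handling of URL credentials by the registry client, play any role in the failure.

```python
from urllib.parse import quote, urlsplit


def registry_url_with_credentials(host, api_key, api_secret):
    """Embed percent-encoded credentials in the userinfo part of an https URL.

    Hypothetical helper: quote(..., safe='') also encodes '/', which would
    otherwise end the authority part of the URL early.
    """
    return f"https://{quote(api_key, safe='')}:{quote(api_secret, safe='')}@{host}"


def embedded_credentials(url):
    """Return (username, password) as they appear in the URL, still percent-encoded."""
    parts = urlsplit(url)
    return parts.username, parts.password


if __name__ == "__main__":
    # Placeholder host and secret, for illustration only.
    url = registry_url_with_credentials("psrc-example.invalid", "KEY", "a/b+c")
    print(url)
    print(embedded_credentials(url))
```

      If the secret contains a character such as '/', the unencoded form no longer parses as userinfo at all, so comparing both forms is a quick way to rule this out.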

       

      Running a job with this table as a source fails with a 401 (Unauthorized) error from the Confluent Schema Registry:

       

      2021-05-19 04:50:21,830 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, gtt_records]], fields=[unique, direction, window_ts, road_number, link_number, carriageway, version_no, window_local_date, window_local_time, poll_ts, duration, traffic_duration, distance, link_length]) -> Sink: Sink(table=[default_catalog.default_database.kafka_messages], fields=[unique, direction, window_ts, road_number, link_number, carriageway, version_no, window_local_date, window_local_time, poll_ts, duration, traffic_duration, distance, link_length]) (1/1)#0 (7eddc3a42dbcad0fc313bb6bdfa2c922) switched from RUNNING to FAILED with failure cause: java.io.IOException: Failed to deserialize Avro record.
          at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
          at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
          at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
          at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:119)
          at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
          at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
      Caused by: java.io.IOException: Could not find schema with id 100001 in registry
          at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
          at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:73)
          at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
          ... 9 more
      Caused by: org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
          at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
          at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
          at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
          at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
          at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
          at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
          at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
          at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
          at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
          ... 11 more
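
      The innermost cause in the trace is a 401 returned while fetching schema id 100001. To separate a credentials problem from a Flink configuration problem, the same lookup can be tried outside Flink. The following is a minimal sketch using only the Python standard library; the function names are hypothetical, and it assumes the registry exposes the standard `GET /schemas/ids/<id>` endpoint with HTTP Basic authentication.

```python
import base64
import urllib.error
import urllib.request


def build_schema_request(registry_url, schema_id, api_key, api_secret):
    """Build a GET /schemas/ids/<id> request carrying HTTP Basic credentials."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}",
        headers={"Authorization": f"Basic {token}"},
    )


def fetch_schema_status(request, timeout=10.0):
    """Return the HTTP status code the registry answers with (for example 200 or 401)."""
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status
    except urllib.error.HTTPError as err:
        # urllib raises on 4xx/5xx; the status code is what matters here.
        return err.code
```

      Calling `fetch_schema_status(build_schema_request(url, 100001, key, secret))` with the real registry URL and the SR key and secret would show whether the credentials themselves are accepted. If they are, that points at the credentials not reaching the registry client from the table definition.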
      

            People

            • Assignee: Unassigned
            • Reporter: Samuel Fiddis (Sam-Fiddis)