Flink / FLINK-33181

Table using `kinesis` connector cannot be used for both read and write operations if it is defined with an unsupported sink property

    Description

      First, I define a table which uses the `kinesis` connector with a property that is not supported by the sink, e.g. `scan.stream.initpos`:

      %flink.ssql(type=update)
      -- Create input
      DROP TABLE IF EXISTS `kds_input`;
      CREATE TABLE `kds_input` (
      `some_string` STRING,
      `some_int` BIGINT,
      `time` AS PROCTIME()
      ) WITH (
      'connector' = 'kinesis',
      'stream' = 'ExampleInputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv'
      );
      
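      For completeness, a plain read against the table above can be sketched as follows (the column and table names match the definition; the query itself is illustrative):

```sql
%flink.ssql(type=update)
-- Reading succeeds: 'scan.stream.initpos' is a valid option for the source
SELECT `some_string`, `some_int` FROM `kds_input`;
```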

      I can read from my table (`kds_input`) without any issue, but an exception is thrown when I try to write to it:

      %flink.ssql(type=update)
      -- Used to generate data for the input table
      
      DROP TABLE IF EXISTS connector_cve_datagen;
      CREATE TABLE connector_cve_datagen(
      `some_string` STRING,
      `some_int` BIGINT
      ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.some_string.length' = '2');
      INSERT INTO kds_input SELECT some_string, some_int from connector_cve_datagen
      

      Exception observed:

      Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.
      
      Unsupported options:
      
      scan.stream.initpos
      
      Supported options:
      
      aws.region
      connector
      csv.allow-comments
      csv.array-element-delimiter
      csv.disable-quote-character
      csv.escape-character
      csv.field-delimiter
      csv.ignore-parse-errors
      csv.null-literal
      csv.quote-character
      format
      property-version
      sink.batch.max-size
      sink.fail-on-error
      sink.flush-buffer.size
      sink.flush-buffer.timeout
      sink.partitioner
      sink.partitioner-field-delimiter
      sink.producer.collection-max-count (deprecated)
      sink.producer.collection-max-size (deprecated)
      sink.producer.fail-on-error (deprecated)
      sink.producer.record-max-buffered-time (deprecated)
      sink.requests.max-buffered
      sink.requests.max-inflight
      stream
      at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
      at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
      at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
      at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
      at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
      at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65)
      at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
      ... 36 more
      
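      A possible workaround (a sketch, not verified against this connector version): keep only options valid for both source and sink in the table definition, and supply `scan.stream.initpos` at query time through a dynamic table options hint, so the sink factory never sees the scan option:

```sql
%flink.ssql(type=update)
-- Hypothetical workaround: define kds_input WITHOUT 'scan.stream.initpos',
-- then pass it per-query via an OPTIONS hint, which applies only to this read.
SELECT `some_string`, `some_int`
FROM `kds_input` /*+ OPTIONS('scan.stream.initpos' = 'LATEST') */;
```

      Alternatively, a separate write-side table could be derived with `CREATE TABLE ... LIKE kds_input (EXCLUDING OPTIONS)` and its own WITH clause, so the definition used for INSERT carries no scan options.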

People

    Assignee: Khanh Vu (khanhvu)
    Reporter: Khanh Vu (khanhvu)
