Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.15.4, aws-connector-4.1.0
Description
First, I define a table which uses `kinesis` connector with an unsupported property for sink, e.g. `scan.stream.initpos`:
%flink.ssql(type=update) – Create input DROP TABLE IF EXISTS `kds_input`; CREATE TABLE `kds_input` ( `some_string` STRING, `some_int` BIGINT, `time` AS PROCTIME() ) WITH ( 'connector' = 'kinesis', 'stream' = 'ExampleInputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv' );
I can read from my table (kds_input) without any issue, but it will throw exception if I try to write to the table:
%flink.ssql(type=update) – Use to generate data in the input table DROP TABLE IF EXISTS connector_cve_datagen; CREATE TABLE connector_cve_datagen( `some_string` STRING, `some_int` BIGINT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.some_string.length' = '2'); INSERT INTO kds_input SELECT some_string, some_int from connector_cve_datagen
Exception observed:
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'. Unsupported options: scan.stream.initpos Supported options: aws.region connector csv.allow-comments csv.array-element-delimiter csv.disable-quote-character csv.escape-character csv.field-delimiter csv.ignore-parse-errors csv.null-literal csv.quote-character format property-version sink.batch.max-size sink.fail-on-error sink.flush-buffer.size sink.flush-buffer.timeout sink.partitioner sink.partitioner-field-delimiter sink.producer.collection-max-count (deprecated) sink.producer.collection-max-size (deprecated) sink.producer.fail-on-error (deprecated) sink.producer.record-max-buffered-time (deprecated) sink.requests.max-buffered sink.requests.max-inflight stream at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624) at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978) at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978) at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259) ... 36 more
Attachments
Issue Links
- links to