Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.15.4, aws-connector-4.1.0
Description
Using following example table:
CREATE TABLE source ( text STRING, `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL ) WITH ( 'connector' = 'kinesis', 'stream' = 'test', 'aws.region' = 'us-east-1', 'json.ignore-parse-errors' = 'true', 'format' = 'json' )
Connector throws NullPointerException when source consumes malformed json message:
java.lang.NullPointerException at org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesisDeserializationSchema.deserialize(RowDataKinesisDeserializationSchema.java:137) at org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesisDeserializationSchema.deserialize(RowDataKinesisDeserializationSchema.java:44) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:202) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:126) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:118) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829)
Attachments
Issue Links
- links to