FLINK-33176

Kinesis source throws NullPointerException in Table API on ignored parsing errors

    Description

      Using the following example table:

       

      CREATE TABLE source (
        text STRING,
        `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
      ) WITH (
        'connector' = 'kinesis',
        'stream' = 'test',
        'aws.region' = 'us-east-1',
        'json.ignore-parse-errors' = 'true',
        'format' = 'json'
      ) 

      The connector throws a NullPointerException when the source consumes a malformed JSON message:

      java.lang.NullPointerException
          at org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesisDeserializationSchema.deserialize(RowDataKinesisDeserializationSchema.java:137)
          at org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesisDeserializationSchema.deserialize(RowDataKinesisDeserializationSchema.java:44)
          at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:202)
          at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:126)
          at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:118)
          at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102)
          at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829) 
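
      The trace ends in RowDataKinesisDeserializationSchema.deserialize, which suggests (this is an assumption, not something the report confirms) that the format deserializer returns null for a record it was told to ignore, and the Kinesis wrapper then dereferences that null while attaching metadata. The sketch below models that situation without any Flink dependency. Every name in it (NullSafeDeserializationSketch, PhysicalDeserializer, Row, lenientDeserializer) is hypothetical and only illustrates a possible null guard, not the actual fix.

      ```java
      import java.nio.charset.StandardCharsets;
      import java.util.ArrayList;
      import java.util.List;

      final class NullSafeDeserializationSketch {

          /** Hypothetical stand-in for a format deserializer that returns null for skipped records. */
          interface PhysicalDeserializer {
              String deserialize(byte[] payload);
          }

          /** Hypothetical output row: the parsed payload plus one metadata column. */
          static final class Row {
              final String text;
              final long arrivalTimeMillis;

              Row(String text, long arrivalTimeMillis) {
                  this.text = text;
                  this.arrivalTimeMillis = arrivalTimeMillis;
              }
          }

          /** Toy deserializer mimicking 'ignore parse errors': malformed input yields null. */
          static PhysicalDeserializer lenientDeserializer() {
              return payload -> {
                  String s = new String(payload, StandardCharsets.UTF_8).trim();
                  // Crude well-formedness check standing in for real JSON parsing.
                  boolean wellFormed = s.startsWith("{") && s.endsWith("}");
                  return wellFormed ? s : null;
              };
          }

          /** Attaches metadata to the parsed record, skipping records the format ignored. */
          static void deserialize(
                  PhysicalDeserializer inner, byte[] payload, long arrivalTimeMillis, List<Row> out) {
              String physical = inner.deserialize(payload);
              if (physical == null) {
                  // Without this guard, any use of 'physical' below could throw a NullPointerException.
                  return;
              }
              out.add(new Row(physical.trim(), arrivalTimeMillis));
          }

          public static void main(String[] args) {
              List<Row> rows = new ArrayList<>();
              PhysicalDeserializer inner = lenientDeserializer();
              deserialize(inner, "{\"text\":\"hello\"}".getBytes(StandardCharsets.UTF_8), 1L, rows);
              deserialize(inner, "not json".getBytes(StandardCharsets.UTF_8), 2L, rows);
              System.out.println("rows emitted: " + rows.size());
          }
      }
      ```

      In this model the malformed record is dropped silently, which matches the intent of 'json.ignore-parse-errors' = 'true'. Whether the real connector should drop the record, count it in a metric, or log it is a design decision outside this sketch.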

       

            People

              a.pilipenko Aleksandr Pilipenko