FLINK-20463: Flink 1.11.2 SQL cannot ignore an exception record


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.11.2
    • Fix Version/s: None
    • Component/s: Table SQL / Runtime
    • Labels: None
    • Environment:
      <flink.version>1.11.2</flink.version>
      <scala.binary.version>2.11</scala.binary.version>

    Description

      Can Flink SQL provide an option to ignore exception records?

      I have a table that maps Kafka data in JSON format.

      When the problematic record is parsed, an exception is thrown, even though the data is valid JSON; it is simply not a valid record for the declared schema.

      Exception data: {"SHEET":[""]}

      My table:

      CREATE TABLE offline (
          SHEET ROW (
              HEADER MAP<STRING, STRING>,
              ITEM ROW (
                  AMOUNT STRING,
                  COST STRING,
                  GOODSID STRING,
                  SALEVALUE STRING,
                  SAP_RTMATNR STRING,
                  SAP_RTPLU STRING,
                  SERIALID STRING,
                  SHEETID STRING
              ) ARRAY,
              ITEM5 MAP<STRING, STRING> ARRAY,
              ITEM1 MAP<STRING, STRING> ARRAY,
              TENDER MAP<STRING, STRING> ARRAY
          ) ARRAY
      ) WITH (
          'connector' = 'kafka',
          'properties.bootstrap.servers' = 'xxx:9092',
          'properties.group.id' = 'realtime.sales.offline.group',
          'topic' = 'bms133',
          'format' = 'json',
          'json.ignore-parse-errors' = 'true',
          'scan.startup.mode' = 'earliest-offset'
      );
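
      For context, a minimal sketch of how this table might be registered and queried with the Flink 1.11 Table API. This is an assumed setup, not part of the original report; the class name and the abbreviated DDL string are placeholders. Once the bad record is consumed from the topic, any query that touches the nested SHEET array fails with the exception below:

      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

      public class OfflineSalesJob {
          public static void main(String[] args) {
              // Flink 1.11 streaming environment with the blink planner.
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              EnvironmentSettings settings = EnvironmentSettings.newInstance()
                      .useBlinkPlanner()
                      .inStreamingMode()
                      .build();
              StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

              // DDL as in the description above (abbreviated here),
              // including 'json.ignore-parse-errors' = 'true'.
              tEnv.executeSql("CREATE TABLE offline ( ... ) WITH ( ... )");

              // Any query that copies the nested SHEET array fails once the record
              // {"SHEET":[""]} is read from the topic.
              tEnv.executeSql("SELECT SHEET FROM offline").print();
          }
      }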

      Exception:

      Caused by: java.lang.NullPointerException
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:116)
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
          at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:129)
          at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
          at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:51)
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
          at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
          at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
          at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
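
      This is not the resolution recorded on the issue, but one hedged workaround sketch (an assumption, not from the report): pre-filter the raw topic at the DataStream level and copy only records whose SHEET entries are JSON objects to a cleaned topic, which the SQL table then reads instead of bms133. The class name SheetSanitizer, the target topic bms133-clean, and the consumer group realtime.sales.offline.sanitizer are hypothetical; the sketch assumes the Flink Kafka connector and a Jackson dependency on the classpath.

      import com.fasterxml.jackson.databind.JsonNode;
      import com.fasterxml.jackson.databind.ObjectMapper;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

      import java.util.Properties;

      public class SheetSanitizer {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              Properties props = new Properties();
              props.setProperty("bootstrap.servers", "xxx:9092");
              props.setProperty("group.id", "realtime.sales.offline.sanitizer");

              env.addSource(new FlinkKafkaConsumer<>("bms133", new SimpleStringSchema(), props))
                  .filter(value -> {
                      // Keep only records whose SHEET field is an array of JSON objects,
                      // so {"SHEET":[""]} is dropped before the SQL job ever parses it.
                      // (A fresh ObjectMapper per record keeps the sketch simple.)
                      ObjectMapper mapper = new ObjectMapper();
                      try {
                          JsonNode sheet = mapper.readTree(value).path("SHEET");
                          if (!sheet.isArray()) {
                              return false;
                          }
                          for (JsonNode element : sheet) {
                              if (!element.isObject()) {
                                  return false;
                              }
                          }
                          return true;
                      } catch (Exception e) {
                          return false; // not valid JSON at all
                      }
                  })
                  .addSink(new FlinkKafkaProducer<>("bms133-clean", new SimpleStringSchema(), props));

              env.execute("sheet-sanitizer");
          }
      }

      The offline table above would then point its 'topic' option at the cleaned topic.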

          People

            Assignee: Unassigned
            Reporter: hiscat (谢波)
            Votes: 1
            Watchers: 6
