Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-21567

CSV Format exception while parsing: ArrayIndexOutOfBoundsException: 4000

Attach filesAttach ScreenshotAdd voteVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

    Details

      Description

      I've been trying to play a bit with the data available at https://www.kaggle.com/usdot/flight-delays and got the following exception:

      2021-02-16 18:57:37,913 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, flights, filter=[], project=[ORIGIN_AIRPORT, DEPARTURE_DELAY]]], fields=[ORIGIN_AIRPORT, DEPARTURE_DELAY]) -> Calc(select=[ORIGIN_AIRPORT], where=[(DEPARTURE_DELAY > 0)]) -> LocalHashAggregate(groupBy=[ORIGIN_AIRPORT], select=[ORIGIN_AIRPORT, Partial_COUNT(*) AS count1$0]) (1/1)#0 (ebbf1204d875a5a4ace529df0d5ba719) switched from RUNNING to FAILED.
      java.io.IOException: Failed to deserialize CSV row.
      	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:257) ~[flink-csv-1.12.1.jar:1.12.1]
      	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162) ~[flink-csv-1.12.1.jar:1.12.1]
      	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      Caused by: java.lang.ArrayIndexOutOfBoundsException: 4000
      	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.skipLinesWhenNeeded(CsvDecoder.java:527) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.startNewLine(CsvDecoder.java:499) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleObjectRowEnd(CsvParser.java:1067) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:858) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:250) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:280) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:250) ~[flink-csv-1.12.1.jar:1.12.1]
      	... 5 more
      

      Fully working example:

      Using the attached file (derived from the data on flight delays, linked above) and the SQL CLI:

      CREATE TABLE `flights` (
        `_YEAR` CHAR(4),
        `_MONTH` CHAR(2),
        `_DAY` CHAR(2),
        `_DAY_OF_WEEK` TINYINT,
        `AIRLINE` CHAR(2),
        `FLIGHT_NUMBER` SMALLINT,
        `TAIL_NUMBER` CHAR(6),
        `ORIGIN_AIRPORT` CHAR(3),
        `DESTINATION_AIRPORT` CHAR(3),
        `_SCHEDULED_DEPARTURE` CHAR(4),
        `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
        `_DEPARTURE_TIME` CHAR(4),
        `DEPARTURE_DELAY` SMALLINT,
        `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT), TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')),
        `TAXI_OUT` SMALLINT,
        `WHEELS_OFF` CHAR(4),
        `SCHEDULED_TIME` SMALLINT,
        `ELAPSED_TIME` SMALLINT,
        `AIR_TIME` SMALLINT,
        `DISTANCE` SMALLINT,
        `WHEELS_ON` CHAR(4),
        `TAXI_IN` SMALLINT,
        `SCHEDULED_ARRIVAL` CHAR(4),
        `ARRIVAL_TIME` CHAR(4),
        `ARRIVAL_DELAY` SMALLINT,
        `DIVERTED` BOOLEAN,
        `CANCELLED` BOOLEAN,
        `CANCELLATION_REASON` CHAR(1),
        `AIR_SYSTEM_DELAY` SMALLINT,
        `SECURITY_DELAY` SMALLINT,
        `AIRLINE_DELAY` SMALLINT,
        `LATE_AIRCRAFT_DELAY` SMALLINT,
        `WEATHER_DELAY` SMALLINT
      ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/kaggle-flight-delay/flights-small.csv',
        'format' = 'csv',
        'csv.allow-comments' = 'true',
        'csv.null-literal' = ''
      );
      
      SELECT * FROM `flights` LIMIT 10;
      

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              nkruber Nico Kruber

              Dates

              • Created:
                Updated:

                Issue deployment