FLINK-21569

Flink SQL with CSV file input job hangs

Details

    Description

      As a follow-up to FLINK-21567, I also got the job stuck on cancellation by doing the following in the SQL client:

      • configure SQL client defaults to run with parallelism 2
      • execute the following statements:
      CREATE TABLE `airports` (
        `IATA_CODE` CHAR(3),
        `AIRPORT` STRING,
        `CITY` STRING,
        `STATE` CHAR(2),
        `COUNTRY` CHAR(3),
        `LATITUDE` DOUBLE NULL,
        `LONGITUDE` DOUBLE NULL,
        PRIMARY KEY (`IATA_CODE`) NOT ENFORCED
      ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/kaggle-flight-delay/airports.csv',
        'format' = 'csv',
        'csv.allow-comments' = 'true',
        'csv.ignore-parse-errors' = 'true',
        'csv.null-literal' = ''
      );
      
      CREATE TABLE `flights` (
        `_YEAR` CHAR(4),
        `_MONTH` CHAR(2),
        `_DAY` CHAR(2),
        `_DAY_OF_WEEK` TINYINT,
        `AIRLINE` CHAR(2),
        `FLIGHT_NUMBER` SMALLINT,
        `TAIL_NUMBER` CHAR(6),
        `ORIGIN_AIRPORT` CHAR(3),
        `DESTINATION_AIRPORT` CHAR(3),
        `_SCHEDULED_DEPARTURE` CHAR(4),
        `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
        `_DEPARTURE_TIME` CHAR(4),
        `DEPARTURE_DELAY` SMALLINT,
        `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT), TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')),
        `TAXI_OUT` SMALLINT,
        `WHEELS_OFF` CHAR(4),
        `SCHEDULED_TIME` SMALLINT,
        `ELAPSED_TIME` SMALLINT,
        `AIR_TIME` SMALLINT,
        `DISTANCE` SMALLINT,
        `WHEELS_ON` CHAR(4),
        `TAXI_IN` SMALLINT,
        `SCHEDULED_ARRIVAL` CHAR(4),
        `ARRIVAL_TIME` CHAR(4),
        `ARRIVAL_DELAY` SMALLINT,
        `DIVERTED` BOOLEAN,
        `CANCELLED` BOOLEAN,
        `CANCELLATION_REASON` CHAR(1),
        `AIR_SYSTEM_DELAY` SMALLINT,
        `SECURITY_DELAY` SMALLINT,
        `AIRLINE_DELAY` SMALLINT,
        `LATE_AIRCRAFT_DELAY` SMALLINT,
        `WEATHER_DELAY` SMALLINT
      ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/kaggle-flight-delay/flights-small2.csv',
        'format' = 'csv',
        'csv.null-literal' = ''
      );
      
      SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, `NUM_DELAYS`
      FROM (
        SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, COUNT(*) AS `NUM_DELAYS`,
          ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) AS rownum
        FROM flights, airports
        WHERE `ORIGIN_AIRPORT` = `IATA_CODE` AND `DEPARTURE_DELAY` > 0
        GROUP BY `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`)
      WHERE rownum <= 10;
      

      Results are shown in the CLI, but after quitting the result view the job stays stuck in CANCELLING until at least one of the TaskManagers shuts itself down because a task did not react to the cancelling signal. The log of that TaskManager shows:

      2021-03-02 18:39:19,451 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Task 'Source: TableSourceScan(table=[[default_catalog, default_database, airports, project=[IATA_CODE, AIRPORT, STATE]]], fields=[IATA_CODE, AIRPORT, STATE]) (2/2)#0' did not react to cancelling signal for 30 seconds, but is stuck in method:
      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
      java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
      java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
      org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:653)
      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
      org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
      org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
      java.lang.Thread.run(Thread.java:748)
      
      ...
      
      2021-03-02 18:39:49,447 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Task did not exit gracefully within 180 + seconds.
      org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
      	at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.12-1.12.1.jar:1.12.1]
      	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
      2021-03-02 18:39:49,448 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...
      org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
      	at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.12-1.12.1.jar:1.12.1]
      	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
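
      One possible reading of the stack trace above: the task thread is parked in CompletableFuture.join(), which the JDK does not make interruptible, so interrupting the thread would not release it. The sketch below only illustrates that JDK behaviour in isolation. It is not Flink code and not a confirmed root cause; the class and method names are made up for this example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-alone demo, not part of Flink.
class JoinIgnoresInterrupt {

    /**
     * Starts a thread that blocks in CompletableFuture.join(), interrupts it,
     * and reports whether it was still blocked afterwards and only returned
     * once the future was completed.
     */
    static boolean stillBlockedAfterInterrupt() {
        CompletableFuture<Void> neverCompletedByItself = new CompletableFuture<>();

        // The worker mimics a task thread waiting on a future during cleanup.
        Thread worker = new Thread(neverCompletedByItself::join, "blocked-in-join");
        worker.setDaemon(true);
        worker.start();

        try {
            // Give the worker a moment to park inside join().
            Thread.sleep(200);

            // Roughly what a cancellation watchdog can do: interrupt the thread.
            worker.interrupt();
            worker.join(500);
            boolean stuckAfterInterrupt = worker.isAlive();

            // Only completing the future lets join() return.
            neverCompletedByItself.complete(null);
            worker.join(2000);
            boolean releasedByCompletion = !worker.isAlive();

            return stuckAfterInterrupt && releasedByCompletion;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("join() ignored the interrupt: " + stillBlockedAfterInterrupt());
    }
}
```

      If this reading applies here, the task can only leave cleanUpInvoke once whatever it is waiting on completes its future, which would match the task staying in CANCELLING until the watchdog kills the TaskManager.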
      

      Attachments

        1. flights-small2.csv
          847 kB
          Nico Kruber
        2. airports.csv
          23 kB
          Nico Kruber

            People

              TsReaper Caizhi Weng
              nkruber Nico Kruber