Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-15217

'java.time.LocalDate' should support for the CSV input format.

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Duplicate
    • 1.10.0
    • 1.12.0
    • None

    Description

      The sql is like this:

      CREATE TABLE `DATE_TBL` (
      > f1 date
      > ) WITH (
      > 'format.field-delimiter'='|',
      > 'connector.type'='filesystem',
      > 'format.derive-schema'='true',
      > 'connector.path'='/defender_test_data/daily/test_date/sources/DATE_TBL.csv',
      > 'format.type'='csv'
      > );

      SELECT f1 AS Fifteen FROM DATE_TBL;

       

      After excute the sql, there will be a exception :

      [ERROR] Could not execute SQL statement. Reason:
      java.lang.IllegalArgumentException: The type 'java.time.LocalDate' is not supported for the CSV input format.

      The input file's content is:

      1957-04-09
      1957-06-13
      1996-02-28
      1996-02-29
      1996-03-01
      1996-03-02
      1997-02-28
      1997-03-01
      1997-03-02
      2000-04-01
      2000-04-02
      2000-04-03
      2038-04-08
      2039-04-09

       

      The whole exception is:

      Caused by: java.lang.IllegalArgumentException: The type 'java.time.LocalDate' is not supported for the CSV input format.Caused by: java.lang.IllegalArgumentException: The type 'java.time.LocalDate' is not supported for the CSV input format. at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289) at org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:64) at org.apache.flink.table.sources.CsvTableSource$CsvInputFormatConfig.createInputFormat(CsvTableSource.java:518) at org.apache.flink.table.sources.CsvTableSource.getDataStream(CsvTableSource.java:182) at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:97) at org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:251) at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:410) at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:400) at org.apache.flink.table.planner.StreamPlanner.writeToRetractSink(StreamPlanner.scala:308) at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$writeToSink(StreamPlanner.scala:272) at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:166) at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:145) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:145) at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117) at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:680) at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:353) at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:341) at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:428) at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeQueryInternal$12(LocalExecutor.java:640) at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:227) at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:638)

       

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              xiaojin.wy xiaojin.wy
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: