Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-14994

StreamTableEnvironment.connect throw exception when using "FileSystem" connector and "CSV" format

    XMLWordPrintableJSON

Details

    Description

      I use two approaches to register table in StreamTableEnvironment.  The DDL approach run fine and the "StreamTableEnvironment.connect" one throw exception. 

      the root cause :

      "CsvTableSourceFactoryBase.supportedProperties" does't include "format.schema", it cause that "org.apache.flink.table.factories.TableFactoryService#filterBySupportedProperties" return  no "TableSourceFactory"

       

      this approach run successfully (using DDL)

      public static void main1(String[] args) throws Exception{
          StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
          EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
          StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings);
      
          String sql = "create table test(last_update_dt TIMESTAMP) " +
                          "with (" +
                              "'connector.type' = 'filesystem'," +
                              "'connector.path' = 'C:/work/1.csv'," +
                              "'format.type' = 'csv'," +
                              "'format.fields.0.name' = 'last_update_dt'," +
                              "'format.fields.0.type' = 'TIMESTAMP'" +
                          ")";
          streamTableEnvironment.sqlUpdate(sql);
          Table data = streamTableEnvironment.sqlQuery("select * from test");
          streamTableEnvironment.toAppendStream(data, Row.class).print();
          environment.execute();
      }

       

      this approach throw Exception

      public static void main(String[] args) throws Exception{
          StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
          EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
          StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings);
      
          streamTableEnvironment.connect(
                  new FileSystem()
                  .path("c:/work/1.csv")
          ).withFormat(
                  new Csv()
                  .schema(Types.ROW(Types.SQL_TIMESTAMP))
          ).withSchema(
                  new Schema()
                  .field("last_update_dt", Types.SQL_TIMESTAMP)
          ).inAppendMode().registerTableSource("test");
      
          Table data = streamTableEnvironment.sqlQuery("select * from test");
          streamTableEnvironment.toAppendStream(data, Row.class).print();
      
          environment.execute();
      }
      

       

      Attachments

        1. 1.csv
          49 kB
          Dezhi Cai
        2. image-2019-11-29-22-50-50-367.png
          49 kB
          Dezhi Cai

        Issue Links

          Activity

            People

              Unassigned Unassigned
              caidezhi655 Dezhi Cai
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 20m
                  20m