Details
-
Bug
-
Status: Closed
-
Blocker
-
Resolution: Not A Problem
-
1.9.0, 1.9.1
-
None
Description
I use two approaches to register table in StreamTableEnvironment. The DDL approach run fine and the "StreamTableEnvironment.connect" one throw exception.
the root cause :
"CsvTableSourceFactoryBase.supportedProperties" does't include "format.schema", it cause that "org.apache.flink.table.factories.TableFactoryService#filterBySupportedProperties" return no "TableSourceFactory"
this approach run successfully (using DDL)
public static void main1(String[] args) throws Exception{ StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings); String sql = "create table test(last_update_dt TIMESTAMP) " + "with (" + "'connector.type' = 'filesystem'," + "'connector.path' = 'C:/work/1.csv'," + "'format.type' = 'csv'," + "'format.fields.0.name' = 'last_update_dt'," + "'format.fields.0.type' = 'TIMESTAMP'" + ")"; streamTableEnvironment.sqlUpdate(sql); Table data = streamTableEnvironment.sqlQuery("select * from test"); streamTableEnvironment.toAppendStream(data, Row.class).print(); environment.execute(); }
this approach throw Exception
public static void main(String[] args) throws Exception{ StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings); streamTableEnvironment.connect( new FileSystem() .path("c:/work/1.csv") ).withFormat( new Csv() .schema(Types.ROW(Types.SQL_TIMESTAMP)) ).withSchema( new Schema() .field("last_update_dt", Types.SQL_TIMESTAMP) ).inAppendMode().registerTableSource("test"); Table data = streamTableEnvironment.sqlQuery("select * from test"); streamTableEnvironment.toAppendStream(data, Row.class).print(); environment.execute(); }
Attachments
Attachments
Issue Links
- links to