Details
- Improvement
- Status: Closed
- Major
- Resolution: Fixed
- None
- Flink 1.14-SNAPSHOT
Description
The filesystem connector still uses the deprecated TableSchema APIs. This breaks tables defined through the Table API, because TableSchema#fromResolvedSchema(ResolvedSchema) requires every expression in the schema to have a serializable string representation (ResolvedExpression#asSerializableString).
For example:
TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
    .schema(
        Schema.newBuilder()
            .column("character", DataTypes.STRING())
            .column("latitude", DataTypes.STRING())
            .column("longitude", DataTypes.STRING())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", $("time").minus(lit(2).seconds()))
            .build()
    )
    // Other options
    .build();
When this descriptor is used in a table pipeline, it throws the following exception:
Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time, 2000)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976)
at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
at org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54)
at org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52)
at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91)
at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145)
The same table definition using SQL works fine:
CREATE TABLE IF NOT EXISTS LocationEvents (
    `character` STRING,
    `latitude` STRING,
    `longitude` STRING,
    `time` TIMESTAMP(3),
    WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
) WITH (
    -- Load from filesystem
    'connector' = 'filesystem',
    --- Other configs
);
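To make the difference between the two definitions concrete, here is a minimal toy model in plain Java. It is not Flink code: the class SerializableExpressionSketch, the nested Expr type, and the method toLegacyWatermark are hypothetical stand-ins, assumed only for illustration. The sketch mirrors the behaviour described above, where an expression that came from SQL keeps its source text and can be serialized, while one built through the expression API has no string form and makes the legacy conversion fail.

```java
// Toy model only: every name here is a hypothetical stand-in, not a Flink class.
final class SerializableExpressionSketch {

    /** Stand-in for a resolved expression. Only SQL-originated ones keep their source text. */
    static final class Expr {
        private final String summary; // readable form, e.g. "minus(time, 2000)"
        private final String sqlText; // null unless the expression was parsed from SQL

        private Expr(String summary, String sqlText) {
            this.summary = summary;
            this.sqlText = sqlText;
        }

        /** Models an expression that originated from a SQL string. */
        static Expr fromSql(String sql) {
            return new Expr(sql, sql);
        }

        /** Models an expression built programmatically through the expression API. */
        static Expr fromApi(String summary) {
            return new Expr(summary, null);
        }

        /** Returns the SQL text, or fails if the expression never had one. */
        String asSerializableString() {
            if (sqlText == null) {
                throw new IllegalStateException(
                        "Expression '" + summary + "' is not string serializable.");
            }
            return sqlText;
        }
    }

    /** Stand-in for the legacy conversion, which eagerly serializes the watermark expression. */
    static String toLegacyWatermark(String rowtimeColumn, Expr watermark) {
        return "WATERMARK FOR " + rowtimeColumn + " AS " + watermark.asSerializableString();
    }

    public static void main(String[] args) {
        // SQL-originated watermark: the conversion succeeds.
        System.out.println(
                toLegacyWatermark("`time`", Expr.fromSql("`time` - INTERVAL '2' SECOND")));

        // API-originated watermark: the conversion fails, as in the stack trace above.
        try {
            toLegacyWatermark("`time`", Expr.fromApi("minus(time, 2000)"));
        } catch (IllegalStateException e) {
            System.out.println("Conversion failed: " + e.getMessage());
        }
    }
}
```

In Flink itself, Schema.Builder also has a watermark(String, String) overload that takes the watermark as a SQL expression. Passing the watermark that way may sidestep the error on affected versions, although that has not been verified here.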