Flink / FLINK-24359

Migrate FileSystem connector to ResolvedSchema

    Description

      The filesystem connector still uses the deprecated TableSchema API. This breaks tables defined through the Table API, because TableSchema#fromResolvedSchema(ResolvedSchema) requires every expression in the schema to be string serializable (ResolvedExpression#asSerializableString), and expressions built with the Table API expression DSL are not.

      For example:

      TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
              .schema(
                      Schema.newBuilder()
                              .column("character", DataTypes.STRING())
                              .column("latitude", DataTypes.STRING())
                              .column("longitude", DataTypes.STRING())
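                              .column("time", DataTypes.TIMESTAMP(3))
                              // Watermark built with the Table API expression DSL
                              // ($ and lit are static imports from org.apache.flink.table.api.Expressions)
                              .watermark("time", $("time").minus(lit(2).seconds()))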
                              .build()
              )
              // Other options
              .build();
      

      When this descriptor is used in a table pipeline, the following exception is thrown:

      Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time, 2000)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
      	at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
      	at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
      	at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976)
      	at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
      	at org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54)
      	at org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52)
      	at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91)
      	at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
      	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145)
      

      A comparable table defined in SQL works fine (note that this example uses a 5-minute watermark delay rather than 2 seconds):

      CREATE TABLE IF NOT EXISTS LocationEvents (
          `character` STRING,
          `latitude` STRING,
          `longitude` STRING,
          `time` TIMESTAMP(3),
          WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
      ) WITH (
          -- Load from filesystem
          'connector' = 'filesystem',
          --- Other configs
      );
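
      To make the difference between the two cases concrete, here is a minimal, self-contained sketch of the mechanism described above. It does not use Flink: Expr, legacyWatermarkOf and resolvedWatermarkOf are hypothetical stand-ins for ResolvedExpression, the TableSchema conversion and the ResolvedSchema path, and the model assumes only what the exception message states, namely that SQL-originated expressions keep a string form while API-built ones do not.

```java
/** Simplified model of this issue; none of these types are Flink classes. */
final class SchemaSerializationSketch {

    /** Hypothetical stand-in for a resolved expression. */
    static final class Expr {
        private final String summary; // human-readable form, always available
        private final String sqlText; // null unless the expression came from SQL

        private Expr(String summary, String sqlText) {
            this.summary = summary;
            this.sqlText = sqlText;
        }

        /** Expression parsed from SQL: the original text is kept. */
        static Expr fromSql(String sql) {
            return new Expr(sql, sql);
        }

        /** Expression built programmatically: no SQL text exists. */
        static Expr fromApi(String summary) {
            return new Expr(summary, null);
        }

        String asSummaryString() {
            return summary;
        }

        String asSerializableString() {
            if (sqlText == null) {
                throw new IllegalStateException(
                        "Expression '" + summary + "' is not string serializable.");
            }
            return sqlText;
        }
    }

    /** Legacy-style conversion: stores the watermark as a string, so it serializes eagerly. */
    static String legacyWatermarkOf(Expr watermark) {
        return watermark.asSerializableString();
    }

    /** Resolved-style access: keeps the expression object, so nothing is serialized. */
    static Expr resolvedWatermarkOf(Expr watermark) {
        return watermark;
    }

    /** Returns true if the legacy-style conversion rejects the expression. */
    static boolean legacyConversionFails(Expr watermark) {
        try {
            legacyWatermarkOf(watermark);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Expr api = Expr.fromApi("minus(time, 2000)");
        Expr sql = Expr.fromSql("`time` - INTERVAL '5' MINUTES");

        System.out.println(legacyConversionFails(api)); // prints true
        System.out.println(legacyConversionFails(sql)); // prints false
        System.out.println(resolvedWatermarkOf(api).asSummaryString()); // prints minus(time, 2000)
    }
}
```

      In this model, the failure comes only from the eager string conversion. A connector that reads the resolved form directly never needs the serialized string, which is the motivation for the migration.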
      

            People

              slinkydeveloper Francesco Guardiani