Flink / FLINK-11982

BatchTableSourceFactory should support JSON format files


Details

    Description

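      Code (Scala syntax, called from JsonExample):

      // Assumes tEnv is an existing BatchTableEnvironment.
      import org.apache.flink.api.common.typeinfo.TypeInformation
      import org.apache.flink.table.api.Types
      import org.apache.flink.table.descriptors.{FileSystem, Json}

      val connector = new FileSystem().path("data/in/test.json")
      val desc = tEnv.connect(connector)
        .withFormat(
          new Json()
            .schema(
              Types.ROW(
                Array[String]("id", "name", "age"),
                Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.INT)))
            .failOnMissingField(true))
        // The registered name must match the table name used in the query below.
        .registerTableSource("person")

      val sql = "select * from person"
      val result = tEnv.sqlQuery(sql)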
      

      Exception info:

      Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in the classpath.
      
      Reason: No context matches.
      
      The following properties are requested:
      connector.path=file:///Users/batch/test.json
      connector.property-version=1
      connector.type=filesystem
      format.derive-schema=true
      format.fail-on-missing-field=true
      format.property-version=1
      format.type=json
      
      The following factories have been considered:
      org.apache.flink.table.sources.CsvBatchTableSourceFactory
      org.apache.flink.table.sources.CsvAppendTableSourceFactory
      org.apache.flink.table.sinks.CsvBatchTableSinkFactory
      org.apache.flink.table.sinks.CsvAppendTableSinkFactory
      org.apache.flink.formats.avro.AvroRowFormatFactory
      org.apache.flink.formats.json.JsonRowFormatFactory
      org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
      org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
      
      at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
      at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
      at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
      at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:44)
      at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
      at com.meitu.mlink.sql.batch.JsonExample.main(JsonExample.java:36)
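The "No context matches" reason can be illustrated with a small, Flink-free sketch of context matching. It is not Flink's actual implementation: the class `ContextMatchSketch`, its method names, and both factory context maps are assumptions made for illustration. Only the requested properties are copied from the exception above. The idea shown is that a factory is kept only if every key/value pair it requires appears in the requested properties, so a filesystem source that requires `format.type=csv` is rejected when `format.type=json` is requested.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ContextMatchSketch {

    /** True when every key/value pair the factory requires appears in the requested properties. */
    public static boolean matches(Map<String, String> required, Map<String, String> requested) {
        for (Map.Entry<String, String> e : required.entrySet()) {
            if (!e.getValue().equals(requested.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** Requested properties, copied from the exception message in the report. */
    public static Map<String, String> requestedProperties() {
        Map<String, String> p = new LinkedHashMap<>();
        p.put("connector.path", "file:///Users/batch/test.json");
        p.put("connector.property-version", "1");
        p.put("connector.type", "filesystem");
        p.put("format.derive-schema", "true");
        p.put("format.fail-on-missing-field", "true");
        p.put("format.property-version", "1");
        p.put("format.type", "json");
        return p;
    }

    /** ASSUMPTION: context a CSV filesystem batch source factory might require. */
    public static Map<String, String> assumedCsvContext() {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("connector.type", "filesystem");
        c.put("format.type", "csv");
        return c;
    }

    /** HYPOTHETICAL: context a JSON filesystem batch source factory would require. */
    public static Map<String, String> hypotheticalJsonFileContext() {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("connector.type", "filesystem");
        c.put("format.type", "json");
        return c;
    }

    public static void main(String[] args) {
        Map<String, String> requested = requestedProperties();
        // prints "csv filesystem factory matches: false"
        System.out.println("csv filesystem factory matches: "
                + matches(assumedCsvContext(), requested));
        // prints "json filesystem factory matches: true"
        System.out.println("json filesystem factory matches: "
                + matches(hypotheticalJsonFileContext(), requested));
    }
}
```

Under these assumptions, the request in this report fails because none of the listed factories is a batch table source that declares both `connector.type=filesystem` and `format.type=json`; a factory with the hypothetical JSON context would match.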

       


            People

              Assignee: Unassigned
              Reporter: pingle wang (ambition119)