
FLINK-15552: Parameters --library and --jar don't work for DDL in SQL Client


    Description

      How to reproduce:
      First, I start the SQL Client and use `-l` to point to a Kafka connector directory:

      `
      bin/sql-client.sh embedded -l /xx/connectors/kafka/
      `

      Then I create a Kafka table with the following DDL:
      `
      Flink SQL> CREATE TABLE MyUserTable (
      > content String
      > ) WITH (
      > 'connector.type' = 'kafka',
      > 'connector.version' = 'universal',
      > 'connector.topic' = 'test',
      > 'connector.properties.zookeeper.connect' = 'localhost:2181',
      > 'connector.properties.bootstrap.servers' = 'localhost:9092',
      > 'connector.properties.group.id' = 'testGroup',
      > 'connector.startup-mode' = 'earliest-offset',
      > 'format.type' = 'csv'
      > );
      [INFO] Table has been created.
      `

      Then I select from the just-created table and an exception is thrown:

      `
      Flink SQL> select * from MyUserTable;
      [ERROR] Could not execute SQL statement. Reason:
      org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
      the classpath.

      Reason: Required context properties mismatch.

      The matching candidates:
      org.apache.flink.table.sources.CsvBatchTableSourceFactory
      Mismatched properties:
      'connector.type' expects 'filesystem', but is 'kafka'

      The following properties are requested:
      connector.properties.bootstrap.servers=localhost:9092
      connector.properties.group.id=testGroup
      connector.properties.zookeeper.connect=localhost:2181
      connector.startup-mode=earliest-offset
      connector.topic=test
      connector.type=kafka
      connector.version=universal
      format.type=csv
      schema.0.data-type=VARCHAR(2147483647)
      schema.0.name=content

      The following factories have been considered:
      org.apache.flink.table.sources.CsvBatchTableSourceFactory
      org.apache.flink.table.sources.CsvAppendTableSourceFactory
      `
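      For reference, table factories are discovered through Java SPI, so the "factories have been considered" list above reflects only what the class loader used for discovery can actually see. A quick diagnostic along these lines (my own sketch, not part of the error output) prints the TableFactory implementations visible to a given class loader; with the default loader, the Kafka factory from the `-l` directory is missing:
      `
      import java.util.ServiceLoader;

      import org.apache.flink.table.factories.TableFactory;

      // Diagnostic sketch: list the TableFactory implementations that Java SPI can
      // discover through a given class loader.
      public class ListVisibleFactories {
          public static void main(String[] args) {
              ClassLoader loader = Thread.currentThread().getContextClassLoader();
              for (TableFactory factory : ServiceLoader.load(TableFactory.class, loader)) {
                  System.out.println(factory.getClass().getName());
              }
          }
      }
      `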
      Potential reason:
      Currently we use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to a TableSource, but when calling `TableFactoryService.find` we don't pass the current class loader to this method, so the default (bootstrap) class loader is used, and it cannot find our factory.

      I verified this on my machine; it is indeed caused by this behavior.
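
      A rough sketch of the direction I have in mind, assuming the `TableFactoryService.find` overload that accepts a ClassLoader can be used at this call site (illustration only, not the actual patch):
      `
      import org.apache.flink.table.catalog.CatalogTable;
      import org.apache.flink.table.factories.TableFactoryService;
      import org.apache.flink.table.factories.TableSourceFactory;
      import org.apache.flink.table.sources.TableSource;

      // Hypothetical helper: look up the TableSourceFactory with the user class loader
      // (the one that contains the jars added via -l / -j) instead of the default loader.
      public class ClassLoaderAwareTableSourceLookup {
          public static TableSource<?> createTableSource(CatalogTable table, ClassLoader userClassLoader) {
              TableSourceFactory<?> factory = TableFactoryService.find(
                      TableSourceFactory.class,
                      table.toProperties(),
                      userClassLoader);
              return factory.createTableSource(table.toProperties());
          }
      }
      `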
