  1. Flink
  2. FLINK-34076

Kinesis table sink fails to create when flink-connector-base is missing


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: aws-connector-4.2.0
    • Fix Version/s: None
    • Component/s: Connectors / Kinesis
    • Labels: None

    Description

      The following issue occurs with `flink-connector-kinesis` v4.2.0 on Flink 1.17. The same setup works properly with v4.1.0 of the connector (I have not tested versions earlier than v4.1.0).

      The commit that stopped bundling `flink-connector-base` with `flink-connector-kinesis` causes the Kinesis sink to fail on creation when using the Table API, because required classes from `flink-connector-base` are not available at runtime.

      E.g. with only the following dependency in pom.xml:

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-kinesis</artifactId>
                  <version>${flink.connector.kinesis.version}</version>
              </dependency>
      

      and a minimal job definition:

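      import org.apache.flink.api.common.RuntimeExecutionMode;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.api.Schema;
      import org.apache.flink.table.api.TableDescriptor;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

      // Class name is illustrative; the original report shows only the main method.
      public class KinesisSinkRepro {
      	public static void main(String[] args) throws Exception {
      		// create data stream environment
      		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      		sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
      		StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);

      		// describe a single-column Kinesis sink table using the JSON format
      		Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
      		TableDescriptor descriptor =
      				TableDescriptor.forConnector("kinesis")
      						.schema(a)
      						.format("json")
      						.build();
      		tEnv.createTemporaryTable("sinkTable", descriptor);

      		// also create the table through SQL DDL built from the same descriptor
      		tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString()).print();
      	}
      }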
      

      The following exception is thrown:

      Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
      	at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
      	at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
      	... 28 more
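
      To confirm whether the class named in the stack trace is visible to the application class loader, a small standalone check along these lines may help. This is only a diagnostic sketch: the class name `ClasspathCheck` and the helper `isOnClasspath` are hypothetical and not part of Flink, and the check has to run with the same classpath as the job for the result to be meaningful.

```java
public class ClasspathCheck {
    // Class reported missing in the stack trace; it is provided by flink-connector-base.
    static final String REQUIRED =
            "org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory";

    /** Returns true if the named class can be located, without initializing it. */
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but one of its own dependencies failed to link.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(REQUIRED + " on classpath: " + isOnClasspath(REQUIRED));
    }
}
```

      If the check reports that the class is not on the classpath, `flink-connector-base` is most likely absent from the classpath the job is started with.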
      

      The fix is to explicitly declare `flink-connector-base` as a dependency of the project:

      		<dependency>
      			<groupId>org.apache.flink</groupId>
      			<artifactId>flink-connector-kinesis</artifactId>
      			<version>${flink.connector.kinesis.version}</version>
      		</dependency>
      		<dependency>
      			<groupId>org.apache.flink</groupId>
      			<artifactId>flink-connector-base</artifactId>
      			<version>${flink.version}</version>
      			<scope>provided</scope>
      		</dependency>
      

      In general, `flink-connector-base` should be pulled in by default together with the Kinesis connector. The current separation makes the connector unnecessarily cumbersome to use.

      Attachments

        1. screenshot-4.png
          617 kB
          jiabao.sun
        2. screenshot-3.png
          148 kB
          jiabao.sun
        3. screenshot-2.png
          249 kB
          jiabao.sun
        4. screenshot-1.png
          204 kB
          jiabao.sun


          People

            Assignee: Unassigned
            Reporter: Khanh Vu (khanhvu)