Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
aws-connector-4.2.0
-
None
-
None
Description
The following issue encounters with flink-kinesis-connector v4.2.0, Flink 1.17, it's working properly with kinesis connector v4.1.0 (I have not tested version pre v4.1.0).
The commit which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in runtime.
E.g. with following depenency only in pom.xml
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${flink.connector.kinesis.version}</version> </dependency>
and a minimal job definition:
public static void main(String[] args) throws Exception { // create data stream environment StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv); Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build(); TableDescriptor descriptor = TableDescriptor.forConnector("kinesis") .schema(a) .format("json") .build(); tEnv.createTemporaryTable("sinkTable", descriptor); tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString()).print(); }
following exception will be thrown:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?] at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?] at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] ... 28 more
The fix is to explicitly specify `flink-connector-base` as dependency of the project:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${flink.connector.kinesis.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
In general, `flink-connector-base` should be pulled in by default when pulling in the kinesis connector, the current separation adds unnecessary hassle to use the connector.