Details
Type: Bug
Status: Open
Priority: Not a Priority
Resolution: Unresolved
Affects Version/s: 1.10.0
Fix Version/s: None
Description
If we put flink-sql-connector-elasticsearch6 and flink-sql-connector-elasticsearch7 into /lib at the same time and use them in sql-client, we get exceptions like:
[ERROR] Could not execute SQL statement. Reason: java.lang.AbstractMethodError: org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.createElasticsearchUpsertTableSink(ZLorg/apache/flink/table/api/TableSchema;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Lorg/apache/flink/api/common/serialization/SerializationSchema;Lorg/apache/flink/elasticsearch6/shaded/org/elasticsearch/common/xcontent/XContentType;Lorg/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler;Ljava/util/Map;)Lorg/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase;
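After analyzing the exception, IMO the cause is that flink-connector-elasticsearch-base is bundled into both flink-sql-connector-elasticsearch6 and flink-sql-connector-elasticsearch7 under the same package name. The two bundled copies are not interchangeable, because each is built against a different Elasticsearch version (note the org.apache.flink.elasticsearch6.shaded XContentType parameter in the method signature above). With both jars in /lib, presumably only one copy of each base class gets loaded, so the factory from the other connector no longer matches its base class.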
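To confirm that the base class really is shipped by more than one jar, a small scan of the /lib directory can help. The following is only a minimal sketch: the class name DuplicateClassFinder and the default directory "lib" are hypothetical and not part of Flink; the entry path is taken from the error message above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;

// Hypothetical helper, not part of Flink: lists the jars that contain a given class file.
public class DuplicateClassFinder {

    /** Returns the sorted file names of all *.jar files in libDir that contain classEntry. */
    public static List<String> findJarsContaining(Path libDir, String classEntry) throws IOException {
        List<String> hits = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                try (JarFile jarFile = new JarFile(jar.toFile())) {
                    // Entry names inside a jar use '/' separators and end with ".class".
                    if (jarFile.getEntry(classEntry) != null) {
                        hits.add(jar.getFileName().toString());
                    }
                }
            }
        }
        Collections.sort(hits);
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the Flink lib directory is passed as the first argument.
        Path libDir = Paths.get(args.length > 0 ? args[0] : "lib");
        String entry = "org/apache/flink/streaming/connectors/elasticsearch/"
                + "ElasticsearchUpsertTableSinkFactoryBase.class";
        List<String> hits = findJarsContaining(libDir, entry);
        System.out.println(entry + " found in " + hits.size() + " jar(s): " + hits);
    }
}
```

If more than one jar is reported, the class is duplicated on the classpath, which would be consistent with the analysis above.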
A simple fix is to relocate flink-connector-elasticsearch-base in both flink-sql-connector-elasticsearch6 and flink-sql-connector-elasticsearch7. For example, for flink-sql-connector-elasticsearch7:
<relocation>
    <pattern>org.apache.flink.streaming.connectors.elasticsearch.</pattern>
    <shadedPattern>org.apache.flink.streaming.connectors.elasticsearch7.base.</shadedPattern>
</relocation>
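To illustrate what this relocation would do to class names, here is a simplified model of the prefix rewrite. It is only a sketch: the real maven-shade-plugin rewrites bytecode and resources rather than strings, and RelocationSketch and the second sample class name are hypothetical. It also shows why the trailing dot in the pattern matters: packages such as org.apache.flink.streaming.connectors.elasticsearch7 do not start with the pattern, so they are left alone.

```java
// Simplified, hypothetical model of a shade relocation as a plain prefix rewrite.
public class RelocationSketch {

    static final String PATTERN =
            "org.apache.flink.streaming.connectors.elasticsearch.";
    static final String SHADED_PATTERN =
            "org.apache.flink.streaming.connectors.elasticsearch7.base.";

    /** Rewrites className if it starts with pattern; otherwise returns it unchanged. */
    public static String relocate(String className, String pattern, String shadedPattern) {
        return className.startsWith(pattern)
                ? shadedPattern + className.substring(pattern.length())
                : className;
    }

    public static void main(String[] args) {
        // Base class from the error message: lives in the shared package, so it is moved.
        System.out.println(relocate(
                "org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase",
                PATTERN, SHADED_PATTERN));
        // Sample name in the version-specific package: does not match the pattern, so it stays.
        System.out.println(relocate(
                "org.apache.flink.streaming.connectors.elasticsearch7.SomeSink",
                PATTERN, SHADED_PATTERN));
    }
}
```

With an analogous relocation (for example to an elasticsearch6.base package) in flink-sql-connector-elasticsearch6, the two jars would no longer ship classes with identical fully qualified names.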