Details
-
Improvement
-
Status: Closed
-
Major
-
Resolution: Duplicate
-
1.8.1
-
None
Description
Currently, only 'topic' option implemented in the Kafka Connector Descriptor, we can only use it like :
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) tableEnv .connect( new Kafka() .version("0.11") .topic("test-flink-1") .startFromEarliest() .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092")) .withFormat( new Json() .deriveSchema() ) .withSchema( new Schema() .field("name", Types.STRING) .field("age", Types.STRING) )
but we cannot consume multiple topics or a topic regex pattern.
Here is my thoughts:
.topic("test-flink-1") //.topics("test-flink-1,test-flink-2") or topics(List<String> topics) //.subscriptionPattern("test-flink-.*") or subscriptionPattern(Pattern pattern)
I already implement the code on my local env with help of the FlinkKafkaConsumer, and it works.
Attachments
Issue Links
- duplicates
-
FLINK-13701 KafkaTableSourceSink doesn't support topic pattern
- Closed
- is duplicated by
-
FLINK-18449 Make topic discovery and partition discovery configurable for FlinkKafkaConsumer in Table API
- Closed
- links to