-
Type:
Bug
-
Status: Closed
-
Priority:
Critical
-
Resolution: Fixed
-
Affects Version/s: 1.4.0, 1.5.0
-
Component/s: Connectors / ElasticSearch
-
Labels:None
A user reported this issue on the user@f.a.o mailing list.
Setup:
- A program with two pipelines that write to ElasticSearch. The pipelines can be connected or completely separate.
- ElasticSearch 5.6.4, connector flink-connector-elasticsearch5_2.11
Problem:
Only one of the ES connectors correctly emits data. The other connector writes a single record and then stops emitting data (or does not write any data at all). The problem does not exist, if the second ES connector is replaced by a different connector (for example Cassandra).
Below is a program to reproduce the issue:
public class ElasticSearchTest1 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // set elasticsearch connection details Map<String, String> config = new HashMap<>(); config.put("bulk.flush.max.actions", "1"); config.put("cluster.name", "<cluster name>"); List<InetSocketAddress> transports = new ArrayList<>(); transports.add(new InetSocketAddress(InetAddress.getByName("<host ip>"), 9300)); //Set properties for Kafka Streaming Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "<host ip>"+":9092"); properties.setProperty("group.id", "testGroup"); properties.setProperty("auto.offset.reset", "latest"); //Create consumer for log records FlinkKafkaConsumer011 inputConsumer1 = new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties); DataStream<RecordOne> firstStream = env .addSource(inputConsumer1) .flatMap(new CreateRecordOne()); firstStream .addSink(new ElasticsearchSink<RecordOne>(config, transports, new ElasticSearchOutputRecord("elastic_test_index1","elastic_test_index1"))); FlinkKafkaConsumer011 inputConsumer2 = new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties); DataStream<RecordTwo> secondStream = env .addSource(inputConsumer2) .flatMap(new CreateRecordTwo()); secondStream .addSink(new ElasticsearchSink<RecordTwo>(config, transports, new ElasticSearchOutputRecord2("elastic_test_index2","elastic_test_index2"))); env.execute("Elastic Search Test"); } } public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> { String index; String type; // Initialize filter function public ElasticSearchOutputRecord(String index, String type) { this.index = index; this.type = type; } // construct index request @Override public void process( RecordOne record, RuntimeContext ctx, RequestIndexer indexer) { // construct JSON document to index Map<String, String> json = new HashMap<>(); json.put("item_one", record.item1); json.put("item_two", record.item2); IndexRequest rqst = Requests.indexRequest() .index(index) // index name .type(type) // mapping name .source(json); indexer.add(rqst); } } public class ElasticSearchOutputRecord2 implements ElasticsearchSinkFunction<RecordTwo> { String index; String type; // Initialize filter function public ElasticSearchOutputRecord2(String index, String type) { this.index = index; this.type = type; } // construct index request @Override public void process( RecordTwo record, RuntimeContext ctx, RequestIndexer indexer) { // construct JSON document to index Map<String, String> json = new HashMap<>(); json.put("item_three", record.item3); json.put("item_four", record.item4); IndexRequest rqst = Requests.indexRequest() .index(index) // index name .type(type) // mapping name .source(json); indexer.add(rqst); } } public class CreateRecordOne implements FlatMapFunction<ObjectNode,RecordOne> { static final Logger log = LoggerFactory.getLogger(CreateRecordOne.class); @Override public void flatMap(ObjectNode value, Collector<RecordOne> out) throws Exception { try { out.collect(new RecordOne(value.get("item1").asText(),value.get("item2").asText())); } catch(Exception e) { log.error("error while creating RecordOne", e); } } } public class CreateRecordTwo implements FlatMapFunction<ObjectNode,RecordTwo> { static final Logger log = LoggerFactory.getLogger(CreateRecordTwo.class); @Override public void flatMap(ObjectNode value, Collector<RecordTwo> out) throws Exception { try { out.collect(new RecordTwo(value.get("item1").asText(),value.get("item2").asText())); } catch(Exception e) { log.error("error while creating RecordTwo", e); } } } public class RecordOne { public String item1; public String item2; public RecordOne() {}; public RecordOne ( String item1, String item2 ) { this.item1 = item1; this.item2 = item2; } } public class RecordTwo { public String item3; public String item4; public RecordTwo() {}; public RecordTwo ( String item3, String item4 ) { this.item3 = item3; this.item4 = item4; } }
- links to