FLINK-8489

Data is not emitted by second ElasticSearch connector

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.4.0, 1.5.0
    • Fix Version/s: 1.5.0, 1.4.1
    • Labels: None

      Description

      A user reported this issue on the user@flink.apache.org mailing list.

      Setup:

      • A program with two pipelines that write to ElasticSearch. The pipelines can be connected or completely separate.
      • ElasticSearch 5.6.4, connector flink-connector-elasticsearch5_2.11

      Problem:
      Only one of the ES connectors correctly emits data. The other connector writes a single record and then stops emitting data (or does not write any data at all). The problem does not exist if the second ES connector is replaced by a different connector (for example, Cassandra); a sketch of that substitution follows the reproduction program below.

      Below is a program to reproduce the issue:

public class ElasticSearchTest1 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // ElasticSearch connection details; flush after every single record
        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", "<cluster name>");
        List<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress(InetAddress.getByName("<host ip>"), 9300));

        // Properties for the Kafka consumers
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "<host ip>" + ":9092");
        properties.setProperty("group.id", "testGroup");
        properties.setProperty("auto.offset.reset", "latest");

        // First pipeline: Kafka topic elastic_test1 -> RecordOne -> ElasticSearch
        FlinkKafkaConsumer011<ObjectNode> inputConsumer1 =
                new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties);

        DataStream<RecordOne> firstStream = env
                .addSource(inputConsumer1)
                .flatMap(new CreateRecordOne());

        firstStream
                .addSink(new ElasticsearchSink<RecordOne>(config,
                        transports,
                        new ElasticSearchOutputRecord("elastic_test_index1", "elastic_test_index1")));

        // Second pipeline: Kafka topic elastic_test2 -> RecordTwo -> ElasticSearch
        FlinkKafkaConsumer011<ObjectNode> inputConsumer2 =
                new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties);

        DataStream<RecordTwo> secondStream = env
                .addSource(inputConsumer2)
                .flatMap(new CreateRecordTwo());

        secondStream
                .addSink(new ElasticsearchSink<RecordTwo>(config,
                        transports,
                        new ElasticSearchOutputRecord2("elastic_test_index2", "elastic_test_index2")));

        env.execute("Elastic Search Test");
    }
}
      
public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> {

    String index;
    String type;

    public ElasticSearchOutputRecord(String index, String type) {
        this.index = index;
        this.type = type;
    }

    // construct and emit an index request for each record
    @Override
    public void process(RecordOne record, RuntimeContext ctx, RequestIndexer indexer) {

        // JSON document to index
        Map<String, String> json = new HashMap<>();
        json.put("item_one", record.item1);
        json.put("item_two", record.item2);

        IndexRequest rqst = Requests.indexRequest()
                .index(index)   // index name
                .type(type)     // mapping name
                .source(json);

        indexer.add(rqst);
    }
}

public class ElasticSearchOutputRecord2 implements ElasticsearchSinkFunction<RecordTwo> {

    String index;
    String type;

    public ElasticSearchOutputRecord2(String index, String type) {
        this.index = index;
        this.type = type;
    }

    // construct and emit an index request for each record
    @Override
    public void process(RecordTwo record, RuntimeContext ctx, RequestIndexer indexer) {

        // JSON document to index
        Map<String, String> json = new HashMap<>();
        json.put("item_three", record.item3);
        json.put("item_four", record.item4);

        IndexRequest rqst = Requests.indexRequest()
                .index(index)   // index name
                .type(type)     // mapping name
                .source(json);

        indexer.add(rqst);
    }
}
      
public class CreateRecordOne implements FlatMapFunction<ObjectNode, RecordOne> {

    static final Logger log = LoggerFactory.getLogger(CreateRecordOne.class);

    @Override
    public void flatMap(ObjectNode value, Collector<RecordOne> out) throws Exception {
        try {
            out.collect(new RecordOne(value.get("item1").asText(), value.get("item2").asText()));
        } catch (Exception e) {
            log.error("error while creating RecordOne", e);
        }
    }
}

public class CreateRecordTwo implements FlatMapFunction<ObjectNode, RecordTwo> {

    static final Logger log = LoggerFactory.getLogger(CreateRecordTwo.class);

    @Override
    public void flatMap(ObjectNode value, Collector<RecordTwo> out) throws Exception {
        try {
            out.collect(new RecordTwo(value.get("item1").asText(), value.get("item2").asText()));
        } catch (Exception e) {
            log.error("error while creating RecordTwo", e);
        }
    }
}
      
public class RecordOne {

    public String item1;
    public String item2;

    public RecordOne() {}

    public RecordOne(String item1, String item2) {
        this.item1 = item1;
        this.item2 = item2;
    }
}

public class RecordTwo {

    public String item3;
    public String item4;

    public RecordTwo() {}

    public RecordTwo(String item3, String item4) {
        this.item3 = item3;
        this.item4 = item4;
    }
}
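
      For reference, below is a rough sketch of the substitution mentioned in the problem description: the second ElasticsearchSink is swapped for the Cassandra connector (flink-connector-cassandra). This is not the reporter's exact code; the keyspace/table flink_test.record_two and its column names are made up for illustration, and the snippet would replace the secondStream.addSink(...) call in the program above (it additionally uses org.apache.flink.streaming.connectors.cassandra.CassandraSink, org.apache.flink.api.java.tuple.Tuple2, and org.apache.flink.api.common.functions.MapFunction).

// CassandraSink writes tuples via an INSERT query, so the RecordTwo POJO is
// first converted to a Tuple2. Keyspace, table, and column names are hypothetical.
DataStream<Tuple2<String, String>> secondAsTuples = secondStream
        .map(new MapFunction<RecordTwo, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(RecordTwo record) {
                return Tuple2.of(record.item3, record.item4);
            }
        });

CassandraSink.addSink(secondAsTuples)
        .setHost("<host ip>")
        .setQuery("INSERT INTO flink_test.record_two (item_three, item_four) VALUES (?, ?);")
        .build();

      With a sink like this in place of the second ElasticsearchSink, both pipelines reportedly emit data, which points the problem at the ElasticSearch connector rather than at the Kafka sources or the flatMap functions.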
      

    People

    • Assignee: Chesnay Schepler
    • Reporter: Fabian Hueske
    • Votes: 1
    • Watchers: 6
