Details
Type: Bug
Status: Resolved
Priority: P2
Resolution: Duplicate
Affects Version/s: 2.2.0
Fix Version/s: None
Environment: idea, ubuntu 16.04, FlinkRunner
Description
In my demo, I read data from Kafka, count the elements globally, and output the total count of received data, as follows:
FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
    .as(FlinkPipelineOptions.class);
options.setStreaming(true);
options.setRunner(FlinkRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply("Read from kafka", KafkaIO.<String, String>read()
        // .withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
        .withBootstrapServers("localhost:9092")
        .withTopic("recharge")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata()
    )
    .apply(Values.create())
    .apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
        .accumulatingFiredPanes()
    )
    .apply(Count.globally())
    .apply("output", ParDo.of(new DoFn<Long, Void>() {
        @ProcessElement
        public void process(ProcessContext context) {
            System.out.println("---get at: " + Instant.now() + "------");
            System.out.println(context.element());
        }
    }));
The result should be displayed 5 seconds after I send the first data, but sometimes nothing is displayed after I send data. Below is the output I got in one test.
(I can't upload a picture, so the output is given as text.)
Send 681Msg at: 2018-01-05T06:34:31.436
---get at: 2018-01-05T06:34:36.668Z------
681
Send 681Msg at: 2018-01-05T06:34:47.166
---get at: 2018-01-05T06:34:52.284Z------
1362
Send 681Msg at: 2018-01-05T06:34:55.505
Send 681Msg at: 2018-01-05T06:35:22.068
---get at: 2018-01-05T06:35:22.112Z------
2044
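For comparison, here is a rough plain-Java sketch of when the trigger in the pipeline above would be expected to fire for the send times in this log. It is only an idealized model, not Beam code: it assumes each send of 681 messages arrives as one instantaneous batch at the logged send time, it ignores Kafka and runner latency, and the names `ExpectedFirings` and `expectedFirings` are made up for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class ExpectedFirings {

    /**
     * Models Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
     * with accumulating panes: the first batch of a pane starts a timer, the pane
     * fires when the timer expires, and every firing reports the running total.
     * Assumption: each batch arrives instantly at its send time, in ascending order.
     */
    static List<String> expectedFirings(List<LocalDateTime> sends, long batchSize, Duration delay) {
        List<String> firings = new ArrayList<>();
        LocalDateTime fireAt = null; // null means no timer is pending
        long total = 0;
        for (LocalDateTime send : sends) {
            // A pending timer that is due at or before this batch fires first.
            if (fireAt != null && !fireAt.isAfter(send)) {
                firings.add(fireAt + " -> " + total);
                fireAt = null;
            }
            total += batchSize;
            // The first batch of a new pane starts the timer.
            if (fireAt == null) {
                fireAt = send.plus(delay);
            }
        }
        // The last pending timer eventually fires as well.
        if (fireAt != null) {
            firings.add(fireAt + " -> " + total);
        }
        return firings;
    }

    public static void main(String[] args) {
        // Send times copied from the log above.
        List<LocalDateTime> sends = List.of(
            LocalDateTime.parse("2018-01-05T06:34:31.436"),
            LocalDateTime.parse("2018-01-05T06:34:47.166"),
            LocalDateTime.parse("2018-01-05T06:34:55.505"),
            LocalDateTime.parse("2018-01-05T06:35:22.068"));
        for (String firing : expectedFirings(sends, 681, Duration.ofSeconds(5))) {
            System.out.println(firing);
        }
    }
}
```

Under these assumptions the third batch would produce a pane at about 06:35:00.505, whereas the log shows no output between 06:34:52 and 06:35:22.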
By the way, the code works fine with the DirectRunner.