Details
Type: Improvement
Status: Resolved
Priority: P2
Resolution: Not A Bug
Affects Version: 2.0.0
Fix Version: None
To reproduce my situation, my running environment is:
OS: Ubuntu 14.04.3 LTS
Java: JDK 1.7
Beam 2.0.0 (with Direct runner)
Kafka 2.10-0.10.1.1
Maven 3.5.0, with the following dependencies listed in pom.xml:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.0.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
Description
Dear all,
I am using the KafkaIO SDK in my project (Beam 2.0.0 with the Direct runner).
While using this SDK, I have run into a data-latency situation, described below.
The data arrive from Kafka at a fixed rate of 100 records per second.
I create a 1-second fixed window with no delay. At first, the data size per window is 70, 80, 104, or sometimes more than 104.
After about one day of running, data latency appears, and each window contains only about 10 records.
To explain this clearly, my code is provided below.
" PipelineOptions readOptions = PipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(readOptions);
PCollection<TimestampedValue<KV<String, String>>> readData =
p.apply(KafkaIO.<String, String>read()
.withBootstrapServers("127.0.0.1:9092")
.withTopic("kafkasink")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata())
.apply(ParDo.of(new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {
@ProcessElement
public void test(ProcessContext c) throws ParseException {
String element = c.element().getValue();
try
{ JsonNode arrNode = *new* ObjectMapper().readTree(element); String t = arrNode.path("v").findValue("Timestamp").textValue(); DateTimeFormatter formatter = DateTimeFormatter._ofPattern_("MM/dd/uuuu HH:mm:ss.SSSS"); LocalDateTime dateTime = LocalDateTime._parse_(t, formatter); java.time.Instant java_instant = dateTime.atZone(ZoneId._systemDefault_()).toInstant(); Instant timestamp = *new* Instant(java_instant.toEpochMilli()); c.output(TimestampedValue._of_(c.element(), timestamp)); }catch (JsonGenerationException e)
{ e.printStackTrace(); }catch (JsonMappingException e)
{ e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }}}));
PCollection<TimestampedValue<KV<String, String>>> readDivideData = readData.apply(
Window.<TimestampedValue<KV<String, String>>> into(FixedWindows.of(Duration.standardSeconds(1))
.withOffset(Duration.ZERO))
.triggering(AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.ZERO)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());"
In addition, the running output is shown below.
"data-size=104
coming-data-time=2018-02-27 02:00:49.117
window-time=2018-02-27 02:00:49.999
data-size=78
coming-data-time=2018-02-27 02:00:50.318
window-time=2018-02-27 02:00:50.999
data-size=104
coming-data-time=2018-02-27 02:00:51.102
window-time=2018-02-27 02:00:51.999
After one day:
data-size=10
coming-data-time=2018-02-28 02:05:48.217
window-time=2018-03-01 10:35:16.999
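The report does not show how these lines were printed; below is a minimal sketch of one way to obtain such per-window counts, assuming a hypothetical downstream counting step (the log format is an assumption):

// Hypothetical logging step, not part of the original report: count the
// elements in each window and print the count with the window's end time.
PCollection<Long> windowCounts = readDivideData.apply(
    Combine.globally(Count.<TimestampedValue<KV<String, String>>>combineFn())
        .withoutDefaults());
windowCounts.apply(ParDo.of(new DoFn<Long, Void>() {
    @ProcessElement
    public void process(ProcessContext c, BoundedWindow window) {
        System.out.println("data-size=" + c.element());
        System.out.println("window-time=" + window.maxTimestamp());
    }
}));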
If you have any idea about this problem (data latency), I look forward to hearing from you.
Thanks
Rick