Beam / BEAM-3770

Data latency problem with the KafkaIO SDK

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: P2
    • Resolution: Not A Bug
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.0
    • Component/s: io-java-kafka
    • Labels: None

    Description

      Dear all,

       I am using the KafkaIO SDK in my project (Beam 2.0.0 with the Direct runner).

       With this SDK, I have run into a data latency situation, which is described in the following.

       The data come from Kafka at a fixed rate: 100 elements per second.

       I create a fixed 1-second window without delay. I observe that the data size per window is 70, 80, 104, or sometimes more than 104.

       After about one day of running, data latency appears, and the data size drops to only about 10 per window.

       In order to explain it clearly, I also provide my code in the following.

      " PipelineOptions readOptions = PipelineOptionsFactory.create();

      final Pipeline p = Pipeline.create(readOptions);

       PCollection<TimestampedValue<KV<String, String>>> readData =

        p.apply(KafkaIO.<String, String>read()       

           .withBootstrapServers("127.0.0.1:9092")

           .withTopic("kafkasink")

           .withKeyDeserializer(StringDeserializer.class)

           .withValueDeserializer(StringDeserializer.class)

           .withoutMetadata())

           .apply(ParDo.of(new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {

              @ProcessElement

              public void test(ProcessContext c) throws ParseException {

                  String element = c.element().getValue();

                  try

      {               JsonNode arrNode = *new* ObjectMapper().readTree(element);               String t = arrNode.path("v").findValue("Timestamp").textValue();               DateTimeFormatter formatter = DateTimeFormatter._ofPattern_("MM/dd/uuuu HH:mm:ss.SSSS");              LocalDateTime dateTime = LocalDateTime._parse_(t, formatter);              java.time.Instant java_instant = dateTime.atZone(ZoneId._systemDefault_()).toInstant();              Instant timestamp  = *new* Instant(java_instant.toEpochMilli());               c.output(TimestampedValue._of_(c.element(), timestamp));             }

      catch (JsonGenerationException e)

      {                 e.printStackTrace();             }

      catch (JsonMappingException e)

      {                 e.printStackTrace();           } catch (IOException e) {                 e.printStackTrace();           }

              }}));

       PCollection<TimestampedValue<KV<String, String>>> readDivideData = readData.apply(

            Window.<TimestampedValue<KV<String, String>>> into(FixedWindows.of(Duration.standardSeconds(1))

                .withOffset(Duration.ZERO))

                .triggering(AfterWatermark.pastEndOfWindow()           

                   .withLateFirings(AfterProcessingTime.pastFirstElementInPane()

                     .plusDelayOf(Duration.ZERO)))

                .withAllowedLateness(Duration.ZERO)

                .discardingFiredPanes());"
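
       For comparison only (this is not part of the code above), KafkaIO in Beam 2.0.0 can also assign the record timestamp at the source via withTimestampFn(...), instead of wrapping elements in a downstream ParDo. The sketch below is illustrative: the JSON field names and the date pattern are taken from the snippet above, everything else is hypothetical.

        // Illustrative sketch: let KafkaIO stamp each record with an event timestamp
        // parsed from the JSON payload, assuming the same layout {"v": {..., "Timestamp": "..."}}.
        PCollection<KV<String, String>> stamped = p.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("127.0.0.1:9092")
                .withTopic("kafkasink")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // Hypothetical timestamp function; the parsing mirrors the ParDo above.
                .withTimestampFn((KV<String, String> kv) -> {
                  try {
                    JsonNode node = new ObjectMapper().readTree(kv.getValue());
                    String t = node.path("v").findValue("Timestamp").textValue();
                    LocalDateTime dateTime = LocalDateTime.parse(
                        t, DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS"));
                    return new Instant(
                        dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
                  } catch (IOException e) {
                    // Fall back to processing time if the payload cannot be parsed.
                    return Instant.now();
                  }
                })
                .withoutMetadata());

       With this pattern the timestamp is attached while the record is read, so no separate TimestampedValue wrapping step is needed downstream.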

       In addition, the running results are shown in the following.

      "data-size=104

      coming-data-time=2018-02-27 02:00:49.117

      window-time=2018-02-27 02:00:49.999

       data-size=78

      coming-data-time=2018-02-27 02:00:50.318

      window-time=2018-02-27 02:00:50.999

       data-size=104

      coming-data-time=2018-02-27 02:00:51.102

      window-time=2018-02-27 02:00:51.999

       After one day:

      data-size=10

      coming-data-time=2018-02-28 02:05:48.217

      window-time=2018-03-01 10:35:16.999 "

      If you have any ideas about this problem (data latency), I look forward to hearing from you.

      Thanks

      Rick


          People

            Assignee: Raghu Angadi (rangadi)
            Reporter: Rick Lin (Ricklin)
            Votes: 0
            Watchers: 2
