  1. Beam
  2. BEAM-6036

How to periodically refresh side inputs

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P0
    • Resolution: Not A Problem
    • Affects Version/s: None
    • Fix Version/s: Not applicable
    • Component/s: beam-model
    • Labels: None

    Description

      I followed the example in the "Pattern: Slowly-changing lookup cache" section of https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1 and converted the article's pseudo-code into this Java code:

      return p
          .apply("GenerateSequence", GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
          .apply("GenerateSequenceWindow",
              Window.<Long>into(new GlobalWindows())
                  .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                  .discardingFiredPanes())
          .apply("RetrieveKVs", ParDo.of(new RetrieveKVs()))
          .apply("ToMap", View.asMap());

      RetrieveKVs queries a BigQuery table and outputs KVs.

      The problem is that the resulting map mixes KVs from different periods: the sequence emits one element per hour, yet the map contains KVs from two adjacent hours.
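
      To make the symptom concrete, here is a small plain-Java model. It is not Beam code and makes no claim about how any runner materializes side inputs; the class and method names (RefreshModel, mergeEntries, latestSnapshot) are hypothetical. It only contrasts a lookup map that is updated entry by entry across refreshes with one that is replaced wholesale by the latest refresh.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Beam code) of two ways a lookup map can be refreshed. */
public class RefreshModel {

    /** Per-entry refresh: every refresh adds its KVs to one long-lived map. */
    public static Map<String, String> mergeEntries(List<Map<String, String>> refreshes) {
        Map<String, String> view = new HashMap<>();
        for (Map<String, String> refresh : refreshes) {
            // Keys missing from this refresh are never removed, so old entries linger.
            view.putAll(refresh);
        }
        return view;
    }

    /** Snapshot refresh: every refresh replaces the whole map, so the latest one wins. */
    public static Map<String, String> latestSnapshot(List<Map<String, String>> refreshes) {
        Map<String, String> view = Map.of();
        for (Map<String, String> refresh : refreshes) {
            view = Map.copyOf(refresh);
        }
        return view;
    }

    public static void main(String[] args) {
        // Two hourly refreshes; key "b" disappears from the source in the second hour.
        List<Map<String, String>> refreshes = List.of(
            Map.of("a", "hour1", "b", "hour1"),
            Map.of("a", "hour2"));
        System.out.println("merged:   " + mergeEntries(refreshes));
        System.out.println("snapshot: " + latestSnapshot(refreshes));
    }
}
```

      In this model the merged map still holds "b" from the first hour next to "a" from the second, while the snapshot holds only the second hour's entries.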

      To work around this, I tried View.asSingleton() instead:

      return p
          .apply("GenerateSequence", GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
          .apply("GenerateSequenceWindow",
              Window.<Long>into(new GlobalWindows())
                  .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                  .discardingFiredPanes())
          .apply("RetrieveMap", ParDo.of(new RetrieveMap()))
          .apply("ToMap", View.asSingleton());

      RetrieveMap queries BigQuery and outputs the complete map. This not only makes the tests flaky, failing about 1 run in 10 with this exception:

      Caused by: java.lang.IllegalArgumentException: Empty PCollection accessed as a singleton view. Consider setting withDefault to provide a default value

      but it also doesn't seem to work: the logs show that RetrieveMap is called every hour, yet the pipeline consuming the side input still gets stale data.

      Is there a real, working example of how to make a side input refresh periodically?

      Related: https://issues.apache.org/jira/browse/BEAM-6086?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
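
      For comparison, the following is a sketch along the lines of the "slowly updating global window side inputs" pattern in the Beam documentation. It differs from the second attempt above in two ways: the window and trigger are applied after the lookup step, and Latest.globally() keeps only the most recent map before View.asSingleton(). The class name RefreshingSideInput, the lookupView method and the body of RetrieveMap are hypothetical placeholders (the real RetrieveMap would query BigQuery). It needs the Beam Java SDK on the classpath, and whether it avoids the stale reads described above was not verified here and may depend on the runner and on how the main input is windowed.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

public class RefreshingSideInput {

  /** Hypothetical stand-in for RetrieveMap: emits one complete map per tick. */
  static class RetrieveMap extends DoFn<Long, Map<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Map<String, String> snapshot = new HashMap<>();
      // Placeholder data; the real implementation would query BigQuery here.
      snapshot.put("tick", String.valueOf(c.element()));
      c.output(snapshot);
    }
  }

  /** Builds a singleton map view that is re-emitted once per generated tick. */
  static PCollectionView<Map<String, String>> lookupView(Pipeline p) {
    return p
        // One tick per hour drives each refresh.
        .apply("GenerateSequence", GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
        // Each tick produces one element holding the whole lookup map.
        .apply("RetrieveMap", ParDo.of(new RetrieveMap()))
        // Fire a pane for every new map and discard what was already emitted.
        .apply("RefreshWindow",
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        // Keep only the most recent map in each pane.
        .apply("KeepLatest", Latest.globally())
        .apply("ToView", View.asSingleton());
  }
}
```

      The returned view would then be passed to the consuming ParDo with withSideInputs(...) and read inside the DoFn with c.sideInput(...).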

          People

            Assignee: kenn Kenneth Knowles
            Reporter: medvedev1088 Evgeny