Beam / BEAM-13040

AsIter side input is not correctly recognized as a dependency.

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels: None
    • Environment: Linux Debian 5.10 x86_64, Python 3.8.11

    Description

      The error occurs on current master (HEAD). The latest release (2.33.0) is not affected.

      Example to reproduce: 

      import unittest
      
      import apache_beam as beam
      
      
      class LoggingFn(beam.DoFn):
      
        def __init__(self, name):
          self._name = name
      
        def process(self, element, *side_inputs):
          print(f'Running {self._name} (side inputs: {[list(s) for s in side_inputs]})')
          return [self._name]
      
      
      class BeamDagTest(unittest.TestCase):
      
        def test_dag(self):
          with beam.Pipeline() as p:
            root = p | 'CreateRoot' >> beam.Create([None])
            example_gen = root | 'CsvExampleGen' >> beam.ParDo(
                LoggingFn('CsvExampleGen'),
            )
            statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
                LoggingFn('StatisticsGen'),
                beam.pvalue.AsIter(example_gen),  # AsIter to specify upstream task dependency.
            )
            schema_gen = root | 'SchemaGen' >> beam.ParDo(
                LoggingFn('SchemaGen'),
                beam.pvalue.AsIter(statistics_gen),
            )
            example_validator = root | 'ExampleValidator' >> beam.ParDo(
                LoggingFn('ExampleValidator'),
                beam.pvalue.AsIter(statistics_gen),
                beam.pvalue.AsIter(schema_gen),
            )
            transform = root | 'Transform' >> beam.ParDo(
                LoggingFn('Transform'),
                beam.pvalue.AsIter(example_gen),
                beam.pvalue.AsIter(schema_gen),
            )
            trainer = root | 'Trainer' >> beam.ParDo(
                LoggingFn('Trainer'),
                beam.pvalue.AsIter(example_gen),
                beam.pvalue.AsIter(schema_gen),
                beam.pvalue.AsIter(transform),
            )
            model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
                LoggingFn('latest_blessed_model_resolver'),
            )
            evaluator = root | 'Evaluator' >> beam.ParDo(
                LoggingFn('Evaluator'),
                beam.pvalue.AsIter(example_gen),
                beam.pvalue.AsIter(trainer),
                beam.pvalue.AsIter(model_resolver),
            )
            pusher = root | 'Pusher' >> beam.ParDo(
                LoggingFn('Pusher'),
                beam.pvalue.AsIter(trainer),
                beam.pvalue.AsIter(evaluator),
            )

       
      According to the AsIter documentation, the entire PCollection should be made available as a side input, which means the PTransform that produces the side input must run before the PTransform that consumes it. We rely on this behavior to run a DAG of tasks by injecting task dependencies as side inputs. However, this mechanism does not work properly on current master (71d7213d98): some transforms run before their upstream transforms and receive empty side inputs.

      Output with apache-beam==2.33.0:

      Running CsvExampleGen (side inputs: [])
      Running latest_blessed_model_resolver (side inputs: [])
      Running StatisticsGen (side inputs: [['CsvExampleGen']])
      Running SchemaGen (side inputs: [['StatisticsGen']])
      Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
      Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
      Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], ['Transform']])
      Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], ['latest_blessed_model_resolver']])
      Running Pusher (side inputs: [['Trainer'], ['Evaluator']])

      Output with apache-beam installed from 71d7213d98 (origin/master):

      Running CsvExampleGen (side inputs: [])
      Running latest_blessed_model_resolver (side inputs: [])
      Running StatisticsGen (side inputs: [['CsvExampleGen']])
      Running Pusher (side inputs: [[], []])
      Running Evaluator (side inputs: [['CsvExampleGen'], [], ['latest_blessed_model_resolver']])
      Running SchemaGen (side inputs: [['StatisticsGen']])
      Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
      Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
      Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
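
      To make the ordering problem explicit, the two logs above can be checked against the side-input dependencies declared in the reproduction pipeline. The following is a minimal sketch in plain Python and does not use Beam. The helper name find_order_violations and the constants are hypothetical, introduced only for illustration; the dependency map and both run orders are transcribed from the example and its outputs.

```python
# Side-input dependencies as declared in the reproduction pipeline
# (each task maps to the tasks it takes as AsIter side inputs).
DEPENDENCIES = {
    'CsvExampleGen': [],
    'StatisticsGen': ['CsvExampleGen'],
    'SchemaGen': ['StatisticsGen'],
    'ExampleValidator': ['StatisticsGen', 'SchemaGen'],
    'Transform': ['CsvExampleGen', 'SchemaGen'],
    'Trainer': ['CsvExampleGen', 'SchemaGen', 'Transform'],
    'latest_blessed_model_resolver': [],
    'Evaluator': ['CsvExampleGen', 'Trainer', 'latest_blessed_model_resolver'],
    'Pusher': ['Trainer', 'Evaluator'],
}

# Run order transcribed from the apache-beam==2.33.0 log.
ORDER_2_33_0 = [
    'CsvExampleGen', 'latest_blessed_model_resolver', 'StatisticsGen',
    'SchemaGen', 'ExampleValidator', 'Transform', 'Trainer', 'Evaluator',
    'Pusher',
]

# Run order transcribed from the 71d7213d98 (origin/master) log.
ORDER_MASTER = [
    'CsvExampleGen', 'latest_blessed_model_resolver', 'StatisticsGen',
    'Pusher', 'Evaluator', 'SchemaGen', 'Trainer', 'ExampleValidator',
    'Transform',
]


def find_order_violations(run_order, dependencies=DEPENDENCIES):
  """Returns (task, upstream) pairs where task ran before its upstream."""
  position = {name: index for index, name in enumerate(run_order)}
  violations = []
  for task in run_order:
    for upstream in dependencies[task]:
      if position[upstream] > position[task]:
        violations.append((task, upstream))
  return violations


if __name__ == '__main__':
  print('2.33.0:', find_order_violations(ORDER_2_33_0))
  print('master:', find_order_violations(ORDER_MASTER))
```

      For the 2.33.0 order the helper reports no violations. For the master order it reports (Pusher, Trainer), (Pusher, Evaluator), (Evaluator, Trainer) and (Trainer, Transform), which are exactly the positions where the master log shows an empty side input.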

            People

              Assignee: Unassigned
              Reporter: Jongbin Park
              Votes: 0
              Watchers: 5

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 50m