Beam / BEAM-12019

apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky

Details

    Description

      Sample error:
      https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/2869/testReport/junit/apache_beam.runners.portability.flink_runner_test/FlinkRunnerTestOptimized/test_flink_metrics/

      Error Message
      AssertionError: Items in the second set but not the first:
      'stateful).beam.metric:statecache:hit: 11'
      'stateful).beam.metric:statecache:put: 1'
      'stateful).beam.metric:statecache:miss: 1'
      'stateful).beam.metric:statecache:get_total: 120'
      'stateful).beam.metric:statecache:size: 10'
      'stateful).beam.metric:statecache:get: 12'
      'stateful).beam.metric:statecache:evict: 0'
      'stateful).beam.metric:statecache:capacity: 123'
      'stateful).beam.metric:statecache:put_total: 10'
      'stateful).beam.metric:statecache:evict_total: 0'
      'counter: 110'
      'stateful).beam.metric:statecache:miss_total: 10'
      'stateful).beam.metric:statecache:hit_total: 110'
      Stacktrace
      self = <apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized testMethod=test_flink_metrics>
      
          def test_flink_metrics(self):
            """Run a simple DoFn that increments a counter and verifies state
            caching metrics. Verifies that its expected value is written to a
            temporary file by the FileReporter"""
          
            counter_name = 'elem_counter'
            state_spec = userstate.BagStateSpec('state', VarIntCoder())
          
            class DoFn(beam.DoFn):
              def __init__(self):
                self.counter = Metrics.counter(self.__class__, counter_name)
                _LOGGER.info('counter: %s' % self.counter.metric_name)
          
              def process(self, kv, state=beam.DoFn.StateParam(state_spec)):
                # Trigger materialization
                list(state.read())
                state.add(1)
                self.counter.inc()
          
            options = self.create_options()
            # Test only supports parallelism of 1
            options._all_options['parallelism'] = 1
            # Create multiple bundles to test cache metrics
            options._all_options['max_bundle_size'] = 10
            options._all_options['max_bundle_time_millis'] = 95130590130
            experiments = options.view_as(DebugOptions).experiments or []
            experiments.append('state_cache_size=123')
            options.view_as(DebugOptions).experiments = experiments
            with Pipeline(self.get_runner(), options) as p:
              # pylint: disable=expression-not-assigned
              (
                  p
                  | "create" >> beam.Create(list(range(0, 110)))
                  | "mapper" >> beam.Map(lambda x: (x % 10, 'val'))
                  | "stateful" >> beam.ParDo(DoFn()))
          
            lines_expected = {'counter: 110'}
            if options.view_as(StandardOptions).streaming:
              lines_expected.update([
                  # Gauges for the last finished bundle
                  'stateful.beam.metric:statecache:capacity: 123',
                  'stateful.beam.metric:statecache:size: 10',
                  'stateful.beam.metric:statecache:get: 20',
                  'stateful.beam.metric:statecache:miss: 0',
                  'stateful.beam.metric:statecache:hit: 20',
                  'stateful.beam.metric:statecache:put: 0',
                  'stateful.beam.metric:statecache:evict: 0',
                  # Counters
                  'stateful.beam.metric:statecache:get_total: 220',
                  'stateful.beam.metric:statecache:miss_total: 10',
                  'stateful.beam.metric:statecache:hit_total: 210',
                  'stateful.beam.metric:statecache:put_total: 10',
                  'stateful.beam.metric:statecache:evict_total: 0',
              ])
            else:
              # Batch has a different processing model. All values for
              # a key are processed at once.
              lines_expected.update([
                  # Gauges
                  'stateful).beam.metric:statecache:capacity: 123',
                  # For the first key, the cache token will not be set yet.
                  # It's lazily initialized after first access in StateRequestHandlers
                  'stateful).beam.metric:statecache:size: 10',
                  # We have 11 here because there are 110 / 10 elements per key
                  'stateful).beam.metric:statecache:get: 12',
                  'stateful).beam.metric:statecache:miss: 1',
                  'stateful).beam.metric:statecache:hit: 11',
                  # State is flushed back once per key
                  'stateful).beam.metric:statecache:put: 1',
                  'stateful).beam.metric:statecache:evict: 0',
                  # Counters
                  'stateful).beam.metric:statecache:get_total: 120',
                  'stateful).beam.metric:statecache:miss_total: 10',
                  'stateful).beam.metric:statecache:hit_total: 110',
                  'stateful).beam.metric:statecache:put_total: 10',
                  'stateful).beam.metric:statecache:evict_total: 0',
              ])
            lines_actual = set()
            with open(self.test_metrics_path, 'r') as f:
              for line in f:
                for metric_str in lines_expected:
                  metric_name = metric_str.split()[0]
                  if metric_str in line:
                    lines_actual.add(metric_str)
                  elif metric_name in line:
                    lines_actual.add(line)
      >     self.assertSetEqual(lines_actual, lines_expected)
      E     AssertionError: Items in the second set but not the first:
      E     'stateful).beam.metric:statecache:hit: 11'
      E     'stateful).beam.metric:statecache:put: 1'
      E     'stateful).beam.metric:statecache:miss: 1'
      E     'stateful).beam.metric:statecache:get_total: 120'
      E     'stateful).beam.metric:statecache:size: 10'
      E     'stateful).beam.metric:statecache:get: 12'
      E     'stateful).beam.metric:statecache:evict: 0'
      E     'stateful).beam.metric:statecache:capacity: 123'
      E     'stateful).beam.metric:statecache:put_total: 10'
      E     'stateful).beam.metric:statecache:evict_total: 0'
      E     'counter: 110'
      E     'stateful).beam.metric:statecache:miss_total: 10'
      E     'stateful).beam.metric:statecache:hit_total: 110'
      
      apache_beam/runners/portability/flink_runner_test.py:390: AssertionError
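
      Note: the comparison loop above (flink_runner_test.py:390) matches each line of the FileReporter
      output against the hard-coded expected strings. If the metrics file has not been fully written when
      the test reads it, or if a gauge is reported for a different bundle than the test assumes, every
      expected string ends up "in the second set but not the first", which is the failure seen here. A
      minimal standalone sketch of that failure mode (the reporter output below is hypothetical, not taken
      from the failing run):

          # Sketch only: mimics the comparison loop from test_flink_metrics with
          # made-up reporter output to show how the observed AssertionError arises.
          lines_expected = {
              'counter: 110',
              'stateful).beam.metric:statecache:hit: 11',
              'stateful).beam.metric:statecache:get: 12',
          }

          # Hypothetical reporter output: the gauges were written for a later
          # bundle, so their values no longer match the hard-coded expectations,
          # and the counter line is missing entirely.
          reported_lines = [
              'stateful).beam.metric:statecache:hit: 12\n',
              'stateful).beam.metric:statecache:get: 13\n',
          ]

          lines_actual = set()
          for line in reported_lines:
              for metric_str in lines_expected:
                  metric_name = metric_str.split()[0]
                  if metric_str in line:
                      lines_actual.add(metric_str)  # exact value match
                  elif metric_name in line:
                      lines_actual.add(line)  # same metric name, different value

          # assertSetEqual(lines_actual, lines_expected) then reports all expected
          # strings as missing and the raw, newline-terminated lines as extras.
          print(lines_expected - lines_actual)
          print(lines_actual - lines_expected)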
      
      

      People

        Assignee: Unassigned
        Reporter: Valentyn Tymofieiev (tvalentyn)
        Votes: 0
        Watchers: 6