Beam / BEAM-10848

Gauge metrics error when setting timers

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.26.0
    • Component/s: sdk-py-harness
    • Labels: None

    Description

      Gauge metrics are affected by setting timers, which leads to None values being encoded:

      ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 147. Original traceback is
      Traceback (most recent call last):
        File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
          response = task()
        File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
          lambda: self.create_worker().do_instruction(request), request)
        File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
          getattr(request, request_type), request.instruction_id)
        File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 516, in process_bundle
          monitoring_infos = bundle_processor.monitoring_infos()
        File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1107, in monitoring_infos
          op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
        File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.Operation.monitoring_infos
        File "apache_beam/runners/worker/operations.py", line 347, in apache_beam.runners.worker.operations.Operation.monitoring_infos
        File "apache_beam/runners/worker/operations.py", line 386, in apache_beam.runners.worker.operations.Operation.user_monitoring_infos
        File "apache_beam/metrics/execution.py", line 261, in apache_beam.metrics.execution.MetricsContainer.to_runner_api_monitoring_infos
        File "apache_beam/metrics/cells.py", line 222, in apache_beam.metrics.cells.GaugeCell.to_runner_api_monitoring_info
        File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 222, in int64_user_gauge
          payload = _encode_gauge(coder, timestamp, value)
        File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 397, in _encode_gauge
          coder.get_impl().encode_to_stream(value, stream, True)
        File "apache_beam/coders/coder_impl.py", line 690, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
        File "apache_beam/coders/coder_impl.py", line 692, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
      TypeError: an integer is required
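
      The failure is in the gauge encoding path: VarIntCoder is asked to encode the gauge's value, which is None at that point. A minimal sketch of that encode path, for illustration only (the exact error message differs between the Cython and pure-Python coder implementations, but both raise TypeError):

      from apache_beam.coders import VarIntCoder

      coder = VarIntCoder()
      coder.encode(1024)  # fine; gauges normally carry int values
      coder.encode(None)  # raises TypeError, matching the traceback above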
      

      The transform has the following structure and fails when the lines following the TODO comments are uncommented:

      import apache_beam as beam
      from apache_beam.coders import StrUtf8Coder
      from apache_beam.metrics.metric import Metrics
      from apache_beam.transforms import userstate


      class StatefulOperation(beam.DoFn):
        def __init__(self, state_size_per_key_bytes, use_processing_timer=False):
          self.state_size_per_key_bytes = state_size_per_key_bytes
          self.str_coder = StrUtf8Coder().get_impl()
          self.bytes_gauge = Metrics.gauge('synthetic', 'state_bytes')
          self.elements_gauge = Metrics.gauge('synthetic', 'state_elements')
          self.use_processing_timer = use_processing_timer
      
        state_spec = userstate.BagStateSpec('state', StrUtf8Coder())
      
        state_spec2 = userstate.CombiningValueStateSpec('state_size_bytes', combine_fn=sum)
      
        state_spec3 = userstate.CombiningValueStateSpec('num_state_entries', combine_fn=sum)
      
        event_timer_spec = userstate.TimerSpec('event_timer', beam.TimeDomain.WATERMARK)
        processing_timer_spec = userstate.TimerSpec('proc_timer', beam.TimeDomain.REAL_TIME)
      
        def process(self,
                    element,
                    timestamp=beam.DoFn.TimestampParam,
                    state=beam.DoFn.StateParam(state_spec),
                    state_num_bytes=beam.DoFn.StateParam(state_spec2),
                    state_num_entries=beam.DoFn.StateParam(state_spec3),
                    event_timer=beam.DoFn.TimerParam(event_timer_spec),
                    processing_timer=beam.DoFn.TimerParam(processing_timer_spec)):
          # Append stringified element to state until the threshold has been reached
          # The cleanup timer will then clean up and the process will repeat.
          if state_num_bytes.read() <= self.state_size_per_key_bytes:
            state_element = str(element)
            state.add(state_element)
            bytes_added = len(self.str_coder.encode_nested(state_element))
            state_num_bytes.add(bytes_added)
            state_num_entries.add(1)
            timer = processing_timer if self.use_processing_timer else event_timer
            # Set a timer which will clear the state if it grows too large
            timer.set(timestamp.micros // 1000000 + 5)
          # Metrics
          # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
          #self.bytes_gauge.set(state_num_bytes.read())
          #self.elements_gauge.set(state_num_entries.read())
          yield element
      
        @userstate.on_timer(event_timer_spec)
        def on_event_timer(self,
                           key=beam.DoFn.KeyParam,
                           state=beam.DoFn.StateParam(state_spec),
                           state_num_bytes=beam.DoFn.StateParam(state_spec2),
                           state_num_entries=beam.DoFn.StateParam(state_spec3)):
          return self.timer_callback(state, state_num_bytes, state_num_entries)
      
        @userstate.on_timer(processing_timer_spec)
        def on_processing_timer(self,
                                state=beam.DoFn.StateParam(state_spec),
                                state_num_bytes=beam.DoFn.StateParam(state_spec2),
                                state_num_entries=beam.DoFn.StateParam(state_spec3)):
          return self.timer_callback(state, state_num_bytes, state_num_entries)
      
        def timer_callback(self, state, state_num_bytes, state_num_entries):
          count = 0
          for _ in state.read():
            count += 1
          state_count = state_num_entries.read()
          if count != state_count:
            raise Exception("Actual number of entries (%s) did not match expected (%s)" % (count, state_count))
          # Reset state bags
          state.clear()
          state_num_bytes.clear()
          state_num_entries.clear()
          # Reset metrics
          # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
          #self.bytes_gauge.set(0)
          #self.elements_gauge.set(0)
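
      For reference, a minimal, hypothetical way to wire the DoFn above into a pipeline (not the original benchmark job); the key fan-out and per-key state size are illustrative only:

      import apache_beam as beam

      with beam.Pipeline() as p:
        _ = (p
             | beam.Create(range(1000))
             | beam.Map(lambda x: (x % 10, x))  # stateful DoFns require keyed input
             | beam.ParDo(StatefulOperation(state_size_per_key_bytes=1024)))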
      


People

    Assignee: maghamravikiran (maghamravikiran@gmail.com)
    Reporter: Maximilian Michels (mxm)
    Votes: 0
    Watchers: 5


Time Tracking

    Estimated: Not Specified
    Remaining: 0h
    Logged: 3h 40m