Details
-
Bug
-
Status: Resolved
-
P2
-
Resolution: Fixed
-
None
-
None
Description
Gauges are affected by setting timers: the gauge value becomes None, and encoding the gauge's monitoring info then fails with the following error:
ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 147. Original traceback is
Traceback (most recent call last):
  File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 516, in process_bundle
    monitoring_infos = bundle_processor.monitoring_infos()
  File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1107, in monitoring_infos
    op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 347, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 386, in apache_beam.runners.worker.operations.Operation.user_monitoring_infos
  File "apache_beam/metrics/execution.py", line 261, in apache_beam.metrics.execution.MetricsContainer.to_runner_api_monitoring_infos
  File "apache_beam/metrics/cells.py", line 222, in apache_beam.metrics.cells.GaugeCell.to_runner_api_monitoring_info
  File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 222, in int64_user_gauge
    payload = _encode_gauge(coder, timestamp, value)
  File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 397, in _encode_gauge
    coder.get_impl().encode_to_stream(value, stream, True)
  File "apache_beam/coders/coder_impl.py", line 690, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 692, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
TypeError: an integer is required
The transform has the following structure. It fails with the error above when the lines following the TODO comments are uncommented:
class StatefulOperation(beam.DoFn):
    """Stateful DoFn that buffers stringified elements in per-key state and clears it on a timer.

    In ``process``, while the per-key byte counter is at or below
    ``state_size_per_key_bytes``, each element's string form is appended to a
    bag state, the byte and entry counters are incremented, and a timer is
    set. When the timer fires, ``timer_callback`` checks that the bag's size
    matches the entry counter and then clears all three states.

    NOTE(review): this snippet was originally collapsed onto a single line.
    Which statements in ``process`` sit under the ``if`` (the timer.set call,
    the commented-out gauge updates, ``yield element``) has been
    reconstructed. Confirm against the original source.
    """

    def __init__(self, state_size_per_key_bytes, use_processing_timer=False):
        """Store configuration and create the coder and gauge metrics.

        Args:
            state_size_per_key_bytes: byte threshold compared (with ``<=``)
                against the per-key byte counter in ``process``.
            use_processing_timer: if True, ``process`` sets the REAL_TIME
                (processing-time) timer; otherwise it sets the WATERMARK timer.
        """
        self.state_size_per_key_bytes = state_size_per_key_bytes
        # Coder impl used only to measure the encoded size of each state entry.
        self.str_coder = StrUtf8Coder().get_impl()
        self.bytes_gauge = Metrics.gauge('synthetic', 'state_bytes')
        self.elements_gauge = Metrics.gauge('synthetic', 'state_elements')
        self.use_processing_timer = use_processing_timer

    # Bag of stringified elements, encoded with StrUtf8Coder.
    state_spec = userstate.BagStateSpec('state', StrUtf8Coder())
    # Running total of encoded bytes added to the bag (combined with sum).
    state_spec2 = userstate.CombiningValueStateSpec('state_size_bytes', combine_fn=sum)
    # Running count of entries added to the bag (combined with sum).
    state_spec3 = userstate.CombiningValueStateSpec('num_state_entries', combine_fn=sum)
    # One timer per time domain; process() picks one via use_processing_timer.
    event_timer_spec = userstate.TimerSpec('event_timer', beam.TimeDomain.WATERMARK)
    processing_timer_spec = userstate.TimerSpec('proc_timer', beam.TimeDomain.REAL_TIME)

    def process(self, element, timestamp=beam.DoFn.TimestampParam,
                state=beam.DoFn.StateParam(state_spec),
                state_num_bytes=beam.DoFn.StateParam(state_spec2),
                state_num_entries=beam.DoFn.StateParam(state_spec3),
                event_timer=beam.DoFn.TimerParam(event_timer_spec),
                processing_timer=beam.DoFn.TimerParam(processing_timer_spec)):
        """Append the element to per-key state below the size threshold, set a cleanup timer, and yield the element."""
        # Append stringified element to state until the threshold has been reached
        # The cleanup timer will then clean up and the process will repeat.
        if state_num_bytes.read() <= self.state_size_per_key_bytes:
            state_element = str(element)
            state.add(state_element)
            bytes_added = len(self.str_coder.encode_nested(state_element))
            state_num_bytes.add(bytes_added)
            state_num_entries.add(1)
            timer = processing_timer if self.use_processing_timer else event_timer
            # Set a timer which will clear the state if it grows too large
            # micros // 1000000 truncates the element timestamp to whole
            # seconds, then adds 5. Presumably timer.set takes seconds; confirm.
            timer.set(timestamp.micros // 1000000 + 5)
        # Metrics
        # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
        # (Uncommenting the two lines below reproduces the TypeError reported
        # in this issue.)
        #self.bytes_gauge.set(state_num_bytes.read())
        #self.elements_gauge.set(state_num_entries.read())
        yield element

    @userstate.on_timer(event_timer_spec)
    def on_event_timer(self, key=beam.DoFn.KeyParam,
                       state=beam.DoFn.StateParam(state_spec),
                       state_num_bytes=beam.DoFn.StateParam(state_spec2),
                       state_num_entries=beam.DoFn.StateParam(state_spec3)):
        """Watermark-timer callback; delegates to ``timer_callback``. ``key`` is accepted but unused."""
        return self.timer_callback(state, state_num_bytes, state_num_entries)

    @userstate.on_timer(processing_timer_spec)
    def on_processing_timer(self, state=beam.DoFn.StateParam(state_spec),
                            state_num_bytes=beam.DoFn.StateParam(state_spec2),
                            state_num_entries=beam.DoFn.StateParam(state_spec3)):
        """Processing-time-timer callback; delegates to ``timer_callback``."""
        return self.timer_callback(state, state_num_bytes, state_num_entries)

    def timer_callback(self, state, state_num_bytes, state_num_entries):
        """Verify the bag's size against the entry counter, then clear all per-key state.

        Raises:
            Exception: if the number of entries read from the bag differs
                from the value of the entry counter.
        """
        # Count bag entries by iterating; the bag state is read here without a len().
        count = 0
        for _ in state.read():
            count += 1
        state_count = state_num_entries.read()
        if count != state_count:
            raise Exception("Actual number of entries (%s) did not match expected (%s)" % (count, state_count))
        # Reset state bags
        state.clear()
        state_num_bytes.clear()
        state_num_entries.clear()
        # Reset metrics
        # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
        #self.bytes_gauge.set(0)
        #self.elements_gauge.set(0)
Attachments
Issue Links
- links to