Details
-
Bug
-
Status: Open
-
P3
-
Resolution: Unresolved
-
2.34.0
-
None
-
None
-
Debian 5.10.46-5rodete1 (2021-09-28) x86_64 GNU/Linux
Description
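For example, the following pipeline increments one counter directly inside DoFn.process and a second counter from a ThreadPoolExecutor worker thread: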
# Minimal reproduction: a Beam DoFn that increments one counter directly in
# process() and a second counter from a ThreadPoolExecutor worker thread,
# then logs the metrics the pipeline result reports for NAMESPACE.
from concurrent import futures
import threading

from absl import app
from absl import logging
import apache_beam as beam

# Namespace shared by both counters; also used to filter the metrics query.
NAMESPACE = 'METRICS_THREADS_REPRO'

# Incremented directly inside IncrementCountersFn.process().
main_thread_counter = beam.metrics.Metrics.counter(
    NAMESPACE, 'main_thread_counter')
# Incremented through the DoFn's thread pool, i.e. from an executor thread.
sub_thread_counter = beam.metrics.Metrics.counter(
    NAMESPACE, 'sub_thread_counter')


def increment_counter(counter):
    """Increment `counter` once and log the name of the thread that did it."""
    counter.inc()
    logging.info('Incremented counter %s from thread %s', counter.metric_name,
                 threading.current_thread().name)


class IncrementCountersFn(beam.DoFn):
    """DoFn that bumps both counters once for every element it processes."""

    def setup(self):
        """Create the thread pool that process() submits work to."""
        self.executor = futures.ThreadPoolExecutor()

    def process(self, idx):
        """Increment both counters for element `idx` and log it.

        `main_thread_counter` is incremented in the thread that calls
        process(); `sub_thread_counter` is incremented by a task submitted
        to the executor.
        """
        increment_counter(main_thread_counter)
        # .result() waits for the submitted increment to finish before
        # the element is logged as processed.
        self.executor.submit(increment_counter, sub_thread_counter).result()
        logging.info('Processed %i', idx)


def main(argv):
    """Run the pipeline over range(100) and log the NAMESPACE metrics.

    Raises:
        app.UsageError: if any extra command-line arguments are passed.
    """
    if len(argv) > 1:
        raise app.UsageError('Too many command-line arguments.')

    p = beam.Pipeline()
    # 100 input elements, so the code calls inc() on each counter 100 times.
    _ = (
        p
        | 'Create' >> beam.Create(range(100))
        | 'Process' >> beam.ParDo(IncrementCountersFn()))
    result = p.run()
    result.wait_until_finish()

    # Restrict the query to the two counters declared in this script.
    filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
    filtered_metrics = result.metrics().query(filter_by_namespace)
    logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)


if __name__ == '__main__':
    app.run(main)
Only main_thread_counter appears in the reported metrics; sub_thread_counter, which is incremented from the executor thread, is missing:
I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished, metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process, metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_counter), labels={}), committed=100, attempted=100)], 'distributions': [], 'gauges': []}