Error Message
AssertionError: Items in the second set but not the first: 'stateful).beam.metric:statecache:hit: 11' 'stateful).beam.metric:statecache:put: 1' 'stateful).beam.metric:statecache:miss: 1' 'stateful).beam.metric:statecache:get_total: 120' 'stateful).beam.metric:statecache:size: 10' 'stateful).beam.metric:statecache:get: 12' 'stateful).beam.metric:statecache:evict: 0' 'stateful).beam.metric:statecache:capacity: 123' 'stateful).beam.metric:statecache:put_total: 10' 'stateful).beam.metric:statecache:evict_total: 0' 'counter: 110' 'stateful).beam.metric:statecache:miss_total: 10' 'stateful).beam.metric:statecache:hit_total: 110'
Stacktrace
self = <apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized testMethod=test_flink_metrics>
def test_flink_metrics(self):
"""Run a simple DoFn that increments a counter and verifies state
caching metrics. Verifies that its expected value is written to a
temporary file by the FileReporter"""
counter_name = 'elem_counter'
state_spec = userstate.BagStateSpec('state', VarIntCoder())
class DoFn(beam.DoFn):
def __init__(self):
self.counter = Metrics.counter(self.__class__, counter_name)
_LOGGER.info('counter: %s' % self.counter.metric_name)
def process(self, kv, state=beam.DoFn.StateParam(state_spec)):
# Trigger materialization
list(state.read())
state.add(1)
self.counter.inc()
options = self.create_options()
# Test only supports parallelism of 1
options._all_options['parallelism'] = 1
# Create multiple bundles to test cache metrics
options._all_options['max_bundle_size'] = 10
options._all_options['max_bundle_time_millis'] = 95130590130
experiments = options.view_as(DebugOptions).experiments or []
experiments.append('state_cache_size=123')
options.view_as(DebugOptions).experiments = experiments
with Pipeline(self.get_runner(), options) as p:
# pylint: disable=expression-not-assigned
(
p
| "create" >> beam.Create(list(range(0, 110)))
| "mapper" >> beam.Map(lambda x: (x % 10, 'val'))
| "stateful" >> beam.ParDo(DoFn()))
lines_expected = {'counter: 110'}
if options.view_as(StandardOptions).streaming:
lines_expected.update([
# Gauges for the last finished bundle
'stateful.beam.metric:statecache:capacity: 123',
'stateful.beam.metric:statecache:size: 10',
'stateful.beam.metric:statecache:get: 20',
'stateful.beam.metric:statecache:miss: 0',
'stateful.beam.metric:statecache:hit: 20',
'stateful.beam.metric:statecache:put: 0',
'stateful.beam.metric:statecache:evict: 0',
# Counters
'stateful.beam.metric:statecache:get_total: 220',
'stateful.beam.metric:statecache:miss_total: 10',
'stateful.beam.metric:statecache:hit_total: 210',
'stateful.beam.metric:statecache:put_total: 10',
'stateful.beam.metric:statecache:evict_total: 0',
])
else:
# Batch has a different processing model. All values for
# a key are processed at once.
lines_expected.update([
# Gauges
'stateful).beam.metric:statecache:capacity: 123',
# For the first key, the cache token will not be set yet.
# It's lazily initialized after first access in StateRequestHandlers
'stateful).beam.metric:statecache:size: 10',
# 110 elements / 10 keys = 11 elements per key, hence 12 gets: 1 miss + 11 hits
'stateful).beam.metric:statecache:get: 12',
'stateful).beam.metric:statecache:miss: 1',
'stateful).beam.metric:statecache:hit: 11',
# State is flushed back once per key
'stateful).beam.metric:statecache:put: 1',
'stateful).beam.metric:statecache:evict: 0',
# Counters
'stateful).beam.metric:statecache:get_total: 120',
'stateful).beam.metric:statecache:miss_total: 10',
'stateful).beam.metric:statecache:hit_total: 110',
'stateful).beam.metric:statecache:put_total: 10',
'stateful).beam.metric:statecache:evict_total: 0',
])
lines_actual = set()
with open(self.test_metrics_path, 'r') as f:
for line in f:
for metric_str in lines_expected:
metric_name = metric_str.split()[0]
if metric_str in line:
lines_actual.add(metric_str)
elif metric_name in line:
lines_actual.add(line)
> self.assertSetEqual(lines_actual, lines_expected)
E AssertionError: Items in the second set but not the first:
E 'stateful).beam.metric:statecache:hit: 11'
E 'stateful).beam.metric:statecache:put: 1'
E 'stateful).beam.metric:statecache:miss: 1'
E 'stateful).beam.metric:statecache:get_total: 120'
E 'stateful).beam.metric:statecache:size: 10'
E 'stateful).beam.metric:statecache:get: 12'
E 'stateful).beam.metric:statecache:evict: 0'
E 'stateful).beam.metric:statecache:capacity: 123'
E 'stateful).beam.metric:statecache:put_total: 10'
E 'stateful).beam.metric:statecache:evict_total: 0'
E 'counter: 110'
E 'stateful).beam.metric:statecache:miss_total: 10'
E 'stateful).beam.metric:statecache:hit_total: 110'
apache_beam/runners/portability/flink_runner_test.py:390: AssertionError