Details
Type: Improvement
Status: Open
Priority: P3
Resolution: Unresolved
Description
There are currently no tests verifying that metrics reported through the Beam Metrics API are forwarded to Flink and from there to a MetricReporter.
A test for this would have to manually configure a Flink "Mini Cluster", as in
    // start also a re-usable Flink mini cluster
    flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
    flink.start();
    flinkPort = flink.getLeaderRPCPort();
with getFlinkConfiguration():
    protected static Configuration getFlinkConfiguration() {
      Configuration flinkConfig = new Configuration();
      flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
      flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
      flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
      flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
      flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
      flinkConfig.setString(
          ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter."
              + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
          MyTestReporter.class.getName());
      return flinkConfig;
    }
where MyTestReporter is a MetricReporter that stores metrics being reported to it so we can verify that they are there after the job finishes.
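A rough sketch of how such a reporter could store metrics follows. It is not tied to the real Flink API: `ReporterCallbacks` is a hypothetical local stand-in for the add/remove callbacks of Flink's MetricReporter, with simplified signatures, so that the example compiles on its own. The helper names `wasReported` and `reset` are also made up for illustration. The store is static on the assumption that the cluster instantiates the reporter itself from the configured class name, so the test never holds the instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the callbacks of Flink's MetricReporter (simplified signatures). */
interface ReporterCallbacks {
    void notifyOfAddedMetric(Object metric, String metricName);
    void notifyOfRemovedMetric(Object metric, String metricName);
}

/** Sketch of a test reporter that remembers every metric it was told about. */
class MyTestReporter implements ReporterCallbacks {
    // Static because the test is assumed to have no handle on the reporter instance.
    private static final Map<String, Object> SEEN = new ConcurrentHashMap<>();

    @Override
    public void notifyOfAddedMetric(Object metric, String metricName) {
        SEEN.put(metricName, metric);
    }

    @Override
    public void notifyOfRemovedMetric(Object metric, String metricName) {
        // Deliberately keep the entry: metrics may be unregistered when the job
        // finishes, and the test wants to inspect them afterwards.
    }

    /** Returns true if a metric with this name was ever registered. */
    static boolean wasReported(String metricName) {
        return SEEN.containsKey(metricName);
    }

    /** Clears the store; intended to be called before each test. */
    static void reset() {
        SEEN.clear();
    }
}
```

A test would call `MyTestReporter.reset()` before submitting the job and check `MyTestReporter.wasReported(...)` with the expected metric name once the job has finished. A real implementation would implement Flink's MetricReporter interface instead of the stand-in.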
Running a Pipeline on the mini cluster should be possible by specifying "localhost" and the port obtained from the mini cluster (flinkPort above) as the cluster endpoint.
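One way the endpoint could be handed to the pipeline is through command-line style options. The sketch below only builds the argument list. The helper class `MiniClusterPipelineArgs` is hypothetical, and it assumes the Flink runner is selected with `--runner=FlinkRunner` and pointed at the cluster with `--flinkMaster=host:port`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that builds pipeline arguments for a local Flink mini cluster. */
class MiniClusterPipelineArgs {
    /** Returns arguments that direct the Flink runner to localhost on the given port. */
    static List<String> forPort(int flinkPort) {
        return Arrays.asList(
            "--runner=FlinkRunner",
            "--flinkMaster=localhost:" + flinkPort);
    }
}
```

In a test, these arguments would presumably be passed to `PipelineOptionsFactory.fromArgs(...)` before creating the Pipeline; whether the mini cluster accepts submissions on that port would still need to be confirmed.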