Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.18.1
Fix Version/s: None
Component/s: None
Description
We have the following query running in batch mode.
    WITH FEATURE_INCLUSION AS (
        SELECT
            insertion_id, -- Not unique
            features -- Array<Row<key, value>>
        FROM features_table
    ),
    TOTAL AS (
        SELECT COUNT(DISTINCT insertion_id) total_id
        FROM FEATURE_INCLUSION
    ),
    FEATURE_INCLUSION_COUNTS AS (
        SELECT
            `key`,
            COUNT(DISTINCT insertion_id) AS id_count
        FROM
            FEATURE_INCLUSION,
            UNNEST(features) AS t (`key`, `value`)
        WHERE TRUE
        GROUP BY `key`
    ),
    RESULTS AS (
        SELECT `key`
        FROM FEATURE_INCLUSION_COUNTS, TOTAL
        WHERE (1.0 * id_count) / total_id > 0.1
    )
    SELECT JSON_ARRAYAGG(`key`) AS feature_ids
    FROM RESULTS
With adaptive batch parallelism enabled, the parallelism that Flink chose for the following operator was always 1:
[37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, insertion_id])
+- [38]:LocalHashAggregate(groupBy=[key], select=[key, Partial_COUNT(insertion_id) AS count$0])
If we disable `execution.batch.adaptive.auto-parallelism.enabled` and manually set `parallelism.default` to a value greater than 1, the operator runs with the configured parallelism as expected.
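For reference, the workaround can be expressed as Flink SQL client `SET` statements roughly as follows. This is a sketch that assumes the job is submitted through the SQL client. The value `4` for `parallelism.default` is a hypothetical example, not the value used in our job.

```sql
-- Run the query in batch execution mode.
SET 'execution.runtime-mode' = 'batch';

-- Workaround: turn off adaptive (automatically derived) batch parallelism.
SET 'execution.batch.adaptive.auto-parallelism.enabled' = 'false';

-- Hypothetical example value; any parallelism greater than 1 applies here.
SET 'parallelism.default' = '4';
```

With these settings every operator, including the `HashAggregate` above, uses the fixed default parallelism instead of one derived from the data volume, which is why this only works around the problem rather than fixing it.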
The screenshot of the full job graph is attached.