Details

Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Fix Version/s: 1.15.0
Description
For the following job:
import argparse
import json
import sys
import time
from typing import Iterable, cast

from pyflink.common import Types, Time, Encoder
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction, EmbeddedRocksDBStateBackend, \
    PredefinedOptions, FileSystemCheckpointStorage, CheckpointingMode, ExternalizedCheckpointCleanup
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, OutputFileConfig
from pyflink.datastream.state import ReducingState, ReducingStateDescriptor
from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, TumblingProcessingTimeWindows, \
    ProcessingTimeTrigger


# Custom trigger: fires when the per-key element count reaches window_size, or
# when the window's processing time ends. Note that the job below never installs
# it; the plan shows the default ProcessingTimeTrigger.
class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger):

    def __init__(self, window_size: int):
        self._window_size = window_size
        self._count_state_descriptor = ReducingStateDescriptor(
            "count", lambda a, b: a + b, Types.LONG())

    @staticmethod
    def of(window_size: int) -> 'CountWithProcessTimeoutTrigger':
        return CountWithProcessTimeoutTrigger(window_size)

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
        count_state.add(1)
        # print("element arrive:", element, "count_state:", count_state.get(), window.max_timestamp(),
        #       ctx.get_current_watermark())

        if count_state.get() >= self._window_size:  # must fire & purge!!!!
            print("fire element count", element, count_state.get(), window.max_timestamp(),
                  ctx.get_current_watermark())
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE
        if timestamp >= window.end:
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE
        else:
            return TriggerResult.CONTINUE

    def on_processing_time(self,
                           timestamp: int,
                           window: TimeWindow,
                           ctx: Trigger.TriggerContext) -> TriggerResult:
        if timestamp >= window.end:
            return TriggerResult.CONTINUE
        else:
            print("fire with process_time:", timestamp)
            count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE

    def on_event_time(self,
                      timestamp: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return TriggerResult.CONTINUE

    def clear(self, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> None:
        count_state = ctx.get_partitioned_state(self._count_state_descriptor)
        count_state.clear()


def to_dict_map(v):
    time.sleep(1)
    dict_value = json.loads(v)
    return dict_value


def get_group_key(value, keys):
    group_key_values = []
    for key in keys:
        one_key_value = 'null'
        if key in value:
            list_value = value[key]
            if list_value:
                one_key_value = str(list_value[0])
        group_key_values.append(one_key_value)
    group_key = '_'.join(group_key_values)
    # print("group_key=", group_key)
    return group_key


# Wraps a user function that is expected to expose process_after_group_by_function.
class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, TimeWindow]):

    def __init__(self, uf):
        self._user_function = uf

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[dict]) -> Iterable[dict]:
        result_list = self._user_function.process_after_group_by_function(elements)
        return result_list


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')
    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)
    # process time
    env.get_config().set_auto_watermark_interval(0)
    state_backend = EmbeddedRocksDBStateBackend(True)
    state_backend.set_predefined_options(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    env.set_state_backend(state_backend)
    config = env.get_checkpoint_config()
    # config.set_checkpoint_storage(FileSystemCheckpointStorage("hdfs://ha-nn-uri/tmp/checkpoint/"))
    config.set_checkpoint_storage(FileSystemCheckpointStorage("file:///Users/10030122/Downloads/pyflink_checkpoint/"))
    config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
    config.set_checkpoint_interval(5000)
    config.set_externalized_checkpoint_cleanup(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

    # define the source
    data_stream1 = env.from_collection(
        ['{"user_id": ["0"], "goods_id": [0,0]}', '{"user_id": ["1"], "goods_id": [1,0]}',
         '{"user_id": ["2"], "goods_id": [2,0]}', '{"user_id": ["1"], "goods_id": [3,0]}',
         '{"user_id": ["2"], "goods_id": [4,0]}', '{"user_id": ["1"], "goods_id": [5,0]}',
         '{"user_id": ["2"], "goods_id": [6,0]}', '{"user_id": ["1"], "goods_id": [7,0]}',
         '{"user_id": ["2"], "goods_id": [8,0]}', '{"user_id": ["1"], "goods_id": [9,0]}',
         '{"user_id": ["2"], "goods_id": [10,0]}', '{"user_id": ["1"], "goods_id": [11,0]}',
         '{"user_id": ["2"], "goods_id": [12,0]}', '{"user_id": ["1"], "goods_id": [13,0]}',
         '{"user_id": ["2"], "goods_id": [14,0]}', '{"user_id": ["1"], "goods_id": [15,0]}',
         '{"user_id": ["2"], "goods_id": [16,0]}', '{"user_id": ["1"], "goods_id": [17,0]}',
         '{"user_id": ["2"], "goods_id": [18,0]}', '{"user_id": ["1"], "goods_id": [19,0]}',
         '{"user_id": ["2"], "goods_id": [20,0]}', '{"user_id": ["1"], "goods_id": [21,0]}',
         '{"user_id": ["2"], "goods_id": [22,0]}', '{"user_id": ["1"], "goods_id": [23,0]}',
         '{"user_id": ["2"], "goods_id": [24,0]}', '{"user_id": ["1"], "goods_id": [25,0]}',
         '{"user_id": ["2"], "goods_id": [26,0]}', '{"user_id": ["1"], "goods_id": [27,0]}',
         '{"user_id": ["2"], "goods_id": [28,0]}', '{"user_id": ["1"], "goods_id": [29,0]}',
         '{"user_id": ["2"], "goods_id": [30,0]}'])
    # the same 31 records as data_stream1
    data_stream2 = env.from_collection(
        ['{"user_id": ["0"], "goods_id": [0,0]}', '{"user_id": ["1"], "goods_id": [1,0]}',
         '{"user_id": ["2"], "goods_id": [2,0]}', '{"user_id": ["1"], "goods_id": [3,0]}',
         '{"user_id": ["2"], "goods_id": [4,0]}', '{"user_id": ["1"], "goods_id": [5,0]}',
         '{"user_id": ["2"], "goods_id": [6,0]}', '{"user_id": ["1"], "goods_id": [7,0]}',
         '{"user_id": ["2"], "goods_id": [8,0]}', '{"user_id": ["1"], "goods_id": [9,0]}',
         '{"user_id": ["2"], "goods_id": [10,0]}', '{"user_id": ["1"], "goods_id": [11,0]}',
         '{"user_id": ["2"], "goods_id": [12,0]}', '{"user_id": ["1"], "goods_id": [13,0]}',
         '{"user_id": ["2"], "goods_id": [14,0]}', '{"user_id": ["1"], "goods_id": [15,0]}',
         '{"user_id": ["2"], "goods_id": [16,0]}', '{"user_id": ["1"], "goods_id": [17,0]}',
         '{"user_id": ["2"], "goods_id": [18,0]}', '{"user_id": ["1"], "goods_id": [19,0]}',
         '{"user_id": ["2"], "goods_id": [20,0]}', '{"user_id": ["1"], "goods_id": [21,0]}',
         '{"user_id": ["2"], "goods_id": [22,0]}', '{"user_id": ["1"], "goods_id": [23,0]}',
         '{"user_id": ["2"], "goods_id": [24,0]}', '{"user_id": ["1"], "goods_id": [25,0]}',
         '{"user_id": ["2"], "goods_id": [26,0]}', '{"user_id": ["1"], "goods_id": [27,0]}',
         '{"user_id": ["2"], "goods_id": [28,0]}', '{"user_id": ["1"], "goods_id": [29,0]}',
         '{"user_id": ["2"], "goods_id": [30,0]}'])

    # group_keys = ['user_id', 'goods_id']
    group_keys = ['user_id']
    sink_to_file_flag = True
    data_stream = data_stream1.union(data_stream2)
    # user_function = __import__("UserFunction")
    ds = data_stream.map(lambda v: to_dict_map(v)) \
        .filter(lambda v: v) \
        .map(lambda v: v) \
        .key_by(lambda v: get_group_key(v, group_keys)) \
        .window(TumblingProcessingTimeWindows.of(Time.seconds(12))) \
        .process(CountWindowProcessFunction(lambda v: v), Types.STRING())
    ds = ds.map(lambda v: v, Types.PRIMITIVE_ARRAY(Types.BYTE()))

    base_path = "/tmp/1.txt"
    encoder = Encoder.simple_string_encoder()
    file_sink_builder = FileSink.for_row_format(base_path, encoder)
    file_sink = file_sink_builder \
        .with_bucket_check_interval(1000) \
        .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
        .with_output_file_config(
            OutputFileConfig.builder().with_part_prefix("pre").with_part_suffix("suf").build()) \
        .build()
    ds.sink_to(file_sink)

    # submit for execution
    env.execute()
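For completeness, the reproducer runs as an ordinary Python script; a sketch, assuming PyFlink 1.15 is installed and the job is saved under the hypothetical name window_job.py:

# Hypothetical file name; --output is optional in the job above.
python window_job.py --output /tmp/out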
The stream graph is as follows:
{ "nodes" : [ { "id" : 1, "type" : "Source: Collection Source", "pact" : "Data Source", "contents" : "Source: Collection Source", "parallelism" : 1 }, { "id" : 2, "type" : "Source: Collection Source", "pact" : "Data Source", "contents" : "Source: Collection Source", "parallelism" : 1 }, { "id" : 9, "type" : "TumblingProcessingTimeWindows", "pact" : "Operator", "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), ProcessingTimeTrigger, CountWindowProcessFunction)", "parallelism" : 1, "predecessors" : [ { "id" : 15, "ship_strategy" : "HASH", "side" : "second" } ] }, { "id" : 10, "type" : "Map", "pact" : "Operator", "contents" : "Map", "parallelism" : 1, "predecessors" : [ { "id" : 9, "ship_strategy" : "FORWARD", "side" : "second" } ] }, { "id" : 15, "type" : "Map, Filter, Map, _stream_key_by_map_operator", "pact" : "Operator", "contents" : "Map, Filter, Map, _stream_key_by_map_operator", "parallelism" : 1, "predecessors" : [ { "id" : 1, "ship_strategy" : "FORWARD", "side" : "second" }, { "id" : 2, "ship_strategy" : "FORWARD", "side" : "second" } ] }, { "id" : 16, "type" : "TumblingProcessingTimeWindows, Map", "pact" : "Operator", "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), ProcessingTimeTrigger, CountWindowProcessFunction)", "parallelism" : 1, "predecessors" : [ { "id" : 15, "ship_strategy" : "HASH", "side" : "second" } ] }, { "id" : 18, "type" : "Sink: Writer", "pact" : "Operator", "contents" : "Sink: Writer", "parallelism" : 1, "predecessors" : [ { "id" : 10, "ship_strategy" : "FORWARD", "side" : "second" } ] }, { "id" : 20, "type" : "Sink: Committer", "pact" : "Operator", "contents" : "Sink: Committer", "parallelism" : 1, "predecessors" : [ { "id" : 18, "ship_strategy" : "FORWARD", "side" : "second" } ] } ] }
The plan is incorrect: the window operator (Window(TumblingProcessingTimeWindows(12000, 0), ProcessingTimeTrigger, CountWindowProcessFunction)) appears twice, as nodes 9 and 16, both consuming the HASH-partitioned output of node 15, even though the job defines only a single window.
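The shape that appears to trigger the duplication is a Python map chained onto the output of window(...).process(...). Below is a stripped-down sketch of that shape (IdentityWindowFunction is a hypothetical stand-in for CountWindowProcessFunction; the final comment describes the buggy behavior reported here, not correct output):

from typing import Iterable

from pyflink.common import Time, Types
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import TimeWindow, TumblingProcessingTimeWindows


class IdentityWindowFunction(ProcessWindowFunction[str, str, str, TimeWindow]):
    """Hypothetical stand-in: emits the window's elements unchanged."""

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context,
                elements: Iterable[str]) -> Iterable[str]:
        return elements


env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

ds = env.from_collection(['a', 'b', 'a']) \
    .key_by(lambda v: v) \
    .window(TumblingProcessingTimeWindows.of(Time.seconds(12))) \
    .process(IdentityWindowFunction(), Types.STRING())
# Chaining another Python operator onto the window output, as the reported job does:
ds = ds.map(lambda v: v, Types.STRING())

# With the bug present, the window node shows up twice in the printed plan.
print(env.get_execution_plan())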