from typing import Tuple
from pyflink.common import Configuration
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import AggregateFunction
from pyflink.datastream.window import EventTimeSessionWindows
class SecondColumnTimestampAssigner(TimestampAssigner):
    """Timestamp assigner that reads each record's event time from its second field."""

    def extract_timestamp(self, value, record_timestamp) -> int:
        """Return ``value[1]`` as an ``int``; the incoming ``record_timestamp`` is ignored."""
        event_time = value[1]
        return int(event_time)
def main():
    """Run a small PyFlink job that sums the integer field per key over session windows.

    Builds an in-memory stream of ``(key, value)`` tuples and uses the integer
    field as each record's event-time timestamp. Records are grouped by key
    into event-time session windows with a 2 ms gap, the integers in each
    window are summed, and ``(key, sum)`` tuples are printed.
    """
    config = Configuration()
    # Execute Python user-defined functions in PyFlink's "thread" execution mode.
    config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)

    data_stream = env.from_collection(
        [('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # The sample timestamps (second field) never decrease, so monotonous watermarks suffice.
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(SecondColumnTimestampAssigner())

    class MyAggregateFunction(AggregateFunction):
        """Sum the integer field of the records in a window while remembering their key.

        The accumulator is ``(running_sum, key)``; the result is ``(key, running_sum)``.
        """

        def create_accumulator(self) -> Tuple[int, str]:
            # '' is a placeholder key until the first record is added.
            return 0, ''

        def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
            return value[1] + accumulator[0], value[0]

        def get_result(self, accumulator: Tuple[int, str]) -> Tuple[str, int]:
            # Swap to (key, sum) to match the declared output_type below.
            return accumulator[1], accumulator[0]

        def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]) -> Tuple[int, str]:
            # Session windows merge their accumulators. Prefer a non-empty key so an
            # accumulator that never saw a record ('' placeholder) cannot blank the key.
            return acc_a[0] + acc_b[0], acc_a[1] or acc_b[1]

    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING()) \
        .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
        .aggregate(MyAggregateFunction(),
                   accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
                   output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
    # With the sample data this should print ('a', 3), ('a', 6), ('a', 15),
    # ('b', 3) and ('b', 17); ordering across keys is not guaranteed.
    ds.print()
    env.execute('test_window_aggregate_accumulator_type')
# Run the demo job only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()