Details
Bug
Status: Open
Major
Resolution: Unresolved
1.16.0, 1.17.0
None
None
Description
Version: upgraded from PyFlink 1.15.2 to PyFlink 1.16.1.
After the upgrade, the job reports the following error:
Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'data_stream.assign_timestamps_and_watermarks(...)'?
The same application never reported this error on version 1.15.2.
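Example:
```python
from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream.window import TumblingEventTimeWindows

# st_env, kafka_source, CommonKeySelector, WindowFunction and typeInfo are
# defined elsewhere in the application (not shown).


class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp: int) -> int:
        # 'version' holds the event time in epoch milliseconds
        return value['version']


sql = """
    select columns, version(milliseconds) from kafka_source
"""
table = st_env.sql_query(sql)
stream = st_env.to_changelog_stream(table)
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
    .with_timestamp_assigner(MyTimestampAssigner())
    .with_idleness(Duration.of_seconds(10)))
# Daily event-time windows, shifted by -8 hours
stream = stream.key_by(CommonKeySelector()) \
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
    .process(WindowFunction(), typeInfo)
```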
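For context on where the error comes from: an event-time window assigner needs each record's timestamp, and a record without one carries the Java Long.MIN_VALUE "no timestamp" marker. Below is a simplified pure-Python sketch, assuming the assigner behaves like Flink's tumbling event-time window logic (reject Long.MIN_VALUE, otherwise compute the window start from timestamp, offset and size). The function and constant names are hypothetical and this is not PyFlink's code. It also shows what the Time.hours(-8) offset does to daily windows.

```python
# Hypothetical sketch modeled on the behavior of a tumbling event-time
# window assigner; names are invented for illustration.

JAVA_LONG_MIN = -(2 ** 63)  # the "no timestamp" marker
HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS


def assign_tumbling_window(timestamp: int, size: int, offset: int) -> tuple:
    """Return (start, end) in ms of the tumbling window containing `timestamp`."""
    if timestamp <= JAVA_LONG_MIN:
        # Same situation as the reported error: the record has no timestamp.
        raise RuntimeError(
            "Record has Java Long.MIN_VALUE timestamp (= no timestamp marker)")
    # Python's % is non-negative for a positive divisor, so negative
    # (timestamp - offset) values need no extra correction here.
    start = timestamp - (timestamp - offset) % size
    return start, start + size


# Daily windows shifted by -8 hours, like Time.days(1), Time.hours(-8)
start, end = assign_tumbling_window(0, DAY_MS, -8 * HOUR_MS)
print(start, end)
```

With a -8 hour offset, the window containing epoch 0 starts at -28800000 ms (1969-12-31T16:00:00Z), so each daily window runs from midnight to midnight in UTC+8. A record whose timestamp was never assigned hits the error branch instead.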
Tracing pyflink.datastream.data_stream.DataStream.assign_timestamps_and_watermarks in a debugger shows that watermark_strategy._timestamp_assigner is None.
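This observation is consistent with a builder step that returns a new strategy object without carrying over the timestamp assigner held on the Python side. The sketch below is a hypothetical, simplified model of that failure mode; the class and method names are invented for illustration, and this is not PyFlink's implementation. It also shows the kind of change that would preserve the assigner.

```python
# Hypothetical model: a Python wrapper around a (simulated) Java strategy,
# where the timestamp assigner lives only on the Python side.
from typing import Callable, Optional


class SketchWatermarkStrategy:
    def __init__(self, j_strategy: str):
        self._j_strategy = j_strategy  # stands in for the wrapped Java object
        self._timestamp_assigner: Optional[Callable] = None

    def with_timestamp_assigner(self, assigner: Callable) -> "SketchWatermarkStrategy":
        # Stored on the Python side only.
        self._timestamp_assigner = assigner
        return self

    def with_idleness_buggy(self, idle_ms: int) -> "SketchWatermarkStrategy":
        # Wraps a new Java strategy but forgets the Python-side assigner.
        return SketchWatermarkStrategy(f"{self._j_strategy}+idle({idle_ms})")

    def with_idleness_fixed(self, idle_ms: int) -> "SketchWatermarkStrategy":
        new = SketchWatermarkStrategy(f"{self._j_strategy}+idle({idle_ms})")
        new._timestamp_assigner = self._timestamp_assigner  # carry it over
        return new


def extract_version(value, record_timestamp):
    return value["version"]


base = SketchWatermarkStrategy("bounded(60000)").with_timestamp_assigner(extract_version)
print(base.with_idleness_buggy(10_000)._timestamp_assigner)  # None
print(base.with_idleness_fixed(10_000)._timestamp_assigner is extract_version)  # True
```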
Workaround:
Remove the with_idleness(Duration.of_seconds(10)) call. With only the timestamp assigner configured, the error no longer occurs:
```python
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
    .with_timestamp_assigner(MyTimestampAssigner()))
```
Is this a bug?