self = <pyflink.table.tests.test_pandas_udaf.StreamPandasUDAFITTests testMethod=test_sliding_group_window_over_proctime>
Dec 06 03:10:43
Dec 06 03:10:43 def test_sliding_group_window_over_proctime(self):
Dec 06 03:10:43 self.t_env.get_config().get_configuration().set_string("parallelism.default", "1")
Dec 06 03:10:43 from pyflink.table.window import Slide
Dec 06 03:10:43 self.t_env.register_function("mean_udaf", mean_udaf)
Dec 06 03:10:43
Dec 06 03:10:43 source_table = """
Dec 06 03:10:43 create table source_table(
Dec 06 03:10:43 a INT,
Dec 06 03:10:43 proctime as PROCTIME()
Dec 06 03:10:43 ) with(
Dec 06 03:10:43 'connector' = 'datagen',
Dec 06 03:10:43 'rows-per-second' = '1',
Dec 06 03:10:43 'fields.a.kind' = 'sequence',
Dec 06 03:10:43 'fields.a.start' = '1',
Dec 06 03:10:43 'fields.a.end' = '10'
Dec 06 03:10:43 )
Dec 06 03:10:43 """
Dec 06 03:10:43 self.t_env.execute_sql(source_table)
Dec 06 03:10:43 t = self.t_env.from_path("source_table")
Dec 06 03:10:43 iterator = t.select("a, proctime") \
Dec 06 03:10:43 .window(Slide.over("1.seconds").every("1.seconds").on("proctime").alias("w")) \
Dec 06 03:10:43 .group_by("a, w") \
Dec 06 03:10:43 .select("mean_udaf(a) as b, w.start").execute().collect()
Dec 06 03:10:43 result = [i for i in iterator]
Dec 06 03:10:43 # if the WindowAssigner.isEventTime() does not return false,
Dec 06 03:10:43 # the w.start would be 1970-01-01
Dec 06 03:10:43 # TODO: After fixing the TimeZone problem of window with processing time (will be fixed in
Dec 06 03:10:43 # FLIP-162), we should replace it with a more accurate assertion.
Dec 06 03:10:43 > self.assertTrue(result[0][1].year > 1970)
Dec 06 03:10:43 E IndexError: list index out of range