Details
- Bug
- Status: Closed
- Major
- Resolution: Duplicate
- 1.14.0
Description
Aug 12 22:44:38 =================================== FAILURES ===================================
Aug 12 22:44:38 ______________ StreamingModeDataStreamTests.test_keyed_co_process ______________
Aug 12 22:44:38
Aug 12 22:44:38 self = <pyflink.datastream.tests.test_data_stream.StreamingModeDataStreamTests testMethod=test_keyed_co_process>
Aug 12 22:44:38
Aug 12 22:44:38     def test_keyed_co_process(self):
Aug 12 22:44:38         ds1 = self.env.from_collection([("a", 1), ("b", 2), ("c", 3)],
Aug 12 22:44:38                                        type_info=Types.ROW([Types.STRING(), Types.INT()]))
Aug 12 22:44:38         ds2 = self.env.from_collection([("b", 2), ("c", 3), ("d", 4)],
Aug 12 22:44:38                                        type_info=Types.ROW([Types.STRING(), Types.INT()]))
Aug 12 22:44:38         ds1 = ds1.assign_timestamps_and_watermarks(
Aug 12 22:44:38             WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
Aug 12 22:44:38                 SecondColumnTimestampAssigner()))
Aug 12 22:44:38         ds2 = ds2.assign_timestamps_and_watermarks(
Aug 12 22:44:38             WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
Aug 12 22:44:38                 SecondColumnTimestampAssigner()))
Aug 12 22:44:38         ds1.connect(ds2) \
Aug 12 22:44:38             .key_by(lambda x: x[0], lambda x: x[0]) \
Aug 12 22:44:38             .process(MyKeyedCoProcessFunction()) \
Aug 12 22:44:38             .map(lambda x: Row(x[0], x[1] + 1)) \
Aug 12 22:44:38             .add_sink(self.test_sink)
Aug 12 22:44:38         self.env.execute('test_keyed_co_process_function')
Aug 12 22:44:38         results = self.test_sink.get_results(True)
Aug 12 22:44:38         expected = ["<Row('a', 2)>",
Aug 12 22:44:38                     "<Row('b', 2)>",
Aug 12 22:44:38                     "<Row('b', 3)>",
Aug 12 22:44:38                     "<Row('c', 2)>",
Aug 12 22:44:38                     "<Row('c', 3)>",
Aug 12 22:44:38                     "<Row('d', 2)>",
Aug 12 22:44:38                     "<Row('on_timer', 4)>"]
Aug 12 22:44:38 >       self.assert_equals_sorted(expected, results)
Aug 12 22:44:38
Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:211:
Aug 12 22:44:38 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:61: in assert_equals_sorted
Aug 12 22:44:38     self.assertEqual(expected, actual)
Aug 12 22:44:38 E   AssertionError: Lists differ: ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>"] != ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>", "<Row('on_timer', 4)>"]
Aug 12 22:44:38 E
Aug 12 22:44:38 E   Second list contains 1 additional elements.
Aug 12 22:44:38 E   First extra element 7:
Aug 12 22:44:38 E   "<Row('on_timer', 4)>"
Aug 12 22:44:38 E
Aug 12 22:44:38 E     ["<Row('a', 2)>",
Aug 12 22:44:38 E      "<Row('b', 2)>",
Aug 12 22:44:38 E      "<Row('b', 3)>",
Aug 12 22:44:38 E      "<Row('c', 2)>",
Aug 12 22:44:38 E      "<Row('c', 3)>",
Aug 12 22:44:38 E      "<Row('d', 2)>",
Aug 12 22:44:38 E   +  "<Row('on_timer', 4)>",
Aug 12 22:44:38 E      "<Row('on_timer', 4)>"]
Aug 12 22:44:38 ________________ BatchModeDataStreamTests.test_keyed_co_process ________________
Aug 12 22:44:38
Aug 12 22:44:38 self = <pyflink.datastream.tests.test_data_stream.BatchModeDataStreamTests testMethod=test_keyed_co_process>
Aug 12 22:44:38
Aug 12 22:44:38     def test_keyed_co_process(self):
Aug 12 22:44:38         ds1 = self.env.from_collection([("a", 1), ("b", 2), ("c", 3)],
Aug 12 22:44:38                                        type_info=Types.ROW([Types.STRING(), Types.INT()]))
Aug 12 22:44:38         ds2 = self.env.from_collection([("b", 2), ("c", 3), ("d", 4)],
Aug 12 22:44:38                                        type_info=Types.ROW([Types.STRING(), Types.INT()]))
Aug 12 22:44:38         ds1 = ds1.assign_timestamps_and_watermarks(
Aug 12 22:44:38             WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
Aug 12 22:44:38                 SecondColumnTimestampAssigner()))
Aug 12 22:44:38         ds2 = ds2.assign_timestamps_and_watermarks(
Aug 12 22:44:38             WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
Aug 12 22:44:38                 SecondColumnTimestampAssigner()))
Aug 12 22:44:38         ds1.connect(ds2) \
Aug 12 22:44:38             .key_by(lambda x: x[0], lambda x: x[0]) \
Aug 12 22:44:38             .process(MyKeyedCoProcessFunction()) \
Aug 12 22:44:38             .map(lambda x: Row(x[0], x[1] + 1)) \
Aug 12 22:44:38             .add_sink(self.test_sink)
Aug 12 22:44:38         self.env.execute('test_keyed_co_process_function')
Aug 12 22:44:38         results = self.test_sink.get_results(True)
Aug 12 22:44:38         expected = ["<Row('a', 2)>",
Aug 12 22:44:38                     "<Row('b', 2)>",
Aug 12 22:44:38                     "<Row('b', 3)>",
Aug 12 22:44:38                     "<Row('c', 2)>",
Aug 12 22:44:38                     "<Row('c', 3)>",
Aug 12 22:44:38                     "<Row('d', 2)>",
Aug 12 22:44:38                     "<Row('on_timer', 4)>"]
Aug 12 22:44:38 >       self.assert_equals_sorted(expected, results)
Aug 12 22:44:38
Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:211:
Aug 12 22:44:38 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:61: in assert_equals_sorted
Aug 12 22:44:38     self.assertEqual(expected, actual)
Aug 12 22:44:38 E   AssertionError: Lists differ: ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>"] != ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>", "<Row('on_timer', 4)>"]
Aug 12 22:44:38 E
Aug 12 22:44:38 E   Second list contains 1 additional elements.
Aug 12 22:44:38 E   First extra element 7:
Aug 12 22:44:38 E   "<Row('on_timer', 4)>"
Aug 12 22:44:38 E
Aug 12 22:44:38 E     ["<Row('a', 2)>",
Aug 12 22:44:38 E      "<Row('b', 2)>",
Aug 12 22:44:38 E      "<Row('b', 3)>",
Aug 12 22:44:38 E      "<Row('c', 2)>",
Aug 12 22:44:38 E      "<Row('c', 3)>",
Aug 12 22:44:38 E      "<Row('d', 2)>",
Aug 12 22:44:38 E   +  "<Row('on_timer', 4)>",
Aug 12 22:44:38 E      "<Row('on_timer', 4)>"]
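Both failures come down to the same difference: the sink returned one more `<Row('on_timer', 4)>` than the test expects. The following is a minimal standalone sketch of that comparison, not the PyFlink test itself. The row strings are copied from the diff in the log; the helper name `surplus_rows` is hypothetical and only illustrates how the extra element can be isolated.

```python
from collections import Counter

# Expected rows, as listed in the test body in the log.
expected = [
    "<Row('a', 2)>",
    "<Row('b', 2)>",
    "<Row('b', 3)>",
    "<Row('c', 2)>",
    "<Row('c', 3)>",
    "<Row('d', 2)>",
    "<Row('on_timer', 4)>",
]

# Actual rows, as shown by the assertion diff: one extra on_timer row.
actual = expected + ["<Row('on_timer', 4)>"]


def surplus_rows(expected_rows, actual_rows):
    """Hypothetical helper: rows that occur more often in actual_rows."""
    # Counter subtraction keeps only positive counts.
    return dict(Counter(actual_rows) - Counter(expected_rows))


extra = surplus_rows(expected, actual)
missing = surplus_rows(actual, expected)
print("extra:", extra)
print("missing:", missing)
```

Under these assumptions nothing is missing and the only surplus is a single `on_timer` row, which matches the "Second list contains 1 additional elements" message in both the streaming and batch runs.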
Issue Links
- duplicates
- FLINK-23742 test_keyed_co_process test failed in py36 and py37
- Closed