Flink / FLINK-23745

Python test_keyed_co_process fails on Azure


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.14.0
    • Fix Version/s: 1.14.0
    • Component/s: API / Python

    Description

      https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22020&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901&l=21602

      https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22020&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901&l=21809

      Aug 12 22:44:38 =================================== FAILURES ===================================
      Aug 12 22:44:38 ______________ StreamingModeDataStreamTests.test_keyed_co_process ______________
      Aug 12 22:44:38 
      Aug 12 22:44:38 self = <pyflink.datastream.tests.test_data_stream.StreamingModeDataStreamTests testMethod=test_keyed_co_process>
      Aug 12 22:44:38 
      Aug 12 22:44:38     def test_keyed_co_process(self):
      Aug 12 22:44:38         ds1 = self.env.from_collection([("a", 1), ("b", 2), ("c", 3)],
      Aug 12 22:44:38                                        type_info=Types.ROW([Types.STRING(), Types.INT()]))
      Aug 12 22:44:38         ds2 = self.env.from_collection([("b", 2), ("c", 3), ("d", 4)],
      Aug 12 22:44:38                                        type_info=Types.ROW([Types.STRING(), Types.INT()]))
      Aug 12 22:44:38         ds1 = ds1.assign_timestamps_and_watermarks(
      Aug 12 22:44:38             WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
      Aug 12 22:44:38                 SecondColumnTimestampAssigner()))
      Aug 12 22:44:38         ds2 = ds2.assign_timestamps_and_watermarks(
      Aug 12 22:44:38             WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
      Aug 12 22:44:38                 SecondColumnTimestampAssigner()))
      Aug 12 22:44:38         ds1.connect(ds2) \
      Aug 12 22:44:38             .key_by(lambda x: x[0], lambda x: x[0]) \
      Aug 12 22:44:38             .process(MyKeyedCoProcessFunction()) \
      Aug 12 22:44:38             .map(lambda x: Row(x[0], x[1] + 1)) \
      Aug 12 22:44:38             .add_sink(self.test_sink)
      Aug 12 22:44:38         self.env.execute('test_keyed_co_process_function')
      Aug 12 22:44:38         results = self.test_sink.get_results(True)
      Aug 12 22:44:38         expected = ["<Row('a', 2)>",
      Aug 12 22:44:38                     "<Row('b', 2)>",
      Aug 12 22:44:38                     "<Row('b', 3)>",
      Aug 12 22:44:38                     "<Row('c', 2)>",
      Aug 12 22:44:38                     "<Row('c', 3)>",
      Aug 12 22:44:38                     "<Row('d', 2)>",
      Aug 12 22:44:38                     "<Row('on_timer', 4)>"]
      Aug 12 22:44:38 >       self.assert_equals_sorted(expected, results)
      Aug 12 22:44:38 
      Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:211: 
      Aug 12 22:44:38 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:61: in assert_equals_sorted
      Aug 12 22:44:38     self.assertEqual(expected, actual)
      Aug 12 22:44:38 E   AssertionError: Lists differ: ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>"] != ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>", "<Row('on_timer', 4)>"]
      Aug 12 22:44:38 E   
      Aug 12 22:44:38 E   Second list contains 1 additional elements.
      Aug 12 22:44:38 E   First extra element 7:
      Aug 12 22:44:38 E   "<Row('on_timer', 4)>"
      Aug 12 22:44:38 E   
      Aug 12 22:44:38 E     ["<Row('a', 2)>",
      Aug 12 22:44:38 E      "<Row('b', 2)>",
      Aug 12 22:44:38 E      "<Row('b', 3)>",
      Aug 12 22:44:38 E      "<Row('c', 2)>",
      Aug 12 22:44:38 E      "<Row('c', 3)>",
      Aug 12 22:44:38 E      "<Row('d', 2)>",
      Aug 12 22:44:38 E   +  "<Row('on_timer', 4)>",
      Aug 12 22:44:38 E      "<Row('on_timer', 4)>"]
      Aug 12 22:44:38 ________________ BatchModeDataStreamTests.test_keyed_co_process ________________
      Aug 12 22:44:38 
      Aug 12 22:44:38 self = <pyflink.datastream.tests.test_data_stream.BatchModeDataStreamTests testMethod=test_keyed_co_process>
      Aug 12 22:44:38 
      Aug 12 22:44:38     def test_keyed_co_process(self):
      Aug 12 22:44:38         ds1 = self.env.from_collection([("a", 1), ("b", 2), ("c", 3)],
      Aug 12 22:44:38                                        type_info=Types.ROW([Types.STRING(), Types.INT()]))
      Aug 12 22:44:38         ds2 = self.env.from_collection([("b", 2), ("c", 3), ("d", 4)],
      Aug 12 22:44:38                                        type_info=Types.ROW([Types.STRING(), Types.INT()]))
      Aug 12 22:44:38         ds1 = ds1.assign_timestamps_and_watermarks(
      Aug 12 22:44:38             WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
      Aug 12 22:44:38                 SecondColumnTimestampAssigner()))
      Aug 12 22:44:38         ds2 = ds2.assign_timestamps_and_watermarks(
      Aug 12 22:44:38             WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
      Aug 12 22:44:38                 SecondColumnTimestampAssigner()))
      Aug 12 22:44:38         ds1.connect(ds2) \
      Aug 12 22:44:38             .key_by(lambda x: x[0], lambda x: x[0]) \
      Aug 12 22:44:38             .process(MyKeyedCoProcessFunction()) \
      Aug 12 22:44:38             .map(lambda x: Row(x[0], x[1] + 1)) \
      Aug 12 22:44:38             .add_sink(self.test_sink)
      Aug 12 22:44:38         self.env.execute('test_keyed_co_process_function')
      Aug 12 22:44:38         results = self.test_sink.get_results(True)
      Aug 12 22:44:38         expected = ["<Row('a', 2)>",
      Aug 12 22:44:38                     "<Row('b', 2)>",
      Aug 12 22:44:38                     "<Row('b', 3)>",
      Aug 12 22:44:38                     "<Row('c', 2)>",
      Aug 12 22:44:38                     "<Row('c', 3)>",
      Aug 12 22:44:38                     "<Row('d', 2)>",
      Aug 12 22:44:38                     "<Row('on_timer', 4)>"]
      Aug 12 22:44:38 >       self.assert_equals_sorted(expected, results)
      Aug 12 22:44:38 
      Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:211: 
      Aug 12 22:44:38 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:61: in assert_equals_sorted
      Aug 12 22:44:38     self.assertEqual(expected, actual)
      Aug 12 22:44:38 E   AssertionError: Lists differ: ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>"] != ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>", "<Row('on_timer', 4)>"]
      Aug 12 22:44:38 E   
      Aug 12 22:44:38 E   Second list contains 1 additional elements.
      Aug 12 22:44:38 E   First extra element 7:
      Aug 12 22:44:38 E   "<Row('on_timer', 4)>"
      Aug 12 22:44:38 E   
      Aug 12 22:44:38 E     ["<Row('a', 2)>",
      Aug 12 22:44:38 E      "<Row('b', 2)>",
      Aug 12 22:44:38 E      "<Row('b', 3)>",
      Aug 12 22:44:38 E      "<Row('c', 2)>",
      Aug 12 22:44:38 E      "<Row('c', 3)>",
      Aug 12 22:44:38 E      "<Row('d', 2)>",
      Aug 12 22:44:38 E   +  "<Row('on_timer', 4)>",
      Aug 12 22:44:38 E      "<Row('on_timer', 4)>"]
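
In both streaming and batch mode the sink received every expected row plus one extra "<Row('on_timer', 4)>", so the failure is a single surplus timer emission rather than wrong or missing values. The plain-Python sketch below (no PyFlink needed) makes that explicit by comparing the two lists as multisets. It assumes, based only on its name, that the test helper assert_equals_sorted sorts both lists before calling assertEqual; the helpers surplus and missing are hypothetical and not part of the Flink test suite.

```python
from collections import Counter

# Expected sink output, copied from the failing test.
expected = ["<Row('a', 2)>", "<Row('b', 2)>", "<Row('b', 3)>",
            "<Row('c', 2)>", "<Row('c', 3)>", "<Row('d', 2)>",
            "<Row('on_timer', 4)>"]

# Actual sink output reported by the CI log: identical, except that
# the timer row appears twice.
actual = expected + ["<Row('on_timer', 4)>"]


def surplus(expected, actual):
    """Rows that occur more often in actual than in expected (hypothetical helper)."""
    return Counter(actual) - Counter(expected)


def missing(expected, actual):
    """Rows that occur more often in expected than in actual (hypothetical helper)."""
    return Counter(expected) - Counter(actual)


# An order-insensitive comparison, assumed to match assert_equals_sorted, fails...
print(sorted(expected) == sorted(actual))  # False
# ...and the only difference is one duplicated timer row.
print(dict(surplus(expected, actual)))  # {"<Row('on_timer', 4)>": 1}
print(dict(missing(expected, actual)))  # {}
```

Counter subtraction drops non-positive counts, so an empty result from missing confirms that nothing expected was lost; the whole diff is the duplicated on_timer output.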
      

People

    • Assignee: Unassigned
    • Reporter: Xintong Song (xtsong)
    • Votes: 0
    • Watchers: 1

              Dates

                Created:
                Updated:
                Resolved: