Apache Arrow
ARROW-15438

[Python] Flaky test test_write_dataset_max_open_files

Description

Found during verification of the 7.0.0 release:

      pyarrow/tests/test_dataset.py::test_write_dataset_max_open_files FAILED                                            [ 30%]

      >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

      tempdir = PosixPath('/tmp/pytest-of-root/pytest-1/test_write_dataset_max_open_fi0')

          def test_write_dataset_max_open_files(tempdir):
              directory = tempdir / 'ds'
              file_format = "parquet"
              partition_column_id = 1
              column_names = ['c1', 'c2']
              record_batch_1 = pa.record_batch(data=[[1, 2, 3, 4, 0, 10],
                                                     ['a', 'b', 'c', 'd', 'e', 'a']],
                                               names=column_names)
              record_batch_2 = pa.record_batch(data=[[5, 6, 7, 8, 0, 1],
                                                     ['a', 'b', 'c', 'd', 'e', 'c']],
                                               names=column_names)
              record_batch_3 = pa.record_batch(data=[[9, 10, 11, 12, 0, 1],
                                                     ['a', 'b', 'c', 'd', 'e', 'd']],
                                               names=column_names)
              record_batch_4 = pa.record_batch(data=[[13, 14, 15, 16, 0, 1],
                                                     ['a', 'b', 'c', 'd', 'e', 'b']],
                                               names=column_names)
          
              table = pa.Table.from_batches([record_batch_1, record_batch_2,
                                             record_batch_3, record_batch_4])
          
              partitioning = ds.partitioning(
                  pa.schema([(column_names[partition_column_id], pa.string())]),
                  flavor="hive")
          
              data_source_1 = directory / "default"
          
              ds.write_dataset(data=table, base_dir=data_source_1,
                               partitioning=partitioning, format=file_format)
          
              # Here we consider the number of unique partitions created when
              # partitioning column contains duplicate records.
              #   Returns: (number_of_files_generated, number_of_partitions)
              def _get_compare_pair(data_source, record_batch, file_format, col_id):
                  num_of_files_generated = _get_num_of_files_generated(
                      base_directory=data_source, file_format=file_format)
                  number_of_partitions = len(pa.compute.unique(record_batch[col_id]))
                  return num_of_files_generated, number_of_partitions
          
              # CASE 1: when max_open_files=default & max_open_files >= num_of_partitions
              #         In case of a writing to disk via partitioning based on a
              #         particular column (considering row labels in that column),
              #         the number of unique rows must be equal
              #         to the number of files generated
          
              num_of_files_generated, number_of_partitions \
                  = _get_compare_pair(data_source_1, record_batch_1, file_format,
                                      partition_column_id)
              assert num_of_files_generated == number_of_partitions
          
              # CASE 2: when max_open_files > 0 & max_open_files < num_of_partitions
              #         the number of files generated must be greater than the number of
              #         partitions
          
              data_source_2 = directory / "max_1"
          
              max_open_files = 3
          
              ds.write_dataset(data=table, base_dir=data_source_2,
                               partitioning=partitioning, format=file_format,
                               max_open_files=max_open_files)
          
              num_of_files_generated, number_of_partitions \
                  = _get_compare_pair(data_source_2, record_batch_1, file_format,
                                      partition_column_id)
      >       assert num_of_files_generated > number_of_partitions
      E       assert 5 > 5

      pyarrow/tests/test_dataset.py:3807: AssertionError
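
One plausible reading of the CASE 2 failure: the test assumes that with max_open_files=3 and 5 distinct partition values, the writer is forced to close and later reopen partition files, producing strictly more files than partitions. But if rows happen to reach the writer already grouped by partition, no file ever needs reopening, exactly one file per partition is written, and the strict inequality degenerates to 5 > 5. Below is a minimal standalone sketch of that scenario (assuming pyarrow >= 7.0.0; _count_files is a hypothetical helper standing in for the test's _get_num_of_files_generated):

    import pathlib
    import tempfile

    import pyarrow as pa
    import pyarrow.compute as pc
    import pyarrow.dataset as ds


    def _count_files(base_dir, suffix):
        # Recursively count the data files written under the dataset root.
        return sum(1 for _ in pathlib.Path(base_dir).rglob(f"*.{suffix}"))


    # Same shape as the test data: partition column c2 has 5 distinct values.
    table = pa.table({
        "c1": [1, 2, 3, 4, 0, 10, 5, 6, 7, 8, 0, 1],
        "c2": ["a", "b", "c", "d", "e", "a", "a", "b", "c", "d", "e", "c"],
    })
    partitioning = ds.partitioning(pa.schema([("c2", pa.string())]),
                                   flavor="hive")

    base_dir = pathlib.Path(tempfile.mkdtemp()) / "max_1"
    ds.write_dataset(data=table, base_dir=base_dir, format="parquet",
                     partitioning=partitioning, max_open_files=3)

    num_files = _count_files(base_dir, "parquet")
    num_partitions = len(pc.unique(table["c2"]))
    # The test asserts num_files > num_partitions; depending on the order in
    # which rows arrive at the writer, num_files == num_partitions is also a
    # legal outcome, which is exactly the flaky "assert 5 > 5" above.
    print(num_files, num_partitions)

Whether the inequality holds on a given run appears to depend on row delivery order rather than on anything the test controls, which would make the assertion inherently flaky as written.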
       


People

Assignee: Vibhatha Lakmal Abeykoon (vibhatha)
Reporter: David Li (lidavidm)
Votes: 0
Watchers: 7


Time Tracking

Estimated: Not Specified
Remaining: 0h
Logged: 5h
