Apache Arrow / ARROW-9514

[Python] The new Dataset API will not work with files on Azure Blob


    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Works for Me
    • Affects Version/s: 0.17.1, 1.0.0
    • Fix Version/s: 1.0.0
    • Component/s: Python
    • Environment:
      Ubuntu 18.04
    • Flags:
      Patch

      Description

      I tried using pyarrow.dataset and pq.ParquetDataset(use_legacy_dataset=False), and my connection to Azure Blob fails.

      I know the documentation says only HDFS and S3 are implemented, but I have been using Azure Blob by passing an fsspec filesystem when reading and writing parquet files/datasets with PyArrow (with use_legacy_dataset=True). Dask also works via storage_options.

      I am hoping that Azure Blob will be supported because I'd really like to try out the new row filtering and non-hive partitioning schemes.

      This is what I use for the filesystem when using read_table() or write_to_dataset():

      fs = fsspec.filesystem(protocol='abfs',
                             account_name=base.login,
                             account_key=base.password)

      It seems the class _ParquetDatasetV2 has a section that the original ParquetDataset does not. Perhaps this is why the fsspec filesystem fails when I turn off the legacy dataset?

      Line 1423 in arrow/python/pyarrow/parquet.py:

      if filesystem is not None:
          filesystem = pyarrow.fs._ensure_filesystem(filesystem, use_mmap=memory_map) 

      EDIT -

      I got this to work using fsspec on single files on Azure Blob:

      import pyarrow.dataset as ds
      import fsspec
      
      fs = fsspec.filesystem(protocol='abfs', 
                             account_name=login, 
                             account_key=password)
      
      dataset = ds.dataset("abfs://analytics/test/test.parquet", format="parquet", filesystem=fs)
      table = dataset.to_table(columns=['ticket_id', 'event_value'],
                               filter=ds.field('event_value') == 'closed')
      table.to_pandas().drop_duplicates('ticket_id')
      

      When I try this on a partitioned dataset I made using write_to_dataset, I run into an error, however. I tried the same code as above, and also with the partitioning='hive' option.

      TypeError                                 Traceback (most recent call last)
      <ipython-input-174-f44e707aa83e> in <module>
      ----> 1 dataset = ds.dataset("abfs://analytics/test/tickets-audits/", format="parquet", filesystem=fs, partitioning="hive", )
      
      ~/.local/lib/python3.7/site-packages/pyarrow/dataset.py in dataset(source, schema, format, filesystem, partitioning, partition_base_dir, exclude_invalid_files, ignore_prefixes)
          665     # TODO(kszucs): support InMemoryDataset for a table input
          666     if _is_path_like(source):
      --> 667         return _filesystem_dataset(source, **kwargs)
          668     elif isinstance(source, (tuple, list)):
          669         if all(_is_path_like(elem) for elem in source):
      
      ~/.local/lib/python3.7/site-packages/pyarrow/dataset.py in _filesystem_dataset(source, schema, filesystem, partitioning, format, partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
          430         selector_ignore_prefixes=selector_ignore_prefixes
          431     )
      --> 432     factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)
          433 
          434     return factory.finish(schema)
      
      ~/.local/lib/python3.7/site-packages/pyarrow/_dataset.pyx in pyarrow._dataset.FileSystemDatasetFactory.__init__()
      
      ~/.local/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.pyarrow_internal_check_status()
      
      ~/.local/lib/python3.7/site-packages/pyarrow/_fs.pyx in pyarrow._fs._cb_get_file_info_selector()
      
      ~/.local/lib/python3.7/site-packages/pyarrow/fs.py in get_file_info_selector(self, selector)
          159         infos = []
          160         selected_files = self.fs.find(
      --> 161             selector.base_dir, maxdepth=maxdepth, withdirs=True, detail=True
          162         )
          163         for path, info in selected_files.items():
      
      /opt/conda/lib/python3.7/site-packages/fsspec/spec.py in find(self, path, maxdepth, withdirs, **kwargs)
          369         # TODO: allow equivalent of -name parameter
          370         out = set()
      --> 371         for path, dirs, files in self.walk(path, maxdepth, **kwargs):
          372             if withdirs:
          373                 files += dirs
      
      /opt/conda/lib/python3.7/site-packages/fsspec/spec.py in walk(self, path, maxdepth, **kwargs)
          324 
          325         try:
      --> 326             listing = self.ls(path, detail=True, **kwargs)
          327         except (FileNotFoundError, IOError):
          328             return [], [], []
      
      TypeError: ls() got multiple values for keyword argument 'detail'
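The TypeError at the bottom of the traceback comes from `detail` being supplied twice: pyarrow's fs handler calls `fs.find(..., detail=True)`, fsspec's find() forwards its `**kwargs` to walk(), and walk() then calls `ls(path, detail=True, **kwargs)` with `detail` still in kwargs. A minimal stdlib-only sketch of that failure mode (function names mirror fsspec but this is not fsspec code):

```python
# Sketch reproducing the duplicate-keyword failure seen in the traceback.
def ls(path, detail=True, **kwargs):
    return []

def walk(path, maxdepth=None, **kwargs):
    # walk() adds its own detail=True while also forwarding caller kwargs
    return ls(path, detail=True, **kwargs)

def find(path, maxdepth=None, withdirs=False, **kwargs):
    # find() forwards kwargs untouched, so a caller-supplied detail=True
    # survives all the way down to ls()
    return walk(path, maxdepth, **kwargs)

try:
    # Mirrors pyarrow calling fs.find(selector.base_dir, ..., detail=True)
    find("analytics/test/tickets-audits/", withdirs=True, detail=True)
except TypeError as exc:
    print(exc)  # ls() got multiple values for keyword argument 'detail'
```

This matches the traceback: single-file reads never reach find(), which is why only the partitioned (directory) case fails.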

            People

            • Assignee: Unassigned
            • Reporter: ldacey (Lance Dacey)
            • Votes: 0
            • Watchers: 3
