Apache Arrow / ARROW-10517

[Python] Unable to read/write Parquet datasets with fsspec on Azure Blob


    Description

       

      # adal==1.2.5
      # adlfs==0.2.5
      # fsspec==0.7.4
      # pandas==1.1.3
      # pyarrow==2.0.0
      # azure-storage-blob==2.1.0
      # azure-storage-common==2.1.0

      import fsspec
      import pyarrow as pa
      import pyarrow.dataset as ds
      from pyarrow.dataset import DirectoryPartitioning

      # base.login / base.password hold the storage account name and key,
      # and table is an existing pyarrow.Table
      fs = fsspec.filesystem(protocol='abfs',
                             account_name=base.login,
                             account_key=base.password)

      ds.write_dataset(data=table,
                       base_dir="dev/test7",
                       basename_template=None,
                       format="parquet",
                       partitioning=DirectoryPartitioning(
                           pa.schema([("year", pa.string()),
                                      ("month", pa.string()),
                                      ("day", pa.string())])),
                       schema=table.schema,
                       filesystem=fs,
                       )
      

       I think this is due to how early versions of adlfs implement mkdir(), which raises a RuntimeError here instead of creating the directory. That said, I use write_to_dataset and write_table against this same filesystem all the time, so I am not sure why this would be an issue.

      ---------------------------------------------------------------------------
      RuntimeError                              Traceback (most recent call last)
      <ipython-input-40-bb38d83f896e> in <module>
           13 
           14 
      ---> 15 ds.write_dataset(data=table, 
           16                  base_dir="dev/test7",
           17                  basename_template=None,
      
      /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in write_dataset(data, base_dir, basename_template, format, partitioning, schema, filesystem, file_options, use_threads)
          771     filesystem, _ = _ensure_fs(filesystem)
          772 
      --> 773     _filesystemdataset_write(
          774         data, base_dir, basename_template, schema,
          775         filesystem, partitioning, file_options, use_threads,
      
      /opt/conda/lib/python3.8/site-packages/pyarrow/_dataset.pyx in pyarrow._dataset._filesystemdataset_write()
      
      /opt/conda/lib/python3.8/site-packages/pyarrow/_fs.pyx in pyarrow._fs._cb_create_dir()
      
      /opt/conda/lib/python3.8/site-packages/pyarrow/fs.py in create_dir(self, path, recursive)
          226     def create_dir(self, path, recursive):
          227         # mkdir also raises FileNotFoundError when base directory is not found
      --> 228         self.fs.mkdir(path, create_parents=recursive)
          229 
          230     def delete_dir(self, path):
      
      /opt/conda/lib/python3.8/site-packages/adlfs/core.py in mkdir(self, path, delimiter, exists_ok, **kwargs)
          561             else:
          562                 ## everything else
      --> 563                 raise RuntimeError(f"Cannot create {container_name}{delimiter}{path}.")
          564         else:
          565             if container_name in self.ls("") and path:
      
      RuntimeError: Cannot create dev/test7/2020/01/28.
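
      A possible workaround (untested sketch on my side, assuming pyarrow wraps the fsspec filesystem in PyFileSystem/FSSpecHandler as the traceback suggests, and that skipping directory creation is harmless because blob storage has no real directories) would be to subclass the handler and swallow the RuntimeError from create_dir:

      import pyarrow.fs

      class NoMkdirFSSpecHandler(pyarrow.fs.FSSpecHandler):
          # Skip the failing mkdir call instead of letting adlfs raise;
          # directory-like prefixes are implicit on Azure Blob anyway.
          def create_dir(self, path, recursive):
              try:
                  super().create_dir(path, recursive)
              except RuntimeError:
                  pass

      azure_fs = pyarrow.fs.PyFileSystem(NoMkdirFSSpecHandler(fs))
      # then pass filesystem=azure_fs to ds.write_dataset instead of fs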
      

       
      Next, if I try to read a dataset (keep in mind that this works with read_table and ParquetDataset):

      ds.dataset(source="dev/staging/evaluations", 
                 format="parquet", 
                 partitioning="hive",
                 exclude_invalid_files=False,
                 filesystem=fs
                )
      

       
      This doesn't seem to respect the filesystem connected to Azure Blob; the path exists, but pyarrow raises FileNotFoundError:

      ---------------------------------------------------------------------------
      FileNotFoundError                         Traceback (most recent call last)
      <ipython-input-41-4de65fe95db7> in <module>
      ----> 1 ds.dataset(source="dev/staging/evaluations", 
            2            format="parquet",
            3            partitioning="hive",
            4            exclude_invalid_files=False,
            5            filesystem=fs
      
      /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in dataset(source, schema, format, filesystem, partitioning, partition_base_dir, exclude_invalid_files, ignore_prefixes)
          669     # TODO(kszucs): support InMemoryDataset for a table input
          670     if _is_path_like(source):
      --> 671         return _filesystem_dataset(source, **kwargs)
          672     elif isinstance(source, (tuple, list)):
          673         if all(_is_path_like(elem) for elem in source):
      
      /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in _filesystem_dataset(source, schema, filesystem, partitioning, format, partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
          426         fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
          427     else:
      --> 428         fs, paths_or_selector = _ensure_single_source(source, filesystem)
          429 
          430     options = FileSystemFactoryOptions(
      
      /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in _ensure_single_source(path, filesystem)
          402         paths_or_selector = [path]
          403     else:
      --> 404         raise FileNotFoundError(path)
          405 
          406     return filesystem, paths_or_selector
      
      FileNotFoundError: dev/staging/evaluations
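
      The path itself is definitely there; as a quick sanity check (a sketch using the same fs object, output not pasted here), the fsspec filesystem can list it directly:

      fs.ls("dev/staging/evaluations")     # should list the partition directories
      fs.isdir("dev/staging/evaluations")  # I would expect True here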
      

      This does work, though, when I list the blobs myself and pass the explicit file paths to ds.dataset:

      # wasb is a BlockBlobService client from azure-storage-blob 2.1.0
      blobs = wasb.list_blobs(container_name="dev", prefix="staging/evaluations")

      dataset = ds.dataset(source=["dev/" + blob.name for blob in blobs],
                           format="parquet",
                           partitioning="hive",
                           exclude_invalid_files=False,
                           filesystem=fs)
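
      Presumably the same explicit file list could also be built with fsspec alone instead of a separate azure-storage-blob client (a sketch; it assumes fs.find returns container-qualified paths ending in .parquet):

      paths = [p for p in fs.find("dev/staging/evaluations")
               if p.endswith(".parquet")]

      dataset = ds.dataset(source=paths,
                           format="parquet",
                           partitioning="hive",
                           exclude_invalid_files=False,
                           filesystem=fs)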
      

      Next, if I downgrade to pyarrow 1.0.1, I am able to read datasets (but 1.0.1 does not have ds.write_dataset):

      # adal==1.2.5
      # adlfs==0.2.5
      # azure-storage-blob==2.1.0
      # azure-storage-common==2.1.0
      # fsspec==0.7.4
      # pandas==1.1.3
      # pyarrow==1.0.1
      
      dataset = ds.dataset("dev/staging/evaluations", format="parquet", filesystem=fs)
      dataset.to_table().to_pandas()
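
      For writing on 1.0.1 I fall back to the legacy writer, which does work against this filesystem (roughly this call pattern, reusing the year/month/day partition columns from the example above):

      import pyarrow.parquet as pq

      pq.write_to_dataset(table,
                          root_path="dev/test7",
                          partition_cols=["year", "month", "day"],
                          filesystem=fs)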
      

      edit:

      # pyarrow==2.0.0
      # fsspec==0.8.4
      # adlfs==0.5.5
      # pandas==1.1.4
      # numpy==1.19.4
      # azure-storage-blob==12.6.0

      import adlfs
      import fsspec

      x = adlfs.AzureBlobFileSystem(account_name=name, account_key=key)
      type(x.find("dev/test", detail=True))
      # list

      fs = fsspec.filesystem(protocol="abfs", account_name=name, account_key=key)
      type(fs.find("dev/test", detail=True))
      # list
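
      For comparison, stock fsspec filesystems return a dict from find(..., detail=True), which is presumably the shape pyarrow's FSSpecHandler expects when it lists files (illustrative sketch using the local filesystem, nothing Azure-specific):

      import os
      import tempfile

      import fsspec

      local = fsspec.filesystem("file")
      tmp = tempfile.mkdtemp()
      open(os.path.join(tmp, "part-0.parquet"), "wb").close()

      type(local.find(tmp, detail=True))
      # dict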
      

      Attachments

        1. ss.PNG (20 kB) - Lance Dacey
        2. ss2.PNG (21 kB) - Lance Dacey

        People

          Assignee: Unassigned
          Reporter: Lance Dacey (ldacey)
          Votes: 0
          Watchers: 5
