Apache Arrow / ARROW-2891

[Python] Preserve schema in write_to_dataset


    Description

      When using `pyarrow.parquet.write_to_dataset` with `partition_cols` set, the schema of the `table` passed into the function is not enforced when iterating over each `subgroup` to create a `subtable` (see the partition loop in `write_to_dataset` in `pyarrow/parquet.py`).
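
      For illustration, here is a minimal, self-contained sketch of the underlying problem (not pyarrow's verbatim internals): once a subtable is rebuilt from pandas without the original schema, pandas' inferred dtype wins.

      import numpy as np
      import pandas as pd
      import pyarrow as pa

      # col2 holds only NaN, so pandas must store it as a float dtype
      df = pd.DataFrame({'col2': np.array([np.nan], dtype=np.float32)})
      schema = pa.schema([pa.field('col2', pa.int32())])

      # with an explicit schema the NaN becomes a null and col2 stays int32
      table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
      print(table.schema)  # col2: int32

      # round-tripping through pandas without the schema, as the partition
      # loop does, silently upcasts col2 to double
      roundtrip = pa.Table.from_pandas(table.to_pandas(), preserve_index=False)
      print(roundtrip.schema)  # col2: double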

      Since pandas is used to generate the subtables, there is a risk that type information is lost from the original `table.schema` due to the narrower set of data types pandas supports and the internal type conversions pandas performs. It would be ideal if a `subschema` were generated from `table.schema` and passed to `Table.from_pandas` when instantiating each `subtable`, so the user's original schema is enforced.
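
      A sketch of what this could look like (illustrative, not the final patch): drop the partition columns from `table.schema`, since their values are encoded in the directory names rather than stored in each file, and enforce the remaining fields on every subtable.

      import pyarrow as pa

      schema = pa.schema([pa.field('col1', pa.int32()),
                          pa.field('col2', pa.int32()),
                          pa.field('col3', pa.int32())])
      partition_cols = ['col1']

      # derive the per-file schema by removing the partition columns
      subschema = schema
      for col in partition_cols:
          subschema = subschema.remove(subschema.get_field_index(col))

      print(subschema)  # col2: int32, col3: int32

      # inside write_to_dataset each subtable would then be created with
      #   pa.Table.from_pandas(subgroup, schema=subschema,
      #                        preserve_index=preserve_index)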

      Here is a simple example of where we run into issues while trying to preserve a valid schema. This case is especially likely to occur when working with sparse data sets.

      >>> from io import StringIO
      >>> import pandas as pd
      >>> import numpy as np
      >>> import pyarrow as pa
      >>> import pyarrow.parquet as pq
      
      # in csv col2 has no NaNs and in csv_nan col2 only has NaNs
      >>> csv = StringIO('"1","10","100"')
      >>> csv_nan = StringIO('"2","","200"')
      
      # read in col2 as a float since pandas does not support NaNs in ints
      >>> pd_dtype = {'col1': np.int32, 'col2': np.float32, 'col3': np.int32}
      >>> df = pd.read_csv(csv, header=None, names=['col1', 'col2', 'col3'], dtype=pd_dtype)
      >>> df_nan = pd.read_csv(csv_nan, header=None, names=['col1', 'col2', 'col3'], dtype=pd_dtype)
      
      # verify both dfs and their dtypes
      >>> df
         col1  col2  col3
      0     1  10.0   100
      
      >>> df.dtypes
      col1      int32
      col2    float32
      col3      int32
      dtype: object
      
      >>> df_nan
         col1  col2  col3
      0     2   NaN   200
      
      >>> df_nan.dtypes
      col1      int32
      col2    float32
      col3      int32
      dtype: object
      
      # define col2 as int32 since pyarrow, unlike pandas, supports nulls in integer columns
      # we want to preserve the original schema we started with and not
      # upcast just because we're using pandas to go from csv to pyarrow
      >>> schema = pa.schema([pa.field('col1', type=pa.int32()),
      ...                     pa.field('col2', type=pa.int32()),
      ...                     pa.field('col3', type=pa.int32())])
      
      # verify schema
      >>> schema
      col1: int32
      col2: int32
      col3: int32
      
      # create tables
      >>> table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
      >>> table_nan = pa.Table.from_pandas(df_nan, schema=schema, preserve_index=False)
      
      # verify table schemas and metadata
      # col2 has pandas_type int32 and numpy_type float32 in both tables
      >>> table
      pyarrow.Table
      col1: int32
      col2: int32
      col3: int32
      metadata
      --------
      {b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
      b' "col1", "field_name": "col1", "pandas_type": "int32", "numpy_ty'
      b'pe": "int32", "metadata": null}, {"name": "col2", "field_name": '
      b'"col2", "pandas_type": "int32", "numpy_type": "float32", "metada'
      b'ta": null}, {"name": "col3", "field_name": "col3", "pandas_type"'
      b': "int32", "numpy_type": "int32", "metadata": null}], "pandas_ve'
      b'rsion": "0.22.0"}'}
      
      >>> table_nan
      pyarrow.Table
      col1: int32
      col2: int32
      col3: int32
      metadata
      --------
      {b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
      b' "col1", "field_name": "col1", "pandas_type": "int32", "numpy_ty'
      b'pe": "int32", "metadata": null}, {"name": "col2", "field_name": '
      b'"col2", "pandas_type": "int32", "numpy_type": "float32", "metada'
      b'ta": null}, {"name": "col3", "field_name": "col3", "pandas_type"'
      b': "int32", "numpy_type": "int32", "metadata": null}], "pandas_ve'
      b'rsion": "0.22.0"}'}
      
      # write both tables to local filesystem
      >>> pq.write_to_dataset(table, '/Users/jkulzick/pyarrow_example',
      ...                     partition_cols=['col1'],
      ...                     preserve_index=False)
      >>> pq.write_to_dataset(table_nan, '/Users/jkulzick/pyarrow_example',
      ...                     partition_cols=['col1'],
      ...                     preserve_index=False)
      
      # read parquet files into a ParquetDataset to validate the schemas
      # the metadata and schemas for both files are different from their original tables
      # table now has pandas_type int32 and numpy_type int32 (was float32) for col2
      # table_nan now has pandas_type float64 (was int32) and numpy_type int64 (was float32) for col2
      >>> ds = pq.ParquetDataset('/Users/jkulzick/pyarrow_example')
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File "/Users/jkulzick/miniconda3/envs/bowerbird/lib/python3.6/site-packages/pyarrow/parquet.py", line 745, in __init__
          self.validate_schemas()
        File "/Users/jkulzick/miniconda3/envs/bowerbird/lib/python3.6/site-packages/pyarrow/parquet.py", line 775, in validate_schemas
          dataset_schema))
      ValueError: Schema in partition[col1=1] /Users/jkulzick/pyarrow_example/col1=2/b7b42ce9de6a46a786a5361c42d28731.parquet was different.
      col2: double
      col3: int32
      metadata
      --------
      {b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
      b' "col2", "field_name": "col2", "pandas_type": "float64", "numpy_'
      b'type": "float64", "metadata": null}, {"name": "col3", "field_nam'
      b'e": "col3", "pandas_type": "int32", "numpy_type": "int32", "meta'
      b'data": null}], "pandas_version": "0.22.0"}'}
      
      vs
      
      col2: int32
      col3: int32
      metadata
      --------
      {b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
      b' "col2", "field_name": "col2", "pandas_type": "int32", "numpy_ty'
      b'pe": "int32", "metadata": null}, {"name": "col3", "field_name": '
      b'"col3", "pandas_type": "int32", "numpy_type": "int32", "metadata'
      b'": null}], "pandas_version": "0.22.0"}'}

People

Assignee: Jonathan Kulzick
Reporter: Jonathan Kulzick