Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-13421

Python DeferredDataFrame.xs differs from Pandas

Details

    • Bug
    • Status: In Progress
    • P2
    • Resolution: Unresolved
    • 2.34.0
    • 2.36.0
    • dsl-dataframe
    • None

    Description

      When testing the `xs` method on DeferredDataFrames I'm seeing a few inconsistent results.  I have two minimal examples that showcase the errors.

       

      First inconsistency: Beam's `xs` requries one left over index field while Pandas does not.

      with beam.Pipeline(options=PipelineOptions()) as pipeline:
          df = pd.DataFrame(
              np.array([
                  ['state', 'day1', 12],
                  ['state', 'day1', 1],
                  ['state', 'day2', 14],
                  ['county', 'day1', 9],
              ]),
              columns=['provider', 'time', 'value'])
          # Create just one index field
          df = df.set_index(['provider'])
          df.to_parquet('test.parquet')
          
          # Should print out
          #           time value
          # provider            
          # state     day1    12
          # state     day1     1
          # state     day2    14
          print(df.xs('state'))
          
          # Should emit the same data to a csv but instead dies due to
          # Cannot remove 1 levels from an index with 1 levels: at least one level must be left.
          test_df = (pipeline | read_parquet('test.parquet'))
          (
              test_df.xs('state').to_csv('test.csv')
          ) 

      Second inconsistency: Beam dies for no clear reason

      import pandas as pd
      import numpy as npwith beam.Pipeline(options=PipelineOptions()) as pipeline:
          df = pd.DataFrame(
              np.array([
                  ['state', 'day1', 12],
                  ['state', 'day1', 1],
                  ['state', 'day2', 14],
                  ['county', 'day1', 9],
              ]),
              columns=['provider', 'time', 'value'])
          # Create two index fields to satisfy Beam
          df = df.set_index(['provider', 'time'])
          df.to_parquet('test.parquet')
          
          # Should print out
          #      value
          # time      
          # day1    12
          # day1     1
          # day2    14
          print(df.xs('state'))
          
          # Dies with no clear error at
          # /opt/conda/lib/python3.9/site-packages/apache_beam/dataframe/transforms.py in output_partitioning_in_stage(expr, stage)
          # 305 
          # 306       # Anything that's not an input must have arguments
          # 307       assert len(expr.args())
          # 308 
          # 309       arg_partitionings = set(
          test_df = (pipeline | read_parquet('test.parquet'))
          (
              test_df.xs('state').to_csv('test.csv')
          ) 

      Attachments

        Issue Links

          Activity

            People

              bhulette Brian Hulette
              fozziethebeat Keith Stevens
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 50m
                  50m