Details
Description
Let's create a simple pandas DataFrame with a single column named 'id' containing a sequential range of integers.
import numpy as np
import pandas as pd

df = pd.DataFrame(np.arange(0, 1000), columns=['id'])
Consider this function, which uses a NumPy index array to select the first 4 values of the 'id' column.
def udf_example(df):
    some_index = np.array([0, 1, 2, 3])
    values = df['id'].values[some_index]
    df = pd.DataFrame(values, columns=['id'])
    return df
If I apply this function in PySpark, I get this result:
from pyspark.sql import types as t

schema = t.StructType([t.StructField('id', t.LongType(), True)])
df_spark = spark.createDataFrame(df).groupBy().applyInPandas(udf_example, schema)
display(df_spark)
# id
# 125
# 126
# 127
# 128
If I apply it in plain Python, I get the correct and expected result:
udf_example(df)
# id
# 0
# 1
# 2
# 3
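The two results differ even though the function is identical. One way to narrow this down (a diagnostic sketch of my own, not part of the original report; udf_check_order is a hypothetical helper) is to have the UDF report whether the rows arrive in their original order:

# Assumption: the discrepancy comes from the row order inside the grouped
# batch, not from NumPy itself. This UDF only reports what it receives.
def udf_check_order(pdf):
    return pd.DataFrame({
        'monotonic': [bool(pdf['id'].is_monotonic_increasing)],  # rows still sorted?
        'first_id': [int(pdf['id'].iloc[0])],                    # first row as received
    })

check_schema = t.StructType([
    t.StructField('monotonic', t.BooleanType(), True),
    t.StructField('first_id', t.LongType(), True),
])
display(spark.createDataFrame(df).groupBy().applyInPandas(udf_check_order, check_schema))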
Using NumPy indexing operations inside a pandas UDF in Spark causes side effects and unexpected results: the same positional indices select different values than they do in plain pandas.
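As a possible workaround (my own sketch, assuming the misalignment stems from Spark not guaranteeing row order across the shuffle introduced by groupBy), restoring a deterministic order inside the UDF before any positional indexing yields the expected values:

# Sketch of a workaround, not an official fix: sort by a key column before
# using positional indices, so that index 0 refers to the same row in Spark
# as it does in plain pandas.
def udf_example_ordered(pdf):
    pdf = pdf.sort_values('id').reset_index(drop=True)  # deterministic row order
    some_index = np.array([0, 1, 2, 3])
    values = pdf['id'].values[some_index]
    return pd.DataFrame(values, columns=['id'])

df_spark_ordered = spark.createDataFrame(df).groupBy().applyInPandas(udf_example_ordered, schema)
display(df_spark_ordered)  # expected: 0, 1, 2, 3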