Skip to content Skip to sidebar Skip to footer

Memory Leaks When Using Pandas_udf And Parquet Serialization?

I am currently developing my first whole system using PySpark and I am running into some strange, memory-related issues. In one of the stages, I would like to resemble a Split-Appl

Solution 1:

I wanted to comment to your post, but my reputation is too low.

According to my experience udf slow down your performance drastically, especially if you write them in python (or pandas?). There is an article, why you shoudn't use python udfs and use scala udfs instead: https://medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9

In my case it was possible to use built-in functions, even it was pretty complicated, and the runtime decreased to about 5% compared to before.

For your OOM Error and why a repartition worked for you, I have no explanation. The only advice I can give you is to avoid UDFs as much as possible, although it seems to be not that easy in your case.

Solution 2:

This thread is a bit old, but I stumped across the exact same problem and spent quite a few hours ont it. So I just wanted to explain how I solve it, with the hope that it saves some hours for anyone else hitting the same issue in the future.

The problem here is not related to pandas_udf or parquet, but with the use of withColumn to generate the columns. When adding multiple columns to a dataframe is way more efficient to use the select method. This article explains why.

So for example, instead of

for j in range(z):
   df = df.withColumn(
       f"N{j}",
       F.col("ID") + float(j)
   )

you should write

df = df.select(
    *df.columns,
    *[(F.col("ID") + float(j)).alias(f"N{j}") for j in range(z)]
)

The rewritten script looks like this (Note that I still had to increment the driver memory to 2GB, but at least is quite a reasonable amount of memory)

import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp


# Dummy pandas_udf -------------------------------------------------------------@F.pandas_udf(spktyp.DoubleType())defpredict(x):
    return x + 100.0# Initialization ---------------------------------------------------------------
spark = (pyspark.sql.SparkSession.builder
        .appName("mre")
        .config("spark.driver.memory", "2g")
        .master("local[3]").getOrCreate())

sc = spark.sparkContext

# Generate a dataframe ---------------------------------------------------------
out_path = "out.parquet"

z = 105
m = 750000

schema = spktyp.StructType(
    [spktyp.StructField("ID", spktyp.DoubleType(), True)]
)

df = spark.createDataFrame(
    [(float(i),) for i inrange(m)],
    schema
)


df = df.select(
    *df.columns,
    *[(F.col("ID") + float(j)).alias(f"N{j}") for j inrange(z)]
)

df = df.withColumn(
    "X",
    F.array(
        F.lit("A"),
        F.lit("B"),
        F.lit("C"),
        F.lit("D"),
        F.lit("E")
    ).getItem(
        (F.rand()*3).cast("int")
    )
)

# Set the column names for grouping, input and output --------------------------
group_col = "X"
in_col = "N0"
out_col = "EP"# Extract different group ids in grouping variable -----------------------------
rows = df.select(group_col).distinct().collect()
groups = [row[group_col] for row in rows]
print(f"Groups: {groups}")

# Split and treat the first id -------------------------------------------------
first, *others = groups

cur_df = df.filter(F.col(group_col) == first)
result = cur_df.withColumn(
    out_col,
    predict(in_col)
)

# Traverse the remaining group ids ---------------------------------------------for i, other inenumerate(others):
    cur_df = df.filter(F.col(group_col) == other)
    new_df = cur_df.select(
        *cur_df.columns,
        predict(in_col).alias(out_col)
    )
    # Incremental union --------------------------------------------------------
    result = result.unionByName(new_df)

# Save to disk -----------------------------------------------------------------
result.write.mode("overwrite").parquet(out_path)

Post a Comment for "Memory Leaks When Using Pandas_udf And Parquet Serialization?"