
Pyspark: Count On Pyspark.sql.dataframe.dataframe Takes Long Time

I have a pyspark.sql.dataframe.DataFrame like the following df.show() +--------------------+----+----+---------+----------+---------+----------+---------+ | ID|Cod

Solution 1:

Your question is very interesting and tricky.

I tested with a large dataset in order to reproduce your behavior.

Problem Description

I tested the following two cases in a large dataset:

# Case 1
df.count()  # Execution time: 37 s

# Case 2
df.filter(df['ID'] == id0).count()  # Execution time: 1.39 min

Explanation

Let's see the physical plan with only .count():

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#38L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#41L])
      +- *(1) FileScan csv [] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

Let's see the physical plan with .filter() and then .count():

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#61L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#64L])
      +- *(1) Project
         +- *(1) Filter (isnotnull(ID#11) && (ID#11 = MuhammedMacIntyre))
            +- *(1) FileScan csv [ID#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [IsNotNull(ID), EqualTo(ID,MuhammedMacIntyre)], ReadSchema: struct<_c1:string>
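For anyone who wants to reproduce plans like the two above, here is a minimal sketch. It assumes df is an existing PySpark DataFrame; the helper name explain_counts and its parameters are hypothetical. Since count() is an action and returns a number, the sketch builds the equivalent global aggregation with groupBy().count(), which returns a DataFrame that can be explained. The exact plan text will likely differ between Spark versions and data sources.

```python
def explain_counts(df, column, value):
    """Print physical plans for a plain count and a filtered count.

    Assumption: `df` is a pyspark.sql.DataFrame. `column` and `value`
    are placeholders for the filter used in the question (ID == id0).
    """
    # groupBy() with no keys followed by count() is a global count,
    # expressed as a DataFrame so that explain() can be called on it.
    df.groupBy().count().explain()

    # Same aggregation, but with the filter applied first.
    df.filter(df[column] == value).groupBy().count().explain()
```

With the DataFrame from the question, it could be called as explain_counts(df, "ID", id0).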

Generally, when Spark counts rows, it maps each row to a count of 1 and then reduces the per-partition results into the final number of rows.
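The two-stage idea (partial counts per partition, then one final sum) can be illustrated in plain Python. This is only a sketch of the concept, not Spark's implementation; the partitions list and the count_rows function are made up for the illustration.

```python
from functools import reduce


def count_rows(partitions):
    """Count rows the way the plan describes: partial counts, then a sum."""
    # Stage 1: each partition independently produces a partial count
    # (conceptually, every row contributes 1).
    partial_counts = [sum(1 for _ in partition) for partition in partitions]
    # Stage 2: the partial counts are gathered in one place and added up.
    return reduce(lambda a, b: a + b, partial_counts, 0)


# Hypothetical data: four partitions of different sizes.
partitions = [["a", "b", "c"], ["d"], [], ["e", "f"]]
total = count_rows(partitions)
print(total)
```

The first stage corresponds to the partial_count(1) aggregate in the plans and the second to the final count(1) after the exchange.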

In Case 2, Spark first has to filter the rows, then create the partial counts for every partition, and then run another stage to sum them up. So, for the same input rows, Spark is also doing the filtering in the second case, which affects computation time on large datasets. The plans also show that Case 1 reads no columns at all (ReadSchema: struct<>), whereas Case 2 has to read the ID column of every row in order to evaluate the filter. Spark is a framework for distributed processing and has no indexes of the kind pandas offers, where an index lookup can find matching rows without scanning all of them.
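To make the scan-versus-index contrast concrete, here is a small plain-Python sketch. It uses neither Spark nor pandas; the rows list and the helper names are hypothetical, and a dictionary stands in for an index.

```python
from collections import defaultdict

# Hypothetical rows standing in for the DataFrame.
rows = [{"ID": "a"}, {"ID": "b"}, {"ID": "a"}, {"ID": "c"}]


def count_by_scan(rows, wanted):
    # Without an index, every row has to be inspected to apply the filter.
    return sum(1 for row in rows if row["ID"] == wanted)


def build_index(rows):
    # One-off pass that records the positions of each ID value.
    index = defaultdict(list)
    for position, row in enumerate(rows):
        index[row["ID"]].append(position)
    return index


def count_by_index(index, wanted):
    # With an index, the matching rows are found by a single lookup.
    return len(index.get(wanted, []))


index = build_index(rows)
print(count_by_scan(rows, "a"), count_by_index(index, "a"))
```

Building the index still costs one full pass, so it only pays off when the same data is filtered repeatedly.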

Summary

In this simple case there is not much you can do to improve the execution time. You can try running your application with different configuration settings (e.g. spark.sql.shuffle.partitions, spark.default.parallelism, the number of executors, executor memory, etc.).
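As a rough sketch of how such settings might be applied when creating a session: the values below are placeholders rather than recommendations, and the names TUNING and build_session are hypothetical. The sketch assumes pyspark is installed and that the cluster manager honours spark.executor.instances.

```python
# Placeholder values; tune them against your own cluster and data.
TUNING = {
    "spark.sql.shuffle.partitions": "200",
    "spark.default.parallelism": "200",
    "spark.executor.instances": "4",
    "spark.executor.memory": "4g",
}


def build_session(app_name, settings):
    """Create (or reuse) a SparkSession with the given settings applied."""
    # Imported here so the settings above can be inspected without pyspark.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

A call such as build_session("count-test", TUNING) would then return the session to experiment with. Executor settings generally need to be in place before the application starts, so they may have no effect on a session that is already running.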

Solution 2:

This is because Spark evaluates lazily. Calling tmp.count() is an action, so it is the step that actually runs the filter. In other words, your timing of tmp.count() also includes the filter time. If you want to truly compare the two counts, try the following:

%%time
df.count()

id0 = df.first().ID  ## First ID
tmp = df.filter( (df['ID'] == id0) )
tmp.persist().show()

%%time
tmp.count()

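The important part is calling tmp.persist().show() before performing the count. persist() marks the filtered result for caching and show() triggers the computation, so the timed tmp.count() can read cached data instead of re-running the filter. Note that show() may only evaluate as many partitions as it needs to display its rows, in which case the cache is only partly filled and the count still has to compute the remaining partitions.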
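A variation on the steps above, as a sketch: show() may evaluate only the partitions it needs for the rows it displays, so this version warms the cache with a full count() before timing a second one. It assumes df is an existing PySpark DataFrame; the helper name time_count_after_caching and its parameters are hypothetical, and time.perf_counter replaces the %%time notebook magic so the code also runs outside Jupyter.

```python
import time


def time_count_after_caching(df, column, value):
    """Return (row_count, seconds) for a count over an already cached filter.

    Assumption: `df` is a pyspark.sql.DataFrame.
    """
    filtered = df.filter(df[column] == value).persist()

    # First count touches every partition, so the cache is fully populated.
    filtered.count()

    # Second count is the one being timed; it should read from the cache.
    start = time.perf_counter()
    row_count = filtered.count()
    elapsed = time.perf_counter() - start

    # Release the cached data once the measurement is done.
    filtered.unpersist()
    return row_count, elapsed
```

With the DataFrame from the question, time_count_after_caching(df, "ID", id0) would return the count of matching rows together with the time of the cached count alone.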
