Dask Dataframe Nunique Operation: Worker Running Out Of Memory (mre)
Solution 1:
That's a great example of a recurring problem. The only shocking thing is that delayed
was not used during the synthetic data creation:
import os
import dask
import numpy as np
import pandas as pd

# ids, n_total and outputdir come from the setup in the question
@dask.delayed
def create_sample(i):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
    return

# Sample from the ids to create 100 parquet files
dels = [create_sample(i) for i in range(100)]
_ = dask.compute(dels)
For the following answer I will use only a small number of partitions (so change to range(5)), to keep the visualizations readable. Let's start with the loading:
import dask.dataframe as dd
df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), columns=['id'])
print(df.npartitions) # 5
This is a minor point, but passing columns=['id'] to .read_parquet() exploits parquet's columnar layout, so only that column is read (dask may well do some optimization behind the scenes, but if you know the columns you want, there's no harm in being explicit).
Now, when you run df['id'].nunique(), here's the DAG that dask will compute:
With more partitions there would be more steps, but it's apparent that there's a potential bottleneck when each partition sends a large intermediate result. That result can be very large for high-cardinality columns: if each of the 5 partitions sends an object of 100 MB, the receiving worker needs roughly 5 times that memory to accept the data (the footprint may shrink after further value-counting).
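To make the size argument concrete, here is a minimal pure-Python sketch. It is not dask's implementation: partial_unique_sizes and the toy partitions are hypothetical names made up for illustration, and each partition's intermediate result is modelled simply as a set of its unique values.

```python
def partial_unique_sizes(partitions):
    """Size of each partition's set of unique values (the object sent onward)."""
    return [len(set(p)) for p in partitions]


rows_per_partition = 10_000
n_partitions = 5

# Low cardinality: every row is one of only 3 values.
low = [[row % 3 for row in range(rows_per_partition)] for _ in range(n_partitions)]

# High cardinality: every row in every partition holds a different value.
high = [
    list(range(p * rows_per_partition, (p + 1) * rows_per_partition))
    for p in range(n_partitions)
]

low_sizes = partial_unique_sizes(low)
high_sizes = partial_unique_sizes(high)

# Total number of values a single receiving step would have to hold at once.
print(sum(low_sizes))   # 15
print(sum(high_sizes))  # 50000
```

Both toy columns have the same number of rows, but the receiving step holds 15 values in one case and 50,000 in the other, which is why the problem only shows up for columns with many distinct values.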
An additional consideration is how many tasks a single worker can run at a given time. The easiest way to control how many tasks can run at the same time on a given worker is resources. If you initiate the cluster with resources:
from dask.distributed import LocalCluster
cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})
Then every worker has the specified resources (in this case it's 1 unit of arbitrary foo), so if you think that processing a single partition should happen one at a time (due to high memory footprint), then you can do:
# note, no split_every is needed in this case since we're just
# passing a single number
df['id'].nunique().compute(resources={'foo': 1})
This will ensure that any single worker is busy with 1 task at a time, preventing excessive memory usage. (Side note: there's also .nunique_approx(), which may be of interest.)
To control the amount of data that any given worker receives for further processing, one approach is to use the split_every option. Here's what the DAG will look like with split_every=3:
You can see that now (for this number of partitions), the max memory that a worker will need is 3 times the max size of a single partition's result. So depending on your worker memory settings you might want to set split_every to a low value (2, 3, 4 or so).
In general, the more unique the variable, the more memory is needed for each partition's object with unique counts, and so a lower value of split_every is going to be useful to put a cap on the max memory usage. If the variable is not very unique, then each individual partition's unique count will be a small object, so there's no need to have a split_every restriction.
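The effect of split_every can be sketched with a toy tree reduction in plain Python. This is a simplified model under stated assumptions, not dask's actual scheduler or its nunique code: tree_nunique is a hypothetical helper, partial results are plain sets, and "memory" is approximated by the number of values one combine step receives. The toy partitions overlap completely, loosely mirroring the question's data, where every file samples from the same pool of ids.

```python
def tree_nunique(partitions, split_every):
    """Toy tree reduction.

    Returns (distinct count, peak number of values received by one combine step).
    """
    if split_every < 2:
        raise ValueError("split_every must be at least 2")
    if not partitions:
        return 0, 0

    # Step 1: reduce each partition to its own set of unique values.
    partials = [set(p) for p in partitions]
    peak = 0

    # Step 2: repeatedly merge at most `split_every` partial results at a time.
    while True:
        merged = []
        for start in range(0, len(partials), split_every):
            group = partials[start:start + split_every]
            # A combine step must hold all of its inputs at once.
            peak = max(peak, sum(len(s) for s in group))
            merged.append(set().union(*group))
        partials = merged
        if len(partials) == 1:
            break

    return len(partials[0]), peak


# 5 partitions that all contain the same 1000 ids.
parts = [list(range(1000)) for _ in range(5)]

for split_every in (5, 3, 2):
    count, peak = tree_nunique(parts, split_every)
    print(split_every, count, peak)
```

In this model the distinct count is always 1000, while the peak input to a single combine step drops from 5000 values (all five partial results at once) to 3000 with split_every=3 and 2000 with split_every=2. The saving depends on the merged sets being smaller than the sum of their inputs; with fully disjoint partitions the last step would still receive everything. In dask itself the option is passed as df['id'].nunique(split_every=3).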