I am working on a Python project where I need to process a large CSV file using Polars. Currently, I am reading the CSV file inside each parallel process, which works but is inefficient because the read_csv operation is repeated for each process.
To improve performance, I tried reading the CSV file once before running the parallel processes and passing the resulting DataFrame to each process. However, I noticed that the code took significantly longer to execute in this setup.
Here’s a simplified version of my workflow:
import polars as pl
from concurrent.futures import ProcessPoolExecutor

def process_data(df, numbers):
    # Filter and process data (example)
    return df.filter(pl.col("Points").is_in(numbers))

# Reading inside each process
def worker(task_param):
    df = pl.read_csv("path/to/file.csv")
    return process_data(df, task_param)

with ProcessPoolExecutor(max_workers=4) as executor:
    task_params_list = [[10, 20], [30, 40], [50, 60], [70, 80]]
    results = list(executor.map(worker, task_params_list))
When I moved the pl.read_csv outside the worker function and tried to share the DataFrame across processes, the execution became slower:
df = pl.read_csv("path/to/file.csv")
def process_data(df, numbers):
    # Filter and process data (example)
    return df.filter(pl.col("Points").is_in(numbers))

def worker(task_param):
    df = task_param[0]
    numbers = task_param[1:]
    return process_data(df, numbers)

with ProcessPoolExecutor(max_workers=4) as executor:
    task_params_list = [[df, 10, 20], [df, 30, 40], [df, 50, 60], [df, 70, 80]]
    results = list(executor.map(worker, task_params_list))
I suspect the slowdown is due to the DataFrame being serialized and copied to each process, but I'm not sure how to address this.
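As a rough check on that suspicion (just a sketch, not something I have profiled carefully; the numbers will depend on the real file), timing a single pickle round-trip of the DataFrame shows roughly what every submitted task has to pay, since ProcessPoolExecutor pickles each argument:

import pickle
import time

# Rough estimate of the per-task cost of shipping df to a worker process
start = time.perf_counter()
payload = pickle.dumps(df)
print(f"pickled size: {len(payload) / 1e6:.1f} MB, "
      f"pickle time: {time.perf_counter() - start:.2f} s")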
My questions are: How can I avoid repeated reading of the CSV file in each parallel process while maintaining good performance? Is there a better approach to share or reuse a Polars DataFrame across parallel processes? Any guidance or insights would be greatly appreciated.
One thing you could try is to write a temporary file for each chunk of the df that a subprocess needs, and have each subprocess read only its own file. Those chunk files should be IPC or Parquet rather than CSV (try both and see which is faster; IPC should win unless your disk is slow enough that Parquet's compression is cheaper than writing more bytes), so you're not re-serializing CSV at every step. Something like this:
import polars as pl
from concurrent.futures import ProcessPoolExecutor

def process_data(df, numbers):
    # Filter and process data (example); the filtering already happened
    # when the chunk file was written, so only the real work remains here
    return df

# Each process reads only its own pre-chunked IPC file
def worker(task_param):
    df = pl.read_ipc(f"file_{task_param}.ipc")
    return process_data(df, task_param)

df = pl.read_csv("path/to/file.csv")
task_params_list = [[10, 20], [30, 40], [50, 60], [70, 80]]

# Write one IPC file per chunk; the filename is built from the same
# task parameters on both the write and the read side, so they match
for numbers in task_params_list:
    df.filter(pl.col("Points").is_in(numbers)).write_ipc(f"file_{numbers}.ipc")

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(worker, task_params_list))
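If you want to compare against Parquet, the same pattern applies and only the read/write calls change; a sketch reusing the same per-chunk filenames and variables as above:

# Parquet variant of the same chunk-file scheme
def worker(task_param):
    df = pl.read_parquet(f"file_{task_param}.parquet")
    return process_data(df, task_param)

for numbers in task_params_list:
    df.filter(pl.col("Points").is_in(numbers)).write_parquet(f"file_{numbers}.parquet")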
Depending on what's going on in your actual process_data, you may get a bigger improvement from optimizing that than from using Python parallel processes at all.
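For example, if your real process_data can be expressed with Polars expressions, you can often skip multiprocessing entirely and let Polars' own thread pool parallelize the work via lazy queries (a sketch, assuming the same Points column and filter values as above):

import polars as pl

task_params_list = [[10, 20], [30, 40], [50, 60], [70, 80]]

# One lazy query per chunk; nothing is read until collect_all runs them
lazy_frames = [
    pl.scan_csv("path/to/file.csv").filter(pl.col("Points").is_in(numbers))
    for numbers in task_params_list
]

# collect_all executes the queries using Polars' internal parallelism
results = pl.collect_all(lazy_frames)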