How to avoid repeated CSV reading in each parallel process in Python? - Stack Overflow


I am working on a Python project where I need to process a large CSV file using Polars. Currently, I am reading the CSV file inside each parallel process, which works but is inefficient because the read_csv operation is repeated for each process.

To improve performance, I tried reading the CSV file once before running the parallel processes and passing the resulting DataFrame to each process. However, I noticed that the code took significantly longer to execute in this setup.

Here’s a simplified version of my workflow:

import polars as pl
from concurrent.futures import ProcessPoolExecutor

def process_data(df, numbers):
    # Filter and process data (example)
    return df.filter(pl.col("Points").is_in(numbers)) 

# Reading inside each process
def worker(task_param):
    df = pl.read_csv("path/to/file.csv")
    return process_data(df, task_param)

with ProcessPoolExecutor(max_workers=4) as executor:
    task_params_list = [[10, 20], [30, 40], [50, 60], [70, 80]]
    results = list(executor.map(worker, task_params_list))

When I moved the pl.read_csv outside the worker function and tried to share the DataFrame across processes, the execution became slower:

df = pl.read_csv("path/to/file.csv")

def process_data(df, numbers):
    # Filter and process data (example)
    return df.filter(pl.col("Points").is_in(numbers)) 

def worker(task_param):
    df = task_param[0]
    numbers = task_param[1:]
    return process_data(df, numbers)

with ProcessPoolExecutor(max_workers=4) as executor:
    task_params_list = [[df, 10, 20], [df, 30, 40], [df, 50, 60], [df, 70, 80]]
    results = list(executor.map(worker, task_params_list))

I suspect the slowdown is due to the DataFrame being serialized and copied to each process, but I'm not sure how to address this.

My questions are: How can I avoid repeated reading of the CSV file in each parallel process while maintaining good performance? Is there a better approach to share or reuse a Polars DataFrame across parallel processes? Any guidance or insights would be greatly appreciated.


asked Jan 21 at 14:38 by Pedro_Siqueira, edited Jan 21 at 14:39 by jqurious
  • What are you running in the parallel processes? Because if you are running Polars, you are now contending with its internal parallelism. It's best to leave the parallelism to Polars itself. – ritchie46 Commented Jan 21 at 14:46
  • @Pedro_Siqueira If the dataframe is very large, then you will experience significant overhead due to serialisation of that object. – Adon Bilivit Commented Jan 21 at 16:18
  • 1 "the code took significantly longer to execute in this setup" Multiprocessing in CPython serialize/unserialize all objects using pickle and send data using IPC. This is very expensive and so pretty slow for large data. Multiprocessing is only useful when the operation are fully compute-bound (not memory-bound). Multithreading is limited by the GIL so it is useless here. Polar can AFAIK its own native thread to work on native internal arrays without being limited by the GIL. This is the only way to get fast parallel codes with CPython in such a case. – Jérôme Richard Commented Jan 21 at 19:04
  • Besides, if you run multiple processes each running multiple threads, this will not make things faster, but significantly slower. Indeed, there will certainly be more threads to execute than cores. The context switches will cause significant overhead, not to mention the additional cache misses. Thread/process oversubscription is bad; don't do that. – Jérôme Richard Commented Jan 21 at 19:07
  • Oh, and reading a large CSV should not be a bottleneck for in-production applications, since CSV is really not designed for that. You should use a binary file format; they are much faster and more robust. CSV is for debugging/analysis and/or small datasets. – Jérôme Richard Commented Jan 21 at 19:12
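As the comments above point out, Polars already parallelizes internally, so layering worker processes on top of it tends to oversubscribe the CPU. A minimal sketch of how you might inspect and cap Polars' own thread pool in recent Polars versions (the POLARS_MAX_THREADS variable must be set before polars is imported; the cap of 2 below is purely illustrative):

import os

# Must be set before the first `import polars`, otherwise it has no effect.
os.environ["POLARS_MAX_THREADS"] = "2"  # illustrative cap, not a recommendation

import polars as pl

# Polars reports the size of its internal thread pool.
print(pl.thread_pool_size())  # -> 2 with the cap above, otherwise roughly the number of cores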

1 Answer


One thing you could try is to create a temp file for each chunk of the df that a subprocess needs, and have each subprocess read only its own file. Those files should use IPC or Parquet rather than CSV (try both and see which is faster; IPC should win unless your disk is slow enough that Parquet's compression is cheaper than writing more data), so you avoid re-serializing CSV at every step. Something like this:

import polars as pl
from concurrent.futures import ProcessPoolExecutor


def process_data(df, numbers):
    # Further processing goes here; the filtering already happened
    # when the chunk file was written.
    return df

# Each worker reads only its own pre-filtered chunk file
def worker(task_param):
    df = pl.read_ipc(f"file_{task_param}.ipc")
    return process_data(df, task_param)

if __name__ == "__main__":
    # Read the CSV once and write one IPC file per task. The main guard
    # keeps this setup from re-running inside the child processes on
    # platforms that use the "spawn" start method.
    df = pl.read_csv("path/to/file.csv")
    task_params_list = [[10, 20], [30, 40], [50, 60], [70, 80]]
    for numbers in task_params_list:
        df.filter(pl.col("Points").is_in(numbers)).write_ipc(f"file_{numbers}.ipc")

    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(worker, task_params_list))
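If you want to compare against Parquet, as suggested above, only the write/read pair changes (a sketch using the same hypothetical filenames):

# Write the pre-filtered chunks as Parquet instead of IPC ...
df.filter(pl.col("Points").is_in(numbers)).write_parquet(f"file_{numbers}.parquet")

# ... and have the worker read Parquet instead:
df = pl.read_parquet(f"file_{task_param}.parquet")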

Depending on what's going on in your actual process_data, you may see better improvements by optimizing that than by trying to use Python parallel processes at all.
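If process_data really is just Polars expressions, a simpler route is to skip multiprocessing entirely and let Polars parallelize, as ritchie46's comment suggests. A rough sketch, assuming the same hypothetical file path and "Points" column as the question: scan the CSV lazily once, build one lazy query per task, and let collect_all() run them on Polars' native thread pool with no pickling between processes:

import polars as pl

# Lazily scan the CSV once; nothing is read until collect time.
lf = pl.scan_csv("path/to/file.csv")

task_params_list = [[10, 20], [30, 40], [50, 60], [70, 80]]

# One lazy query per task, mirroring process_data from the question.
queries = [lf.filter(pl.col("Points").is_in(numbers)) for numbers in task_params_list]

# collect_all() executes the queries in parallel on Polars' internal thread pool.
results = pl.collect_all(queries)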
