python - Why is the output from tqdm+concurrent.futures such a mess and what can be done about it? - Stack Overflow

I want to run processes in parallel and show their progress. I have this code:

from math import factorial
from decimal import Decimal, getcontext
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time

def calc(n_digits, pos, total):
    # number of iterations
    n = int(n_digits + 1 / 14.181647462725477)
    n = n if n >= 1 else 1

    # set the number of digits for our numbers
    getcontext().prec = n_digits + 1

    t = Decimal(0)
    pi = Decimal(0)
    deno = Decimal(0)

    for k in tqdm(range(n), position=pos, desc=f"Job {pos + 1} of {total}", leave=True, dynamic_ncols=True):
        t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
        deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
        pi += Decimal(t) / Decimal(deno)

    pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
    pi = 1 / pi

    # no need to round
    return pi

def parallel_with_concurrent_futures():
    # Define the number of threads to use
    n_threads = 3
    # Define the tasks (number of digits of pi to compute per job)
    tasks = [1200, 1700, 900, 1400]  # Increase these to make the jobs run longer

    # Create a list of tqdm objects to manage progress bars
    progress_bars = [tqdm(total=int(task + 1 / 14.181647462725477), position=pos, desc=f"Job {pos + 1} of {len(tasks)}", leave=True, dynamic_ncols=True) for pos, task in enumerate(tasks)]

    # Run tasks in parallel
    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        futures = {executor.submit(calc, n, pos, len(tasks)): pos for pos, n in enumerate(tasks)}
        for future in as_completed(futures):
            pos = futures[future]
            progress_bars[pos].close()  # Close the progress bar when the job is done
            try:
                result = future.result()
                # Optionally, you can print the result here if needed
                # print(f"Job {pos + 1} of {len(tasks)} completed with result: {result}")
            except Exception as e:
                print(f"Job {pos + 1} of {len(tasks)} failed with error: {e}")

if __name__ == "__main__":
    parallel_with_concurrent_futures()

However, when I run it, the output gets gradually worse. It starts well with:

That is exactly how I want it. But then after the first process finishes I get:

The problem has now started. Then later it shows:

This is even worse. And then when it terminates it shows:

How can I change the code so it still uses 3 cores in parallel but I only see something like the first of the pictures above? I don't mind using other modules than concurrent and tqdm if that is the right thing to do.
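
A side note on the iteration count in calc: because division binds tighter than addition, n_digits + 1 / 14.181647462725477 only adds about 0.07 to n_digits, so int(...) simply returns n_digits. If the intent was roughly one series term per 14.18 digits, plus one (an assumption; the question does not say so), the division would have to apply to n_digits. A small sketch comparing the two, using hypothetical helper names:

```python
DIGITS_PER_TERM = 14.181647462725477  # constant taken from the question's code


def iterations_as_written(n_digits):
    # Same expression as in calc(): 1 / DIGITS_PER_TERM is evaluated first.
    return int(n_digits + 1 / DIGITS_PER_TERM)


def iterations_per_term(n_digits):
    # Assumed intent: divide the digit count by the digits gained per term.
    return int(n_digits / DIGITS_PER_TERM) + 1


for n_digits in (1200, 1700, 900, 1400):
    print(n_digits, iterations_as_written(n_digits), iterations_per_term(n_digits))
```

This has no bearing on the display problem; it only changes how many steps each bar has.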

Asked by Simd on Jan 11 at 13:55; edited Jan 11 at 20:01.

1 Answer

Unfortunately, what tqdm supports is nested progress bars (https://github.com/tqdm/tqdm?tab=readme-ov-file#nested-progress-bars), not independent bars that finish in an arbitrary order. Your issue can be reproduced by changing one line of the official multiprocessing example:

from time import sleep
from tqdm import trange, tqdm
from multiprocessing import Pool, RLock, freeze_support

L = list(range(9))

def progresser(n):
    interval = 0.001 / ([3,1,4,1,5,9,2,6,5][n] + 2)  # changed from `interval = 0.001 / (n + 2)`
    total = 5000
    text = f"#{n}, est. {interval * total:<04.2}s"
    for _ in trange(total, desc=text, position=n):
        sleep(interval)

if __name__ == '__main__':
    freeze_support()  # for Windows support
    tqdm.set_lock(RLock())  # for managing output contention
    p = Pool(initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),))
    p.map(progresser, L)

(By the way, your code might need tqdm.set_lock(RLock()).)
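
For the thread-based code in the question, the counterpart of that remark would presumably use threading.RLock rather than multiprocessing.RLock. A minimal sketch follows, with a hypothetical count_up job standing in for calc and an optional out parameter (also hypothetical) so the bars can be redirected. Whether this is needed at all is uncertain, since tqdm creates a default lock on its own when none has been set:

```python
import io
from concurrent.futures import ThreadPoolExecutor
from threading import RLock

from tqdm import tqdm


def count_up(n_steps, pos, out=None):
    """Hypothetical stand-in job: tick one bar n_steps times."""
    for _ in tqdm(range(n_steps), position=pos, desc=f"Job {pos + 1}", file=out):
        pass
    return n_steps


def run_with_thread_lock(step_counts, out=None):
    # Install one re-entrant lock for all bars before any thread starts.
    tqdm.set_lock(RLock())
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(count_up, n, pos, out)
                   for pos, n in enumerate(step_counts)]
        # Collect results in submission order.
        return [future.result() for future in futures]
```

The lock only serialises writes to the terminal; it does not change where a finished bar is redrawn, so on its own it is unlikely to fix the layout.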

One solution is to use tqdm v4.44.1, which predates this refactoring, and to patch lines 1284-1285 of tqdm/std.py so that finished bars are left at pos=pos instead of pos=0:

git clone git@github.com:tqdm/tqdm.git -b v4.44.1
cd tqdm
patch -p0 << "EOS"
--- tqdm/std.py
+++ tqdm/std.py
@@ -1281,8 +1281,7 @@
             if leave:
                 # stats for overall rate (no weighted average)
                 self.avg_time = None
-                self.display(pos=0)
-                fp_write('\n')
+                self.display(pos=pos)
             else:
                 self.display(msg='', pos=pos)
                 if not pos:
EOS
pip install .

Then add print("\n" * len(progress_bars)) to your code.
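
If patching tqdm is not an option, something that may be worth trying is to avoid closing any bar while other jobs are still running. The question's code creates each bar twice (once in the progress_bars list and once inside calc, at the same position) and closes a bar as soon as its job finishes, which is the point where unpatched tqdm redraws it at pos=0. The sketch below is not part of the answer above and has not been verified across tqdm versions. It creates the bars once in the main thread, lets the workers call update, and closes all bars in position order at the end. The names work and run_jobs, the out parameter, and the placeholder workload are all hypothetical:

```python
import io
from concurrent.futures import ThreadPoolExecutor, as_completed

from tqdm import tqdm


def work(n_steps, bar):
    """Hypothetical stand-in for calc(): advance the given bar once per step."""
    acc = 0
    for k in range(n_steps):
        acc += k * k  # placeholder workload
        bar.update(1)
    return acc


def run_jobs(step_counts, n_threads=3, out=None):
    # One bar per job, created once in the main thread (out=None means stderr).
    bars = [
        tqdm(total=n, position=pos, leave=True, file=out,
             desc=f"Job {pos + 1} of {len(step_counts)}")
        for pos, n in enumerate(step_counts)
    ]
    results = {}
    try:
        with ThreadPoolExecutor(max_workers=n_threads) as executor:
            futures = {executor.submit(work, n, bars[pos]): pos
                       for pos, n in enumerate(step_counts)}
            for future in as_completed(futures):
                results[futures[future]] = future.result()
    finally:
        # Close only after every job is done, in position order.
        for bar in bars:
            bar.close()
    return results, [bar.n for bar in bars]
```

Calling run_jobs([1200, 1700, 900, 1400]) would mirror the four jobs from the question, with each bar updated only by the thread that runs its job.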
