I want to run processes in parallel and show their progress. I have this code:
from math import factorial
from decimal import Decimal, getcontext
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time
def calc(n_digits, pos, total):
# number of iterations
n = int(n_digits + 1 / 14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits + 1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in tqdm(range(n), position=pos, desc=f"Job {pos + 1} of {total}", leave=True, dynamic_ncols=True):
t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
pi += Decimal(t) / Decimal(deno)
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1 / pi
# no need to round
return pi
def parallel_with_concurrent_futures():
# Define the number of threads to use
n_threads = 3
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400] # Edit to make code for longer
# Create a list of tqdm objects to manage progress bars
progress_bars = [tqdm(total=int(task + 1 / 14.181647462725477), position=pos, desc=f"Job {pos + 1} of {len(tasks)}", leave=True, dynamic_ncols=True) for pos, task in enumerate(tasks)]
# Run tasks in parallel
with ThreadPoolExecutor(max_workers=n_threads) as executor:
futures = {executor.submit(calc, n, pos, len(tasks)): pos for pos, n in enumerate(tasks)}
for future in as_completed(futures):
pos = futures[future]
progress_bars[pos].close() # Close the progress bar when the job is done
try:
result = future.result()
# Optionally, you can print the result here if needed
# print(f"Job {pos + 1} of {len(tasks)} completed with result: {result}")
except Exception as e:
print(f"Job {pos + 1} of {len(tasks)} failed with error: {e}")
if __name__ == "__main__":
parallel_with_concurrent_futures()
However, when I run it I see the output gets graduallly worse. It starts well with:
That is exactly how I want it. But then after the first process finishes I get:
The problem has now started. Then later it shows:
This is even worse. And then when it terminates it shows:
How can I change the code so it still uses 3 cores in parallel but I only see something like the first of the pictures above? I don't mind using other modules than concurrent and tqdm if that is the right thing to do.
I want to run processes in parallel and show their progress. I have this code:
from math import factorial
from decimal import Decimal, getcontext
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time
def calc(n_digits, pos, total):
# number of iterations
n = int(n_digits + 1 / 14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits + 1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in tqdm(range(n), position=pos, desc=f"Job {pos + 1} of {total}", leave=True, dynamic_ncols=True):
t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
pi += Decimal(t) / Decimal(deno)
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1 / pi
# no need to round
return pi
def parallel_with_concurrent_futures():
# Define the number of threads to use
n_threads = 3
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400] # Edit to make code for longer
# Create a list of tqdm objects to manage progress bars
progress_bars = [tqdm(total=int(task + 1 / 14.181647462725477), position=pos, desc=f"Job {pos + 1} of {len(tasks)}", leave=True, dynamic_ncols=True) for pos, task in enumerate(tasks)]
# Run tasks in parallel
with ThreadPoolExecutor(max_workers=n_threads) as executor:
futures = {executor.submit(calc, n, pos, len(tasks)): pos for pos, n in enumerate(tasks)}
for future in as_completed(futures):
pos = futures[future]
progress_bars[pos].close() # Close the progress bar when the job is done
try:
result = future.result()
# Optionally, you can print the result here if needed
# print(f"Job {pos + 1} of {len(tasks)} completed with result: {result}")
except Exception as e:
print(f"Job {pos + 1} of {len(tasks)} failed with error: {e}")
if __name__ == "__main__":
parallel_with_concurrent_futures()
However, when I run it I see the output gets graduallly worse. It starts well with:
That is exactly how I want it. But then after the first process finishes I get:
The problem has now started. Then later it shows:
This is even worse. And then when it terminates it shows:
How can I change the code so it still uses 3 cores in parallel but I only see something like the first of the pictures above? I don't mind using other modules than concurrent and tqdm if that is the right thing to do.
Unfortunately, what tqdm supports are nested progress bars (https://github.com/tqdm/tqdm?tab=readme-ov-file#nested-progress-bars). Your issue can be reproduced by changing a line of the official multiprocessing example:
from time import sleep
from tqdm import trange, tqdm
from multiprocessing import Pool, RLock, freeze_support
L = list(range(9))
def progresser(n):
interval = 0.001 / ([3,1,4,1,5,9,2,6,5][n] + 2) # changed from `interval = 0.001 / (n + 2)`
total = 5000
text = f"#{n}, est. {interval * total:<04.2}s"
for _ in trange(total, desc=text, position=n):
sleep(interval)
if __name__ == '__main__':
freeze_support() # for Windows support
tqdm.set_lock(RLock()) # for managing output contention
p = Pool(initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),))
p.map(progresser, L)
(By the way, your code might need tqdm.set_lock(RLock())
.)
A solution to the issue is to use tqdm v4.44.1, which is before this refactoring, with the changes to the lines 1284-1285 of tqdm/std.py
in order to leave bars at pos=pos instead of pos=0.
git clone [email protected]:tqdm/tqdm.git -b v4.44.1
cd tqdm
patch -p0 << "EOS"
--- tqdm/std.py
+++ tqdm/std.py
@@ -1281,8 +1281,7 @@
if leave:
# stats for overall rate (no weighted average)
self.avg_time = None
- self.display(pos=0)
- fp_write('\n')
+ self.display(pos=pos)
else:
self.display(msg='', pos=pos)
if not pos:
EOS
pip install .
Then add print("\n" * len(progress_bars))
to your code.