123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- from tqdm import tqdm
- from concurrent.futures import ProcessPoolExecutor, as_completed
- def parallel_process(array, function, n_jobs=16, use_kwargs=False, front_num=0):
- """
- A parallel version of the map function with a progress bar.
- Args:
- array (array-like): An array to iterate over.
- function (function): A python function to apply to the elements of array
- n_jobs (int, default=16): The number of cores to use
- use_kwargs (boolean, default=False): Whether to consider the elements of array as dictionaries of
- keyword arguments to function
- front_num (int, default=3): The number of iterations to run serially before kicking off the parallel job.
- Useful for catching bugs
- Returns:
- [function(array[0]), function(array[1]), ...]
- """
- # We run the first few iterations serially to catch bugs
- if front_num > 0:
- front = [function(**a) if use_kwargs else function(a)
- for a in array[:front_num]]
- else:
- front = []
- # If we set n_jobs to 1, just run a list comprehension. This is useful for benchmarking and debugging.
- if n_jobs == 1:
- return front + [function(**a) if use_kwargs else function(a) for a in tqdm(array[front_num:])]
- # Assemble the workers
- with ProcessPoolExecutor(max_workers=n_jobs) as pool:
- # Pass the elements of array into function
- if use_kwargs:
- futures = [pool.submit(function, **a) for a in array[front_num:]]
- else:
- futures = [pool.submit(function, a) for a in array[front_num:]]
- kwargs = {
- 'total': len(futures),
- 'unit': 'it',
- 'unit_scale': True,
- 'leave': True
- }
- # Print out the progress as tasks complete
- for f in tqdm(as_completed(futures), **kwargs):
- pass
- out = []
- # Get the results from the futures.
- for i, future in tqdm(enumerate(futures)):
- try:
- out.append(future.result())
- except Exception as e:
- out.append(e)
- return front + out
|